Medial Code Documentation
Loading...
Searching...
No Matches
parquet_parser.h
Go to the documentation of this file.
1
7#ifndef DMLC_DATA_PARQUET_PARSER_H_
8#define DMLC_DATA_PARQUET_PARSER_H_
9
10#include <dmlc/data.h>
11#include <dmlc/strtonum.h>
12#include <dmlc/parameter.h>
13#include <cmath>
14#include <cstring>
15#include <map>
16#include <string>
17#include <limits>
18#include <future>
19#include <algorithm>
20#include <memory>
21#include <vector>
22#include "../data/row_block.h"
23#include "../data/parser.h"
24#include "arrow/io/api.h"
25#include "parquet/api/reader.h"
26
27
28namespace dmlc {
29namespace data {
30
31struct ParquetParserParam : public Parameter<ParquetParserParam> {
32 std::string format;
33 int label_column;
34 int weight_column;
35 int nthreads;
36
37 DMLC_DECLARE_PARAMETER(ParquetParserParam) {
38 DMLC_DECLARE_FIELD(format).set_default("parquet")
39 .describe("File format.");
40 DMLC_DECLARE_FIELD(label_column).set_default(0)
41 .describe("Column index (0-based) that will put into label.");
42 DMLC_DECLARE_FIELD(weight_column).set_default(-1)
43 .describe("Column index that will put into instance weights.");
44 DMLC_DECLARE_FIELD(nthreads).set_default(1)
45 .describe("Column index that will put into instance weights.");
46 }
47};
48
49template <typename IndexType, typename DType = real_t>
50class ParquetParser : public ParserImpl<IndexType, DType> {
51 public:
52 ParquetParser(const std::string& filename,
53 const std::map<std::string, std::string>& args) : row_groups_read_(0) {
54 param_.Init(args);
55 nthread_ = param_.nthreads;
56 CHECK_EQ(param_.format, "parquet");
57
58 parquet_reader_ = parquet::ParquetFileReader::OpenFile(filename, false);
59 metadata_ = parquet_reader_->metadata();
60 num_rows_ = metadata_->num_rows();
61 num_cols_ = metadata_->num_columns();
62 num_row_groups_ = metadata_->num_row_groups();
63
64 have_next_ = (num_rows_ != 0);
65 }
66
72 virtual bool ParseNext(std::vector<RowBlockContainer<IndexType, DType> > *data);
73
74 protected:
75 virtual void ParseRowGroup(int row_group_id,
77
78 virtual size_t BytesRead(void) const {
79 return -1;
80 }
81
82 virtual void BeforeFirst(void) {}
83
84 private:
85 ParquetParserParam param_;
86 // handle for reading parquet files
87 std::unique_ptr<parquet::ParquetFileReader> parquet_reader_;
88 std::shared_ptr<parquet::FileMetaData> metadata_;
89 // number of rows having read
90 int num_rows_;
91 int num_cols_;
92 int num_row_groups_;
93 int row_groups_read_;
94 // whether we have reached end of parquet file
95 bool have_next_;
96 // number of threads; hardcoded 4 for now
97 int nthread_;
98};
99
100template <typename IndexType, typename DType>
103 if (!have_next_) {
104 parquet_reader_->Close();
105 return false;
106 }
107 std::vector<std::future<void>> futures;
108
109 int next_row_groups = std::min(nthread_, num_row_groups_ - row_groups_read_);
110 data->resize(next_row_groups);
111 futures.resize(next_row_groups);
112
113 for (int tid = 0; tid < next_row_groups; ++tid) {
114 int row_group_id = row_groups_read_ + tid;
115 futures[tid] = std::async(std::launch::async, [&, row_group_id, data, tid] {
116 ParseRowGroup(row_group_id, &(*data)[tid]);
117 });
118 }
119
120 for (int i = 0; i < next_row_groups; ++i) {
121 futures[i].wait();
122 }
123
124 row_groups_read_ += next_row_groups;
125 have_next_ = (row_groups_read_ < num_row_groups_);
126 return true;
127}
128
129template <typename IndexType, typename DType>
131ParseRowGroup(int row_group_id,
133 out->Clear();
134 DType v;
135
136 std::shared_ptr<parquet::RowGroupReader> row_group_reader
137 = parquet_reader_->RowGroup(row_group_id);
138 std::vector<std::shared_ptr<parquet::ColumnReader>> all_column_readers;
139 std::vector<parquet::FloatReader*> all_float_readers;
140
141 // get all the column readers; will iterate each column row-wise later
142 for (int i_col = 0; i_col < num_cols_; ++i_col) {
143 all_column_readers.push_back(row_group_reader->Column(i_col));
144 all_float_readers.push_back(
145 static_cast<parquet::FloatReader*>(all_column_readers[i_col].get()));
146 }
147
148 int num_rows_this_group = metadata_->RowGroup(row_group_id)->num_rows();
149 constexpr int chunk_size = 1;
150 int64_t values_read;
151
152 for (int i_row = 0; i_row < num_rows_this_group; i_row++) {
153 IndexType idx = 0;
154 DType label = DType(0.0f);
155 real_t weight = std::numeric_limits<real_t>::quiet_NaN();
156
157 for (int i_col = 0; i_col < num_cols_; i_col++) {
158 all_float_readers[i_col]->ReadBatch(chunk_size, nullptr, nullptr, &v, &values_read);
159 CHECK_EQ(values_read, chunk_size);
160 if (i_col == param_.label_column) {
161 label = v;
162 } else if (std::is_same<DType, real_t>::value
163 && i_col == param_.weight_column) {
164 weight = v;
165 } else {
166 out->value.push_back(v);
167 out->index.push_back(idx++);
168 }
169 }
170
171 out->label.push_back(label);
172 if (!std::isnan(weight)) {
173 out->weight.push_back(weight);
174 }
175 out->offset.push_back(out->index.size());
176 }
177 CHECK(out->label.size() + 1 == out->offset.size());
178 CHECK(out->weight.size() == 0 || out->weight.size() + 1 == out->offset.size());
179}
180
181} // namespace data
182} // namespace dmlc
183#endif // DMLC_DATA_PARQUET_PARSER_H_
Definition parquet_parser.h:50
virtual size_t BytesRead(void) const
Definition parquet_parser.h:78
virtual void BeforeFirst(void)
Reset the iterator to the position before the first item.
Definition parquet_parser.h:82
virtual bool ParseNext(std::vector< RowBlockContainer< IndexType, DType > > *data)
Read in the next several blocks of data.
Definition parquet_parser.h:102
Base class for parsers that parse data.
Definition parser.h:24
Defines common input data structures and the interface for handling input data.
Provide lightweight util to do parameter setup and checking.
namespace for dmlc
Definition array_view.h:12
float real_t
This defines the floating-point type used to store feature values.
Definition data.h:26
A faster implementation of strtof and strtod.
Definition parquet_parser.h:31
dynamic data structure that holds a row block of data
Definition row_block.h:27