7#ifndef DMLC_DATA_PARQUET_PARSER_H_
8#define DMLC_DATA_PARQUET_PARSER_H_
22#include "../data/row_block.h"
23#include "../data/parser.h"
24#include "arrow/io/api.h"
25#include "parquet/api/reader.h"
38 DMLC_DECLARE_FIELD(format).set_default(
"parquet")
39 .describe(
"File format.");
40 DMLC_DECLARE_FIELD(label_column).set_default(0)
41 .describe(
"Column index (0-based) that will put into label.");
42 DMLC_DECLARE_FIELD(weight_column).set_default(-1)
43 .describe(
"Column index that will put into instance weights.");
44 DMLC_DECLARE_FIELD(nthreads).set_default(1)
45 .describe(
"Column index that will put into instance weights.");
49template <
typename IndexType,
typename DType = real_t>
53 const std::map<std::string, std::string>& args) : row_groups_read_(0) {
55 nthread_ = param_.nthreads;
56 CHECK_EQ(param_.format,
"parquet");
58 parquet_reader_ = parquet::ParquetFileReader::OpenFile(filename,
false);
59 metadata_ = parquet_reader_->metadata();
60 num_rows_ = metadata_->num_rows();
61 num_cols_ = metadata_->num_columns();
62 num_row_groups_ = metadata_->num_row_groups();
64 have_next_ = (num_rows_ != 0);
75 virtual void ParseRowGroup(
int row_group_id,
87 std::unique_ptr<parquet::ParquetFileReader> parquet_reader_;
88 std::shared_ptr<parquet::FileMetaData> metadata_;
100template <
typename IndexType,
typename DType>
104 parquet_reader_->Close();
107 std::vector<std::future<void>> futures;
109 int next_row_groups = std::min(nthread_, num_row_groups_ - row_groups_read_);
110 data->resize(next_row_groups);
111 futures.resize(next_row_groups);
113 for (
int tid = 0; tid < next_row_groups; ++tid) {
114 int row_group_id = row_groups_read_ + tid;
115 futures[tid] = std::async(std::launch::async, [&, row_group_id, data, tid] {
116 ParseRowGroup(row_group_id, &(*data)[tid]);
120 for (
int i = 0; i < next_row_groups; ++i) {
124 row_groups_read_ += next_row_groups;
125 have_next_ = (row_groups_read_ < num_row_groups_);
129template <
typename IndexType,
typename DType>
136 std::shared_ptr<parquet::RowGroupReader> row_group_reader
137 = parquet_reader_->RowGroup(row_group_id);
138 std::vector<std::shared_ptr<parquet::ColumnReader>> all_column_readers;
139 std::vector<parquet::FloatReader*> all_float_readers;
142 for (
int i_col = 0; i_col < num_cols_; ++i_col) {
143 all_column_readers.push_back(row_group_reader->Column(i_col));
144 all_float_readers.push_back(
145 static_cast<parquet::FloatReader*
>(all_column_readers[i_col].get()));
148 int num_rows_this_group = metadata_->RowGroup(row_group_id)->num_rows();
149 constexpr int chunk_size = 1;
152 for (
int i_row = 0; i_row < num_rows_this_group; i_row++) {
154 DType label = DType(0.0f);
155 real_t weight = std::numeric_limits<real_t>::quiet_NaN();
157 for (
int i_col = 0; i_col < num_cols_; i_col++) {
158 all_float_readers[i_col]->ReadBatch(chunk_size,
nullptr,
nullptr, &v, &values_read);
159 CHECK_EQ(values_read, chunk_size);
160 if (i_col == param_.label_column) {
162 }
else if (std::is_same<DType, real_t>::value
163 && i_col == param_.weight_column) {
166 out->value.push_back(v);
167 out->index.push_back(idx++);
171 out->label.push_back(label);
172 if (!std::isnan(weight)) {
173 out->weight.push_back(weight);
175 out->offset.push_back(out->index.size());
177 CHECK(out->label.size() + 1 == out->offset.size());
178 CHECK(out->weight.size() == 0 || out->weight.size() + 1 == out->offset.size());
Definition parquet_parser.h:50
virtual size_t BytesRead(void) const
Definition parquet_parser.h:78
virtual void BeforeFirst(void)
Set the position to before the first item.
Definition parquet_parser.h:82
virtual bool ParseNext(std::vector< RowBlockContainer< IndexType, DType > > *data)
Read in the next several blocks of data.
Definition parquet_parser.h:102
Base class for parsers that parse data.
Definition parser.h:24
Defines the common input data structures and the interface for handling input data.
Provide lightweight util to do parameter setup and checking.
namespace for dmlc
Definition array_view.h:12
float real_t
Defines the floating-point type used to store feature values.
Definition data.h:26
A faster implementation of strtof and strtod.
Definition parquet_parser.h:31
dynamic data structure that holds a row block of data
Definition row_block.h:27