8#ifndef DMLC_DATA_DISK_ROW_ITER_H_
9#define DMLC_DATA_DISK_ROW_ITER_H_
21#if DMLC_ENABLE_STD_THREAD
28template<
typename IndexType,
typename DType = real_t>
29class DiskRowIter:
public RowBlockIter<IndexType, DType> {
32 static const size_t kPageSize = 64UL << 20UL;
38 explicit DiskRowIter(Parser<IndexType, DType> *parser,
39 const char *cache_file,
41 : cache_file_(cache_file), fi_(NULL) {
43 if (!TryLoadCache()) {
44 this->BuildCache(parser);
46 <<
"failed to build cache file " << cache_file;
49 this->BuildCache(parser);
51 <<
"failed to build cache file " << cache_file;
55 virtual ~DiskRowIter(
void) {
59 virtual void BeforeFirst(
void) {
62 virtual bool Next(
void) {
64 row_ = iter_.Value().GetBlock();
70 virtual const RowBlock<IndexType, DType> &Value(
void)
const {
73 virtual size_t NumCol(
void)
const {
79 std::string cache_file_;
85 RowBlock<IndexType, DType> row_;
87 ThreadedIter<RowBlockContainer<IndexType, DType> > iter_;
89 inline bool TryLoadCache(
void);
91 inline void BuildCache(Parser<IndexType, DType> *parser);
95template<
typename IndexType,
typename DType>
96inline bool DiskRowIter<IndexType, DType>::TryLoadCache(
void) {
98 if (fi == NULL)
return false;
100 iter_.Init([fi](RowBlockContainer<IndexType, DType> **dptr) {
102 *dptr =
new RowBlockContainer<IndexType, DType>();
104 return (*dptr)->Load(fi);
106 [fi]() { fi->Seek(0); });
110template<
typename IndexType,
typename DType>
111inline void DiskRowIter<IndexType, DType>::
112BuildCache(Parser<IndexType, DType> *parser) {
115 RowBlockContainer<IndexType, DType> data;
118 while (parser->Next()) {
119 data.Push(parser->Value());
120 double tdiff =
GetTime() - tstart;
121 if (data.MemCostBytes() >= kPageSize) {
122 size_t bytes_read = parser->BytesRead();
123 bytes_read = bytes_read >> 20UL;
124 LOG(INFO) << bytes_read <<
"MB read,"
125 << bytes_read / tdiff <<
" MB/sec";
126 num_col_ = std::max(num_col_,
127 static_cast<size_t>(data.max_index) + 1);
132 if (data.Size() != 0) {
133 num_col_ = std::max(num_col_,
134 static_cast<size_t>(data.max_index) + 1);
138 double tdiff =
GetTime() - tstart;
139 LOG(INFO) <<
"finish reading at %g MB/sec"
140 << (parser->BytesRead() >> 20UL) / tdiff;
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function that creates a SeekStream for read only; the stream will close the underlying fil...
Definition io.cc:140
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function that creates a stream; the stream will close the underlying files upon deletion
Definition io.cc:132
defines common input data structure, and interface for handling the input data
defines serializable interface of dmlc
defines logging macros of dmlc; allows use of GLOG, and falls back to an internal implementation when GLOG is disabled
cross platform timer for timing
iterator parser to parse libsvm format
namespace for dmlc
Definition array_view.h:12
double GetTime(void)
return time in seconds
Definition timer.h:27
dmlc::SeekStream SeekStream
re-use definition of dmlc::SeekStream
Definition io.h:24
dmlc::Stream Stream
defines the stream used in rabit; see the definition of Stream in dmlc/io.h
Definition rabit.h:27
additional data structure to support RowBlock data structure
thread backed iterator that can be used to implement general thread-based pipeline such as prefetch a...