Medial Code Documentation
Loading...
Searching...
No Matches
disk_row_iter.h
1
8#ifndef DMLC_DATA_DISK_ROW_ITER_H_
9#define DMLC_DATA_DISK_ROW_ITER_H_
10
11#include <dmlc/io.h>
12#include <dmlc/logging.h>
13#include <dmlc/data.h>
14#include <dmlc/timer.h>
15#include <dmlc/threadediter.h>
16#include <algorithm>
17#include <string>
18#include "./row_block.h"
19#include "./libsvm_parser.h"
20
21#if DMLC_ENABLE_STD_THREAD
22namespace dmlc {
23namespace data {
28template<typename IndexType, typename DType = real_t>
29class DiskRowIter: public RowBlockIter<IndexType, DType> {
30 public:
31 // page size 64MB
32 static const size_t kPageSize = 64UL << 20UL;
38 explicit DiskRowIter(Parser<IndexType, DType> *parser,
39 const char *cache_file,
40 bool reuse_cache)
41 : cache_file_(cache_file), fi_(NULL) {
42 if (reuse_cache) {
43 if (!TryLoadCache()) {
44 this->BuildCache(parser);
45 CHECK(TryLoadCache())
46 << "failed to build cache file " << cache_file;
47 }
48 } else {
49 this->BuildCache(parser);
50 CHECK(TryLoadCache())
51 << "failed to build cache file " << cache_file;
52 }
53 delete parser;
54 }
55 virtual ~DiskRowIter(void) {
56 iter_.Destroy();
57 delete fi_;
58 }
59 virtual void BeforeFirst(void) {
60 iter_.BeforeFirst();
61 }
62 virtual bool Next(void) {
63 if (iter_.Next()) {
64 row_ = iter_.Value().GetBlock();
65 return true;
66 } else {
67 return false;
68 }
69 }
70 virtual const RowBlock<IndexType, DType> &Value(void) const {
71 return row_;
72 }
73 virtual size_t NumCol(void) const {
74 return num_col_;
75 }
76
77 private:
78 // file place
79 std::string cache_file_;
80 // input stream
81 SeekStream *fi_;
82 // maximum feature dimension
83 size_t num_col_;
84 // row block to store
85 RowBlock<IndexType, DType> row_;
86 // iterator
87 ThreadedIter<RowBlockContainer<IndexType, DType> > iter_;
88 // load disk cache file
89 inline bool TryLoadCache(void);
90 // build disk cache
91 inline void BuildCache(Parser<IndexType, DType> *parser);
92};
93
94// build disk cache
95template<typename IndexType, typename DType>
96inline bool DiskRowIter<IndexType, DType>::TryLoadCache(void) {
97 SeekStream *fi = SeekStream::CreateForRead(cache_file_.c_str(), true);
98 if (fi == NULL) return false;
99 this->fi_ = fi;
100 iter_.Init([fi](RowBlockContainer<IndexType, DType> **dptr) {
101 if (*dptr ==NULL) {
102 *dptr = new RowBlockContainer<IndexType, DType>();
103 }
104 return (*dptr)->Load(fi);
105 },
106 [fi]() { fi->Seek(0); });
107 return true;
108}
109
110template<typename IndexType, typename DType>
111inline void DiskRowIter<IndexType, DType>::
112BuildCache(Parser<IndexType, DType> *parser) {
113 Stream *fo = Stream::Create(cache_file_.c_str(), "w");
114 // back end data
115 RowBlockContainer<IndexType, DType> data;
116 num_col_ = 0;
117 double tstart = GetTime();
118 while (parser->Next()) {
119 data.Push(parser->Value());
120 double tdiff = GetTime() - tstart;
121 if (data.MemCostBytes() >= kPageSize) {
122 size_t bytes_read = parser->BytesRead();
123 bytes_read = bytes_read >> 20UL;
124 LOG(INFO) << bytes_read << "MB read,"
125 << bytes_read / tdiff << " MB/sec";
126 num_col_ = std::max(num_col_,
127 static_cast<size_t>(data.max_index) + 1);
128 data.Save(fo);
129 data.Clear();
130 }
131 }
132 if (data.Size() != 0) {
133 num_col_ = std::max(num_col_,
134 static_cast<size_t>(data.max_index) + 1);
135 data.Save(fo);
136 }
137 delete fo;
138 double tdiff = GetTime() - tstart;
139 LOG(INFO) << "finish reading at %g MB/sec"
140 << (parser->BytesRead() >> 20UL) / tdiff;
141}
142} // namespace data
143} // namespace dmlc
144#endif // DMLC_USE_CXX11
145#endif // DMLC_DATA_DISK_ROW_ITER_H_
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function that creates a SeekStream for reading only; the stream closes the underlying file upon deletion
Definition io.cc:140
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function that creates a stream; the stream closes the underlying files upon deletion
Definition io.cc:132
defines common input data structure, and interface for handling the input data
defines serializable interface of dmlc
defines the logging macros of dmlc; allows use of GLOG, falling back to an internal implementation when GLOG is disabled
cross platform timer for timing
iterator parser to parse libsvm format
namespace for dmlc
Definition array_view.h:12
double GetTime(void)
return time in seconds
Definition timer.h:27
dmlc::SeekStream SeekStream
re-use definition of dmlc::SeekStream
Definition io.h:24
dmlc::Stream Stream
defines the stream used in rabit; see the definition of Stream in dmlc/io.h
Definition rabit.h:27
additional data structure to support RowBlock data structure
thread backed iterator that can be used to implement general thread-based pipeline such as prefetch a...