Medial Code Documentation
Loading...
Searching...
No Matches
cached_input_split.h
Go to the documentation of this file.
1
9#ifndef DMLC_IO_CACHED_INPUT_SPLIT_H_
10#define DMLC_IO_CACHED_INPUT_SPLIT_H_
11
12#include <dmlc/base.h>
13// this code depends on c++11
14
15#if DMLC_ENABLE_STD_THREAD
16#include <dmlc/threadediter.h>
17#include <string>
18#include <algorithm>
19#include "./input_split_base.h"
20
21namespace dmlc {
22namespace io {
28class CachedInputSplit : public InputSplit {
29 public:
36 CachedInputSplit(InputSplitBase *base,
37 const char *cache_file,
38 bool reuse_exist_cache = true)
39 : buffer_size_(InputSplitBase::kBufferSize),
40 cache_file_(cache_file),
41 fo_(NULL), fi_(NULL),
42 base_(base), tmp_chunk_(NULL),
43 iter_preproc_(NULL) {
44 if (reuse_exist_cache) {
45 if (!this->InitCachedIter()) {
46 this->InitPreprocIter();
47 }
48 } else {
49 this->InitPreprocIter();
50 }
51 }
52 // destructor
53 virtual ~CachedInputSplit(void) {
54 // NOTE delete can handle NULL ptr
55 // deletion order matters
56 delete iter_preproc_;
57 delete fo_;
58 iter_cached_.Destroy();
59 delete tmp_chunk_;
60 delete base_;
61 delete fi_;
62 }
63 virtual void BeforeFirst(void) {
64 // if preprocessing did not end
65 // pull data from preprocessing module
66 if (iter_preproc_ != NULL) {
67 if (tmp_chunk_ != NULL) {
68 iter_preproc_->Recycle(&tmp_chunk_);
69 }
70 while (iter_preproc_->Next(&tmp_chunk_)) {
71 iter_preproc_->Recycle(&tmp_chunk_);
72 }
73 // finalize the push out process
74 delete iter_preproc_;
75 delete fo_;
76 iter_preproc_ = NULL;
77 fo_ = NULL;
78 CHECK(this->InitCachedIter())
79 << "Failed to initialize CachedIter";
80 } else {
81 iter_cached_.BeforeFirst();
82 }
83 if (tmp_chunk_ != NULL) {
84 iter_cached_.Recycle(&tmp_chunk_);
85 }
86 }
87 virtual void ResetPartition(unsigned /*part_index*/, unsigned /*num_parts*/) {
88 LOG(FATAL) << "ResetPartition is not supported in CachedInputSplit";
89 }
90 virtual void HintChunkSize(size_t chunk_size) {
91 buffer_size_ = std::max(chunk_size / sizeof(uint32_t), buffer_size_);
92 }
93 virtual size_t GetTotalSize(void) {
94 return base_->GetTotalSize();
95 }
96 // implement next record
97 virtual bool NextRecord(Blob *out_rec) {
98 auto *iter = iter_preproc_ != NULL ? iter_preproc_ : &iter_cached_;
99 if (tmp_chunk_ == NULL) {
100 if (!iter->Next(&tmp_chunk_)) return false;
101 }
102 while (!base_->ExtractNextRecord(out_rec, tmp_chunk_)) {
103 iter->Recycle(&tmp_chunk_);
104 if (!iter->Next(&tmp_chunk_)) return false;
105 }
106 return true;
107 }
108 // implement next chunk
109 virtual bool NextChunk(Blob *out_chunk) {
110 auto *iter = iter_preproc_ != NULL ? iter_preproc_ : &iter_cached_;
111 if (tmp_chunk_ == NULL) {
112 if (!iter->Next(&tmp_chunk_)) return false;
113 }
114 while (!base_->ExtractNextChunk(out_chunk, tmp_chunk_)) {
115 iter->Recycle(&tmp_chunk_);
116 if (!iter->Next(&tmp_chunk_)) return false;
117 }
118 return true;
119 }
120
121 private:
123 size_t buffer_size_;
125 std::string cache_file_;
127 dmlc::Stream *fo_;
129 dmlc::SeekStream *fi_;
131 InputSplitBase *base_;
133 InputSplitBase::Chunk *tmp_chunk_;
135 ThreadedIter<InputSplitBase::Chunk> *iter_preproc_;
137 ThreadedIter<InputSplitBase::Chunk> iter_cached_;
139 inline void InitPreprocIter(void);
145 inline bool InitCachedIter(void);
146};
147
148inline void CachedInputSplit:: InitPreprocIter(void) {
149 fo_ = dmlc::Stream::Create(cache_file_.c_str(), "w");
150 iter_preproc_ = new ThreadedIter<InputSplitBase::Chunk>();
151 iter_preproc_->set_max_capacity(16);
152 iter_preproc_->Init([this](InputSplitBase::Chunk **dptr) {
153 if (*dptr == NULL) {
154 *dptr = new InputSplitBase::Chunk(buffer_size_);
155 }
156 auto *p = *dptr;
157 if (!base_->NextChunkEx(p)) return false;
158 // after loading, save to disk
159 size_t size = p->end - p->begin;
160 fo_->Write(&size, sizeof(size));
161 fo_->Write(p->begin, size);
162 return true;
163 });
164}
165
166inline bool CachedInputSplit::InitCachedIter(void) {
167 fi_ = dmlc::SeekStream::CreateForRead(cache_file_.c_str(), true);
168 if (fi_ == NULL) return false;
169 iter_cached_.Init([this](InputSplitBase::Chunk **dptr) {
170 if (*dptr == NULL) {
171 *dptr = new InputSplitBase::Chunk(buffer_size_);
172 }
173 auto *p = *dptr;
174 // read data from cache file
175 size_t size;
176 size_t nread = fi_->Read(&size, sizeof(size));
177 if (nread == 0) return false;
178 CHECK(nread == sizeof(size))
179 << cache_file_ << " has invalid cache file format";
180 p->data.resize(size / sizeof(size_t) + 1);
181 p->begin = reinterpret_cast<char*>(BeginPtr(p->data));
182 p->end = p->begin + size;
183 CHECK(fi_->Read(p->begin, size) == size)
184 << cache_file_ << " has invalid cache file format";
185 return true;
186 },
187 [this]() { fi_->Seek(0); });
188 return true;
189}
190} // namespace io
191} // namespace dmlc
192#endif  // DMLC_ENABLE_STD_THREAD
193#endif // DMLC_IO_CACHED_INPUT_SPLIT_H_
interface of an I/O stream that supports seek
Definition io.h:109
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function that creates a SeekStream for read only; the stream will close the underlying fil...
Definition io.cc:140
interface of stream I/O for serialization
Definition io.h:30
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function that creates a stream; the stream will close the underlying files upon deletion
Definition io.cc:132
defines configuration macros
base class to construct input split from multiple files
namespace for dmlc
Definition array_view.h:12
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284
thread backed iterator that can be used to implement general thread-based pipeline such as prefetch a...