9#ifndef DMLC_IO_CACHED_INPUT_SPLIT_H_
10#define DMLC_IO_CACHED_INPUT_SPLIT_H_
15#if DMLC_ENABLE_STD_THREAD
/*!
 * \brief InputSplit that serves chunks and records from one of two
 *  iterators: iter_preproc_ while it is non-NULL, otherwise iter_cached_.
 *  NOTE(review): this listing carries embedded source line numbers and
 *  skips lines, so not every member of the class is visible here.
 */
28class CachedInputSplit :
public InputSplit {
/*!
 * \brief constructor
 * \param base underlying split; stored in base_ and used to extract
 *  records/chunks and to report the total size
 * \param cache_file path kept in cache_file_ (it is quoted in the
 *  cache-format error messages)
 * \param reuse_exist_cache when true, InitCachedIter() is tried first and
 *  InitPreprocIter() is only used if that returns false
 */
36 CachedInputSplit(InputSplitBase *base,
37 const char *cache_file,
38 bool reuse_exist_cache =
true)
// start from the default buffer size; HintChunkSize() may enlarge it later
39 : buffer_size_(InputSplitBase::kBufferSize),
40 cache_file_(cache_file),
// NOTE(review): source lines 41 and 43 are not shown in this listing;
// presumably they initialize the remaining members - confirm.
42 base_(base), tmp_chunk_(NULL),
44 if (reuse_exist_cache) {
// no usable cache: fall back to the preprocessing iterator
45 if (!this->InitCachedIter()) {
46 this->InitPreprocIter();
// NOTE(review): the lines between source lines 46 and 49 are not shown;
// presumably this call is the else-path for reuse_exist_cache == false.
49 this->InitPreprocIter();
/*!
 * \brief destructor
 *  NOTE(review): only the iter_cached_ teardown is visible; source lines
 *  54-57 and 59-61 are missing from this listing, so what else is released
 *  (and in which order) must be checked against the full file.
 */
53 virtual ~CachedInputSplit(
void) {
58 iter_cached_.Destroy();
/*!
 * \brief Rewind the split.
 *  While iter_preproc_ is still set, every remaining chunk is pulled from it
 *  and recycled, after which InitCachedIter() is called.
 */
63 virtual void BeforeFirst(
void) {
66 if (iter_preproc_ != NULL) {
// hand the chunk that is currently held back to the preprocessing iterator
67 if (tmp_chunk_ != NULL) {
68 iter_preproc_->Recycle(&tmp_chunk_);
// drain the rest of the preprocessing iterator; the chunks are not consumed
// here, each one is recycled immediately
70 while (iter_preproc_->Next(&tmp_chunk_)) {
71 iter_preproc_->Recycle(&tmp_chunk_);
// NOTE(review): source lines 72-77 are not shown in this listing; presumably
// they finalize the preprocessing state before the switch below - confirm.
// the CHECK fails with this message when InitCachedIter() returns false
78 CHECK(this->InitCachedIter())
79 <<
"Failed to initialize CachedIter";
// NOTE(review): the branch keyword before this line is not shown; presumably
// this is the path taken when iter_preproc_ == NULL.
81 iter_cached_.BeforeFirst();
// give a still-held chunk back to the cached iterator
83 if (tmp_chunk_ != NULL) {
84 iter_cached_.Recycle(&tmp_chunk_);
/*!
 * \brief Not supported by this class: both (unnamed) arguments are ignored
 *  and the call always goes to LOG(FATAL) with the message below.
 */
87 virtual void ResetPartition(
unsigned ,
unsigned ) {
88 LOG(FATAL) <<
"ResetPartition is not supported in CachedInputSplit";
/*!
 * \brief Raise buffer_size_ according to a chunk size hint.
 * \param chunk_size requested chunk size; it is divided by sizeof(uint32_t)
 *  before being compared with the current buffer_size_
 *  (presumably chunk_size is in bytes and buffer_size_ in 32-bit words -
 *  TODO confirm)
 */
90 virtual void HintChunkSize(
size_t chunk_size) {
// std::max keeps the larger value, so the buffer size never shrinks here
91 buffer_size_ = std::max(chunk_size /
sizeof(uint32_t), buffer_size_);
/*!
 * \brief Report the total size of the data.
 * \return whatever base_->GetTotalSize() returns; nothing is computed here
 */
93 virtual size_t GetTotalSize(
void) {
94 return base_->GetTotalSize();
/*!
 * \brief Fetch the next record.
 * \param out_rec output blob, filled by base_->ExtractNextRecord
 * \return false once the active iterator has no more chunks
 *  NOTE(review): the final return statement is not shown in this listing;
 *  presumably the function returns true after a successful extraction.
 */
97 virtual bool NextRecord(Blob *out_rec) {
// read from the preprocessing iterator while it exists, else from the cache
98 auto *iter = iter_preproc_ != NULL ? iter_preproc_ : &iter_cached_;
// no chunk in hand yet: fetch the first one
99 if (tmp_chunk_ == NULL) {
100 if (!iter->Next(&tmp_chunk_))
return false;
// the current chunk yields no further record: recycle it and move on
102 while (!base_->ExtractNextRecord(out_rec, tmp_chunk_)) {
103 iter->Recycle(&tmp_chunk_);
104 if (!iter->Next(&tmp_chunk_))
return false;
/*!
 * \brief Fetch the next chunk.
 * \param out_chunk output blob, filled by base_->ExtractNextChunk
 * \return false once the active iterator has no more chunks
 *  NOTE(review): the final return statement is not shown in this listing;
 *  presumably the function returns true after a successful extraction.
 */
109 virtual bool NextChunk(Blob *out_chunk) {
// read from the preprocessing iterator while it exists, else from the cache
110 auto *iter = iter_preproc_ != NULL ? iter_preproc_ : &iter_cached_;
// no chunk in hand yet: fetch the first one
111 if (tmp_chunk_ == NULL) {
112 if (!iter->Next(&tmp_chunk_))
return false;
// nothing more to extract from the current chunk: recycle it and move on
114 while (!base_->ExtractNextChunk(out_chunk, tmp_chunk_)) {
115 iter->Recycle(&tmp_chunk_);
116 if (!iter->Next(&tmp_chunk_))
return false;
// NOTE(review): buffer_size_, fo_ and fi_ are used by the methods of this
// class but are declared on source lines that this listing does not show.
/*! \brief path of the cache file, as passed to the constructor */
125 std::string cache_file_;
/*! \brief underlying split used for extraction and for GetTotalSize() */
131 InputSplitBase *base_;
/*! \brief chunk currently being consumed; NULL when none is held */
133 InputSplitBase::Chunk *tmp_chunk_;
/*! \brief preprocessing iterator; NextRecord/NextChunk use it while non-NULL */
135 ThreadedIter<InputSplitBase::Chunk> *iter_preproc_;
/*! \brief iterator whose producer reads chunks back from fi_ */
137 ThreadedIter<InputSplitBase::Chunk> iter_cached_;
/*! \brief create iter_preproc_, whose producer takes chunks from base_ and writes them to fo_ */
139 inline void InitPreprocIter(
void);
/*!
 * \brief set up iter_cached_ to read chunks back from fi_
 * \return false when fi_ is NULL
 */
145 inline bool InitCachedIter(
void);
/*!
 * \brief Create the preprocessing iterator. Its producer takes the next
 *  chunk from base_ and appends it to fo_ as a size_t length followed by
 *  the chunk bytes.
 *  NOTE(review): source line 149 (presumably where fo_ is created) and the
 *  declaration of `p` are not shown in this listing.
 */
148inline void CachedInputSplit:: InitPreprocIter(
void) {
150 iter_preproc_ =
new ThreadedIter<InputSplitBase::Chunk>();
151 iter_preproc_->set_max_capacity(16);
// producer callback; it captures `this` to reach base_, fo_ and buffer_size_
152 iter_preproc_->Init([
this](InputSplitBase::Chunk **dptr) {
// NOTE(review): the guard on source line 153 is not shown; presumably a new
// chunk is only allocated when *dptr is still NULL - confirm.
154 *dptr =
new InputSplitBase::Chunk(buffer_size_);
// `p` presumably aliases *dptr (its declaration is not shown);
// a false result from NextChunkEx ends the iteration
157 if (!base_->NextChunkEx(p))
return false;
// record layout written to fo_: byte count first, then the raw bytes
159 size_t size = p->end - p->begin;
160 fo_->Write(&size,
sizeof(size));
161 fo_->Write(p->begin, size);
/*!
 * \brief Set up iter_cached_ so that it reads chunks back from fi_.
 *  Each record is read as a length field followed by that many bytes.
 * \return false when fi_ is NULL
 *  NOTE(review): source line 167 (presumably where fi_ is opened from
 *  cache_file_) and the final return are not shown in this listing.
 */
166inline bool CachedInputSplit::InitCachedIter(
void) {
168 if (fi_ == NULL)
return false;
// producer callback: fills one chunk per call from the cache stream
169 iter_cached_.Init([
this](InputSplitBase::Chunk **dptr) {
// NOTE(review): the guard on source line 170 is not shown; presumably a new
// chunk is only allocated when *dptr is still NULL - confirm.
171 *dptr =
new InputSplitBase::Chunk(buffer_size_);
// `size` and `p` are declared on lines not shown here
176 size_t nread = fi_->Read(&size,
sizeof(size));
// nothing read at all: treated as the regular end of the cache
177 if (nread == 0)
return false;
// a partially read length field means the file is malformed
178 CHECK(nread ==
sizeof(size))
179 << cache_file_ <<
" has invalid cache file format";
// NOTE(review): the element count is derived with sizeof(size_t), while
// HintChunkSize divides by sizeof(uint32_t). If the elements of p->data are
// narrower than size_t, this buffer holds fewer than `size` bytes and the
// Read below would overrun it - confirm the element type of Chunk::data.
180 p->data.resize(size /
sizeof(
size_t) + 1);
181 p->begin =
reinterpret_cast<char*
>(
BeginPtr(p->data));
182 p->end = p->begin + size;
// the payload must be read completely, otherwise the file is malformed
183 CHECK(fi_->Read(p->begin, size) == size)
184 << cache_file_ <<
" has invalid cache file format";
// second argument to Init: callback that seeks fi_ back to offset 0
187 [
this]() { fi_->Seek(0); });
interface of an I/O stream that supports seek
Definition io.h:109
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function that creates a SeekStream for read only; the stream will close the underlying fil...
Definition io.cc:140
interface of stream I/O for serialization
Definition io.h:30
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function that creates a stream; the stream will close the underlying files upon deletion
Definition io.cc:132
defines configuration macros
namespace for dmlc
Definition array_view.h:12
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284
thread backed iterator that can be used to implement general thread-based pipeline such as prefetch a...