7#ifndef DMLC_IO_THREADED_INPUT_SPLIT_H_
8#define DMLC_IO_THREADED_INPUT_SPLIT_H_
12#if DMLC_ENABLE_STD_THREAD
23class ThreadedInputSplit :
public InputSplit {
29 explicit ThreadedInputSplit(InputSplitBase *base,
const size_t batch_size)
30 : buffer_size_(InputSplitBase::kBufferSize),
31 batch_size_(batch_size),
32 base_(base), tmp_chunk_(NULL) {
33 iter_.set_max_capacity(2);
35 iter_.Init([
this](InputSplitBase::Chunk **dptr) {
37 *dptr =
new InputSplitBase::Chunk(buffer_size_);
39 return base_->NextBatchEx(*dptr, batch_size_);
41 [base]() { base->BeforeFirst(); });
44 virtual ~ThreadedInputSplit(
void) {
49 virtual void BeforeFirst() {
51 if (tmp_chunk_ != NULL) {
52 iter_.Recycle(&tmp_chunk_);
55 virtual void HintChunkSize(
size_t chunk_size) {
56 buffer_size_ = std::max(chunk_size /
sizeof(uint32_t), buffer_size_);
59 virtual bool NextRecord(Blob *out_rec) {
60 if (tmp_chunk_ == NULL) {
61 if (!iter_.Next(&tmp_chunk_))
return false;
63 while (!base_->ExtractNextRecord(out_rec, tmp_chunk_)) {
64 iter_.Recycle(&tmp_chunk_);
65 if (!iter_.Next(&tmp_chunk_))
return false;
70 virtual bool NextChunk(Blob *out_chunk) {
71 if (tmp_chunk_ == NULL) {
72 if (!iter_.Next(&tmp_chunk_))
return false;
74 while (!base_->ExtractNextChunk(out_chunk, tmp_chunk_)) {
75 iter_.Recycle(&tmp_chunk_);
76 if (!iter_.Next(&tmp_chunk_))
return false;
81 virtual size_t GetTotalSize(
void) {
82 return base_->GetTotalSize();
85 virtual void ResetPartition(
unsigned part_index,
unsigned num_parts) {
86 base_->ResetPartition(part_index, num_parts);
96 InputSplitBase *base_;
98 ThreadedIter<InputSplitBase::Chunk> iter_;
100 InputSplitBase::Chunk *tmp_chunk_;
defines configuration macros
namespace for dmlc
Definition array_view.h:12
Thread-backed iterator that can be used to implement a general thread-based pipeline, such as prefetch a...