Medial Code Documentation
Loading...
Searching...
No Matches
indexed_recordio_split.h
Go to the documentation of this file.
1
6#ifndef DMLC_IO_INDEXED_RECORDIO_SPLIT_H_
7#define DMLC_IO_INDEXED_RECORDIO_SPLIT_H_
8
9#include <dmlc/io.h>
10#include <dmlc/recordio.h>
11#include <vector>
12#include <cstdio>
13#include <string>
14#include <cstring>
15#include <utility>
16#include <random>
17#include "./input_split_base.h"
18
19namespace dmlc {
20namespace io {
// Alignment value (4) that the constructor passes as the align_bytes argument of Init().
21const unsigned INDEXED_RECORDIO_ALIGN = 4;
24 public:
// Construct a splitter over the data at `uri`, using the index file at `index_uri`.
//   fs           - filesystem handle, forwarded to Init() and ReadIndexFile()
//   uri          - data URI, forwarded to Init()
//   index_uri    - index file URI, forwarded to ReadIndexFile()
//   rank, nsplit - forwarded to ResetPartition()
//   batch_size   - stored in batch_size_; NextChunkEx() passes it on as n_records
//   shuffle      - stored in shuffle_; when true the random engine is seeded from `seed`
//   seed         - only read when `shuffle` is true (defaults to 0)
25 IndexedRecordIOSplitter(FileSystem *fs,
26 const char *uri,
27 const char *index_uri,
28 unsigned rank,
29 unsigned nsplit,
30 const size_t batch_size,
31 const bool shuffle,
32 const int seed = 0) {
33 this->shuffle_ = shuffle;
// `seed` is an int while SetRandomSeed() takes size_t, so a negative seed
// is converted to a large unsigned value before it is used.
34 if (shuffle) SetRandomSeed(seed);
35 this->batch_size_ = batch_size;
// INDEXED_RECORDIO_ALIGN is supplied as Init()'s align_bytes argument.
36 this->Init(fs, uri, INDEXED_RECORDIO_ALIGN);
// NOTE(review): ReadIndexFile is declared virtual, but a call made from a
// constructor binds to this class's version, never to a subclass override.
37 this->ReadIndexFile(fs, index_uri);
38 this->ResetPartition(rank, nsplit);
39 }
40
// Query whether this object is a text parser; this splitter always answers false.
41 bool IsTextParser(void) override {
42 return false;
43 }
// Extract the next record from the given chunk into *out_rec.
44 bool ExtractNextRecord(Blob *out_rec, Chunk *chunk) override;
// Read a chunk of data into buf; it may span multiple records but must not contain partial records.
45 bool ReadChunk(void *buf, size_t *size) override;
// Get a chunk of memory that can contain multiple records; the caller parses its content.
46 bool NextChunk(Blob *out_chunk) override;
// Reset the position of the input split to the beginning.
47 void BeforeFirst(void) override;
// Like NextChunk, with n_records as a hint for how many records are wanted.
48 bool NextBatch(Blob *out_chunk, size_t n_records) override;
// Get the next record into *out_rec.
// Returns true when a record was extracted, false when no further chunk
// could be loaded into tmp_chunk_.
49 bool NextRecord(Blob *out_rec) override {
// Keep refilling tmp_chunk_ until a record can be extracted from it.
50 while (!ExtractNextRecord(out_rec, &tmp_chunk_)) {
51 if (!tmp_chunk_.Load(this, buffer_size_)) return false;
// current_index_ advances once per successful chunk load, not once per record.
52 ++current_index_;
53 }
54 return true;
55 }
// Seed the random engine rnd_ with kRandMagic + seed (kRandMagic is the member constant 111).
56 void SetRandomSeed(size_t seed) {
57 rnd_.seed(kRandMagic + seed);
58 }
// Replace the batch size that NextChunkEx() passes to NextBatchEx().
// NOTE(review): the parameter is an int but batch_size_ is a size_t, so a
// negative argument is stored as a very large unsigned value.
59 void SetBatchSize(int batch_size) {
60 this->batch_size_ = batch_size;
61 }
// Fill out_chunk with new data without using the internal temporary chunk.
// Delegates to NextBatchEx() with the stored batch_size_ as the record count.
62 bool NextChunkEx(Chunk *out_chunk) override {
63 return NextBatchEx(out_chunk, batch_size_);
64 }
// Fill out_chunk with a new batch of data without using the internal temporary chunk.
65 bool NextBatchEx(Chunk *out_chunk, size_t n_records) override;
66
67 protected:
// Seek to the beginning of the first record from the stream's current position.
68 size_t SeekRecordBegin(Stream *fi) override;
// Find the last occurrence of a record header within [begin, end).
// NOTE(review): the half-open range is assumed from the parameter names; confirm in the .cc file.
69 const char*
70 FindLastRecordBegin(const char *begin, const char *end) override;
// Load the index from index_uri; virtual so subclasses may override it
// (the constructor's own call still uses this class's version).
71 virtual void ReadIndexFile(FileSystem *fs, const std::string& index_uri);
// Reset the input split to the part identified by rank out of nsplit parts.
72 void ResetPartition(unsigned rank, unsigned nsplit) override;
73
// Index entries as pairs of size_t.
// NOTE(review): presumably (offset, length) per record; confirm against ReadIndexFile.
74 std::vector<std::pair<size_t, size_t> > index_;
// NOTE(review): presumably the shuffled visiting order over the index; not shown in this header.
75 std::vector<size_t> permutation_;
// Shuffle flag as given to the constructor.
76 bool shuffle_;
// Incremented by NextRecord() after each successful chunk load.
77 size_t current_index_;
// NOTE(review): presumably the index range owned by this partition; confirm in ResetPartition.
78 size_t index_begin_;
79 size_t index_end_;
// Record count handed to NextBatchEx() by NextChunkEx(); set by the constructor and SetBatchSize().
80 size_t batch_size_;
// NOTE(review): purpose not visible in this header; see the .cc file.
81 size_t n_overflow_;
// Constant added to the user seed in SetRandomSeed().
82 const int kRandMagic = 111;
// Random engine seeded by SetRandomSeed().
83 std::mt19937 rnd_;
84};
85} // namespace io
86} // namespace dmlc
87#endif // DMLC_IO_INDEXED_RECORDIO_SPLIT_H_
interface of stream I/O for serialization
Definition io.h:30
class that splits the recordIO file by record
Definition indexed_recordio_split.h:23
size_t SeekRecordBegin(Stream *fi) override
seek to the beginning of the first record in current file pointer
Definition indexed_recordio_split.cc:66
bool NextBatchEx(Chunk *out_chunk, size_t n_records) override
fill the given chunk with new batch of data without using internal temporary chunk
Definition indexed_recordio_split.cc:159
bool NextRecord(Blob *out_rec) override
get the next record, the returning value is valid until next call to NextRecord, NextChunk or NextBat...
Definition indexed_recordio_split.h:49
bool NextChunk(Blob *out_chunk) override
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
Definition indexed_recordio_split.cc:155
const char * FindLastRecordBegin(const char *begin, const char *end) override
find the last occurrence of record header
Definition indexed_recordio_split.cc:86
bool ReadChunk(void *buf, size_t *size) override
read a chunk of data into buf; the data can span multiple records, but cannot contain partial records
Definition indexed_recordio_split.cc:144
bool ExtractNextRecord(Blob *out_rec, Chunk *chunk) override
extract next record from the chunk
Definition indexed_recordio_split.cc:104
void ResetPartition(unsigned rank, unsigned nsplit) override
reset the Input split to a certain part id, The InputSplit will be pointed to the head of the new spe...
Definition indexed_recordio_split.cc:12
void BeforeFirst(void) override
reset the position of InputSplit to beginning
Definition indexed_recordio_split.cc:221
bool NextBatch(Blob *out_chunk, size_t n_records) override
get a chunk of memory that can contain multiple records, with hint for how many records is needed,...
Definition indexed_recordio_split.cc:214
bool NextChunkEx(Chunk *out_chunk) override
fill the given chunk with new data without using internal temporary chunk
Definition indexed_recordio_split.h:62
bool IsTextParser(void) override
query whether this object is a text parser
Definition indexed_recordio_split.h:41
class to construct input split from multiple files
Definition input_split_base.h:21
void Init(FileSystem *fs, const char *uri, size_t align_bytes, const bool recurse_directories=false)
initialize the base before doing anything
Definition input_split_base.cc:13
size_t buffer_size_
buffer size
Definition input_split_base.h:141
Chunk tmp_chunk_
temporary chunk
Definition input_split_base.h:139
defines serializable interface of dmlc
base class to construct input split from multiple files
namespace for dmlc
Definition array_view.h:12
recordio that is able to pack binary data into a splittable format, useful to exchange data in binary...
a blob of memory region
Definition io.h:158
helper struct to hold chunk data with internal pointer to move along the record
Definition input_split_base.h:27