13 size_t ntotal = index_.size();
15 size_t nstep = (ntotal + nsplit - 1) / nsplit;
16 if (rank * nstep >= ntotal)
return;
17 index_begin_ = rank * nstep;
19 if ((rank + 1) * nstep < ntotal) {
20 index_end_ = (rank + 1) * nstep;
24 index_end_ = index_.size();
38 current_index_ = index_begin_;
43void IndexedRecordIOSplitter::ReadIndexFile(FileSystem *fs,
const std::string& index_uri) {
44 std::vector<URI> expanded_list = this->
ConvertToURIs(index_uri);
45 CHECK_EQ(expanded_list.size(), 1ul)
46 <<
"IndexedRecordIOSplitter does not support multiple index files";
47 for (
size_t i = 0; i < expanded_list.size(); ++i) {
48 const URI& path = expanded_list[i];
49 std::unique_ptr<dmlc::Stream> file_stream(fs->Open(path,
"r",
true));
51 std::vector<size_t> temp;
53 while (index_file >> index >> offset) {
54 temp.push_back(offset);
56 std::sort(temp.begin(), temp.end());
57 for (
size_t j = 0; j < temp.size() - 1; ++j) {
58 index_.push_back(std::make_pair(temp[j], temp[j + 1] - temp[j]));
60 index_.push_back(std::make_pair(temp.back(),
file_offset_.back() - temp.back()));
70 if (fi->Read(&v,
sizeof(v)) == 0)
return nstep;
73 CHECK(fi->Read(&lrec,
sizeof(lrec)) != 0)
74 <<
"invalid record io format";
75 nstep +=
sizeof(lrec);
77 if (cflag == 0 || cflag == 1)
break;
81 return nstep - 2 *
sizeof(uint32_t);
88 CHECK_EQ((
reinterpret_cast<size_t>(begin) & 3UL), 0U);
89 CHECK_EQ((
reinterpret_cast<size_t>(end) & 3UL), 0U);
90 const uint32_t *pbegin =
reinterpret_cast<const uint32_t *
>(begin);
91 const uint32_t *p =
reinterpret_cast<const uint32_t *
>(end);
92 CHECK(p >= pbegin + 2);
93 for (p = p - 2; p != pbegin; --p) {
96 if (cflag == 0 || cflag == 1) {
97 return reinterpret_cast<const char*
>(p);
105 if (chunk->begin == chunk->end)
return false;
106 CHECK(chunk->begin + 2 *
sizeof(uint32_t) <= chunk->end)
107 <<
"Invalid RecordIO Format";
108 CHECK_EQ((
reinterpret_cast<size_t>(chunk->begin) & 3UL), 0U);
109 CHECK_EQ((
reinterpret_cast<size_t>(chunk->end) & 3UL), 0U);
110 uint32_t *p =
reinterpret_cast<uint32_t *
>(chunk->begin);
114 out_rec->
dptr = chunk->begin + 2 *
sizeof(uint32_t);
116 chunk->begin += 2 *
sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
117 CHECK(chunk->begin <= chunk->end) <<
"Invalid RecordIO Format";
118 out_rec->
size = clen;
119 if (cflag == 0)
return true;
122 CHECK(cflag == 1U) <<
"Invalid RecordIO Format";
123 while (cflag != 3U) {
124 CHECK(chunk->begin + 2 *
sizeof(uint32_t) <= chunk->end);
125 p =
reinterpret_cast<uint32_t *
>(chunk->begin);
130 std::memcpy(
reinterpret_cast<char*
>(out_rec->
dptr) + out_rec->
size,
131 &kMagic,
sizeof(kMagic));
132 out_rec->
size +=
sizeof(kMagic);
135 std::memmove(
reinterpret_cast<char*
>(out_rec->
dptr) + out_rec->
size,
136 chunk->begin + 2 *
sizeof(uint32_t), clen);
137 out_rec->
size += clen;
139 chunk->begin += 2 *
sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
145 size_t max_size = *size;
146 size_t nread = this->
Read(
reinterpret_cast<char*
>(buf),
148 if (nread == 0)
return false;
149 if (nread != max_size) {
156 return this->
NextBatch(out_chunk, batch_size_);
163 size_t n = n_overflow_ == 0?n_records:n_overflow_;
165 if (current_index_ < permutation_.size()) {
166 offset_curr_ = index_[permutation_[current_index_]].first;
167 buffer_size_ = index_[permutation_[current_index_]].second/
sizeof(uint32_t);
168 size_t new_file_ptr = std::upper_bound(
file_offset_.begin(),
193 n_overflow_ = n - n_read;
200 if (n_overflow_ == 0) {
201 last = std::min(current_index_ + n_records, index_end_);
202 n_overflow_ = current_index_ + n_records - last;
204 last = std::min(current_index_ + n_overflow_, index_end_);
205 n_overflow_ = current_index_ + n_overflow_ - last;
207 buffer_size_ = (index_[last].first - index_[current_index_].first)/INDEXED_RECORDIO_ALIGN;
208 current_index_ = last;
223 permutation_.clear();
224 for (
size_t i = index_begin_; i < index_end_; ++i) {
225 permutation_.push_back(i);
227 std::shuffle(permutation_.begin(), permutation_.end(), rnd_);
230 current_index_ = index_begin_;
static const uint32_t kMagic
magic number of recordio. Note: (kMagic >> 29U) & 7 > 3; this ensures lrec will not be kMagic
Definition recordio.h:45
static uint32_t DecodeFlag(uint32_t rec)
decode the flag part of lrecord
Definition recordio.h:60
static uint32_t DecodeLength(uint32_t rec)
decode the length part of lrecord
Definition recordio.h:68
virtual void Seek(size_t pos)=0
seek to certain position of the file
interface of stream I/O for serialization
Definition io.h:30
size_t SeekRecordBegin(Stream *fi) override
seek to the beginning of the first record from the current file pointer
Definition indexed_recordio_split.cc:66
bool NextBatchEx(Chunk *out_chunk, size_t n_records) override
fill the given chunk with new batch of data without using internal temporary chunk
Definition indexed_recordio_split.cc:159
bool NextChunk(Blob *out_chunk) override
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
Definition indexed_recordio_split.cc:155
const char * FindLastRecordBegin(const char *begin, const char *end) override
find the last occurrence of the record header
Definition indexed_recordio_split.cc:86
bool ReadChunk(void *buf, size_t *size) override
read a chunk of data into buf; the data can span multiple records, but cannot contain partial records
Definition indexed_recordio_split.cc:144
bool ExtractNextRecord(Blob *out_rec, Chunk *chunk) override
extract next record from the chunk
Definition indexed_recordio_split.cc:104
void ResetPartition(unsigned rank, unsigned nsplit) override
reset the InputSplit to a certain part id. The InputSplit will be pointed to the head of the new spe...
Definition indexed_recordio_split.cc:12
void BeforeFirst(void) override
reset the position of InputSplit to beginning
Definition indexed_recordio_split.cc:221
bool NextBatch(Blob *out_chunk, size_t n_records) override
get a chunk of memory that can contain multiple records, with a hint for how many records are needed,...
Definition indexed_recordio_split.cc:214
a std::istream class that can wrap Stream objects; can use istream with that output to underlying...
Definition io.h:385
defines serializable interface of dmlc
defines logging macros of dmlc; allows use of GLOG, falling back to the internal implementation when disabled
input split that splits indexed recordio files
namespace for dmlc
Definition array_view.h:12
recordio that is able to pack binary data into a splittable format, useful to exchange data in binary...