Medial Code Documentation
Loading...
Searching...
No Matches
indexed_recordio_split.cc
1// Copyright by Contributors
2#include <dmlc/recordio.h>
3#include <dmlc/logging.h>
4#include <dmlc/io.h>
5#include <algorithm>
6#include <fstream>
8
9namespace dmlc {
10namespace io {
11
12void IndexedRecordIOSplitter::ResetPartition(unsigned rank, unsigned nsplit) {
13 size_t ntotal = index_.size();
14 size_t ntotalbytes = file_offset_.back();
15 size_t nstep = (ntotal + nsplit - 1) / nsplit;
16 if (rank * nstep >= ntotal) return;
17 index_begin_ = rank * nstep;
18 offset_begin_ = index_[index_begin_].first;
19 if ((rank + 1) * nstep < ntotal) {
20 index_end_ = (rank + 1) * nstep;
21 offset_end_ = index_[index_end_].first;
22 } else {
23 offset_end_ = ntotalbytes;
24 index_end_ = index_.size();
25 index_.push_back(std::make_pair(offset_end_, 0));
26 }
28 file_ptr_ = std::upper_bound(file_offset_.begin(),
29 file_offset_.end(),
30 offset_begin_) - file_offset_.begin() - 1;
31 file_ptr_end_ = std::upper_bound(file_offset_.begin(),
32 file_offset_.end(),
33 offset_end_) - file_offset_.begin() - 1;
34 if (fs_ != NULL) {
35 delete fs_; fs_ = NULL;
36 }
37 fs_ = filesys_->OpenForRead(files_[file_ptr_].path);
38 current_index_ = index_begin_;
39 n_overflow_ = 0;
40 this->BeforeFirst();
41}
42
43void IndexedRecordIOSplitter::ReadIndexFile(FileSystem *fs, const std::string& index_uri) {
44 std::vector<URI> expanded_list = this->ConvertToURIs(index_uri);
45 CHECK_EQ(expanded_list.size(), 1ul)
46 << "IndexedRecordIOSplitter does not support multiple index files";
47 for (size_t i = 0; i < expanded_list.size(); ++i) {
48 const URI& path = expanded_list[i];
49 std::unique_ptr<dmlc::Stream> file_stream(fs->Open(path, "r", true));
50 dmlc::istream index_file(file_stream.get());
51 std::vector<size_t> temp;
52 size_t index, offset;
53 while (index_file >> index >> offset) {
54 temp.push_back(offset);
55 }
56 std::sort(temp.begin(), temp.end());
57 for (size_t j = 0; j < temp.size() - 1; ++j) {
58 index_.push_back(std::make_pair(temp[j], temp[j + 1] - temp[j]));
59 }
60 index_.push_back(std::make_pair(temp.back(), file_offset_.back() - temp.back()));
61 }
62}
63
64// Inefficient, but not used anywhere and optimization
65// would require change of the API, so I leave it as is
67 size_t nstep = 0;
68 uint32_t v, lrec;
69 while (true) {
70 if (fi->Read(&v, sizeof(v)) == 0) return nstep;
71 nstep += sizeof(v);
72 if (v == RecordIOWriter::kMagic) {
73 CHECK(fi->Read(&lrec, sizeof(lrec)) != 0)
74 << "invalid record io format";
75 nstep += sizeof(lrec);
76 uint32_t cflag = RecordIOWriter::DecodeFlag(lrec);
77 if (cflag == 0 || cflag == 1) break;
78 }
79 }
80 // should point at head of record
81 return nstep - 2 * sizeof(uint32_t);
82}
83
84// Inefficient, but not used anywhere and optimization
85// would require change of the API, so I leave it as is
87 const char *end) {
88 CHECK_EQ((reinterpret_cast<size_t>(begin) & 3UL), 0U);
89 CHECK_EQ((reinterpret_cast<size_t>(end) & 3UL), 0U);
90 const uint32_t *pbegin = reinterpret_cast<const uint32_t *>(begin);
91 const uint32_t *p = reinterpret_cast<const uint32_t *>(end);
92 CHECK(p >= pbegin + 2);
93 for (p = p - 2; p != pbegin; --p) {
94 if (p[0] == RecordIOWriter::kMagic) {
95 uint32_t cflag = RecordIOWriter::DecodeFlag(p[1]);
96 if (cflag == 0 || cflag == 1) {
97 return reinterpret_cast<const char*>(p);
98 }
99 }
100 }
101 return begin;
102}
103
105 if (chunk->begin == chunk->end) return false;
106 CHECK(chunk->begin + 2 * sizeof(uint32_t) <= chunk->end)
107 << "Invalid RecordIO Format";
108 CHECK_EQ((reinterpret_cast<size_t>(chunk->begin) & 3UL), 0U);
109 CHECK_EQ((reinterpret_cast<size_t>(chunk->end) & 3UL), 0U);
110 uint32_t *p = reinterpret_cast<uint32_t *>(chunk->begin);
111 uint32_t cflag = RecordIOWriter::DecodeFlag(p[1]);
112 uint32_t clen = RecordIOWriter::DecodeLength(p[1]);
113 // skip header
114 out_rec->dptr = chunk->begin + 2 * sizeof(uint32_t);
115 // move pbegin
116 chunk->begin += 2 * sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
117 CHECK(chunk->begin <= chunk->end) << "Invalid RecordIO Format";
118 out_rec->size = clen;
119 if (cflag == 0) return true;
120 const uint32_t kMagic = RecordIOWriter::kMagic;
121 // abnormal path, move data around to make a full part
122 CHECK(cflag == 1U) << "Invalid RecordIO Format";
123 while (cflag != 3U) {
124 CHECK(chunk->begin + 2 * sizeof(uint32_t) <= chunk->end);
125 p = reinterpret_cast<uint32_t *>(chunk->begin);
126 CHECK(p[0] == RecordIOWriter::kMagic);
127 cflag = RecordIOWriter::DecodeFlag(p[1]);
128 clen = RecordIOWriter::DecodeLength(p[1]);
129 // pad kmagic in between
130 std::memcpy(reinterpret_cast<char*>(out_rec->dptr) + out_rec->size,
131 &kMagic, sizeof(kMagic));
132 out_rec->size += sizeof(kMagic);
133 // move the rest of the blobs
134 if (clen != 0) {
135 std::memmove(reinterpret_cast<char*>(out_rec->dptr) + out_rec->size,
136 chunk->begin + 2 * sizeof(uint32_t), clen);
137 out_rec->size += clen;
138 }
139 chunk->begin += 2 * sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
140 }
141 return true;
142}
143
144bool IndexedRecordIOSplitter::ReadChunk(void *buf, size_t *size) {
145 size_t max_size = *size;
146 size_t nread = this->Read(reinterpret_cast<char*>(buf),
147 max_size);
148 if (nread == 0) return false;
149 if (nread != max_size) {
150 *size = nread;
151 }
152 return true;
153}
154
156 return this->NextBatch(out_chunk, batch_size_);
157}
158
159bool IndexedRecordIOSplitter::NextBatchEx(Chunk *chunk, size_t n_records) {
160 if (shuffle_) {
161 bool ret = true;
162 size_t n_read = 0;
163 size_t n = n_overflow_ == 0?n_records:n_overflow_;
164 while (n_read < n) {
165 if (current_index_ < permutation_.size()) {
166 offset_curr_ = index_[permutation_[current_index_]].first;
167 buffer_size_ = index_[permutation_[current_index_]].second/sizeof(uint32_t);
168 size_t new_file_ptr = std::upper_bound(file_offset_.begin(),
169 file_offset_.end(),
170 offset_curr_) - file_offset_.begin() - 1;
171 if (new_file_ptr != file_ptr_) {
172 delete fs_;
173 file_ptr_ = new_file_ptr;
174 fs_ = filesys_->OpenForRead(files_[file_ptr_].path);
175 }
177 if (n_read == 0) {
178 ret = ret && chunk->Load(this, buffer_size_);
179 } else {
180 ret = ret && chunk->Append(this, buffer_size_);
181 }
182 if (ret) {
183 ++n_read;
184 ++current_index_;
185 } else {
186 break;
187 }
188 } else {
189 break;
190 }
191 }
192 if (n_read > 0) {
193 n_overflow_ = n - n_read;
194 return true;
195 } else {
196 return false;
197 }
198 } else {
199 size_t last;
200 if (n_overflow_ == 0) {
201 last = std::min(current_index_ + n_records, index_end_);
202 n_overflow_ = current_index_ + n_records - last;
203 } else {
204 last = std::min(current_index_ + n_overflow_, index_end_);
205 n_overflow_ = current_index_ + n_overflow_ - last;
206 }
207 buffer_size_ = (index_[last].first - index_[current_index_].first)/INDEXED_RECORDIO_ALIGN;
208 current_index_ = last;
209 return chunk->Load(this, buffer_size_);
210 }
211 return true;
212}
213
214bool IndexedRecordIOSplitter::NextBatch(Blob *out_chunk, size_t batch_size) {
215 while (!ExtractNextChunk(out_chunk, &tmp_chunk_)) {
216 if (!NextBatchEx(&tmp_chunk_, batch_size)) return false;
217 }
218 return true;
219}
220
222 if (shuffle_) {
223 permutation_.clear();
224 for (size_t i = index_begin_; i < index_end_; ++i) {
225 permutation_.push_back(i);
226 }
227 std::shuffle(permutation_.begin(), permutation_.end(), rnd_);
228 current_index_ = 0;
229 } else {
230 current_index_ = index_begin_;
231 }
233}
234} // namespace io
235} // namespace dmlc
static const uint32_t kMagic
magic number of recordio. Note: ((kMagic >> 29U) & 7) > 3, which ensures lrec will never equal kMagic
Definition recordio.h:45
static uint32_t DecodeFlag(uint32_t rec)
decode the flag part of lrecord
Definition recordio.h:60
static uint32_t DecodeLength(uint32_t rec)
decode the length part of lrecord
Definition recordio.h:68
virtual void Seek(size_t pos)=0
seek to certain position of the file
interface of stream I/O for serialization
Definition io.h:30
size_t SeekRecordBegin(Stream *fi) override
seek to the beginning of the first record in current file pointer
Definition indexed_recordio_split.cc:66
bool NextBatchEx(Chunk *out_chunk, size_t n_records) override
fill the given chunk with a new batch of data without using the internal temporary chunk
Definition indexed_recordio_split.cc:159
bool NextChunk(Blob *out_chunk) override
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
Definition indexed_recordio_split.cc:155
const char * FindLastRecordBegin(const char *begin, const char *end) override
find the last occurrence of a record header
Definition indexed_recordio_split.cc:86
bool ReadChunk(void *buf, size_t *size) override
read a chunk of data into buf; the data can span multiple records, but cannot contain partial records
Definition indexed_recordio_split.cc:144
bool ExtractNextRecord(Blob *out_rec, Chunk *chunk) override
extract next record from the chunk
Definition indexed_recordio_split.cc:104
void ResetPartition(unsigned rank, unsigned nsplit) override
reset the InputSplit to a certain part id; the InputSplit will be pointed to the head of the new spe...
Definition indexed_recordio_split.cc:12
void BeforeFirst(void) override
reset the position of InputSplit to beginning
Definition indexed_recordio_split.cc:221
bool NextBatch(Blob *out_chunk, size_t n_records) override
get a chunk of memory that can contain multiple records, with a hint for how many records are needed,...
Definition indexed_recordio_split.cc:214
size_t offset_begin_
beginning of offset
Definition input_split_base.h:127
size_t offset_end_
end of the offset
Definition input_split_base.h:129
SeekStream * fs_
current input stream
Definition input_split_base.h:133
size_t file_ptr_end_
file pointer where the end of file lies
Definition input_split_base.h:137
std::vector< FileInfo > files_
information about files
Definition input_split_base.h:131
FileSystem * filesys_
FileSystem.
Definition input_split_base.h:121
std::vector< size_t > file_offset_
byte-offset of each file
Definition input_split_base.h:123
size_t file_ptr_
file pointer of which file to read on
Definition input_split_base.h:135
bool ExtractNextChunk(Blob *out_rchunk, Chunk *chunk)
extract next chunk from the chunk
Definition input_split_base.cc:300
virtual void BeforeFirst(void)
reset the position of InputSplit to beginning
Definition input_split_base.cc:66
std::vector< URI > ConvertToURIs(const std::string &uri)
split string list of files into vector of URIs
Definition input_split_base.cc:96
size_t offset_curr_
get the current offset
Definition input_split_base.h:125
size_t buffer_size_
buffer size
Definition input_split_base.h:141
size_t Read(void *ptr, size_t size)
same as stream.Read
Definition input_split_base.cc:177
Chunk tmp_chunk_
temporary chunk
Definition input_split_base.h:139
a std::istream class that can wrap Stream objects, so istream operations can be used on top of the underlying...
Definition io.h:385
defines serializable interface of dmlc
defines logging macros of dmlc; allows use of GLOG, falling back to the internal implementation when it is disabled
input split that splits indexed recordio files
namespace for dmlc
Definition array_view.h:12
recordio that is able to pack binary data into a splittable format, useful to exchange data in binary...
a blob of memory region
Definition io.h:158
size_t size
size of the memory region
Definition io.h:162
void * dptr
points to start of the memory region
Definition io.h:160
helper struct to hold chunk data with internal pointer to move along the record
Definition input_split_base.h:27