Medial Code Documentation
Loading...
Searching...
No Matches
input_split_base.cc
1// Copyright by Contributors
2#include <dmlc/logging.h>
3#include <dmlc/common.h>
4#include <algorithm>
5#include "./line_split.h"
6
7#if DMLC_USE_REGEX
8#include <regex>
9#endif
10
11namespace dmlc {
12namespace io {
13void InputSplitBase::Init(FileSystem *filesys,
14 const char *uri,
15 size_t align_bytes,
16 const bool recurse_directories) {
17 this->filesys_ = filesys;
18 // initialize the path
19 this->InitInputFileInfo(uri, recurse_directories);
20 file_offset_.resize(files_.size() + 1);
21 file_offset_[0] = 0;
22 for (size_t i = 0; i < files_.size(); ++i) {
23 file_offset_[i + 1] = file_offset_[i] + files_[i].size;
24 CHECK(files_[i].size % align_bytes == 0)
25 << "file do not align by " << align_bytes << " bytes";
26 }
27 this->align_bytes_ = align_bytes;
28}
29
31 unsigned nsplit) {
32 size_t ntotal = file_offset_.back();
33 size_t nstep = (ntotal + nsplit - 1) / nsplit;
34 // align the nstep to 4 bytes
35 nstep = ((nstep + align_bytes_ - 1) / align_bytes_) * align_bytes_;
36 offset_begin_ = std::min(nstep * rank, ntotal);
37 offset_end_ = std::min(nstep * (rank + 1), ntotal);
39 if (offset_begin_ == offset_end_) return;
40 file_ptr_ = std::upper_bound(file_offset_.begin(),
41 file_offset_.end(),
42 offset_begin_) - file_offset_.begin() - 1;
43 file_ptr_end_ = std::upper_bound(file_offset_.begin(),
44 file_offset_.end(),
45 offset_end_) - file_offset_.begin() - 1;
46 if (fs_ != NULL) {
47 delete fs_; fs_ = NULL;
48 }
49 // find the exact ending position
52 CHECK(file_ptr_end_ < files_.size());
53 fs_ = filesys_->OpenForRead(files_[file_ptr_end_].path);
56 delete fs_;
57 }
58 fs_ = filesys_->OpenForRead(files_[file_ptr_].path);
62 }
63 this->BeforeFirst();
64}
65
67 if (offset_begin_ >= offset_end_) return;
68 size_t fp = std::upper_bound(file_offset_.begin(),
69 file_offset_.end(),
70 offset_begin_) - file_offset_.begin() - 1;
71 if (file_ptr_ != fp) {
72 delete fs_;
73 file_ptr_ = fp;
74 fs_ = filesys_->OpenForRead(files_[file_ptr_].path);
75 }
76 // seek to beginning of stream
79 tmp_chunk_.begin = tmp_chunk_.end = NULL;
80 // clear overflow buffer
81 overflow_.clear();
82}
83
84InputSplitBase::~InputSplitBase(void) {
85 delete fs_;
86 // no need to delete filesystem, it was singleton
87}
88
89std::string InputSplitBase::StripEnd(std::string str, char ch) {
90 while (str.length() != 0 && str[str.length() - 1] == ch) {
91 str.resize(str.length() - 1);
92 }
93 return str;
94}
95
96std::vector<URI> InputSplitBase::ConvertToURIs(const std::string& uri) {
97 // split by :
98 const char dlm = ';';
99 std::vector<std::string> file_list = Split(uri, dlm);
100 std::vector<URI> expanded_list;
101
102 // expand by match regex pattern.
103 for (size_t i = 0; i < file_list.size(); ++i) {
104 URI path(file_list[i].c_str());
105 size_t pos = path.name.rfind('/');
106 if (pos == std::string::npos || pos + 1 == path.name.length()) {
107 expanded_list.push_back(path);
108 } else {
109 URI dir = path;
110 dir.name = path.name.substr(0, pos);
111 std::vector<FileInfo> dfiles;
112 filesys_->ListDirectory(dir, &dfiles);
113 bool exact_match = false;
114 for (size_t i = 0; i < dfiles.size(); ++i) {
115 if (StripEnd(dfiles[i].path.name, '/') == StripEnd(path.name, '/')) {
116 expanded_list.push_back(dfiles[i].path);
117 exact_match = true;
118 break;
119 }
120 }
121#if DMLC_USE_REGEX
122 if (!exact_match) {
123 std::string spattern = path.name;
124 try {
125 std::regex pattern(spattern);
126 for (size_t i = 0; i < dfiles.size(); ++i) {
127 if (dfiles[i].type != kFile || dfiles[i].size == 0) continue;
128 std::string stripped = StripEnd(dfiles[i].path.name, '/');
129 std::smatch base_match;
130 if (std::regex_match(stripped, base_match, pattern)) {
131 for (size_t j = 0; j < base_match.size(); ++j) {
132 if (base_match[j].str() == stripped) {
133 expanded_list.push_back(dfiles[i].path); break;
134 }
135 }
136 }
137 }
138 } catch (std::regex_error& e) {
139 LOG(FATAL) << e.what() << " bad regex " << spattern
140 << "This could due to compiler version, g++-4.9 is needed";
141 }
142 }
143#endif // DMLC_USE_REGEX
144 }
145 }
146 return expanded_list;
147}
148
149void InputSplitBase::InitInputFileInfo(const std::string& uri,
150 const bool recurse_directories) {
151 std::vector<URI> expanded_list = this->ConvertToURIs(uri);
152 for (size_t i = 0; i < expanded_list.size(); ++i) {
153 const URI& path = expanded_list[i];
154 FileInfo info = filesys_->GetPathInfo(path);
155 if (info.type == kDirectory) {
156 std::vector<FileInfo> dfiles;
157 if (!recurse_directories) {
158 filesys_->ListDirectory(info.path, &dfiles);
159 } else {
160 filesys_->ListDirectoryRecursive(info.path, &dfiles);
161 }
162 for (size_t i = 0; i < dfiles.size(); ++i) {
163 if (dfiles[i].size != 0 && dfiles[i].type == kFile) {
164 files_.push_back(dfiles[i]);
165 }
166 }
167 } else {
168 if (info.size != 0) {
169 files_.push_back(info);
170 }
171 }
172 }
173 CHECK_NE(files_.size(), 0U)
174 << "Cannot find any files that matches the URI pattern " << uri;
175}
176
177size_t InputSplitBase::Read(void *ptr, size_t size) {
178 const bool is_text_parser = this->IsTextParser();
179
180 if (fs_ == NULL) {
181 return 0;
182 }
183 if (offset_begin_ >= offset_end_) return 0;
184 if (offset_curr_ + size > offset_end_) {
185 size = offset_end_ - offset_curr_;
186 }
187 if (size == 0) return 0;
188 size_t nleft = size;
189 char *buf = reinterpret_cast<char*>(ptr);
190 while (true) {
191 size_t n = fs_->Read(buf, nleft);
192 nleft -= n; buf += n;
193 offset_curr_ += n;
194 if (nleft == 0) break;
195 if (n == 0) {
196 if (is_text_parser) {
197 // Insert a newline between files to handle files with NOEOL.
198 // See https://github.com/dmlc/dmlc-core/pull/385 for explanation.
199 buf[0] = '\n'; ++buf; --nleft;
200 }
201 if (offset_curr_ != file_offset_[file_ptr_ + 1]) {
202 LOG(ERROR) << "curr=" << offset_curr_
203 << ",begin=" << offset_begin_
204 << ",end=" << offset_end_
205 << ",fileptr=" << file_ptr_
206 << ",fileoffset=" << file_offset_[file_ptr_ + 1];
207 for (size_t i = 0; i < file_ptr_; ++i) {
208 LOG(ERROR) << "offset[" << i << "]=" << file_offset_[i];
209 }
210 LOG(FATAL) << "file offset not calculated correctly";
211 }
212 if (file_ptr_ + 1 >= files_.size()) break;
213 file_ptr_ += 1;
214 delete fs_;
215 fs_ = filesys_->OpenForRead(files_[file_ptr_].path);
216 }
217 }
218 return size - nleft;
219}
220
221bool InputSplitBase::ReadChunk(void *buf, size_t *size) {
222 size_t max_size = *size;
223 if (max_size <= overflow_.length()) {
224 *size = 0; return true;
225 }
226 if (overflow_.length() != 0) {
227 std::memcpy(buf, BeginPtr(overflow_), overflow_.length());
228 }
229 size_t olen = overflow_.length();
230 overflow_.resize(0);
231 size_t nread = this->Read(reinterpret_cast<char*>(buf) + olen,
232 max_size - olen);
233 nread += olen;
234 if (nread == 0) return false;
235 if (this->IsTextParser()) {
236 if (nread == olen) {
237 // Insert a newline between files to handle files with NOEOL.
238 // See https://github.com/dmlc/dmlc-core/pull/452 for explanation.
239 char *bufptr = reinterpret_cast<char*>(buf);
240 bufptr[nread] = '\n';
241 nread++;
242 }
243 } else {
244 if (nread != max_size) {
245 *size = nread;
246 return true;
247 }
248 }
249 const char *bptr = reinterpret_cast<const char*>(buf);
250 // return the last position where a record starts
251 const char *bend = this->FindLastRecordBegin(bptr, bptr + nread);
252 *size = bend - bptr;
253 overflow_.resize(nread - *size);
254 if (overflow_.length() != 0) {
255 std::memcpy(BeginPtr(overflow_), bend, overflow_.length());
256 }
257 return true;
258}
259
260bool InputSplitBase::Chunk::Load(InputSplitBase *split, size_t buffer_size) {
261 data.resize(buffer_size + 1);
262 while (true) {
263 // leave one tail chunk
264 size_t size = (data.size() - 1) * sizeof(uint32_t);
265 // set back to 0 for string safety
266 data.back() = 0;
267 if (!split->ReadChunk(BeginPtr(data), &size)) return false;
268 if (size == 0) {
269 data.resize(data.size() * 2);
270 } else {
271 begin = reinterpret_cast<char *>(BeginPtr(data));
272 end = begin + size;
273 break;
274 }
275 }
276 return true;
277}
278
279bool InputSplitBase::Chunk::Append(InputSplitBase *split, size_t buffer_size) {
280 size_t previous_size = end - begin;
281 data.resize(data.size() + buffer_size);
282 while (true) {
283 // leave one tail chunk
284 size_t size = buffer_size * sizeof(uint32_t);
285 // set back to 0 for string safety
286 data.back() = 0;
287 if (!split->ReadChunk(reinterpret_cast<char *>(BeginPtr(data)) + previous_size, &size))
288 return false;
289 if (size == 0) {
290 data.resize(data.size() * 2);
291 } else {
292 begin = reinterpret_cast<char *>(BeginPtr(data));
293 end = begin + previous_size + size;
294 break;
295 }
296 }
297 return true;
298}
299
301 if (chunk->begin == chunk->end) return false;
302 out_chunk->dptr = chunk->begin;
303 out_chunk->size = chunk->end - chunk->begin;
304 chunk->begin = chunk->end;
305 return true;
306}
307} // namespace io
308} // namespace dmlc
virtual void Seek(size_t pos)=0
seek to a certain position in the file
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
class to construct input split from multiple files
Definition input_split_base.h:21
size_t offset_begin_
beginning of offset
Definition input_split_base.h:127
size_t offset_end_
end of the offset
Definition input_split_base.h:129
SeekStream * fs_
current input stream
Definition input_split_base.h:133
size_t file_ptr_end_
file pointer where the end of file lies
Definition input_split_base.h:137
void Init(FileSystem *fs, const char *uri, size_t align_bytes, const bool recurse_directories=false)
initialize the base before doing anything
Definition input_split_base.cc:13
std::vector< FileInfo > files_
information about files
Definition input_split_base.h:131
virtual bool IsTextParser(void)=0
query whether this object is a text parser
FileSystem * filesys_
FileSystem.
Definition input_split_base.h:121
std::vector< size_t > file_offset_
byte-offset of each file
Definition input_split_base.h:123
size_t file_ptr_
file pointer of which file to read on
Definition input_split_base.h:135
virtual size_t SeekRecordBegin(Stream *fi)=0
seek to the beginning of the first record in current file pointer
bool ExtractNextChunk(Blob *out_rchunk, Chunk *chunk)
extract next chunk from the chunk
Definition input_split_base.cc:300
virtual void BeforeFirst(void)
reset the position of InputSplit to beginning
Definition input_split_base.cc:66
std::vector< URI > ConvertToURIs(const std::string &uri)
split string list of files into vector of URIs
Definition input_split_base.cc:96
virtual bool ReadChunk(void *buf, size_t *size)
read a chunk of data into buf; the data can span multiple records, but cannot contain partial records
Definition input_split_base.cc:221
size_t offset_curr_
get the current offset
Definition input_split_base.h:125
size_t Read(void *ptr, size_t size)
same as stream.Read
Definition input_split_base.cc:177
virtual const char * FindLastRecordBegin(const char *begin, const char *end)=0
find the last occurrence of a record header
Chunk tmp_chunk_
temporary chunk
Definition input_split_base.h:139
virtual void ResetPartition(unsigned rank, unsigned nsplit)
reset the InputSplit to a certain part id; the InputSplit will be pointed to the head of the new spe...
Definition input_split_base.cc:30
defines the logging macros of dmlc; allows use of GLOG and falls back to the internal implementation when GLOG is disabled
base class implementation of input splitter
namespace for dmlc
Definition array_view.h:12
std::vector< std::string > Split(const std::string &s, char delim)
Split a string by delimiter.
Definition common.h:23
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284
a blob of memory region
Definition io.h:158
size_t size
size of the memory region
Definition io.h:162
void * dptr
points to start of the memory region
Definition io.h:160
helper struct to hold chunk data with internal pointer to move along the record
Definition input_split_base.h:27
defines some common utility function.