16 const bool recurse_directories) {
19 this->InitInputFileInfo(uri, recurse_directories);
22 for (
size_t i = 0; i <
files_.size(); ++i) {
24 CHECK(
files_[i].size % align_bytes == 0)
25 <<
"file do not align by " << align_bytes <<
" bytes";
27 this->align_bytes_ = align_bytes;
33 size_t nstep = (ntotal + nsplit - 1) / nsplit;
35 nstep = ((nstep + align_bytes_ - 1) / align_bytes_) * align_bytes_;
84InputSplitBase::~InputSplitBase(
void) {
89std::string InputSplitBase::StripEnd(std::string str,
char ch) {
90 while (str.length() != 0 && str[str.length() - 1] == ch) {
91 str.resize(str.length() - 1);
99 std::vector<std::string> file_list =
Split(uri, dlm);
100 std::vector<URI> expanded_list;
103 for (
size_t i = 0; i < file_list.size(); ++i) {
104 URI path(file_list[i].c_str());
105 size_t pos = path.name.rfind(
'/');
106 if (pos == std::string::npos || pos + 1 == path.name.length()) {
107 expanded_list.push_back(path);
110 dir.name = path.name.substr(0, pos);
111 std::vector<FileInfo> dfiles;
112 filesys_->ListDirectory(dir, &dfiles);
113 bool exact_match =
false;
114 for (
size_t i = 0; i < dfiles.size(); ++i) {
115 if (StripEnd(dfiles[i].path.name,
'/') == StripEnd(path.name,
'/')) {
116 expanded_list.push_back(dfiles[i].path);
123 std::string spattern = path.name;
125 std::regex pattern(spattern);
126 for (
size_t i = 0; i < dfiles.size(); ++i) {
127 if (dfiles[i].type != kFile || dfiles[i].size == 0)
continue;
128 std::string stripped = StripEnd(dfiles[i].path.name,
'/');
129 std::smatch base_match;
130 if (std::regex_match(stripped, base_match, pattern)) {
131 for (
size_t j = 0; j < base_match.size(); ++j) {
132 if (base_match[j].str() == stripped) {
133 expanded_list.push_back(dfiles[i].path);
break;
138 }
catch (std::regex_error& e) {
139 LOG(FATAL) << e.what() <<
" bad regex " << spattern
140 <<
"This could due to compiler version, g++-4.9 is needed";
146 return expanded_list;
149void InputSplitBase::InitInputFileInfo(
const std::string& uri,
150 const bool recurse_directories) {
152 for (
size_t i = 0; i < expanded_list.size(); ++i) {
153 const URI& path = expanded_list[i];
154 FileInfo info =
filesys_->GetPathInfo(path);
155 if (info.type == kDirectory) {
156 std::vector<FileInfo> dfiles;
157 if (!recurse_directories) {
158 filesys_->ListDirectory(info.path, &dfiles);
160 filesys_->ListDirectoryRecursive(info.path, &dfiles);
162 for (
size_t i = 0; i < dfiles.size(); ++i) {
163 if (dfiles[i].size != 0 && dfiles[i].type == kFile) {
164 files_.push_back(dfiles[i]);
168 if (info.size != 0) {
173 CHECK_NE(
files_.size(), 0U)
174 <<
"Cannot find any files that matches the URI pattern " << uri;
187 if (size == 0)
return 0;
189 char *buf =
reinterpret_cast<char*
>(ptr);
191 size_t n =
fs_->
Read(buf, nleft);
192 nleft -= n; buf += n;
194 if (nleft == 0)
break;
196 if (is_text_parser) {
199 buf[0] =
'\n'; ++buf; --nleft;
208 LOG(ERROR) <<
"offset[" << i <<
"]=" <<
file_offset_[i];
210 LOG(FATAL) <<
"file offset not calculated correctly";
222 size_t max_size = *size;
223 if (max_size <= overflow_.length()) {
224 *size = 0;
return true;
226 if (overflow_.length() != 0) {
227 std::memcpy(buf,
BeginPtr(overflow_), overflow_.length());
229 size_t olen = overflow_.length();
231 size_t nread = this->
Read(
reinterpret_cast<char*
>(buf) + olen,
234 if (nread == 0)
return false;
239 char *bufptr =
reinterpret_cast<char*
>(buf);
240 bufptr[nread] =
'\n';
244 if (nread != max_size) {
249 const char *bptr =
reinterpret_cast<const char*
>(buf);
253 overflow_.resize(nread - *size);
254 if (overflow_.length() != 0) {
255 std::memcpy(
BeginPtr(overflow_), bend, overflow_.length());
260bool InputSplitBase::Chunk::Load(
InputSplitBase *split,
size_t buffer_size) {
261 data.resize(buffer_size + 1);
264 size_t size = (data.size() - 1) *
sizeof(uint32_t);
267 if (!split->ReadChunk(
BeginPtr(data), &size))
return false;
269 data.resize(data.size() * 2);
271 begin =
reinterpret_cast<char *
>(
BeginPtr(data));
279bool InputSplitBase::Chunk::Append(InputSplitBase *split,
size_t buffer_size) {
280 size_t previous_size = end - begin;
281 data.resize(data.size() + buffer_size);
284 size_t size = buffer_size *
sizeof(uint32_t);
287 if (!split->ReadChunk(
reinterpret_cast<char *
>(
BeginPtr(data)) + previous_size, &size))
290 data.resize(data.size() * 2);
292 begin =
reinterpret_cast<char *
>(
BeginPtr(data));
293 end = begin + previous_size + size;
301 if (chunk->begin == chunk->end)
return false;
302 out_chunk->
dptr = chunk->begin;
303 out_chunk->
size = chunk->end - chunk->begin;
304 chunk->begin = chunk->end;
virtual void Seek(size_t pos)=0
Seeks to a certain position in the file.
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
Defines the dmlc logging macros; allows use of GLOG, falling back to the internal implementation when GLOG is disabled.
Base-class implementation of the input splitter.
namespace for dmlc
Definition array_view.h:12
std::vector< std::string > Split(const std::string &s, char delim)
Split a string by delimiter.
Definition common.h:23
T * BeginPtr(std::vector< T > &vec)
Safely gets the beginning address of a vector.
Definition base.h:284
Defines some common utility functions.