7#ifndef DMLC_IO_SINGLE_FILE_SPLIT_H_
8#define DMLC_IO_SINGLE_FILE_SPLIT_H_
19#define stat_struct __stat64
23#define stat_struct stat
35 : use_stdin_(
false), buffer_size_(kBufferSize),
36 chunk_begin_(NULL), chunk_end_(NULL) {
37 if (!std::strcmp(fname,
"stdin")) {
38#ifndef DMLC_STRICT_CXX98_
39 use_stdin_ =
true; fp_ = stdin;
44 fp_ = fopen64(fname,
"rb");
46 fp_ = fopen(fname,
"rb");
48 CHECK(fp_ != NULL) <<
"SingleFileSplit: fail to open " << fname;
50 buffer_.resize(kBufferSize);
53 if (!use_stdin_) std::fclose(fp_);
56 fseek(fp_, 0, SEEK_SET);
59 buffer_size_ = std::max(chunk_size, buffer_size_);
62 struct stat_struct buf;
63 fstat(fileno(fp_), &buf);
66 virtual size_t Read(
void *ptr,
size_t size) {
67 return std::fread(ptr, 1, size, fp_);
70 CHECK(part_index == 0 && num_parts == 1);
73 virtual void Write(
const void * ,
size_t ) {
74 LOG(FATAL) <<
"InputSplit do not support write";
77 if (chunk_begin_ == chunk_end_) {
78 if (!LoadChunk())
return false;
80 char *next = FindNextRecord(chunk_begin_,
82 out_rec->
dptr = chunk_begin_;
83 out_rec->
size = next - chunk_begin_;
88 if (chunk_begin_ == chunk_end_) {
89 if (!LoadChunk())
return false;
91 out_chunk->
dptr = chunk_begin_;
92 out_chunk->
size = chunk_end_ - chunk_begin_;
93 chunk_begin_ = chunk_end_;
96 inline bool ReadChunk(
void *buf,
size_t *size) {
97 size_t max_size = *size;
98 if (max_size <= overflow_.length()) {
99 *size = 0;
return true;
101 if (overflow_.length() != 0) {
102 std::memcpy(buf,
BeginPtr(overflow_), overflow_.length());
104 size_t olen = overflow_.length();
106 size_t nread = this->Read(
reinterpret_cast<char*
>(buf) + olen,
109 if (nread == 0)
return false;
110 if (nread != max_size) {
114 const char *bptr =
reinterpret_cast<const char*
>(buf);
116 const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size);
118 overflow_.resize(max_size - *size);
119 if (overflow_.length() != 0) {
120 std::memcpy(
BeginPtr(overflow_), bend, overflow_.length());
127 inline const char* FindLastRecordBegin(
const char *begin,
129 if (begin == end)
return begin;
130 for (
const char *p = end - 1; p != begin; --p) {
131 if (*p ==
'\n' || *p ==
'\r')
return p + 1;
135 inline char* FindNextRecord(
char *begin,
char *end) {
137 for (p = begin; p != end; ++p) {
138 if (*p ==
'\n' || *p ==
'\r')
break;
140 for (; p != end; ++p) {
141 if (*p !=
'\n' && *p !=
'\r')
return p;
145 inline bool LoadChunk(
void) {
146 if (buffer_.length() < buffer_size_) {
147 buffer_.resize(buffer_size_);
150 size_t size = buffer_.length();
151 if (!ReadChunk(
BeginPtr(buffer_), &size))
return false;
153 buffer_.resize(buffer_.length() * 2);
155 chunk_begin_ =
reinterpret_cast<char *
>(
BeginPtr(buffer_));
156 chunk_end_ = chunk_begin_ + size;
165 static const size_t kBufferSize = 1 << 18UL;
170 std::string overflow_;
Line-split implementation backed by a single FILE; it simply returns the lines of the file and is used for stdin.
Definition single_file_split.h:32
virtual bool NextRecord(Blob *out_rec)
Get the next record; the returned value is valid until the next call to NextRecord, NextChunk or NextBatch...
Definition single_file_split.h:76
virtual size_t GetTotalSize(void)
get the total size of the InputSplit
Definition single_file_split.h:61
virtual void ResetPartition(unsigned part_index, unsigned num_parts)
Reset the InputSplit to a certain part id; the InputSplit will then point to the head of the new spe...
Definition single_file_split.h:69
virtual void BeforeFirst(void)
Reset the position of the InputSplit to the beginning.
Definition single_file_split.h:55
virtual bool NextChunk(Blob *out_chunk)
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
Definition single_file_split.h:87
virtual void HintChunkSize(size_t chunk_size)
Hint to the InputSplit how large a chunk it should return when implementing NextChunk; this is a hint...
Definition single_file_split.h:58
defines configuration macros
defines serializable interface of dmlc
Defines the logging macros of dmlc; allows use of GLOG and falls back to an internal implementation when GLOG is disabled.
namespace for dmlc
Definition array_view.h:12
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284