Medial Code Documentation
Loading...
Searching...
No Matches
single_file_split.h
Go to the documentation of this file.
1
7#ifndef DMLC_IO_SINGLE_FILE_SPLIT_H_
8#define DMLC_IO_SINGLE_FILE_SPLIT_H_
9
10#include <dmlc/base.h>
11#include <dmlc/io.h>
12#include <dmlc/logging.h>
13#include <sys/stat.h>
14#include <cstdio>
15#include <string>
16#include <algorithm>
17
18#ifdef _WIN32
19#define stat_struct __stat64
20#define fstat _fstat64
21#define fileno _fileno
22#else // _WIN32
23#define stat_struct stat
24#endif // _WIN32
25
26namespace dmlc {
27namespace io {
33 public:
34 explicit SingleFileSplit(const char *fname)
35 : use_stdin_(false), buffer_size_(kBufferSize),
36 chunk_begin_(NULL), chunk_end_(NULL) {
37 if (!std::strcmp(fname, "stdin")) {
38#ifndef DMLC_STRICT_CXX98_
39 use_stdin_ = true; fp_ = stdin;
40#endif
41 }
42 if (!use_stdin_) {
43#if DMLC_USE_FOPEN64
44 fp_ = fopen64(fname, "rb");
45#else
46 fp_ = fopen(fname, "rb");
47#endif
48 CHECK(fp_ != NULL) << "SingleFileSplit: fail to open " << fname;
49 }
50 buffer_.resize(kBufferSize);
51 }
52 virtual ~SingleFileSplit(void) {
53 if (!use_stdin_) std::fclose(fp_);
54 }
55 virtual void BeforeFirst(void) {
56 fseek(fp_, 0, SEEK_SET);
57 }
58 virtual void HintChunkSize(size_t chunk_size) {
59 buffer_size_ = std::max(chunk_size, buffer_size_);
60 }
61 virtual size_t GetTotalSize(void) {
62 struct stat_struct buf;
63 fstat(fileno(fp_), &buf);
64 return buf.st_size;
65 }
66 virtual size_t Read(void *ptr, size_t size) {
67 return std::fread(ptr, 1, size, fp_);
68 }
69 virtual void ResetPartition(unsigned part_index, unsigned num_parts) {
70 CHECK(part_index == 0 && num_parts == 1);
71 this->BeforeFirst();
72 }
73 virtual void Write(const void * /*ptr*/, size_t /*size*/) {
74 LOG(FATAL) << "InputSplit do not support write";
75 }
76 virtual bool NextRecord(Blob *out_rec) {
77 if (chunk_begin_ == chunk_end_) {
78 if (!LoadChunk()) return false;
79 }
80 char *next = FindNextRecord(chunk_begin_,
81 chunk_end_);
82 out_rec->dptr = chunk_begin_;
83 out_rec->size = next - chunk_begin_;
84 chunk_begin_ = next;
85 return true;
86 }
87 virtual bool NextChunk(Blob *out_chunk) {
88 if (chunk_begin_ == chunk_end_) {
89 if (!LoadChunk()) return false;
90 }
91 out_chunk->dptr = chunk_begin_;
92 out_chunk->size = chunk_end_ - chunk_begin_;
93 chunk_begin_ = chunk_end_;
94 return true;
95 }
96 inline bool ReadChunk(void *buf, size_t *size) {
97 size_t max_size = *size;
98 if (max_size <= overflow_.length()) {
99 *size = 0; return true;
100 }
101 if (overflow_.length() != 0) {
102 std::memcpy(buf, BeginPtr(overflow_), overflow_.length());
103 }
104 size_t olen = overflow_.length();
105 overflow_.resize(0);
106 size_t nread = this->Read(reinterpret_cast<char*>(buf) + olen,
107 max_size - olen);
108 nread += olen;
109 if (nread == 0) return false;
110 if (nread != max_size) {
111 *size = nread;
112 return true;
113 } else {
114 const char *bptr = reinterpret_cast<const char*>(buf);
115 // return the last position where a record starts
116 const char *bend = this->FindLastRecordBegin(bptr, bptr + max_size);
117 *size = bend - bptr;
118 overflow_.resize(max_size - *size);
119 if (overflow_.length() != 0) {
120 std::memcpy(BeginPtr(overflow_), bend, overflow_.length());
121 }
122 return true;
123 }
124 }
125
126 protected:
/*!
 * \brief find the last position in [begin, end) where a record starts
 * \param begin start of the range
 * \param end one past the last byte of the range
 * \return pointer just past the last '\n' or '\r' found when scanning
 *         backwards; the byte at begin itself is never examined, and begin
 *         is returned when no line terminator is found or the range is empty
 */
inline const char* FindLastRecordBegin(const char *begin,
                                       const char *end) {
  // tail always points one past the byte being examined
  for (const char *tail = end; tail - begin > 1; --tail) {
    const char last = tail[-1];
    if (last == '\n' || last == '\r') return tail;
  }
  return begin;
}
/*!
 * \brief find where the record following the one at begin starts
 * \param begin start of the current record
 * \param end one past the last byte of the chunk
 * \return pointer to the first byte after the run of '\n' / '\r'
 *         characters that terminates the current record, or end when the
 *         chunk holds nothing after it
 */
inline char* FindNextRecord(char *begin, char *end) {
  char *cur = begin;
  // skip the body of the current record
  while (cur != end && *cur != '\n' && *cur != '\r') {
    ++cur;
  }
  // skip every consecutive line terminator
  while (cur != end && (*cur == '\n' || *cur == '\r')) {
    ++cur;
  }
  return cur;
}
145 inline bool LoadChunk(void) {
146 if (buffer_.length() < buffer_size_) {
147 buffer_.resize(buffer_size_);
148 }
149 while (true) {
150 size_t size = buffer_.length();
151 if (!ReadChunk(BeginPtr(buffer_), &size)) return false;
152 if (size == 0) {
153 buffer_.resize(buffer_.length() * 2);
154 } else {
155 chunk_begin_ = reinterpret_cast<char *>(BeginPtr(buffer_));
156 chunk_end_ = chunk_begin_ + size;
157 break;
158 }
159 }
160 return true;
161 }
162
163 private:
164 // default length of the internal buffer (1 << 18); initial value of buffer_size_
165 static const size_t kBufferSize = 1 << 18UL;
166 // FILE handle that data is read from: stdin or the file opened by the constructor
167 std::FILE *fp_;
168 bool use_stdin_;
169 // bytes behind the last record start of a completely filled ReadChunk buffer, prepended on the next ReadChunk call
170 std::string overflow_;
171 // storage that chunk_begin_ / chunk_end_ point into
172 std::string buffer_;
173 // minimum length LoadChunk grows buffer_ to; only raised by HintChunkSize
174 size_t buffer_size_;
175 // start of the part of the current chunk that has not been handed out yet
176 char *chunk_begin_;
177 // one past the last byte of the current chunk
178 char *chunk_end_;
179};
180} // namespace io
181} // namespace dmlc
182#endif // DMLC_IO_SINGLE_FILE_SPLIT_H_
input split that allows reading of records from a split of data; each split is an independent part that covers a...
Definition io.h:155
line split implementation backed by a single FILE; it simply returns the lines of the file and is used for stdin
Definition single_file_split.h:32
virtual bool NextRecord(Blob *out_rec)
get the next record, the returning value is valid until next call to NextRecord, NextChunk or NextBat...
Definition single_file_split.h:76
virtual size_t GetTotalSize(void)
get the total size of the InputSplit
Definition single_file_split.h:61
virtual void ResetPartition(unsigned part_index, unsigned num_parts)
reset the Input split to a certain part id, The InputSplit will be pointed to the head of the new spe...
Definition single_file_split.h:69
virtual void BeforeFirst(void)
reset the position of InputSplit to beginning
Definition single_file_split.h:55
virtual bool NextChunk(Blob *out_chunk)
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
Definition single_file_split.h:87
virtual void HintChunkSize(size_t chunk_size)
hint the inputsplit how large the chunk size it should return when implementing NextChunk this is a h...
Definition single_file_split.h:58
defines configuration macros
defines serializable interface of dmlc
defines logging macros of dmlc allows use of GLOG, fall back to internal implementation when disabled
namespace for dmlc
Definition array_view.h:12
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284
a blob of memory region
Definition io.h:158
size_t size
size of the memory region
Definition io.h:162
void * dptr
points to start of the memory region
Definition io.h:160