12 CHECK(size < (1 << 29U))
13 <<
"RecordIO only accept record less than 2^29 bytes";
14 const uint32_t umagic =
kMagic;
16 const char *magic =
reinterpret_cast<const char*
>(&umagic);
17 const char *bhead =
reinterpret_cast<const char*
>(buf);
18 uint32_t len =
static_cast<uint32_t
>(size);
19 uint32_t lower_align = (len >> 2U) << 2U;
20 uint32_t upper_align = ((len + 3U) >> 2U) << 2U;
22 for (uint32_t i = 0; i < lower_align ; i += 4) {
24 if (bhead[i] == magic[0] &&
25 bhead[i + 1] == magic[1] &&
26 bhead[i + 2] == magic[2] &&
27 bhead[i + 3] == magic[3]) {
28 uint32_t lrec =
EncodeLRec(dptr == 0 ? 1U : 2U,
30 stream_->
Write(magic, 4);
31 stream_->
Write(&lrec,
sizeof(lrec));
33 stream_->
Write(bhead + dptr, i - dptr);
39 uint32_t lrec =
EncodeLRec(dptr != 0 ? 3U : 0U,
41 stream_->
Write(magic, 4);
42 stream_->
Write(&lrec,
sizeof(lrec));
44 stream_->
Write(bhead + dptr, len - dptr);
48 if (upper_align != len) {
49 stream_->
Write(&zero, upper_align - len);
54 if (end_of_stream_)
return false;
60 size_t nread = stream_->
Read(header,
sizeof(header));
62 end_of_stream_ =
true;
return false;
64 CHECK(nread ==
sizeof(header)) <<
"Inavlid RecordIO File";
68 uint32_t upper_align = ((len + 3U) >> 2U) << 2U;
69 out_rec->resize(size + upper_align);
70 if (upper_align != 0) {
71 CHECK(stream_->
Read(
BeginPtr(*out_rec) + size, upper_align) == upper_align)
72 <<
"Invalid RecordIO File upper_align=" << upper_align;
75 size += len; out_rec->resize(size);
76 if (cflag == 0U || cflag == 3U)
break;
77 out_rec->resize(size +
sizeof(kMagic));
78 std::memcpy(
BeginPtr(*out_rec) + size, &kMagic,
sizeof(kMagic));
79 size +=
sizeof(kMagic);
85inline char *FindNextRecordIOHead(
char *begin,
char *end) {
86 CHECK_EQ((
reinterpret_cast<size_t>(begin) & 3UL), 0U);
87 CHECK_EQ((
reinterpret_cast<size_t>(end) & 3UL), 0U);
88 uint32_t *p =
reinterpret_cast<uint32_t *
>(begin);
89 uint32_t *pend =
reinterpret_cast<uint32_t *
>(end);
90 for (; p + 1 < pend; ++p) {
93 if (cflag == 0 || cflag == 1) {
94 return reinterpret_cast<char*
>(p);
103 unsigned num_parts) {
104 size_t nstep = (chunk.
size + num_parts - 1) / num_parts;
106 nstep = ((nstep + 3UL) >> 2UL) << 2UL;
107 size_t begin = std::min(chunk.
size, nstep * part_index);
108 size_t end = std::min(chunk.
size, nstep * (part_index + 1));
109 char *head =
reinterpret_cast<char*
>(chunk.
dptr);
110 pbegin_ = FindNextRecordIOHead(head + begin, head + chunk.
size);
111 pend_ = FindNextRecordIOHead(head + end, head + chunk.
size);
115 if (pbegin_ >= pend_)
return false;
116 uint32_t *p =
reinterpret_cast<uint32_t *
>(pbegin_);
122 out_rec->
dptr = pbegin_ + 2 *
sizeof(uint32_t);
124 pbegin_ += 2 *
sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
125 CHECK(pbegin_ <= pend_) <<
"Invalid RecordIO Format";
126 out_rec->
size = clen;
131 CHECK(cflag == 1U) <<
"Invalid RecordIO Format";
134 CHECK(pbegin_ + 2 *
sizeof(uint32_t) <= pend_);
135 p =
reinterpret_cast<uint32_t *
>(pbegin_);
139 size_t tsize = temp_.length();
140 temp_.resize(tsize + clen);
142 std::memcpy(
BeginPtr(temp_) + tsize,
143 pbegin_ + 2 *
sizeof(uint32_t),
147 pbegin_ += 2 *
sizeof(uint32_t) + (((clen + 3U) >> 2U) << 2U);
148 if (cflag == 3U)
break;
149 temp_.resize(tsize +
sizeof(kMagic));
150 std::memcpy(
BeginPtr(temp_) + tsize, &kMagic,
sizeof(kMagic));
153 out_rec->
size = temp_.length();
RecordIOChunkReader(InputSplit::Blob chunk, unsigned part_index=0, unsigned num_parts=1)
constructor
Definition recordio.cc:101
bool NextRecord(InputSplit::Blob *out_rec)
read next complete record from the stream; the blob contains the memory content. NOTE: this function is not...
Definition recordio.cc:114
bool NextRecord(std::string *out_rec)
read next complete record from stream
Definition recordio.cc:53
static const uint32_t kMagic
magic number of recordio; note: (kMagic >> 29U) & 7 > 3, which ensures lrec will not be kMagic
Definition recordio.h:45
static uint32_t DecodeFlag(uint32_t rec)
decode the flag part of lrecord
Definition recordio.h:60
static uint32_t DecodeLength(uint32_t rec)
decode the length part of lrecord
Definition recordio.h:68
static uint32_t EncodeLRec(uint32_t cflag, uint32_t length)
encode the lrecord
Definition recordio.h:52
void WriteRecord(const void *buf, size_t size)
write record to the stream
Definition recordio.cc:11
virtual void Write(const void *ptr, size_t size)=0
writes data to a stream
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
defines configuration macros
defines logging macros of dmlc; allows use of GLOG, and falls back to the internal implementation when disabled
namespace for dmlc
Definition array_view.h:12
T * BeginPtr(std::vector< T > &vec)
safely get the beginning address of a vector
Definition base.h:284
recordio that is able to pack binary data into a splittable format, useful to exchange data in binary...