Medial Code Documentation
Loading...
Searching...
No Matches
io.h
Go to the documentation of this file.
1
6#ifndef DMLC_IO_H_
7#define DMLC_IO_H_
8#include <cstdio>
9#include <string>
10#include <cstring>
11#include <vector>
12#include <istream>
13#include <ostream>
14#include <streambuf>
15#include "./logging.h"
16
17// include uint64_t only to make io standalone
18#ifdef _MSC_VER
20typedef unsigned __int64 uint64_t;
21#else
22#include <inttypes.h>
23#endif
24
26namespace dmlc {
/*!
 * \brief interface of stream I/O for serialization
 */
class Stream {  // NOLINT(*)
 public:
  /*!
   * \brief reads data from a stream
   * \param ptr pointer to the memory buffer that receives the data
   * \param size size of the block requested, in bytes
   * \return the number of bytes that were read
   */
  virtual size_t Read(void *ptr, size_t size) = 0;
  /*!
   * \brief writes data to a stream
   * \param ptr pointer to the memory buffer holding the data
   * \param size size of the block to write, in bytes
   */
  virtual void Write(const void *ptr, size_t size) = 0;
  /*! \brief virtual destructor */
  virtual ~Stream() {}
  /*!
   * \brief generic factory function: create a stream; the stream will
   *  close the underlying files upon deletion
   * \param uri the uri of the input
   * \param flag mode string handed to the implementation
   *  (NOTE(review): accepted values are defined in io.cc, not visible here)
   * \param allow_null NOTE(review): presumably lets the factory return NULL
   *  instead of failing -- confirm against io.cc
   * \return the created stream
   */
  static Stream *Create(const char *uri,
                        const char *const flag,
                        bool allow_null = false);
  // helper functions to write/read different data structures
  /*!
   * \brief write a value through serializer::Handler<T>
   * \param data the value to be written
   * \tparam T type of the value
   */
  template<typename T>
  inline void Write(const T &data);
  /*!
   * \brief read a value through serializer::Handler<T>
   * \param out_data where the loaded value is stored
   * \return the status reported by the handler
   * \tparam T type of the value
   */
  template<typename T>
  inline bool Read(T *out_data);
  /*!
   * \brief Endian aware write array of data.
   * \param data pointer to the first element
   * \param num_elems number of elements to write
   * \tparam T element type
   */
  template<typename T>
  inline void WriteArray(const T *data, size_t num_elems);
  /*!
   * \brief Endian aware read array of data.
   * \param data pointer to the first element of the destination
   * \param num_elems number of elements to read
   * \return false as soon as one element fails to load, true otherwise
   * \tparam T element type
   */
  template<typename T>
  inline bool ReadArray(T *data, size_t num_elems);
};
107
109class SeekStream: public Stream {
110 public:
111 // virtual destructor
112 virtual ~SeekStream(void) {}
114 virtual void Seek(size_t pos) = 0;
116 virtual size_t Tell(void) = 0;
127 static SeekStream *CreateForRead(const char *uri,
128 bool allow_null = false);
129};
130
133 public:
135 virtual ~Serializable() {}
140 virtual void Load(Stream *fi) = 0;
145 virtual void Save(Stream *fo) const = 0;
146};
147
156 public:
158 struct Blob {
160 void *dptr;
162 size_t size;
163 };
171 virtual void HintChunkSize(size_t /*chunk_size*/) {}
173 virtual size_t GetTotalSize(void) = 0;
175 virtual void BeforeFirst(void) = 0;
189 virtual bool NextRecord(Blob *out_rec) = 0;
209 virtual bool NextChunk(Blob *out_chunk) = 0;
228 virtual bool NextBatch(Blob *out_chunk, size_t /*n_records*/) { return NextChunk(out_chunk); }
230 virtual ~InputSplit(void) DMLC_THROW_EXCEPTION {}
238 virtual void ResetPartition(unsigned part_index, unsigned num_parts) = 0;
257 static InputSplit* Create(const char *uri,
258 unsigned part_index,
259 unsigned num_parts,
260 const char *type);
289 static InputSplit* Create(const char *uri,
290 const char *index_uri,
291 unsigned part_index,
292 unsigned num_parts,
293 const char *type,
294 const bool shuffle = false,
295 const int seed = 0,
296 const size_t batch_size = 256,
297 const bool recurse_directories = false);
298};
299
300#ifndef _LIBCPP_SGX_NO_IOSTREAMS
314class ostream : public std::basic_ostream<char> {
315 public:
321 explicit ostream(Stream *stream,
322 size_t buffer_size = (1 << 10))
323 : std::basic_ostream<char>(NULL), buf_(buffer_size) {
324 this->set_stream(stream);
325 }
326 // explictly synchronize the buffer
327 virtual ~ostream() DMLC_NO_EXCEPTION {
328 buf_.pubsync();
329 }
334 inline void set_stream(Stream *stream) {
335 buf_.set_stream(stream);
336 this->rdbuf(&buf_);
337 }
338
340 inline size_t bytes_written(void) const {
341 return buf_.bytes_out();
342 }
343
344 private:
345 // internal streambuf
346 class OutBuf : public std::streambuf {
347 public:
348 explicit OutBuf(size_t buffer_size)
349 : stream_(NULL), buffer_(buffer_size), bytes_out_(0) {
350 if (buffer_size == 0) buffer_.resize(2);
351 }
352 // set stream to the buffer
353 inline void set_stream(Stream *stream);
354
355 inline size_t bytes_out() const { return bytes_out_; }
356 private:
358 Stream *stream_;
360 std::vector<char> buffer_;
362 size_t bytes_out_;
363 // override sync
364 inline int_type sync(void);
365 // override overflow
366 inline int_type overflow(int c);
367 };
369 OutBuf buf_;
370};
371
385class istream : public std::basic_istream<char> {
386 public:
392 explicit istream(Stream *stream,
393 size_t buffer_size = (1 << 10))
394 : std::basic_istream<char>(NULL), buf_(buffer_size) {
395 this->set_stream(stream);
396 }
397 virtual ~istream() DMLC_NO_EXCEPTION {}
402 inline void set_stream(Stream *stream) {
403 buf_.set_stream(stream);
404 this->rdbuf(&buf_);
405 }
407 inline size_t bytes_read(void) const {
408 return buf_.bytes_read();
409 }
410
411 private:
412 // internal streambuf
413 class InBuf : public std::streambuf {
414 public:
415 explicit InBuf(size_t buffer_size)
416 : stream_(NULL), bytes_read_(0),
417 buffer_(buffer_size) {
418 if (buffer_size == 0) buffer_.resize(2);
419 }
420 // set stream to the buffer
421 inline void set_stream(Stream *stream);
422 // return how many bytes read so far
423 inline size_t bytes_read(void) const {
424 return bytes_read_;
425 }
426 private:
428 Stream *stream_;
430 size_t bytes_read_;
432 std::vector<char> buffer_;
433 // override underflow
434 inline int_type underflow();
435 };
437 InBuf buf_;
438};
439#endif
440} // namespace dmlc
441
442#include "./serializer.h"
443
444namespace dmlc {
445// implementations of inline functions
446template<typename T>
447inline void Stream::Write(const T &data) {
449}
450template<typename T>
451inline bool Stream::Read(T *out_data) {
452 return serializer::Handler<T>::Read(this, out_data);
453}
454
455template<typename T>
456inline void Stream::WriteArray(const T* data, size_t num_elems) {
457 for (size_t i = 0; i < num_elems; ++i) {
458 this->Write<T>(data[i]);
459 }
460}
461
462template<typename T>
463inline bool Stream::ReadArray(T* data, size_t num_elems) {
464 for (size_t i = 0; i < num_elems; ++i) {
465 if (!this->Read<T>(data + i)) return false;
466 }
467 return true;
468}
469
470#ifndef _LIBCPP_SGX_NO_IOSTREAMS
471// implementations for ostream
472inline void ostream::OutBuf::set_stream(Stream *stream) {
473 if (stream_ != NULL) this->pubsync();
474 this->stream_ = stream;
475 this->setp(&buffer_[0], &buffer_[0] + buffer_.size() - 1);
476}
477inline int ostream::OutBuf::sync(void) {
478 if (stream_ == NULL) return -1;
479 std::ptrdiff_t n = pptr() - pbase();
480 stream_->Write(pbase(), n);
481 this->pbump(-static_cast<int>(n));
482 bytes_out_ += n;
483 return 0;
484}
485inline int ostream::OutBuf::overflow(int c) {
486 *(this->pptr()) = c;
487 std::ptrdiff_t n = pptr() - pbase();
488 this->pbump(-static_cast<int>(n));
489 if (c == EOF) {
490 stream_->Write(pbase(), n);
491 bytes_out_ += n;
492 } else {
493 stream_->Write(pbase(), n + 1);
494 bytes_out_ += n + 1;
495 }
496 return c;
497}
498
499// implementations for istream
500inline void istream::InBuf::set_stream(Stream *stream) {
501 stream_ = stream;
502 this->setg(&buffer_[0], &buffer_[0], &buffer_[0]);
503}
504inline int istream::InBuf::underflow() {
505 char *bhead = &buffer_[0];
506 if (this->gptr() == this->egptr()) {
507 size_t sz = stream_->Read(bhead, buffer_.size());
508 this->setg(bhead, bhead, bhead + sz);
509 bytes_read_ += sz;
510 }
511 if (this->gptr() == this->egptr()) {
512 return traits_type::eof();
513 } else {
514 return traits_type::to_int_type(*gptr());
515 }
516}
517#endif
518
519namespace io {
/*! \brief a URI split into protocol, host and name */
struct URI {
  /*! \brief protocol part including the trailing "://", empty when absent */
  std::string protocol;
  /*! \brief text between "://" and the first following '/' */
  std::string host;
  /*! \brief remainder starting at that '/'; "/" when a protocol is given without a path */
  std::string name;
  /*! \brief construct an empty URI */
  URI(void) {}
  /*!
   * \brief split a C string into protocol, host and name
   * \param uri the string to parse; NULL yields an empty URI.
   *  A string without "://" is stored entirely in name.
   */
  explicit URI(const char *uri) {
    // strstr / std::string on a null pointer is undefined behaviour
    if (uri == NULL) return;
    const char *sep = std::strstr(uri, "://");
    if (sep == NULL) {
      name = uri;
      return;
    }
    const char *rest = sep + 3;  // first character after "://"
    protocol.assign(uri, static_cast<size_t>(rest - uri));
    const char *slash = std::strchr(rest, '/');
    if (slash == NULL) {
      host = rest;
      name = '/';
    } else {
      host.assign(rest, static_cast<size_t>(slash - rest));
      name = slash;
    }
  }
  /*! \brief concatenate protocol, host and name back into one string */
  inline std::string str(void) const {
    return protocol + host + name;
  }
};
556
/*! \brief kind of a file system entry */
enum FileType {
  /*! \brief a regular file */
  kFile = 0,
  /*! \brief a directory */
  kDirectory = 1
};
564
566struct FileInfo {
568 URI path;
570 size_t size;
572 FileType type;
574 FileInfo() : size(0), type(kFile) {}
575};
576
578class FileSystem {
579 public:
587 static FileSystem *GetInstance(const URI &path);
589 virtual ~FileSystem() {}
595 virtual FileInfo GetPathInfo(const URI &path) = 0;
601 virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
607 virtual void ListDirectoryRecursive(const URI &path,
608 std::vector<FileInfo> *out_list);
616 virtual Stream *Open(const URI &path,
617 const char* const flag,
618 bool allow_null = false) = 0;
625 virtual SeekStream *OpenForRead(const URI &path,
626 bool allow_null = false) = 0;
627};
628
629} // namespace io
630} // namespace dmlc
631#endif // DMLC_IO_H_
input split creates that allows reading of records from split of data, independent part that covers a...
Definition io.h:155
virtual ~InputSplit(void) DMLC_THROW_EXCEPTION
destructor
Definition io.h:230
virtual bool NextRecord(Blob *out_rec)=0
get the next record, the returning value is valid until next call to NextRecord, NextChunk or NextBat...
virtual bool NextChunk(Blob *out_chunk)=0
get a chunk of memory that can contain multiple records, the caller needs to parse the content of the...
virtual void BeforeFirst(void)=0
reset the position of InputSplit to beginning
static InputSplit * Create(const char *uri, unsigned part_index, unsigned num_parts, const char *type)
factory function: create input split given a uri
Definition io.cc:74
virtual void ResetPartition(unsigned part_index, unsigned num_parts)=0
reset the Input split to a certain part id, The InputSplit will be pointed to the head of the new spe...
virtual bool NextBatch(Blob *out_chunk, size_t)
get a chunk of memory that can contain multiple records, with hint for how many records is needed,...
Definition io.h:228
virtual size_t GetTotalSize(void)=0
get the total size of the InputSplit
virtual void HintChunkSize(size_t)
hint the inputsplit how large the chunk size it should return when implementing NextChunk this is a h...
Definition io.h:171
interface of i/o stream that support seek
Definition io.h:109
virtual void Seek(size_t pos)=0
seek to certain position of the file
virtual size_t Tell(void)=0
tell the position of the stream
static SeekStream * CreateForRead(const char *uri, bool allow_null=false)
generic factory function create an SeekStream for read only, the stream will close the underlying fil...
Definition io.cc:140
interface for serializable objects
Definition io.h:132
virtual void Load(Stream *fi)=0
load the model from a stream
virtual void Save(Stream *fo) const =0
saves the model to a stream
virtual ~Serializable()
virtual destructor
Definition io.h:135
interface of stream I/O for serialization
Definition io.h:30
void WriteArray(const T *data, size_t num_elems)
Endian aware write array of data.
Definition io.h:456
virtual ~Stream(void)
virtual destructor
Definition io.h:46
virtual void Write(const void *ptr, size_t size)=0
writes data to a stream
virtual size_t Read(void *ptr, size_t size)=0
reads data from a stream
bool ReadArray(T *data, size_t num_elems)
Endian aware read array of data.
Definition io.h:463
static Stream * Create(const char *uri, const char *const flag, bool allow_null=false)
generic factory function: create a stream; the stream will close the underlying files upon deletion
Definition io.cc:132
a std::istream class that can wrap Stream objects, can use istream with that output to underlying...
Definition io.h:385
size_t bytes_read(void) const
Definition io.h:407
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition io.h:402
istream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::istream type
Definition io.h:392
a std::ostream class that can wrap Stream objects, can use ostream with that output to underlying...
Definition io.h:314
void set_stream(Stream *stream)
set internal stream to be stream, reset states
Definition io.h:334
ostream(Stream *stream, size_t buffer_size=(1<< 10))
construct std::ostream type
Definition io.h:321
size_t bytes_written(void) const
Definition io.h:340
defines console logging options for xgboost. Use to enforce unified print behavior.
namespace for dmlc
Definition array_view.h:12
dmlc::SeekStream SeekStream
re-use definition of dmlc::SeekStream
Definition io.h:24
dmlc::Stream Stream
defines stream used in rabit see definition of Stream in dmlc/io.h
Definition rabit.h:27
Definition StdDeque.h:58
serializer template class that helps serialization. This file does not need to be directly used by most...
a blob of memory region
Definition io.h:158
size_t size
size of the memory region
Definition io.h:162
void * dptr
points to start of the memory region
Definition io.h:160
static void Write(Stream *strm, const T &data)
write data to stream
Definition serializer.h:265
static bool Read(Stream *strm, T *data)
read data to stream
Definition serializer.h:283