Medial Code Documentation
Loading...
Searching...
No Matches
io.h
Go to the documentation of this file.
1
7#ifndef XGBOOST_COMMON_IO_H_
8#define XGBOOST_COMMON_IO_H_
9
10#include <dmlc/io.h>
11#include <rabit/rabit.h>
12
13#include <algorithm> // for min, fill_n, copy_n
14#include <array> // for array
15#include <cstddef> // for byte, size_t
16#include <cstdlib> // for malloc, realloc, free
17#include <cstring> // for memcpy
18#include <fstream> // for ifstream
19#include <limits> // for numeric_limits
20#include <memory> // for unique_ptr
21#include <string> // for string
22#include <type_traits> // for alignment_of_v, enable_if_t
23#include <utility> // for move
24#include <vector> // for vector
25
26#include "common.h"
27#include "xgboost/string_view.h" // for StringView
28
29namespace xgboost::common {
30using MemoryFixSizeBuffer = rabit::utils::MemoryFixSizeBuffer;
31using MemoryBufferStream = rabit::utils::MemoryBufferStream;
32
// Body of PeekableInStream: an input stream that supports an additional PeekRead
// operation besides Read. The class head (original line 37) is not part of this listing.
38 public:
// Wrap `strm`. Only the raw pointer is stored; nothing visible here takes ownership of it.
39 explicit PeekableInStream(dmlc::Stream* strm) : strm_(strm) {}
40
// Read data from the stream into `dptr`. Defined in io.cc.
41 size_t Read(void* dptr, size_t size) override;
// Declared here only. NOTE(review): presumably reads up to `size` bytes without
// consuming them -- confirm against the definition in io.cc.
42 virtual size_t PeekRead(void* dptr, size_t size);
43
// This stream is input-only: any attempt to write logs a fatal error.
44 void Write(const void*, size_t) override {
45 LOG(FATAL) << "Not implemented";
46 }
47
48 private:
// The wrapped input stream.
50 dmlc::Stream *strm_;
// NOTE(review): looks like the read offset into buffer_ -- confirm in io.cc.
52 size_t buffer_ptr_{0};
// NOTE(review): looks like the storage backing PeekRead -- confirm in io.cc.
54 std::string buffer_;
55};
// Body of FixedSizeStream: a simple class used to consume a `dmlc::Stream` all at once.
// The class head (original line 61) is not part of this listing.
62 public:
// Build from a peekable stream. Defined in io.cc.
63 explicit FixedSizeStream(PeekableInStream* stream);
64 ~FixedSizeStream() override = default;
65
// Read data from the stream into `dptr`. Defined in io.cc.
66 size_t Read(void* dptr, size_t size) override;
67 size_t PeekRead(void* dptr, size_t size) override;
// Number of bytes held in the internal buffer.
68 [[nodiscard]] std::size_t Size() const { return buffer_.size(); }
// Current value of the internal position `pointer_`.
69 [[nodiscard]] std::size_t Tell() const { return pointer_; }
// Declared here only. NOTE(review): presumably sets the position to `pos` -- confirm in io.cc.
70 void Seek(size_t pos);
71
// This stream is input-only: any attempt to write logs a fatal error.
72 void Write(const void*, size_t) override {
73 LOG(FATAL) << "Not implemented";
74 }
75
// Take the buffer from this FixedSizeStream and hand it to `out`; the buffer held by
// the stream is cleared out afterwards. Defined in io.cc.
80 void Take(std::string* out);
81
82 private:
// Position reported by Tell().
83 size_t pointer_{0};
// Buffered stream contents; its size is what Size() reports.
84 std::string buffer_;
85};
86
// Helper for loading a consecutive file that avoids dmlc Stream when possible.
// Defined in io.cc. `stream` defaults to false.
// NOTE(review): the meaning of `stream` is not visible in this header -- confirm in io.cc.
96std::string LoadSequentialFile(std::string uri, bool stream = false);
97
// Get the file extension from a file name. Defined in io.cc. `lower` defaults to true.
// NOTE(review): `lower` presumably requests a lower-cased result -- confirm in io.cc.
105std::string FileExtension(std::string fname, bool lower = true);
106
110inline std::string ReadAll(dmlc::Stream* fi, PeekableInStream* fp) {
111 std::string buffer;
112 if (auto fixed_size = dynamic_cast<common::MemoryFixSizeBuffer*>(fi)) {
113 fixed_size->Seek(common::MemoryFixSizeBuffer::kSeekEnd);
114 size_t size = fixed_size->Tell();
115 buffer.resize(size);
116 fixed_size->Seek(0);
117 CHECK_EQ(fixed_size->Read(&buffer[0], size), size);
118 } else {
119 FixedSizeStream{fp}.Take(&buffer);
120 }
121 return buffer;
122}
123
127inline std::string ReadAll(std::string const &path) {
128 std::ifstream stream(path);
129 if (!stream.is_open()) {
130 LOG(FATAL) << "Could not open file " << path;
131 }
132 std::string content{std::istreambuf_iterator<char>(stream), std::istreambuf_iterator<char>()};
133 if (content.empty()) {
134 LOG(FATAL) << "Empty file " << path;
135 }
136 return content;
137}
138
139struct MMAPFile;
140
// Body of ResourceHandler: handler for a one-shot resource. The class head (original
// line 146) is not part of this listing.
147 public:
148 // RTTI
// Tag identifying the concrete kind of resource; see Type() and IsSameType().
149 enum Kind : std::uint8_t {
150 kMalloc = 0,
151 kMmap = 1,
152 };
153
154 private:
// Kind tag, fixed at construction.
155 Kind kind_{kMalloc};
156
157 public:
// Pointer to the underlying storage; supplied by the concrete resource.
158 virtual void* Data() = 0;
// Data() reinterpreted as a pointer to T. No type or alignment check is performed here.
159 template <typename T>
160 [[nodiscard]] T* DataAs() {
161 return reinterpret_cast<T*>(this->Data());
162 }
163
// Size of the resource; supplied by the concrete resource.
164 [[nodiscard]] virtual std::size_t Size() const = 0;
// The kind tag given at construction.
165 [[nodiscard]] auto Type() const { return kind_; }
166
167 // Allow exceptions for cleaning up resource.
168 virtual ~ResourceHandler() noexcept(false);
169
170 explicit ResourceHandler(Kind kind) : kind_{kind} {}
171 // Use shared_ptr to manage a pool like resource handler. All copy and assignment
172 // operators are disabled.
173 ResourceHandler(ResourceHandler const& that) = delete;
174 ResourceHandler& operator=(ResourceHandler const& that) = delete;
175 ResourceHandler(ResourceHandler&& that) = delete;
176 ResourceHandler& operator=(ResourceHandler&& that) = delete;
// Whether two resources have the same type, i.e. carry the same Kind tag.
180 [[nodiscard]] bool IsSameType(ResourceHandler const& that) const {
181 return this->Type() == that.Type();
182 }
183};
184
// First part of the MallocResource body: a ResourceHandler whose storage comes from
// the C allocator (malloc/realloc/free). The class head (original line 185) is not
// part of this listing.
// Start of the allocation, or nullptr when the resource is empty.
186 void* ptr_{nullptr};
// Size of the allocation in bytes.
187 std::size_t n_{0};
188
// Free the allocation and reset to the empty state. std::free accepts nullptr, so
// this is safe to call on an already-empty resource.
189 void Clear() noexcept(true) {
190 std::free(ptr_);
191 ptr_ = nullptr;
192 n_ = 0;
193 }
194
195 public:
// Create a resource of n_bytes, tagged kMalloc. Storage is obtained through Resize()
// with its default `init`, so a non-zero n_bytes yields zero-filled bytes.
196 explicit MallocResource(std::size_t n_bytes) : ResourceHandler{kMalloc} { this->Resize(n_bytes); }
197 ~MallocResource() noexcept(true) override { this->Clear(); }
198
199 void* Data() override { return ptr_; }
200 [[nodiscard]] std::size_t Size() const override { return n_; }
209 template <bool force_malloc = false>
210 void Resize(std::size_t n_bytes, std::byte init = std::byte{0}) {
211 // realloc(ptr, 0) works, but is deprecated.
212 if (n_bytes == 0) {
213 this->Clear();
214 return;
215 }
216
217 // If realloc fails, we need to copy the data ourselves.
218 bool need_copy{false};
219 void* new_ptr{nullptr};
220 // use realloc first, it can handle nullptr.
221 if constexpr (!force_malloc) {
222 new_ptr = std::realloc(ptr_, n_bytes);
223 }
224 // retry with malloc if realloc fails
225 if (!new_ptr) {
226 // ptr_ is preserved if realloc fails
227 new_ptr = std::malloc(n_bytes);
228 need_copy = true;
229 }
230 if (!new_ptr) {
231 // malloc fails
232 LOG(FATAL) << "bad_malloc: Failed to allocate " << n_bytes << " bytes.";
233 }
234
235 if (need_copy) {
236 std::copy_n(reinterpret_cast<std::byte*>(ptr_), n_, reinterpret_cast<std::byte*>(new_ptr));
237 }
238 // default initialize
239 std::fill_n(reinterpret_cast<std::byte*>(new_ptr) + n_, n_bytes - n_, init);
240 // free the old ptr if malloc is used.
241 if (need_copy) {
242 this->Clear();
243 }
244
245 ptr_ = new_ptr;
246 n_ = n_bytes;
247 }
248};
249
// Body of MmapResource: a class for wrapping mmap as a resource for RAII. The class
// head (original line 253) is not part of this listing. All members are defined
// outside this header.
// Handle to the mapped file; MMAPFile is only forward-declared in this header.
254 std::unique_ptr<MMAPFile> handle_;
// NOTE(review): presumably the size reported by Size() -- confirm in the definition.
255 std::size_t n_;
256
257 public:
// NOTE(review): presumably maps `length` bytes of `path` starting at `offset` --
// confirm in the definition.
258 MmapResource(std::string path, std::size_t offset, std::size_t length);
// Declared noexcept(false): the destructor is allowed to throw.
259 ~MmapResource() noexcept(false) override;
260
261 [[nodiscard]] void* Data() override;
262 [[nodiscard]] std::size_t Size() const override;
263};
264
/**
 * @brief Alignment, in bytes, applied by the aligned read and write streams.
 */
constexpr std::size_t IOAlignment() {
  // 8 bytes is sufficient for most of the POD types in XGBoost.
  constexpr std::size_t kNumBytes = 8;
  return kNumBytes;
}
272
// First part of the AlignedResourceReadStream body: wraps a resource into a stream.
// The class head (original line 282) is not part of this listing.
// The resource being read; shared with whoever calls Share().
283 std::shared_ptr<ResourceHandler> resource_;
// Read cursor: byte offset into the resource, advanced by Consume().
284 std::size_t curr_ptr_{0};
285
286 // Similar to SEEK_END in libc
287 static std::size_t constexpr kSeekEnd = std::numeric_limits<std::size_t>::max();
288
289 public:
// Take shared ownership of `resource`; the cursor starts at offset 0.
290 explicit AlignedResourceReadStream(std::shared_ptr<ResourceHandler> resource)
291 : resource_{std::move(resource)} {}
292
// Return another owner of the underlying resource.
293 [[nodiscard]] std::shared_ptr<ResourceHandler> Share() noexcept(true) { return resource_; }
300 [[nodiscard]] auto Consume(std::size_t n_bytes) noexcept(true) {
301 auto res_size = resource_->Size();
302 auto data = reinterpret_cast<std::byte*>(resource_->Data());
303 auto ptr = data + curr_ptr_;
304
305 // Move the cursor
306 auto aligned_n_bytes = DivRoundUp(n_bytes, IOAlignment()) * IOAlignment();
307 auto aligned_forward = std::min(res_size - curr_ptr_, aligned_n_bytes);
308 std::size_t forward = std::min(res_size - curr_ptr_, n_bytes);
309
310 curr_ptr_ += aligned_forward;
311
312 return std::pair{ptr, forward};
313 }
314
315 template <typename T>
316 [[nodiscard]] auto Consume(T* out) noexcept(false) -> std::enable_if_t<std::is_pod_v<T>, bool> {
317 auto [ptr, size] = this->Consume(sizeof(T));
318 if (size != sizeof(T)) {
319 return false;
320 }
321 CHECK_EQ(reinterpret_cast<std::uintptr_t>(ptr) % std::alignment_of_v<T>, 0);
322 *out = *reinterpret_cast<T*>(ptr);
323 return true;
324 }
325
// Current cursor position: the byte offset into the resource.
326 [[nodiscard]] virtual std::size_t Tell() noexcept(true) { return curr_ptr_; }
330 [[nodiscard]] std::size_t Read(void* ptr, std::size_t n_bytes) noexcept(true) {
331 auto [res_ptr, forward] = this->Consume(n_bytes);
332 if (forward != 0) {
333 std::memcpy(ptr, res_ptr, forward);
334 }
335 return forward;
336 }
// Read a primitive type into `out`. Forwards to Consume(T*) and returns its result.
342 template <typename T>
343 [[nodiscard]] auto Read(T* out) noexcept(false) -> std::enable_if_t<std::is_pod_v<T>, bool> {
344 return this->Consume(out);
345 }
351 template <typename T>
352 [[nodiscard]] bool Read(std::vector<T>* out) noexcept(true) {
353 std::uint64_t n{0};
354 if (!this->Consume(&n)) {
355 return false;
356 }
357 out->resize(n);
358
359 auto n_bytes = sizeof(T) * n;
360 if (this->Read(out->data(), n_bytes) != n_bytes) {
361 return false;
362 }
363 return true;
364 }
365
366 virtual ~AlignedResourceReadStream() noexcept(false);
367};
368
// Body of PrivateMmapConstStream: a private mmap file as a read-only stream. The class
// head (original line 377) is not part of this listing.
378 public:
// Construct a private mmap stream: creates a MmapResource from (path, offset, length)
// and passes it to the AlignedResourceReadStream base as the resource to read.
386 explicit PrivateMmapConstStream(std::string path, std::size_t offset, std::size_t length)
387 : AlignedResourceReadStream{std::shared_ptr<MmapResource>{ // NOLINT
388 new MmapResource{std::move(path), offset, length}}} {}
// Declared noexcept(false): the destructor is allowed to throw. Defined outside this header.
389 ~PrivateMmapConstStream() noexcept(false) override;
390};
391
// First part of the AlignedWriteStream body: base class for write streams with
// alignment defined by IOAlignment(). The class head (original line 395) is not part
// of this listing.
396 protected:
// Backend hook implemented by concrete streams: write n_bytes from ptr and return the
// number of bytes written. Write() CHECKs that the returned count equals the request.
397 [[nodiscard]] virtual std::size_t DoWrite(const void* ptr,
398 std::size_t n_bytes) noexcept(true) = 0;
399
400 public:
401 virtual ~AlignedWriteStream() = default;
402
403 [[nodiscard]] std::size_t Write(const void* ptr, std::size_t n_bytes) noexcept(false) {
404 auto aligned_n_bytes = DivRoundUp(n_bytes, IOAlignment()) * IOAlignment();
405 auto w_n_bytes = this->DoWrite(ptr, n_bytes);
406 CHECK_EQ(w_n_bytes, n_bytes);
407 auto remaining = aligned_n_bytes - n_bytes;
408 if (remaining > 0) {
409 std::array<std::uint8_t, IOAlignment()> padding;
410 std::memset(padding.data(), '\0', padding.size());
411 w_n_bytes = this->DoWrite(padding.data(), remaining);
412 CHECK_EQ(w_n_bytes, remaining);
413 }
414 return aligned_n_bytes;
415 }
416
// Write a single POD value: forwards its address and sizeof(T) to
// Write(const void*, std::size_t) and returns that call's result.
417 template <typename T>
418 [[nodiscard]] std::enable_if_t<std::is_pod_v<T>, std::size_t> Write(T const& v) {
419 return this->Write(&v, sizeof(T));
420 }
421};
422
// Body of AlignedFileWriteStream: output stream backed by a file. The class head
// (original line 426) is not part of this listing.
// The underlying dmlc stream. NOTE(review): presumably the target of DoWrite --
// confirm in the definition.
427 std::unique_ptr<dmlc::Stream> pimpl_;
428
429 protected:
// Backend write hook required by AlignedWriteStream. Defined outside this header.
430 [[nodiscard]] std::size_t DoWrite(const void* ptr, std::size_t n_bytes) noexcept(true) override;
431
432 public:
// Default construction leaves pimpl_ empty.
433 AlignedFileWriteStream() = default;
435 ~AlignedFileWriteStream() override = default;
436};
437
// Body of AlignedMemWriteStream: output stream backed by a memory buffer. The class
// head (original line 441) is not part of this listing.
// The underlying memory buffer stream. NOTE(review): presumably the target of DoWrite
// -- confirm in the definition.
442 std::unique_ptr<MemoryBufferStream> pimpl_;
443
444 protected:
// Backend write hook required by AlignedWriteStream. Defined outside this header.
445 [[nodiscard]] std::size_t DoWrite(const void* ptr, std::size_t n_bytes) noexcept(true) override;
446
447 public:
// NOTE(review): presumably writes into the string pointed to by p_buf -- confirm in
// the definition.
448 explicit AlignedMemWriteStream(std::string* p_buf);
449 ~AlignedMemWriteStream() override;
450
// Declared here only. NOTE(review): presumably the current write position -- confirm
// in the definition.
451 [[nodiscard]] std::size_t Tell() const noexcept(true);
452};
453} // namespace xgboost::common
454#endif // XGBOOST_COMMON_IO_H_
interface of stream I/O for serialization
Definition io.h:30
Output stream backed by a file.
Definition io.h:426
Output stream backed by memory buffer.
Definition io.h:441
Wrap resource into a dmlc stream.
Definition io.h:282
bool Read(std::vector< T > *out) noexcept(true)
Read a vector.
Definition io.h:352
std::size_t Read(void *ptr, std::size_t n_bytes) noexcept(true)
Read n_bytes of data, output is copied into ptr.
Definition io.h:330
auto Read(T *out) noexcept(false) -> std::enable_if_t< std::is_pod_v< T >, bool >
Read a primitive type.
Definition io.h:343
auto Consume(std::size_t n_bytes) noexcept(true)
Consume n_bytes of data, no copying is performed.
Definition io.h:300
Base class for write stream with alignment defined by IOAlignment().
Definition io.h:395
A simple class used to consume ‘dmlc::Stream’ all at once.
Definition io.h:61
void Take(std::string *out)
Take the buffer from ‘FixedSizeStream’. The one in ‘FixedSizeStream’ will be cleared out.
Definition io.cc:116
void Write(const void *, size_t) override
writes data to a stream
Definition io.h:72
size_t Read(void *dptr, size_t size) override
reads data from a stream
Definition io.cc:93
void Resize(std::size_t n_bytes, std::byte init=std::byte{0})
Resize the resource to n_bytes.
Definition io.h:210
A class for wrapping mmap as a resource for RAII.
Definition io.h:253
Input stream that supports an additional PeekRead operation, besides Read.
Definition io.h:37
void Write(const void *, size_t) override
writes data to a stream
Definition io.h:44
size_t Read(void *dptr, size_t size) override
reads data from a stream
Definition io.cc:46
Private mmap file as a read-only stream.
Definition io.h:377
PrivateMmapConstStream(std::string path, std::size_t offset, std::size_t length)
Construct a private mmap stream.
Definition io.h:386
Handler for one-shot resource.
Definition io.h:146
bool IsSameType(ResourceHandler const &that) const
Whether two resources have the same type.
Definition io.h:180
defines serializable interface of dmlc
Definition StdDeque.h:58
Copyright 2017-2023, XGBoost Contributors.
Definition span.h:77
std::string ReadAll(dmlc::Stream *fi, PeekableInStream *fp)
Read the whole buffer from dmlc stream.
Definition io.h:110
std::string FileExtension(std::string fname, bool lower)
Get file extension from file name.
Definition io.cc:192
constexpr std::size_t IOAlignment()
Definition io.h:268
std::string LoadSequentialFile(std::string uri, bool stream)
Helper function for loading a consecutive file, avoiding dmlc Stream when possible.
Definition io.cc:142
This file defines rabit's Allreduce/Broadcast interface. The rabit engine contains the actual implemen...
An in-memory buffer that can be read and written through the stream interface.
Definition io.h:79
Fixed size memory buffer as a stream.
Definition io.h:28
Definition string_view.h:15
Copyright 2015-2023 by XGBoost Contributors.