Medial Code Documentation
|
implementation of basic Allreduce engine
#include <allreduce_base.h>
Data Structures | |
struct | LinkRecord |
struct | RefLinkVector |
simple data structure that works like a vector but holds references to existing elements instead of owning storage | |
struct | ReturnType |
struct return type to avoid implicit conversion to int/bool | |
Public Member Functions | |
virtual bool | Init (int argc, char *argv[]) |
virtual bool | Shutdown () |
virtual void | SetParam (const char *name, const char *val) |
set parameters to the engine | |
void | TrackerPrint (const std::string &msg) override |
print msg in the tracker; this function can be used to communicate progress information to the user who monitors the tracker | |
int | GetRingPrevRank () const override |
get rank of previous node in ring topology | |
int | GetRank () const override |
get rank | |
int | GetWorldSize () const override |
get total number of workers | |
bool | IsDistributed () const override |
whether the job is distributed or not | |
std::string | GetHost () const override |
get host name of the current node | |
void | Allgather (void *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, size_t size_prev_slice) override |
internal Allgather function using a ring-based algorithm. Each node holds one segment of the ring buffer sendrecvbuf_; the current node k provides [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains the contents of all segments | |
void | Allreduce (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer, PreprocFunction prepare_fun=nullptr, void *prepare_arg=nullptr) override |
perform in-place allreduce on sendrecvbuf_; this function is NOT thread-safe | |
void | Broadcast (void *sendrecvbuf_, size_t total_size, int root) override |
broadcast data from root to all nodes | |
int | LoadCheckPoint () override |
deprecated | |
void | CheckPoint () override |
Increase internal version number. Deprecated. | |
int | VersionNumber () const override |
void | ReportStatus () const |
report current status to the job tracker; how this is done depends on the job tracker in use | |
~IEngine ()=default | |
virtual destructor | |
Static Public Attributes | |
static const int | kMagic = 0xff99 |
Protected Types | |
enum | ReturnTypeEnum { kSuccess , kConnReset , kRecvZeroLen , kSockError , kGetExcept } |
enumeration of possible return values of the Try functions | |
Protected Member Functions | |
xgboost::collective::TCPSocket | ConnectTracker () const |
initialize connection to the tracker | |
bool | ReConnectLinks (const char *cmd="start") |
connect to the tracker to fix the missing links; this function is also used when the engine starts up | |
ReturnType | TryAllreduce (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) |
perform in-place allreduce on sendrecvbuf_; this function can fail and returns the cause of failure | |
ReturnType | TryBroadcast (void *sendrecvbuf_, size_t size, int root) |
broadcast data from root to all nodes; this function can fail and returns the cause of failure | |
ReturnType | TryAllreduceTree (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) |
perform in-place allreduce on sendrecvbuf_; this function implements tree-shaped reduction | |
ReturnType | TryAllgatherRing (void *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, size_t size_prev_slice) |
internal Allgather function using a ring-based algorithm. Each node holds one segment of the ring buffer sendrecvbuf_; the current node k provides [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains the contents of all segments | |
ReturnType | TryReduceScatterRing (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) |
perform in-place reduce-scatter on sendrecvbuf_ using a ring-based algorithm; this function can fail and returns the cause of failure | |
ReturnType | TryAllreduceRing (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer) |
perform in-place allreduce on sendrecvbuf_ using a ring-based algorithm (reduce-scatter followed by allgather) | |
ReturnType | ReportError (LinkRecord *link, ReturnType err) |
function used to report error when a link goes wrong | |
Static Protected Member Functions | |
static ReturnType | Errno2Return () |
translate errno to return type | |
Protected Attributes | |
int | seq_counter {0} |
int | version_number {0} |
bool | hadoop_mode |
int | parent_index |
int | parent_rank |
std::vector< LinkRecord > | all_links |
LinkRecord * | err_link |
RefLinkVector | tree_links |
LinkRecord * | ring_prev |
LinkRecord * | ring_next |
std::vector< std::string > | env_vars |
std::string | task_id |
std::string | host_uri |
std::string | tracker_uri |
std::string | dmlc_role |
int | tracker_port |
size_t | reduce_buffer_size |
int | reduce_method |
size_t | reduce_ring_mincount |
size_t | tree_reduce_minsize |
int | rank |
int | world_size |
int | connect_retry |
std::chrono::seconds | timeout_sec {std::chrono::seconds{1800}} |
bool | rabit_timeout = false |
bool | rabit_enable_tcp_no_delay = false |
Additional Inherited Members | |
typedef void() | PreprocFunction(void *arg) |
Preprocessing function, that is called before AllReduce, used to prepare the data used by AllReduce. | |
typedef void() | ReduceFunction(const void *src, void *dst, int count, const MPI::Datatype &dtype) |
reduce function; it has the same form as the MPI reduce function, for compatibility with the MPI interface. In all calls, the memory is guaranteed to be aligned to 64 bits, which means it is OK to cast src and dst to double*, int*, etc. | |
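As an illustration of the ReduceFunction shape, the following is a minimal sketch of a summing reducer for double. The `MPI::Datatype` struct defined here is a stand-in written for this example (it is assumed to carry only the element size); the real type comes from the rabit headers. `SumDouble` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for the MPI::Datatype named in the typedef above. Assumption:
// it only needs to carry the element size for this example.
namespace MPI {
struct Datatype {
  std::size_t type_size;
};
}  // namespace MPI

// A reducer with the ReduceFunction shape: dst[i] += src[i] for doubles.
// The casts rely on the alignment guarantee stated in the description.
void SumDouble(const void* src, void* dst, int count, const MPI::Datatype& dtype) {
  assert(dtype.type_size == sizeof(double));
  const double* s = static_cast<const double*>(src);
  double* d = static_cast<double*>(dst);
  for (int i = 0; i < count; ++i) d[i] += s[i];
}
```

The reducer accumulates into dst and leaves src untouched, which is the convention the const qualifier on src suggests.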
implementation of basic Allreduce engine
|
protected |
enumeration of possible return values of the Try functions
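The idea of wrapping this enumeration in a struct "to avoid implicit conversion to int/bool" can be sketched as follows. This is an illustrative reconstruction, not the header's actual definition; `Describe` is a hypothetical helper added only to show explicit comparison against the enumerators.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Enumerators as listed in this page.
enum ReturnTypeEnum { kSuccess, kConnReset, kRecvZeroLen, kSockError, kGetExcept };

// Sketch of a wrapper with no conversion to int or bool, so a result can
// only be checked by comparing it with a ReturnTypeEnum value.
struct ReturnType {
  ReturnTypeEnum value;
  ReturnType(ReturnTypeEnum v) : value(v) {}  // implicit on purpose
  bool operator==(ReturnTypeEnum v) const { return value == v; }
  bool operator!=(ReturnTypeEnum v) const { return value != v; }
};

// Writing `if (ret)` with a ReturnType would not compile:
static_assert(!std::is_convertible<ReturnType, bool>::value, "no implicit bool");
static_assert(!std::is_convertible<ReturnType, int>::value, "no implicit int");

// Hypothetical helper: a readable name for logging.
std::string Describe(ReturnType r) {
  switch (r.value) {
    case kSuccess: return "success";
    case kConnReset: return "connection reset";
    case kRecvZeroLen: return "received zero length";
    case kSockError: return "socket error";
    case kGetExcept: return "exception";
  }
  assert(false);  // not reached for valid enumerators
  return "unknown";
}
```

With a plain enum return value, a caller could accidentally write `if (TrySomething())` and treat kSuccess (value 0) as failure; the wrapper turns that mistake into a compile error.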
|
inline override virtual |
internal Allgather function using a ring-based algorithm. Each node holds one segment of the ring buffer sendrecvbuf_; the current node k provides [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains the contents of all segments.
sendrecvbuf_ | buffer for both sending and receiving data; conceptually it is a ring |
total_size | total size of the data to be gathered |
slice_begin | beginning of the current slice |
slice_end | end of the current slice |
size_prev_slice | size of the previous slice, i.e. the slice of node (rank - 1) % world_size |
Implements rabit::engine::IEngine.
Reimplemented in rabit::engine::AllreduceMock.
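To make the ring-based gathering concrete, here is a single-process simulation sketch. It does not use sockets or the engine; `SimulateRingAllgather` is a hypothetical name, and the step schedule (each node forwards the segment it received in the previous step) is the textbook ring allgather, assumed rather than taken from the implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// slices[k] is the data contributed by node k. Returns one buffer per node;
// after the simulated exchange each buffer holds all slices in rank order.
std::vector<std::vector<int>> SimulateRingAllgather(
    const std::vector<std::vector<int>>& slices) {
  const std::size_t n = slices.size();
  assert(n > 0);
  // [offsets[k], offsets[k + 1]) plays the role of [slice_begin, slice_end).
  std::vector<std::size_t> offsets(n + 1, 0);
  for (std::size_t k = 0; k < n; ++k) offsets[k + 1] = offsets[k] + slices[k].size();

  // Each node starts with only its own segment filled in.
  std::vector<std::vector<int>> bufs(n, std::vector<int>(offsets[n], 0));
  for (std::size_t k = 0; k < n; ++k) {
    for (std::size_t i = 0; i < slices[k].size(); ++i) {
      bufs[k][offsets[k] + i] = slices[k][i];
    }
  }

  // In step s, node k receives from its ring predecessor the segment that
  // originated at node (k - 1 - s) mod n.
  for (std::size_t s = 0; s + 1 < n; ++s) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t prev = (k + n - 1) % n;
      const std::size_t seg = (k + n - 1 - s) % n;
      for (std::size_t i = offsets[seg]; i < offsets[seg + 1]; ++i) {
        bufs[k][i] = bufs[prev][i];
      }
    }
  }
  return bufs;
}
```

After world_size - 1 steps every node has seen every segment, and each link carried each segment exactly once.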
|
inline override virtual |
perform in-place allreduce on sendrecvbuf_; this function is NOT thread-safe
sendrecvbuf_ | buffer for both sending and receiving data |
type_nbytes | size of one element of the type, in bytes |
count | number of elements to be reduced |
reducer | reduce function |
prepare_fun | lazy preprocessing function; prepare_fun(prepare_arg) is called by this function before performing Allreduce, to initialize the data in sendrecvbuf_. If the result of Allreduce can be recovered directly, prepare_fun will NOT be called |
prepare_arg | argument passed to the lazy preprocessing function |
Implements rabit::engine::IEngine.
Reimplemented in rabit::engine::AllreduceMock.
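The lazy-preparation contract described for prepare_fun can be sketched with a toy, single-worker stand-in. `ToyEngine`, `SetRecovered`, `FillContext` and `FillWithOnes` are all hypothetical names made up for this example; how a real engine recovers a result is not shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Same shape as the PreprocFunction typedef listed in this page.
typedef void PreprocFunction(void* arg);

// Hypothetical single-worker stand-in for an engine; not rabit's code.
class ToyEngine {
 public:
  // If a recovered result is available it is copied into buf and
  // prepare_fun is skipped; otherwise prepare_fun(prepare_arg) runs first.
  void Allreduce(double* buf, std::size_t count,
                 PreprocFunction* prepare_fun = nullptr,
                 void* prepare_arg = nullptr) {
    if (!recovered_.empty()) {
      assert(recovered_.size() == count);
      for (std::size_t i = 0; i < count; ++i) buf[i] = recovered_[i];
      return;
    }
    if (prepare_fun != nullptr) prepare_fun(prepare_arg);
    // With a single worker the reduced result equals the local buffer.
  }
  // Pretend that a previous result was recovered.
  void SetRecovered(const std::vector<double>& r) { recovered_ = r; }

 private:
  std::vector<double> recovered_;
};

// Example preprocessing callback: fills the buffer and counts invocations.
struct FillContext {
  double* buf;
  std::size_t count;
  int calls;
};

void FillWithOnes(void* arg) {
  FillContext* ctx = static_cast<FillContext*>(arg);
  for (std::size_t i = 0; i < ctx->count; ++i) ctx->buf[i] = 1.0;
  ++ctx->calls;
}
```

The point of the laziness is that an expensive preparation step is skipped entirely whenever the reduced result is already available.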
|
inline override virtual |
broadcast data from root to all nodes
sendrecvbuf_ | buffer for both sending and receiving data |
total_size | size of the data to be broadcast |
root | rank of the root worker that broadcasts the data |
_file | caller file name used to generate unique cache key |
_line | caller line number used to generate unique cache key |
_caller | caller function name used to generate unique cache key |
Implements rabit::engine::IEngine.
Reimplemented in rabit::engine::AllreduceMock.
|
inline override virtual |
Increase internal version number. Deprecated.
Implements rabit::engine::IEngine.
Reimplemented in rabit::engine::AllreduceMock.
|
protected |
initialize connection to the tracker
|
inline override virtual |
get host name of the current node
Implements rabit::engine::IEngine.
|
inline override virtual |
get rank
Implements rabit::engine::IEngine.
|
inline override virtual |
get rank of previous node in ring topology
Implements rabit::engine::IEngine.
|
inline override virtual |
get total number of workers
Implements rabit::engine::IEngine.
|
inline override virtual |
whether the job is distributed or not
Implements rabit::engine::IEngine.
|
inline override virtual |
deprecated
Implements rabit::engine::IEngine.
Reimplemented in rabit::engine::AllreduceMock.
|
protected |
connect to the tracker to fix the missing links; this function is also used when the engine starts up
cmd | command to send to the tracker |
|
inline protected |
function used to report an error when a link goes wrong
link | pointer to the link that caused the error |
err | the error type |
|
virtual |
set parameters to the engine
name | parameter name |
val | parameter value |
Reimplemented in rabit::engine::AllreduceMock.
|
override virtual |
print msg in the tracker; this function can be used to communicate progress information to the user who monitors the tracker
msg | message to be printed in the tracker |
Implements rabit::engine::IEngine.
|
protected |
internal Allgather function using a ring-based algorithm. Each node holds one segment of the ring buffer sendrecvbuf_; the current node k provides [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains the contents of all segments.
sendrecvbuf_ | buffer for both sending and receiving data; conceptually it is a ring |
total_size | total size of the data to be gathered |
slice_begin | beginning of the current slice |
slice_end | end of the current slice |
size_prev_slice | size of the previous slice, i.e. the slice of node (rank - 1) % world_size |
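The size arguments listed for this function can be derived from per-node slice sizes as in the sketch below. `RingPrevRank`, `SliceArgs` and `MakeSliceArgs` are hypothetical helpers, not part of the engine. One detail worth noting: the expression (rank - 1) % world_size is meant mathematically; with C++ int operands it evaluates to -1 for rank 0 when world_size > 1, so the sketch adds world_size before taking the remainder.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Rank of the ring predecessor. Adding world_size first keeps the result
// non-negative for rank 0.
int RingPrevRank(int rank, int world_size) {
  assert(world_size > 0 && rank >= 0 && rank < world_size);
  return (rank + world_size - 1) % world_size;
}

// Hypothetical bundle of the size arguments described above.
struct SliceArgs {
  std::size_t total_size;
  std::size_t slice_begin;
  std::size_t slice_end;
  std::size_t size_prev_slice;
};

// sizes[k] is the number of bytes contributed by node k; slices are laid
// out back to back in rank order.
SliceArgs MakeSliceArgs(const std::vector<std::size_t>& sizes, int rank) {
  const int world_size = static_cast<int>(sizes.size());
  SliceArgs args{0, 0, 0, 0};
  for (int k = 0; k < world_size; ++k) {
    if (k == rank) args.slice_begin = args.total_size;
    args.total_size += sizes[k];
  }
  args.slice_end = args.slice_begin + sizes[rank];
  args.size_prev_slice = sizes[RingPrevRank(rank, world_size)];
  return args;
}
```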
|
protected |
perform in-place allreduce on sendrecvbuf_; this function can fail and returns the cause of failure
NOTE on Allreduce: a kSuccess result from TryAllreduce does NOT mean that every node has successfully finished TryAllreduce. It only means that the current node got the correct result of Allreduce. However, it does mean that every node finished the LAST call (rather than this one) of Allreduce/Bcast.
sendrecvbuf_ | buffer for both sending and receiving data |
type_nbytes | size of one element of the type, in bytes |
count | number of elements to be reduced |
reducer | reduce function |
|
protected |
perform in-place allreduce on sendrecvbuf_ using a ring-based algorithm (reduce-scatter followed by allgather)
sendrecvbuf_ | buffer for both sending and receiving data |
type_nbytes | size of one element of the type, in bytes |
count | number of elements to be reduced |
reducer | reduce function |
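The "reduce-scatter followed by allgather" structure can be illustrated with a single-process simulation. This is a sketch under assumptions: `SimulateRingAllreduceSum` is a hypothetical name, the reducer is fixed to addition on double, and the segment layout uses step = ceil(count / world_size) with both bounds clamped to count.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// bufs[k] is node k's buffer; on return every buffer holds the element-wise sum.
void SimulateRingAllreduceSum(std::vector<std::vector<double>>& bufs) {
  const std::size_t n = bufs.size();
  assert(n > 0);
  const std::size_t count = bufs[0].size();
  for (const auto& b : bufs) assert(b.size() == count);
  const std::size_t step = (count + n - 1) / n;  // ceil(count / n)
  auto seg_begin = [&](std::size_t seg) { return std::min(seg * step, count); };
  auto seg_end = [&](std::size_t seg) { return std::min((seg + 1) * step, count); };

  // Phase 1, reduce-scatter: in round r, node k adds segment (k - r - 2) mod n
  // from its predecessor. After n - 1 rounds node k holds the reduced segment k.
  for (std::size_t r = 0; r + 1 < n; ++r) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t prev = (k + n - 1) % n;
      const std::size_t seg = (k + 2 * n - r - 2) % n;
      for (std::size_t i = seg_begin(seg); i < seg_end(seg); ++i) {
        bufs[k][i] += bufs[prev][i];
      }
    }
  }
  // Phase 2, allgather: in round r, node k copies the finished segment
  // (k - 1 - r) mod n from its predecessor.
  for (std::size_t r = 0; r + 1 < n; ++r) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t prev = (k + n - 1) % n;
      const std::size_t seg = (k + n - 1 - r) % n;
      for (std::size_t i = seg_begin(seg); i < seg_end(seg); ++i) {
        bufs[k][i] = bufs[prev][i];
      }
    }
  }
}
```

Within one round, the segment a node writes always differs from the segment its successor reads, which is why the simulation can update the buffers in place without taking a snapshot.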
|
protected |
perform in-place allreduce on sendrecvbuf_; this function implements tree-shaped reduction
sendrecvbuf_ | buffer for both sending and receiving data |
type_nbytes | size of one element of the type, in bytes |
count | number of elements to be reduced |
reducer | reduce function |
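A tree-shaped reduction can be sketched as a reduce toward the root followed by a broadcast back down. The simulation below is illustrative only: `SimulateTreeAllreduceSum` is a hypothetical name, the reducer is fixed to addition, and the binary-heap layout (parent of node k is (k - 1) / 2) is an assumption made for the example; the actual tree links are whatever tree_links holds at run time.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// bufs[k] is node k's buffer; on return every buffer holds the element-wise sum.
void SimulateTreeAllreduceSum(std::vector<std::vector<double>>& bufs) {
  const std::size_t n = bufs.size();
  assert(n > 0);
  const std::size_t count = bufs[0].size();
  for (const auto& b : bufs) assert(b.size() == count);

  // Reduce phase: visiting nodes from the highest index down means every
  // node has absorbed its whole subtree before it is added to its parent.
  for (std::size_t k = n - 1; k > 0; --k) {
    const std::size_t parent = (k - 1) / 2;
    for (std::size_t i = 0; i < count; ++i) bufs[parent][i] += bufs[k][i];
  }
  // Broadcast phase: visiting nodes in increasing index order means every
  // parent already holds the final result when its children copy it.
  for (std::size_t k = 1; k < n; ++k) bufs[k] = bufs[(k - 1) / 2];
}
```

Compared with the ring variant, the tree needs only a logarithmic number of hops but sends the whole buffer over every link, which is one reason an engine might choose between the two based on message size.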
|
protected |
broadcast data from root to all nodes; this function can fail and returns the cause of failure
sendrecvbuf_ | buffer for both sending and receiving data |
size | the size of the data to be broadcasted |
root | the root worker id to broadcast the data |
|
protected |
perform in-place reduce-scatter on sendrecvbuf_ using a ring-based algorithm; this function can fail and returns the cause of failure
after the function returns, node k holds the k-th segment of the reduction result; the k-th segment is defined as [k * step, min((k + 1) * step, count)), where step = ceil(count / world_size)
sendrecvbuf_ | buffer for both sending and receiving data |
type_nbytes | size of one element of the type, in bytes |
count | number of elements to be reduced |
reducer | reduce function |
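The segment definition above translates directly into integer arithmetic. In this sketch, `Segment` and `ReduceScatterSegment` are hypothetical names. The lower bound is also clamped to count, which is an assumption beyond the stated formula: without it, a high-ranked node could get begin > end when count is small relative to world_size.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Half-open element range [begin, end) owned by one node.
struct Segment {
  std::size_t begin;
  std::size_t end;
};

// Segment k of `count` elements split across `world_size` nodes, with
// step = ceil(count / world_size) computed in integer arithmetic.
Segment ReduceScatterSegment(std::size_t count, std::size_t world_size, std::size_t k) {
  assert(world_size > 0 && k < world_size);
  const std::size_t step = (count + world_size - 1) / world_size;
  const std::size_t begin = std::min(k * step, count);
  const std::size_t end = std::min((k + 1) * step, count);
  return Segment{begin, end};
}
```

For example, count = 10 and world_size = 4 give step = 3, so nodes 0 to 3 own [0, 3), [3, 6), [6, 9) and [9, 10); the last segment is shorter than the others.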
|
inline override virtual |
Implements rabit::engine::IEngine.