Medial Code Documentation
rabit::engine::AllreduceBase Class Reference

implementation of the basic Allreduce engine

#include <allreduce_base.h>

Inheritance: rabit::engine::AllreduceBase implements rabit::engine::IEngine and is extended by rabit::engine::AllreduceMock.

Data Structures

struct  LinkRecord
 
struct  RefLinkVector
 simple vector-like data structure that stores references to its elements instead of copies of them
 
struct  ReturnType
 struct return type that prevents implicit conversion to int/bool
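The RefLinkVector idea can be sketched in a few lines. This is a minimal sketch, not the rabit source: the LinkRecord stand-in below is hypothetical and holds only a rank, and the member names plinks and Size() are assumptions. The point is that the view stores pointers into storage owned elsewhere (such as all_links), so a structure like tree_links can index a subset of the links without copying them.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for LinkRecord; the real record also holds a socket
// and buffers. Only a rank is kept here for illustration.
struct LinkRecord {
  int rank;
};

// Vector-like view that stores pointers into storage owned elsewhere
// (e.g. all_links) instead of copies, so writes through the view are
// visible in the owning container.
struct RefLinkVector {
  std::vector<LinkRecord *> plinks;

  LinkRecord &operator[](std::size_t i) {
    assert(i < plinks.size());
    return *plinks[i];
  }
  std::size_t Size() const { return plinks.size(); }
};
```

Because the view holds raw pointers, it should only be filled once the owning vector has stopped growing; a reallocation of all_links would leave every stored pointer dangling.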
 

Public Member Functions

virtual bool Init (int argc, char *argv[])
 
virtual bool Shutdown ()
 
virtual void SetParam (const char *name, const char *val)
 set a parameter of the engine
 
void TrackerPrint (const std::string &msg) override
 print msg on the tracker; this can be used to report progress to the user who monitors the tracker
 
int GetRingPrevRank () const override
 get rank of previous node in ring topology
 
int GetRank () const override
 get rank
 
int GetWorldSize () const override
 get the world size (total number of nodes)
 
bool IsDistributed () const override
 whether the engine runs in distributed mode
 
std::string GetHost () const override
 get the host URI of the current node
 
void Allgather (void *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, size_t size_prev_slice) override
 internal Allgather function. Each node holds one segment of the data in sendrecvbuf_, which is conceptually a ring; the segment provided by the current node k is [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains all segments. A ring-based algorithm is used.
 
void Allreduce (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer, PreprocFunction prepare_fun=nullptr, void *prepare_arg=nullptr) override
 perform an in-place allreduce on sendrecvbuf_; this function is NOT thread-safe
 
void Broadcast (void *sendrecvbuf_, size_t total_size, int root) override
 broadcast data from root to all nodes
 
int LoadCheckPoint () override
 deprecated
 
void CheckPoint () override
 Increase internal version number. Deprecated.
 
int VersionNumber () const override
 
void ReportStatus () const
 report the current status to the job tracker; the format depends on which job tracker is in use
 
- Public Member Functions inherited from rabit::engine::IEngine
 ~IEngine ()=default
 virtual destructor
 

Static Public Attributes

static const int kMagic = 0xff99
 

Protected Types

enum  ReturnTypeEnum {
  kSuccess , kConnReset , kRecvZeroLen , kSockError ,
  kGetExcept
}
 enumeration of possible results returned by the Try* functions
 

Protected Member Functions

xgboost::collective::TCPSocket ConnectTracker () const
 initialize connection to the tracker
 
bool ReConnectLinks (const char *cmd="start")
 connect to the tracker to repair missing links; this function is also used when the engine starts up
 
ReturnType TryAllreduce (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer)
 perform an in-place allreduce on sendrecvbuf_; this function can fail and returns the cause of failure
 
ReturnType TryBroadcast (void *sendrecvbuf_, size_t size, int root)
 broadcast data from root to all nodes; this function can fail and returns the cause of failure
 
ReturnType TryAllreduceTree (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer)
 perform an in-place allreduce on sendrecvbuf_ using a tree-shaped reduction
 
ReturnType TryAllgatherRing (void *sendrecvbuf_, size_t total_size, size_t slice_begin, size_t slice_end, size_t size_prev_slice)
 internal Allgather function. Each node holds one segment of the data in sendrecvbuf_, which is conceptually a ring; the segment provided by the current node k is [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains all segments. A ring-based algorithm is used.
 
ReturnType TryReduceScatterRing (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer)
 perform an in-place reduce-scatter on sendrecvbuf_ using a ring-based algorithm; afterwards node k holds the k-th segment of the reduction result
 
ReturnType TryAllreduceRing (void *sendrecvbuf_, size_t type_nbytes, size_t count, ReduceFunction reducer)
 perform an in-place allreduce on sendrecvbuf_ using a ring-based algorithm (reduce-scatter followed by allgather)
 
ReturnType ReportError (LinkRecord *link, ReturnType err)
 report an error when a link fails
 

Static Protected Member Functions

static ReturnType Errno2Return ()
 translate errno to return type
 

Protected Attributes

int seq_counter {0}
 
int version_number {0}
 
bool hadoop_mode
 
int parent_index
 
int parent_rank
 
std::vector< LinkRecord > all_links
 
LinkRecord * err_link
 
RefLinkVector tree_links
 
LinkRecord * ring_prev
 
LinkRecord * ring_next
 
std::vector< std::string > env_vars
 
std::string task_id
 
std::string host_uri
 
std::string tracker_uri
 
std::string dmlc_role
 
int tracker_port
 
size_t reduce_buffer_size
 
int reduce_method
 
size_t reduce_ring_mincount
 
size_t tree_reduce_minsize
 
int rank
 
int world_size
 
int connect_retry
 
std::chrono::seconds timeout_sec {std::chrono::seconds{1800}}
 
bool rabit_timeout = false
 
bool rabit_enable_tcp_no_delay = false
 

Additional Inherited Members

- Public Types inherited from rabit::engine::IEngine
typedef void (PreprocFunction)(void *arg)
 preprocessing function called before Allreduce to prepare the data that Allreduce uses
 
typedef void (ReduceFunction)(const void *src, void *dst, int count, const MPI::Datatype &dtype)
 reduce function. It has the same form as an MPI reduce function, for compatibility with the MPI interface. In all functions, memory is guaranteed to be 64-bit aligned, so it is safe to cast src and dst to double*, int*, etc.
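A reducer with the ReduceFunction shape might look like the following sketch. It defines a placeholder MPI::Datatype only so the code compiles without MPI, and the names SumDouble and kSumDouble are hypothetical. The casts to double* rely on the 64-bit alignment guarantee stated above.

```cpp
#include <cassert>

// Placeholder so the sketch compiles without MPI; NOT the real MPI type.
namespace MPI {
struct Datatype {};
}  // namespace MPI

// Same shape as IEngine::ReduceFunction: fold `count` elements of src into dst.
typedef void (ReduceFunction)(const void *src, void *dst, int count,
                              const MPI::Datatype &dtype);

// Element-wise sum over doubles. Casting the untyped pointers is safe because
// the engine guarantees 64-bit aligned buffers.
void SumDouble(const void *src_, void *dst_, int count, const MPI::Datatype &) {
  assert(count >= 0);
  const double *src = static_cast<const double *>(src_);
  double *dst = static_cast<double *>(dst_);
  for (int i = 0; i < count; ++i) {
    dst[i] += src[i];
  }
}

// Compile-time check that SumDouble matches the callback type.
ReduceFunction *const kSumDouble = &SumDouble;
```

The reducer only ever writes to dst, so src can point at a receive buffer that the engine reuses for the next incoming chunk.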
 

Detailed Description

implementation of basic Allreduce engine

Member Enumeration Documentation

◆ ReturnTypeEnum

enumeration of possible results returned by the Try* functions

Enumerator
kSuccess 

execution is successful

kConnReset 

a link was reset by peer

kRecvZeroLen 

received a zero length message

kSockError 

a neighbor node went down and the connection was dropped

kGetExcept 

a node that is not a neighbor went down; an out-of-band exception notification was received from a neighbor
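The ReturnType wrapper listed under Data Structures exists so that these enumerators cannot silently turn into int or bool. Below is a minimal sketch of that idea; the member names value, operator== and operator!= are assumptions (the real struct may differ), and FirstFailure is a hypothetical helper, not part of the engine. The static_asserts document the conversions the wrapper blocks.

```cpp
#include <cassert>
#include <type_traits>

enum ReturnTypeEnum { kSuccess, kConnReset, kRecvZeroLen, kSockError, kGetExcept };

// Wrapper around the enum: it converts FROM ReturnTypeEnum implicitly but has
// no conversion operator, so it can never silently become an int or a bool.
struct ReturnType {
  ReturnTypeEnum value{kSuccess};
  ReturnType() = default;
  ReturnType(ReturnTypeEnum v) : value{v} {}  // implicit on purpose
  bool operator==(ReturnTypeEnum v) const { return value == v; }
  bool operator!=(ReturnTypeEnum v) const { return value != v; }
};

// `if (ret)` or `int x = ret;` would fail to compile:
static_assert(!std::is_convertible<ReturnType, bool>::value, "no implicit bool");
static_assert(!std::is_convertible<ReturnType, int>::value, "no implicit int");

// Hypothetical helper: keep the first failure seen, in the style of a Try* caller.
ReturnType FirstFailure(ReturnType a, ReturnType b) {
  return a != kSuccess ? a : b;
}
```

Callers therefore have to compare explicitly, e.g. `if (ret != kSuccess)`, which keeps a kSuccess value of 0 from being misread as "false means failure".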

Member Function Documentation

◆ Allgather()

void rabit::engine::AllreduceBase::Allgather ( void *  sendrecvbuf_,
size_t  total_size,
size_t  slice_begin,
size_t  slice_end,
size_t  size_prev_slice 
)
inline override virtual

internal Allgather function. Each node holds one segment of the data in sendrecvbuf_, which is conceptually a ring; the segment provided by the current node k is [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains all segments. A ring-based algorithm is used.

Parameters
sendrecvbuf_: buffer for both sending and receiving data; conceptually a ring
total_size: total size of the data to be gathered
slice_begin: beginning of the current slice
slice_end: end of the current slice
size_prev_slice: size of the previous slice, i.e. the slice of node (rank - 1) % world_size

Implements rabit::engine::IEngine.

Reimplemented in rabit::engine::AllreduceMock.
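The ring-based Allgather can be illustrated with a single-process simulation in which bufs[k] plays the role of node k's sendrecvbuf_. This is a sketch under simplifying assumptions: slices are laid out in rank order without wrapping past the end of the buffer, so node k's slice_begin is begin[k], its slice_end is begin[k + 1], and its size_prev_slice is begin[k] - begin[k - 1]. Real nodes exchange slices over sockets through ring_prev and ring_next; here each transfer is a plain copy, and SimulateRingAllgather is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-process simulation of a ring allgather (no sockets).
// bufs[k] stands in for node k's sendrecvbuf_. Node k initially holds valid
// bytes only in its own slice [begin[k], begin[k + 1]); begin has n + 1
// entries with begin[n] == total_size, so each slice ends where the next starts.
void SimulateRingAllgather(std::vector<std::vector<char>> &bufs,
                           const std::vector<std::size_t> &begin) {
  const std::size_t n = bufs.size();
  assert(begin.size() == n + 1);
  // n - 1 steps: at step s node k forwards the slice that originated at node
  // (k - s) mod n, i.e. its own slice first, then whatever it received last.
  for (std::size_t s = 0; s + 1 < n; ++s) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t next = (k + 1) % n;
      const std::size_t owner = (k + n - s) % n;
      // Updating in place is safe: node `next` sends a different slice this step.
      for (std::size_t i = begin[owner]; i < begin[owner + 1]; ++i) {
        bufs[next][i] = bufs[k][i];
      }
    }
  }
}
```

After n - 1 steps every slice has travelled around the ring once, and each node has sent and received total_size minus its own slice, which is why the ring variant suits large buffers.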

◆ Allreduce()

void rabit::engine::AllreduceBase::Allreduce ( void *  sendrecvbuf_,
size_t  type_nbytes,
size_t  count,
ReduceFunction  reducer,
PreprocFunction  prepare_fun = nullptr,
void *  prepare_arg = nullptr 
)
inline override virtual

perform an in-place allreduce on sendrecvbuf_; this function is NOT thread-safe

Parameters
sendrecvbuf_: buffer for both sending and receiving data
type_nbytes: size in bytes of one element
count: number of elements to be reduced
reducer: reduce function
prepare_fun: lazy preprocessing function; prepare_fun(prepare_arg) is called before the Allreduce is performed, to initialize the data in sendrecvbuf_. If the result of the Allreduce can be recovered directly, prepare_fun is NOT called.
prepare_arg: argument passed to the lazy preprocessing function

Implements rabit::engine::IEngine.

Reimplemented in rabit::engine::AllreduceMock.
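The lazy-preprocessing contract can be sketched from a single node's point of view. In this hypothetical sketch, peer_sum stands in for the combined data contributed by all other nodes, and a non-null recovered pointer models the documented case in which the result can be recovered directly, so prepare_fun is skipped. AllreduceSumSketch, FillArgs and FillLocal are illustrative names only, not engine API.

```cpp
#include <cassert>
#include <cstddef>

// Same shape as IEngine::PreprocFunction.
typedef void (PreprocFunction)(void *arg);

// Hypothetical one-node view of Allreduce (sum over int). `peer_sum` stands
// in for the combined contribution of all other nodes; a non-null
// `recovered` models a result that can be recovered directly.
void AllreduceSumSketch(int *sendrecvbuf, std::size_t count, const int *peer_sum,
                        const int *recovered, PreprocFunction *prepare_fun,
                        void *prepare_arg) {
  if (recovered != nullptr) {
    // Result already known: copy it and never run the preprocessing step.
    for (std::size_t i = 0; i < count; ++i) sendrecvbuf[i] = recovered[i];
    return;
  }
  // Lazy preprocessing: fill the local buffer only when it is actually needed.
  if (prepare_fun != nullptr) prepare_fun(prepare_arg);
  for (std::size_t i = 0; i < count; ++i) sendrecvbuf[i] += peer_sum[i];
}

// Example preprocessor: writes this node's local values 1, 2, ..., count.
struct FillArgs {
  int *buf;
  std::size_t count;
  int calls;  // how many times FillLocal ran
};

void FillLocal(void *arg) {
  FillArgs *a = static_cast<FillArgs *>(arg);
  for (std::size_t i = 0; i < a->count; ++i) a->buf[i] = static_cast<int>(i) + 1;
  ++a->calls;
}
```

Deferring the fill this way means an expensive computation of the local contribution is only paid for when the reduction actually has to run.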

◆ Broadcast()

void rabit::engine::AllreduceBase::Broadcast ( void *  sendrecvbuf_,
size_t  total_size,
int  root 
)
inline override virtual

broadcast data from root to all nodes

Parameters
sendrecvbuf_: buffer for both sending and receiving data
total_size: size of the data to be broadcast
root: id of the root worker that broadcasts the data

Implements rabit::engine::IEngine.

Reimplemented in rabit::engine::AllreduceMock.

◆ CheckPoint()

void rabit::engine::AllreduceBase::CheckPoint ( )
inline override virtual

Increase internal version number. Deprecated.

Implements rabit::engine::IEngine.

Reimplemented in rabit::engine::AllreduceMock.

◆ ConnectTracker()

xgboost::collective::TCPSocket rabit::engine::AllreduceBase::ConnectTracker ( ) const
protected

initialize connection to the tracker

Returns
the socket holding the connection to the tracker

◆ GetHost()

std::string rabit::engine::AllreduceBase::GetHost ( void  ) const
inline override virtual

get the host URI of the current node

Implements rabit::engine::IEngine.

◆ GetRank()

int rabit::engine::AllreduceBase::GetRank ( void  ) const
inline override virtual

get rank

Implements rabit::engine::IEngine.

◆ GetRingPrevRank()

int rabit::engine::AllreduceBase::GetRingPrevRank ( void  ) const
inline override virtual

get rank of previous node in ring topology

Implements rabit::engine::IEngine.
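If the ring is ordered by rank, as the "(rank - 1) % world_size" wording in the Allgather parameter documentation suggests, the previous rank is a modular decrement. This sketch assumes that ordering, and RingPrevRank and RingNextRank are hypothetical helpers. It also shows why the expression needs care in C++: integer % truncates toward zero, so (0 - 1) % world_size is -1 rather than world_size - 1.

```cpp
#include <cassert>

// Previous node in a ring ordered by rank (ASSUMPTION: ring order == rank order).
// Adding world_size first keeps the operand non-negative, because in C++
// (0 - 1) % world_size evaluates to -1, not world_size - 1.
int RingPrevRank(int rank, int world_size) {
  assert(world_size > 0 && rank >= 0 && rank < world_size);
  return (rank + world_size - 1) % world_size;
}

// Next node in the same ring.
int RingNextRank(int rank, int world_size) {
  assert(world_size > 0 && rank >= 0 && rank < world_size);
  return (rank + 1) % world_size;
}
```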

◆ GetWorldSize()

int rabit::engine::AllreduceBase::GetWorldSize ( void  ) const
inline override virtual

get the world size (total number of nodes)

Implements rabit::engine::IEngine.

◆ IsDistributed()

bool rabit::engine::AllreduceBase::IsDistributed ( void  ) const
inline override virtual

whether the engine runs in distributed mode

Implements rabit::engine::IEngine.

◆ LoadCheckPoint()

int rabit::engine::AllreduceBase::LoadCheckPoint ( )
inline override virtual

deprecated

See also
CheckPoint, VersionNumber

Implements rabit::engine::IEngine.

Reimplemented in rabit::engine::AllreduceMock.

◆ ReConnectLinks()

bool rabit::engine::AllreduceBase::ReConnectLinks ( const char *  cmd = "start")
protected

connect to the tracker to repair missing links; this function is also used when the engine starts up

Parameters
cmd: command to send to the tracker

◆ ReportError()

ReturnType rabit::engine::AllreduceBase::ReportError ( LinkRecord *  link,
ReturnType  err 
)
inline protected

report an error when a link fails

Parameters
link: pointer to the link that caused the error
err: the error type

◆ SetParam()

void rabit::engine::AllreduceBase::SetParam ( const char *  name,
const char *  val 
)
virtual

set a parameter of the engine

Parameters
name: parameter name
val: parameter value

Reimplemented in rabit::engine::AllreduceMock.

◆ TrackerPrint()

void rabit::engine::AllreduceBase::TrackerPrint ( const std::string &  msg)
override virtual

print msg on the tracker; this can be used to report progress to the user who monitors the tracker

Parameters
msg: message to be printed on the tracker

Implements rabit::engine::IEngine.

◆ TryAllgatherRing()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryAllgatherRing ( void *  sendrecvbuf_,
size_t  total_size,
size_t  slice_begin,
size_t  slice_end,
size_t  size_prev_slice 
)
protected

internal Allgather function. Each node holds one segment of the data in sendrecvbuf_, which is conceptually a ring; the segment provided by the current node k is [slice_begin, slice_end), and the next node's segment must start at slice_end. After the call, sendrecvbuf_ contains all segments. A ring-based algorithm is used.

Parameters
sendrecvbuf_: buffer for both sending and receiving data; conceptually a ring
total_size: total size of the data to be gathered
slice_begin: beginning of the current slice
slice_end: end of the current slice
size_prev_slice: size of the previous slice, i.e. the slice of node (rank - 1) % world_size
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType

◆ TryAllreduce()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryAllreduce ( void *  sendrecvbuf_,
size_t  type_nbytes,
size_t  count,
ReduceFunction  reducer 
)
protected

perform an in-place allreduce on sendrecvbuf_; this function can fail and returns the cause of failure

NOTE on Allreduce: a kSuccess return from TryAllreduce does NOT mean that every node has successfully finished TryAllreduce. It only means that the current node has obtained the correct result of the Allreduce. It does, however, mean that every node has finished the PREVIOUS call (not this one) of Allreduce/Broadcast.

Parameters
sendrecvbuf_: buffer for both sending and receiving data
type_nbytes: size in bytes of one element
count: number of elements to be reduced
reducer: reduce function
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType

◆ TryAllreduceRing()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryAllreduceRing ( void *  sendrecvbuf_,
size_t  type_nbytes,
size_t  count,
ReduceFunction  reducer 
)
protected

perform an in-place allreduce on sendrecvbuf_ using a ring-based algorithm (reduce-scatter followed by allgather)

Parameters
sendrecvbuf_: buffer for both sending and receiving data
type_nbytes: size in bytes of one element
count: number of elements to be reduced
reducer: reduce function
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType
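The two ring phases can be simulated in one process to show how the segments move. This sketch assumes summation over int and the segment layout documented for TryReduceScatterRing (step = ceil(count / world_size)); SimulateRingAllreduceSum and SegBegin are hypothetical names, and in-memory additions replace the socket transfers and reducer calls of the real engine.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Start of segment seg: min(seg * step, count) with step = ceil(count / n).
// Segment seg spans [SegBegin(seg), SegBegin(seg + 1)); SegBegin(n) == count.
std::size_t SegBegin(std::size_t seg, std::size_t count, std::size_t n) {
  const std::size_t step = (count + n - 1) / n;
  return std::min(seg * step, count);
}

// Single-process simulation of ring allreduce with summation.
// bufs[k] stands in for node k's sendrecvbuf_; all buffers have equal length.
// Updating in place is safe because in every step the segment a node
// receives differs from the segment it sends.
void SimulateRingAllreduceSum(std::vector<std::vector<int>> &bufs) {
  const std::size_t n = bufs.size();
  assert(n > 0);
  const std::size_t count = bufs[0].size();
  // Phase 1, reduce-scatter: at step s node k sends segment (k - s - 1) mod n
  // to node k + 1, which adds it to its own copy. After n - 1 steps node k
  // holds the fully reduced segment k.
  for (std::size_t s = 0; s + 1 < n; ++s) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t next = (k + 1) % n;
      const std::size_t seg = (k + n - s - 1) % n;
      for (std::size_t i = SegBegin(seg, count, n); i < SegBegin(seg + 1, count, n); ++i) {
        bufs[next][i] += bufs[k][i];
      }
    }
  }
  // Phase 2, allgather: at step s node k forwards segment (k - s) mod n,
  // overwriting the receiver's copy with the final value.
  for (std::size_t s = 0; s + 1 < n; ++s) {
    for (std::size_t k = 0; k < n; ++k) {
      const std::size_t next = (k + 1) % n;
      const std::size_t seg = (k + n - s) % n;
      for (std::size_t i = SegBegin(seg, count, n); i < SegBegin(seg + 1, count, n); ++i) {
        bufs[next][i] = bufs[k][i];
      }
    }
  }
}
```

Each phase takes n - 1 steps and moves roughly count / n elements per node per step, so every node sends about 2(n - 1)/n of the buffer in total regardless of world size.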

◆ TryAllreduceTree()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryAllreduceTree ( void *  sendrecvbuf_,
size_t  type_nbytes,
size_t  count,
ReduceFunction  reducer 
)
protected

perform an in-place allreduce on sendrecvbuf_ using a tree-shaped reduction

Parameters
sendrecvbuf_: buffer for both sending and receiving data
type_nbytes: size in bytes of one element
count: number of elements to be reduced
reducer: reduce function
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType
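A tree-shaped allreduce reduces toward a root and then sends the result back down. The sketch below simulates that in one process under an assumed binary-heap layout (the parent of rank r is (r - 1) / 2); the engine instead uses the tree it learns from the tracker through parent_rank and tree_links, so this layout and the name SimulateTreeAllreduceSum are illustrative only.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-process simulation of a tree-shaped allreduce with summation.
// ASSUMPTION: binary-heap layout, parent of rank r is (r - 1) / 2; the real
// engine uses the tree assigned by the tracker instead.
void SimulateTreeAllreduceSum(std::vector<std::vector<int>> &bufs) {
  const std::size_t n = bufs.size();
  // Up-pass: children have larger ranks than their parents, so visiting ranks
  // in decreasing order folds each complete subtree into its parent.
  for (std::size_t r = n; r-- > 1;) {
    std::vector<int> &parent = bufs[(r - 1) / 2];
    assert(parent.size() == bufs[r].size());
    for (std::size_t i = 0; i < parent.size(); ++i) {
      parent[i] += bufs[r][i];
    }
  }
  // Down-pass: rank 0 now holds the full result; copy it from each parent to
  // its children in increasing rank order.
  for (std::size_t r = 1; r < n; ++r) {
    bufs[r] = bufs[(r - 1) / 2];
  }
}
```

With a balanced tree the number of communication rounds grows with the tree depth rather than with world_size, which is what makes the tree shape attractive for small messages.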

◆ TryBroadcast()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryBroadcast ( void *  sendrecvbuf_,
size_t  total_size,
int  root 
)
protected

broadcast data from root to all nodes; this function can fail and returns the cause of failure

Parameters
sendrecvbuf_: buffer for both sending and receiving data
total_size: size of the data to be broadcast
root: id of the root worker that broadcasts the data
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType

◆ TryReduceScatterRing()

AllreduceBase::ReturnType rabit::engine::AllreduceBase::TryReduceScatterRing ( void *  sendrecvbuf_,
size_t  type_nbytes,
size_t  count,
ReduceFunction  reducer 
)
protected

perform an in-place reduce-scatter on sendrecvbuf_ using a ring-based algorithm; this function can fail and returns the cause of failure

After the call, node k holds the k-th segment of the reduction result, where the k-th segment is [k * step, min((k + 1) * step, count)) and step = ceil(count / world_size).

Parameters
sendrecvbuf_: buffer for both sending and receiving data
type_nbytes: size in bytes of one element
count: number of elements to be reduced
reducer: reduce function
Returns
this function can return kSuccess, kSockError, or kGetExcept; see ReturnType for details
See also
ReturnType, TryAllreduce
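The segment formula above can be computed with integer arithmetic. Since count / world_size truncates in C++ integer division, the ceiling is written as (count + world_size - 1) / world_size. The hypothetical helper SegmentRange below also clamps the start of each segment to count, an addition to the documented formula, so that trailing segments come out empty instead of inverted when count is small. Byte offsets into sendrecvbuf_ are these element indices multiplied by type_nbytes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>

// Element range [first, second) of segment k after reduce-scatter, using
// step = ceil(count / world_size). The start is clamped to count as well, so
// segments past the end of the data are empty rather than inverted.
std::pair<std::size_t, std::size_t> SegmentRange(std::size_t k, std::size_t count,
                                                 std::size_t world_size) {
  assert(world_size > 0 && k < world_size);
  // Integer ceiling: avoids both floating point and truncating division.
  const std::size_t step = (count + world_size - 1) / world_size;
  return {std::min(k * step, count), std::min((k + 1) * step, count)};
}
```

For count = 10 and world_size = 4, step is 3 and the segments are [0, 3), [3, 6), [6, 9) and [9, 10); the last node receives the shortest segment.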

◆ VersionNumber()

int rabit::engine::AllreduceBase::VersionNumber ( void  ) const
inline override virtual
Returns
the version number of the currently stored model, i.e. the number of calls to CheckPoint made so far
See also
LoadCheckPoint, CheckPoint

Implements rabit::engine::IEngine.


The documentation for this class was generated from the following files: