Merge pull request #3 from parasailteam/across_node_setup

Across node setup
Saeed Maleki 2021-03-31 18:00:08 -07:00 committed by GitHub
Parents 102a204327 80ba01581c
Commit b425f1cd91
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 351 additions and 142 deletions

View file

@@ -206,3 +206,8 @@ class ncclFunction<ncclFuncAllGather, NCCL_ALGO_COLLNET, PROTO, FUNC, T, UNROLL>
__device__ void run(struct ncclWorkElem* args) {}
};
template<int PROTO, class FUNC, typename T, int UNROLL>
class ncclFunction<ncclFuncAllGather, NCCL_ALGO_SCKL, PROTO, FUNC, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};

View file

@@ -602,3 +602,9 @@ class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_COLLNET, NCCL_PROTO_LL128, FUNC,
public:
__device__ void run(struct ncclWorkElem* args) { }
};
template<int PROTO, class FUNC, typename T, int UNROLL>
class ncclFunction<ncclFuncAllReduce, NCCL_ALGO_SCKL, PROTO, FUNC, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};

View file

@@ -14,28 +14,49 @@ class ncclFunction<ncclFuncAllToAll, ALGO, PROTO, FUNC, T, UNROLL> {
__device__ void run(struct ncclWorkElem* args) {
const int tid = threadIdx.x;
const int nthreads = args->nThreads-WARP_SIZE;
const int bid = args->coll.bid;
const int bid = blockIdx.x;
const int nChannels = args->coll.nChannels;
struct ncclDevComm* comm = args->comm;
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct scklGraph* sGraph = &channel->sGraph;
const int scklNumBlocksPerChannel = args->scklNumBlocksPerChannel;
const int channelId = bid/scklNumBlocksPerChannel;
struct ncclChannel* channel = comm->channels+channelId;
// bid relative to its channel
int rbid = bid % scklNumBlocksPerChannel;
struct scklAlgorithm* scklAlgo = &comm->scklAlgo;
struct scklThreadBlock* sckltb = &scklAlgo->scklTB[rbid];
const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS);
const int chunkSize = stepSize * ALLTOALL_CHUNKSTEPS;
const int nranks = comm->nRanks;
const ssize_t loopSize = nChannels*(ssize_t)chunkSize;
const ssize_t size = args->coll.count;
return;
const int nChunks = scklAlgo->nChunks;
// assumes that size is divisible by nChunks
const ssize_t sizePerChunk = size/nChunks;
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->sendbuff;
T * __restrict__ thisOutput = (T*)args->recvbuff;
ncclPrimitives<UNROLL, ALLGATHER_CHUNKSTEPS/ALLGATHER_SLICESTEPS, ALLGATHER_SLICESTEPS, T, 3, 3, 1, FUNC>
prims(tid, nthreads, sGraph->recv, sGraph->send, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0);
if (tid == 0 && bid == 0){
printf("connected to %d %d %d\n", sGraph->send[0], sGraph->send[1], sGraph->send[2]);
int myRank = channel->ring.devUserRanks[0];
int m1 = -1;
int recvPeer = (sckltb->type == SCKL_RECV) ? sckltb->peer : m1;
int sendPeer = (sckltb->type == SCKL_SEND) ? sckltb->peer : m1;
ncclPrimitives<UNROLL, ALLTOALL_CHUNKSTEPS/ALLTOALL_SLICESTEPS, ALLTOALL_SLICESTEPS, T, 1, 1, 1, FUNC>
prims(tid, nthreads, &recvPeer, &sendPeer, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0);
for (ssize_t gridOffset = 0; gridOffset < sizePerChunk; gridOffset += loopSize) {
int realChunkSize = min(chunkSize, DIVUP(sizePerChunk-gridOffset,nChannels));
ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T));
ssize_t chunkOffset = gridOffset + channelId*realChunkSize;
ssize_t offset;
int nelem = min(realChunkSize, sizePerChunk-chunkOffset);
for (int i = 0; i < sckltb->nsteps; i++){
offset = chunkOffset + sckltb->transfers[i] * sizePerChunk;
if (sckltb->type == SCKL_SEND){
prims.directSend(thisInput + offset, offset, nelem);
} else if (sckltb->type == SCKL_RECV) {
prims.directRecv(thisOutput + offset, offset, nelem);
}
}
}
int testSize = min(chunkSize, (int)size/nChannels/nranks);
prims.directSend(thisInput, 0, testSize);
prims.directRecv(thisOutput, 0, testSize);
return;
}
};
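
The key change above is that CUDA block ids no longer map 1:1 to channels. A minimal host-side sketch (not part of this diff; the nChannels and scklNumBlocksPerChannel values are assumed for illustration) of how a flat bid decomposes into (channelId, rbid):

#include <cstdio>

int main() {
  const int nChannels = 2;               // assumed value
  const int scklNumBlocksPerChannel = 3; // assumed value
  // Mirrors the kernel: channelId picks the channel, rbid picks the
  // scklThreadBlock entry (scklAlgo->scklTB[rbid]) this block executes.
  for (int bid = 0; bid < nChannels * scklNumBlocksPerChannel; bid++) {
    int channelId = bid / scklNumBlocksPerChannel;
    int rbid = bid % scklNumBlocksPerChannel;
    printf("bid %d -> channelId %d, rbid %d\n", bid, channelId, rbid);
  }
  return 0;
}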

View file

@@ -160,3 +160,9 @@ class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UNROLL
public:
__device__ void run(struct ncclWorkElem* args) {}
};
template<int PROTO, class REDOP, typename T, int UNROLL>
class ncclFunction<ncclFuncBroadcast, NCCL_ALGO_SCKL, PROTO, REDOP, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};

View file

@@ -10,7 +10,6 @@
#include "collectives.h"
#include "devcomm.h"
#if __CUDA_ARCH__ >= 800
#define COLL_UNROLL 8
#define NCCL_MAX_DEV_ARITY (NCCL_MAX_TREE_ARITY-1) // Using balanced tree instead of split tree
@@ -41,13 +40,13 @@ static __device__ void load_parallel(void* dst, void* src, size_t size, int tid)
int* s = (int*)src;
for (int o = tid; o < (size/sizeof(int)); o += blockDim.x) d[o] = s[o];
}
static __device__ void load_coll(struct ncclWork* localWork, struct ncclWork* hostWork, int tid, struct ncclDevComm* comm) {
static __device__ void load_coll(struct ncclWork* localWork, struct ncclWork* hostWork, int tid, struct ncclDevComm* comm, int rbid) {
__syncthreads();
load_parallel(localWork, hostWork, sizeof(struct ncclWork), tid);
// Check whether the last operation was aborted and make sure all threads exit
int abort = tid == 0 ? *(comm->abortFlag) : 0;
exitIfAbortBarrier(abort);
if (tid == 0) hostWork->elems[0].active = 0;
if (tid == 0) hostWork->elems[0].active[rbid] = 0;
}
template <ncclFunc_t FUNCTION, int ALGO, int PROTO, class REDOP, typename T, int UNROLL>
@@ -80,17 +79,22 @@ __device__ void ncclKernel(struct ncclWorkElem first) {
auto f = ncclFunction<FUNCTION, ALGO, PROTO, REDOP, T, UNROLL>();
struct ncclDevComm* comm = first.comm;
struct ncclChannel* channel = comm->channels+bid;
// SCKL: this needs to be changed such that a mixture of SCKL and NCCL can be handled
const int scklNumBlocksPerChannel = first.scklNumBlocksPerChannel;
int channelId = bid / scklNumBlocksPerChannel;
int rbid = bid % scklNumBlocksPerChannel;
struct ncclChannel* channel = comm->channels+channelId;
struct ncclWorkElem* w = NULL;
uint16_t index = first.index;
/* To optimize for latency, (only) the first operation is passed as argument.*/
if (bid == 0 && first.funcIndex != FUNC_INDEX_P2P) w = &first;
if (channelId == 0 && first.funcIndex != FUNC_INDEX_P2P) w = &first;
while (1) {
if (w == NULL) {
w = shmem.localWork.elems;
load_coll(&shmem.localWork, channel->workFifo+index, tid, comm);
load_coll(&shmem.localWork, channel->workFifo+index, tid, comm, rbid);
}
if (tid < w->nThreads) {
if (w->funcIndex == FINDEX) {
@@ -100,7 +104,7 @@ __device__ void ncclKernel(struct ncclWorkElem first) {
}
}
index = (index+1) % NCCL_MAX_OPS;
if (w->active == 2) {
if (w->active[rbid] == 2) {
return;
}
w = NULL;
@@ -134,6 +138,7 @@ __device__ void NCCL_FUNC_NAME(func, algo, proto, redop, type)(struct ncclWorkEl
#define IMPL_COLL3(func, redop, type, ncclType) \
IMPL_COLL4(func, TREE, redop, type, ncclType) \
IMPL_COLL4(func, RING, redop, type, ncclType) \
IMPL_COLL4(func, SCKL, redop, type, ncclType) \
IMPL_COLL4(func, COLLNET, redop, type, ncclType)
#if NCCL_TYPE == 0
@@ -180,4 +185,4 @@ __device__ void NCCL_FUNC_NAME(func, algo, proto, redop, type)(struct ncclWorkEl
#define IMPL_COLL_P(func)
#endif
#endif
#endif
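
The work-FIFO handshake above now keys on rbid: each SCKL block clears only its own active slot in load_coll, and the host side (getNextOp, in enqueue.cc below) reuses a slot only once every replica is zero. A simplified model of that protocol, assuming a fixed block count and ignoring the abort path:

#include <atomic>
#include <cstdint>

constexpr int kBlocksPerChannel = 3;              // assumed block count
std::atomic<uint16_t> active[kBlocksPerChannel];  // models ncclWorkElem::active

// Device side (per thread block): after copying the work element into shared
// memory, release only this block's slot, as load_coll does with active[rbid].
void blockDone(int rbid) { active[rbid].store(0); }

// Host side: the element may be overwritten only when all replicas are zero,
// matching the loop added to getNextOp.
bool slotFree() {
  for (int i = 0; i < kBlocksPerChannel; i++)
    if (active[i].load() != 0) return false;
  return true;
}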

View file

@@ -18,7 +18,8 @@ __device__ struct ncclShmemData* ncclShmem;
#define NCCL_FUNC4(func, redop, type) \
NCCL_FUNC5(func, TREE, redop, type), \
NCCL_FUNC5(func, RING, redop, type), \
NCCL_FUNC5(func, COLLNET, redop, type)
NCCL_FUNC5(func, COLLNET, redop, type), \
NCCL_FUNC5(func, SCKL, redop, type)
// Must be consistent with ncclDataType_t
#define NCCL_FUNCS3A(func, redop) \

View file

@@ -137,7 +137,6 @@ class ncclPrimitives {
int offset = 0;
int sliceSize = stepSize*SLICESTEPS;
int dataSize = max(DIVUP(nelem, 16*SLICESPERCHUNK)*16, sliceSize/32);
#pragma unroll
for (int slice=0; slice<SLICESPERCHUNK; ++slice) {
int realSize = max(0, min(dataSize, nelem-offset));

View file

@@ -150,3 +150,9 @@ class ncclFunction<ncclFuncReduce, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};
template<int PROTO, class REDOP, typename T, int UNROLL>
class ncclFunction<ncclFuncReduce, NCCL_ALGO_SCKL, PROTO, REDOP, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};

View file

@@ -194,3 +194,9 @@ class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_COLLNET, PROTO, REDOP, T, UN
public:
__device__ void run(struct ncclWorkElem* args) {}
};
template<int PROTO, class REDOP, typename T, int UNROLL>
class ncclFunction<ncclFuncReduceScatter, NCCL_ALGO_SCKL, PROTO, REDOP, T, UNROLL> {
public:
__device__ void run(struct ncclWorkElem* args) {}
};

View file

@@ -17,7 +17,8 @@
#define NCCL_FUNC4(func, redop, type) \
(void*)NCCL_FUNC5(func, TREE, redop, type), \
(void*)NCCL_FUNC5(func, RING, redop, type), \
(void*)NCCL_FUNC5(func, COLLNET, redop, type)
(void*)NCCL_FUNC5(func, COLLNET, redop, type), \
(void*)NCCL_FUNC5(func, SCKL, redop, type)
// Must be consistent with ncclDataType_t
#define NCCL_FUNCS3A(func, redop) \
@@ -96,12 +97,18 @@ static ncclResult_t getNextOp(struct ncclChannel* channel, struct ncclWork** wor
int opIndex = channel->workFifoTail%NCCL_MAX_OPS;
struct ncclWork* w = channel->workFifo+opIndex;
struct ncclWorkElem* e = w->elems;
volatile uint8_t* activePtr = (volatile uint8_t*)&e->active;
while (activePtr[0] != 0) sched_yield();
// SCKL replicates active; make sure all replicas are 0
for (int i=0; i<e->scklNumBlocksPerChannel; i++){
volatile uint16_t* activePtr = (volatile uint16_t*)&e->active[i];
while (activePtr[0] != 0) sched_yield();
}
memset(w, 0, sizeof(struct ncclWork));
// Initialize with work elem if provided
if (base) memcpy(e, base, sizeof(struct ncclWorkElem));
e->active = 1;
for (int i=0; i<base->scklNumBlocksPerChannel; i++){
e->active[i] = 1;
}
e->index = opIndex;
channel->workFifoTail++;
channel->workCount++;
@@ -116,6 +123,8 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, struct cudaLaunchParams*
}
// Set active = 2 for the last operation and add a no-op on empty channels (p2p case).
// SCKL: this needs to be set properly as different work items can have different scklNumBlocksPerChannel
int scklNumBlocksPerChannel = 1;
for (int c=0; c<params->gridDim.x; c++) {
struct ncclChannel* channel = comm->channels+c;
if (channel->workCount == 0) {
@@ -126,9 +135,18 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, struct cudaLaunchParams*
e->funcIndex = FUNC_INDEX_P2P;
e->p2p.nThreads = 0;
}
channel->workFifo[(channel->workFifoTail-1)%NCCL_MAX_OPS].elems[0].active = 2;
int channelTailIndex = ((channel->workFifoTail-1)%NCCL_MAX_OPS);
scklNumBlocksPerChannel = channel->workFifo[channelTailIndex].elems[0].scklNumBlocksPerChannel;
for (int i=0; i<channel->workFifo[channelTailIndex].elems[0].scklNumBlocksPerChannel; i++) {
channel->workFifo[channelTailIndex].elems[0].active[i] = 2;
}
}
// This is the first time SCKL disassociates bids and channels
// SCKL: for now we assume scklNumBlocksPerChannel is the same for every channel.
// multiply the number of threadblocks by scklNumBlocksPerChannel
params->gridDim.x *= scklNumBlocksPerChannel;
// Find the first operation, choose the kernel accordingly and pass it
// as the first argument.
struct ncclChannel* c0 = comm->channels;
@@ -136,7 +154,10 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, struct cudaLaunchParams*
struct ncclWorkElem* elem = work->elems;
memcpy(&comm->args, elem, sizeof(struct ncclWorkElem));
// As we inline the first coll directly, we can free it immediately.
if (elem->funcIndex != FUNC_INDEX_P2P) elem->active = 0;
if (elem->funcIndex != FUNC_INDEX_P2P){
for (int i=0; i<elem->scklNumBlocksPerChannel; i++)
elem->active[i] = 0;
}
params->func = ncclKerns[elem->funcIndex];
return ncclSuccess;
@@ -239,6 +260,12 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) {
// launch and the ncclProxyStart call could cause a deadlock.
// Also, starting the proxies after the CUDA launch seems to be better for
// performance (latency).
// try to find how many extra threadblocks were allocated for SCKL and adjust gridDim.x
int channelTailIndex = ((comm->channels[0].workFifoTail-1)%NCCL_MAX_OPS);
int scklNumBlocksPerChannel = comm->channels[0].workFifo[channelTailIndex].elems[0].scklNumBlocksPerChannel;
params->gridDim.x /= scklNumBlocksPerChannel;
uint64_t max = 0ULL;
for (int r=0; r<params->gridDim.x; r++) {
struct ncclChannel* channel = comm->channels+r;
@@ -319,6 +346,7 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
}
static ncclResult_t getPatternInfo(struct ncclInfo* info) {
switch (info->coll) {
case ncclFuncBroadcast:
info->pattern = info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeDown : ncclPatternPipelineFrom; break;
@@ -330,7 +358,7 @@ static ncclResult_t getPatternInfo(struct ncclInfo* info) {
case ncclFuncAllReduce:
info->pattern = info->algorithm == NCCL_ALGO_COLLNET ? ncclPatternCollTreeUp : info->algorithm == NCCL_ALGO_TREE ? ncclPatternTreeUpDown : ncclPatternRingTwice; break;
case ncclFuncAllToAll:
info->pattern = ncclPatternRing; break; // To be change by SCKL
info->pattern = ncclPatternSckl; break;
default:
WARN("Unknown pattern for collective %d algorithm %d", info->coll, info->algorithm);
return ncclInternalError;
@@ -352,6 +380,9 @@ static ncclResult_t getLoopInfo(struct ncclInfo* info) {
info->nstepsPerLoop = info->comm->nRanks-1; info->nchunksPerLoop = info->comm->nRanks; break;
case ncclPatternRingTwice:
info->nstepsPerLoop = 2*(info->comm->nRanks-1); info->nchunksPerLoop = info->comm->nRanks; break;
case ncclPatternSckl:
// SCKL needs a specific number of steps per loop for each connection. It is set properly in ncclProxySaveColl
info->nstepsPerLoop = 1; info->nchunksPerLoop = info->comm->nRanks * info->comm->scklAlgo.nChunks; break;
default:
WARN("Unknown pattern %d", info->pattern);
return ncclInternalError;
@@ -377,8 +408,8 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
work->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol);
int stepSize = info->comm->buffSizes[info->protocol]/NCCL_STEPS;
int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->chunkSteps : 1;
int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->sliceSteps : 1;
int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && ((info->algorithm == NCCL_ALGO_RING) || (info->algorithm == NCCL_ALGO_SCKL))) ? info->chunkSteps : 1;
int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && ((info->algorithm == NCCL_ALGO_RING) || (info->algorithm == NCCL_ALGO_SCKL))) ? info->sliceSteps : 1;
int chunkSize = stepSize*chunkSteps;
// Compute lastChunkSize
@@ -419,8 +450,9 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
if (info->protocol == NCCL_PROTO_LL) chunkEffectiveSize /= 2;
if (info->protocol == NCCL_PROTO_LL128) chunkEffectiveSize = (chunkSize / NCCL_LL128_LINEELEMS) * NCCL_LL128_DATAELEMS;
//if (info->comm->rank == 0) printf("Coll %d, size %ld -> %dx%d, chunkSize %d (algo %d proto%d)\n", info->coll, info->nBytes, info->nChannels, info->nThreads, chunkSize, info->algorithm, info->protocol);
int nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize)));
proxyArgs->nsteps = info->nstepsPerLoop * nLoops * chunkSteps;
proxyArgs->nLoops = (int)(DIVUP(info->nBytes, (((size_t)(info->nChannels))*info->nchunksPerLoop*chunkEffectiveSize)));
// nstepsPerLoop for SCKL is not final here and is adjusted per peer in ncclProxySaveColl
proxyArgs->nsteps = info->nstepsPerLoop * proxyArgs->nLoops * chunkSteps;
proxyArgs->sliceSteps = sliceSteps;
proxyArgs->chunkSteps = chunkSteps;
proxyArgs->protocol = info->protocol;
@@ -433,7 +465,7 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
TRACE(NCCL_NET,"opCount %lx slicesteps %d spl %d cpl %d nbytes %zi -> protocol %d nchannels %d nthreads %d, nloops %d nsteps %d comm %p",
proxyArgs->opCount, proxyArgs->sliceSteps, info->nstepsPerLoop, info->nchunksPerLoop, info->nBytes, info->protocol, info->nChannels, info->nThreads,
nLoops, proxyArgs->nsteps, info->comm);
proxyArgs->nLoops, proxyArgs->nsteps, info->comm);
return ncclSuccess;
}
@@ -454,8 +486,19 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
CUDACHECK(cudaMemcpyAsync(info->recvbuff, info->sendbuff, info->nBytes, cudaMemcpyDeviceToDevice, info->stream));
return ncclSuccess;
}
// Alltoall needs a local copy, for which no channel/threadblock is allocated; the rank's own chunk is transferred here
if (info->coll == ncclFuncAllToAll){
if (info->sendbuff == info->recvbuff){
WARN("Alltoall needs separate receive and send buffers.");
return ncclInvalidArgument;
}
size_t nBytesPerRank = info->nBytes / info->comm->nRanks;
size_t rankOffset = info->comm->rank * nBytesPerRank;
CUDACHECK(cudaMemcpyAsync((int8_t*)info->recvbuff + rankOffset, (int8_t*)info->sendbuff + rankOffset, nBytesPerRank, cudaMemcpyDeviceToDevice, info->stream));
}
struct ncclWorkElem work;
// memset(&work, 0, sizeof(struct ncclWorkElem)); // setting everything to 0 so that the SCKL active array is zeroed as well.
struct ncclProxyArgs proxyArgs;
memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs));
NCCLCHECK(computeColl(info, &work, &proxyArgs));
@@ -468,7 +511,7 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
for (int bid=0; bid<nChannels*nSubChannels; bid++) {
int channelId = info->comm->myParams->gridDim.x % info->comm->nChannels;
struct ncclChannel* channel = info->comm->channels+channelId;
work.scklNumBlocksPerChannel = (info->algorithm == NCCL_ALGO_SCKL) ? info->comm->scklAlgo.nBlocks : 1;
// Proxy
proxyArgs.channel = channel;
// Adjust pattern for CollNet based on channel index
@@ -476,7 +519,7 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
info->pattern = (channelId < info->comm->nChannels/nSubChannels) ? ncclPatternCollTreeUp : ncclPatternCollTreeDown;
}
if (proxyArgs.nsteps) NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks));
if (proxyArgs.nsteps) NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks, &info->comm->scklAlgo));
info->comm->myParams->gridDim.x++;
work.coll.bid = bid % nChannels;
@@ -503,6 +546,7 @@ ncclResult_t ncclSaveCommKernels(ncclComm_t comm) {
while (comm->asyncTotalSize < channelSize * comm->nChannels && channelSize > NCCL_MIN_CHANNEL_SIZE) channelSize /= 2;
for (int c = 0; c < comm->asyncOpCount; c++) {
struct ncclInfo* info = comm->asyncOps+c;
// SCKL needs to adjust nChannels in the future
info->nChannels = std::min((int)DIVUP(info->nBytes, channelSize), comm->nChannels); // assign number of channels
NCCLCHECK(ncclSaveKernel(info));
}
@@ -607,6 +651,8 @@ ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info) {
NCCLCHECK(saveP2pOp(info, w, segment));
info->comm->myParams->gridDim.x = std::max<unsigned>(info->comm->myParams->gridDim.x, channelId+1);
info->comm->myParams->blockDim.x = std::max<unsigned>(info->comm->myParams->blockDim.x, info->nThreads);
// SCKL does not generate p2p kernels.
w->elems[0].scklNumBlocksPerChannel = 1;
return ncclSuccess;
}
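
Two of the hunks above form a round trip: setupLaunch multiplies gridDim.x by scklNumBlocksPerChannel so enough thread blocks are launched, and ncclBarrierEnqueueWait divides it back so the subsequent per-channel loop still visits each channel once. A worked sketch with assumed values:

#include <cassert>

int main() {
  int nChannels = 4;               // gridDim.x before setupLaunch (assumed)
  int scklNumBlocksPerChannel = 2; // read from the last work element (assumed)

  int gridDimX = nChannels * scklNumBlocksPerChannel; // setupLaunch: launch 8 blocks
  assert(gridDimX == 8);

  gridDimX /= scklNumBlocksPerChannel; // ncclBarrierEnqueueWait: back to 4,
  assert(gridDimX == nChannels);       // so the proxy loop runs once per channel
  return 0;
}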

View file

@@ -607,68 +607,99 @@ ncclResult_t ncclTopoGetLocalNet(struct ncclTopoSystem* system, int rank, int64_
return ncclSuccess;
}
ncclResult_t scklGetTopoFromXMLAndSetChannels(struct ncclComm* comm) {
ncclResult_t scklGetAlgoFromXMLAndSetComm(struct ncclComm* comm) {
char* str = getenv("SCKL_XML_FILE");
if (str){
INFO(NCCL_ENV, "SCKL_XML_FILE set by environment to %s", str);
struct ncclXml* xml;
NCCLCHECK(ncclCalloc(&xml, 1));
NCCLCHECK(scklTopoGetXmlGraphFromFile(str, xml));
NCCLCHECK(scklGetXmlAlgoFromFile(str, xml));
int rank = comm->rank;
for (int c=0; c<comm->nChannels; c++){
comm->channels[c].sGraph.nRecvPeers = 0;
comm->channels[c].sGraph.nSendPeers = 0;
}
struct scklAlgorithm* scklAlgo = &comm->scklAlgo;
// zeroing out all entries.
memset(scklAlgo, 0, sizeof(struct scklAlgorithm));
struct ncclXmlNode* topNode;
NCCLCHECK(xmlFindTag(xml, "system", &topNode));
NCCLCHECK(xmlFindTag(xml, "algo", &topNode));
int nchunks;
NCCLCHECK(xmlGetAttrInt(topNode, "nchunks", &nchunks));
scklAlgo->nChunks = nchunks;
for (int s=0; s<topNode->nSubs; s++) {
struct ncclXmlNode* node = topNode->subs[s];
if (strcmp(node->name, "gpu") == 0){
int id;
NCCLCHECK(xmlGetAttrInt(node, "id", &id));
if (id == rank){
for (int p=0; p<node->nSubs; p++) {
struct ncclXmlNode* typeOfComm = node->subs[p];
if (strcmp(typeOfComm->name, "conn") == 0){
scklAlgo->nBlocks = 0;
for (int t=0; t<node->nSubs; t++) {
struct ncclXmlNode* threadblockNode = node->subs[t];
if (strcmp(threadblockNode->name, "threadblock") == 0){
int rbid, peer;
const char* type;
NCCLCHECK(xmlGetAttrStr(typeOfComm, "type", &type));
bool isRecv = false;
bool isSend = false;
if (strcmp(type, "recv") == 0){
isRecv = true;
} else if (strcmp(type, "send") == 0){
isSend = true;
NCCLCHECK(xmlGetAttrInt(threadblockNode, "rbid", &rbid));
NCCLCHECK(xmlGetAttrInt(threadblockNode, "peer", &peer));
NCCLCHECK(xmlGetAttrStr(threadblockNode, "type", &type));
if (rbid >= SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL){
WARN("Too many thread blocks are requested. Max thread blocks: %d, requested: %d", SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL, rbid+1);
return ncclInternalError;
}
for (int p=0; p<typeOfComm->nSubs; p++) {
struct ncclXmlNode* peer = typeOfComm->subs[p];
int peerId;
NCCLCHECK(xmlGetAttrInt(peer, "id", &peerId));
// SCKL generates the same scklGraph for all channels for now. This will change in the future
for (int c=0; c<comm->nChannels; c++){
if (isRecv) {
if (comm->channels[c].sGraph.nRecvPeers < SCKL_MAX_NUM_CONN){
comm->channels[c].sGraph.recv[comm->channels[c].sGraph.nRecvPeers++] = peerId;
} else {
WARN("Too many recv connections for device %d channel %d -- connection to %d is ignored. This may cause deadlock in initialization.", rank, c, peerId);
}
} else if (isSend){
if (comm->channels[c].sGraph.nSendPeers < SCKL_MAX_NUM_CONN){
comm->channels[c].sGraph.send[comm->channels[c].sGraph.nSendPeers++] = peerId;
} else {
WARN("Too many recv connections for device %d channel %d -- connection to %d is ignored. This may cause deadlock in initialization.", rank, c, peerId);
}
if (rbid < 0){
WARN("rbid must be positive. rbid: %d", rbid);
return ncclInternalError;
}
scklAlgo->nBlocks = std::max(comm->scklAlgo.nBlocks, rbid+1);
struct scklThreadBlock* sTB = &scklAlgo->scklTB[rbid];
sTB->nsteps = 0;
sTB->peer = peer;
if (strcmp(type, "send") == 0){
sTB->type = SCKL_SEND;
} else if (strcmp(type, "recv") == 0) {
sTB->type = SCKL_RECV;
} else {
WARN("type of transfer is not supported: %s", type);
return ncclInternalError;
}
// setting all transfers to -1 so that entries not set below are skipped at runtime.
for (int st=0; st<SCKL_MAX_NUM_STEPS; st++){
sTB->transfers[st] = -1;
}
int ntransfers = 0;
for (int st=0; st<threadblockNode->nSubs; st++) {
struct ncclXmlNode* stepNode = threadblockNode->subs[st];
if (strcmp(stepNode->name, "step") == 0){
int s, addr;
NCCLCHECK(xmlGetAttrInt(stepNode, "s", &s));
NCCLCHECK(xmlGetAttrInt(stepNode, "addr", &addr));
if (s >= SCKL_MAX_NUM_STEPS){
WARN("Too many steps are requested. Max number of steps: %d, requested: %d", SCKL_MAX_NUM_STEPS, s+1);
return ncclInternalError;
}
if (s < 0){
WARN("step must be positive: step %d", s);
return ncclInternalError;
}
sTB->nsteps = std::max(sTB->nsteps, (uint8_t)(s+1));
sTB->transfers[s] = addr;
ntransfers++;
}
}
// setting the summary of the SCKL algorithm
if (sTB->type == SCKL_SEND){
scklAlgo->sendPeers[scklAlgo->nsendPeers] = peer;
scklAlgo->nchunksForSendPeer[scklAlgo->nsendPeers] = ntransfers;
scklAlgo->nsendPeers++;
} else if (sTB->type == SCKL_RECV){
scklAlgo->recvPeers[scklAlgo->nrecvPeers] = peer;
scklAlgo->nchunksForRecvPeer[scklAlgo->nrecvPeers] = ntransfers;
scklAlgo->nrecvPeers++;
}
}
}
}
}
}
free(xml);
// free(xml);
}
return ncclSuccess;
}
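
For reference, a hypothetical SCKL_XML_FILE instance that this parser would accept. The tag and attribute names (algo/nchunks, gpu/id, threadblock/rbid/peer/type, step/s/addr) come from the code above; the concrete ranks, steps, and addresses are invented, and a full file would contain one gpu element per rank (only rank 0's is shown):

<algo nchunks="2">
  <gpu id="0">
    <threadblock rbid="0" peer="1" type="send">
      <step s="0" addr="0"/>
      <step s="1" addr="1"/>
    </threadblock>
    <threadblock rbid="1" peer="1" type="recv">
      <step s="0" addr="0"/>
      <step s="1" addr="1"/>
    </threadblock>
  </gpu>
</algo>

With this input, rank 0 gets nBlocks = 2: rbid 0 sends chunks 0 and 1 to rank 1 while rbid 1 receives them, so nchunksForSendPeer[0] and nchunksForRecvPeer[0] would both be 2.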

View file

@@ -53,7 +53,7 @@ ncclResult_t parseList(const char* str, const char* elems[], int nelems, int* li
// Latencies in us, Bandwidths in GB/s
// Tree { LL, LL128, Simple } , Ring { LL, LL128, Simple }
static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4, 4.4, 0 }, { 3.6, 10.0, 8.4 }, { 4.4, 4.4, 0 } };
static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4, 4.4, 0 }, { 3.6, 10.0, 8.4 }, { 4.4, 4.4, 0 }, { 0., 0., 0. } };
// NVLink, PCI, Network
#define NCCL_HW_NVLINK 0
@@ -62,11 +62,11 @@ static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4,
// Tree/Simple is the latency of a 256kB chunk, which is ~ base lat + 256k/12GB/s (+ 256k/12GB/s for the network).
static const float hwLat [3][NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] =
{ /* NVLINK */
{ /* Tree (LL/LL128/Simple)*/ { .52, 1.25, 28 }, /* Ring (LL/LL128/Simple)*/ { .47, 1.9, 3.4 }, /* CollNet (LL/LL128/Simple)*/ { .5, 1.2, 4.0 } },
{ /* Tree (LL/LL128/Simple)*/ { .52, 1.25, 28 }, /* Ring (LL/LL128/Simple)*/ { .47, 1.9, 3.4 }, /* CollNet (LL/LL128/Simple)*/ { .5, 1.2, 4.0 }, /* SCKL (LL/LL128/Simple)*/ { 0., 0., 0. } },
/* PCI */
{ /* Tree (LL/LL128/Simple)*/ { 1.0, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { 1.0, 2.5, 5.7 }, /* CollNet (LL/LL128/Simple)*/ { 1.0, 1.9, 5.5 } },
{ /* Tree (LL/LL128/Simple)*/ { 1.0, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { 1.0, 2.5, 5.7 }, /* CollNet (LL/LL128/Simple)*/ { 1.0, 1.9, 5.5 }, /* SCKL (LL/LL128/Simple)*/ { 0., 0., 0. } },
/* NET */
{ /* Tree (LL/LL128/Simple)*/ { 5.0, 8.5, 28 }, /* Ring (LL/LL128/Simple)*/ { 2.7, 4.0, 9.6 }, /* CollNet (LL/LL128/Simple)*/ { 5.0, 5.0, 10.7 } }
{ /* Tree (LL/LL128/Simple)*/ { 5.0, 8.5, 28 }, /* Ring (LL/LL128/Simple)*/ { 2.7, 4.0, 9.6 }, /* CollNet (LL/LL128/Simple)*/ { 5.0, 5.0, 10.7 }, /* SCKL (LL/LL128/Simple)*/ { 0., 0., 0. } }
};
// LL128 max BW (per channel) for the different collectives
@@ -77,13 +77,13 @@ static const double perChMaxTreeBws[2][3] = { /* Volta (N1/N2/N4) */ {26.5, 18.5
ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCompCap, struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph) {
int simpleDefaultThreads = (ringGraph->speedIntra*ringGraph->nChannels <= PCI_WIDTH) ? 256 : NCCL_SIMPLE_MAX_NTHREADS;
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_SIMPLE] =
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_SIMPLE] = comm->maxThreads[NCCL_ALGO_SCKL][NCCL_PROTO_SIMPLE] =
getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_SIMPLE_MAX_NTHREADS, simpleDefaultThreads);
comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_SIMPLE] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE] =
getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_SIMPLE_MAX_NTHREADS, NCCL_SIMPLE_MAX_NTHREADS);
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL] =
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL] = comm->maxThreads[NCCL_ALGO_SCKL][NCCL_PROTO_LL] =
getNthreads("NCCL_NTHREADS", ncclParamNthreads(), 2*WARP_SIZE, NCCL_LL_MAX_NTHREADS, NCCL_LL_MAX_NTHREADS);
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL128] =
comm->maxThreads[NCCL_ALGO_RING][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_TREE][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_COLLNET][NCCL_PROTO_LL128] = comm->maxThreads[NCCL_ALGO_SCKL][NCCL_PROTO_LL128] =
getNthreads("NCCL_LL128_NTHREADS", ncclParamLl128Nthreads(), NCCL_LL128_MAX_NTHREADS/4, NCCL_LL128_MAX_NTHREADS, NCCL_LL128_MAX_NTHREADS);
int nNodes = comm->nNodes;
@@ -100,7 +100,9 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
double perChMaxTreeBw = perChMaxTreeBws[compCap80][index2];
float ppn = (float)nRanks / nNodes; // if ppn < 2, then we are sending/receiving at the same GPU through the NIC, apply some bw discount
struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS] = { treeGraph, ringGraph, collNetGraph };
// SCKL has no topo graph of its own; it shares the ring graph's information.
// The SCKL algo is disabled by default and can only be enabled via the NCCL_ALGO env variable, so the tuning does not matter at this point.
struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS] = { treeGraph, ringGraph, collNetGraph, ringGraph };
int intraHw[NCCL_NUM_ALGORITHMS], hw[NCCL_NUM_ALGORITHMS];
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) intraHw[a] = graphs[a]->typeIntra == LINK_NVL ? NCCL_HW_NVLINK : NCCL_HW_PCI;
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) hw[a] = nNodes == 1 ? intraHw[a] : NCCL_HW_NET;
@@ -115,6 +117,22 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
// Skip SCKL algorithms here since SCKL tunes the algorithm in the synthesizer.
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
if (coll == ncclFuncAllToAll || a == NCCL_ALGO_SCKL) {
// Only the SCKL alltoall function with the simple protocol is implemented
// The SCKL algorithm is dynamic, and busBw/latency can only be determined from the input XML algorithm. An analysis will be added later.
for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
if (coll == ncclFuncAllToAll && a == NCCL_ALGO_SCKL && p == NCCL_PROTO_SIMPLE){
// Setting the bandwidth and latency values to 1.0 (an arbitrary value) so that they don't get skipped by ncclTopoGetAlgoTime
comm->bandwidths[coll][a][p] = 1.0;
comm->latencies[coll][a][p] = 1.0;
} else {
comm->bandwidths[coll][a][p] = 0.0; // This makes sure that SCKL is not selected in any other scenario
comm->latencies[coll][a][p] = 0.0;
}
}
continue;
}
if (coll != ncclFuncAllReduce && a != NCCL_ALGO_RING) continue;
for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
@@ -166,7 +184,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
// Protocols/Algorithms enable/disable, and user overrides.
// All are enabled except ll128 which is enabled by default only in certain cases.
int protoEnable[NCCL_NUM_PROTOCOLS] = { 1, 2, 1 };
int algoEnable[NCCL_NUM_ALGORITHMS] = { 1, 1, 1 };
int algoEnable[NCCL_NUM_ALGORITHMS] = { 1, 1, 1, 0 }; // SCKL algorithms are disabled by default
const char *protoStr = getenv("NCCL_PROTO");
if (protoStr) {
@@ -182,8 +200,8 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
if (comm->collNetSupport == 0) {
algoEnable[NCCL_ALGO_COLLNET] = 0;
// If user has hard set NCCL_ALGO=COLLNET, ignore it
if (algoEnable[NCCL_ALGO_RING] == 0 && algoEnable[NCCL_ALGO_TREE] == 0) {
algoEnable[NCCL_ALGO_RING] = algoEnable[NCCL_ALGO_TREE] = 1;
if (algoEnable[NCCL_ALGO_RING] == 0 && algoEnable[NCCL_ALGO_TREE] == 0 && algoEnable[NCCL_ALGO_SCKL] == 0) {
algoEnable[NCCL_ALGO_RING] = algoEnable[NCCL_ALGO_TREE] = algoEnable[NCCL_ALGO_SCKL] = 1;
if (comm->rank == 0) WARN("CollNet is not supported or fails to initialize, ignoring NCCL_ALGO=COLLNET");
}
}
@@ -239,6 +257,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
char* str = getenv("NCCL_THREAD_THRESHOLDS");
if (str) {
INFO(NCCL_ENV, "NCCL_THREAD_THRESHOLDS set by environment to %s", str);
// TODO: SCKL needs such thresholds as well in the future.
ssize_t t[NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = {{ -2, -2, -2 }, { -2, -2, -2}};
sscanf(str, "%ld %ld %ld %ld %ld %ld", t[0], t[0]+1, t[0]+2, t[1], t[1]+1, t[1]+2);
for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
@@ -248,7 +267,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
}
}
INFO(NCCL_INIT, "threadThresholds %ld/%ld/%ld | %ld/%ld/%ld | %ld/%ld/%ld",
INFO(NCCL_INIT, "threadThresholds %ld/%ld/%ld | %ld/%ld/%ld | %ld/%ld/%ld | %ld/%ld/%ld",
comm->threadThresholds[NCCL_ALGO_TREE][NCCL_PROTO_LL],
comm->threadThresholds[NCCL_ALGO_TREE][NCCL_PROTO_LL128],
comm->threadThresholds[NCCL_ALGO_TREE][NCCL_PROTO_SIMPLE],
@@ -257,7 +276,10 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
comm->threadThresholds[NCCL_ALGO_RING][NCCL_PROTO_SIMPLE],
comm->threadThresholds[NCCL_ALGO_COLLNET][NCCL_PROTO_LL],
comm->threadThresholds[NCCL_ALGO_COLLNET][NCCL_PROTO_LL128],
comm->threadThresholds[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE]);
comm->threadThresholds[NCCL_ALGO_COLLNET][NCCL_PROTO_SIMPLE],
comm->threadThresholds[NCCL_ALGO_SCKL][NCCL_PROTO_LL],
comm->threadThresholds[NCCL_ALGO_SCKL][NCCL_PROTO_LL128],
comm->threadThresholds[NCCL_ALGO_SCKL][NCCL_PROTO_SIMPLE]);
return ncclSuccess;
}
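
The zero entries act as a gate: per the comments above, a zero bandwidth is what makes ncclTopoGetAlgoTime skip a (collective, algorithm, protocol) combination, while the arbitrary 1.0 values keep SCKL+Simple eligible for alltoall. A sketch of that gating logic under those assumptions (not the actual ncclTopoGetAlgoTime implementation):

// Sketch only: a non-positive bandwidth marks the combination unusable, which
// is how the zeros written above keep SCKL out of every other case.
float algoTime(float latencyUs, float bandwidthGBs, float nBytes) {
  if (bandwidthGBs <= 0.0f) return -1.0f; // assumed "skip this combination" signal
  return latencyUs + nBytes / bandwidthGBs;
}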

View file

@@ -806,43 +806,36 @@ ncclResult_t ncclTopoGetXmlGraphFromFile(const char* xmlGraphFile, struct ncclXm
return ncclSuccess;
}
ncclResult_t scklTopoXmlPeerLoad(FILE* file, struct ncclXml* xml, struct ncclXmlNode* head) {
int id;
NCCLCHECK(xmlGetAttrInt(head, "id", &id));
struct xmlHandler handlers[] = { };
NCCLCHECK(xmlLoadSub(file, xml, head, handlers, 1));
ncclResult_t scklAlgoXmlStep(FILE* file, struct ncclXml* xml, struct ncclXmlNode* head) {
NCCLCHECK(xmlLoadSub(file, xml, head, NULL, 1));
return ncclSuccess;
}
ncclResult_t scklTopoXmlConnLoad(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
const char* type;
NCCLCHECK(xmlGetAttrStr(head, "type", &type));
struct xmlHandler handlers[] = { { "peer", scklTopoXmlPeerLoad } };
ncclResult_t scklAlgoXmlthreadblock(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
struct xmlHandler handlers[] = { { "step", scklAlgoXmlStep } };
NCCLCHECK(xmlLoadSub(file, xmlGraph, head, handlers, 1));
return ncclSuccess;
}
ncclResult_t scklTopoXmlGraphLoad(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
int id;
NCCLCHECK(xmlGetAttrInt(head, "id", &id));
struct xmlHandler handlers[] = { { "conn", scklTopoXmlConnLoad } };
ncclResult_t scklAlgoXmlGpu(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
struct xmlHandler handlers[] = { { "threadblock", scklAlgoXmlthreadblock } };
NCCLCHECK(xmlLoadSub(file, xmlGraph, head, handlers, 1));
return ncclSuccess;
}
ncclResult_t scklTopoXmlSystemLoad(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
struct xmlHandler handlers[] = { { "gpu", scklTopoXmlGraphLoad } };
ncclResult_t scklAlgoXmlLoad(FILE* file, struct ncclXml* xmlGraph, struct ncclXmlNode* head) {
struct xmlHandler handlers[] = { { "gpu", scklAlgoXmlGpu } };
NCCLCHECK(xmlLoadSub(file, xmlGraph, head, handlers, 1));
return ncclSuccess;
}
ncclResult_t scklTopoGetXmlGraphFromFile(const char* xmlGraphFile, struct ncclXml* xml) {
ncclResult_t scklGetXmlAlgoFromFile(const char* xmlGraphFile, struct ncclXml* xml) {
FILE* file = fopen(xmlGraphFile, "r");
if (file == NULL) {
WARN("Could not open XML SCKL graph file %s : %s", xmlGraphFile, strerror(errno));
return ncclSystemError;
}
struct xmlHandler handlers[] = { { "system", scklTopoXmlSystemLoad } };
struct xmlHandler handlers[] = { { "algo", scklAlgoXmlLoad } };
xml->maxIndex = 0;
NCCLCHECK(xmlLoadSub(file, xml, NULL, handlers, 1));
fclose(file);

View file

@@ -11,8 +11,8 @@
// A few constraints to make the implementation easy
#define MAX_STR_LEN 255
#define MAX_ATTR_COUNT 16
#define MAX_SUBS 32
#define MAX_NODES 1024
#define MAX_SUBS 128
#define MAX_NODES 16384
#define NODE_TYPE_NONE 0
#define NODE_TYPE_OPEN 1
@@ -43,7 +43,7 @@ ncclResult_t ncclTopoGetXmlFromFile(const char* xmlTopoFile, struct ncclXml* xml
ncclResult_t ncclTopoDumpXmlToFile(const char* xmlTopoFile, struct ncclXml* xml);
#define NCCL_GRAPH_XML_VERSION 1
ncclResult_t ncclTopoGetXmlGraphFromFile(const char* xmlGraphFile, struct ncclXml* xml);
ncclResult_t scklTopoGetXmlGraphFromFile(const char* xmlGraphFile, struct ncclXml* xml);
ncclResult_t scklGetXmlAlgoFromFile(const char* xmlGraphFile, struct ncclXml* xml);
/* Auto-detect functions */
ncclResult_t ncclTopoFillGpu(struct ncclXml* xml, const char* busId, struct ncclXmlNode** gpuNode);

View file

@@ -6,6 +6,8 @@
#ifndef NCCL_CHECKS_H_
#define NCCL_CHECKS_H_
#include <thread>
#include <chrono>
#include "debug.h"
@@ -51,7 +53,21 @@
} \
} while(true)
// #define DEBUG
#ifdef DEBUG
// Propagate errors up
#define NCCLCHECK(call) do { \
ncclResult_t res = call; \
if (res != ncclSuccess) { \
/* Print the failure location and hang so the process can be inspected */ \
printf("FAILURE %s %d\n", __FILE__, __LINE__); \
std::this_thread::sleep_for(std::chrono::seconds{3600}); \
if (ncclDebugNoWarn == 0) INFO(NCCL_ALL,"%s:%d -> %d", __FILE__, __LINE__, res); \
return res; \
} \
} while (0);
#else
#define NCCLCHECK(call) do { \
ncclResult_t res = call; \
if (res != ncclSuccess) { \
@@ -60,6 +76,8 @@
return res; \
} \
} while (0);
#endif
#define NCCLCHECKGOTO(call, res, label) do { \
res = call; \

View file

@@ -32,7 +32,8 @@
#define DECL3(func, redop, type) \
DECL4(func, RING, redop, type) \
DECL4(func, TREE, redop, type) \
DECL4(func, COLLNET, redop, type)
DECL4(func, COLLNET, redop, type) \
DECL4(func, SCKL, redop, type)
#define DECL2(func, redop) \
DECL3(func, redop, int8_t) \

View file

@@ -58,6 +58,7 @@ struct ncclRecvMem {
struct ncclComm {
struct ncclChannel channels[MAXCHANNELS];
struct scklAlgorithm scklAlgo;
struct ncclPeerInfo* peerInfo;
struct ncclTopoSystem* topo;

View file

@@ -15,10 +15,11 @@
typedef enum { ncclFuncBroadcast, ncclFuncReduce, ncclFuncAllGather, ncclFuncReduceScatter, ncclFuncAllReduce, ncclFuncAllToAll, ncclFuncSendRecv} ncclFunc_t;
extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS];
#define NCCL_NUM_ALGORITHMS 3 // Tree/Ring/CollNet
#define NCCL_NUM_ALGORITHMS 4 // Tree/Ring/SCKL/CollNet
#define NCCL_ALGO_TREE 0
#define NCCL_ALGO_RING 1
#define NCCL_ALGO_COLLNET 2
#define NCCL_ALGO_SCKL 2
#define NCCL_ALGO_COLLNET 3
extern const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS];
#define NCCL_NUM_PROTOCOLS 3 // Simple/LL/LL128
@@ -116,13 +117,36 @@ struct ncclRing {
int* devUserRanks;
};
#define SCKL_MAX_NUM_CONN 16
#define SCKL_MAX_NUM_STEPS 16
#define SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL 128
#define SCKL_SEND 0
#define SCKL_RECV 1
struct scklThreadBlock {
uint8_t peer;
uint8_t type; // follow SCKL_SEND and SCKL_RECV macros
uint8_t nsteps;
// step is used to index into this array. transfers[step] is the addr to transfer.
uint16_t transfers[SCKL_MAX_NUM_STEPS];
};
// gpuId is the id stored in comm->rank
struct scklAlgorithm {
// number of chunks per gpu
int nChunks;
// number of threadblocks
int nBlocks;
// rbid is used as an index into this array
struct scklThreadBlock scklTB[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
// these arrays can be inferred from scklTB; they are kept so that the NCCL API can be used easily
int sendPeers[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
int nchunksForSendPeer[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
int nsendPeers;
int recvPeers[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
int nchunksForRecvPeer[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
int nrecvPeers;
struct scklGraph {
int nRecvPeers;
int nSendPeers;
int recv[SCKL_MAX_NUM_CONN];
int send[SCKL_MAX_NUM_CONN];
};
#define NCCL_MAX_TREE_ARITY 3
@@ -151,7 +175,9 @@ struct ncclWorkElem {
uint16_t nThreads;
uint16_t funcIndex;
uint16_t index;
uint16_t active;
// in SCKL algorithms, the ncclWorkElem.active element from workFifo is replicated for all thread blocks
uint16_t active[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL];
uint16_t scklNumBlocksPerChannel;
const void * sendbuff;
void * recvbuff;
@@ -171,13 +197,13 @@ struct ncclWorkElem {
int32_t delta;
uint16_t nThreads;
} p2p;
uint64_t align[4];
uint64_t align[28];
};
};
struct ncclWork {
struct ncclWorkElem elems[NCCL_MAX_WORK_ELEMENTS];
};
static_assert(sizeof(struct ncclWorkElem) == (0x10*sizeof(int)), "ncclWorkElem must have a pow2 size");
static_assert(sizeof(struct ncclWorkElem) == (0x80*sizeof(int)), "ncclWorkElem must have a pow2 size");
struct ncclChannel {
union {
@@ -185,7 +211,6 @@ struct ncclChannel {
struct ncclRing ring;
struct ncclTree tree;
struct ncclTree collTree;
struct scklGraph sGraph;
int id;
@@ -207,6 +232,7 @@ struct ncclDevComm {
int rank;
int nRanks;
int buffSizes[NCCL_NUM_PROTOCOLS];
struct scklAlgorithm scklAlgo;
// Flag to ask NCCL kernels to abort
volatile uint32_t *abortFlag;
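
The static_assert jump from 0x10*sizeof(int) (64 bytes) to 0x80*sizeof(int) (512 bytes) follows from the new fields above. A rough accounting, assuming a 64-bit build, the three pointers of upstream ncclWorkElem (comm, sendbuff, recvbuff), and the usual padding; only the total is asserted, not the offsets:

#include <cstdint>

// 4 uint16_t scalars (nThreads, funcIndex, index, scklNumBlocksPerChannel):   8 B
// active[SCKL_MAX_NUM_THREAD_BLOCKS_PER_CHANNEL] = 128 * sizeof(uint16_t):  256 B
// three pointers (comm, sendbuff, recvbuff):                                 24 B
// union padded by uint64_t align[28]:                                       224 B
static_assert(8 + 256 + 24 + 28 * sizeof(uint64_t) == 0x80 * sizeof(int),
              "the new ncclWorkElem lands on the next power of two: 512 bytes");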

Просмотреть файл

@@ -35,8 +35,8 @@ ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* topo, int64_t busId, int ne
// Set CPU affinity
ncclResult_t ncclTopoSetAffinity(struct ncclTopoSystem* system, int rank);
// SCKL setup peers
ncclResult_t scklGetTopoFromXMLAndSetChannels(struct ncclComm* comm);
// SCKL: get the algorithm from the XML file and set up the communicator
ncclResult_t scklGetAlgoFromXMLAndSetComm(struct ncclComm* comm);
#define NCCL_TOPO_CPU_ARCH_X86 1
#define NCCL_TOPO_CPU_ARCH_POWER 2

View file

@@ -19,7 +19,8 @@ typedef enum {
ncclPatternTreeDown,
ncclPatternTreeUpDown,
ncclPatternCollTreeUp,
ncclPatternCollTreeDown
ncclPatternCollTreeDown,
ncclPatternSckl
} ncclPattern_t;
// Used to pass NCCL call information between functions

View file

@@ -23,6 +23,7 @@ struct ncclProxyArgs {
int sliceSteps;
int chunkSteps;
int nsteps;
int nLoops; // SCKL uses this to calculate the per-peer number of proxy steps
uint64_t opCount;
int protocol;
int segment; // Only for profiling
@@ -79,7 +80,7 @@ enum proxyMode {
proxyTo = 2
};
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks);
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks, struct scklAlgorithm* scklAlgo);
ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel, int segment);
ncclResult_t ncclProxyStart(struct ncclComm* comm);
ncclResult_t ncclProxyCreate(struct ncclComm* comm);

View file

@@ -38,7 +38,7 @@ std::chrono::high_resolution_clock::time_point ncclEpoch;
#endif
const char* ncclFuncStr[NCCL_NUM_FUNCTIONS] = { "Broadcast", "Reduce", "AllGather", "ReduceScatter", "AllReduce", "AllToAll" };
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet" };
const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS] = { "Tree", "Ring", "CollNet", "SCKL" };
const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
NCCL_PARAM(GroupCudaStream, "GROUP_CUDA_STREAM", NCCL_GROUP_CUDA_STREAM);
@@ -273,6 +273,9 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks));
}
// SCKL algo is copied to the device side
comm->hostDevComm.scklAlgo = comm->scklAlgo;
// Duplicate the dev comm on the device
NCCLCHECK(ncclCudaCalloc(&comm->devComm, 1));
NCCLCHECK(ncclCudaMemcpy(comm->devComm, &comm->hostDevComm, 1));
@@ -823,18 +826,6 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &treeGraph), ret, affinity_restore);
INFO(NCCL_INIT, "Connected all trees");
// NetSharedBuffers needs to be set for this to work across nodes.
NCCLCHECK(scklGetTopoFromXMLAndSetChannels(comm));
// Connect SCKL graph
for (int c=0; c<comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
if (comm->nRanks == 1) continue;
NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, channel->sGraph.nRecvPeers, channel->sGraph.recv, channel->sGraph.nSendPeers, channel->sGraph.send), ret, affinity_restore);
}
// It appears that graph is not really needed for P2pSetup. The only place that actually uses it is in ncclTopoGetNetDev which has a bypass for when it is set to NULL.
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, NULL), ret, affinity_restore);
INFO(NCCL_INIT, "Connected SCKL graph");
// Check if we can setup CollNet
if (comm->nNodes > 1 &&
ncclParamCollNetEnable() == 1 &&
@@ -862,12 +853,24 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
TRACE(NCCL_INIT, "rank %d nranks %d - CONNECTED %d RINGS AND TREES", rank, nranks, comm->nChannels);
free(rings);
// Compute time models for algorithm and protocol combinations
// Compute time models for algorithm and protocol combinations.
NCCLCHECK(ncclTopoTuneModel(comm, minCompCap, maxCompCap, &treeGraph, &ringGraph, &collNetGraph));
// Compute nChannels per peer for p2p
NCCLCHECK(ncclTopoComputeP2pChannels(comm));
// NetSharedBuffers needs to be set for this to work across nodes.
NCCLCHECK(scklGetAlgoFromXMLAndSetComm(comm));
// Connect SCKL graph
for (int c=0; c<comm->nChannels; c++) {
struct ncclChannel* channel = comm->channels+c;
if (comm->nRanks == 1) continue;
NCCLCHECKGOTO(ncclTransportP2pConnect(comm, channel, comm->scklAlgo.nrecvPeers, comm->scklAlgo.recvPeers, comm->scklAlgo.nsendPeers, comm->scklAlgo.sendPeers), ret, affinity_restore);
}
// It appears that graph is not really needed for P2pSetup. The only place that actually uses it is in ncclTopoGetNetDev which has a bypass for when it is set to NULL.
NCCLCHECKGOTO(ncclTransportP2pSetup(comm, NULL), ret, affinity_restore);
INFO(NCCL_INIT, "Connected SCKL algorithm");
NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, intraRank0Comm));
if (comm->nNodes) NCCLCHECK(ncclProxyCreate(comm));

View file

@@ -49,8 +49,8 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) {
info->count = info->nBytes;
info->datatype = ncclInt8;
}
// SCKL TODO: check if ncclAllToAll needs to be involved
if (info->coll == ncclFuncAllGather || info->coll == ncclFuncReduceScatter) info->nBytes *= info->comm->nRanks; // count is per rank
if (info->coll == ncclFuncAllToAll || info->coll == ncclFuncAllGather || info->coll == ncclFuncReduceScatter) info->nBytes *= info->comm->nRanks; // count is per rank
if (info->op < 0 || info->op >= ncclNumOps) {
WARN("%s : invalid reduction operation %d", info->opName, info->op);

View file

@@ -189,7 +189,7 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) {
return ncclSuccess;
}
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks) {
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks, struct scklAlgorithm* scklAlgo) {
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) {
struct ncclRing* ring = &args->channel->ring;
if (NeedProxy(proxyRecv, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy(proxyRecv, ring->prev, args));
@@ -219,6 +219,17 @@ ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root
NCCLCHECK(SaveProxy(proxySend, tree->down[0], args));
NCCLCHECK(SaveProxy(proxyRecv, tree->up, args));
}
if (pattern == ncclPatternSckl){
// nsteps is adjusted here for SCKL algo
for (int i=0; i<scklAlgo->nrecvPeers; i++){
args->nsteps = scklAlgo->nchunksForRecvPeer[i] * args->nLoops * args->chunkSteps;
NCCLCHECK(SaveProxy(proxyRecv, scklAlgo->recvPeers[i], args));
}
for (int i=0; i<scklAlgo->nsendPeers; i++){
args->nsteps = scklAlgo->nchunksForSendPeer[i] * args->nLoops * args->chunkSteps;
NCCLCHECK(SaveProxy(proxySend, scklAlgo->sendPeers[i], args));
}
}
return ncclSuccess;
}
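
Since each SCKL connection moves a different number of chunks per loop, the per-peer nsteps is recomputed above from nLoops (saved in computeColl) rather than taken from the generic nstepsPerLoop. A worked example with assumed values:

#include <cassert>

int main() {
  int nLoops = 4, chunkSteps = 1;  // from computeColl (assumed values)
  int nchunksForRecvPeer0 = 2;     // parsed from the XML algorithm (assumed)
  int nchunksForSendPeer0 = 1;     // parsed from the XML algorithm (assumed)

  // Mirrors ncclProxySaveColl: each proxy gets its own step count.
  int recvSteps = nchunksForRecvPeer0 * nLoops * chunkSteps;
  int sendSteps = nchunksForSendPeer0 * nLoops * chunkSteps;
  assert(recvSteps == 8 && sendSteps == 4);
  return 0;
}
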
@@ -426,7 +437,6 @@ ncclResult_t ncclProxySharedBuffersInit(struct ncclComm* comm, int cuda, int* si
char* buff;
int* used;
*size = 2*comm->p2pnChannels*state->slotSize*state->nslots;
if (cuda && state->cudaBuff[0] == NULL) {
NCCLCHECK(ncclCudaCalloc(&buff, *size));
NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots));

View file

@@ -22,6 +22,7 @@
#include <unistd.h>
#include "ibvwrap.h"
#include <thread>
#define USE_RDMA_WRITE 1
#define MAXNAMESIZE 64