Merged PR 1451: Multi-node Jobs

Allow multi-node jobs using torch.distributed

Sanity-checks:
[x] Multi-node Job: https://aka.ms/amlt?q=d68m9
[x] Single-node Job: https://aka.ms/amlt?q=d68nu
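For context, this change leans on the environment variables that torch.distributed launchers such as torchrun export to every spawned process (RANK, LOCAL_RANK, WORLD_SIZE, plus MASTER_ADDR/MASTER_PORT). A minimal sketch of that contract, assuming a torchrun-style launch; the actual AMLT submission command is not part of this PR:

```python
# Sketch only: how a torchrun-style launcher populates the variables that the new
# federated.rank()/local_rank()/size() helpers read. The launch command below is
# an assumption, not the real AMLT job definition.
#
#   torchrun --nnodes=2 --nproc_per_node=4 --node_rank=<n> \
#            --master_addr=<head-node-ip> --master_port=29500 <entry_script>.py
import os

def describe_process():
    """Print the identity a launched process can derive purely from its environment."""
    world_rank = int(os.environ["RANK"])        # global rank across all nodes
    local_rank = int(os.environ["LOCAL_RANK"])  # rank within this node (GPU slot)
    world_size = int(os.environ["WORLD_SIZE"])  # total number of processes
    print(f"process {world_rank}/{world_size}, local GPU slot {local_rank}")
```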
Mirian Hipolito Garcia 2023-01-04 15:11:59 +00:00
Parent 0e8762b0e8
Commit 0477a95306
3 changed files with 17 additions and 16 deletions

View file

@@ -244,7 +244,7 @@ class Client:
         # Ensure the client is assigned to the correct GPU
         if torch.cuda.is_available() and torch.cuda.device_count() == federated.size():
-            torch.cuda.set_device(federated.rank())
+            torch.cuda.set_device(federated.local_rank())
         # Process inputs and initialize variables
         client_id, data_strcts, config, send_gradients = client_data
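The switch from federated.rank() to federated.local_rank() matters as soon as the job spans more than one node: the global rank keeps growing across nodes, while each node only exposes GPU indices 0..gpus_per_node-1. An illustrative sketch (not repo code) of the corrected pinning:

```python
# Illustrative sketch, not part of this PR: with a standard launcher, a 2-node x
# 4-GPU job places global ranks 4..7 on the second node, which only has
# cuda:0..cuda:3, so pinning by the global rank would raise an invalid-device
# error there. LOCAL_RANK is always a valid per-node GPU index.
import os
import torch

global_rank = int(os.environ.get("RANK", "0"))
local_rank = int(os.environ.get("LOCAL_RANK", "0"))

if torch.cuda.is_available():
    torch.cuda.set_device(local_rank)   # safe on every node
    print(f"global rank {global_rank} -> cuda:{local_rank}")
```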

View file

@@ -1,6 +1,7 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
+import os
 import cProfile
 import logging
@@ -41,11 +42,15 @@ def encode_string(word, string_to_int = True):
 def rank():
     """ Return rank of node. """
-    return dist.get_rank()
+    return int(os.environ['RANK'])
+def local_rank():
+    """ Return local rank of node. """
+    return int(os.environ['LOCAL_RANK'])
 def size():
     """ Returns number of nodes in the distributed group, including server. """
-    return dist.get_world_size()
+    return int(os.environ['WORLD_SIZE'])
 def _recv(x, src=0):
     """ Receives tensors with a single element or a list of tensors
@@ -213,7 +218,7 @@ def append_async_requests(node_request_map, node):
     """ Appends the asynchronous request sent to each worker during
     asynchronous training. """
-    ack = to_device(torch.zeros(1))
+    ack = to_device(torch.tensor(1))
     req = dist.irecv(tensor=ack, src=node)
     node_request_map.append((node,req))
     return node_request_map
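For readers unfamiliar with the pattern: dist.irecv posts a non-blocking receive and returns a Work handle, and append_async_requests stores one handle per worker so the caller can later poll or block on them. A sketch of draining the map (assumed caller-side code, not part of this diff):

```python
# Sketch of how the (node, request) pairs collected above might be consumed.
# wait_for_acks is a hypothetical helper; only dist.irecv/Work.wait() are real APIs.
import torch.distributed as dist

def wait_for_acks(node_request_map):
    """Block until the ack tensor posted for every worker has arrived."""
    for node, req in node_request_map:
        req.wait()   # completes the non-blocking irecv for this worker
    node_request_map.clear()
```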

View file

@@ -88,20 +88,16 @@ def run_worker(model_path, config, task, data_path, local_rank, backend):
     server_config = config["server_config"]
     # Backend initialization
+    WORLD_RANK = federated.rank()
+    LOCAL_RANK = federated.local_rank()
     print_rank(f"Backend: {backend}")
-    dist.init_process_group(backend=backend, init_method=None)
-    rank = dist.get_rank()
-    if torch.cuda.is_available():
-        torch.cuda.set_device(rank)
-    # Get the rank on NCCL/GLOO
-    rank = local_rank if local_rank > -1 else federated.rank()
+    dist.init_process_group(backend=backend, init_method=None, rank=WORLD_RANK, world_size=federated.size())
+    # Assign NCCL thread to a specific GPU
     if torch.cuda.is_available():
-        n_gpus = torch.cuda.device_count()
-        torch.cuda.set_device(federated.rank() % n_gpus)
-        print_rank(f"Assigning worker to GPU {federated.rank() % n_gpus}")
+        print_rank(f"Assigning worker to GPU {LOCAL_RANK}")
+        device = torch.device("cuda:{}".format(LOCAL_RANK))
+        torch.cuda.set_device(device)
     # Make the Model to distribute to workers
     model = make_model(model_config)
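Passing rank and world_size explicitly is what lets the same init_process_group call work across nodes: with init_method=None, torch.distributed falls back to the env:// rendezvous, so only MASTER_ADDR and MASTER_PORT still need to come from the environment (the launcher exports them). A condensed sketch of the new sequence, with names assumed rather than taken verbatim from the repo:

```python
# Condensed sketch of the initialization this hunk introduces (illustrative only).
import os
import torch
import torch.distributed as dist

def init_backend(backend: str = "nccl") -> None:
    world_rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    local_rank = int(os.environ["LOCAL_RANK"])

    # init_method=None -> "env://": MASTER_ADDR/MASTER_PORT are read from the
    # environment, while rank and world_size are supplied explicitly.
    dist.init_process_group(backend=backend, init_method=None,
                            rank=world_rank, world_size=world_size)

    if torch.cuda.is_available():
        # One process per GPU; NCCL communicators bind to the node-local device.
        torch.cuda.set_device(torch.device(f"cuda:{local_rank}"))
```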
@@ -119,7 +115,7 @@ def run_worker(model_path, config, task, data_path, local_rank, backend):
     config["server_config"]["data_config"]["num_clients"] = num_clients
     # Instantiate the Server object on the first thread
-    if rank == 0:
+    if WORLD_RANK == 0:
         try:
             print_rank('Server data preparation')
@@ -169,7 +165,7 @@ def run_worker(model_path, config, task, data_path, local_rank, backend):
     else:
         # Instantiate client-processing Worker on remaining threads
-        print_rank("Worker on node {}: process started".format(rank))
+        print_rank("Worker on node {}: process started".format(WORLD_RANK))
         client_config = config["client_config"]
         worker = federated.Worker(
             model=model,
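The last two hunks are the mechanical rename of rank to WORLD_RANK, which also makes the role split explicit: global rank 0 hosts the Server, every other rank hosts a Worker. A toy sketch of that dispatch (not repo code; dist.broadcast stands in for the real server/worker exchange):

```python
# Toy illustration of the rank-based role split, not the actual Server/Worker logic.
import torch
import torch.distributed as dist

def one_round(world_rank: int, model_params: torch.Tensor) -> None:
    if world_rank == 0:
        print("server: broadcasting the current model to all workers")
    else:
        print(f"worker {world_rank}: waiting for the model from the server")
    # Collective call: every rank participates; src=0 marks the server as sender.
    dist.broadcast(model_params, src=0)
```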