Integrate eldak/distributedTesting into master

This commit is contained in:
Project Philly 2017-01-30 09:16:15 -08:00
Parents 5673c777e7 717f198400
Commit cfcf73cd22
5 changed files with 182 additions and 102 deletions

View file

@@ -13,13 +13,12 @@ import cntk
import _cntk_py
from cntk.utils import *
from cntk.distributed import data_parallel_distributed_learner, Communicator
from cntk.device import set_default_device, gpu
from cntk.distributed import data_parallel_distributed_learner, block_momentum_distributed_learner, Communicator
# default Paths relative to current python file.
abs_path = os.path.dirname(os.path.abspath(__file__))
data_path = os.path.join(abs_path, "..", "..", "..", "DataSets", "CIFAR-10")
model_path = os.path.join(abs_path, "Models")
log_dir = None
# model dimensions
image_height = 32
@@ -93,7 +92,7 @@ def create_conv_network():
# Create trainer
def create_trainer(network, epoch_size, num_quantization_bits):
def create_trainer(network, epoch_size, num_quantization_bits, block_size, warm_up):
# Set learning parameters
lr_per_sample = [0.0015625]*20 + [0.00046875]*20 + [0.00015625]*20 + [0.000046875]*10 + [0.000015625]
lr_schedule = cntk.learning_rate_schedule(lr_per_sample, unit=cntk.learner.UnitType.sample, epoch_size=epoch_size)
@@ -102,13 +101,20 @@ def create_trainer(network, epoch_size, num_quantization_bits):
l2_reg_weight = 0.002
# Create learner
parameter_learner = data_parallel_distributed_learner(
cntk.learner.momentum_sgd(network['output'].parameters, lr_schedule, mm_schedule, l2_regularization_weight=l2_reg_weight),
num_quantization_bits=num_quantization_bits,
distributed_after=0)
if block_size != None and num_quantization_bits != 32:
raise RuntimeError("Block momentum cannot be used with quantization, please remove quantized_bits option.")
local_learner = cntk.learner.momentum_sgd(network['output'].parameters,
lr_schedule, mm_schedule,
l2_regularization_weight=l2_reg_weight)
if block_size != None:
learner = block_momentum_distributed_learner(local_learner, block_size=block_size)
else:
learner = data_parallel_distributed_learner(local_learner, num_quantization_bits=num_quantization_bits, distributed_after=warm_up)
# Create trainer
return cntk.Trainer(network['output'], network['ce'], network['pe'], parameter_learner)
return cntk.Trainer(network['output'], network['ce'], network['pe'], learner)
# Train and test
def train_and_test(network, trainer, train_source, test_source, progress_printer, epoch_size):
@@ -153,7 +159,7 @@ def train_and_test(network, trainer, train_source, test_source, progress_printer
# Train and evaluate the network.
def convnet_cifar10_dataaug(train_data, test_data, mean_data, num_quantization_bits=32, epoch_size = 50000, max_epochs=80, log_to_file=None, num_mbs_per_log=None, gen_heartbeat=False):
def convnet_cifar10_dataaug(train_data, test_data, mean_data, epoch_size=50000, num_quantization_bits=32, block_size=3200, warm_up=0, max_epochs=2, log_to_file=None, num_mbs_per_log=None, gen_heartbeat=False):
_cntk_py.set_computation_network_trace_level(0)
progress_printer = ProgressPrinter(
@@ -165,7 +171,7 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, num_quantization_b
num_epochs=max_epochs)
network = create_conv_network()
trainer = create_trainer(network, epoch_size, num_quantization_bits)
trainer = create_trainer(network, epoch_size, num_quantization_bits, block_size, warm_up)
train_source = create_image_mb_source(train_data, mean_data, train=True, total_number_of_samples=max_epochs * epoch_size)
test_source = create_image_mb_source(test_data, mean_data, train=False, total_number_of_samples=cntk.io.FULL_DATA_SWEEP)
train_and_test(network, trainer, train_source, test_source, progress_printer, epoch_size)
@@ -174,25 +180,38 @@ def convnet_cifar10_dataaug(train_data, test_data, mean_data, num_quantization_b
if __name__=='__main__':
parser = argparse.ArgumentParser()
data_path = os.path.join(abs_path, "..", "..", "..", "DataSets", "CIFAR-10")
parser.add_argument('-datadir', help='specify the location of your data');
parser.add_argument('-logdir', help='specify where the training log will be saved');
parser.add_argument('-outputdir', help='specify where the output model/checkpoint files shall be saved');
parser.add_argument('-d', '--datadir', help='Data directory where the CIFAR dataset is located', required=False, default=data_path)
parser.add_argument('-o', '--outputdir', help='Output directory for checkpoints and models', required=False, default=None)
parser.add_argument('-l', '--log', help='Log file', required=False, default=None)
parser.add_argument('-e', '--epochs', help='Total number of epochs to train', type=int, required=False, default='160')
parser.add_argument('-q', '--quantized_bits', help='Number of quantized bits used for gradient aggregation', type=int, required=False, default='32')
parser.add_argument('-a', '--distributed_after', help='Number of samples to train with before running distributed', type=int, required=False, default='0')
parser.add_argument('-b', '--block_samples', type=int, help="Number of samples per block for block momentum (BM) distributed learner (if 0 BM learner is not used)", required=False, default=None)
parser.add_argument('-device', '--device', type=int, help="Force to run the script on a specified device", required=False, default=None)
args = vars(parser.parse_args())
if args['datadir'] != None:
if args['outputdir'] is not None:
model_path = args['outputdir'] + "/models"
if args['device'] is not None:
set_default_device(gpu(args['device']))
if args['datadir'] is not None:
data_path = args['datadir']
if args['logdir'] != None:
log_dir = args['logdir']
if args['outputdir'] != None:
model_path = args['outputdir'] + "/models"
mean_data=os.path.join(data_path, 'CIFAR-10_mean.xml')
train_data=os.path.join(data_path, 'train_map.txt')
test_data=os.path.join(data_path, 'test_map.txt')
convnet_cifar10_dataaug(train_data, test_data, mean_data, num_quantization_bits=32, max_epochs=80, log_to_file=log_dir, num_mbs_per_log=10)
Communicator.finalize()
try:
convnet_cifar10_dataaug(train_data, test_data, mean_data,
epoch_size=50000,
num_quantization_bits=args['quantized_bits'],
block_size=args['block_samples'],
warm_up=args['distributed_after'],
max_epochs=args['epochs'],
log_to_file=args['log'], num_mbs_per_log=10)
finally:
Communicator.finalize()
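The substance of this file's change is the learner selection added to create_trainer: block momentum when a block size is given, data-parallel SGD (optionally quantized, with a warm-up phase) otherwise. A standalone sketch of that logic, using the cntk.distributed API imported above (the make_distributed_learner wrapper is an illustrative name, not part of the commit):

from cntk.distributed import data_parallel_distributed_learner, block_momentum_distributed_learner

def make_distributed_learner(local_learner, num_quantization_bits=32, block_size=None, warm_up=0):
    # Block momentum and quantized gradient aggregation are mutually exclusive.
    if block_size is not None and num_quantization_bits != 32:
        raise RuntimeError("Block momentum cannot be used with quantization.")
    if block_size is not None:
        # Block-momentum SGD: worker models are merged once per block of block_size samples.
        return block_momentum_distributed_learner(local_learner, block_size=block_size)
    # Data-parallel SGD: gradients are aggregated every minibatch, optionally quantized,
    # after warm_up samples of purely local training.
    return data_parallel_distributed_learner(local_learner,
                                             num_quantization_bits=num_quantization_bits,
                                             distributed_after=warm_up)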

View file

@@ -16,7 +16,8 @@ from cntk.ops import input_variable, cross_entropy_with_softmax, classification_
from cntk import Trainer, cntk_py
from cntk.learner import momentum_sgd, learning_rate_schedule, momentum_as_time_constant_schedule, UnitType
from _cntk_py import set_computation_network_trace_level
from cntk.distributed import data_parallel_distributed_learner, Communicator
from cntk.device import set_default_device, gpu
from cntk.distributed import data_parallel_distributed_learner, block_momentum_distributed_learner, Communicator
from resnet_models import *
@@ -65,7 +66,7 @@ def create_resnet_network(network_name):
# Create trainer
def create_trainer(network, minibatch_size, epoch_size, num_quantization_bits):
def create_trainer(network, minibatch_size, epoch_size, num_quantization_bits, block_size, warm_up):
if network['name'] == 'resnet20':
lr_per_mb = [1.0]*80+[0.1]*40+[0.01]
elif network['name'] == 'resnet110':
@@ -82,12 +83,17 @@ def create_trainer(network, minibatch_size, epoch_size, num_quantization_bits):
mm_schedule = momentum_as_time_constant_schedule(momentum_time_constant)
# learner object
if block_size != None and num_quantization_bits != 32:
raise RuntimeError("Block momentum cannot be used with quantization, please remove quantized_bits option.")
local_learner = momentum_sgd(network['output'].parameters, lr_schedule, mm_schedule,
l2_regularization_weight = l2_reg_weight)
learner = data_parallel_distributed_learner(learner=local_learner,
num_quantization_bits=num_quantization_bits,
distributed_after=0)
if block_size != None:
learner = block_momentum_distributed_learner(local_learner, block_size=block_size)
else:
learner = data_parallel_distributed_learner(local_learner, num_quantization_bits=num_quantization_bits, distributed_after=warm_up)
return Trainer(network['output'], network['ce'], network['pe'], learner)
# Train and test
@@ -122,15 +128,17 @@ def train_and_test(network, trainer, train_source, test_source, progress_printer
metric_denom += local_mb_samples
minibatch_index += 1
fin_msg = "Final Results: Minibatch[1-{}]: errs = {:0.2f}% * {}".format(minibatch_index+1, (metric_numer*100.0)/metric_denom, metric_denom)
progress_printer.end_progress_print(fin_msg)
print("")
print("Final Results: Minibatch[1-{}]: errs = {:0.2f}% * {}".format(minibatch_index+1, (metric_numer*100.0)/metric_denom, metric_denom))
print(fin_msg)
print("")
return metric_numer/metric_denom
# Train and evaluate the network.
def resnet_cifar10(train_data, test_data, mean_data, network_name, num_quantization_bits=32, epoch_size=50000, max_epochs=160, log_to_file=None, num_mbs_per_log=None, gen_heartbeat=False, scale_up=False):
def resnet_cifar10(train_data, test_data, mean_data, network_name, epoch_size, num_quantization_bits=32, block_size=3200, warm_up=0, max_epochs=5, log_to_file=None, num_mbs_per_log=None, gen_heartbeat=False, scale_up=False):
set_computation_network_trace_level(0)
@@ -149,38 +157,65 @@ def resnet_cifar10(train_data, test_data, mean_data, network_name, num_quantizat
num_epochs=max_epochs)
network = create_resnet_network(network_name)
trainer = create_trainer(network, minibatch_size, epoch_size, num_quantization_bits)
trainer = create_trainer(network, minibatch_size, epoch_size, num_quantization_bits, block_size, warm_up)
train_source = create_image_mb_source(train_data, mean_data, train=True, total_number_of_samples=max_epochs * epoch_size)
test_source = create_image_mb_source(test_data, mean_data, train=False, total_number_of_samples=cntk.io.FULL_DATA_SWEEP)
return train_and_test(network, trainer, train_source, test_source, progress_printer, minibatch_size, epoch_size)
if __name__=='__main__':
data_path = os.path.join(abs_path, "..", "..", "..", "DataSets", "CIFAR-10")
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--network', help='network type, resnet20 or resnet110', required=False, default='resnet20')
parser.add_argument('-e', '--epochs', help='total epochs', type=int, required=False, default='160')
parser.add_argument('-q', '--quantize_bit', help='quantized bit', type=int, required=False, default='32')
parser.add_argument('-s', '--scale_up', help='scale up minibatch size with #workers for better parallelism', type=bool, required=False, default='False')
parser.add_argument('-a', '--distributed_after', help='number of samples to train with before running distributed', type=int, required=False, default='0')
parser.add_argument('-d', '--datadir', help='Data directory where the CIFAR dataset is located', required=False, default=data_path)
parser.add_argument('-o', '--outputdir', help='Output directory for checkpoints and models', required=False, default=None)
parser.add_argument('-l', '--log', help='Log file', required=False, default=None)
parser.add_argument('-e', '--epochs', help='Total number of epochs to train', type=int, required=False, default='160')
parser.add_argument('-es', '--epoch_size', help='Size of epoch in samples', type=int, required=False, default=None)
parser.add_argument('-q', '--quantized_bits', help='Number of quantized bits used for gradient aggregation', type=int, required=False, default='32')
parser.add_argument('-b', '--block_samples', type=int, help="Number of samples per block for block momentum (BM) distributed learner (if 0 BM learner is not used)", required=False, default=None)
parser.add_argument('-a', '--distributed_after', help='Number of samples to train with before running distributed', type=int, required=False, default='0')
parser.add_argument('-device', '--device', type=int, help="Force to run the script on a specified device", required=False, default=None)
args = vars(parser.parse_args())
num_quantization_bits = int(args['quantize_bit'])
epochs = int(args['epochs'])
distributed_after_samples = int(args['distributed_after'])
epoch_size = 50000
if args['outputdir'] != None:
model_path = args['outputdir'] + "/models"
if args['device'] != None:
set_default_device(gpu(args['device']))
if args['datadir'] is not None:
data_path = args['datadir']
if args['epoch_size'] is not None:
epoch_size = args['epoch_size']
mean_data=os.path.join(data_path, 'CIFAR-10_mean.xml')
train_data=os.path.join(data_path, 'train_map.txt')
test_data=os.path.join(data_path, 'test_map.txt')
num_quantization_bits = args['quantized_bits']
epochs = args['epochs']
warm_up = args['distributed_after']
network_name = args['network']
scale_up = bool(args['scale_up'])
# Create distributed trainer factory
print("Start training: quantize_bit = {}, epochs = {}, distributed_after = {}".format(num_quantization_bits, epochs, distributed_after_samples))
train_data=os.path.join(data_path, 'train_map.txt')
test_data=os.path.join(data_path, 'test_map.txt')
mean_data=os.path.join(data_path, 'CIFAR-10_mean.xml')
print("Start training: quantize_bit = {}, epochs = {}, distributed_after = {}".format(num_quantization_bits, epochs, warm_up))
epoch_size = 50000
resnet_cifar10(train_data, test_data, mean_data,
network_name, num_quantization_bits, epoch_size, epochs,
scale_up=scale_up)
# Must call MPI finalize when process exit
Communicator.finalize()
try:
resnet_cifar10(train_data, test_data, mean_data,
network_name,
epoch_size,
num_quantization_bits,
block_size=args['block_samples'],
warm_up=args['distributed_after'],
max_epochs=epochs,
scale_up=scale_up,
log_to_file=args['log'])
finally:
# Must call MPI finalize when process exit
Communicator.finalize()
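With the new argparse options in place, both example scripts are meant to be launched under MPI, exactly as the updated tests below do. A usage sketch for the ResNet script (the dataset path is a placeholder; flags follow the parser definitions above):

import subprocess

# Two MPI workers, 2 epochs of 512 samples, block momentum with 3200 samples per block,
# forced onto GPU 0 (mirrors the block-momentum test parameters further down).
cmd = ["mpiexec", "-n", "2", "python", "TrainResNet_CIFAR10_Distributed.py",
       "-e", "2",
       "-es", "512",
       "-b", "3200",
       "-d", "/path/to/CIFAR-10",   # placeholder data directory
       "-device", "0"]
subprocess.check_call(cmd)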

View file

@@ -11,6 +11,7 @@ from cntk.ops.tests.ops_test_utils import cntk_device
from cntk.cntk_py import DeviceKind_GPU
from cntk.device import set_default_device
from cntk.io import ReaderConfig, ImageDeserializer
from cntk import distributed
import pytest
abs_path = os.path.dirname(os.path.abspath(__file__))
@@ -45,6 +46,7 @@ def test_alexnet_error(device_id):
minibatch_size=16,
epoch_size=64,
max_epochs=2)
distributed.Communicator.finalize()
# expected_test_error = 0.0
# We are removing tolerance in error because running small epoch size has huge variance in accuracy. Will add
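The small change in this test file follows the rule applied throughout the commit: once cntk.distributed is in play, the process calls Communicator.finalize() before exiting, and the example scripts wrap it in try/finally so MPI is shut down even if training throws. A minimal sketch of the pattern (run_training is a placeholder):

from cntk import distributed

def run_training():
    ...  # build the reader, network and trainer, then train

try:
    run_training()
finally:
    # "Must call MPI finalize when process exit", as the commit's own comment puts it.
    distributed.Communicator.finalize()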

View file

@@ -16,20 +16,35 @@ from cntk.cntk_py import DeviceKind_GPU
from cntk.device import set_default_device
abs_path = os.path.dirname(os.path.abspath(__file__))
sys.path.append(abs_path)
from run_ConvNet_CIFAR10_DataAug_Distributed import run_cifar_convnet_distributed
example_dir = os.path.join(abs_path, "..", "..", "..", "..", "Examples", "Image", "Classification", "ConvNet", "Python")
script_under_test = os.path.join(example_dir, "ConvNet_CIFAR10_DataAug_Distributed.py")
#TOLERANCE_ABSOLUTE = 2E-1
sys.path.append(example_dir)
TOLERANCE_ABSOLUTE = 2E-1
TIMEOUT_SECONDS = 300
def test_cifar_convnet_distributed_mpiexec(device_id):
if cntk_device(device_id).type() != DeviceKind_GPU:
pytest.skip('test only runs on GPU')
def data_set_directory():
try:
base_path = os.path.join(os.environ['CNTK_EXTERNAL_TESTDATA_SOURCE_DIRECTORY'],
*"Image/CIFAR/v0/cifar-10-batches-py".split("/"))
# N.B. CNTK_EXTERNAL_TESTDATA_SOURCE_DIRECTORY has {train,test}_map.txt
# and CIFAR-10_mean.xml in the base_path.
except KeyError:
base_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
*"../../../../Examples/Image/DataSets/CIFAR-10".split("/"))
cmd = ["mpiexec", "-n", "2", "python", os.path.join(abs_path, "run_ConvNet_CIFAR10_DataAug_Distributed.py")]
base_path = os.path.normpath(base_path)
os.chdir(os.path.join(base_path, '..'))
return base_path
def mpiexec_test(device_id, script, params, expected_test_error, match_exactly=True, per_minibatch_tolerance=TOLERANCE_ABSOLUTE, error_tolerance=TOLERANCE_ABSOLUTE):
if cntk_device(device_id).type() != DeviceKind_GPU:
pytest.skip('test only runs on GPU')
cmd = ["mpiexec", "-n", "2", "python", script] + params
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
if sys.version_info[0] < 3:
# TODO add timeout for Py2?
out = p.communicate()[0]
else:
try:
@@ -41,11 +56,32 @@ def test_cifar_convnet_distributed_mpiexec(device_id):
results = re.findall("Final Results: Minibatch\[.+?\]: errs = (.+?)%", str_out)
assert len(results) == 2
assert results[0] == results[1]
# We are removing tolerance in error because running small epoch size has huge variance in accuracy. Will add
# tolerance back once convolution operator is deterministic.
# expected_test_error = 0.617
# assert np.allclose(float(results[0])/100, expected_test_error,
# atol=TOLERANCE_ABSOLUTE)
if match_exactly:
assert results[0] == results[1]
else:
assert np.allclose(float(results[0]), float(results[1]), atol=per_minibatch_tolerance)
assert np.allclose(float(results[0])/100, expected_test_error, atol=error_tolerance)
def test_cifar_convnet_distributed(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-q", "32",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.617)
def test_cifar_convnet_distributed_1bitsgd(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-q", "1",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.617)
def test_cifar_convnet_distributed_block_momentum(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-b", "3200",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.6457, False, 10)
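The new mpiexec_test helper moves the output check out of the individual tests: it scrapes the "Final Results" line printed by each worker and compares them against each other and against the expected error. Sketched in isolation (check_results and str_out are illustrative names; tolerances mirror the defaults above):

import re
import numpy as np

def check_results(str_out, expected_test_error, match_exactly=True,
                  per_minibatch_tolerance=2E-1, error_tolerance=2E-1):
    # One "Final Results" line is expected from each of the two MPI workers.
    results = re.findall(r"Final Results: Minibatch\[.+?\]: errs = (.+?)%", str_out)
    assert len(results) == 2
    if match_exactly:
        # The two workers are expected to report identical final errors.
        assert results[0] == results[1]
    else:
        # Otherwise (e.g. block momentum) only require the workers to agree within tolerance.
        assert np.allclose(float(results[0]), float(results[1]), atol=per_minibatch_tolerance)
    # The reported figure is a percentage; compare the fraction against the expected error.
    assert np.allclose(float(results[0]) / 100, expected_test_error, atol=error_tolerance)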

View file

@@ -7,51 +7,39 @@
import numpy as np
import os
import sys
from cntk.ops.tests.ops_test_utils import cntk_device
from cntk.cntk_py import DeviceKind_GPU
from cntk.device import set_default_device
from cntk.io import FULL_DATA_SWEEP
from cntk import distributed
import pytest
import subprocess
abs_path = os.path.dirname(os.path.abspath(__file__))
example_dir = os.path.join(abs_path, "..", "..", "..", "..", "Examples", "Image", "Classification", "ResNet", "Python")
sys.path.append(example_dir)
sys.path.append(abs_path)
sys.path.append(os.path.join(abs_path, "..", "..", "..", "..", "Examples", "Image", "Classification", "ResNet", "Python"))
from prepare_test_data import prepare_CIFAR10_data
from TrainResNet_CIFAR10_Distributed import resnet_cifar10
#TOLERANCE_ABSOLUTE = 2E-1
from ConvNet_CIFAR10_DataAug_Distributed_test import mpiexec_test, data_set_directory
def test_cifar_resnet_distributed_error(device_id, is_1bit_sgd):
if cntk_device(device_id).type() != DeviceKind_GPU:
pytest.skip('test only runs on GPU')
set_default_device(cntk_device(device_id))
script_under_test = os.path.join(example_dir, "TrainResNet_CIFAR10_Distributed.py")
if not is_1bit_sgd:
pytest.skip('test only runs in 1-bit SGD')
def test_cifar_resnet_distributed(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-q", "32",
"-es", "512",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.86, False, 2)
base_path = prepare_CIFAR10_data()
# change dir to locate data.zip correctly
os.chdir(base_path)
def test_cifar_resnet_distributed_1bitsgd(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-q", "1",
"-es", "512",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.86, False, 2)
from _cntk_py import set_computation_network_trace_level, set_fixed_random_seed, force_deterministic_algorithms
set_computation_network_trace_level(1)
set_fixed_random_seed(1) # BUGBUG: has no effect at present # TODO: remove debugging facilities once this all works
#force_deterministic_algorithms()
# TODO: do the above; they lead to slightly different results, so not doing it for now
train_data=os.path.join(base_path, 'train_map.txt')
test_data=os.path.join(base_path, 'test_map.txt')
mean_data=os.path.join(base_path, 'CIFAR-10_mean.xml')
test_error = resnet_cifar10(train_data, test_data, mean_data, 'resnet20', epoch_size=512, max_epochs=2)
# We are removing tolerance in error because running small epoch size has huge variance in accuracy. Will add
# tolerance back once convolution operator is deterministic.
# expected_test_error = 0.282
# assert np.allclose(test_error, expected_test_error,
# atol=TOLERANCE_ABSOLUTE)
distributed.Communicator.finalize()
def test_cifar_resnet_distributed_block_momentum(device_id):
params = [ "-e", "2",
"-d", data_set_directory(),
"-b", "3200",
"-es", "512",
"-device", "0" ]
mpiexec_test(device_id, script_under_test, params, 0.89, False, 2)