Fix two bugs of experiment resume and improve unittest of nnimanager (#5322)

This commit is contained in:
QuanluZhang 2023-03-15 20:16:31 +08:00 коммит произвёл GitHub
Родитель 4c4b98fcb6
Коммит 2b2d6e4d2c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 517 добавлений и 326 удалений

Просмотреть файл

@ -43,7 +43,9 @@ class Recoverable:
self.recovered_max_param_id = previous_max_param_id
return previous_max_param_id
def is_created_in_previous_exp(self, param_id: int) -> bool:
def is_created_in_previous_exp(self, param_id: int | None) -> bool:
if param_id is None:
return False
return param_id <= self.recovered_max_param_id
def get_previous_param(self, param_id: int) -> dict:

Просмотреть файл

@ -87,6 +87,7 @@ class MsgDispatcher(MsgDispatcherBase):
def handle_initialize(self, data):
"""Data is search space
"""
_logger.info('Initial search space: %s', data)
self.tuner.update_search_space(data)
self.send(CommandType.Initialized, '')
@ -108,6 +109,7 @@ class MsgDispatcher(MsgDispatcherBase):
self.send(CommandType.NoMoreTrialJobs, _pack_parameter(ids[0], ''))
def handle_update_search_space(self, data):
_logger.info('New search space: %s', data)
self.tuner.update_search_space(data)
def handle_import_data(self, data):

Просмотреть файл

@ -150,7 +150,8 @@ stages:
- script: |
set -e
npm --prefix ts/nni_manager run test
npm --prefix ts/nni_manager run test -- --exclude test/core/nnimanager.test.ts
npm --prefix ts/nni_manager run test_nnimanager
cp ts/nni_manager/coverage/cobertura-coverage.xml coverage/typescript.xml
displayName: TypeScript unit test
@ -197,7 +198,8 @@ stages:
- script: |
export PATH=${PWD}/toolchain/node/bin:$PATH
npm --prefix ts/nni_manager run test
npm --prefix ts/nni_manager run test -- --exclude test/core/nnimanager.test.ts
npm --prefix ts/nni_manager run test_nnimanager
displayName: TypeScript unit test
- job: windows
@ -220,9 +222,10 @@ stages:
displayName: Python unit test
# temporarily disable this test, add it back after bug fixed
# - script: |
# npm --prefix ts/nni_manager run test
# displayName: TypeScript unit test
- script: |
npm --prefix ts/nni_manager run test -- --exclude test/core/nnimanager.test.ts
npm --prefix ts/nni_manager run test_nnimanager
displayName: TypeScript unit test
- script: |
cd test
@ -246,7 +249,9 @@ stages:
displayName: Python unit test
- script: |
CI=true npm --prefix ts/nni_manager run test --exclude test/core/nnimanager.test.ts
CI=true npm --prefix ts/nni_manager run test -- --exclude test/core/nnimanager.test.ts
# # exclude nnimanager's ut because macos in pipeline is pretty slow
# CI=true npm --prefix ts/nni_manager run test_nnimanager
displayName: TypeScript unit test
- script: |

Просмотреть файл

@ -1,5 +1,5 @@
{
"color": true,
"require": "test/register.js",
"timeout": "15s"
"timeout": "40s"
}

Просмотреть файл

@ -23,7 +23,7 @@ import os from 'os';
import path from 'path';
import type { NniManagerArgs } from './arguments';
import type { LogStream } from './log_stream';
import { LogStream } from './log_stream';
import { NniPaths, createPaths } from './paths';
import { RestManager } from './rest';

Просмотреть файл

@ -33,6 +33,7 @@ import { createDispatcherInterface, IpcInterface } from './ipcInterface';
* NNIManager which implements Manager interface
*/
class NNIManager implements Manager {
private pollInterval: number; // for unittest to modify the polling interval
private trainingService!: TrainingService;
private dispatcher: IpcInterface | undefined;
private currSubmittedTrialNum: number; // need to be recovered
@ -52,6 +53,7 @@ class NNIManager implements Manager {
private trialJobMetricListener: (metric: TrialJobMetric) => void;
constructor() {
this.pollInterval = 5;
this.currSubmittedTrialNum = 0;
this.trialConcurrencyChange = 0;
this.dispatcherPid = 0;
@ -122,7 +124,7 @@ class NNIManager implements Manager {
return this.dataStore.exportTrialHpConfigs();
}
public addRecoveredTrialJob(allTrialJobs: Array<TrialJobInfo>): void {
public addRecoveredTrialJob(allTrialJobs: Array<TrialJobInfo>): number {
const jobs: Array<TrialJobInfo> = allTrialJobs.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
const trialData: any[] = [];
let maxSequeceId = 0;
@ -159,6 +161,7 @@ class NNIManager implements Manager {
// next sequenceId
this.experimentProfile.nextSequenceId = maxSequeceId + 1;
return trialData.length;
}
public addCustomizedTrialJob(hyperParams: string): Promise<number> {
@ -263,7 +266,11 @@ class NNIManager implements Manager {
// Resume currSubmittedTrialNum
this.currSubmittedTrialNum = allTrialJobs.length;
this.addRecoveredTrialJob(allTrialJobs);
const recoveredTrialNum = this.addRecoveredTrialJob(allTrialJobs);
// minus the number of the recovered trials,
// the recovered trials should not be counted in maxTrialNumber.
this.log.info(`Number of current submitted trials: ${this.currSubmittedTrialNum}, where ${recoveredTrialNum} is resuming.`);
this.currSubmittedTrialNum -= recoveredTrialNum;
// Collect generated trials and imported trials
const finishedTrialData: string = await this.exportData();
@ -585,7 +592,7 @@ class NNIManager implements Manager {
}
while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
this.dispatcher.sendCommand(PING);
await delay(1000 * 5);
await delay(1000 * this.pollInterval); // 5 seconds
}
}
@ -746,7 +753,7 @@ class NNIManager implements Manager {
}
}
}
await delay(1000 * 5); // 5 seconds
await delay(1000 * this.pollInterval); // 5 seconds
}
}

Просмотреть файл

@ -163,10 +163,10 @@ class WebSocketChannelImpl implements WebSocketChannel {
}
private heartbeat(): void {
if (this.waitingPong) {
this.ws.terminate(); // this will trigger "close" event
this.handleError(new Error('tuner_command_channel: Tuner loses responsive'));
}
// if (this.waitingPong) {
// this.ws.terminate(); // this will trigger "close" event
// this.handleError(new Error('tuner_command_channel: Tuner loses responsive'));
// }
this.waitingPong = true;
this.ws.ping();
@ -190,7 +190,7 @@ class WebSocketChannelImpl implements WebSocketChannel {
}
}
const channelSingleton: WebSocketChannelImpl = new WebSocketChannelImpl();
let channelSingleton: WebSocketChannelImpl = new WebSocketChannelImpl();
let heartbeatInterval: number = 5000;
@ -198,4 +198,9 @@ export namespace UnitTestHelpers {
export function setHeartbeatInterval(ms: number): void {
heartbeatInterval = ms;
}
// NOTE: this function is only for unittest of nnimanager,
// because resuming an experiment should reset websocket channel.
export function resetChannelSingleton(): void {
channelSingleton = new WebSocketChannelImpl();
}
}

Просмотреть файл

@ -5,6 +5,7 @@
"scripts": {
"build": "tsc",
"test": "nyc --reporter=cobertura --reporter=text mocha test/**/*.test.ts",
"test_nnimanager": "nyc --reporter=cobertura --reporter=text mocha test/core/nnimanager.test.ts",
"mocha": "mocha",
"eslint": "eslint . --ext .ts"
},
@ -107,4 +108,4 @@
"sourceMap": true,
"instrument": true
}
}
}

Просмотреть файл

@ -5,292 +5,396 @@
import * as fs from 'fs';
import * as os from 'os';
import { assert, expect } from 'chai';
import { assert } from 'chai';
import { Container, Scope } from 'typescript-ioc';
import * as component from '../../common/component';
import { Database, DataStore } from '../../common/datastore';
import { Manager, ExperimentProfile} from '../../common/manager';
import { TrainingService } from '../../common/trainingService';
import { cleanupUnitTest, prepareUnitTest, killPid } from '../../common/utils';
import { Database, DataStore, TrialJobInfo } from '../../common/datastore';
import { Manager, TrialJobStatistics} from '../../common/manager';
import { TrialJobDetail } from '../../common/trainingService';
import { killPid } from '../../common/utils';
import { NNIManager } from '../../core/nnimanager';
import { SqlDB } from '../../core/sqlDatabase';
import { NNIDataStore } from '../../core/nniDataStore';
import { MockedTrainingService } from '../mock/trainingService';
import { MockedDataStore } from '../mock/datastore';
import { TensorboardManager } from '../../common/tensorboardManager';
import { NNITensorboardManager } from 'extensions/nniTensorboardManager';
import { NNITensorboardManager } from '../../extensions/nniTensorboardManager';
import * as path from 'path';
import { UnitTestHelpers } from 'core/ipcInterface';
import { RestServer } from '../../rest_server';
import globals from '../../common/globals/unittest';
import { UnitTestHelpers } from '../../core/tuner_command_channel/websocket_channel';
import * as timersPromises from 'timers/promises';
async function initContainer(): Promise<void> {
prepareUnitTest();
UnitTestHelpers.disableTuner();
let nniManager: NNIManager;
let experimentParams: any = {
experimentName: 'naive_experiment',
trialConcurrency: 3,
maxExperimentDuration: '10s',
maxTrialNumber: 3,
trainingService: {
platform: 'local'
},
searchSpace: {'lr': {'_type': 'choice', '_value': [0.01,0.001,0.002,0.003,0.004]}},
// use Random because no metric data from mocked training service
tuner: {
name: 'Random'
},
// skip assessor
// assessor: {
// name: 'Medianstop'
// },
// trialCommand does not take effect in mocked training service
trialCommand: 'sleep 2',
trialCodeDirectory: '',
debug: true
}
let experimentProfile: any = {
params: experimentParams,
// the experiment profile can only keep params,
// because the update logic only touch key-values in params.
// it violates the type of ExperimentProfile, but it is okay.
}
let mockedInfo = {
"id": "unittest",
"port": 8080,
"startTime": 1605246730756,
"endTime": "N/A",
"status": "INITIALIZED",
"platform": "local",
"experimentName": "testExp",
"tag": [],
"pid": 11111,
"webuiUrl": [],
"logDir": null
}
let restServer: RestServer;
async function initContainer(mode: string = 'create'): Promise<void> {
// updating the action is not necessary for the correctness of the tests.
// keep it here as a reminder.
if (mode === 'resume') {
const globalAsAny = global as any;
globalAsAny.nni.args.action = mode;
}
restServer = new RestServer(globals.args.port, globals.args.urlPrefix);
await restServer.start();
Container.bind(Manager).to(NNIManager).scope(Scope.Singleton);
Container.bind(Database).to(SqlDB).scope(Scope.Singleton);
Container.bind(DataStore).to(MockedDataStore).scope(Scope.Singleton);
Container.bind(DataStore).to(NNIDataStore).scope(Scope.Singleton);
Container.bind(TensorboardManager).to(NNITensorboardManager).scope(Scope.Singleton);
await component.get<DataStore>(DataStore).init();
}
// FIXME: timeout on macOS
describe('Unit test for nnimanager', function () {
let nniManager: NNIManager;
let ClusterMetadataKey = 'mockedMetadataKey';
let experimentParams: any = {
experimentName: 'naive_experiment',
trialConcurrency: 3,
maxExperimentDuration: '5s',
maxTrialNumber: 3,
trainingService: {
platform: 'local'
},
searchSpace: {'lr': {'_type': 'choice', '_value': [0.01,0.001]}},
tuner: {
name: 'TPE',
classArgs: {
optimize_mode: 'maximize'
}
},
assessor: {
name: 'Medianstop'
},
trialCommand: 'sleep 2',
trialCodeDirectory: '',
debug: true
async function prepareExperiment(): Promise<void> {
// globals.showLog();
// create ~/nni-experiments/.experiment
const expsFile = path.join(globals.args.experimentsDirectory, '.experiment');
if (!fs.existsSync(expsFile)) {
fs.writeFileSync(expsFile, '{}');
}
// clean the db file under the unittest experiment directory.
// NOTE: cannot remove the whole exp directory, it seems the directory is created before this line.
const unittestPath = path.join(globals.args.experimentsDirectory, globals.args.experimentId, 'db');
fs.rmSync(unittestPath, { recursive: true, force: true });
let updateExperimentParams = {
experimentName: 'another_experiment',
trialConcurrency: 2,
maxExperimentDuration: '6s',
maxTrialNumber: 2,
trainingService: {
platform: 'local'
},
searchSpace: '{"lr": {"_type": "choice", "_value": [0.01,0.001]}}',
tuner: {
name: 'TPE',
classArgs: {
optimize_mode: 'maximize'
}
},
assessor: {
name: 'Medianstop'
},
trialCommand: 'sleep 2',
trialCodeDirectory: '',
debug: true
// Write the experiment info to ~/nni-experiments/.experiment before experiment start.
// Do not use file lock for simplicity.
// The ut also works if not updating experiment info but ExperimentsManager will complain.
const fileInfo: Buffer = fs.readFileSync(globals.paths.experimentsList);
let experimentsInformation = JSON.parse(fileInfo.toString());
experimentsInformation['unittest'] = mockedInfo;
fs.writeFileSync(globals.paths.experimentsList, JSON.stringify(experimentsInformation, null, 4));
await initContainer();
nniManager = component.get(Manager);
// if trainingService is assigned, startExperiment won't create training service again
const manager = nniManager as any;
manager.trainingService = new MockedTrainingService('create_stage');
// making the trial status polling more frequent to reduce testing time, i.e., to 1 second
manager.pollInterval = 1;
const expId: string = await nniManager.startExperiment(experimentParams);
assert.strictEqual(expId, 'unittest');
// Sleep here because the start of tuner takes a while.
// Also, wait for that some trials are submitted, waiting for at most 10 seconds.
// NOTE: this waiting period should be long enough depending on different running environment and randomness.
for (let i = 0; i < 10; i++) {
await timersPromises.setTimeout(1000);
if (manager.currSubmittedTrialNum >= 2)
break;
}
assert.isAtLeast(manager.currSubmittedTrialNum, 2);
}
let experimentProfile: any = {
params: updateExperimentParams,
id: 'test',
execDuration: 0,
logDir: '',
startTime: 0,
nextSequenceId: 0,
revision: 0
async function cleanExperiment(): Promise<void> {
const manager: any = nniManager;
await killPid(manager.dispatcherPid);
manager.dispatcherPid = 0;
await manager.stopExperimentTopHalf();
await manager.stopExperimentBottomHalf();
await restServer.shutdown();
}
async function testListTrialJobs(): Promise<void> {
await timersPromises.setTimeout(200);
const trialJobDetails = await nniManager.listTrialJobs();
assert.isAtLeast(trialJobDetails.length, 2);
}
async function testGetTrialJobValid(): Promise<void> {
const trialJobDetail = await nniManager.getTrialJob('1234');
assert.strictEqual(trialJobDetail.trialJobId, '1234');
}
async function testGetTrialJobWithInvalidId(): Promise<void> {
// query a not exist id, getTrialJob returns undefined,
// because getTrialJob queries data from db
const trialJobDetail = await nniManager.getTrialJob('4321');
assert.strictEqual(trialJobDetail, undefined);
}
async function testCancelTrialJobByUser(): Promise<void> {
await nniManager.cancelTrialJobByUser('1234');
// test datastore to verify the trial is cancelled and the event is stored in db
// NOTE: it seems a SUCCEEDED trial can also be cancelled
const manager = nniManager as any;
const trialJobInfo: TrialJobInfo = await manager.dataStore.getTrialJob('1234');
assert.strictEqual(trialJobInfo.status, 'USER_CANCELED');
}
async function testGetExperimentProfile(): Promise<void> {
const profile = await nniManager.getExperimentProfile();
assert.strictEqual(profile.id, 'unittest');
assert.strictEqual(profile.logDir, path.join(os.homedir(),'nni-experiments','unittest'));
}
async function testUpdateExperimentProfileTrialConcurrency(concurrency: number): Promise<void> {
let expParams = Object.assign({}, experimentParams); // skip deep copy of inner object
expParams.trialConcurrency = concurrency;
experimentProfile.params = expParams;
await nniManager.updateExperimentProfile(experimentProfile, 'TRIAL_CONCURRENCY');
const profile = await nniManager.getExperimentProfile();
assert.strictEqual(profile.params.trialConcurrency, concurrency);
}
async function testUpdateExperimentProfileMaxExecDuration(): Promise<void> {
let expParams = Object.assign({}, experimentParams); // skip deep copy of inner object
expParams.maxExperimentDuration = '11s';
experimentProfile.params = expParams;
await nniManager.updateExperimentProfile(experimentProfile, 'MAX_EXEC_DURATION');
const profile = await nniManager.getExperimentProfile();
assert.strictEqual(profile.params.maxExperimentDuration, '11s');
}
async function testUpdateExperimentProfileSearchSpace(space: number[]): Promise<void> {
let expParams = Object.assign({}, experimentParams); // skip deep copy of inner object
// The search space here should be dict, it is stringified within nnimanager's updateSearchSpace
const newSearchSpace = {'lr': {'_type': 'choice', '_value': space}};
expParams.searchSpace = newSearchSpace;
experimentProfile.params = expParams;
await nniManager.updateExperimentProfile(experimentProfile, 'SEARCH_SPACE');
const profile = await nniManager.getExperimentProfile();
assert.strictEqual(profile.params.searchSpace, newSearchSpace);
}
async function testUpdateExperimentProfileMaxTrialNum(maxTrialNum: number): Promise<void> {
let expParams = Object.assign({}, experimentParams); // skip deep copy of inner object
expParams.maxTrialNumber = maxTrialNum;
experimentProfile.params = expParams;
await nniManager.updateExperimentProfile(experimentProfile, 'MAX_TRIAL_NUM');
const profile = await nniManager.getExperimentProfile();
assert.strictEqual(profile.params.maxTrialNumber, maxTrialNum);
}
async function testGetStatus(): Promise<void> {
const status = nniManager.getStatus();
// it is possible that the submitted trials run too fast to reach status NO_MORE_TRIAL
assert.include(['RUNNING', 'NO_MORE_TRIAL'], status.status);
}
async function testGetMetricDataWithTrialJobId(): Promise<void> {
// Query an exist trialJobId
// The metric is synthesized in the mocked training service
await timersPromises.setTimeout(600);
const metrics = await nniManager.getMetricData('1234');
assert.strictEqual(metrics.length, 1);
assert.strictEqual(metrics[0].type, 'FINAL');
assert.strictEqual(metrics[0].data, '"0.9"');
}
async function testGetMetricDataWithInvalidTrialJobId(): Promise<void> {
// Query an invalid trialJobId
const metrics = await nniManager.getMetricData('4321');
// The returned is an empty list
assert.strictEqual(metrics.length, 0);
}
async function testGetTrialJobStatistics(): Promise<void> {
// Waiting for 1 second to make sure SUCCEEDED status has been sent from
// the mocked training service. There would be at least one trials has
// SUCCEEDED status, i.e., '3456'.
// '1234' may be in SUCCEEDED status or USER_CANCELED status,
// depending on the order of SUCCEEDED and USER_CANCELED events.
// There are 4 trials, because maxTrialNumber is updated to 4.
// Then accordingly to the mocked training service, there are two trials
// SUCCEEDED, one trial RUNNING, and one trial WAITING.
// NOTE: The WAITING trial is not always submitted before the running of this test.
// An example statistics:
// [
// { trialJobStatus: 'SUCCEEDED', trialJobNumber: 2 },
// { trialJobStatus: 'RUNNING', trialJobNumber: 1 },
// { trialJobStatus: 'WAITING', trialJobNumber: 1 }
// ]
// or
// [
// { trialJobStatus: 'USER_CANCELED', trialJobNumber: 1 },
// { trialJobStatus: 'SUCCEEDED', trialJobNumber: 1 },
// { trialJobStatus: 'RUNNING', trialJobNumber: 1 },
// { trialJobStatus: 'WAITING', trialJobNumber: 1 }
// ]
for (let i = 0; i < 5; i++) {
await timersPromises.setTimeout(500);
const trialJobDetails = await nniManager.listTrialJobs();
if (trialJobDetails.length >= 4)
break;
}
let mockedInfo = {
"unittest": {
"port": 8080,
"startTime": 1605246730756,
"endTime": "N/A",
"status": "INITIALIZED",
"platform": "local",
"experimentName": "testExp",
"tag": [], "pid": 11111,
"webuiUrl": [],
"logDir": null
const statistics = await nniManager.getTrialJobStatistics();
assert.isAtLeast(statistics.length, 2);
const succeededTrials: TrialJobStatistics | undefined = statistics.find(element => element.trialJobStatus === 'SUCCEEDED');
if (succeededTrials) {
if (succeededTrials.trialJobNumber !== 2) {
const canceledTrials: TrialJobStatistics | undefined = statistics.find(element => element.trialJobStatus === 'USER_CANCELED');
if (canceledTrials)
assert.strictEqual(canceledTrials.trialJobNumber, 1);
else
assert.fail('USER_CANCELED trial not found when succeeded trial number is not 2!');
}
}
else
assert.fail('SUCCEEDED trial not found!');
const runningTrials: TrialJobStatistics | undefined = statistics.find(element => element.trialJobStatus === 'RUNNING');
if (runningTrials)
assert.strictEqual(runningTrials.trialJobNumber, 1);
else
assert.fail('RUNNING trial not found!');
const waitingTrials: TrialJobStatistics | undefined = statistics.find(element => element.trialJobStatus === 'WAITING');
if (waitingTrials)
assert.strictEqual(waitingTrials.trialJobNumber, 1);
else
assert.fail('RUNNING trial not found!');
}
async function testFinalExperimentStatus(): Promise<void> {
const status = nniManager.getStatus();
assert.notEqual(status.status, 'ERROR');
}
before(async () => {
await initContainer();
fs.writeFileSync('.experiment.test', JSON.stringify(mockedInfo));
nniManager = component.get(Manager);
describe('Unit test for nnimanager basic testing', function () {
const expId: string = await nniManager.startExperiment(experimentParams);
assert.strictEqual(expId, 'unittest');
before(prepareExperiment);
// TODO:
// In current architecture we cannot prevent NNI manager from creating a training service.
// The training service must be manually stopped here or its callbacks will block exit.
// I'm planning on a custom training service register system similar to custom tuner,
// and when that is done we can let NNI manager to use MockedTrainingService through config.
const manager = nniManager as any;
manager.trainingService.removeTrialJobMetricListener(manager.trialJobMetricListener);
manager.trainingService.cleanUp();
// it('test addCustomizedTrialJob', () => testAddCustomizedTrialJob());
it('test listTrialJobs', () => testListTrialJobs());
it('test getTrialJob valid', () => testGetTrialJobValid());
it('test getTrialJob with invalid id', () => testGetTrialJobWithInvalidId());
it('test cancelTrialJobByUser', () => testCancelTrialJobByUser());
it('test getExperimentProfile', () => testGetExperimentProfile());
it('test updateExperimentProfile TRIAL_CONCURRENCY', () => testUpdateExperimentProfileTrialConcurrency(4));
it('test updateExperimentProfile MAX_EXEC_DURATION', () => testUpdateExperimentProfileMaxExecDuration());
it('test updateExperimentProfile SEARCH_SPACE', () => testUpdateExperimentProfileSearchSpace([0.01,0.001,0.002,0.003,0.004,0.005]));
it('test updateExperimentProfile MAX_TRIAL_NUM', () => testUpdateExperimentProfileMaxTrialNum(4));
it('test getStatus', () => testGetStatus());
it('test getMetricData with trialJobId', () => testGetMetricDataWithTrialJobId());
it('test getMetricData with invalid trialJobId', () => testGetMetricDataWithInvalidTrialJobId());
it('test getTrialJobStatistics', () => testGetTrialJobStatistics());
// TODO: test experiment changes from Done to Running, after maxTrialNumber/maxExecutionDuration is updated.
// FIXME: make sure experiment crash leads to the ERROR state.
it('test the final experiment status is not ERROR', () => testFinalExperimentStatus());
manager.trainingService = new MockedTrainingService();
})
after(cleanExperiment);
after(async () => {
// FIXME: more proper clean up
const manager: any = nniManager;
await killPid(manager.dispatcherPid);
manager.dispatcherPid = 0;
await manager.stopExperimentTopHalf();
cleanupUnitTest();
})
});
async function resumeExperiment(): Promise<void> {
globals.reset();
// the following function call show nnimanager.log in console
// globals.showLog();
// explicitly reset the websocket channel because it is singleton, does not work when two experiments
// (one is start and the other is resume) run in the same process.
UnitTestHelpers.resetChannelSingleton();
await initContainer('resume');
nniManager = component.get(Manager);
// if trainingService is assigned, startExperiment won't create training service again
const manager = nniManager as any;
manager.trainingService = new MockedTrainingService('resume_stage');
// making the trial status polling more frequent to reduce testing time, i.e., to 1 second
manager.pollInterval = 1;
// as nniManager is a singleton, manually reset its member variables here.
manager.currSubmittedTrialNum = 0;
manager.trialConcurrencyChange = 0;
manager.dispatcherPid = 0;
manager.waitingTrials = [];
manager.trialJobs = new Map<string, TrialJobDetail>();
manager.trialDataForTuner = '';
manager.trialDataForResume = '';
manager.readonly = false;
manager.status = {
status: 'INITIALIZED',
errors: []
};
await nniManager.resumeExperiment(false);
}
it('test addCustomizedTrialJob', () => {
return nniManager.addCustomizedTrialJob('"hyperParams"').then(() => {
async function testMaxTrialNumberAfterResume(): Promise<void> {
// testing the resumed nnimanager correctly counts (max) trial number
// waiting 18 seconds to make trials reach maxTrialNum, waiting this long
// because trial concurrency is set to 1 and macos CI is pretty slow.
await timersPromises.setTimeout(18000);
const trialJobDetails = await nniManager.listTrialJobs();
assert.strictEqual(trialJobDetails.length, 5);
}
}).catch((error) => {
assert.fail(error);
})
})
async function testAddCustomizedTrialJobFail(): Promise<void> {
// will fail because the max trial number has already reached
await nniManager.addCustomizedTrialJob('{"lr": 0.006}')
.catch((err: Error) => {
assert.strictEqual(err.message, 'reach maxTrialNum');
});
}
async function testAddCustomizedTrialJob(): Promise<void> {
// max trial number has been extended to 7, adding customized trial here will be succeeded
const sequenceId = await nniManager.addCustomizedTrialJob('{"lr": 0.006}');
await timersPromises.setTimeout(1000);
const trialJobDetails = await nniManager.listTrialJobs();
const customized = trialJobDetails.find(element =>
element.hyperParameters !== undefined
&& element.hyperParameters[0] === '{"parameter_id":null,"parameter_source":"customized","parameters":{"lr":0.006}}');
assert.notEqual(customized, undefined);
}
it('test listTrialJobs', () => {
return nniManager.listTrialJobs().then(function (trialjobdetails) {
expect(trialjobdetails.length).to.be.equal(2);
}).catch((error) => {
assert.fail(error);
})
})
// NOTE: this describe should be executed in couple with the above describe
describe('Unit test for nnimanager resume testing', function() {
it('test getTrialJob valid', () => {
//query a exist id
return nniManager.getTrialJob('1234').then(function (trialJobDetail) {
expect(trialJobDetail.trialJobId).to.be.equal('1234');
}).catch((error) => {
assert.fail(error);
})
})
before(resumeExperiment);
it('test getTrialJob with invalid id', () => {
//query a not exist id, and the function should throw error, and should not process then() method
return nniManager.getTrialJob('4567').then((_jobid) => {
assert.fail();
}).catch((_error) => {
assert.isTrue(true);
})
})
// First update maxTrialNumber to 5 for the second test
it('test updateExperimentProfile TRIAL_CONCURRENCY', () => testUpdateExperimentProfileTrialConcurrency(1));
it('test updateExperimentProfile MAX_TRIAL_NUM', () => testUpdateExperimentProfileMaxTrialNum(5));
it('test max trial number after resume', () => testMaxTrialNumberAfterResume());
it('test add customized trial job failure', () => testAddCustomizedTrialJobFail());
// update search to contain only one hyper config, update maxTrialNum to add additional two trial budget,
// then a customized trial can be submitted successfully.
// NOTE: trial concurrency should be set to 1 to avoid tuner sending too many trials before the space is updated
it('test updateExperimentProfile SEARCH_SPACE', () => testUpdateExperimentProfileSearchSpace([0.008]));
it('test updateExperimentProfile MAX_TRIAL_NUM', () => testUpdateExperimentProfileMaxTrialNum(7));
it('test add customized trial job succeeded', () => testAddCustomizedTrialJob());
it('test the final experiment status is not ERROR', () => testFinalExperimentStatus());
it('test cancelTrialJobByUser', () => {
return nniManager.cancelTrialJobByUser('1234').then(() => {
after(cleanExperiment);
}).catch((error) => {
console.log(error);
assert.fail(error);
})
})
it('test getExperimentProfile', () => {
return nniManager.getExperimentProfile().then((experimentProfile) => {
expect(experimentProfile.id).to.be.equal('unittest');
expect(experimentProfile.logDir).to.be.equal(path.join(os.homedir(),'nni-experiments','unittest'));
}).catch((error) => {
assert.fail(error);
})
})
it('test updateExperimentProfile TRIAL_CONCURRENCY', () => {
return nniManager.updateExperimentProfile(experimentProfile, 'TRIAL_CONCURRENCY').then(() => {
nniManager.getExperimentProfile().then((updateProfile) => {
expect(updateProfile.params.trialConcurrency).to.be.equal(2);
});
}).catch((error) => {
assert.fail(error);
})
})
it('test updateExperimentProfile MAX_EXEC_DURATION', () => {
return nniManager.updateExperimentProfile(experimentProfile, 'MAX_EXEC_DURATION').then(() => {
nniManager.getExperimentProfile().then((updateProfile) => {
expect(updateProfile.params.maxExperimentDuration).to.be.equal('6s');
});
}).catch((error) => {
assert.fail(error);
})
})
it('test updateExperimentProfile SEARCH_SPACE', () => {
return nniManager.updateExperimentProfile(experimentProfile, 'SEARCH_SPACE').then(() => {
nniManager.getExperimentProfile().then((updateProfile) => {
expect(updateProfile.params.searchSpace).to.be.equal('{"lr": {"_type": "choice", "_value": [0.01,0.001]}}');
});
}).catch((error) => {
assert.fail(error);
})
})
it('test updateExperimentProfile MAX_TRIAL_NUM', () => {
return nniManager.updateExperimentProfile(experimentProfile, 'MAX_TRIAL_NUM').then(() => {
nniManager.getExperimentProfile().then((updateProfile) => {
expect(updateProfile.params.maxTrialNumber).to.be.equal(2);
});
}).catch((error: any) => {
assert.fail(error);
})
})
it('test getStatus', () => {
assert.strictEqual(nniManager.getStatus().status,'RUNNING');
})
it('test getMetricData with trialJobId', () => {
//query a exist trialJobId
return nniManager.getMetricData('4321', 'CUSTOM').then((metricData) => {
expect(metricData.length).to.be.equal(1);
expect(metricData[0].trialJobId).to.be.equal('4321');
expect(metricData[0].parameterId).to.be.equal('param1');
}).catch((error) => {
assert.fail(error);
})
})
it('test getMetricData with invalid trialJobId', () => {
//query an invalid trialJobId
return nniManager.getMetricData('43210', 'CUSTOM').then((_metricData) => {
assert.fail();
}).catch((_error) => {
})
})
it('test getTrialJobStatistics', () => {
// get 3 trial jobs (init, addCustomizedTrialJob, cancelTrialJobByUser)
return nniManager.getTrialJobStatistics().then(function (trialJobStatistics) {
expect(trialJobStatistics.length).to.be.equal(2);
if (trialJobStatistics[0].trialJobStatus === 'WAITING') {
expect(trialJobStatistics[0].trialJobNumber).to.be.equal(2);
expect(trialJobStatistics[1].trialJobNumber).to.be.equal(1);
}
else {
expect(trialJobStatistics[1].trialJobNumber).to.be.equal(2);
expect(trialJobStatistics[0].trialJobNumber).to.be.equal(1);
}
}).catch((error) => {
assert.fail(error);
})
})
it('test addCustomizedTrialJob reach maxTrialNumber', () => {
// test currSubmittedTrialNum reach maxTrialNumber
return nniManager.addCustomizedTrialJob('"hyperParam"').then(() => {
nniManager.getTrialJobStatistics().then(function (trialJobStatistics) {
if (trialJobStatistics[0].trialJobStatus === 'WAITING')
expect(trialJobStatistics[0].trialJobNumber).to.be.equal(2);
else
expect(trialJobStatistics[1].trialJobNumber).to.be.equal(2);
})
}).catch((error) => {
assert.fail(error);
})
})
//it('test resumeExperiment', async () => {
//TODO: add resume experiment unit test
//})
})
});

Просмотреть файл

@ -90,7 +90,7 @@ describe('## tuner_command_channel ##', () => {
it('send', () => testSend(client1));
it('receive', () => testReceive(client1));
it('mock timeout', testError);
// it('mock timeout', testError);
it('reconnect', testReconnect);
it('send after reconnect', () => testSend(client2));

Просмотреть файл

@ -9,8 +9,8 @@ import path from 'path';
import * as component from '../../common/component';
import { cleanupUnitTest, prepareUnitTest } from '../../common/utils';
import { ExperimentsManager } from 'extensions/experiments_manager';
import globals from 'common/globals/unittest';
import { ExperimentsManager } from '../../extensions/experiments_manager';
import globals from '../../common/globals/unittest';
let tempDir: string | null = null;
let experimentManager: ExperimentsManager;

Просмотреть файл

@ -3,6 +3,8 @@
'use strict';
import { assert } from 'chai';
import { EventEmitter } from 'events';
import { Deferred } from 'ts-deferred';
import { Provider } from 'typescript-ioc';
@ -10,57 +12,65 @@ import { MethodNotImplementedError } from '../../common/errors';
import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService';
const testTrainingServiceProvider: Provider = {
get: () => { return new MockedTrainingService(); }
get: () => { return new MockedTrainingService(''); }
};
class MockedTrainingService extends TrainingService {
public mockedMetaDataValue: string = "default";
public jobDetail1: TrialJobDetail = {
id: '1234',
status: 'SUCCEEDED',
submitTime: Date.now(),
startTime: Date.now(),
endTime: Date.now(),
tags: ['test'],
url: 'http://test',
workingDirectory: '/tmp/mocked',
form: {
sequenceId: 0,
hyperParameters: { value: '', index: 0 }
},
};
public jobDetail2: TrialJobDetail = {
id: '3456',
status: 'SUCCEEDED',
submitTime: Date.now(),
startTime: Date.now(),
endTime: Date.now(),
tags: ['test'],
url: 'http://test',
workingDirectory: '/tmp/mocked',
form: {
sequenceId: 1,
hyperParameters: { value: '', index: 1 }
},
};
const jobDetailTemplate: TrialJobDetail = {
id: 'xxxx',
status: 'WAITING',
submitTime: Date.now(),
startTime: undefined,
endTime: undefined,
tags: ['test'],
url: 'http://test',
workingDirectory: '/tmp/mocked',
form: {
sequenceId: 0,
hyperParameters: { value: '', index: 0 }
},
};
const idStatusList = [
{id: '1234', status: 'RUNNING'},
{id: '3456', status: 'RUNNING'},
{id: '5678', status: 'RUNNING'},
{id: '7890', status: 'WAITING'}];
// the first two are the resumed trials
// the last three are newly submitted, among which there is one customized trial
const idStatusListResume = [
{id: '5678', status: 'RUNNING'},
{id: '7890', status: 'RUNNING'},
{id: '9012', status: 'RUNNING'},
{id: '1011', status: 'RUNNING'},
{id: '1112', status: 'RUNNING'}];
class MockedTrainingService implements TrainingService {
private readonly eventEmitter: EventEmitter;
private mockedMetaDataValue: string = "default";
private jobDetailList: Map<string, TrialJobDetail>;
private mode: string;
private submittedCnt: number = 0;
constructor(mode: string) {
this.eventEmitter = new EventEmitter();
this.mode = mode;
this.jobDetailList = new Map<string, TrialJobDetail>();
}
public listTrialJobs(): Promise<TrialJobDetail[]> {
const deferred = new Deferred<TrialJobDetail[]>();
deferred.resolve([this.jobDetail1, this.jobDetail2]);
return deferred.promise;
const trialJobs: TrialJobDetail[] = Array.from(this.jobDetailList.values());
return Promise.resolve(trialJobs);
}
public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
const deferred = new Deferred<TrialJobDetail>();
if(trialJobId === '1234'){
deferred.resolve(this.jobDetail1);
}else if(trialJobId === '3456'){
deferred.resolve(this.jobDetail2);
}else{
deferred.reject();
const jobDetail: TrialJobDetail | undefined = this.jobDetailList.get(trialJobId);
if (jobDetail !== undefined) {
return Promise.resolve(jobDetail);
}
else {
return Promise.reject('job id error');
}
return deferred.promise;
}
public getTrialFile(_trialJobId: string, _fileName: string): Promise<string> {
@ -72,14 +82,72 @@ class MockedTrainingService extends TrainingService {
}
public addTrialJobMetricListener(_listener: (_metric: TrialJobMetric) => void): void {
this.eventEmitter.on('metric', _listener);
}
public removeTrialJobMetricListener(_listener: (_metric: TrialJobMetric) => void): void {
this.eventEmitter.off('metric', _listener);
}
public submitTrialJob(_form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const deferred = new Deferred<TrialJobDetail>();
return deferred.promise;
if (this.mode === 'create_stage') {
assert(this.submittedCnt < idStatusList.length);
const submittedOne: TrialJobDetail = Object.assign({},
jobDetailTemplate, idStatusList[this.submittedCnt],
{submitTime: Date.now(), startTime: Date.now(), form: _form});
this.jobDetailList.set(submittedOne.id, submittedOne);
this.submittedCnt++;
// only update the first two trials to SUCCEEDED
if (['1234', '3456'].includes(submittedOne.id)) {
// Emit metric data here for simplicity
// Set timeout to make sure when the metric is received by nnimanager,
// the corresponding trial job exists.
setTimeout(() => {
this.eventEmitter.emit('metric', {
id: submittedOne.id,
data: JSON.stringify({
'parameter_id': JSON.parse(submittedOne.form.hyperParameters.value)['parameter_id'],
'trial_job_id': submittedOne.id,
'type': 'FINAL',
'sequence': 0,
'value': '0.9'})
});
}, 100);
setTimeout(() => {
this.jobDetailList.set(submittedOne.id, Object.assign({}, submittedOne, {endTime: Date.now(), status: 'SUCCEEDED'}));
}, 150);
}
return Promise.resolve(submittedOne);
}
else if (this.mode === 'resume_stage') {
assert(this.submittedCnt < idStatusListResume.length);
const submittedOne: TrialJobDetail = Object.assign({},
jobDetailTemplate, idStatusListResume[this.submittedCnt],
{submitTime: Date.now(), startTime: Date.now(), form: _form});
this.jobDetailList.set(submittedOne.id, submittedOne);
this.submittedCnt++;
// Emit metric data here for simplicity
// Set timeout to make sure when the metric is received by nnimanager,
// the corresponding trial job exists.
setTimeout(() => {
this.eventEmitter.emit('metric', {
id: submittedOne.id,
data: JSON.stringify({
'parameter_id': JSON.parse(submittedOne.form.hyperParameters.value)['parameter_id'],
'trial_job_id': submittedOne.id,
'type': 'FINAL',
'sequence': 0,
'value': '0.9'})
});
}, 100);
setTimeout(() => {
this.jobDetailList.set(submittedOne.id, Object.assign({}, submittedOne, {endTime: Date.now(), status: 'SUCCEEDED'}));
}, 150);
return Promise.resolve(submittedOne);
}
else {
throw new Error('Unknown mode for the mocked training service!');
}
}
public updateTrialJob(_trialJobId: string, _form: TrialJobApplicationForm): Promise<TrialJobDetail> {
@ -91,13 +159,10 @@ class MockedTrainingService extends TrainingService {
}
public cancelTrialJob(trialJobId: string, _isEarlyStopped: boolean = false): Promise<void> {
const deferred = new Deferred<void>();
if(trialJobId === '1234' || trialJobId === '3456'){
deferred.resolve();
}else{
deferred.reject('job id error');
}
return deferred.promise;
if (this.jobDetailList.has(trialJobId))
return Promise.resolve();
else
return Promise.reject('job id error');
}
public setClusterMetadata(key: string, value: string): Promise<void> {

Просмотреть файл

@ -268,7 +268,7 @@ async function beforeHook(): Promise<void> {
async function afterHook() {
if (tmpDir !== null) {
await fs.rm(tmpDir, { force: true, recursive: true });
try { await fs.rm(tmpDir, { force: true, recursive: true }) } catch { };
}
if (server !== null) {