[FEAT]: resume waiting/running, dedup on tuner side (TPE-only) (#4931)

This commit is contained in:
Weidan Kong 2022-08-10 05:13:24 -07:00 коммит произвёл GitHub
Родитель 2baae4d053
Коммит d03c411c8e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 100 добавлений и 19 удалений

Просмотреть файл

@ -215,7 +215,21 @@ class TpeTuner(Tuner):
loss = -loss
for key, value in param.items():
self._history[key].append(Record(value, loss))
_logger.info(f'Replayed {len(data)} trials')
self.dedup.add_history(param)
_logger.info(f'Replayed {len(data)} FINISHED trials')
def import_customized_data(self, data): # for dedup customized / resumed
if isinstance(data, str):
data = nni.load(data)
for trial in data:
# {'parameter_id': 0, 'parameter_source': 'resumed', 'parameters': {'batch_size': 128, ...}
if isinstance(trial, str):
trial = nni.load(trial)
param = format_parameters(trial['parameters'], self.space)
self._running_params[trial['parameter_id']] = param
self.dedup.add_history(param)
_logger.info(f'Replayed {len(data)} RUNING/WAITING trials')
def suggest(args, rng, space, history):
params = {}

Просмотреть файл

@ -79,6 +79,12 @@ class Deduplicator:
self._history.add(params_str)
return True
def add_history(self, formatted_parameters: FormattedParameters) -> None:
params = deformat_parameters(formatted_parameters, self._space)
params_str = typing.cast(str, nni.dump(params, sort_keys=True))
if params_str not in self._history:
self._history.add(params_str)
def _spec_never_dup(spec: ParameterSpec) -> bool:
if spec.is_nested():
return False # "not chosen" duplicates with "not chosen"

Просмотреть файл

@ -121,8 +121,14 @@ class MsgDispatcher(MsgDispatcherBase):
def handle_add_customized_trial(self, data):
# data: parameters
id_ = _create_parameter_id()
_customized_parameter_ids.add(id_)
if not isinstance(data, list):
data = [data]
for _ in data:
id_ = _create_parameter_id()
_customized_parameter_ids.add(id_)
self.tuner.import_customized_data(data)
def handle_report_metric_data(self, data):
"""
@ -187,7 +193,8 @@ class MsgDispatcher(MsgDispatcherBase):
self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized,
trial_job_id=data.get('trial_job_id'))
else:
_logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', _trial_params[id_])
_logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', id_)
_logger.warning('_trial_params %s', _trial_params)
def _handle_intermediate_metric_data(self, data):
"""Call assessor to process intermediate results

Просмотреть файл

@ -219,6 +219,14 @@ class Tuner(Recoverable):
# data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value'
pass
def import_customized_data(self, data: list[TrialRecord]) -> None:
"""
Internal API under revising, not recommended for end users.
"""
# Import resume data for avoiding duplications
# data: a list of dictionarys, each of which has at least two keys, 'parameter_id' and 'parameters'
pass
def _on_exit(self) -> None:
pass

Просмотреть файл

@ -272,7 +272,7 @@ class BuiltinTunersTestCase(TestCase):
search_space = {
"choice_str": {
"_type": "choice",
"_value": ["cat", "dog", "elephant", "cow", "sheep", "panda"]
"_value": ["cat", "dog", "elephant", "cow", "sheep", "panda", "tiger"]
}
}
elif stype == "choice_num":

Просмотреть файл

@ -4,7 +4,7 @@
import { ExperimentProfile, TrialJobStatistics } from './manager';
import { TrialJobDetail, TrialJobStatus } from './trainingService';
type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA';
type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA' |'ADD_RESUMED';
type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM' | 'REQUEST_PARAMETER';
interface ExperimentProfileRecord {

Просмотреть файл

@ -34,6 +34,7 @@ interface TrialJobApplicationForm {
readonly sequenceId: number;
readonly hyperParameters: HyperParameters;
readonly placementConstraint?: PlacementConstraint;
id?: string;
}
interface TrialCommandContent {

Просмотреть файл

@ -23,7 +23,7 @@ import {
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
import {
INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA, ADD_CUSTOMIZED_TRIAL_JOB
} from './commands';
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
@ -43,6 +43,7 @@ class NNIManager implements Manager {
private waitingTrials: TrialJobApplicationForm[];
private trialJobs: Map<string, TrialJobDetail>;
private trialDataForTuner: string;
private trialDataForResume: string;
private readonly: boolean;
private config!: ExperimentConfig;
@ -55,6 +56,7 @@ class NNIManager implements Manager {
this.waitingTrials = [];
this.trialJobs = new Map<string, TrialJobDetail>();
this.trialDataForTuner = '';
this.trialDataForResume = '';
this.readonly = false;
this.log = getLogger('NNIManager');
@ -118,6 +120,45 @@ class NNIManager implements Manager {
return this.dataStore.exportTrialHpConfigs();
}
public addRecoveredTrialJob(allTrialJobs: Array<TrialJobInfo>): void {
const jobs: Array<TrialJobInfo> = allTrialJobs.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
const trialData: any[] = [];
let maxSequeceId = 0;
for (const job of jobs) {
if (job.sequenceId === undefined || job.hyperParameters === undefined) {
this.log.warning('The trial to be recovered missing sequenceId and/or hyperParameters', job);
continue;
}
const params: string = job.hyperParameters[0];
const sequenceId: number = job.sequenceId;
maxSequeceId = Math.max(maxSequeceId, sequenceId);
const hyperParams = JSON.parse(params);
const packedParameter = {
parameter_id: hyperParams['parameter_id'], // eslint-disable-line @typescript-eslint/camelcase
parameter_source: 'resumed', // eslint-disable-line @typescript-eslint/camelcase
parameters: hyperParams['parameters'],
parameter_index: hyperParams['parameter_index'], // eslint-disable-line @typescript-eslint/camelcase
}
const form: TrialJobApplicationForm = {
id: job.trialJobId,
sequenceId: sequenceId,
hyperParameters: {
value: JSON.stringify(packedParameter),
index: 0
},
};
this.waitingTrials.push(form);
trialData.push(packedParameter);
this.dataStore.storeTrialJobEvent('ADD_RESUMED', job.trialJobId, '');
}
this.trialDataForResume = JSON.stringify(trialData);
// next sequenceId
this.experimentProfile.nextSequenceId = maxSequeceId + 1;
}
public addCustomizedTrialJob(hyperParams: string): Promise<number> {
if (this.readonly) {
return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
@ -220,11 +261,7 @@ class NNIManager implements Manager {
// Resume currSubmittedTrialNum
this.currSubmittedTrialNum = allTrialJobs.length;
// Check the final status for WAITING and RUNNING jobs
await Promise.all(allTrialJobs
.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));
this.addRecoveredTrialJob(allTrialJobs);
// Collect generated trials and imported trials
const finishedTrialData: string = await this.exportData();
@ -807,6 +844,12 @@ class NNIManager implements Manager {
}
this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
}
if (this.trialDataForResume.length > 0 ) {
if (this.dispatcher === undefined) {
throw new Error('Dispatcher error: tuner has not been setup');
}
this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, this.trialDataForResume);
}
this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
break;
}

Просмотреть файл

@ -229,6 +229,8 @@ class MockedDataStore implements DataStore {
return 'USER_CANCELED';
case 'ADD_CUSTOMIZED':
return 'WAITING';
case 'ADD_RESUMED':
return 'WAITING';
}
return <TrialJobStatus>event;
}

Просмотреть файл

@ -116,7 +116,7 @@ class AdlTrainingService extends KubernetesTrainingService implements Kubernetes
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const adlJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const initStatus: TrialJobStatus = 'WAITING';
const codeDir = this.adlTrialConfig.codeDir;

Просмотреть файл

@ -131,7 +131,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);

Просмотреть файл

@ -78,7 +78,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);

Просмотреть файл

@ -193,7 +193,7 @@ class LocalTrainingService implements TrainingService {
}
public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialJobDetail: LocalTrialJobDetail = new LocalTrialJobDetail(
trialJobId,
'WAITING',

Просмотреть файл

@ -248,7 +248,7 @@ class PAITrainingService implements TrainingService {
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
this.log.info('submitTrialJob: form:', form);
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
//TODO: use HDFS working folder instead
const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;

Просмотреть файл

@ -226,7 +226,7 @@ class RemoteMachineTrainingService implements TrainingService {
*/
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
// Generate trial job id(random)
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
trialJobId,

Просмотреть файл

@ -160,7 +160,7 @@ class TrialDispatcher implements TrainingService {
}
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialDetail> {
const trialId: string = uniqueString(5);
const trialId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialJobDetail: TrialDetail = new TrialDetail(trialId, "WAITING", Date.now(), "", form);