Training service util: GPU scheduler (library part) (#5219)

liuzhe-lz 2022-11-23 10:50:36 +08:00 committed by GitHub
Parent 9a289ec21a
Commit 387bac64d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 700 additions and 5 deletions

dependencies/required.txt vendored
View File

@@ -5,6 +5,7 @@ filelock
json_tricks >= 3.15.5
numpy < 1.22 ; python_version < "3.8"
numpy ; python_version >= "3.8"
nvidia-ml-py
packaging
pandas
prettytable

nni/tools/nni_manager_scripts/__init__.py
View File

@@ -0,0 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
Collection of scripts used by NNI manager to do jobs that are hard to implement in Node.js.
"""

nni/tools/nni_manager_scripts/collect_gpu_info.py
View File

@@ -0,0 +1,174 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
Collect GPU utilization metrics, and debug info if ``--detail`` is specified.
Results are printed to stdout in JSON format.
See ``ts/nni_manager/common/gpu_scheduler/collect_info`` for details.
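
Usage::

    python -m nni.tools.nni_manager_scripts.collect_gpu_info [--detail]

Illustrative output for one idle GPU, pretty-printed here for readability (all
values are invented; ``--detail`` additionally fills fields such as
``driverVersion``, ``cudaVersion``, ``model``, and process names)::

    {"success": true, "gpuNumber": 1,
     "gpus": [{"index": 0, "gpuMemory": 8589934592, "freeGpuMemory": 8589934592,
               "gpuCoreUtilization": 0.0, "gpuMemoryUtilization": 0.0}],
     "processes": []}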
"""
# pylint: disable=wildcard-import
# pylint: disable=unused-wildcard-import
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
import sys
from typing import Any, Literal
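# pynvml is provided by the nvidia-ml-py package added to dependencies/required.txt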
from pynvml import *
errors = set()
def main() -> None:
info = collect('--detail' in sys.argv)
if info:
data = asdict(info, dict_factory=dict_factory)
data['success'] = True
else:
data = {'success': False}
if errors:
data['errors'] = sorted(errors)
print(json.dumps(data), flush=True)
def dict_factory(obj):
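    """Convert dataclass key-value pairs to a dict with camelCase keys, dropping private ("_"-prefixed) and None fields."""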
ret = {}
for k, v in obj:
if k.startswith('_'):
continue
if v is None:
continue
words = k.split('_')
camel_k = words[0] + ''.join(word.title() for word in words[1:])
ret[camel_k] = v
return ret
def collect(detail: bool) -> SystemInfo | None:
try:
nvmlInit()
except Exception as e:
errors.add(f'init: {e}')
return None
info = None
try:
info = SystemInfo(detail)
except Exception as e:
errors.add(f'unexpected: {e}')
try:
nvmlShutdown()
except Exception as e:
errors.add(f'shutdown: {e}')
return info
@dataclass(init=False)
class SystemInfo:
gpu_number: int = 0
driver_version: str | None = None
cuda_version: int | None = None
gpus: list[GpuInfo]
processes: list[ProcessInfo]
def __init__(self, detail: bool):
self.gpus = []
self.processes = []
try:
self.gpu_number = nvmlDeviceGetCount()
except Exception as e:
errors.add(f'gpu_number: {e}')
if detail:
try:
self.driver_version = nvmlSystemGetDriverVersion()
except Exception as e:
errors.add(f'driver_version: {e}')
try:
self.cuda_version = nvmlSystemGetCudaDriverVersion_v2()
except Exception as e:
errors.add(f'cuda_version: {e}')
self.gpus = [GpuInfo(index, detail) for index in range(self.gpu_number)]
procs = []
for gpu in self.gpus:
procs += gpu._procs
self.processes = sorted(procs, key=(lambda proc: proc.pid))
@dataclass(init=False)
class GpuInfo:
index: int
model: str | None = None
cuda_cores: int | None = None
gpu_memory: int | None = None
free_gpu_memory: int | None = None
gpu_core_utilization: float | None = None
gpu_memory_utilization: float | None = None
_procs: list[ProcessInfo]
def __init__(self, index: int, detail: bool):
self.index = index
self._procs = []
try:
device = nvmlDeviceGetHandleByIndex(self.index)
except Exception as e:
errors.add(f'device: {e}')
return
if detail:
try:
self.model = nvmlDeviceGetName(device)
except Exception as e:
errors.add(f'model: {e}')
try:
self.cuda_cores = nvmlDeviceGetNumGpuCores(device)
except Exception as e:
errors.add(f'cuda_cores: {e}')
try:
mem = nvmlDeviceGetMemoryInfo(device)
self.gpu_memory = mem.total
self.free_gpu_memory = mem.free
except Exception as e:
errors.add(f'gpu_memory: {e}')
try:
util = nvmlDeviceGetUtilizationRates(device)
self.gpu_core_utilization = util.gpu / 100
self.gpu_memory_utilization = util.memory / 100
except Exception as e:
errors.add(f'gpu_utilization: {e}')
try:
cprocs = nvmlDeviceGetComputeRunningProcesses_v3(device)
gprocs = nvmlDeviceGetGraphicsRunningProcesses_v3(device)
self._procs += [ProcessInfo(proc, self.index, 'compute', detail) for proc in cprocs]
self._procs += [ProcessInfo(proc, self.index, 'graphics', detail) for proc in gprocs]
except Exception as e:
errors.add(f'process: {e}')
@dataclass(init=False)
class ProcessInfo:
pid: int
name: str | None = None
gpu_index: int
type: Literal['compute', 'graphics']
used_gpu_memory: int | None
def __init__(self, info: Any, gpu_index: int, type_: Literal['compute', 'graphics'], detail: bool):
self.pid = info.pid
if detail:
self.name = nvmlSystemGetProcessName(self.pid)
self.gpu_index = gpu_index
self.type = type_
self.used_gpu_memory = info.usedGpuMemory
if __name__ == '__main__':
main()

View File

@@ -19,6 +19,7 @@
"nni/tools/annotation",
"nni/tools/gpu_tool",
"nni/tools/jupyter_extension",
"nni/tools/nni_manager_scripts/collect_gpu_info.py",
"nni/tools/nnictl",
"nni/tools/trial_tool"
],

ts/nni_manager/common/gpu_scheduler/collect_info.ts
View File

@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { getLogger } from 'common/log';
import { runPythonModule } from 'common/pythonScript';
const logger = getLogger('GpuInfoCollector');
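// Typed view of the collector's JSON output. Only the fields consumed by the
// scheduler are declared here; extra fields emitted by the Python side are ignored.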
export interface GpuSystemInfo {
success: boolean;
gpuNumber: number;
gpus: GpuInfo[];
processes: ProcessInfo[];
timestamp: number;
}
export interface GpuInfo {
index: number;
gpuCoreUtilization: number;
gpuMemoryUtilization: number;
}
export interface ProcessInfo {
gpuIndex: number;
type: 'compute' | 'graphics';
}
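// Cache the last result: each collection spawns a Python process, so without a
// cache every scheduling attempt would pay that cost. Entries older than
// minUpdateInterval are refreshed on the next call.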
let cache: GpuSystemInfo | null = null;
const minUpdateInterval = 10 * 1000;
export async function collectGpuInfo(forceUpdate?: boolean): Promise<GpuSystemInfo | null> {
if (!forceUpdate && cache !== null) {
if (Date.now() - cache.timestamp < minUpdateInterval) {
return cache;
}
}
let str: string;
try {
const args = (forceUpdate ? [ '--detail' ] : undefined);
        str = await runPythonModule('nni.tools.nni_manager_scripts.collect_gpu_info', args);
} catch (error) {
logger.error('Failed to collect GPU info:', error);
return null;
}
let info: GpuSystemInfo;
try {
info = JSON.parse(str);
} catch (error) {
logger.error('Failed to collect GPU info, collector output:', str);
return null;
}
if (!info.success) {
logger.error('Failed to collect GPU info, collector output:', info);
        return null;
}
if (forceUpdate) {
logger.info('Forced update:', info);
} else {
logger.debug(info);
}
    info.timestamp = Date.now();  // the Python collector does not set this field
    cache = info;
return info;
}

ts/nni_manager/common/gpu_scheduler/index.ts
View File

@@ -0,0 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
export { GpuScheduler, ScheduleRestrictions } from './scheduler';

ts/nni_manager/common/gpu_scheduler/scheduler.ts
View File

@@ -0,0 +1,248 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
/**
* A simple GPU scheduler used by local and remote training services.
**/
import { getLogger } from 'common/log';
import { GpuSystemInfo, collectGpuInfo as origCollectGpuInfo } from './collect_info';
const logger = getLogger('GpuScheduler');
let collectGpuInfo = origCollectGpuInfo;  // replaceable in unit tests via UnitTestHelpers.mockGpuInfo()
export interface ScheduleRestrictions {
onlyAcceptIndices?: number[];
rejectActiveGpus?: boolean;
rejectComputeActiveGpus?: boolean;
}
interface SchedulerGpuInfo {
index: number;
util: number; // theoretical utilization calculated by NNI's trialGpuNumber
coreUtil: number; // real GPU core utilization (0 ~ 1)
memUtil: number; // real GPU memory utilization (0 ~ 1)
active: boolean;
computeActive: boolean;
}
interface SchedulerTrialInfo {
gpuIndex: number;
experimentId: string;
trialId: string;
util: number;
}
export class GpuScheduler {
private gpus: SchedulerGpuInfo[] = [];
private trials: SchedulerTrialInfo[] = [];
/**
* Initialize the scheduler.
*
     * Throws an error if GPU info cannot be collected or no GPU is found.
**/
public async init(): Promise<void> {
const info = await collectGpuInfo(true);
if (info === null) {
throw new Error('GpuScheduler: Failed to collect GPU info');
}
if (info.gpuNumber === 0) {
throw new Error('GpuScheduler: No GPU found');
}
for (let i = 0; i < info.gpuNumber; i++) {
this.gpus.push({
index: i,
util: 0,
coreUtil: 0,
memUtil: 0,
active: false,
computeActive: false,
});
}
this.updateGpus(info);
}
/**
* Update GPUs' utilization info.
*
     * If `force` is not true, it may use a cached result.
*
     * Calling this explicitly is normally unnecessary because `schedule()` implicitly updates first.
**/
public async update(force?: boolean): Promise<void> {
const info = await collectGpuInfo(force);
if (info === null) {
if (force) {
throw new Error('GpuScheduler: Failed to update GPU info');
}
return;
}
if (info.gpuNumber !== this.gpus.length) {
throw new Error(`GpuScheduler: GPU number changed from ${this.gpus.length} to ${info.gpuNumber}`);
}
this.updateGpus(info);
}
/**
* Schedule a trial and return allocated GPU indices.
*
* If the trial cannot be scheduled for now, return `null`.
     * If the trial requires more GPUs than physically exist, an error is logged and `null` is returned.
*
* This scheduler does NOT monitor the trial's life span.
     * The caller must invoke `release()` or `releaseAll()` later.
*
     * `gpuNumber` can be either an integer or a float between 0 and 1.
*
* The `restrictions` parameter may contain following options:
*
* - onlyAcceptIndices:
*
* Limit usable GPUs by index.
* This is `gpuIndices` in experiment config.
*
* - rejectActiveGpus:
*
* Do not use GPUs with running processes.
     * This is the inverse of `useActiveGpu` in experiment config.
*
* - rejectComputeActiveGpus:
*
* Do not use GPUs with CUDA processes, but still use GPUs with graphics processes.
* This is useful for desktop systems with graphical interface.
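     *
     * A hypothetical caller (experiment and trial IDs are illustrative):
     *
     *     const indices = await scheduler.schedule('myExperiment', 'trial0', 0.5);
     *     if (indices !== null) {
     *         // launch the trial with CUDA_VISIBLE_DEVICES set to indices.join(',')
     *     }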
**/
public async schedule(
experimentId: string,
trialId: string,
gpuNumber: number,
        restrictions?: ScheduleRestrictions): Promise<number[] | null> {
        await this.update();  // implicit refresh, as documented on update(); uses the cache when fresh
        if (gpuNumber > this.gpus.length) {
// TODO: push this message to web portal
logger.error(`GpuScheduler: Only have ${this.gpus.length} GPUs, requesting ${gpuNumber}`);
return null;
}
const gpus = this.sortGpus(restrictions ?? {});
if (gpuNumber < 1) {
const gpu = gpus[0];
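            // "1.001" tolerates floating point rounding, so e.g. ten 0.1-GPU trials can fully pack one card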
if (gpu.util + gpuNumber > 1.001) {
return null;
}
gpu.util += gpuNumber;
this.trials.push({ gpuIndex: gpu.index, experimentId, trialId, util: gpuNumber });
logger.debug(`Scheduled ${experimentId}/${trialId} -> ${gpu.index}`);
return [ gpu.index ];
} else {
const n = Math.round(gpuNumber);
if (gpus.length < n || gpus[n - 1].util > 0) {
return null;
}
            const indices: number[] = [];
for (const gpu of gpus.slice(0, n)) {
gpu.util = 1;
this.trials.push({ gpuIndex: gpu.index, experimentId, trialId, util: 1 });
indices.push(gpu.index);
}
logger.debug(`Scheduled ${experimentId}/${trialId} ->`, indices);
return indices.sort((a, b) => (a - b));
}
}
/**
* Release a trial's allocated GPUs
**/
public async release(experimentId: string, trialId: string): Promise<void> {
this.releaseByFilter(trial => (trial.experimentId === experimentId && trial.trialId === trialId));
}
/**
* Release all trials of an experiment.
*
* Useful when the experiment is shutting down or has lost response.
**/
public async releaseAll(experimentId: string): Promise<void> {
logger.info('Release whole experiment', experimentId);
this.releaseByFilter(trial => (trial.experimentId === experimentId));
}
private updateGpus(info: GpuSystemInfo): void {
for (const gpu of info.gpus) {
const index = gpu.index;
this.gpus[index].coreUtil = gpu.gpuCoreUtilization ?? 0;
this.gpus[index].memUtil = gpu.gpuMemoryUtilization ?? 0;
this.gpus[index].active = false;
this.gpus[index].computeActive = false;
}
for (const proc of info.processes) {
const index = proc.gpuIndex;
this.gpus[index].active = true;
if (proc.type === 'compute') {
this.gpus[index].computeActive = true;
}
}
}
private sortGpus(restrict: ScheduleRestrictions): SchedulerGpuInfo[] {
let gpus = this.gpus.slice(); // copy for in-place sort
if (restrict.onlyAcceptIndices) {
gpus = gpus.filter(gpu => restrict.onlyAcceptIndices!.includes(gpu.index));
}
if (restrict.rejectActiveGpus) {
gpus = gpus.filter(gpu => !gpu.active);
}
if (restrict.rejectComputeActiveGpus) {
gpus = gpus.filter(gpu => !gpu.computeActive);
}
// prefer the gpu with lower theoretical utilization;
// then the gpu without competing processes;
// then the gpu with more free memory;
// and finally the gpu with lower cuda core load.
return gpus.sort((a, b) => {
if (a.util !== b.util) {
return a.util - b.util;
}
if (a.active !== b.active) {
return Number(a.active) - Number(b.active);
}
if (a.computeActive !== b.computeActive) {
return Number(a.computeActive) - Number(b.computeActive);
}
if (a.memUtil !== b.memUtil) {
return a.memUtil - b.memUtil;
}
if (a.coreUtil !== b.coreUtil) {
return a.coreUtil - b.coreUtil;
}
return a.index - b.index;
});
}
private releaseByFilter(filter: (trial: SchedulerTrialInfo) => boolean): void {
const trials = this.trials.filter(filter);
trials.forEach(trial => {
logger.debug(`Released ${trial.experimentId}/${trial.trialId}`);
this.gpus[trial.gpuIndex].util -= trial.util;
});
this.trials = this.trials.filter(trial => !filter(trial));
}
}
export namespace UnitTestHelpers {
export function mockGpuInfo(info: GpuSystemInfo): void {
collectGpuInfo = (_?: boolean): any => Promise.resolve(info);
}
export function getGpuUtils(scheduler: GpuScheduler): number[] {
return (scheduler as any).gpus.map((gpu: SchedulerGpuInfo) => gpu.util);
}
}

ts/nni_manager/common/pythonScript.ts
View File

@@ -7,8 +7,17 @@ import { Logger, getLogger } from './log';
 const logger: Logger = getLogger('pythonScript');
-export async function runPythonScript(script: string, logTag?: string): Promise<string> {
-    const proc = spawn(globals.args.pythonInterpreter, [ '-c', script ]);
+export function runPythonScript(script: string, logTag?: string): Promise<string> {
+    return runPython([ '-c', script ], logTag);
+}
+
+export function runPythonModule(moduleName: string, args?: string[]): Promise<string> {
+    const argsArr = args ?? [];
+    return runPython([ '-m', moduleName, ...argsArr ], moduleName);
+}
+
+export async function runPython(args: string[], logTag?: string): Promise<string> {
+    const proc = spawn(globals.args.pythonInterpreter, args);
     let stdout: string = '';
     let stderr: string = '';
@@ -23,10 +32,10 @@ export async function runPythonScript(script: string, logTag?: string): Promise<string> {
     if (stderr) {
         if (logTag) {
-            logger.warning(`Python script [${logTag}] has stderr:`, stderr);
+            logger.warning(`Python command [${logTag}] has stderr:`, stderr);
         } else {
-            logger.warning('Python script has stderr.');
-            logger.warning(' script:', script);
+            logger.warning('Python command has stderr.');
+            logger.warning(' args:', args);
             logger.warning(' stderr:', stderr);
         }
     }

View File

@@ -0,0 +1,184 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import assert from 'assert/strict';
import type { GpuSystemInfo } from 'common/gpu_scheduler/collect_info';
import { GpuScheduler, UnitTestHelpers as Helpers } from 'common/gpu_scheduler/scheduler';
let scheduler: GpuScheduler;
const gpuInfo: GpuSystemInfo = {
success: true,
gpuNumber: 3,
gpus: [
{ index: 0, gpuCoreUtilization: 0, gpuMemoryUtilization: 0 },
{ index: 1, gpuCoreUtilization: 0, gpuMemoryUtilization: 0 },
{ index: 2, gpuCoreUtilization: 0, gpuMemoryUtilization: 0 },
],
processes: [],
timestamp: 0,
};
describe('## gpu scheduler ##', () => {
it('init', () => testInit());
it('schedule small trials', () => testScheduleSmall());
it('release all', () => testReleaseSmall());
it('schedule large trials', () => testScheduleLarge());
it('release one by one', () => testReleaseLarge());
it('schedule hybrid', () => testScheduleHybrid());
it('restrict index', () => testRestrictIndex());
it('restrict active', () => testRestrictActive());
it('prefer idle', () => testActivePriority());
it('prefer lower load', () => testUtilPriority());
});
async function testInit(): Promise<void> {
Helpers.mockGpuInfo(gpuInfo);
scheduler = new GpuScheduler();
await scheduler.init();
}
async function testScheduleSmall(): Promise<void> {
const idx1 = await scheduler.schedule('small', '1', 0.5); // [0]
const idx2 = await scheduler.schedule('small', '2', 0.5); // [1]
const idx3 = await scheduler.schedule('small', '3', 0.5); // [2]
const idx4 = await scheduler.schedule('small', '4', 0.6); // null
const idx5 = await scheduler.schedule('small', '5', 0.5); // [0]
assert.equal(idx4, null);
const count = [ 0, 0, 0 ]; // count how many times each GPU is scheduled
for (const idx of [ idx1, idx2, idx3, idx5 ]) {
assert.notEqual(idx, null);
assert.equal(idx!.length, 1);
count[idx![0]] += 1;
}
assert.deepEqual(count.sort(), [ 1, 1, 2 ]);
}
async function testReleaseSmall(): Promise<void> {
    await scheduler.releaseAll('small');
const utils = Helpers.getGpuUtils(scheduler);
assert.deepEqual(utils, [ 0, 0, 0 ]);
}
async function testScheduleLarge(): Promise<void> {
const idx1 = await scheduler.schedule('large1', 'x', 2); // [0,1]
const idx2 = await scheduler.schedule('large2', 'x', 2); // null
const idx3 = await scheduler.schedule('large3', 'x', 1); // [2]
assert.notEqual(idx1, null);
assert.equal(idx1!.length, 2);
assert.equal(idx2, null);
assert.notEqual(idx3, null);
assert.equal(idx3!.length, 1);
assert.deepEqual([ ...idx1!, ...idx3! ].sort(), [ 0, 1, 2 ]);
}
async function testReleaseLarge(): Promise<void> {
    await scheduler.release('large1', 'x');
    let utils = Helpers.getGpuUtils(scheduler);
    assert.deepEqual(utils.sort(), [ 0, 0, 1 ]);
    await scheduler.release('large3', 'x');
    utils = Helpers.getGpuUtils(scheduler);
    assert.deepEqual(utils.sort(), [ 0, 0, 0 ]);
}
async function testScheduleHybrid(): Promise<void> {
const idx1 = await scheduler.schedule('small', '1', 0.5); // [0]
const idx2 = await scheduler.schedule('large', '1', 1); // [1]
const idx3 = await scheduler.schedule('large', '2', 2); // null
    await scheduler.release('large', '1');
const idx4 = await scheduler.schedule('large', '3', 2); // [1,2]
assert.notEqual(idx1, null);
assert.equal(idx1!.length, 1);
assert.notEqual(idx2, null);
assert.equal(idx2!.length, 1);
assert.equal(idx3, null);
assert.notEqual(idx4, null);
assert.equal(idx4!.length, 2);
assert.notEqual(idx1![0], idx2![0]);
assert.deepEqual([ ...idx1!, ...idx4! ].sort(), [ 0, 1, 2 ]);
    await scheduler.releaseAll('small');
    await scheduler.releaseAll('large');
}
async function testRestrictIndex(): Promise<void> {
const idx1 = await scheduler.schedule('r', '1', 0.5, { onlyAcceptIndices: [ 1 ] });
const idx2 = await scheduler.schedule('r', '2', 2, { onlyAcceptIndices: [ 1, 2 ] });
const idx3 = await scheduler.schedule('r', '3', 1, { onlyAcceptIndices: [ 1, 2 ] });
assert.deepEqual(idx1, [ 1 ]);
assert.equal(idx2, null);
assert.deepEqual(idx3, [ 2 ]);
    await scheduler.releaseAll('r');
}
async function testRestrictActive(): Promise<void> {
gpuInfo.processes = [
{ gpuIndex: 0, type: 'graphics' },
{ gpuIndex: 1, type: 'compute' },
];
await scheduler.update();
const idx1 = await scheduler.schedule('r', '1', 1, { rejectActiveGpus: true });
const idx2 = await scheduler.schedule('r', '2', 1, { rejectActiveGpus: true });
const idx3 = await scheduler.schedule('r', '3', 1, { rejectComputeActiveGpus: true });
assert.deepEqual(idx1, [ 2 ]);
assert.equal(idx2, null);
assert.deepEqual(idx3, [ 0 ]);
    await scheduler.releaseAll('r');
}
async function testActivePriority(): Promise<void> {
gpuInfo.processes = [
{ gpuIndex: 0, type: 'graphics' },
{ gpuIndex: 1, type: 'compute' },
];
await scheduler.update();
const idx1 = await scheduler.schedule('p', '1', 1);
const idx2 = await scheduler.schedule('p', '2', 1);
const idx3 = await scheduler.schedule('p', '3', 1);
assert.deepEqual(idx1, [ 2 ]);
assert.deepEqual(idx2, [ 0 ]);
assert.deepEqual(idx3, [ 1 ]);
    await scheduler.releaseAll('p');
}
async function testUtilPriority(): Promise<void> {
    gpuInfo.gpus[0].gpuCoreUtilization = 0.5;
    gpuInfo.gpus[1].gpuCoreUtilization = 0.1;
    gpuInfo.gpus[2].gpuMemoryUtilization = 0.2;
gpuInfo.processes = [];
await scheduler.update();
const idx1 = await scheduler.schedule('p', '1', 1);
const idx2 = await scheduler.schedule('p', '2', 1);
const idx3 = await scheduler.schedule('p', '3', 1);
assert.deepEqual(idx1, [ 1 ]);
assert.deepEqual(idx2, [ 0 ]);
assert.deepEqual(idx3, [ 2 ]);
    await scheduler.releaseAll('p');
}