Merge branch 'executionfilter' into v2
This commit is contained in:
Коммит
b41e682ab3
|
@ -0,0 +1,25 @@
|
|||
# python2, python3
|
||||
|
||||
import sys, json
|
||||
|
||||
j = json.load(sys.stdin)
|
||||
|
||||
commandLine = j['m_Item2'].get('commandLine')
|
||||
mpiSource = j['m_Item2']['environmentVariables'].get('CCP_MPI_SOURCE')
|
||||
if commandLine and mpiSource:
|
||||
if commandLine.startswith('mpirun ') or commandLine.startswith('mpiexec '):
|
||||
machineFileOption = '-machinefile $CCP_MPI_HOSTFILE'
|
||||
commandLineSplit = commandLine.split(' ')
|
||||
if mpiSource.endswith('/mpirun') or mpiSource.endswith('/mpiexec'):
|
||||
commandLineSplit[0] = mpiSource
|
||||
elif mpiSource.endswith('/'):
|
||||
commandLineSplit[0] = '{}{}'.format(mpiSource, commandLineSplit[0])
|
||||
elif mpiSource.endswith('/mpivars.sh'):
|
||||
commandLineSplit[0] = 'source {}; {}'.format(mpiSource, commandLineSplit[0])
|
||||
else:
|
||||
commandLineSplit[0] = '{}/{}'.format(mpiSource, commandLineSplit[0])
|
||||
commandLineSplit.insert(1, machineFileOption)
|
||||
commandLine = ' '.join(commandLineSplit)
|
||||
j['m_Item2']['commandLine'] = commandLine
|
||||
|
||||
print(json.dumps(j))
|
|
@ -0,0 +1,32 @@
|
|||
# python2, python3
|
||||
|
||||
import sys, json, subprocess, math
|
||||
|
||||
j = json.load(sys.stdin)
|
||||
|
||||
numaInfo = subprocess.check_output('lscpu | grep NUMA', shell = True)
|
||||
numaCoreId = []
|
||||
for line in numaInfo.splitlines():
|
||||
if line.startswith('NUMA') and 'CPU(s):' in line:
|
||||
coreIds = line.split()[-1].split(',')
|
||||
for coreId in coreIds:
|
||||
if '-' in coreId:
|
||||
beginEnd = map(int, coreId.split('-'))
|
||||
numaCoreId += list(range(beginEnd[0], beginEnd[1] + 1))
|
||||
else:
|
||||
numaCoreId.append(int(coreId))
|
||||
|
||||
AFFINITY_BITS = 64
|
||||
if numaCoreId:
|
||||
affinity = j['m_Item2'].get('affinity') # This is an array of signed 64 bit number, which will be converted to bit array format for adjusting core id.
|
||||
if affinity:
|
||||
affinityList = [bit for int64 in affinity for bit in list('{:064b}'.format((2 ** AFFINITY_BITS - 1) & int64))[::-1]]
|
||||
mappedCoreIds = set([numaCoreId[coreId] for coreId in range(len(affinityList)) if affinityList[coreId] == '1'])
|
||||
mappedAffinityList = ['1' if coreId in mappedCoreIds else '0' for coreId in range(int(math.ceil(float(len(numaCoreId)) / AFFINITY_BITS) * AFFINITY_BITS))]
|
||||
j['m_Item2']['affinity'] = [int(''.join(mappedAffinityList[i * AFFINITY_BITS : (i + 1) * AFFINITY_BITS - 1][::-1]), 2) - int(mappedAffinityList[(i + 1) * AFFINITY_BITS - 1]) * 2 ** (AFFINITY_BITS - 1) for i in range(len(mappedAffinityList) // AFFINITY_BITS)]
|
||||
|
||||
ccpCoreIds = j['m_Item2']['environmentVariables'].get('CCP_COREIDS')
|
||||
if ccpCoreIds:
|
||||
j['m_Item2']['environmentVariables']['CCP_COREIDS'] = ' '.join([str(numaCoreId[originalCoreId]) for originalCoreId in map(int, ccpCoreIds.split())])
|
||||
|
||||
print(json.dumps(j))
|
|
@ -1 +1 @@
|
|||
sed -s 's/123/456/g'
|
||||
cat
|
|
@ -1 +1,19 @@
|
|||
sed -s 's/123/456/g'
|
||||
#!/bin/bash
|
||||
|
||||
input=`cat`
|
||||
job_id=`echo $input | grep -o '"JobId":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
task_id=`echo $input | grep -o '"TaskId":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
requeue_count=`echo $input | grep -o '"taskRequeueCount":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
log_dir=/opt/hpcnodemanager/filters/ERROR_LOG
|
||||
mkdir -p $log_dir
|
||||
log_prefix=$log_dir/$job_id.$task_id.$requeue_count
|
||||
log_input=$log_prefix.input
|
||||
log_error=$log_prefix.error
|
||||
|
||||
echo $input | (\
|
||||
python /opt/hpcnodemanager/filters/AdjustTaskAffinity.py | \
|
||||
python /opt/hpcnodemanager/filters/AdjustMpiCommand.py \
|
||||
) 2>$log_error
|
||||
|
||||
error_code=$?
|
||||
[ ! -s $log_error ] && [ "$error_code" -eq "0" ] && rm -f $log_error || (echo $input >$log_input && exit $error_code)
|
|
@ -1 +1,19 @@
|
|||
sed -s 's/123/456/g'
|
||||
#!/bin/bash
|
||||
|
||||
input=`cat`
|
||||
job_id=`echo $input | grep -o '"JobId":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
task_id=`echo $input | grep -o '"TaskId":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
requeue_count=`echo $input | grep -o '"taskRequeueCount":[[:digit:]]*' | awk -F: '{print $NF}'`
|
||||
log_dir=/opt/hpcnodemanager/filters/ERROR_LOG
|
||||
mkdir -p $log_dir
|
||||
log_prefix=$log_dir/$job_id.$task_id.$requeue_count
|
||||
log_input=$log_prefix.input
|
||||
log_error=$log_prefix.error
|
||||
|
||||
echo $input | (\
|
||||
python /opt/hpcnodemanager/filters/AdjustTaskAffinity.py | \
|
||||
python /opt/hpcnodemanager/filters/AdjustMpiCommand.py \
|
||||
) 2>$log_error
|
||||
|
||||
error_code=$?
|
||||
[ ! -s $log_error ] && [ "$error_code" -eq "0" ] && rm -f $log_error || (echo $input >$log_input && exit $error_code)
|
|
@ -132,6 +132,8 @@ define finish
|
|||
$(indent);
|
||||
cp filters/*.sh $(2)/filters
|
||||
$(indent);
|
||||
cp filters/*.py $(2)/filters
|
||||
$(indent);
|
||||
[ -d $(2)/logs ] || mkdir $(2)/logs
|
||||
$(indent);
|
||||
[ -d $(2)/$(LIBOUTDIR) ] || mkdir $(2)/$(LIBOUTDIR)
|
||||
|
|
Загрузка…
Ссылка в новой задаче