Analyzer - Add support to store values of metrics in data diagnosis (#392)
**Description**

Add support for storing the values of metrics in data diagnosis. Take the following rules as an example:

```
nccl_store_rule:
  categories: NCCL_DIS
  store: True
  metrics:
    - nccl-bw:allreduce-run0/allreduce_1073741824_busbw
    - nccl-bw:allreduce-run1/allreduce_1073741824_busbw
    - nccl-bw:allreduce-run2/allreduce_1073741824_busbw
    - nccl-bw:allreduce-run3/allreduce_1073741824_busbw
    - nccl-bw:allreduce-run4/allreduce_1073741824_busbw
nccl_rule:
  function: multi_rules
  criteria: 'lambda label:True if min(label["nccl_store_rule"].values())/max(label["nccl_store_rule"].values())<0.95 else False'
  categories: NCCL_DIS
```

**nccl_store_rule** stores the values of its metrics in a dict and saves that dict into `label["nccl_store_rule"]`; **nccl_rule** can then use the metric values through `label["nccl_store_rule"].values()` in its criteria.
This commit is contained in:
Родитель
10a79c4ea8
Коммит
733860d715
|
@ -34,18 +34,19 @@ class DataDiagnosis(RuleBase):
|
|||
"""
|
||||
# check if rule is supported
|
||||
super()._check_and_format_rules(rule, name)
|
||||
if 'function' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of function'.format(name))
|
||||
if not isinstance(DiagnosisRuleType(rule['function']), DiagnosisRuleType):
|
||||
logger.log_and_raise(exception=Exception, msg='{} invalid function name'.format(name))
|
||||
# check rule format
|
||||
if 'criteria' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of criteria'.format(name))
|
||||
if not isinstance(eval(rule['criteria']), Callable):
|
||||
logger.log_and_raise(exception=Exception, msg='invalid criteria format')
|
||||
if rule['function'] != 'multi_rules':
|
||||
if 'metrics' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of metrics'.format(name))
|
||||
if 'store' not in rule:
|
||||
if 'function' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of function'.format(name))
|
||||
if not isinstance(DiagnosisRuleType(rule['function']), DiagnosisRuleType):
|
||||
logger.log_and_raise(exception=Exception, msg='{} invalid function name'.format(name))
|
||||
# check rule format
|
||||
if 'criteria' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of criteria'.format(name))
|
||||
if not isinstance(eval(rule['criteria']), Callable):
|
||||
logger.log_and_raise(exception=Exception, msg='invalid criteria format')
|
||||
if rule['function'] != 'multi_rules':
|
||||
if 'metrics' not in rule:
|
||||
logger.log_and_raise(exception=Exception, msg='{} lack of metrics'.format(name))
|
||||
if 'store' in rule and not isinstance(rule['store'], bool):
|
||||
logger.log_and_raise(exception=Exception, msg='{} store must be bool type'.format(name))
|
||||
return rule
|
||||
|
@ -117,10 +118,12 @@ class DataDiagnosis(RuleBase):
|
|||
benchmark_rules[rule] = self._check_and_format_rules(benchmark_rules[rule], rule)
|
||||
self._sb_rules[rule] = {}
|
||||
self._sb_rules[rule]['name'] = rule
|
||||
self._sb_rules[rule]['function'] = benchmark_rules[rule]['function']
|
||||
if 'function' in benchmark_rules[rule]:
|
||||
self._sb_rules[rule]['function'] = benchmark_rules[rule]['function']
|
||||
self._sb_rules[rule]['store'] = True if 'store' in benchmark_rules[
|
||||
rule] and benchmark_rules[rule]['store'] is True else False
|
||||
self._sb_rules[rule]['criteria'] = benchmark_rules[rule]['criteria']
|
||||
if 'criteria' in benchmark_rules[rule]:
|
||||
self._sb_rules[rule]['criteria'] = benchmark_rules[rule]['criteria']
|
||||
self._sb_rules[rule]['categories'] = benchmark_rules[rule]['categories']
|
||||
self._sb_rules[rule]['metrics'] = {}
|
||||
self.__get_metrics_and_baseline(rule, benchmark_rules, baseline)
|
||||
|
@ -151,16 +154,22 @@ class DataDiagnosis(RuleBase):
|
|||
issue_label = False
|
||||
details = []
|
||||
categories = set()
|
||||
violation = {}
|
||||
store_values = {}
|
||||
summary_data_row = pd.Series(index=self._enable_metrics, name=node, dtype=float)
|
||||
# Check each rule
|
||||
for rule in self._sb_rules:
|
||||
# if no criteria and store is True in a rule, store the value of metrics in the rule
|
||||
if self._sb_rules[rule]['store'] and 'criteria' not in self._sb_rules[rule]:
|
||||
store_values[rule] = {}
|
||||
for metric in self._sb_rules[rule]['metrics']:
|
||||
store_values[rule][metric] = data_row[metric]
|
||||
continue
|
||||
# Get rule op function and run the rule
|
||||
function_name = self._sb_rules[rule]['function']
|
||||
rule_op = RuleOp.get_rule_func(DiagnosisRuleType(function_name))
|
||||
violated_num = 0
|
||||
if rule_op == RuleOp.multi_rules:
|
||||
violated_num = rule_op(self._sb_rules[rule], details, categories, violation)
|
||||
violated_num = rule_op(self._sb_rules[rule], details, categories, store_values)
|
||||
elif rule_op == RuleOp.failure_check:
|
||||
violated_num = rule_op(
|
||||
data_row, self._sb_rules[rule], summary_data_row, details, categories, self._raw_rules[rule]
|
||||
|
@ -169,7 +178,7 @@ class DataDiagnosis(RuleBase):
|
|||
violated_num = rule_op(data_row, self._sb_rules[rule], summary_data_row, details, categories)
|
||||
# label the node as defective one
|
||||
if self._sb_rules[rule]['store']:
|
||||
violation[rule] = violated_num
|
||||
store_values[rule] = violated_num
|
||||
elif violated_num:
|
||||
issue_label = True
|
||||
if issue_label:
|
||||
|
|
|
@ -188,7 +188,7 @@ class RuleOp:
|
|||
return violated_metric_num
|
||||
|
||||
@staticmethod
|
||||
def multi_rules(rule, details, categories, violation):
|
||||
def multi_rules(rule, details, categories, store_values):
|
||||
"""Rule op function of multi_rules.
|
||||
|
||||
The criteria in this rule will use the combined results of multiple previous rules and their metrics
|
||||
|
@ -198,11 +198,12 @@ class RuleOp:
|
|||
rule (dict): rule including function, criteria, metrics with their baseline values and categories
|
||||
details (list): details about violated rules and related data
|
||||
categories (set): categories of violated rules
|
||||
violation (dict): the number of the metrics that violate the rules
|
||||
store_values (dict): including the number of the metrics that violate the rule, and the values of
|
||||
the metrics for the rules with 'store' True
|
||||
Returns:
|
||||
number: 0 if the rule is passed, otherwise 1
|
||||
"""
|
||||
violated = eval(rule['criteria'])(violation)
|
||||
violated = eval(rule['criteria'])(store_values)
|
||||
if not isinstance(violated, bool):
|
||||
logger.log_and_raise(exception=Exception, msg='invalid upper criteria format')
|
||||
if violated:
|
||||
|
|
|
@ -9,6 +9,7 @@ import yaml
|
|||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from superbench.analyzer import DataDiagnosis
|
||||
import superbench.analyzer.file_handler as file_handler
|
||||
|
@ -369,6 +370,50 @@ class TestDataDiagnosis(unittest.TestCase):
|
|||
'rule3:lambda label:True if label["rule1"]+label["rule2"]>=2 else False'
|
||||
)
|
||||
|
||||
# Test multi-rule using values of metrics in criteria lambda expression
|
||||
diag1 = DataDiagnosis()
|
||||
# test _run_diagnosis_rules_for_single_node
|
||||
rules = {
|
||||
'superbench': {
|
||||
'rules': {
|
||||
'rule1': {
|
||||
'categories':
|
||||
'NCCL_DIS',
|
||||
'store':
|
||||
True,
|
||||
'metrics': [
|
||||
'nccl-bw:allreduce-run0/allreduce_1073741824_busbw',
|
||||
'nccl-bw:allreduce-run1/allreduce_1073741824_busbw',
|
||||
'nccl-bw:allreduce-run2/allreduce_1073741824_busbw'
|
||||
]
|
||||
},
|
||||
'rule2': {
|
||||
'categories': 'NCCL_DIS',
|
||||
'criteria': 'lambda label:True if min(label["rule1"].values())' + '/' +
|
||||
'max(label["rule1"].values())<0.95 else False',
|
||||
'function': 'multi_rules'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
baseline = {}
|
||||
data = {
|
||||
'nccl-bw:allreduce-run0/allreduce_1073741824_busbw': [10, 22, 10],
|
||||
'nccl-bw:allreduce-run1/allreduce_1073741824_busbw': [23, 23, np.nan],
|
||||
'nccl-bw:allreduce-run2/allreduce_1073741824_busbw': [22, 22, np.nan]
|
||||
}
|
||||
diag1._raw_data_df = pd.DataFrame(data, index=['sb-validation-04', 'sb-validation-05', 'sb-validation-06'])
|
||||
diag1._benchmark_metrics_dict = diag1._get_metrics_by_benchmarks(list(diag1._raw_data_df.columns))
|
||||
diag1._parse_rules_and_baseline(rules, baseline)
|
||||
(details_row, summary_data_row) = diag1._run_diagnosis_rules_for_single_node('sb-validation-04')
|
||||
assert (details_row)
|
||||
assert ('NCCL_DIS' in details_row[0])
|
||||
(details_row, summary_data_row) = diag1._run_diagnosis_rules_for_single_node('sb-validation-05')
|
||||
assert (not details_row)
|
||||
(details_row, summary_data_row) = diag1._run_diagnosis_rules_for_single_node('sb-validation-06')
|
||||
assert (not details_row)
|
||||
|
||||
def test_failure_check(self):
|
||||
"""Test failure test check feature."""
|
||||
diag1 = DataDiagnosis()
|
||||
|
|
Загрузка…
Ссылка в новой задаче