зеркало из https://github.com/Azure/autorest.az.git
[DoNotMerge] Add coverage tool (#472)
* add coverage function * fix test * add coverage script. * fix test * fix system exit exception * fix test * fix test
This commit is contained in:
Родитель
8631578d3c
Коммит
f2708d2114
|
@ -0,0 +1,216 @@
|
|||
#!/bin/python
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import datetime as dt
|
||||
from urllib import parse
|
||||
|
||||
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
|
||||
# get db info
|
||||
HostName = os.environ.get("DB_HOST_NAME")
|
||||
LoginName = os.environ.get("DB_LOGIN_NAME")
|
||||
Password = parse.quote_plus(os.environ.get("DB_PASSWORD")) # encode special chars
|
||||
DBName = os.environ.get("DB_NAME")
|
||||
# DEV_PREFIX = "D:/dev/"
|
||||
DEV_PREFIX = os.environ.get("DEV_PREFIX")
|
||||
AZ_PREFIX = DEV_PREFIX + "autorest.az"
|
||||
AZ_CLI_EXT_PREFIX = DEV_PREFIX + "azure-cli-extensions"
|
||||
AWAGGER_PREFIX = DEV_PREFIX + "azure-rest-api-specs"
|
||||
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
# Models
|
||||
class TblAztestBatch(Base):
|
||||
__tablename__ = "tbl_aztest_batch"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
batch_name = Column(String, nullable=True)
|
||||
start_dt = Column(DateTime, default=dt.datetime.utcnow)
|
||||
end_dt = Column(DateTime, default=dt.datetime.utcnow)
|
||||
|
||||
class TblRp(Base):
|
||||
__tablename__ = "tbl_rp"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
rp_name = Column(String)
|
||||
|
||||
class TblAztestStep(Base):
|
||||
__tablename__ = "tbl_aztest_step"
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
aztest_batch_id = Column(Integer, ForeignKey(TblAztestBatch.id))
|
||||
rp_id = Column(Integer, ForeignKey(TblRp.id))
|
||||
testcase_name = Column(String)
|
||||
step_name = Column(String)
|
||||
result = Column(Integer, default=1)
|
||||
error_message = Column(String, default="")
|
||||
error_stack = Column(String, default="")
|
||||
error_normalized = Column(String, default="")
|
||||
start_dt = Column(DateTime, default=dt.datetime.utcnow)
|
||||
end_dt = Column(DateTime, default=dt.datetime.utcnow)
|
||||
|
||||
# Client
|
||||
class DBClient:
|
||||
|
||||
def __init__(self):
|
||||
# pyodbc
|
||||
odbc_connect = parse.quote_plus('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+HostName+';DATABASE='+DBName+';UID='+LoginName+';PWD='+ Password)
|
||||
self._engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(odbc_connect))
|
||||
self._session_maker = sessionmaker(bind=self._engine)
|
||||
|
||||
def __enter__(self):
|
||||
self._session = self._session_maker()
|
||||
return self._session
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
self._session.close()
|
||||
|
||||
def test(client):
|
||||
rs = client.execute("select * from tbl_aztest_batch")
|
||||
print(rs.fetchone())
|
||||
|
||||
def calcCoverage(client, testcase_name, test_path, repo, tbl_batch, debug=False):
|
||||
if not os.path.exists(test_path):
|
||||
return
|
||||
|
||||
if not debug:
|
||||
tbl_rp = client.query(TblRp).filter(TblRp.rp_name==repo).first()
|
||||
if not tbl_rp:
|
||||
# create rp
|
||||
tbl_rp = TblRp(rp_name=repo)
|
||||
client.add(tbl_rp)
|
||||
tbl_rp = client.query(TblRp).filter(TblRp.rp_name==repo).first()
|
||||
|
||||
steps = None
|
||||
coverage = None
|
||||
with open(test_path, 'r') as f:
|
||||
text = f.readlines()
|
||||
steps = text[1:-1]
|
||||
coverage = text[-1]
|
||||
|
||||
for step in steps:
|
||||
scenario = step.split('|')[1:-1]
|
||||
if len(scenario) != 7:
|
||||
continue
|
||||
step_name, result, error_message, error_stack, error_normalized, start_dt, end_dt = scenario
|
||||
start_dt = dt.datetime.strptime(start_dt, "%Y-%m-%d %H:%M:%S.%f")
|
||||
end_dt = dt.datetime.strptime(end_dt, "%Y-%m-%d %H:%M:%S.%f")
|
||||
result = True if result == "successed" else False
|
||||
|
||||
if debug:
|
||||
# debug
|
||||
# print(step_name, result)
|
||||
print(scenario)
|
||||
else:
|
||||
# add step
|
||||
tbl_az_step = TblAztestStep(
|
||||
aztest_batch_id=tbl_batch.id,
|
||||
rp_id=tbl_rp.id,
|
||||
testcase_name=testcase_name,
|
||||
step_name=step_name,
|
||||
result=result,
|
||||
error_message=error_message,
|
||||
error_stack=error_stack,
|
||||
error_normalized=error_normalized,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt
|
||||
)
|
||||
client.add(tbl_az_step)
|
||||
|
||||
# commit
|
||||
client.commit()
|
||||
|
||||
def runTask(args):
|
||||
try:
|
||||
if os.name == 'nt':
|
||||
output = subprocess.check_output(args, stderr=subprocess.STDOUT, shell=True, universal_newlines=True)
|
||||
else:
|
||||
output = subprocess.check_output(args, stderr=subprocess.STDOUT, universal_newlines=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
print("Status : FAIL", exc.returncode)
|
||||
raise AssertionError(exc.output)
|
||||
else:
|
||||
print("Output: \n{}\n".format(output))
|
||||
|
||||
# Collection
|
||||
def repoColletor(debug=False, enable_batch=False, run_codegen=False, is_live=False):
|
||||
az_cli_ext_prefix = AZ_CLI_EXT_PREFIX + "/src"
|
||||
use_az = "--use=" + AZ_PREFIX
|
||||
use_az_cli_folder = "--azure-cli-extension-folder=" + AZ_CLI_EXT_PREFIX
|
||||
|
||||
repos = list()
|
||||
|
||||
# run codegen (the rp names in specification are different from cli_extension )
|
||||
if run_codegen:
|
||||
for rp in os.listdir(AWAGGER_PREFIX + '/specification/'):
|
||||
if rp == 'testserver':
|
||||
swagger_arg_str = AZ_PREFIX + '/src/test/scenarios/testserver/configuration/readme.md'
|
||||
else:
|
||||
swagger_arg_str = AWAGGER_PREFIX + '/specification/'+ rp + '/resource-manager/readme.md'
|
||||
|
||||
cmd_codegen = ['autorest', '--version=3.0.6271', '--az', use_az, use_az_cli_folder, swagger_arg_str]
|
||||
print(" ".join(cmd_codegen))
|
||||
try:
|
||||
runTask(cmd_codegen)
|
||||
except:
|
||||
continue
|
||||
|
||||
for rp in os.listdir(az_cli_ext_prefix):
|
||||
if os.path.isdir(os.path.join(az_cli_ext_prefix, rp)):
|
||||
repos.append(rp)
|
||||
|
||||
# run codegen (only available for kusto now.)
|
||||
if run_codegen:
|
||||
# add extension
|
||||
cmd_add_ext = ['azdev', 'extension', 'add', rp]
|
||||
print(" ".join(cmd_add_ext))
|
||||
try:
|
||||
runTask(" ".join(cmd_add_ext))
|
||||
except:
|
||||
continue
|
||||
|
||||
# az test
|
||||
cmd_az_test = ['azdev', 'test', rp, "--discover"]
|
||||
if is_live:
|
||||
cmd_az_test += ['--live']
|
||||
print(" ".join(cmd_az_test))
|
||||
try:
|
||||
runTask(cmd_az_test)
|
||||
except:
|
||||
continue
|
||||
|
||||
with DBClient() as client:
|
||||
print("Uploading data..")
|
||||
if enable_batch:
|
||||
# unable now
|
||||
pass
|
||||
|
||||
if not debug and repos:
|
||||
# create batch
|
||||
tbl_batch = TblAztestBatch(batch_name="")
|
||||
client.add(tbl_batch)
|
||||
client.commit()
|
||||
tbl_batch = client.query(TblAztestBatch).order_by(TblAztestBatch.start_dt.desc()).first()
|
||||
else:
|
||||
tbl_batch = None
|
||||
|
||||
for repo in repos:
|
||||
c_name = "test_{}_scenario_coverage.md".format(repo)
|
||||
c_path = os.path.join(az_cli_ext_prefix, repo, "azext_" + repo, "tests", "latest", c_name)
|
||||
|
||||
calcCoverage(client, c_name, c_path, repo, tbl_batch, debug=debug)
|
||||
print("Completed.")
|
||||
|
||||
def main():
|
||||
# When debug is True, it won't upload test data to database.
|
||||
debug = os.environ.get("CALC_COVERAGE_DEBUG", True)
|
||||
repoColletor(debug=debug, run_codegen=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -22,13 +22,17 @@ export function GenerateAzureCliTestInit(model: CodeModelAz): string[] {
|
|||
output.push("import os");
|
||||
output.push("import sys");
|
||||
output.push("import traceback");
|
||||
output.push("import datetime as dt");
|
||||
output.push("");
|
||||
output.push("from azure.core.exceptions import AzureError");
|
||||
output.push("from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError");
|
||||
output.push("");
|
||||
output.push("");
|
||||
output.push("__path__ = __import__('pkgutil').extend_path(__path__, __name__)");
|
||||
output.push("exceptions = []");
|
||||
output.push('');
|
||||
output.push('test_map = dict()');
|
||||
output.push('SUCCESSED = "successed"');
|
||||
output.push('FAILED = "failed"');
|
||||
output.push('');
|
||||
output.push('def try_manual(func):');
|
||||
output.push(' def import_manual_function(origin_func):');
|
||||
|
@ -58,19 +62,49 @@ export function GenerateAzureCliTestInit(model: CodeModelAz): string[] {
|
|||
output.push(' func_to_call = get_func_to_call()');
|
||||
output.push(' print("running {}()...".format(func.__name__))');
|
||||
output.push(' try:');
|
||||
output.push(' return func_to_call(*args, **kwargs)');
|
||||
output.push(' except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:');
|
||||
output.push(' test_map[func.__name__] = dict()');
|
||||
output.push(' test_map[func.__name__]["result"] = SUCCESSED');
|
||||
output.push(' test_map[func.__name__]["error_message"] = ""');
|
||||
output.push(' test_map[func.__name__]["error_stack"] = ""');
|
||||
output.push(' test_map[func.__name__]["error_normalized"] = ""');
|
||||
output.push(' test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()');
|
||||
output.push(' ret = func_to_call(*args, **kwargs)');
|
||||
output.push(' except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:');
|
||||
output.push(' test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()');
|
||||
output.push(' test_map[func.__name__]["result"] = FAILED');
|
||||
output.push(' test_map[func.__name__]["error_message"] = str(e).replace("\\r\\n", " ").replace("\\n", " ")[:500]');
|
||||
output.push(' test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\\r\\n", " ").replace("\\n", " ")[:500]');
|
||||
output.push(' print("--------------------------------------")');
|
||||
output.push(' print("step exception: ", e)');
|
||||
output.push(' print("--------------------------------------", file=sys.stderr)');
|
||||
output.push(' print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)');
|
||||
output.push(' traceback.print_exc()');
|
||||
output.push(' exceptions.append((func.__name__, sys.exc_info()))');
|
||||
output.push(' else:');
|
||||
output.push(' test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()');
|
||||
output.push(' return ret');
|
||||
output.push('');
|
||||
output.push(' if inspect.isclass(func):');
|
||||
output.push(' return get_func_to_call()');
|
||||
output.push(' return wrapper');
|
||||
output.push('');
|
||||
output.push('def calc_coverage(filename):');
|
||||
output.push(' filename = filename.split(".")[0]');
|
||||
output.push(' coverage_name = filename + "_coverage.md"');
|
||||
output.push(' with open(coverage_name, "w") as f:');
|
||||
output.push(' f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\\n")');
|
||||
output.push(' failed = 0');
|
||||
output.push(' total = len(test_map)');
|
||||
output.push(' covered = 0');
|
||||
output.push(' for k, v in test_map.items():');
|
||||
output.push(' if not k.startswith("step_"):');
|
||||
output.push(' total -= 1');
|
||||
output.push(' continue');
|
||||
output.push(' if v["result"] == SUCCESSED:');
|
||||
output.push(' covered += 1');
|
||||
output.push(' f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\\n".format(step_name=k, **v))');
|
||||
output.push(' f.write("Coverage: {}/{}\\n".format(covered, total))');
|
||||
output.push(' print("Create coverage\\n", file=sys.stderr)');
|
||||
output.push('');
|
||||
output.push('def raise_if():');
|
||||
output.push(' if exceptions:');
|
||||
|
|
|
@ -33,7 +33,7 @@ export function GenerateAzureCliTestScenario(model: CodeModelAz): string[] {
|
|||
head.push("");
|
||||
head.push("import os");
|
||||
head.push("from azure.cli.testsdk import ScenarioTest");
|
||||
head.push("from .. import try_manual, raise_if");
|
||||
head.push("from .. import try_manual, raise_if, calc_coverage");
|
||||
//head.push("from .preparers import (VirtualNetworkPreparer, VnetSubnetPreparer)");
|
||||
steps.push("");
|
||||
steps.push("");
|
||||
|
@ -140,6 +140,7 @@ export function GenerateAzureCliTestScenario(model: CodeModelAz): string[] {
|
|||
funcScenario.push("");
|
||||
funcScenario.push("");
|
||||
body.push(` call_scenario(self${parameterLine()})`);
|
||||
body.push(` calc_coverage(__file__)`);
|
||||
body.push(` raise_if()`);
|
||||
body.push("");
|
||||
|
||||
|
|
|
@ -12,13 +12,17 @@ import inspect
|
|||
import os
|
||||
import sys
|
||||
import traceback
|
||||
import datetime as dt
|
||||
|
||||
from azure.core.exceptions import AzureError
|
||||
from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError
|
||||
|
||||
|
||||
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
|
||||
exceptions = []
|
||||
|
||||
test_map = dict()
|
||||
SUCCESSED = "successed"
|
||||
FAILED = "failed"
|
||||
|
||||
def try_manual(func):
|
||||
def import_manual_function(origin_func):
|
||||
|
@ -48,19 +52,49 @@ def try_manual(func):
|
|||
func_to_call = get_func_to_call()
|
||||
print("running {}()...".format(func.__name__))
|
||||
try:
|
||||
return func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__] = dict()
|
||||
test_map[func.__name__]["result"] = SUCCESSED
|
||||
test_map[func.__name__]["error_message"] = ""
|
||||
test_map[func.__name__]["error_stack"] = ""
|
||||
test_map[func.__name__]["error_normalized"] = ""
|
||||
test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()
|
||||
ret = func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
test_map[func.__name__]["result"] = FAILED
|
||||
test_map[func.__name__]["error_message"] = str(e).replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
print("--------------------------------------")
|
||||
print("step exception: ", e)
|
||||
print("--------------------------------------", file=sys.stderr)
|
||||
print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)
|
||||
traceback.print_exc()
|
||||
exceptions.append((func.__name__, sys.exc_info()))
|
||||
else:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
return ret
|
||||
|
||||
if inspect.isclass(func):
|
||||
return get_func_to_call()
|
||||
return wrapper
|
||||
|
||||
def calc_coverage(filename):
|
||||
filename = filename.split(".")[0]
|
||||
coverage_name = filename + "_coverage.md"
|
||||
with open(coverage_name, "w") as f:
|
||||
f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\n")
|
||||
failed = 0
|
||||
total = len(test_map)
|
||||
covered = 0
|
||||
for k, v in test_map.items():
|
||||
if not k.startswith("step_"):
|
||||
total -= 1
|
||||
continue
|
||||
if v["result"] == SUCCESSED:
|
||||
covered += 1
|
||||
f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\n".format(step_name=k, **v))
|
||||
f.write("Coverage: {}/{}\n".format(covered, total))
|
||||
print("Create coverage\n", file=sys.stderr)
|
||||
|
||||
def raise_if():
|
||||
if exceptions:
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
import os
|
||||
from azure.cli.testsdk import ScenarioTest
|
||||
from .. import try_manual, raise_if
|
||||
from .. import try_manual, raise_if, calc_coverage
|
||||
from azure.cli.testsdk import ResourceGroupPreparer
|
||||
|
||||
|
||||
|
@ -105,4 +105,5 @@ class AttestationManagementClientScenarioTest(ScenarioTest):
|
|||
def test_attestation(self, rg, rg_2, rg_3):
|
||||
|
||||
call_scenario(self, rg, rg_2, rg_3)
|
||||
calc_coverage(__file__)
|
||||
raise_if()
|
||||
|
|
|
@ -12,13 +12,17 @@ import inspect
|
|||
import os
|
||||
import sys
|
||||
import traceback
|
||||
import datetime as dt
|
||||
|
||||
from azure.core.exceptions import AzureError
|
||||
from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError
|
||||
|
||||
|
||||
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
|
||||
exceptions = []
|
||||
|
||||
test_map = dict()
|
||||
SUCCESSED = "successed"
|
||||
FAILED = "failed"
|
||||
|
||||
def try_manual(func):
|
||||
def import_manual_function(origin_func):
|
||||
|
@ -48,19 +52,49 @@ def try_manual(func):
|
|||
func_to_call = get_func_to_call()
|
||||
print("running {}()...".format(func.__name__))
|
||||
try:
|
||||
return func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__] = dict()
|
||||
test_map[func.__name__]["result"] = SUCCESSED
|
||||
test_map[func.__name__]["error_message"] = ""
|
||||
test_map[func.__name__]["error_stack"] = ""
|
||||
test_map[func.__name__]["error_normalized"] = ""
|
||||
test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()
|
||||
ret = func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
test_map[func.__name__]["result"] = FAILED
|
||||
test_map[func.__name__]["error_message"] = str(e).replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
print("--------------------------------------")
|
||||
print("step exception: ", e)
|
||||
print("--------------------------------------", file=sys.stderr)
|
||||
print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)
|
||||
traceback.print_exc()
|
||||
exceptions.append((func.__name__, sys.exc_info()))
|
||||
else:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
return ret
|
||||
|
||||
if inspect.isclass(func):
|
||||
return get_func_to_call()
|
||||
return wrapper
|
||||
|
||||
def calc_coverage(filename):
|
||||
filename = filename.split(".")[0]
|
||||
coverage_name = filename + "_coverage.md"
|
||||
with open(coverage_name, "w") as f:
|
||||
f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\n")
|
||||
failed = 0
|
||||
total = len(test_map)
|
||||
covered = 0
|
||||
for k, v in test_map.items():
|
||||
if not k.startswith("step_"):
|
||||
total -= 1
|
||||
continue
|
||||
if v["result"] == SUCCESSED:
|
||||
covered += 1
|
||||
f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\n".format(step_name=k, **v))
|
||||
f.write("Coverage: {}/{}\n".format(covered, total))
|
||||
print("Create coverage\n", file=sys.stderr)
|
||||
|
||||
def raise_if():
|
||||
if exceptions:
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
import os
|
||||
from azure.cli.testsdk import ScenarioTest
|
||||
from .. import try_manual, raise_if
|
||||
from .. import try_manual, raise_if, calc_coverage
|
||||
|
||||
|
||||
TEST_DIR = os.path.abspath(os.path.join(os.path.abspath(__file__), '..'))
|
||||
|
@ -38,4 +38,5 @@ class AutoRestTestServiceScenarioTest(ScenarioTest):
|
|||
def test_boolean(self):
|
||||
|
||||
call_scenario(self)
|
||||
calc_coverage(__file__)
|
||||
raise_if()
|
||||
|
|
|
@ -12,13 +12,17 @@ import inspect
|
|||
import os
|
||||
import sys
|
||||
import traceback
|
||||
import datetime as dt
|
||||
|
||||
from azure.core.exceptions import AzureError
|
||||
from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError
|
||||
|
||||
|
||||
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
|
||||
exceptions = []
|
||||
|
||||
test_map = dict()
|
||||
SUCCESSED = "successed"
|
||||
FAILED = "failed"
|
||||
|
||||
def try_manual(func):
|
||||
def import_manual_function(origin_func):
|
||||
|
@ -48,19 +52,49 @@ def try_manual(func):
|
|||
func_to_call = get_func_to_call()
|
||||
print("running {}()...".format(func.__name__))
|
||||
try:
|
||||
return func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__] = dict()
|
||||
test_map[func.__name__]["result"] = SUCCESSED
|
||||
test_map[func.__name__]["error_message"] = ""
|
||||
test_map[func.__name__]["error_stack"] = ""
|
||||
test_map[func.__name__]["error_normalized"] = ""
|
||||
test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()
|
||||
ret = func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
test_map[func.__name__]["result"] = FAILED
|
||||
test_map[func.__name__]["error_message"] = str(e).replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
print("--------------------------------------")
|
||||
print("step exception: ", e)
|
||||
print("--------------------------------------", file=sys.stderr)
|
||||
print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)
|
||||
traceback.print_exc()
|
||||
exceptions.append((func.__name__, sys.exc_info()))
|
||||
else:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
return ret
|
||||
|
||||
if inspect.isclass(func):
|
||||
return get_func_to_call()
|
||||
return wrapper
|
||||
|
||||
def calc_coverage(filename):
|
||||
filename = filename.split(".")[0]
|
||||
coverage_name = filename + "_coverage.md"
|
||||
with open(coverage_name, "w") as f:
|
||||
f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\n")
|
||||
failed = 0
|
||||
total = len(test_map)
|
||||
covered = 0
|
||||
for k, v in test_map.items():
|
||||
if not k.startswith("step_"):
|
||||
total -= 1
|
||||
continue
|
||||
if v["result"] == SUCCESSED:
|
||||
covered += 1
|
||||
f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\n".format(step_name=k, **v))
|
||||
f.write("Coverage: {}/{}\n".format(covered, total))
|
||||
print("Create coverage\n", file=sys.stderr)
|
||||
|
||||
def raise_if():
|
||||
if exceptions:
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
import os
|
||||
from azure.cli.testsdk import ScenarioTest
|
||||
from .. import try_manual, raise_if
|
||||
from .. import try_manual, raise_if, calc_coverage
|
||||
from azure.cli.testsdk import ResourceGroupPreparer
|
||||
|
||||
|
||||
|
@ -465,4 +465,5 @@ class DFAZManagementClientScenarioTest(ScenarioTest):
|
|||
})
|
||||
|
||||
call_scenario(self, rg)
|
||||
calc_coverage(__file__)
|
||||
raise_if()
|
||||
|
|
|
@ -12,13 +12,17 @@ import inspect
|
|||
import os
|
||||
import sys
|
||||
import traceback
|
||||
import datetime as dt
|
||||
|
||||
from azure.core.exceptions import AzureError
|
||||
from azure.cli.testsdk.exceptions import CliTestError, CliExecutionError, JMESPathCheckAssertionError
|
||||
|
||||
|
||||
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
|
||||
exceptions = []
|
||||
|
||||
test_map = dict()
|
||||
SUCCESSED = "successed"
|
||||
FAILED = "failed"
|
||||
|
||||
def try_manual(func):
|
||||
def import_manual_function(origin_func):
|
||||
|
@ -48,19 +52,49 @@ def try_manual(func):
|
|||
func_to_call = get_func_to_call()
|
||||
print("running {}()...".format(func.__name__))
|
||||
try:
|
||||
return func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__] = dict()
|
||||
test_map[func.__name__]["result"] = SUCCESSED
|
||||
test_map[func.__name__]["error_message"] = ""
|
||||
test_map[func.__name__]["error_stack"] = ""
|
||||
test_map[func.__name__]["error_normalized"] = ""
|
||||
test_map[func.__name__]["start_dt"] = dt.datetime.utcnow()
|
||||
ret = func_to_call(*args, **kwargs)
|
||||
except (AssertionError, AzureError, CliTestError, CliExecutionError, SystemExit, JMESPathCheckAssertionError) as e:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
test_map[func.__name__]["result"] = FAILED
|
||||
test_map[func.__name__]["error_message"] = str(e).replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
test_map[func.__name__]["error_stack"] = traceback.format_exc().replace("\r\n", " ").replace("\n", " ")[:500]
|
||||
print("--------------------------------------")
|
||||
print("step exception: ", e)
|
||||
print("--------------------------------------", file=sys.stderr)
|
||||
print("step exception in {}: {}".format(func.__name__, e), file=sys.stderr)
|
||||
traceback.print_exc()
|
||||
exceptions.append((func.__name__, sys.exc_info()))
|
||||
else:
|
||||
test_map[func.__name__]["end_dt"] = dt.datetime.utcnow()
|
||||
return ret
|
||||
|
||||
if inspect.isclass(func):
|
||||
return get_func_to_call()
|
||||
return wrapper
|
||||
|
||||
def calc_coverage(filename):
|
||||
filename = filename.split(".")[0]
|
||||
coverage_name = filename + "_coverage.md"
|
||||
with open(coverage_name, "w") as f:
|
||||
f.write("|Scenario|Result|ErrorMessage|ErrorStack|ErrorNormalized|StartDt|EndDt|\n")
|
||||
failed = 0
|
||||
total = len(test_map)
|
||||
covered = 0
|
||||
for k, v in test_map.items():
|
||||
if not k.startswith("step_"):
|
||||
total -= 1
|
||||
continue
|
||||
if v["result"] == SUCCESSED:
|
||||
covered += 1
|
||||
f.write("|{step_name}|{result}|{error_message}|{error_stack}|{error_normalized}|{start_dt}|{end_dt}|\n".format(step_name=k, **v))
|
||||
f.write("Coverage: {}/{}\n".format(covered, total))
|
||||
print("Create coverage\n", file=sys.stderr)
|
||||
|
||||
def raise_if():
|
||||
if exceptions:
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
import os
|
||||
from azure.cli.testsdk import ScenarioTest
|
||||
from .. import try_manual, raise_if
|
||||
from .. import try_manual, raise_if, calc_coverage
|
||||
from azure.cli.testsdk import ResourceGroupPreparer
|
||||
from .preparers import VirtualNetworkPreparer
|
||||
|
||||
|
@ -256,4 +256,5 @@ class ManagedNetworkManagementClientScenarioTest(ScenarioTest):
|
|||
})
|
||||
|
||||
call_scenario(self, rg)
|
||||
calc_coverage(__file__)
|
||||
raise_if()
|
||||
|
|
Загрузка…
Ссылка в новой задаче