From 4a8508622b5e25041a4c72887589184ca287937a Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Mon, 11 May 2020 10:16:10 -0700 Subject: [PATCH] add basic python perf test (#271) --- .flake8 | 4 +- .../wrapper/python_glue/leak_check.py | 20 +- test-runner/pytest.ini | 2 +- test-runner/test_perf.py | 240 ++++++++++++++++++ 4 files changed, 259 insertions(+), 7 deletions(-) create mode 100644 test-runner/test_perf.py diff --git a/.flake8 b/.flake8 index 6703910..4960f43 100644 --- a/.flake8 +++ b/.flake8 @@ -2,7 +2,7 @@ # E501: line length (handled by black) # W503, E203: Not PEP8 compliant (incompatible with black) # Ignore generated code -ignore = E501,W503,E203,F401,F403 +ignore = E501,W503,E203 exclude = .git, - __pycache__ \ No newline at end of file + __pycache__ diff --git a/docker_images/pythonv2/wrapper/python_glue/leak_check.py b/docker_images/pythonv2/wrapper/python_glue/leak_check.py index 81be04b..1655e1b 100644 --- a/docker_images/pythonv2/wrapper/python_glue/leak_check.py +++ b/docker_images/pythonv2/wrapper/python_glue/leak_check.py @@ -35,8 +35,13 @@ def _dump_referrers(obj): print(" dict: {}".format(referrer)) for sub_referrer in gc.get_referrers(referrer): if sub_referrer != referrers: - print(" used by: {}:{}".format(type(sub_referrer), sub_referrer)) - elif not isinstance(referrer, type): + if not inspect.ismodule(sub_referrer): + print( + " used by: {}:{}".format( + type(sub_referrer), sub_referrer + ) + ) + elif not isinstance(referrer, type) and not inspect.ismodule(referrer): print(" used by: {}:{}".format(type(referrer), referrer)) @@ -169,10 +174,17 @@ class LeakTracker(object): len(all_tracked_objects) ) ) + count = 0 for obj in all_tracked_objects: - logger.error("LEAK: {}".format(obj)) - _dump_referrers(obj) + count += 1 + if count <= 100: + logger.error("LEAK: {}".format(obj)) + _dump_referrers(obj) self.previous_leaks.append(obj) + if count < len(all_tracked_objects): + logger.errer( + "and {} more objects".format(len(all_tracked_objects) - count) + ) assert False else: logger.info("No leaks") diff --git a/test-runner/pytest.ini b/test-runner/pytest.ini index ba4f5ce..69099a1 100644 --- a/test-runner/pytest.ini +++ b/test-runner/pytest.ini @@ -1,6 +1,6 @@ [pytest] testdox_format = plaintext -addopts = --testdox --showlocals --tb=short +addopts = --testdox --tb=no -rn junit_logging=all junit_family=xunit2 junit_log_passing_tests=False diff --git a/test-runner/test_perf.py b/test-runner/test_perf.py new file mode 100644 index 0000000..0e586d9 --- /dev/null +++ b/test-runner/test_perf.py @@ -0,0 +1,240 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for +# full license information. + +import pytest +import asyncio +import datetime +import math +import sample_content +import threading +import contextlib +from horton_logging import logger + +pytestmark = pytest.mark.asyncio + +# per-test-case timeout for this module +test_timeout = 900 + + +class ExcThread(threading.Thread): + def __init__(self, target, args=None): + self.args = args if args else [] + self.target = target + self.exc = None + threading.Thread.__init__(self) + + def run(self): + try: + self.target(*self.args) + except Exception as e: + # self.exc =sys.exc_info() + self.exc = e + + +class SampleAverage(object): + def __init__(self): + self.lock = threading.Lock() + self.total = 0 + self.sample_count = 0 + + def add_sample(self, sample): + with self.lock: + self.sample_count += 1 + self.total += sample + + def get_average(self): + with self.lock: + return self.total / self.sample_count + + +class InstanceCounter(contextlib.AbstractContextManager): + def __init__(self, object_type): + self.instance_count = 0 + self.max_instances = 0 + self.lock = threading.Lock() + self.object_type = object_type + self.at_zero = threading.Event() + + def __enter__(self): + with self.lock: + self.instance_count += 1 + if self.instance_count > self.max_instances: + self.max_instances = self.instance_count + self.at_zero.clear() + + def __exit__(self, *args): + with self.lock: + self.instance_count -= 1 + logger("{} count at {}".format(self.object_type, self.instance_count)) + if self.instance_count == 0: + self.at_zero.set() + + def wait_for_zero(self): + self.at_zero.wait() + + def get_max(self): + with self.lock: + return self.max_instances + + def get_count(self): + with self.lock: + return self.instance_count + + +class LatencyMeasurement(contextlib.AbstractContextManager): + def __init__(self): + self.start_time = None + self.end_time = None + + def __enter__(self): + self.start_time = datetime.datetime.now() + + def __exit__(self, *args): + self.end_time = datetime.datetime.now() + + def get_latency(self): + return (self.end_time - self.start_time).total_seconds() + + +class PerfTest(object): + async def test_throughput(self, client): + count = 2500 + + payloads = [sample_content.make_message_payload() for x in range(0, count)] + + start_time = datetime.datetime.now() + await asyncio.gather(*[client.send_event(payload) for payload in payloads]) + duration = (datetime.datetime.now() - start_time).total_seconds() + + mps = math.floor(count / duration) + + logger( + "{} messages were sent in {} seconds for {} messages/second".format( + count, duration, mps + ) + ) + + async def do_test_multithreaded(self, client, events_per): + threads = [] + duration = 30 + + message_counter = InstanceCounter("Message") + thread_counter = InstanceCounter("Thread") + average_latency = SampleAverage() + + async def send_single(): + latency = LatencyMeasurement() + with message_counter: + with latency: + await client.send_event(sample_content.make_message_payload()) + average_latency.add_sample(latency.get_latency()) + + def threadproc(): + with thread_counter: + + async def main(): + results = await asyncio.gather( + *[send_single() for i in range(0, events_per)] + ) + + for result in results: + if isinstance(result, Exception): + raise result + + asyncio.run(main()) + + for i in range(0, duration): + t = ExcThread(target=threadproc) + t.start() + threads.append(t) + + await asyncio.sleep(1) + + old_threads = threads + threads = [] + for t in old_threads: + if t.isAlive(): + threads.append(t) + elif t.exc: + raise (t.exc) + + logger( + "events_per = {} messages left = {}".format( + events_per, message_counter.get_count() + ) + ) + + thread_counter.wait_for_zero() + + logger("{} threads max".format(thread_counter.get_max())) + logger("Average latency {} seconds".format(average_latency.get_average())) + + return thread_counter.get_max(), average_latency.get_average() + + async def test_multithreaded(self, client): + first = 1 + last = 60 + biggest_success = 0 + smallest_failure = last + 1 + found = False + results = [] + + try: + while first <= last and not found: + midpoint = (first + last) // 2 + logger("running with {} events per batch".format(midpoint)) + threads, latency = await self.do_test_multithreaded(client, midpoint) + results.append( + {"evens_per": midpoint, "max_threads": threads, "latency": latency} + ) + if threads != 1: + logger( + "FAILED with {} events per second ({},{})".format( + midpoint, threads, latency + ) + ) + assert midpoint < smallest_failure + smallest_failure = midpoint + else: + logger( + "SUCCEEDED with {} events per second ({},{})".format( + midpoint, threads, latency + ) + ) + assert midpoint > biggest_success + biggest_success = midpoint + if biggest_success + 1 >= smallest_failure: + found = True + else: + if threads > 1: + last = midpoint - 1 + else: + first = midpoint + 1 + + finally: + logger("FINAL RESULT:") + logger("biggest_success = {} events per second".format(biggest_success)) + logger("smallest_failure = {} events per second".format(smallest_failure)) + logger("INDIVIDUAL RESULTS:") + for res in results: + logger(res) + + +@pytest.mark.testgroup_edgehub_module_2h_stress +@pytest.mark.testgroup_iothub_module_2h_stress +@pytest.mark.describe("Module Client Perf") +@pytest.mark.timeout(test_timeout) +class TestModuleClientPerf(PerfTest): + @pytest.fixture + def client(self, test_module): + return test_module + + +@pytest.mark.testgroup_iothub_device_2h_stress +@pytest.mark.describe("Device Client Perf") +@pytest.mark.timeout(test_timeout) +class TestDeviceClientPerf(PerfTest): + @pytest.fixture + def client(self, test_device): + return test_device