Perf-test infra: decide completion from client output ("Global commit")
instead of a same-commit-count cutoff.

Review notes on this revision of the patch:
  * SSHRemote.check_for_stdout_line closes its SSH client in a finally
    block, matching wait_for_stdout_line (the client was leaked before).
  * runner.run names KeyboardInterrupt next to Exception, so Ctrl-C still
    stops the remote clients, and re-raises anything that is not a Ctrl-C
    instead of silently swallowing the failure.
  * NOTE(review): the line breaks of this patch had been lost; indentation
    and blank context lines below are reconstructed and the hunk counts
    recomputed. Confirm with `git apply --check` before relying on it.

diff --git a/tests/infra/rates.py b/tests/infra/rates.py
index e3ea27b8a..c447d4e9f 100644
--- a/tests/infra/rates.py
+++ b/tests/infra/rates.py
@@ -7,8 +7,6 @@ from statistics import mean, harmonic_mean, median, pstdev
 
 from loguru import logger as LOG
 
-COMMIT_COUNT_CUTOFF = 15
-
 
 class TxRates:
     def __init__(self, primary):
@@ -60,19 +58,12 @@ class TxRates:
             rv = client.rpc("getCommit", {})
             result = rv.to_dict()
             next_commit = result["result"]["commit"]
-            if self.commit == next_commit:
-                self.same_commit_count += 1
-            else:
-                self.same_commit_count = 0
-
+            more_to_process = self.commit != next_commit
             self.commit = next_commit
 
-            if self.same_commit_count >= COMMIT_COUNT_CUTOFF:
-                self._get_metrics()
-                return False
-            return True
+            return more_to_process
 
-    def _get_metrics(self):
+    def get_metrics(self):
         with self.primary.user_client(format="json") as client:
             rv = client.rpc("getMetrics", {})
             result = rv.to_dict()
diff --git a/tests/infra/remote.py b/tests/infra/remote.py
index 06ffc633f..4bdf641ca 100644
--- a/tests/infra/remote.py
+++ b/tests/infra/remote.py
@@ -158,7 +158,7 @@ class SSHRemote(CmdMixin):
         session = self.client.open_sftp()
         for path in self.files:
             # Some files can be glob patterns
-            for f in glob.glob(os.path.basename(path)):
+            for f in glob.glob(path):
                 tgt_path = os.path.join(self.root, os.path.basename(f))
                 LOG.info("[{}] copy {} from {}".format(self.hostname, tgt_path, f))
                 session.put(f, tgt_path)
@@ -215,7 +215,7 @@ class SSHRemote(CmdMixin):
         for filepath in (self.err, self.out):
             try:
                 local_filepath = "{}_{}_{}".format(
-                    self.hostname, filename, self.name
+                    self.hostname, os.path.basename(filepath), self.name
                 )
                 session.get(filepath, local_filepath)
                 LOG.info("Downloaded {}".format(local_filepath))
@@ -274,20 +274,36 @@ class SSHRemote(CmdMixin):
         client = self._connect_new()
         try:
             for _ in range(timeout):
-                _, stdout, _ = client.exec_command(f"grep -F '{line}' {self.root}/out")
+                _, stdout, _ = client.exec_command(f"grep -F '{line}' {self.out}")
                 if stdout.channel.recv_exit_status() == 0:
                     return
                 time.sleep(1)
-            raise ValueError(
-                "{} not found in stdout after {} seconds".format(line, timeout)
-            )
+            raise ValueError(f"{line} not found in stdout after {timeout} seconds")
         finally:
             client.close()
 
+    def check_for_stdout_line(self, line, timeout):
+        """Poll the remote stdout file for `line`, once per second.
+
+        Returns True as soon as grep finds the line, False after `timeout`
+        attempts. Unlike wait_for_stdout_line, never raises on timeout.
+        """
+        client = self._connect_new()
+        try:
+            for _ in range(timeout):
+                _, stdout, _ = client.exec_command(f"grep -F '{line}' {self.out}")
+                if stdout.channel.recv_exit_status() == 0:
+                    return True
+                time.sleep(1)
+            return False
+        finally:
+            # Always release the SSH connection, as wait_for_stdout_line does
+            client.close()
+
     def print_and_upload_result(self, name, metrics, lines):
         client = self._connect_new()
         try:
-            _, stdout, _ = client.exec_command(f"tail -{lines} {self.root}/out")
+            _, stdout, _ = client.exec_command(f"tail -{lines} {self.out}")
             if stdout.channel.recv_exit_status() == 0:
                 LOG.success(f"Result for {self.name}:")
                 self._print_upload_perf(name, metrics, stdout.read().splitlines())
@@ -421,13 +437,26 @@ class LocalRemote(CmdMixin):
         for _ in range(timeout):
             with open(self.out, "rb") as out:
                 for out_line in out:
-                    if line.strip() in out_line.strip().decode():
+                    if line in out_line.decode():
                         return
             time.sleep(1)
         raise ValueError(
             "{} not found in stdout after {} seconds".format(line, timeout)
         )
 
+    def check_for_stdout_line(self, line, timeout):
+        """Poll the local stdout file for `line`, once per second.
+
+        Returns True on the first match, False after `timeout` attempts.
+        """
+        for _ in range(timeout):
+            with open(self.out, "rb") as out:
+                for out_line in out:
+                    if line in out_line.decode():
+                        return True
+            time.sleep(1)
+        return False
+
     def print_and_upload_result(self, name, metrics, line):
         with open(self.out, "rb") as out:
             lines = out.read().splitlines()
diff --git a/tests/infra/remote_client.py b/tests/infra/remote_client.py
index 50156be32..79efb7eee 100644
--- a/tests/infra/remote_client.py
+++ b/tests/infra/remote_client.py
@@ -99,5 +99,9 @@ class CCFRemoteClient(object):
             LOG.exception("Failed to wait on client {}".format(self.name))
             raise
 
+    def check_done(self):
+        """Return whether the client's stdout contains "Global commit"."""
+        return self.remote.check_for_stdout_line(line="Global commit", timeout=5)
+
     def print_and_upload_result(self, name, metrics):
         self.remote.print_and_upload_result(name, metrics, self.LINES_RESULT_FROM_END)
diff --git a/tests/infra/runner.py b/tests/infra/runner.py
index 72776ac55..2983d862b 100644
--- a/tests/infra/runner.py
+++ b/tests/infra/runner.py
@@ -129,17 +129,33 @@ def run(build_directory, get_command, args):
             tx_rates = infra.rates.TxRates(primary)
             while True:
                 if not tx_rates.process_next():
+                    stop_waiting = True
                     for i, remote_client in enumerate(clients):
-                        remote_client.wait()
-                        remote_client.print_and_upload_result(args.label, metrics)
-                        remote_client.stop()
-                    break
+                        done = remote_client.check_done()
+                        # all the clients need to be done
+                        LOG.info(
+                            f"Client {i} has {'completed' if done else 'not completed'} running"
+                        )
+                        stop_waiting = stop_waiting and done
+                    if stop_waiting:
+                        break
                 time.sleep(1)
 
+            tx_rates.get_metrics()
+            for remote_client in clients:
+                remote_client.print_and_upload_result(args.label, metrics)
+                remote_client.stop()
+
             LOG.info(f"Rates:\n{tx_rates}")
             tx_rates.save_results(args.metrics_file)
             metrics.publish()
 
-        except KeyboardInterrupt:
+        except (Exception, KeyboardInterrupt) as err:
+            # KeyboardInterrupt derives from BaseException, so it has to be
+            # listed explicitly for Ctrl-C to keep stopping the clients.
             for remote_client in clients:
                 remote_client.stop()
+            # Ctrl-C is a deliberate stop; anything else is a real failure
+            # that must not be swallowed.
+            if not isinstance(err, KeyboardInterrupt):
+                raise