зеркало из https://github.com/microsoft/CCF.git
Ci/test wait on client (#438)
This commit is contained in:
Родитель
aa86312e3e
Коммит
21335f9386
|
@ -7,8 +7,6 @@ from statistics import mean, harmonic_mean, median, pstdev
|
||||||
|
|
||||||
from loguru import logger as LOG
|
from loguru import logger as LOG
|
||||||
|
|
||||||
COMMIT_COUNT_CUTOFF = 15
|
|
||||||
|
|
||||||
|
|
||||||
class TxRates:
|
class TxRates:
|
||||||
def __init__(self, primary):
|
def __init__(self, primary):
|
||||||
|
@ -60,19 +58,12 @@ class TxRates:
|
||||||
rv = client.rpc("getCommit", {})
|
rv = client.rpc("getCommit", {})
|
||||||
result = rv.to_dict()
|
result = rv.to_dict()
|
||||||
next_commit = result["result"]["commit"]
|
next_commit = result["result"]["commit"]
|
||||||
if self.commit == next_commit:
|
more_to_process = self.commit != next_commit
|
||||||
self.same_commit_count += 1
|
|
||||||
else:
|
|
||||||
self.same_commit_count = 0
|
|
||||||
|
|
||||||
self.commit = next_commit
|
self.commit = next_commit
|
||||||
|
|
||||||
if self.same_commit_count >= COMMIT_COUNT_CUTOFF:
|
return more_to_process
|
||||||
self._get_metrics()
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _get_metrics(self):
|
def get_metrics(self):
|
||||||
with self.primary.user_client(format="json") as client:
|
with self.primary.user_client(format="json") as client:
|
||||||
rv = client.rpc("getMetrics", {})
|
rv = client.rpc("getMetrics", {})
|
||||||
result = rv.to_dict()
|
result = rv.to_dict()
|
||||||
|
|
|
@ -158,7 +158,7 @@ class SSHRemote(CmdMixin):
|
||||||
session = self.client.open_sftp()
|
session = self.client.open_sftp()
|
||||||
for path in self.files:
|
for path in self.files:
|
||||||
# Some files can be glob patterns
|
# Some files can be glob patterns
|
||||||
for f in glob.glob(os.path.basename(path)):
|
for f in glob.glob(path):
|
||||||
tgt_path = os.path.join(self.root, os.path.basename(f))
|
tgt_path = os.path.join(self.root, os.path.basename(f))
|
||||||
LOG.info("[{}] copy {} from {}".format(self.hostname, tgt_path, f))
|
LOG.info("[{}] copy {} from {}".format(self.hostname, tgt_path, f))
|
||||||
session.put(f, tgt_path)
|
session.put(f, tgt_path)
|
||||||
|
@ -215,7 +215,7 @@ class SSHRemote(CmdMixin):
|
||||||
for filepath in (self.err, self.out):
|
for filepath in (self.err, self.out):
|
||||||
try:
|
try:
|
||||||
local_filepath = "{}_{}_{}".format(
|
local_filepath = "{}_{}_{}".format(
|
||||||
self.hostname, filename, self.name
|
self.hostname, os.path.basename(filepath), self.name
|
||||||
)
|
)
|
||||||
session.get(filepath, local_filepath)
|
session.get(filepath, local_filepath)
|
||||||
LOG.info("Downloaded {}".format(local_filepath))
|
LOG.info("Downloaded {}".format(local_filepath))
|
||||||
|
@ -274,20 +274,27 @@ class SSHRemote(CmdMixin):
|
||||||
client = self._connect_new()
|
client = self._connect_new()
|
||||||
try:
|
try:
|
||||||
for _ in range(timeout):
|
for _ in range(timeout):
|
||||||
_, stdout, _ = client.exec_command(f"grep -F '{line}' {self.root}/out")
|
_, stdout, _ = client.exec_command(f"grep -F '{line}' {self.out}")
|
||||||
if stdout.channel.recv_exit_status() == 0:
|
if stdout.channel.recv_exit_status() == 0:
|
||||||
return
|
return
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
raise ValueError(
|
raise ValueError(f"{line} not found in stdout after {timeout} seconds")
|
||||||
"{} not found in stdout after {} seconds".format(line, timeout)
|
|
||||||
)
|
|
||||||
finally:
|
finally:
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
|
def check_for_stdout_line(self, line, timeout):
|
||||||
|
client = self._connect_new()
|
||||||
|
for _ in range(timeout):
|
||||||
|
_, stdout, _ = client.exec_command(f"grep -F '{line}' {self.out}")
|
||||||
|
if stdout.channel.recv_exit_status() == 0:
|
||||||
|
return True
|
||||||
|
time.sleep(1)
|
||||||
|
return False
|
||||||
|
|
||||||
def print_and_upload_result(self, name, metrics, lines):
|
def print_and_upload_result(self, name, metrics, lines):
|
||||||
client = self._connect_new()
|
client = self._connect_new()
|
||||||
try:
|
try:
|
||||||
_, stdout, _ = client.exec_command(f"tail -{lines} {self.root}/out")
|
_, stdout, _ = client.exec_command(f"tail -{lines} {self.out}")
|
||||||
if stdout.channel.recv_exit_status() == 0:
|
if stdout.channel.recv_exit_status() == 0:
|
||||||
LOG.success(f"Result for {self.name}:")
|
LOG.success(f"Result for {self.name}:")
|
||||||
self._print_upload_perf(name, metrics, stdout.read().splitlines())
|
self._print_upload_perf(name, metrics, stdout.read().splitlines())
|
||||||
|
@ -421,13 +428,22 @@ class LocalRemote(CmdMixin):
|
||||||
for _ in range(timeout):
|
for _ in range(timeout):
|
||||||
with open(self.out, "rb") as out:
|
with open(self.out, "rb") as out:
|
||||||
for out_line in out:
|
for out_line in out:
|
||||||
if line.strip() in out_line.strip().decode():
|
if line in out_line.decode():
|
||||||
return
|
return
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"{} not found in stdout after {} seconds".format(line, timeout)
|
"{} not found in stdout after {} seconds".format(line, timeout)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def check_for_stdout_line(self, line, timeout):
|
||||||
|
for _ in range(timeout):
|
||||||
|
with open(self.out, "rb") as out:
|
||||||
|
for out_line in out:
|
||||||
|
if line in out_line.decode():
|
||||||
|
return True
|
||||||
|
time.sleep(1)
|
||||||
|
return False
|
||||||
|
|
||||||
def print_and_upload_result(self, name, metrics, line):
|
def print_and_upload_result(self, name, metrics, line):
|
||||||
with open(self.out, "rb") as out:
|
with open(self.out, "rb") as out:
|
||||||
lines = out.read().splitlines()
|
lines = out.read().splitlines()
|
||||||
|
|
|
@ -99,5 +99,8 @@ class CCFRemoteClient(object):
|
||||||
LOG.exception("Failed to wait on client {}".format(self.name))
|
LOG.exception("Failed to wait on client {}".format(self.name))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def check_done(self):
|
||||||
|
return self.remote.check_for_stdout_line(line="Global commit", timeout=5)
|
||||||
|
|
||||||
def print_and_upload_result(self, name, metrics):
|
def print_and_upload_result(self, name, metrics):
|
||||||
self.remote.print_and_upload_result(name, metrics, self.LINES_RESULT_FROM_END)
|
self.remote.print_and_upload_result(name, metrics, self.LINES_RESULT_FROM_END)
|
||||||
|
|
|
@ -129,17 +129,27 @@ def run(build_directory, get_command, args):
|
||||||
tx_rates = infra.rates.TxRates(primary)
|
tx_rates = infra.rates.TxRates(primary)
|
||||||
while True:
|
while True:
|
||||||
if not tx_rates.process_next():
|
if not tx_rates.process_next():
|
||||||
|
stop_waiting = True
|
||||||
for i, remote_client in enumerate(clients):
|
for i, remote_client in enumerate(clients):
|
||||||
remote_client.wait()
|
done = remote_client.check_done()
|
||||||
remote_client.print_and_upload_result(args.label, metrics)
|
# all the clients need to be done
|
||||||
remote_client.stop()
|
LOG.info(
|
||||||
break
|
f"Client {i} has {'completed' if done else 'not completed'} running"
|
||||||
|
)
|
||||||
|
stop_waiting = stop_waiting and done
|
||||||
|
if stop_waiting:
|
||||||
|
break
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
tx_rates.get_metrics()
|
||||||
|
for remote_client in clients:
|
||||||
|
remote_client.print_and_upload_result(args.label, metrics)
|
||||||
|
remote_client.stop()
|
||||||
|
|
||||||
LOG.info(f"Rates:\n{tx_rates}")
|
LOG.info(f"Rates:\n{tx_rates}")
|
||||||
tx_rates.save_results(args.metrics_file)
|
tx_rates.save_results(args.metrics_file)
|
||||||
metrics.publish()
|
metrics.publish()
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except Exception:
|
||||||
for remote_client in clients:
|
for remote_client in clients:
|
||||||
remote_client.stop()
|
remote_client.stop()
|
||||||
|
|
Загрузка…
Ссылка в новой задаче