pytest: Scorecard tracking CPU and RSS

Closes #12765
This commit is contained in:
Stefan Eissing 2024-01-19 15:37:46 +01:00 коммит произвёл Daniel Stenberg
Родитель 621cab26a9
Коммит 1c550b17eb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 5CC908FDB71E12C2
5 изменённых файлов: 286 добавлений и 111 удалений

Просмотреть файл

@ -27,3 +27,4 @@ pytest
cryptography
multipart
websockets
psutil

Просмотреть файл

@ -33,7 +33,7 @@ import sys
from statistics import mean
from typing import Dict, Any, Optional, List
from testenv import Env, Httpd, Nghttpx, CurlClient, Caddy, ExecResult, NghttpxFwd
from testenv import Env, Httpd, Nghttpx, CurlClient, Caddy, ExecResult, NghttpxQuic, RunProfile
log = logging.getLogger(__name__)
@ -122,57 +122,65 @@ class ScoreCard:
count = 1
samples = []
errors = []
profiles = []
self.info(f'single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False)
with_headers=False, with_profile=True)
err = self._check_downloads(r, count)
if err:
errors.append(err)
else:
total_size = sum([s['size_download'] for s in r.stats])
samples.append(total_size / r.duration.total_seconds())
profiles.append(r.profile)
return {
'count': count,
'samples': sample_size,
'speed': mean(samples) if len(samples) else -1,
'errors': errors
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
}
def transfer_serial(self, url: str, proto: str, count: int):
sample_size = 1
samples = []
errors = []
profiles = []
url = f'{url}?[0-{count - 1}]'
self.info(f'serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False)
with_headers=False, with_profile=True)
err = self._check_downloads(r, count)
if err:
errors.append(err)
else:
total_size = sum([s['size_download'] for s in r.stats])
samples.append(total_size / r.duration.total_seconds())
profiles.append(r.profile)
return {
'count': count,
'samples': sample_size,
'speed': mean(samples) if len(samples) else -1,
'errors': errors
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
}
def transfer_parallel(self, url: str, proto: str, count: int):
sample_size = 1
samples = []
errors = []
profiles = []
url = f'{url}?[0-{count - 1}]'
self.info(f'parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False,
with_profile=True,
extra_args=['--parallel',
'--parallel-max', str(count)])
err = self._check_downloads(r, count)
@ -181,21 +189,25 @@ class ScoreCard:
else:
total_size = sum([s['size_download'] for s in r.stats])
samples.append(total_size / r.duration.total_seconds())
profiles.append(r.profile)
return {
'count': count,
'samples': sample_size,
'speed': mean(samples) if len(samples) else -1,
'errors': errors
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
}
def download_url(self, label: str, url: str, proto: str, count: int):
self.info(f' {count}x{label}: ')
props = {
'single': self.transfer_single(url=url, proto=proto, count=10),
'serial': self.transfer_serial(url=url, proto=proto, count=count),
'parallel': self.transfer_parallel(url=url, proto=proto,
count=count),
}
if count > 1:
props['serial'] = self.transfer_serial(url=url, proto=proto,
count=count)
props['parallel'] = self.transfer_parallel(url=url, proto=proto,
count=count)
self.info(f'ok.\n')
return props
@ -216,8 +228,7 @@ class ScoreCard:
'description': descr,
}
for fsize in fsizes:
label = f'{int(fsize / 1024)}KB' if fsize < 1024*1024 else \
f'{int(fsize / (1024 * 1024))}MB'
label = self.fmt_size(fsize)
fname = f'score{label}.data'
self._make_docs_file(docs_dir=self.httpd.docs_dir,
fname=fname, fsize=fsize)
@ -234,8 +245,7 @@ class ScoreCard:
'description': descr,
}
for fsize in fsizes:
label = f'{int(fsize / 1024)}KB' if fsize < 1024*1024 else \
f'{int(fsize / (1024 * 1024))}MB'
label = self.fmt_size(fsize)
fname = f'score{label}.data'
self._make_docs_file(docs_dir=self.caddy.docs_dir,
fname=fname, fsize=fsize)
@ -250,6 +260,7 @@ class ScoreCard:
sample_size = 1
samples = []
errors = []
profiles = []
url = f'{url}?[0-{count - 1}]'
extra_args = ['--parallel', '--parallel-max', str(max_parallel)] \
if max_parallel > 1 else []
@ -257,7 +268,7 @@ class ScoreCard:
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False,
with_headers=False, with_profile=True,
extra_args=extra_args)
err = self._check_downloads(r, count)
if err:
@ -265,30 +276,32 @@ class ScoreCard:
else:
for _ in r.stats:
samples.append(count / r.duration.total_seconds())
profiles.append(r.profile)
return {
'count': count,
'samples': sample_size,
'speed': mean(samples) if len(samples) else -1,
'errors': errors
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
}
def requests_url(self, url: str, proto: str, count: int):
self.info(f' {url}: ')
props = {
'serial': self.do_requests(url=url, proto=proto, count=count),
'par-6': self.do_requests(url=url, proto=proto, count=count,
max_parallel=6),
'par-25': self.do_requests(url=url, proto=proto, count=count,
max_parallel=25),
'par-50': self.do_requests(url=url, proto=proto, count=count,
max_parallel=50),
'par-100': self.do_requests(url=url, proto=proto, count=count,
max_parallel=100),
'1': self.do_requests(url=url, proto=proto, count=count),
'6': self.do_requests(url=url, proto=proto, count=count,
max_parallel=6),
'25': self.do_requests(url=url, proto=proto, count=count,
max_parallel=25),
'50': self.do_requests(url=url, proto=proto, count=count,
max_parallel=50),
'100': self.do_requests(url=url, proto=proto, count=count,
max_parallel=100),
}
self.info(f'ok.\n')
return props
def requests(self, proto: str) -> Dict[str, Any]:
def requests(self, proto: str, req_count) -> Dict[str, Any]:
scores = {}
if self.httpd:
if proto == 'h3':
@ -305,7 +318,8 @@ class ScoreCard:
url1 = f'https://{self.env.domain1}:{port}/reqs10.data'
scores[via] = {
'description': descr,
'10KB': self.requests_url(url=url1, proto=proto, count=10000),
'count': req_count,
'10KB': self.requests_url(url=url1, proto=proto, count=req_count),
}
if self.caddy:
port = self.caddy.port
@ -317,7 +331,8 @@ class ScoreCard:
url1 = f'https://{self.env.domain1}:{port}/req10.data'
scores[via] = {
'description': descr,
'10KB': self.requests_url(url=url1, proto=proto, count=5000),
'count': req_count,
'10KB': self.requests_url(url=url1, proto=proto, count=req_count),
}
return scores
@ -325,6 +340,7 @@ class ScoreCard:
handshakes: bool = True,
downloads: Optional[List[int]] = None,
download_count: int = 50,
req_count=5000,
requests: bool = True):
self.info(f"scoring {proto}\n")
p = {}
@ -368,15 +384,22 @@ class ScoreCard:
count=download_count,
fsizes=downloads)
if requests:
score['requests'] = self.requests(proto=proto)
score['requests'] = self.requests(proto=proto, req_count=req_count)
self.info("\n")
return score
def fmt_ms(self, tval):
return f'{int(tval*1000)} ms' if tval >= 0 else '--'
def fmt_mb(self, val):
return f'{val/(1024*1024):0.000f} MB' if val >= 0 else '--'
def fmt_size(self, val):
if val >= (1024*1024*1024):
return f'{val / (1024*1024*1024):0.000f}GB'
elif val >= (1024 * 1024):
return f'{val / (1024*1024):0.000f}MB'
elif val >= 1024:
return f'{val / 1024:0.000f}KB'
else:
return f'{val:0.000f}B'
def fmt_mbs(self, val):
return f'{val/(1024*1024):0.000f} MB/s' if val >= 0 else '--'
@ -398,46 +421,93 @@ class ScoreCard:
f'{"/".join(val["ipv4-errors"] + val["ipv6-errors"]):<20}'
)
if 'downloads' in score:
print('Downloads')
print(f' {"Server":<8} {"Size":>8} {"Single":>12} {"Serial":>12}'
f' {"Parallel":>12} {"Errors":<20}')
skeys = {}
for dkey, dval in score["downloads"].items():
for k in dval.keys():
skeys[k] = True
for skey in skeys:
for dkey, dval in score["downloads"].items():
if skey in dval:
sval = dval[skey]
if isinstance(sval, str):
continue
errors = []
for key, val in sval.items():
if 'errors' in val:
errors.extend(val['errors'])
print(f' {dkey:<8} {skey:>8} '
f'{self.fmt_mbs(sval["single"]["speed"]):>12} '
f'{self.fmt_mbs(sval["serial"]["speed"]):>12} '
f'{self.fmt_mbs(sval["parallel"]["speed"]):>12} '
f' {"/".join(errors):<20}')
if 'requests' in score:
print('Requests, max in parallel')
print(f' {"Server":<8} {"Size":>8} '
f'{"1 ":>12} {"6 ":>12} {"25 ":>12} '
f'{"50 ":>12} {"100 ":>12} {"Errors":<20}')
for dkey, dval in score["requests"].items():
for skey, sval in dval.items():
if isinstance(sval, str):
# get the key names of all sizes and measurements made
sizes = []
measures = []
m_names = {}
mcol_width = 12
mcol_sw = 17
for server, server_score in score['downloads'].items():
for sskey, ssval in server_score.items():
if isinstance(ssval, str):
continue
if sskey not in sizes:
sizes.append(sskey)
for mkey, mval in server_score[sskey].items():
if mkey not in measures:
measures.append(mkey)
m_names[mkey] = f'{mkey}({mval["count"]}x)'
print('Downloads')
print(f' {"Server":<8} {"Size":>8}', end='')
for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^20}')
for server in score['downloads']:
for size in sizes:
size_score = score['downloads'][server][size]
print(f' {server:<8} {size:>8}', end='')
errors = []
for key, val in sval.items():
for key, val in size_score.items():
if 'errors' in val:
errors.extend(val['errors'])
line = f' {dkey:<8} {skey:>8} '
for k in sval.keys():
line += f'{self.fmt_reqs(sval[k]["speed"]):>12} '
line += f' {"/".join(errors):<20}'
print(line)
for m in measures:
if m in size_score:
print(f' {self.fmt_mbs(size_score[m]["speed"]):>{mcol_width}}', end='')
s = f'[{size_score[m]["stats"]["cpu"]:>.1f}%'\
f'/{self.fmt_size(size_score[m]["stats"]["rss"])}]'
print(f' {s:<{mcol_sw}}', end='')
else:
print(' '*mcol_width, end='')
if len(errors):
print(f' {"/".join(errors):<20}')
else:
print(f' {"-":^20}')
if 'requests' in score:
sizes = []
measures = []
m_names = {}
mcol_width = 9
mcol_sw = 13
for server in score['requests']:
server_score = score['requests'][server]
for sskey, ssval in server_score.items():
if isinstance(ssval, str) or isinstance(ssval, int):
continue
if sskey not in sizes:
sizes.append(sskey)
for mkey, mval in server_score[sskey].items():
if mkey not in measures:
measures.append(mkey)
m_names[mkey] = f'{mkey}'
print('Requests, max in parallel')
print(f' {"Server":<8} {"Size":>6} {"Reqs":>6}', end='')
for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^10}')
for server in score['requests']:
for size in sizes:
size_score = score['requests'][server][size]
count = score['requests'][server]['count']
print(f' {server:<8} {size:>6} {count:>6}', end='')
errors = []
for key, val in size_score.items():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
if m in size_score:
print(f' {self.fmt_reqs(size_score[m]["speed"]):>{mcol_width}}', end='')
s = f'[{size_score[m]["stats"]["cpu"]:>.1f}%'\
f'/{self.fmt_size(size_score[m]["stats"]["rss"])}]'
print(f' {s:<{mcol_sw}}', end='')
else:
print(' '*mcol_width, end='')
if len(errors):
print(f' {"/".join(errors):<10}')
else:
print(f' {"-":^10}')
def parse_size(s):
@ -445,7 +515,9 @@ def parse_size(s):
if m is None:
raise Exception(f'unrecognized size: {s}')
size = int(m.group(1))
if m.group(2).lower() == 'kb':
if not m.group(2):
pass
elif m.group(2).lower() == 'kb':
size *= 1024
elif m.group(2).lower() == 'mb':
size *= 1024 * 1024
@ -466,13 +538,15 @@ def main():
parser.add_argument("-H", "--handshakes", action='store_true',
default=False, help="evaluate handshakes only")
parser.add_argument("-d", "--downloads", action='store_true',
default=False, help="evaluate downloads only")
default=False, help="evaluate downloads")
parser.add_argument("--download", action='append', type=str,
default=None, help="evaluate download size")
parser.add_argument("--download-count", action='store', type=int,
default=50, help="perform that many downloads")
parser.add_argument("-r", "--requests", action='store_true',
default=False, help="evaluate requests only")
default=False, help="evaluate requests")
parser.add_argument("--request-count", action='store', type=int,
default=5000, help="perform that many requests")
parser.add_argument("--httpd", action='store_true', default=False,
help="evaluate httpd server only")
parser.add_argument("--caddy", action='store_true', default=False,
@ -491,29 +565,23 @@ def main():
protocol = args.protocol
handshakes = True
downloads = [1024*1024, 10*1024*1024, 100*1024*1024]
downloads = [1024 * 1024, 10 * 1024 * 1024, 100 * 1024 * 1024]
if args.download is not None:
downloads = []
for x in args.download:
downloads.extend([parse_size(s) for s in x.split(',')])
requests = True
if args.downloads or args.requests or args.handshakes:
handshakes = args.handshakes
if not args.downloads:
downloads = None
requests = args.requests
test_httpd = protocol != 'h3'
test_caddy = True
if args.handshakes:
downloads = None
requests = False
if args.downloads:
handshakes = False
requests = False
if args.download:
downloads = sorted([parse_size(x) for x in args.download])
handshakes = False
requests = False
if args.requests:
handshakes = False
downloads = None
if args.caddy:
test_caddy = True
test_httpd = False
if args.httpd:
test_caddy = False
test_httpd = True
if args.caddy or args.httpd:
test_caddy = args.caddy
test_httpd = args.httpd
rv = 0
env = Env()
@ -530,7 +598,7 @@ def main():
httpd.clear_logs()
assert httpd.start()
if 'h3' == protocol:
nghttpx = NghttpxFwd(env=env)
nghttpx = NghttpxQuic(env=env)
nghttpx.clear_logs()
assert nghttpx.start()
if test_caddy and env.caddy:
@ -544,6 +612,7 @@ def main():
handshakes=handshakes,
downloads=downloads,
download_count=args.download_count,
req_count=args.request_count,
requests=requests)
if args.json:
print(json.JSONEncoder(indent=2).encode(score))

Просмотреть файл

@ -28,6 +28,7 @@ import difflib
import filecmp
import logging
import os
import time
import pytest
from testenv import Env, CurlClient

Просмотреть файл

@ -32,7 +32,7 @@ from .env import Env
from .certs import TestCA, Credentials
from .caddy import Caddy
from .httpd import Httpd
from .curl import CurlClient, ExecResult
from .curl import CurlClient, ExecResult, RunProfile
from .client import LocalClient
from .nghttpx import Nghttpx
from .nghttpx import Nghttpx, NghttpxQuic, NghttpxFwd

Просмотреть файл

@ -27,11 +27,13 @@
import json
import logging
import os
import psutil
import re
import shutil
import subprocess
from statistics import mean, fmean
from datetime import timedelta, datetime
from typing import List, Optional, Dict, Union
from typing import List, Optional, Dict, Union, Any
from urllib.parse import urlparse
from .env import Env
@ -40,18 +42,81 @@ from .env import Env
log = logging.getLogger(__name__)
class RunProfile:
STAT_KEYS = ['cpu', 'rss', 'vsz']
@classmethod
def AverageStats(cls, profiles: List['RunProfile']):
avg = {}
stats = [p.stats for p in profiles]
for key in cls.STAT_KEYS:
avg[key] = mean([s[key] for s in stats])
return avg
def __init__(self, pid: int, started_at: datetime, run_dir):
self._pid = pid
self._started_at = started_at
self._duration = timedelta(seconds=0)
self._run_dir = run_dir
self._samples = []
self._psu = None
self._stats = None
@property
def duration(self) -> timedelta:
return self._duration
@property
def stats(self) -> Optional[Dict[str,Any]]:
return self._stats
def sample(self):
elapsed = datetime.now() - self._started_at
try:
if self._psu is None:
self._psu = psutil.Process(pid=self._pid)
mem = self._psu.memory_info()
self._samples.append({
'time': elapsed,
'cpu': self._psu.cpu_percent(),
'vsz': mem.vms,
'rss': mem.rss,
})
except psutil.NoSuchProcess:
pass
def finish(self):
self._duration = datetime.now() - self._started_at
if len(self._samples) > 0:
weights = [s['time'].total_seconds() for s in self._samples]
self._stats = {}
for key in self.STAT_KEYS:
self._stats[key] = fmean([s[key] for s in self._samples], weights)
else:
self._stats = None
self._psu = None
def __repr__(self):
return f'RunProfile[pid={self._pid}, '\
f'duration={self.duration.total_seconds():.3f}s, '\
f'stats={self.stats}]'
class ExecResult:
def __init__(self, args: List[str], exit_code: int,
stdout: List[str], stderr: List[str],
duration: Optional[timedelta] = None,
with_stats: bool = False,
exception: Optional[str] = None):
exception: Optional[str] = None,
profile: Optional[RunProfile] = None):
self._args = args
self._exit_code = exit_code
self._exception = exception
self._stdout = stdout
self._stderr = stderr
self._profile = profile
self._duration = duration if duration is not None else timedelta()
self._response = None
self._responses = []
@ -116,6 +181,10 @@ class ExecResult:
def duration(self) -> timedelta:
return self._duration
@property
def profile(self) -> Optional[RunProfile]:
return self._profile
@property
def response(self) -> Optional[Dict]:
return self._response
@ -344,14 +413,15 @@ class CurlClient:
return xargs
def http_get(self, url: str, extra_args: Optional[List[str]] = None,
def_tracing: bool = True):
def_tracing: bool = True, with_profile: bool = False):
return self._raw(url, options=extra_args, with_stats=False,
def_tracing=def_tracing)
def_tracing=def_tracing, with_profile=with_profile)
def http_download(self, urls: List[str],
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
with_profile: bool = False,
no_save: bool = False,
extra_args: List[str] = None):
if extra_args is None:
@ -373,12 +443,14 @@ class CurlClient:
])
return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
with_headers=with_headers)
with_headers=with_headers,
with_profile=with_profile)
def http_upload(self, urls: List[str], data: str,
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
with_profile: bool = False,
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
@ -391,12 +463,14 @@ class CurlClient:
])
return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
with_headers=with_headers)
with_headers=with_headers,
with_profile=with_profile)
def http_put(self, urls: List[str], data=None, fdata=None,
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
with_profile: bool = False,
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
@ -414,7 +488,8 @@ class CurlClient:
return self._raw(urls, intext=data,
alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
with_headers=with_headers)
with_headers=with_headers,
with_profile=with_profile)
def http_form(self, urls: List[str], form: Dict[str, str],
alpn_proto: Optional[str] = None,
@ -439,7 +514,7 @@ class CurlClient:
def response_file(self, idx: int):
return os.path.join(self._run_dir, f'download_{idx}.data')
def run_direct(self, args, with_stats: bool = False):
def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
my_args = [self._curl]
if with_stats:
my_args.extend([
@ -449,22 +524,48 @@ class CurlClient:
'-o', 'download.data',
])
my_args.extend(args)
return self._run(args=my_args, with_stats=with_stats)
return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)
def _run(self, args, intext='', with_stats: bool = False):
def _run(self, args, intext='', with_stats: bool = False, with_profile: bool = True):
self._rmf(self._stdoutfile)
self._rmf(self._stderrfile)
self._rmf(self._headerfile)
start = datetime.now()
started_at = datetime.now()
exception = None
profile = None
try:
with open(self._stdoutfile, 'w') as cout:
with open(self._stderrfile, 'w') as cerr:
p = subprocess.run(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=intext.encode() if intext else None,
timeout=self._timeout)
exitcode = p.returncode
if with_profile:
started_at = datetime.now()
end_at = started_at + timedelta(seconds=self._timeout) \
if self._timeout else None
log.info(f'starting: {args}')
p = subprocess.Popen(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False)
profile = RunProfile(p.pid, started_at, self._run_dir)
if intext is not None and False:
p.communicate(input=intext.encode(), timeout=1)
ptimeout = 0.0
while True:
try:
p.wait(timeout=ptimeout)
break
except subprocess.TimeoutExpired:
if end_at and datetime.now() >= end_at:
p.kill()
raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
profile.sample()
ptimeout = 0.01
exitcode = p.returncode
profile.finish()
log.info(f'done: exit={exitcode}, profile={profile}')
else:
p = subprocess.run(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=intext.encode() if intext else None,
timeout=self._timeout)
exitcode = p.returncode
except subprocess.TimeoutExpired:
log.warning(f'Timeout after {self._timeout}s: {args}')
exitcode = -1
@ -473,20 +574,23 @@ class CurlClient:
cerrput = open(self._stderrfile).readlines()
return ExecResult(args=args, exit_code=exitcode, exception=exception,
stdout=coutput, stderr=cerrput,
duration=datetime.now() - start,
with_stats=with_stats)
duration=datetime.now() - started_at,
with_stats=with_stats,
profile=profile)
def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
alpn_proto: Optional[str] = None,
force_resolve=True,
with_stats=False,
with_headers=True,
def_tracing=True):
def_tracing=True,
with_profile=False):
args = self._complete_args(
urls=urls, timeout=timeout, options=options, insecure=insecure,
alpn_proto=alpn_proto, force_resolve=force_resolve,
with_headers=with_headers, def_tracing=def_tracing)
r = self._run(args, intext=intext, with_stats=with_stats)
r = self._run(args, intext=intext, with_stats=with_stats,
with_profile=with_profile)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
if r.json: