зеркало из https://github.com/mozilla/gecko-dev.git
259 строки
9.7 KiB
Python
259 строки
9.7 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""POSIX specific tests. These are implicitly run by test_psutil.py."""
|
|
|
|
import datetime
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import psutil
|
|
|
|
from psutil._compat import PY3, callable
|
|
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
|
|
from test_psutil import (get_test_subprocess, skip_on_access_denied,
|
|
retry_before_failing, reap_children, sh, unittest,
|
|
get_kernel_version, wait_for_pid)
|
|
|
|
|
|
def ps(cmd):
    """Run a ``ps -o <field>`` command line and return the single value
    it prints.

    *cmd* is written in the Linux flavour of ``ps``.  On other platforms
    the " --no-headers " flag is removed from the command and the second
    line of the output is used instead (presumably the first one is the
    column header).  On SunOS the "-o command" and "-o start" selectors
    are replaced by "-o comm" and "-o stime".

    The value is returned as an int when it parses as one, otherwise as
    the stripped string.
    """
    output_has_header = not LINUX
    if output_has_header:
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        substitutions = (
            ("-o command", "-o comm"),
            ("-o start", "-o stime"),
        )
        for linux_opt, sunos_opt in substitutions:
            cmd = cmd.replace(linux_opt, sunos_opt)

    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    stdout_data, _ = proc.communicate()
    value = stdout_data.strip()
    if PY3:
        # communicate() returned bytes; decode to text
        value = str(value, sys.stdout.encoding)
    if output_has_header:
        value = value.split('\n')[1].strip()

    try:
        value = int(value)
    except ValueError:
        # not numeric: hand the text back unchanged
        pass
    return value
|
|
|
|
|
|
@unittest.skipUnless(POSIX, "not a POSIX system")
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility.

    One helper subprocess is started for the whole class (see
    setUpClass).  Most tests query that pid both through the ``ps()``
    helper and through psutil and assert that the two answers agree.
    """

    @classmethod
    def setUpClass(cls):
        # A single interpreter shared by every test in the class.
        # NOTE(review): stdin is a pipe presumably so that the child
        # blocks waiting for input and stays alive - confirm.
        cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                      stdin=subprocess.PIPE).pid
        wait_for_pid(cls.pid)

    @classmethod
    def tearDownClass(cls):
        reap_children()

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def test_process_parent_pid(self):
        """Parent pid reported by ps matches Process.ppid()."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """uid reported by ps matches Process.uids().real."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Real gid reported by ps matches Process.gids().real."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """User name reported by ps matches Process.username()."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_rss_memory(self):
        """rss reported by ps equals memory_info()[0] divided by 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_vsz_memory(self):
        """vsz reported by ps equals memory_info()[1] divided by 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Basename of the ps command matches Process.name()."""
        # use command + arg since "comm" keyword not supported on all
        # platforms
        name_ps = ps("ps --no-headers -o command -p %s" % (
            self.pid)).split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD,
                     'ps -o start not available')
    def test_process_create_time(self):
        """Start time printed by ps matches Process.create_time()."""
        time_ps = ps(
            "ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we
        # check for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps,
                      [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_process_exe(self):
        """Executable path printed by ps matches Process.exe()."""
        ps_pathname = ps("ps --no-headers -o command -p %s" %
                         self.pid).split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            # Cut the psutil path down to the length of the ps one so
            # that only the common prefix is compared.  (This used to
            # slice ps_pathname itself, which made the check a
            # tautology that could never fail.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        """Command line printed by ps matches Process.cmdline()."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_pids(self):
        """The pids listed by 'ps ax' are the same as psutil.pids()."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        pids_ps = []
        # the first line is skipped; the pid is the first column of
        # every other non-empty line
        for line in output.split('\n')[1:]:
            if line:
                pid = int(line.split()[0].strip())
                pids_ps.append(pid)
        # remove ps subprocess pid which is supposed to be dead in
        # meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.pids()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            # report the pids present in only one of the two lists
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    @unittest.skipIf(TRAVIS, "test not reliable on Travis")
    def test_nic_names(self):
        """Every NIC known to psutil shows up in 'ifconfig -a'."""
        p = subprocess.Popen("ifconfig -a", shell=1,
                             stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True).keys():
            # output.split() yields whitespace-separated tokens; the NIC
            # is considered found if any token starts with its name
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @retry_before_failing()
    def test_users(self):
        """Users and terminals listed by 'who' match psutil.users()."""
        out = sh("who")
        # Ignore blank lines: with nobody logged in "who" prints nothing
        # and splitting the empty output would leave a single '' entry
        # whose fields cannot be indexed.
        lines = [x for x in out.split('\n') if x.strip()]
        # Take one snapshot so that the length check and the loop below
        # look at the same data.
        psutil_users = psutil.users()
        users = [x.split()[0] for x in lines]
        self.assertEqual(len(users), len(psutil_users))
        terminals = [x.split()[1] for x in lines]
        for u in psutil_users:
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        """Calling Process attributes must not change num_fds() by more
        than one."""
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, name):
            # Exercise attribute *name* of process *p*: getattr() alone
            # evaluates a non-callable attribute, a callable one is
            # invoked (rlimit is the only one given an argument).
            attr = getattr(p, name, None)
            if attr is not None and callable(attr):
                args = ()
                if name == 'rlimit':
                    args = (psutil.RLIMIT_NOFILE,)
                attr(*args)

        p = psutil.Process(os.getpid())
        failures = []
        ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
                         'send_signal', 'wait', 'children', 'as_dict']
        if LINUX:
            kernel_version = get_kernel_version()
            if kernel_version < (2, 6, 36):
                ignored_names.append('rlimit')
            if kernel_version < (2, 6, 23):
                ignored_names.append('num_ctx_switches')

        for name in dir(psutil.Process):
            if name.startswith('_') or name in ignored_names:
                continue
            try:
                num1 = p.num_fds()
                for _ in range(2):
                    call(p, name)
                num2 = p.num_fds()
            except psutil.AccessDenied:
                continue
            if abs(num2 - num1) > 1:
                fail = "failure while processing Process.%s method " \
                       "(before=%s, after=%s)" % (name, num1, num2)
                failures.append(fail)

        if failures:
            self.fail('\n' + '\n'.join(failures))
|
|
|
|
|
|
def main():
    """Run PosixSpecificTestCase with a verbose text runner.

    Returns True when the run was successful, False otherwise.
    """
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
    runner = unittest.TextTestRunner(verbosity=2)
    outcome = runner.run(suite)
    return outcome.wasSuccessful()
|
|
|
|
if __name__ == '__main__':
    # Exit with status 1 when the test run was not successful.
    succeeded = main()
    if not succeeded:
        sys.exit(1)
|