diff options
Diffstat (limited to 'python/psutil/test/_posix.py')
-rw-r--r-- | python/psutil/test/_posix.py | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/python/psutil/test/_posix.py b/python/psutil/test/_posix.py new file mode 100644 index 0000000000..e6c56aac3e --- /dev/null +++ b/python/psutil/test/_posix.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""POSIX specific tests. These are implicitly run by test_psutil.py.""" + +import datetime +import os +import subprocess +import sys +import time + +import psutil + +from psutil._compat import PY3, callable +from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS +from test_psutil import (get_test_subprocess, skip_on_access_denied, + retry_before_failing, reap_children, sh, unittest, + get_kernel_version, wait_for_pid) + + +def ps(cmd): + """Expects a ps command with a -o argument and parse the result + returning only the value of interest. + """ + if not LINUX: + cmd = cmd.replace(" --no-headers ", " ") + if SUNOS: + cmd = cmd.replace("-o command", "-o comm") + cmd = cmd.replace("-o start", "-o stime") + p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + if not LINUX: + output = output.split('\n')[1].strip() + try: + return int(output) + except ValueError: + return output + + +@unittest.skipUnless(POSIX, "not a POSIX system") +class PosixSpecificTestCase(unittest.TestCase): + """Compare psutil results against 'ps' command line utility.""" + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess([PYTHON, "-E", "-O"], + stdin=subprocess.PIPE).pid + wait_for_pid(cls.pid) + + @classmethod + def tearDownClass(cls): + reap_children() + + # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps + + def test_process_parent_pid(self): + ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid) + ppid_psutil = psutil.Process(self.pid).ppid() + self.assertEqual(ppid_ps, ppid_psutil) + + def test_process_uid(self): + uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid) + uid_psutil = psutil.Process(self.pid).uids().real + self.assertEqual(uid_ps, uid_psutil) + + def test_process_gid(self): + gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid) + gid_psutil = psutil.Process(self.pid).gids().real + self.assertEqual(gid_ps, gid_psutil) + + def test_process_username(self): + username_ps = ps("ps --no-headers -o user -p %s" % self.pid) + username_psutil = psutil.Process(self.pid).username() + self.assertEqual(username_ps, username_psutil) + + @skip_on_access_denied() + @retry_before_failing() + def test_process_rss_memory(self): + # give python interpreter some time to properly initialize + # so that the results are the same + time.sleep(0.1) + rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid) + rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024 + self.assertEqual(rss_ps, rss_psutil) + + @skip_on_access_denied() + @retry_before_failing() + def test_process_vsz_memory(self): + # give python interpreter some time to properly initialize + # so that the results are the same + time.sleep(0.1) + vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid) + vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024 + self.assertEqual(vsz_ps, vsz_psutil) + + def test_process_name(self): + # use command + arg since "comm" keyword not supported on all platforms + name_ps = ps("ps --no-headers -o command -p %s" % ( + self.pid)).split(' ')[0] + # remove path if there is any, from the command + name_ps = os.path.basename(name_ps).lower() + name_psutil = psutil.Process(self.pid).name().lower() + self.assertEqual(name_ps, name_psutil) + + @unittest.skipIf(OSX or BSD, + 'ps -o start not available') + def test_process_create_time(self): + time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0] + time_psutil = psutil.Process(self.pid).create_time() + time_psutil_tstamp = datetime.datetime.fromtimestamp( + time_psutil).strftime("%H:%M:%S") + # sometimes ps shows the time rounded up instead of down, so we check + # for both possible values + round_time_psutil = round(time_psutil) + round_time_psutil_tstamp = datetime.datetime.fromtimestamp( + round_time_psutil).strftime("%H:%M:%S") + self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp]) + + def test_process_exe(self): + ps_pathname = ps("ps --no-headers -o command -p %s" % + self.pid).split(' ')[0] + psutil_pathname = psutil.Process(self.pid).exe() + try: + self.assertEqual(ps_pathname, psutil_pathname) + except AssertionError: + # certain platforms such as BSD are more accurate returning: + # "/usr/local/bin/python2.7" + # ...instead of: + # "/usr/local/bin/python" + # We do not want to consider this difference in accuracy + # an error. + adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] + self.assertEqual(ps_pathname, adjusted_ps_pathname) + + def test_process_cmdline(self): + ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid) + psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) + if SUNOS: + # ps on Solaris only shows the first part of the cmdline + psutil_cmdline = psutil_cmdline.split(" ")[0] + self.assertEqual(ps_cmdline, psutil_cmdline) + + @retry_before_failing() + def test_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + if SUNOS: + cmd = ["ps", "ax"] + else: + cmd = ["ps", "ax", "-o", "pid"] + p = get_test_subprocess(cmd, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + pids_ps = [] + for line in output.split('\n')[1:]: + if line: + pid = int(line.split()[0].strip()) + pids_ps.append(pid) + # remove ps subprocess pid which is supposed to be dead in meantime + pids_ps.remove(p.pid) + pids_psutil = psutil.pids() + pids_ps.sort() + pids_psutil.sort() + + # on OSX ps doesn't show pid 0 + if OSX and 0 not in pids_ps: + pids_ps.insert(0, 0) + + if pids_ps != pids_psutil: + difference = [x for x in pids_psutil if x not in pids_ps] + \ + [x for x in pids_ps if x not in pids_psutil] + self.fail("difference: " + str(difference)) + + # for some reason ifconfig -a does not report all interfaces + # returned by psutil + @unittest.skipIf(SUNOS, "test not reliable on SUNOS") + @unittest.skipIf(TRAVIS, "test not reliable on Travis") + def test_nic_names(self): + p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + for nic in psutil.net_io_counters(pernic=True).keys(): + for line in output.split(): + if line.startswith(nic): + break + else: + self.fail( + "couldn't find %s nic in 'ifconfig -a' output\n%s" % ( + nic, output)) + + @retry_before_failing() + def test_users(self): + out = sh("who") + lines = out.split('\n') + users = [x.split()[0] for x in lines] + self.assertEqual(len(users), len(psutil.users())) + terminals = [x.split()[1] for x in lines] + for u in psutil.users(): + self.assertTrue(u.name in users, u.name) + self.assertTrue(u.terminal in terminals, u.terminal) + + def test_fds_open(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + args = () + attr = getattr(p, name, None) + if attr is not None and callable(attr): + if name == 'rlimit': + args = (psutil.RLIMIT_NOFILE,) + attr(*args) + else: + attr + + p = psutil.Process(os.getpid()) + failures = [] + ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice', + 'send_signal', 'wait', 'children', 'as_dict'] + if LINUX and get_kernel_version() < (2, 6, 36): + ignored_names.append('rlimit') + if LINUX and get_kernel_version() < (2, 6, 23): + ignored_names.append('num_ctx_switches') + for name in dir(psutil.Process): + if (name.startswith('_') or name in ignored_names): + continue + else: + try: + num1 = p.num_fds() + for x in range(2): + call(p, name) + num2 = p.num_fds() + except psutil.AccessDenied: + pass + else: + if abs(num2 - num1) > 1: + fail = "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) |