summaryrefslogtreecommitdiff
path: root/python/psutil/test/_posix.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/psutil/test/_posix.py')
-rw-r--r--python/psutil/test/_posix.py258
1 files changed, 258 insertions, 0 deletions
diff --git a/python/psutil/test/_posix.py b/python/psutil/test/_posix.py
new file mode 100644
index 0000000000..e6c56aac3e
--- /dev/null
+++ b/python/psutil/test/_posix.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""POSIX specific tests. These are implicitly run by test_psutil.py."""
+
+import datetime
+import os
+import subprocess
+import sys
+import time
+
+import psutil
+
+from psutil._compat import PY3, callable
+from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
+from test_psutil import (get_test_subprocess, skip_on_access_denied,
+ retry_before_failing, reap_children, sh, unittest,
+ get_kernel_version, wait_for_pid)
+
+
+def ps(cmd):
+ """Expects a ps command with a -o argument and parse the result
+ returning only the value of interest.
+ """
+ if not LINUX:
+ cmd = cmd.replace(" --no-headers ", " ")
+ if SUNOS:
+ cmd = cmd.replace("-o command", "-o comm")
+ cmd = cmd.replace("-o start", "-o stime")
+ p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
+ output = p.communicate()[0].strip()
+ if PY3:
+ output = str(output, sys.stdout.encoding)
+ if not LINUX:
+ output = output.split('\n')[1].strip()
+ try:
+ return int(output)
+ except ValueError:
+ return output
+
+
+@unittest.skipUnless(POSIX, "not a POSIX system")
+class PosixSpecificTestCase(unittest.TestCase):
+ """Compare psutil results against 'ps' command line utility."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
+ stdin=subprocess.PIPE).pid
+ wait_for_pid(cls.pid)
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
+
+ def test_process_parent_pid(self):
+ ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
+ ppid_psutil = psutil.Process(self.pid).ppid()
+ self.assertEqual(ppid_ps, ppid_psutil)
+
+ def test_process_uid(self):
+ uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
+ uid_psutil = psutil.Process(self.pid).uids().real
+ self.assertEqual(uid_ps, uid_psutil)
+
+ def test_process_gid(self):
+ gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
+ gid_psutil = psutil.Process(self.pid).gids().real
+ self.assertEqual(gid_ps, gid_psutil)
+
+ def test_process_username(self):
+ username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
+ username_psutil = psutil.Process(self.pid).username()
+ self.assertEqual(username_ps, username_psutil)
+
+ @skip_on_access_denied()
+ @retry_before_failing()
+ def test_process_rss_memory(self):
+ # give python interpreter some time to properly initialize
+ # so that the results are the same
+ time.sleep(0.1)
+ rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
+ rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
+ self.assertEqual(rss_ps, rss_psutil)
+
+ @skip_on_access_denied()
+ @retry_before_failing()
+ def test_process_vsz_memory(self):
+ # give python interpreter some time to properly initialize
+ # so that the results are the same
+ time.sleep(0.1)
+ vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
+ vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
+ self.assertEqual(vsz_ps, vsz_psutil)
+
+ def test_process_name(self):
+ # use command + arg since "comm" keyword not supported on all platforms
+ name_ps = ps("ps --no-headers -o command -p %s" % (
+ self.pid)).split(' ')[0]
+ # remove path if there is any, from the command
+ name_ps = os.path.basename(name_ps).lower()
+ name_psutil = psutil.Process(self.pid).name().lower()
+ self.assertEqual(name_ps, name_psutil)
+
+ @unittest.skipIf(OSX or BSD,
+ 'ps -o start not available')
+ def test_process_create_time(self):
+ time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
+ time_psutil = psutil.Process(self.pid).create_time()
+ time_psutil_tstamp = datetime.datetime.fromtimestamp(
+ time_psutil).strftime("%H:%M:%S")
+ # sometimes ps shows the time rounded up instead of down, so we check
+ # for both possible values
+ round_time_psutil = round(time_psutil)
+ round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
+ round_time_psutil).strftime("%H:%M:%S")
+ self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
+
+ def test_process_exe(self):
+ ps_pathname = ps("ps --no-headers -o command -p %s" %
+ self.pid).split(' ')[0]
+ psutil_pathname = psutil.Process(self.pid).exe()
+ try:
+ self.assertEqual(ps_pathname, psutil_pathname)
+ except AssertionError:
+ # certain platforms such as BSD are more accurate returning:
+ # "/usr/local/bin/python2.7"
+ # ...instead of:
+ # "/usr/local/bin/python"
+ # We do not want to consider this difference in accuracy
+ # an error.
+ adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
+ self.assertEqual(ps_pathname, adjusted_ps_pathname)
+
+ def test_process_cmdline(self):
+ ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
+ psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
+ if SUNOS:
+ # ps on Solaris only shows the first part of the cmdline
+ psutil_cmdline = psutil_cmdline.split(" ")[0]
+ self.assertEqual(ps_cmdline, psutil_cmdline)
+
+ @retry_before_failing()
+ def test_pids(self):
+ # Note: this test might fail if the OS is starting/killing
+ # other processes in the meantime
+ if SUNOS:
+ cmd = ["ps", "ax"]
+ else:
+ cmd = ["ps", "ax", "-o", "pid"]
+ p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
+ output = p.communicate()[0].strip()
+ if PY3:
+ output = str(output, sys.stdout.encoding)
+ pids_ps = []
+ for line in output.split('\n')[1:]:
+ if line:
+ pid = int(line.split()[0].strip())
+ pids_ps.append(pid)
+ # remove ps subprocess pid which is supposed to be dead in meantime
+ pids_ps.remove(p.pid)
+ pids_psutil = psutil.pids()
+ pids_ps.sort()
+ pids_psutil.sort()
+
+ # on OSX ps doesn't show pid 0
+ if OSX and 0 not in pids_ps:
+ pids_ps.insert(0, 0)
+
+ if pids_ps != pids_psutil:
+ difference = [x for x in pids_psutil if x not in pids_ps] + \
+ [x for x in pids_ps if x not in pids_psutil]
+ self.fail("difference: " + str(difference))
+
+ # for some reason ifconfig -a does not report all interfaces
+ # returned by psutil
+ @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
+ @unittest.skipIf(TRAVIS, "test not reliable on Travis")
+ def test_nic_names(self):
+ p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
+ output = p.communicate()[0].strip()
+ if PY3:
+ output = str(output, sys.stdout.encoding)
+ for nic in psutil.net_io_counters(pernic=True).keys():
+ for line in output.split():
+ if line.startswith(nic):
+ break
+ else:
+ self.fail(
+ "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
+ nic, output))
+
+ @retry_before_failing()
+ def test_users(self):
+ out = sh("who")
+ lines = out.split('\n')
+ users = [x.split()[0] for x in lines]
+ self.assertEqual(len(users), len(psutil.users()))
+ terminals = [x.split()[1] for x in lines]
+ for u in psutil.users():
+ self.assertTrue(u.name in users, u.name)
+ self.assertTrue(u.terminal in terminals, u.terminal)
+
+ def test_fds_open(self):
+ # Note: this fails from time to time; I'm keen on thinking
+ # it doesn't mean something is broken
+ def call(p, attr):
+ args = ()
+ attr = getattr(p, name, None)
+ if attr is not None and callable(attr):
+ if name == 'rlimit':
+ args = (psutil.RLIMIT_NOFILE,)
+ attr(*args)
+ else:
+ attr
+
+ p = psutil.Process(os.getpid())
+ failures = []
+ ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
+ 'send_signal', 'wait', 'children', 'as_dict']
+ if LINUX and get_kernel_version() < (2, 6, 36):
+ ignored_names.append('rlimit')
+ if LINUX and get_kernel_version() < (2, 6, 23):
+ ignored_names.append('num_ctx_switches')
+ for name in dir(psutil.Process):
+ if (name.startswith('_') or name in ignored_names):
+ continue
+ else:
+ try:
+ num1 = p.num_fds()
+ for x in range(2):
+ call(p, name)
+ num2 = p.num_fds()
+ except psutil.AccessDenied:
+ pass
+ else:
+ if abs(num2 - num1) > 1:
+ fail = "failure while processing Process.%s method " \
+ "(before=%s, after=%s)" % (name, num1, num2)
+ failures.append(fail)
+ if failures:
+ self.fail('\n' + '\n'.join(failures))
+
+
+def main():
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
+ result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ return result.wasSuccessful()
+
+if __name__ == '__main__':
+ if not main():
+ sys.exit(1)