Federico Simoncelli has uploaded a new change for review.
Change subject: utils: add CommandStream class
......................................................................
utils: add CommandStream class
Change-Id: Ie015368bb9c5992e5c73a149277c59fc4ffbd570
Signed-off-by: Federico Simoncelli <fsimonce@redhat.com>
---
M lib/vdsm/utils.py
M tests/utilsTests.py
2 files changed, 188 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/09/33909/1
diff --git a/lib/vdsm/utils.py b/lib/vdsm/utils.py index 244f0e2..4b0ccb7 100644 --- a/lib/vdsm/utils.py +++ b/lib/vdsm/utils.py @@ -313,6 +313,90 @@ timeout = max(0, endtime - time.time())
class CommandStream(object):
    """
    Run a command and stream its output to callbacks.

    The command is started with CPopen and the file descriptors of its
    stdout and stderr are registered on an epoll object.  While wait()
    is running, every chunk read from one of them is handed to the
    matching callback.  Input for the child is sent with write(),
    flush() and close(), which act on the child's stdin.
    """

    def __init__(self, command, stdoutcb, stderrcb, cwd=None,
                 deathSignal=0):
        """
        Start command and begin watching its output descriptors.

        command     -- forwarded to CPopen unchanged.
        stdoutcb    -- callable invoked with each chunk read from the
                       child's stdout.
        stderrcb    -- callable invoked with each chunk read from the
                       child's stderr.
        cwd         -- forwarded to CPopen.
        deathSignal -- forwarded to CPopen.
        """
        self.command = command
        self.returncode = None

        self.child = CPopen(self.command, cwd=cwd, close_fds=True,
                            deathSignal=deathSignal)

        self.epoll = select.epoll()

        # Maps each watched file descriptor to the callback receiving
        # its data; a descriptor is removed once its stream has ended.
        self.iocb = {
            self.child.stdout.fileno(): stdoutcb,
            self.child.stderr.fileno(): stderrcb,
        }

        for fd in self.iocb:
            self.epoll.register(fd, select.EPOLLIN)

    def terminate(self):
        """Call terminate() on the child process."""
        self.child.terminate()

    def kill(self):
        """Call kill() on the child process."""
        self.child.kill()

    def write(self, data):
        """Write data to the child's stdin."""
        self.child.stdin.write(data)

    def flush(self):
        """Flush the child's stdin."""
        self.child.stdin.flush()

    def close(self):
        """Close the child's stdin (the output side is left untouched)."""
        self.child.stdin.close()

    def _epoll_input(self, fileno):
        """
        Read one chunk from fileno and pass it to its callback.

        Returns True when data was delivered and False when the read
        returned no data, which means the stream reached its end.
        """
        data = os.read(fileno, io.DEFAULT_BUFFER_SIZE)

        if not data:
            # Never hand an empty chunk to the callback: the caller must
            # drop the descriptor instead, otherwise it would be reported
            # as readable over and over.
            return False

        self.iocb[fileno](data)
        return True

    def _epoll_event(self, fileno):
        """
        Stop watching fileno; release the epoll object with the last one.
        """
        self.epoll.unregister(fileno)
        del self.iocb[fileno]

        if not self.iocb:
            # Nothing is left to poll (wait() falls back to polling the
            # child status), so release the epoll descriptor now instead
            # of holding it until the object is garbage collected.
            self.epoll.close()

    def _epoll_timeout(self, timeout):
        """
        Poll the watched descriptors once, for at most timeout seconds,
        and handle every event reported (a negative timeout blocks).
        """
        fdevents = NoIntrPoll(self.epoll.poll, timeout)

        for fileno, event in fdevents:
            if event & select.EPOLLIN:
                if self._epoll_input(fileno):
                    continue
            elif not event & (select.EPOLLHUP | select.EPOLLERR):
                continue

            self._epoll_event(fileno)
            # Trying to collect the child status in case the
            # file descriptor was closed because the process
            # terminated.
            # NOTE(review): once the status is collected wait() stops
            # looping, so data still buffered on the other descriptor
            # is not delivered to its callback - confirm whether the
            # callers need a final drain here.
            self.returncode = self.child.poll()

    def wait(self, timeout=None):
        """
        Dispatch output to the callbacks until the child status is known.

        timeout -- maximum number of seconds to wait; None waits without
                   a limit.

        Returns the value of child.poll(), or None when the timeout
        expired before the status could be collected.
        """
        if timeout is None:
            epoll_remaining = -1
        else:
            endtime = os.times()[4] + timeout

        while self.returncode is None:
            if timeout is not None:
                epoll_remaining = endtime - os.times()[4]
                if epoll_remaining <= 0:
                    break

            if self.iocb:
                self._epoll_timeout(epoll_remaining)
            else:
                # This is a busy-loop taken from issue5673, and
                # python 3.4 still uses this implementation.
                # A smarter solution would be using signalfd or
                # sigtimedwait but they don't seem to mix well
                # with multithreading (especially under heavy
                # load: tens of children from tens of threads).
                # Anyway we reach this only when both stdout and
                # stderr are closed, which means that in most of
                # the cases the child is about to die.
                time.sleep(0.0005)
                self.returncode = self.child.poll()

        return self.returncode
from testlib import VdsmTestCase as TestCaseBase from testlib import permutations, expandPermutations @@ -634,3 +638,103 @@
class CommandStreamTests(TestCaseBase):
    """Tests running real commands through utils.CommandStream."""

    @contextmanager
    def assertElapsed(self, limit):
        """Fail when the wrapped code completes in less than limit."""
        began = os.times()[4]

        yield

        took = os.times()[4] - began

        if took < limit:
            raise AssertionError("Operation time: %s" % took)

    def assertNoOutput(self, data):
        """Callback for streams that must stay silent: always fails."""
        raise AssertionError("Unexpected data: " + repr(data))

    def test_output(self):
        """The text printed by echo reaches the stdout callback."""
        message = "Hello World"
        captured = bytearray()

        # The bound method appends to the buffer in place, so no closure
        # rebinding a variable of the enclosing function is needed.
        stream = utils.CommandStream(["echo", "-n", message],
                                     captured.extend,
                                     self.assertNoOutput)

        status = stream.wait()

        self.assertEqual(status, 0)
        self.assertEqual(message, str(captured))

    def test_write(self):
        """Data written to cat's stdin comes back on its stdout."""
        message = "Hello World"
        captured = bytearray()

        stream = utils.CommandStream(["cat"], captured.extend,
                                     self.assertNoOutput)

        stream.write(message)
        stream.flush()
        stream.close()

        status = stream.wait()

        self.assertEqual(status, 0)
        self.assertEqual(message, str(captured))

    def test_timeout(self):
        """wait(2) gives up with None; a second wait() collects 0."""
        stream = utils.CommandStream(["sleep", "3"],
                                     self.assertNoOutput,
                                     self.assertNoOutput)

        with self.assertElapsed(2):
            status = stream.wait(2)

        self.assertEqual(status, None)

        status = stream.wait()
        self.assertEqual(status, 0)

    def test_terminate(self):
        """terminate() makes wait() report -SIGTERM."""
        stream = utils.CommandStream(["sleep", "2"],
                                     self.assertNoOutput,
                                     self.assertNoOutput)

        stream.terminate()

        status = stream.wait()
        self.assertEqual(status, -signal.SIGTERM)

    def test_kill(self):
        """kill() makes wait() report -SIGKILL."""
        stream = utils.CommandStream(["sleep", "2"],
                                     self.assertNoOutput,
                                     self.assertNoOutput)

        stream.kill()

        status = stream.wait()
        self.assertEqual(status, -signal.SIGKILL)

    def test_early_close(self):
        """wait() keeps waiting after the child closed both outputs."""
        script = "exec 1>&-; exec 2>&-; exec sleep 2"
        stream = utils.CommandStream(["bash", "-c", script],
                                     self.assertNoOutput,
                                     self.assertNoOutput)

        with self.assertElapsed(2):
            status = stream.wait()

        self.assertEqual(status, 0)