Author: gnichols
Date: 2009-12-22 17:58:11 +0000 (Tue, 22 Dec 2009)
New Revision: 271
Added:
trunk/v7/command-popen2.py
Log:
484657 - hts uses code deprecated in python 2.6
Added: trunk/v7/command-popen2.py
===================================================================
--- trunk/v7/command-popen2.py (rev 0)
+++ trunk/v7/command-popen2.py 2009-12-22 17:58:11 UTC (rev 271)
@@ -0,0 +1,275 @@
+# Copyright (c) 2006 Red Hat, Inc. All rights reserved. This copyrighted material
+# is made available to anyone wishing to use, modify, copy, or
+# redistribute it subject to the terms and conditions of the GNU General
+# Public License v.2.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# Author: Greg Nichols
+#
+# Overview:
+# The Command class is a wrapper for shell commands that performs
+# validation and error checking. Example usage can be seen in the
+# __main__ self test function at the end of this file.
+
+
+import os,re, commands, exceptions, popen2, string, sys
+
+
+class Command:
+
+ def __init__(self, command):
+ """ Creates a Command object that wraps the shell command
+ via the supplied string, similar to os.system. The
+ constuctor does not actually execute the command.
+ """
+ self.command = command
+ self.regex = None
+ self.singleLine = True
+ self.regexGroup = None
+ self.output = None
+ self.errors = None
+ self.returnValue = 0
+ self.signal = 0
+
+ def _run(self):
+ commandPipe = popen2.Popen3(self.command, capturestderr=True)
+ self.output = commandPipe.fromchild.readlines()
+ result = commandPipe.wait()
+ self.returnValue = (result >> 8) & 0xFF
+ self.signal = result & 0xFF
+ self.errors = None
+
+ if commandPipe.childerr:
+ self.errors = commandPipe.childerr.readlines()
+ # always print error messages
+ for line in self.errors:
+ sys.stderr.write( line )
+ sys.stderr.flush()
+
+ def _checkErrors(self):
+
+ if self.errors and len(self.errors) > 0:
+ raise V7CommandException(self, "has output on stderr")
+ if self.returnValue != 0:
+ # if error returned, show stdout whether echo or run was called
+ for line in self.output:
+ sys.stdout.write( line )
+ sys.stdout.flush()
+ raise V7CommandException(self, "returned %d" % self.returnValue)
+
+ def run(self):
+ """ This method runs the command to produce an action. Any output
+ to stdout is not echoed to the caller. """
+ self._run()
+ self._checkErrors()
+ return 0
+
+
+ def echo(self):
+ """ output is equivalent to run, except that the commands'
output
+ is echo'd on stdout. """
+ self._run()
+ self._checkErrors()
+ for line in self.output:
+ sys.stdout.write( line )
+ return 0
+
+ def echoIgnoreErrors(self):
+ """ like echo, except don't raise exception on errors
"""
+ self._run()
+ for line in self.output:
+ sys.stdout.write( line )
+ return 0
+
+ def printOutput(self):
+ for line in self.output:
+ sys.stdout.write( line )
+ sys.stdout.flush()
+
+ def printErrors(self):
+ for line in self.errors:
+ sys.stderr.write( line )
+ sys.stderr.flush()
+
+
+ def _getString(self, regex=None, regexGroup=None, singleLine=True, returnList=False,
ignoreErrors=False):
+ """
+ Get the string from the command's output. With default parameters
+ it returns the command's single line of output as a string.
+
+ If singleLine is True, and multiple lines are found in the output,
+ V7CommandException is raised.
+
+ The regex parameter allows the output to be searched for a regular
+ expression match. If no regexGroup is supplied, the entire pattern
+ match is returned. The regexGroup parameter allows named
+ substrings of the match to be returned if regex has named groups
+ via the "(?P<name>)" syntax. If no match is found,
+ V7CommandException is raised.
+ """
+ self.regex = regex
+ self.singleLine = singleLine
+ self.regexGroup = regexGroup
+
+ self._run()
+
+ if self.singleLine:
+ if len(self.output) > 1:
+ raise V7CommandException(self, "Found %u lines of output, expected
1" % len(self.output))
+
+ line = self.output[0].strip()
+ if not self.regex:
+ return line
+ # otherwise, try the regex
+ pattern = re.compile(self.regex)
+ match = pattern.match(line)
+ if match:
+ if self.regexGroup:
+ return match.group(self.regexGroup)
+ # otherwise, no group, return the whole line
+ return line
+
+ # no regex match try a grep-style match
+ if not self.regexGroup:
+ match = pattern.search(line)
+ if match:
+ return match.group()
+
+ # otherwise
+ raise V7CommandException(self, "no match for regular expression %s"
% self.regex)
+
+
+ #otherwise, multi-line or single-line regex
+ if not self.regex:
+ raise V7CommandError(self, "no regular expression set for multi-line
command")
+ pattern = re.compile(self.regex)
+ result = None
+ if returnList:
+ result = list()
+ for line in self.output:
+ if self.regexGroup:
+ match = pattern.match(line)
+ if match:
+ if self.regexGroup:
+ if returnList:
+ result.append(match.group(self.regexGroup))
+ else:
+ return match.group(self.regexGroup)
+ else:
+ # otherwise, return the matching line
+ match = pattern.search(line)
+ if match:
+ if returnList:
+ result.append(match.group())
+ else:
+ return match.group()
+ if result:
+ return result
+
+ # otherwise
+ raise V7CommandException(self, "no match for regular expression %s" %
self.regex)
+
+ def getStringList(self, regex=None, regexGroup=None, ignoreErrors=False):
+ """ like getString, except return a complete list of matches on
multiple lines."""
+ result = self._getString(regex, regexGroup, singleLine=False, returnList=True)
+ if not ignoreErrors:
+ self._checkErrors()
+ return result
+
+ def getString(self, regex=None, regexGroup=None, singleLine=True,
ignoreErrors=False):
+ """ like getString, except return a complete list of matches on
multiple lines."""
+ result = self._getString(regex, regexGroup, singleLine, returnList=False)
+ if not ignoreErrors:
+ self._checkErrors()
+ return result
+
+
+ def getInteger(self, regex=None, regexGroup=None, singleLine=True):
+ """
+ getInteger is the same as getString, except the output is required to
+ be an Integer.
+ """
+ value = self.getString(regex, regexGroup, singleLine)
+ return string.atoi(value)
+
+class V7CommandException(exceptions.Exception):
+ def __init__(self, command, message):
+ self.message = message
+ self.command = command
+
+ def __str__(self):
+ return "\"%s\" %s" % (self.command.command, self.message)
+
+
+if __name__ == "__main__":
+
+ try:
+ # positive test: run
+ command = Command("ls -a")
+ print "ls -a:"
+ command.run()
+
+ # positive test: simple match
+ command = Command("date")
+ print "is it 2008?"
+ print command.getString("2008")
+
+ # positive test: regex on single line
+ command = Command("date")
+ print "day of the week via date: %s" %
command.getString(regex="^(?P<day>[MTWF][a-z]).*$",
regexGroup="day")
+
+ # positive test: regex on multiline
+ command = Command("who")
+ print "a device via who: %s" %
command.getString(regex="^(?P<user>[a-z]+) (?P<device>[a-z/]+[0-9]*)[
\t]*(?P<date>2008-\d+-\d+).*$", regexGroup="device",
singleLine=False)
+
+ #positive test: integer - simple match
+ command = Command("du .")
+ print "simple du: %u" % command.getInteger(regex="\d+",
singleLine=False)
+
+ #positive test: integer
+ command = Command("df .")
+ print "blocks via df: %u" % command.getInteger(regex="^[
\t]+(?P<blocks>[0-9]+[ \t]+).*$", regexGroup="blocks",
singleLine=False)
+
+ except V7CommandException, e:
+ print e
+
+ # negative test: fail simple match
+ try:
+ print "is it 1999?:"
+ command = Command("date")
+ command.getString(regex="1999")
+ except V7CommandException, e:
+ print e
+
+ # negative test: return value
+ try:
+ print "negative test: return value:"
+ command = Command("exit 1")
+ print "call exit(1)" % command.getString()
+ except V7CommandException, e:
+ print e
+
+ # negative test: expect single line, get multiple
+ try:
+ print "negative test: more than one line of output"
+ command = Command("who")
+ print "who: %s" % command.getString()
+ except V7CommandException, e:
+ print e
+
+ # negative test: expect single line, get multiple
+ try:
+ print "negative test: output on stderr"
+ command = Command("echo \"boguserror\" >&2; echo
\"boguserror\"")
+ print "error echo: %s" %
command.getString(regex="boguserror", singleLine=False)
+ except V7CommandException, e:
+ print e
+
+