From: Radek Pazdera <rpazdera(a)redhat.com>
This commit introduces a new test - packet assert, that can be used to
test if a specific number of packets with certain properties were
transferred through some interface.
Packet properties can be verified through the pcap "filter" option or
through grepping tcpdump output by using the "grep_for" option (grep_for
can be used multiple times).
Here's a quick example:
<command type="test" value="PacketAssert"
bg_id="1">
<options>
<option name="interface" value="br0" />
<option name="filter" value="dst port 1234" />
<option name="grep_for" value="IP" />
<option name="min" value="1" />
<option name="max" value="5" />
</options>
</command>
<command type="exec" value="sleep 3" />
<command type="intr" value="1" />
This assert evaluates true if one to five IP packets with destination
port of 1234 are transferred through interface br0 within the time period
of sleep (i.e. 3 seconds).
PacketAssert must always be run in background. It is evaluated at the
time of interruption with "intr". It will not evaluate correctly in
case the command is killed (with "kill")!
One example recipe to demonstrate its capabilities was added as well.
Signed-off-by: Radek Pazdera <rpazdera(a)redhat.com>
---
Tests/TestPacketAssert.py | 140 +++++++++++++++++++++++++++++++++++++
example_recipes/packet_assert.xml | 15 ++++
2 files changed, 155 insertions(+), 0 deletions(-)
create mode 100644 Tests/TestPacketAssert.py
create mode 100644 example_recipes/packet_assert.xml
diff --git a/Tests/TestPacketAssert.py b/Tests/TestPacketAssert.py
new file mode 100644
index 0000000..5f869db
--- /dev/null
+++ b/Tests/TestPacketAssert.py
@@ -0,0 +1,140 @@
+"""
+Test if packets were transferred through some interface correctly.
+This test is using tcpdump.
+
+Copyright 2012 Red Hat, Inc.
+Licensed under the GNU General Public License, version 2 as
+published by the Free Software Foundation; see COPYING for details.
+"""
+
+__author__ = """
+rpazdera(a)redhat.com (Radek Pazdera)
+"""
+
+import errno
+import logging
+import re
+import signal
+import subprocess
+
+from Common.TestsCommon import TestGeneric
+
+class TestPacketAssert(TestGeneric):
+ """ Assert for number of incomming/outgoing packets
+ Capturing backend of this class is provided by
+ tcpdump(8).
+ """
+
+ _cmd = ""
+ _tcpdump = None
+ _grep_filters = []
+
+ _min_cond = 1
+ _max_cond = None
+
+ _num_recv = 0
+
+ def _set_interrupt_handler(self):
+ signal.signal(signal.SIGINT, self._interrupt_handler)
+ signal.signal(signal.SIGTERM, self._interrupt_handler)
+
+ def _interrupt_handler(self, signum, frame):
+ """ Kill tcpdump when interrupted """
+ self._tcpdump.terminate()
+
+ def _prepare_grep_filters(self):
+ """ Parse `grep_for' test options """
+ filters = self.get_multi_opt("grep_for")
+
+ for filt in filters:
+ if filt != None:
+ self._grep_filters.append(filt)
+
+ def _prepare_conditions(self):
+ """ Parse `min' and `max' """
+ min_packets = self.get_opt("min")
+ max_packets = self.get_opt("max")
+
+ if min_packets != None:
+ self._min_cond = int(min_packets)
+
+ if max_packets != None:
+ self._max_cond = int(max_packets)
+
+ def _compose_cmd(self):
+ """ Create a command from the recipe options """
+ cmd = ""
+
+ interface = self.get_mopt("interface")
+ pcap_filter = self.get_mopt("filter")
+
+ cmd = "tcpdump -p -nn -i %s \"%s\"" % (interface,
pcap_filter)
+ self._cmd = cmd
+
+ def _execute_tcpdump(self):
+ """ Start tcpdump in the background """
+ cmd = self._cmd
+ proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self._tcpdump = proc
+
+ def _process_captured_line(self, line):
+ """ Apply filters and see if the packet passed them
"""
+ if len(self._grep_filters):
+ for filt in self._grep_filters:
+ if not re.search(filt, line):
+ return
+
+ self._num_recv += 1
+
+ def _evaluate_results(self):
+ """ Compare results with the conditions """
+ num = self._num_recv
+ if num >= self._min_cond:
+ if self._max_cond != None:
+ return num <= self._max_cond
+ return True
+ return False
+
+ def run(self):
+ self._set_interrupt_handler()
+
+ self._prepare_grep_filters()
+ self._prepare_conditions()
+ self._compose_cmd()
+ self._execute_tcpdump()
+
+ logging.info("Capturing started")
+
+ line = ""
+ tcpdump_output = self._tcpdump.stdout
+ while True:
+ if self._tcpdump.poll() != None:
+ if self._tcpdump.returncode > 0:
+ raise Exception("tcpdump terminated with error")
+ else:
+ break
+
+ try:
+ next_line = tcpdump_output.readline()
+ except IOError: # Interrupted system call
+ continue
+
+ if next_line == "":
+ continue
+
+ next_line = next_line.strip("\n")
+
+ if re.match("[0-9]+\:[0-9]+\:[0-9\.]+", next_line) and line !=
"":
+ self._process_captured_line(line)
+ line = next_line
+ else:
+ line += next_line
+
+ logging.info("Capturing finished. Received %d packets",
self._num_recv)
+ res = {"received": self._num_recv,
+ "min": self._min_cond,
+ "max": self._max_cond}
+
+ if self._evaluate_results():
+ return self.set_pass(res)
+
+ return self.set_fail("PacketAssert failed!", res)
diff --git a/example_recipes/packet_assert.xml b/example_recipes/packet_assert.xml
new file mode 100644
index 0000000..11d20b7
--- /dev/null
+++ b/example_recipes/packet_assert.xml
@@ -0,0 +1,15 @@
+<nettestrecipe>
+ <command_sequence>
+<command type="test" value="PacketAssert" bg_id="1">
+ <options>
+ <option name="interface" value="br0" />
+ <option name="filter" value="" />
+ <option name="grep_for" value="IP" />
+ <option name="min" value="1" />
+ <option name="max" value="5" />
+ </options>
+ </command>
+ <command type="exec" value="sleep 3" />
+ <command type="intr" value="1" />
+ </command_sequence>
+</nettestrecipe>
--
1.7.7.6
Show replies by thread