Nir Soffer has uploaded a new change for review.
Change subject: lib: Revert and refine error handling in tmap()
......................................................................
lib: Revert and refine error handling in tmap()
In commit 2b7155b696 (lib: Simplify and generalize concurrent.tmap()),
we simplified error handling by returning a named tuple with function
results. This turned out less useful than the original error handling.
This patch restores the previous error handling:
- Functions passed to tmap() should not raise - if they raise, this is
considered a bug in the function.
- The last error is raised by tmap() instead of returning the result.
This makes it easier to fail loudly for unexpected errors.
- The original exception is re-raised now with the original traceback.
- Error handling is documented properly now.
Previously you had to make sure the function raises to signal failures:
def func():
try:
code that should not fail...
code that may fail...
code that should not fail...
except ExpectedError:
log.error(...)
raise
except Exception:
log.exception(...)
raise
results = concurrent.tmap(func, values)
if not all(r.succeeded for r in results):
...
Returning the result as is lets us have nicer code:
def func():
code that should not fail...
try:
code that may fail...
except ExpectedError:
log.error(...)
return False
code that should not fail...
return True
succeeded = concurrent.tmap(func, values)
if not all(succeeded):
...
We can ignore unexpected errors, since tmap() will log them and fail
loudly. We can also minimize try-except blocks for expected errors.
Change-Id: I0154b28ff7822c63e77181bbbf444c712bd0c31e
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/concurrent.py
M tests/concurrentTests.py
2 files changed, 45 insertions(+), 19 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/11/39211/1
diff --git a/lib/vdsm/concurrent.py b/lib/vdsm/concurrent.py
index 64e072d..5498052 100644
--- a/lib/vdsm/concurrent.py
+++ b/lib/vdsm/concurrent.py
@@ -18,22 +18,42 @@
# Refer to the README and COPYING files for full details of the license
#
+import logging
import threading
-from collections import namedtuple
-
-
-Result = namedtuple("Result", ["succeeded", "value"])
+import sys
def tmap(func, iterable):
+ """
+ Run func with arguments from iterable in multiple threads, returning the
+ output in order of arguments.
+
+ func should not raise exceptions - we consider this a bug in func, and will
+ fail the call and re-raise the exception in the caller thread.
+
+ Expected exceptions should be handled in func. If the caller likes to
+ handle the error later, func should return it:
+
+ def func(value):
+ try:
+ return something(value)
+ except ExpectedError as e:
+ return e
+
+ Unexpected exceptions should not be handled, as they are logged in the
+ worker threads and re-raised in the caller thread. If multiple exceptions
+ are raised, only the last one will be re-raised in the caller thread.
+ """
args = list(iterable)
results = [None] * len(args)
+ error = [None]
def worker(i, f, arg):
try:
- results[i] = Result(True, f(arg))
- except Exception as e:
- results[i] = Result(False, e)
+ results[i] = f(arg)
+ except Exception:
+ error[0] = sys.exc_info()
+ logging.exception("Unhandled exception in tmap worker thread")
threads = []
for i, arg in enumerate(args):
@@ -45,4 +65,8 @@
for t in threads:
t.join()
+ if error[0] is not None:
+ t, v, tb = error[0]
+ raise t, v, tb
+
return results
diff --git a/tests/concurrentTests.py b/tests/concurrentTests.py
index 307e397..5c0646b 100644
--- a/tests/concurrentTests.py
+++ b/tests/concurrentTests.py
@@ -26,13 +26,16 @@
from vdsm import concurrent
+class Error(Exception):
+ pass
+
+
class TMapTests(VdsmTestCase):
def test_results(self):
values = tuple(range(10))
results = concurrent.tmap(lambda x: x, values)
- expected = [concurrent.Result(True, x) for x in values]
- self.assertEqual(results, expected)
+ self.assertEqual(results, list(values))
def test_results_order(self):
def func(x):
@@ -40,8 +43,7 @@
return x
values = tuple(random.random() * 0.1 for x in range(10))
results = concurrent.tmap(func, values)
- expected = [concurrent.Result(True, x) for x in values]
- self.assertEqual(results, expected)
+ self.assertEqual(results, list(values))
def test_concurrency(self):
start = time.time()
@@ -49,12 +51,12 @@
elapsed = time.time() - start
self.assertTrue(0.1 < elapsed < 0.2)
- def test_error(self):
- error = RuntimeError("No result for you!")
-
+ def test_raise_last_error(self):
def func(x):
- raise error
-
- results = concurrent.tmap(func, range(10))
- expected = [concurrent.Result(False, error)] * 10
- self.assertEqual(results, expected)
+ raise Error(x)
+ try:
+ concurrent.tmap(func, (1, 2, 3))
+ except Error as e:
+ self.assertEqual(e.args, (3,))
+ else:
+ self.fail("Exception was not raised")
--
To view, visit
https://gerrit.ovirt.org/39211
To unsubscribe, visit
https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0154b28ff7822c63e77181bbbf444c712bd0c31e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>