Yaniv Bronhaim has uploaded a new change for review.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Adding threads limitation to misc.tmap
Using threads queue to keep the threads order and limit threads count
I moved the function code next to itmap for readability. Currently noone calls to tmap function, so I don't care to change and fix it now to avoid future bugs.
Exceptions that related to the queue usage will be raised as part of the function's exceptions.
Signed-off-by: Yaniv Bronhaim ybronhei@redhat.com
Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625
---
M vdsm/storage/misc.py
1 file changed, 59 insertions(+), 36 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/58/8858/1
diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py
index 426d181..f1a4581 100644
--- a/vdsm/storage/misc.py
+++ b/vdsm/storage/misc.py
@@ -1085,42 +1085,6 @@
     return tuple(res)
 
 
-def tmap(func, iterable):
-    resultsDict = {}
-    error = [None]
-
-    def wrapper(f, arg, index):
-        try:
-            resultsDict[index] = f(arg)
-        except Exception, e:
-            # We will throw the last error received
-            # we can only throw one error, and the
-            # last one is as good as any. This shouldn't
-            # happen. Wrapped methods should not throw
-            # exceptions, if this happens it's a bug
-            log.error("tmap caught an unexpected error", exc_info=True)
-            error[0] = e
-            resultsDict[index] = None
-
-    threads = []
-    for i, arg in enumerate(iterable):
-        t = threading.Thread(target=wrapper, args=(func, arg, i))
-        threads.append(t)
-        t.start()
-
-    for t in threads:
-        t.join()
-
-    results = [None] * len(resultsDict)
-    for i, result in resultsDict.iteritems():
-        results[i] = result
-
-    if error[0] is not None:
-        raise error[0]
-
-    return tuple(results)
-
-
 def getfds():
     return [int(fd) for fd in os.listdir("/proc/self/fd")]
 
@@ -1251,6 +1215,65 @@
         raise exception
 
 
+def tmap(func, iterable, maxthreads=UNLIMITED_THREADS):
+    """
+    Make an iterator that computes the function using arguments from the
+    iterable by running each operation in a different thread in specific
+    order.
+    maxthreads stands for maximum threads that we can initiate simultaneosly.
+    If we reached to max threads the function waits for thread to
+    finish before initiate the next one.
+    """
+    if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
+        raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
+
+    resultsDict = {}
+    error = [None]
+
+    def wrapper(f, arg, index):
+        try:
+            resultsDict[index] = f(arg)
+        except Exception, e:
+            # We will throw the last error received
+            # we can only throw one error, and the
+            # last one is as good as any. This shouldn't
+            # happen. Wrapped methods should not throw
+            # exceptions, if this happens it's a bug
+            log.error("tmap caught an unexpected error", exc_info=True)
+            error[0] = e
+            resultsDict[index] = None
+
+    if maxthreads != UNLIMITED_THREADS:
+        threadsQueue = Queue.Queue(maxthreads)
+    else:
+        threadsQueue = Queue.Queue()
+
+    for i, arg in enumerate(iterable):
+        if not threadsQueue.full():
+            t = threading.Thread(target=wrapper, args=(func, arg, i))
+            threadsQueue.put_nowait(t)
+            t.start()
+        else:
+            # waits for the first unfinished thread in list to finish if we
+            # have already initiate all possible thread's slots (maxthreads)
+            if threadsQueue.empty():
+                raise RuntimeError("No thread initieated")
+            else:
+                threadsQueue.get_nowait().join()
+
+    while not threadsQueue.empty():
+        threadsQueue.get(False).join()
+
+    results = [None] * len(resultsDict)
+    for i, result in resultsDict.iteritems():
+        results[i] = result
+
+    if error[0] is not None:
+        raise error[0]
+
+    return tuple(results)
+
+
 def itmap(func, iterable, maxthreads=UNLIMITED_THREADS):
     """
     Make an iterator that computes the function using
-- To view, visit http://gerrit.ovirt.org/8858 To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com
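Editorial sketch of the thread-limiting idea described in the commit message, written for Python 3 (the patch itself targets Python 2), so `queue` replaces `Queue`. The name `tmap_sketch` and the sentinel value `-1` for `UNLIMITED_THREADS` are assumptions; the real constant in misc.py is not shown in this thread. In patch set 1 as posted, the branch taken when the queue is full joins a thread but does not appear to start one for the current argument; the sketch follows the later structure quoted in the patch set 2 comments, where the new thread is started after the join.

```python
import queue
import threading

UNLIMITED_THREADS = -1  # assumed sentinel; the real value is not shown here


def tmap_sketch(func, iterable, maxthreads=UNLIMITED_THREADS):
    if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
        raise ValueError("Wrong input passed to tmap_sketch: %s" % maxthreads)

    results = {}
    error = [None]  # one-element list so wrapper() can store into it

    def wrapper(arg, index):
        try:
            results[index] = func(arg)
        except Exception as e:
            error[0] = e
            results[index] = None

    # A maxsize of 0 gives an unbounded queue, which never reports full().
    running = queue.Queue(max(maxthreads, 0))

    for i, arg in enumerate(iterable):
        if running.full():
            # Wait for the oldest started thread before starting another.
            running.get_nowait().join()
        t = threading.Thread(target=wrapper, args=(arg, i))
        running.put_nowait(t)
        t.start()

    # Join whatever is still running.
    while not running.empty():
        running.get_nowait().join()

    if error[0] is not None:
        raise error[0]
    return tuple(results[i] for i in range(len(results)))
```

Because a thread leaves the queue only when it is joined, at most `maxthreads` threads are alive at any time, and results are assembled by index, so the output order matches the input order.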
Shu Ming has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: I would prefer that you didn't submit this
(2 inline comments)
....................................................
Commit Message
Line 8:
Line 9: Using threads queue to keep the threads order and limit threads count
Line 10:
Line 11: I moved the function code next to itmap for readability.
Line 12: Currently noone calls to tmap function, so I don't care to change and
tmap ---> tmap()
Line 13: fix it now to avoid future bugs.
Line 14:
Line 15: Exceptions that related to the queue usage will be raised as part of
Line 16: the function's exceptions.

....................................................
File vdsm/storage/misc.py
Line 1227:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1228:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1229:
Line 1230:     resultsDict = {}
Line 1231:     error = [None]
I am not sure we need a list here for error. In the code below, only error[0] is referenced. Why not make it a simple variable?
Line 1232:
Line 1233:     def wrapper(f, arg, index):
Line 1234:         try:
Line 1235:             resultsDict[index] = f(arg)
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com
Zhou Zheng Sheng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: I would prefer that you didn't submit this
(3 inline comments)
....................................................
File vdsm/storage/misc.py
Line 1227:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1228:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1229:
Line 1230:     resultsDict = {}
Line 1231:     error = [None]
A list is used here mainly because it is thread-safe. Do I get your idea, Yaniv? If so, a comment can be added here to explain it.
Line 1232:
Line 1233:     def wrapper(f, arg, index):
Line 1234:         try:
Line 1235:             resultsDict[index] = f(arg)

Line 1252:         if threadsQueue.full():
Line 1253:             # Wait for the first unfinished thread in list to finish if we
Line 1254:             # have already initiate all possible thread's slots (maxthreads)
Line 1255:             if threadsQueue.empty():
Line 1256:                 raise RuntimeError("Queue is empty")
I notice that threadsQueue is accessed only by the current thread, so it cannot be full() and empty() at the same time. Why did you add this check?
Line 1257:             else:
Line 1258:                 threadsQueue.get_nowait().join()
Line 1259:         t = threading.Thread(target=wrapper, args=(func, arg, i))
Line 1260:         threadsQueue.put_nowait(t)

Line 1267:     for i, result in resultsDict.iteritems():
Line 1268:         results[i] = result
Line 1269:
Line 1270:     if error[0] is not None:
Line 1271:         raise error[0]
Maybe you can swap the code in lines 1270-1271 with the code in lines 1266-1268.
Line 1272:
Line 1273:     return tuple(results)
Line 1274:
Line 1275:
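Editorial note on the full()/empty() remark: a small check, assuming Python 3's `queue` module (named `Queue` in the Python 2 code under review), that a bounded queue touched by a single thread cannot be full and empty at once when its size limit is at least 1. The variable names are illustrative only.

```python
import queue

bounded = queue.Queue(1)           # same idea as Queue.Queue(maxthreads)
state_before = (bounded.empty(), bounded.full())

bounded.put_nowait("thread-1")     # stand-in for a Thread object
state_after = (bounded.empty(), bounded.full())

unbounded = queue.Queue()          # maxsize 0: full() is never true
unbounded.put_nowait("thread-1")
unbounded_full = unbounded.full()
```

Since maxthreads is validated to be at least 1 before the queue is created, the inner empty() test inside the full() branch can never fire.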
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
ShaoHe Feng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: I would prefer that you didn't submit this
(1 inline comment)
....................................................
Commit Message
Line 8:
Line 9: Using threads queue to keep the threads order and limit threads count
Line 10:
Line 11: I moved the function code next to itmap for readability.
Line 12: Currently noone calls to tmap function, so I don't care to change and
Do you mean that none calls the tmap function? noone --> none. But there is one call to misc.itmap in ./vdsm/storage/fileSD.py.
Line 13: fix it now to avoid future bugs.
Line 14:
Line 15: Exceptions that related to the queue usage will be raised as part of
Line 16: the function's exceptions.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: (4 inline comments)
....................................................
Commit Message
Line 8:
Line 9: Using threads queue to keep the threads order and limit threads count
Line 10:
Line 11: I moved the function code next to itmap for readability.
Line 12: Currently noone calls to tmap function, so I don't care to change and
itmap is different and does not depend on tmap.
Line 13: fix it now to avoid future bugs.
Line 14:
Line 15: Exceptions that related to the queue usage will be raised as part of
Line 16: the function's exceptions.

....................................................
File vdsm/storage/misc.py
Line 1227:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1228:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1229:
Line 1230:     resultsDict = {}
Line 1231:     error = [None]
I didn't implement this, I just added the thread limitation to this scope. I agree that there is no need for a list here; the only operation on it is assignment, and that is thread-safe. I'll change it.
Line 1232:
Line 1233:     def wrapper(f, arg, index):
Line 1234:         try:
Line 1235:             resultsDict[index] = f(arg)

Line 1252:         if threadsQueue.full():
Line 1253:             # Wait for the first unfinished thread in list to finish if we
Line 1254:             # have already initiate all possible thread's slots (maxthreads)
Line 1255:             if threadsQueue.empty():
Line 1256:                 raise RuntimeError("Queue is empty")
Agreed. It's a double check that we can avoid, mainly because I already check that maxthreads can't be less than 1.
Line 1257:             else:
Line 1258:                 threadsQueue.get_nowait().join()
Line 1259:         t = threading.Thread(target=wrapper, args=(func, arg, i))
Line 1260:         threadsQueue.put_nowait(t)

Line 1267:     for i, result in resultsDict.iteritems():
Line 1268:         results[i] = result
Line 1269:
Line 1270:     if error[0] is not None:
Line 1271:         raise error[0]
I also agree. I didn't want to change this implementation much because I wanted to keep the patch small and limited to the 'maxthreads limitation', but since this function isn't used yet, I'll change it. Thank you.
Line 1272:
Line 1273:     return tuple(results)
Line 1274:
Line 1275:
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Dan Kenigsberg has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: (1 inline comment)
....................................................
File vdsm/storage/misc.py
Line 1227:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1228:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1229:
Line 1230:     resultsDict = {}
Line 1231:     error = [None]
The error=[None] list is important here. It is a famous Python trick to emulate pointers; otherwise, you cannot assign to it from within the function.

Let it be a lesson for you not to move functions needlessly ;-) Here, it is very hard to pinpoint where your changes are and what the original was.
Line 1232:
Line 1233:     def wrapper(f, arg, index):
Line 1234:         try:
Line 1235:             resultsDict[index] = f(arg)
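Editorial sketch of the list trick discussed above. The function names are hypothetical. It contrasts plain assignment inside a nested function, which only creates a new local name, with item assignment on a shared one-element list. The code is written for Python 3, which also offers `nonlocal` for this purpose; the Python 2 code under review has no such keyword, which is why the list is needed there.

```python
def run_plain():
    error = None

    def wrapper():
        # Plain assignment creates a new local name inside wrapper();
        # the outer `error` is left untouched.
        error = ValueError("boom")

    wrapper()
    return error


def run_with_list():
    error = [None]

    def wrapper():
        # Item assignment mutates the list object that both scopes share.
        error[0] = ValueError("boom")

    wrapper()
    return error[0]
```

Here `run_plain()` returns `None`, while `run_with_list()` returns the `ValueError` stored by the nested function.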
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
ShaoHe Feng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 3: I would prefer that you didn't submit this
(1 inline comment)
....................................................
File tests/miscTests.py
Line 184:     def testLimit(self):
Line 185:         def dummy(arg):
Line 186:             return arg
Line 187:         data = frozenset([1, 2, 3, 4, 5])
Line 188:         ret = frozenset(misc.tmap(dummy, data, 2))
The return value of tmap is ordered, unlike that of itmap, so why is frozenset needed?
Line 189:         self.assertEquals(data, ret)
Line 190:
Line 191:     def testErrMethod(self):
Line 192:         exceptionStr = ("It's time to kick ass and chew bubble gum... " +
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 3 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 3: (1 inline comment)
....................................................
File tests/miscTests.py
Line 184:     def testLimit(self):
Line 185:         def dummy(arg):
Line 186:             return arg
Line 187:         data = frozenset([1, 2, 3, 4, 5])
Line 188:         ret = frozenset(misc.tmap(dummy, data, 2))
Using a frozenset instead of a set means that the set values won't change during the test. Maybe you meant to ask why I need a set at all. I should use a list or a tuple to verify that the values are returned in the same order.
Line 189:         self.assertEquals(data, ret)
Line 190:
Line 191:     def testErrMethod(self):
Line 192:         exceptionStr = ("It's time to kick ass and chew bubble gum... " +
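Editorial illustration of why a set-based assertion cannot verify ordering. The `shuffled` tuple is a made-up stand-in for what an order-breaking tmap might return; no real tmap is called here.

```python
data = (1, 2, 3, 4, 5)
shuffled = (3, 1, 5, 2, 4)  # hypothetical out-of-order result

# Set comparison ignores order, so a reordered result still "passes".
set_check_passes = frozenset(shuffled) == frozenset(data)

# Tuple comparison is order-sensitive and catches the reordering.
tuple_check_passes = shuffled == data
```

A test that wants to check ordering would therefore pass a tuple or list as input and compare the returned tuple against it directly.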
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 3 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Dan Kenigsberg has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 4: (1 inline comment)
....................................................
File vdsm/storage/misc.py
Line 1214:     if exception is not None:
Line 1215:         raise exception
Line 1216:
Line 1217:
Line 1218: def tmap(func, iterable, maxthreads=UNLIMITED_THREADS):
please be a sport and keep the function in-place to simplify the review process?
Line 1219:     """
Line 1220:     Make an iterator that computes the function using arguments from the
Line 1221:     iterable by running each operation in a different thread in specific
Line 1222:     order.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 4 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Zhou Zheng Sheng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 7: I would prefer that you didn't submit this
(1 inline comment)
Just one suggestion. The rest of the code looks good.
....................................................
File vdsm/storage/misc.py
Line 1097:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1098:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1099:
Line 1100:     resultsDict = {}
Line 1101:     tmap.error = None
I think tmap.error is like a static variable in C. If there are two parallel calls to tmap, they will overwrite each other's tmap.error.

I am for the "error = [None]" trick. It just needs a simple comment.
Line 1102:
Line 1103:     def wrapper(f, arg, index):
Line 1104:         try:
Line 1105:             resultsDict[index] = f(arg)
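Editorial sketch of the shared-state concern, in Python 3. All names are hypothetical. To keep the example deterministic, a nested call made while the outer call is still in progress stands in for two parallel calls: the inner call fails, the outer job itself succeeds, and the function-attribute version still reports an error to the outer caller.

```python
def run_attr(job):
    run_attr.error = None          # one slot on the function object

    def wrapper():
        try:
            job()
        except Exception as e:
            run_attr.error = e

    wrapper()
    return run_attr.error


def run_list(job):
    error = [None]                 # fresh list for every call

    def wrapper():
        try:
            job()
        except Exception as e:
            error[0] = e

    wrapper()
    return error[0]


def boom():
    raise ValueError("inner failure")


def make_outer_job(runner):
    def outer_job():
        runner(boom)  # overlapping inner call fails; outer job succeeds
    return outer_job


leaked = run_attr(make_outer_job(run_attr))    # inner error leaks out
isolated = run_list(make_outer_job(run_list))  # outer call sees no error
```

With real threads the interleaving is not deterministic, but the underlying issue is the same: every call writes to the single `run_attr.error` slot.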
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 7 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 7: (1 inline comment)
....................................................
File vdsm/storage/misc.py
Line 1097:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1098:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1099:
Line 1100:     resultsDict = {}
Line 1101:     tmap.error = None
You're right! I can also create a namespace here, like: tmapNS = namespace(), and use tmapNS.error to keep this variable scoped to each call of this function. How does that sound? Is the list thing easier to understand than creating a namespace?
Line 1102:
Line 1103:     def wrapper(f, arg, index):
Line 1104:         try:
Line 1105:             resultsDict[index] = f(arg)
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 7 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Zhou Zheng Sheng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 7: (1 inline comment)
....................................................
File vdsm/storage/misc.py
Line 1097:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1098:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1099:
Line 1100:     resultsDict = {}
Line 1101:     tmap.error = None
Both ways are good. I like the list trick; it's concise and just needs a short comment, such as "# a trick for enabling wrapper to write to the error variable in the outer namespace"

Can you use "resultsDict['error']" to store the error? Then you can get rid of the list, and there will be no need for an extra comment.
Line 1102:
Line 1103:     def wrapper(f, arg, index):
Line 1104:         try:
Line 1105:             resultsDict[index] = f(arg)
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 7 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 7: (1 inline comment)
....................................................
File vdsm/storage/misc.py
Line 1097:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1098:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1099:
Line 1100:     resultsDict = {}
Line 1101:     tmap.error = None
I can't; later we iterate over all resultsDict items and use them as results. I'll bring the list trick back and explain it in a comment as you suggested. Thanks.
Line 1102:
Line 1103:     def wrapper(f, arg, index):
Line 1104:         try:
Line 1105:             resultsDict[index] = f(arg)
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 7 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Shu Ming has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8: Looks good to me, but someone else must approve
It looks good to me, but I still have two questions.
I) It seems that after this patch misc.tmap() and misc.itmap() differ only in implementation. Why do we need two copies?
II) It seems that misc.tmap() isn't called anywhere.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Dan Kenigsberg has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8: I would prefer that you didn't submit this
I agree with Shu Ming. If we would like to keep tmap (which is a question in its own right), we'd better implement it as a wrapper around itmap. Copying most of the itmap code seems like bad practice.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8:
I'm not sure about that. tmap returns after all threads are done, or raises an exception if one of them fails.

itmap acts differently: it yields one thread's result at a time, and if that result is an exception, the caller has to check the value and detect it.

tmap is much more intuitive: it keeps the order and processes all operations before returning (it waits with join on all threads). Nobody uses tmap, but we see a call to itmap that wants to get each finished thread's result as fast as possible.

The implementations are too different to reuse:
1. itmap puts the exception inside the queue
2. tmap reads and keeps the args order by using the queue
3. tmap waits on all threads
4. itmap yields results instead of returning them all together

I don't know if we will ever see a use for tmap, but reuse doesn't seem helpful here, unless we change tmap to yield results as itmap does and treat an exception as a return value. Then we could reuse the itmap code.
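Editorial sketch of the itmap behaviour described above, in Python 3. The real itmap body is not shown in this thread, so `itmap_sketch` is an assumption reconstructed from the description: results are yielded as threads finish, in completion order, and an exception is handed to the caller as a value rather than raised. Thread limiting is left out for brevity.

```python
import queue
import threading


def itmap_sketch(func, iterable):
    resp_queue = queue.Queue()

    def wrapper(value):
        try:
            resp_queue.put(func(value))
        except Exception as e:
            # The exception travels through the queue like any result.
            resp_queue.put(e)

    started = 0
    for arg in iterable:
        threading.Thread(target=wrapper, args=(arg,)).start()
        started += 1

    # Yield each result as soon as some thread delivers it.
    for _ in range(started):
        yield resp_queue.get()


def check_value(x):
    if x < 0:
        raise ValueError("negative input: %s" % x)
    return x


outcomes = list(itmap_sketch(check_value, [1, -1, 2]))
failures = [r for r in outcomes if isinstance(r, Exception)]
```

The caller has to inspect each yielded value, which is the difference from tmap that the message points out: tmap raises once after joining all threads and otherwise returns an ordered tuple.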
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Zhou Zheng Sheng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8: Looks good to me, but someone else must approve
Yaniv, your thoughts are very reasonable. The current implementation looks good to me. I find it hard to reuse itmap unless we make some modifications to it. I can propose a way to reuse itmap. I am not good at written English, so if you do not mind, I will explain the idea with code.

Firstly, we should let the wrapper function in itmap produce an index along with the result.

    def wrapper(ind, value):
        try:
            respQueue.put((ind, func(value)))
        except Exception, e:
            respQueue.put((ind, e))

Secondly, in itmap, we should provide the index to the wrapper.

    for ind, arg in enumerate(iterable):
        t = threading.Thread(target=wrapper, args=(ind, arg,))
        t.start()

Lastly, in tmap, we can get the index and value from itmap as follows:

    def tmap(func, iterable, maxthreads=UNLIMITED_THREADS):
        ex = None
        resultsDict = {}
        for ind, r in itmap(func, iterable, maxthreads):
            if isinstance(r, Exception):
                ex = r
            else:
                resultsDict[ind] = r
        if ex is not None:
            raise ex
        results = [None] * len(resultsDict)
        for i, result in resultsDict.iteritems():
            results[i] = result
        return results

Untested, please treat as pseudocode.
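Editorial sketch of the proposal above as one runnable Python 3 unit. The names `indexed_itmap` and `tmap_on_itmap` are hypothetical, the real itmap body is not shown in this thread, and the maxthreads handling is omitted to keep the example short. It returns a tuple, as the original tmap does.

```python
import queue
import threading


def indexed_itmap(func, iterable):
    """Yield (index, result) pairs as worker threads finish."""
    resp_queue = queue.Queue()

    def wrapper(ind, value):
        try:
            resp_queue.put((ind, func(value)))
        except Exception as e:
            resp_queue.put((ind, e))

    started = 0
    for ind, arg in enumerate(iterable):
        threading.Thread(target=wrapper, args=(ind, arg)).start()
        started += 1

    for _ in range(started):
        yield resp_queue.get()


def tmap_on_itmap(func, iterable):
    ex = None
    results = {}
    # Draining the generator waits for every thread to deliver a result.
    for ind, r in indexed_itmap(func, iterable):
        if isinstance(r, Exception):
            ex = r  # keep the last error seen
        else:
            results[ind] = r
    if ex is not None:
        raise ex
    # Reaching this point means every index has a stored result.
    return tuple(results[i] for i in range(len(results)))
```

One consequence of this design is that a function which legitimately returns an Exception instance would be treated as a failure, because errors and results share the same channel.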
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8:
This changes the return value of itmap: with your suggestion, itmap returns (index, val). I'm not sure we want this kind of change. Only fileSD calls itmap, and I can modify it if this change sounds right. But then the use of itmap will look weird (itmap(...).next()[1] :/)

Do you think it helps, or is it just redundant because nobody calls tmap? As part of the infra team, I'm not sure that keeping unused functions just to have more infrastructure is the right thing to do. I would prefer to remove tmap because it is unused.

If you start multiple threads with the same implementation, you don't really care about the order.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Zhou Zheng Sheng has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8:
I think itmap and tmap follow different paradigms. With itmap, you can start iterating over the returned object immediately, regardless of whether the threads have finished their work, and within the iteration you can yield another generator as well. It's like lazy evaluation.

With tmap, the function waits for all the threads to finish their work and returns the values in the same order. It may not be as efficient as itmap, but it does mimic the behaviour of the traditional map with better performance. So if I were using these functions, sometimes I would use itmap and sometimes tmap.
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Shu Ming has posted comments on this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 2: (2 inline comments)
....................................................
File vdsm/storage/misc.py
Line 1227:     if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
Line 1228:         raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
Line 1229:
Line 1230:     resultsDict = {}
Line 1231:     error = [None]
Interesting, so it is a way to make the error[0] assignment in wrapper() take effect outside of wrapper().
Line 1232:
Line 1233:     def wrapper(f, arg, index):
Line 1234:         try:
Line 1235:             resultsDict[index] = f(arg)

Line 1252:         if threadsQueue.full():
Line 1253:             # Wait for the first unfinished thread in list to finish if we
Line 1254:             # have already initiate all possible thread's slots (maxthreads)
Line 1255:             if threadsQueue.empty():
Line 1256:                 raise RuntimeError("Queue is empty")
If that is the case, why not use a more intuitive check, like: if maxthreads < 1:
Line 1257:             else:
Line 1258:                 threadsQueue.get_nowait().join()
Line 1259:         t = threading.Thread(target=wrapper, args=(func, arg, i))
Line 1260:         threadsQueue.put_nowait(t)
Gerrit-MessageType: comment Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 2 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
Yaniv Bronhaim has abandoned this change.
Change subject: Adding threads limitation to misc.tmap ......................................................................
Patch Set 8: Abandoned
no need for now.
Gerrit-MessageType: abandon Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 8 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Dan Kenigsberg danken@redhat.com Gerrit-Reviewer: Saggi Mizrahi smizrahi@redhat.com Gerrit-Reviewer: ShaoHe Feng shaohef@linux.vnet.ibm.com Gerrit-Reviewer: Shu Ming shuming@linux.vnet.ibm.com Gerrit-Reviewer: Yaniv Bronhaim ybronhei@redhat.com Gerrit-Reviewer: Zhou Zheng Sheng zhshzhou@linux.vnet.ibm.com
vdsm-patches@lists.fedorahosted.org