Saggi Mizrahi has uploaded a new change for review.
Change subject: Fix race in ProtonReactor and add better delivery semantics ......................................................................
Fix race in ProtonReactor and add better delivery semantics
Change-Id: Ie53d6f4b8a119f8a9e366b717c22ba38bcc99e80 Signed-off-by: Saggi Mizrahi <smizrahi@redhat.com> --- M vdsm_api/jsonrpc/protonReactor.py 1 file changed, 60 insertions(+), 9 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/56/10256/1
diff --git a/vdsm_api/jsonrpc/protonReactor.py b/vdsm_api/jsonrpc/protonReactor.py index f5363ad..75c5e4e 100644 --- a/vdsm_api/jsonrpc/protonReactor.py +++ b/vdsm_api/jsonrpc/protonReactor.py @@ -16,6 +16,7 @@ import logging import uuid from Queue import Queue, Empty +import time
import proton
@@ -27,10 +28,11 @@ class ProtonContext(object): log = logging.getLogger("jsonrpc.ProtonContext")
- def __init__(self, reactor, messageQueue, msg): + def __init__(self, reactor, messageQueue, cxtr, msg): self._reactor = reactor self._msg = msg self._mq = messageQueue + self._cxtr = cxtr
@property def data(self): @@ -42,13 +44,14 @@ msg.body = data self._mq.put_nowait(msg) self.log.debug("Message Queued") + self._reactor._activate(self._cxtr, proton.PN_CONNECTOR_WRITABLE) self._reactor._wakeup()
class ProtonReactor(object): log = logging.getLogger("jsonrpc.ProtonReactor")
- def __init__(self, address, messageHandler): + def __init__(self, address, messageHandler, deliveryTimeout=5): self._messageHandler = messageHandler host, port = address self.host = host @@ -59,6 +62,11 @@ self._driver = proton.pn_driver()
self._sessionContexts = [] + self._deliveryTimeout = deliveryTimeout + self._activationQeue = Queue() + + def _activate(self, cxtr, cond): + self._activationQeue.put_nowait((cxtr, cond))
def _convertTimeout(self, timeout): """ @@ -182,18 +190,46 @@ proton.pn_link_open(link) link = proton.pn_link_next(link, proton.PN_LOCAL_UNINIT)
- def _processDeliveries(self, conn): + def _processDeliveries(self, conn, cxtr): delivery = proton.pn_work_head(conn) while delivery: self.log.debug("Process delivery %s" % proton.pn_delivery_tag(delivery))
if proton.pn_delivery_readable(delivery): - self._processIncoming(delivery) + self._processIncoming(delivery, cxtr) elif proton.pn_delivery_writable(delivery): self._processOutgoing(delivery)
delivery = proton.pn_work_next(delivery) + + def _cleanDeliveries(self, conn): + link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE)) + while link: + d = proton.pn_unsettled_head(link) + while d: + _next = proton.pn_unsettled_next(d) + disp = proton.pn_delivery_remote_state(d) + age = time.time() - proton.pn_delivery_get_context(d) + self.log.debug("Checking delivery") + if disp and disp != proton.PN_ACCEPTED: + self.log.warn("Message was not accepted by remote end") + + if disp and proton.pn_delivery_settled(d): + self.log.debug("Message settled by remote end") + proton.pn_delivery_settle(d) + + elif age > self._deliveryTimeout: + self.log.warn("Delivary not settled by remote host") + proton.pn_delivery_settle(d) + + elif proton.pn_link_state(link) & proton.PN_REMOTE_CLOSED: + self.log.warn("Link closed before settling message") + proton.pn_delivery_settle(d) + + d = _next + + link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE))
def _cleanLinks(self, conn): link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE | @@ -249,6 +285,10 @@ else: self.log.debug("Creating delivery") proton.pn_link_set_context(sender, msg.encode()) + if proton.pn_link_credit(sender) == 0: + self.log.debug("Not enough credit, waiting") + continue + proton.pn_delivery(sender, "response-delivery-%s" % str(uuid.uuid4()))
@@ -260,7 +300,8 @@ self._openPendingSessions(conn) self._openLinks(conn) self._queueOutgoingDeliveries(conn) - self._processDeliveries(conn) + self._processDeliveries(conn, cxtr) + self._cleanDeliveries(conn) self._cleanLinks(conn) self._cleanSessions(conn)
@@ -269,7 +310,7 @@ self.log.debug("Connection Closed") proton.pn_connection_close(conn)
- def _processIncoming(self, delivery): + def _processIncoming(self, delivery, cxtr): link = proton.pn_delivery_link(delivery) ssn = proton.pn_link_session(link) msg = [] @@ -285,7 +326,8 @@ msgObj.decode(msg) ctx = proton.pn_session_get_context(ssn) mq = ctx['mqueue'] - self._messageHandler.handleMessage(ProtonContext(self, mq, msgObj)) + self._messageHandler.handleMessage(ProtonContext(self, mq, cxtr, + msgObj))
proton.pn_delivery_settle(delivery) proton.pn_link_advance(link) @@ -308,8 +350,7 @@ else: self.log.debug("Delivery finished") proton.pn_link_set_context(link, "") - # We don't care if the delivery is successful or not - proton.pn_delivery_settle(delivery) + proton.pn_delivery_set_context(delivery, time.time()) proton.pn_link_advance(link)
def start_listening(self): @@ -319,10 +360,20 @@ raise RuntimeError("Could not listen on %s:%s" % (self.host, self.port))
+ def _emptyActivationQueue(self): + while True: + try: + args = self._activationQeue.get_nowait() + except Empty: + return + else: + proton.pn_connector_activate(*args) + def process_requests(self): self._isRunning = True while self._isRunning: self._waitDriverEvent() + self._emptyActivationQueue() self._acceptConnectionRequests() self._processConnectors()
-- To view, visit http://gerrit.ovirt.org/10256 To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange Gerrit-Change-Id: Ie53d6f4b8a119f8a9e366b717c22ba38bcc99e80 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <smizrahi@redhat.com>