Saggi Mizrahi has uploaded a new change for review.
Change subject: broker_support ......................................................................
broker_support
Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Signed-off-by: Saggi Mizrahi smizrahi@redhat.com --- A lib/stompClient.py M lib/yajsonrpc/__init__.py M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompReactor.py 4 files changed, 340 insertions(+), 215 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/68/36368/1
diff --git a/lib/stompClient.py b/lib/stompClient.py new file mode 100644 index 0000000..d3cd0ab --- /dev/null +++ b/lib/stompClient.py @@ -0,0 +1,136 @@ +#!/usr/bin/python +import yajsonrpc as yjrpc +import yajsonrpc.stompReactor as sr +import socket +from threading import Thread, Lock +import time + +_reactor = None +_reactorLock = Lock() + + +def get_reactor(): + global _reactor + if _reactor is None: + with _reactorLock: + if _reactor is None: + _reactor = sr.StompReactor() + t = Thread(target=_reactor.process_requests) + t.setDaemon(True) + t.start() + + return _reactor + + +class EchoServer(object): + def echo(self, text): + return text + + def register_server_address(self, *args, **kwargs): + pass + + def unregister_server_address(self, *args, **kwargs): + pass + + +def dummy_server(address): + sock = socket.create_connection(address) + reactor = get_reactor() + stomp_client = reactor.createClient(sock) + server = yjrpc.JsonRpcServer(EchoServer()) + t = Thread(target=server.serve_requests) + t.setDaemon(True) + t.start() + sub = stomp_client.subscribe( + sr._DEFAULT_REQUEST_DESTINATION, + message_handler=ServerRpcContextAdapter.subscription_handler(server) + ) + server._sub_ = sub + return server + + +class ServerRpcContextAdapter(object): + @classmethod + def subscription_handler(cls, server): + def handler(sub, frame): + server.queueRequest( + ( + ServerRpcContextAdapter(sub.client, frame), + frame.body + ) + ) + + return handler + + def __init__(self, client, request_frame): + self._client = client + self._reply_to = request_frame.headers.get('reply-to', None) + + def get_local_address(self, *args, **kwargs): + return "" + + def send(self, data): + if self._reply_to is None: + return + + self._client.send( + self._reply_to, + data, + { + "content-type": "application/json", + } + ) + + +class ClientRpcTransportAdapter(object): + def __init__(self, sub, destination, client): + self._sub = sub + sub.set_message_handler(self._handle_message) + 
self._destination = destination + self._client = client + self._message_handler = lambda arg: None + + def setMessageHandler(self, handler): + self._message_handler = handler + + def send(self, data): + headers = { + "content-type": "application/json", + "reply-to": self._sub.destination, + } + self._client.send( + data, + self._destination, + headers, + ) + + def _handle_message(self, sub, frame): + self._message_handler((self, frame.body)) + + def close(self): + self._sub.unsubscribe() + + +def connect(address): + sock = socket.create_connection(address) + reactor = get_reactor() + stomp_client = reactor.createClient(sock) + subscription = stomp_client.subscribe(sr._DEFAULT_RESPONSE_DESTINATIOM) + client = yjrpc.JsonRpcClient( + ClientRpcTransportAdapter( + subscription, + sr._DEFAULT_REQUEST_DESTINATION, + stomp_client, + ) + ) + return client + + +BROKER_ADDRESS = ("127.0.0.1", 5445) + +server = dummy_server(BROKER_ADDRESS) + +time.sleep(2) + +client = connect(BROKER_ADDRESS) +client.callMethod("echo", ["123"], 1) diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py index 8120c03..6e485bf 100644 --- a/lib/yajsonrpc/__init__.py +++ b/lib/yajsonrpc/__init__.py @@ -50,7 +50,7 @@ log = logging.getLogger("JsonRpcInvalidRequestError")
def __init__(self, object_name, msg_content): - self.log.error("Invalid message found " + msg_content) + self.log.error("Invalid message found %s", msg_content) JsonRpcError.__init__(self, -32600, "The JSON sent is not a valid Request object " "with " + object_name) @@ -100,7 +100,7 @@ raise JsonRpcInvalidRequestError("missing method header", obj)
reqId = obj.get("id") - if not isinstance(reqId, (str, unicode)): + if not isinstance(reqId, (str, unicode, int)): raise JsonRpcInvalidRequestError("missing request identifier", obj)
@@ -151,19 +151,7 @@ @staticmethod def decode(msg): obj = json.loads(msg, 'utf-8') - - if "result" not in obj and "error" not in obj: - raise JsonRpcInvalidRequestError("missing result or error info", - obj) - - result = obj.get('result') - error = JsonRpcError(**obj.get('error')) - - reqId = obj.get('id') - if not isinstance(reqId, (str, unicode)): - raise JsonRpcInvalidRequestError("missing response identifier", - obj) - return JsonRpcResponse(result, error, reqId) + return JsonRpcResponse.fromRawObject(obj)
@staticmethod def fromRawObject(obj): @@ -178,9 +166,6 @@ error = obj.get("error")
reqId = obj.get("id") - if not isinstance(reqId, (str, unicode)): - raise JsonRpcInvalidRequestError("missing response identifier", - obj)
return JsonRpcResponse(result, error, reqId)
@@ -297,12 +282,6 @@ self._lock = Lock() self._eventcbs = []
- def setTimeout(self, timeout): - self._transport.setTimeout(timeout) - - def connect(self): - self._transport.connect() - def callMethod(self, methodName, params=[], rid=None): resp = self.call(JsonRpcRequest(methodName, params, rid))[0] if resp.error: @@ -354,6 +333,12 @@
resp = JsonRpcResponse.fromRawObject(resp) with self._lock: + if resp.id is None: + self.log.warning( + "Got an error from server without an ID (%s)", + resp.error, + ) + ctx = self._runningRequests.pop(resp.id)
ctx.addResponse(resp) diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index 142b22a..250b0bd 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -16,13 +16,10 @@ import logging import os import socket -from threading import Timer, Event +from threading import Timer from uuid import uuid4 from collections import deque -import time
-from betterAsyncore import Dispatcher -import asyncore import re
_RE_ESCAPE_SEQUENCE = re.compile(r"\(.)") @@ -53,6 +50,16 @@ CONNECTED = "CONNECTED" ERROR = "ERROR" RECEIPT = "RECEIPT" + + +class Headers: + CONTENT_LENGTH = "content-length" + CONTENT_TYPE = "content-type" + SUBSCRIPTION = "subscription" + DESTINATION = "destination" + ACCEPT_VERSION = "accept-version" + REPLY_TO = "reply-to" + HOST = "host"
COMMANDS = tuple([command for command in dir(Command) @@ -120,8 +127,11 @@ def decodeValue(s): # Make sure to leave this check before decoding as ':' can appear in the # value after decoding using \c - if ":" in s: - raise ValueError("Contains illigal charachter `:`") + # Disabled due to bug in hornetq: + # https://issues.jboss.org/browse/HORNETQ-1454 + + # if ":" in s: + # raise ValueError("Contains illigal charachter `:`")
try: s = _RE_ESCAPE_SEQUENCE.sub( @@ -278,141 +288,15 @@ return None
-class Client(object): - def __init__(self, sock=None): - """ - Initialize the client. - - The socket parameter can be an already initialized socket. Should be - used to pass specialized socket objects like SSL sockets. - """ - if sock is None: - sock = self._sock = socket.socket() - else: - self._sock = sock - - self._map = {} - # Because we don't know how large the frames are - # we have to use non bolocking IO - sock.setblocking(False) - - # We have our own timeout for operations we - # pretend to be synchronous (like connect). - self._timeout = None - self._connected = Event() - self._subscriptions = {} - - self._aclient = None - self._adisp = None - - self._inbox = deque() - - @property - def outgoing(self): - return self._adisp.outgoing - - def _registerSubscription(self, sub): - self._subscriptions[sub.id] = sub - - def _unregisterSubscription(self, sub): - del self._subscriptions[sub.id] - - @property - def connected(self): - return self._connected.isSet() - - def handle_connect(self, aclient, frame): - self._connected.set() - - def handle_message(self, aclient, frame): - self._inbox.append(frame) - - def process(self, timeout=None): - if timeout is None: - timeout = self._timeout - - asyncore.loop(use_poll=True, timeout=timeout, map=self._map, count=1) - - def connect(self, address, hostname): - sock = self._sock - - self._aclient = AsyncClient(self, hostname) - adisp = self._adisp = AsyncDispatcher(self._aclient) - disp = self._disp = Dispatcher(adisp, sock, self._map) - sock.setblocking(True) - disp.connect(address) - sock.setblocking(False) - self.recv() # wait for CONNECTED msg - - if not self._connected.isSet(): - sock.close() - raise socket.error() - - def recv(self): - timeout = self._timeout - s = time.time() - duration = 0 - while timeout is None or duration < timeout: - try: - return self._inbox.popleft() - except IndexError: - td = timeout - duration if timeout is not None else None - self.process(td) - duration = time.time() - s - - return None 
- - def put_subscribe(self, destination, ack=None): - subid = self._aclient.subscribe(self._adisp, destination, ack) - sub = Subscription(self, subid, ack) - self._registerSubscription(sub) - return sub - - def put_send(self, destination, data="", headers=None): - self._aclient.send(self._adisp, destination, data, headers) - - def put(self, frame): - self._adisp.send_raw(frame) - - def send(self): - disp = self._disp - timeout = self._timeout - duration = 0 - s = time.time() - while ((timeout is None or duration < timeout) and - (disp.writable() or not self._connected.isSet())): - td = timeout - duration if timeout is not None else None - self.process(td) - duration = time.time() - s - - def gettimout(self): - return self._timeout - - def settimeout(self, value): - self._timeout = value - - class AsyncDispatcher(object): log = logging.getLogger("stomp.AsyncDispatcher")
- def __init__(self, frameHandler, bufferSize=4096): - self._frameHandler = frameHandler + def __init__(self, frame_handler, bufferSize=4096): + self._frame_handler = frame_handler self._bufferSize = bufferSize self._parser = Parser() - self._outbox = deque() self._outbuf = None self._outgoingHeartBeat = 0 - - def _queueFrame(self, frame): - self._outbox.append(frame) - - @property - def outgoing(self): - n = len(self._outbox) - if self._outbuf != "": - n += 1 - - return n
def setHeartBeat(self, outgoing, incoming=0): if incoming != 0: @@ -422,7 +306,8 @@ self._outgoingHeartBeat = outgoing
def handle_connect(self, dispatcher): - self._frameHandler.handle_connect(self) + self._outbuf = None + self._frame_handler.handle_connect(self)
def handle_read(self, dispatcher): pending = self._bufferSize @@ -444,11 +329,10 @@ if data is not None: parser.parse(data)
- frameHandler = self._frameHandler + frame_handler = self._frame_handler while parser.pending > 0: frame = parser.popFrame() - if hasattr(frameHandler, "handle_frame"): - frameHandler.handle_frame(self, frame) + frame_handler.handle_frame(self, frame)
def popFrame(self): return self._parser.popFrame() @@ -456,7 +340,7 @@ def handle_write(self, dispatcher): if self._outbuf is None: try: - frame = self._outbox.popleft() + frame = self._frame_handler.peek_message() except IndexError: return
@@ -467,14 +351,16 @@ self._lastOutgoingTimeStamp = self._milis() if numSent == len(data): self._outbuf = None + # Throw away the frame that was sent to the server + self._frame_handler.pop_message() else: self._outbuf = data[numSent:]
- def send_raw(self, frame): - self._queueFrame(frame) - def writable(self, dispatcher): - if len(self._outbox) > 0 or self._outbuf is not None: + if self._frame_handler.has_outgoing_messages: + return True + + if self._outbuf is not None: return True
if (self._outgoingHeartBeat > 0 @@ -493,51 +379,79 @@
class AsyncClient(object): - log = logging.getLogger("yajsonrpc.protocols.stomp.AsyncClient") + log = logging.getLogger("yajsonrpc.stomp.AsyncClient")
- def __init__(self, frameHandler, hostname): + def __init__(self, hostname): self._hostname = hostname - self._frameHandler = frameHandler self._connected = False + self._outbox = deque() self._error = None + self._subscriptions = {} self._commands = { Command.CONNECTED: self._process_connected, Command.MESSAGE: self._process_message, Command.RECEIPT: self._process_receipt, - Command.ERROR: self._process_error} + Command.ERROR: self._process_error, + }
@property def connected(self): return self._connected
+ def queue_frame(self, frame): + self._outbox.append(frame) + + @property + def has_outgoing_messages(self): + return (self._outbox.count > 0) + + def peek_message(self): + return self._outbox[0] + + def pop_message(self): + return self._outbox.popleft() + def getLastError(self): return self._error
- def handle_connect(self, dispatcher): + def handle_connect(self): hostname = self._hostname - frame = Frame( + # TODO : reset subscriptions + # We use appendleft to make sure this is the first frame we send in + # case of a reconnect + self._outbox.appendleft(Frame( Command.CONNECT, - {"accept-version": "1.2", - "host": hostname}) - - dispatcher.send_raw(frame) + { + Headers.ACCEPT_VERSION: "1.2", + Headers.HOST: hostname, + } + ))
def handle_frame(self, dispatcher, frame): self._commands[frame.command](frame, dispatcher)
def _process_connected(self, frame, dispatcher): self._connected = True - frameHandler = self._frameHandler - if hasattr(frameHandler, "handle_connect"): - frameHandler.handle_connect(self, frame)
self.log.debug("Stomp connection established")
def _process_message(self, frame, dispatcher): - frameHandler = self._frameHandler + sub_id = frame.headers.get(Headers.SUBSCRIPTION) + if sub_id is None: + self.log.warning( + "Got message without a subscription" + ) + return
- if hasattr(frameHandler, "handle_message"): - frameHandler.handle_message(self, frame) + sub = self._subscriptions.get(sub_id) + if sub is None: + self.log.warning( + "Got message without an unknown subscription id '%s'", + sub_id + ) + return + + sub._handle_message(frame)
def _process_receipt(self, frame, dispatcher): self.log.warning("Receipt frame received and ignored") @@ -545,42 +459,86 @@ def _process_error(self, frame, dispatcher): raise StompError(frame)
- def send(self, dispatcher, destination, data="", headers=None): - frame = Frame( + def send(self, destination, data="", headers=None): + final_headers = {"destination": destination} + final_headers.update(headers) + self.queue_frame(Frame( Command.SEND, - {"destination": destination}, - data) + final_headers, + data + ))
- dispatcher.send_raw(frame) - - def subscribe(self, dispatcher, destination, ack=None): + def subscribe( + self, + destination, + ack=None, + sub_id=None, + message_handler=None + ): if ack is None: ack = AckMode.AUTO
- subscriptionID = str(uuid4()) + if message_handler is None: + message_handler = lambda sub, frame: None
- frame = Frame( + if sub_id is None: + sub_id = str(uuid4()) + + self.queue_frame(Frame( Command.SUBSCRIBE, - {"destination": destination, - "ack": ack, - "id": subscriptionID}) + { + "destination": destination, + "ack": ack, + "id": sub_id + } + ))
- dispatcher.send_raw(frame) + sub = Subscription( + self, + destination, + sub_id, + ack, + message_handler, + ) + self._subscriptions[sub_id] = sub
- return subscriptionID + return sub
class Subscription(object): - def __init__(self, client, subid, ack): + def __init__( + self, + client, + destination, + subid, + ack, + message_handler + ): self._ack = ack self._subid = subid self._client = client self._valid = True + self._message_handler = message_handler + self._destination = destination + + def _handle_message(self, frame): + self._message_handler(self, frame) + + def set_message_handler(self, handler): + self._message_handler = handler
@property def id(self): return self._subid
+ @property + def destination(self): + return self._destination + + @property + def client(self): + return self._client + def unsubscribe(self): client = self._client subid = self._subid diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py index 3aa85e8..68c21b6 100644 --- a/lib/yajsonrpc/stompReactor.py +++ b/lib/yajsonrpc/stompReactor.py @@ -17,6 +17,7 @@ import os import threading import logging +from collections import deque
import stomp
@@ -27,8 +28,10 @@ _STATE_MSG = "Waiting for message"
-_DEFAULT_RESPONSE_DESTINATIOM = "/queue/_local/vdsm/reponses" -_DEFAULT_REQUEST_DESTINATION = "/queue/_local/vdsm/requests" +_DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses" +_DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" + +_FAKE_SUB_ID = "__vdsm_fake_broker__"
def parseHeartBeatHeader(v): @@ -55,6 +58,7 @@
def __init__(self, reactor, messageHandler): self._reactor = reactor + self._outbox = deque() self._messageHandler = messageHandler self._commands = { stomp.Command.CONNECT: self._cmd_connect, @@ -62,13 +66,30 @@ stomp.Command.SUBSCRIBE: self._cmd_subscribe, stomp.Command.UNSUBSCRIBE: self._cmd_unsubscribe}
+ @property + def has_outgoing_messages(self): + return (self._outbox.count > 0) + + def peek_message(self): + return self._outbox[0] + + def pop_message(self): + return self._outbox.popleft() + + def queue_frame(self, frame): + self._outbox.append(frame) + def _cmd_connect(self, dispatcher, frame): self.log.info("Processing CONNECT request") version = frame.headers.get("accept-version", None) if version != "1.2": - res = stomp.Frame(stomp.Command.ERROR, None, "Version unsupported") + resp = stomp.Frame( + stomp.Command.ERROR, + None, + "Version unsupported" + ) else: - res = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) + resp = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) cx, cy = parseHeartBeatHeader( frame.headers.get("heart-beat", "0,0") ) @@ -79,10 +100,10 @@
# The server can send a heart-beat every cy ms and doesn't want # to receive any heart-beat from the client. - res.headers["heart-beat"] = "%d,0" % (cy,) + resp.headers["heart-beat"] = "%d,0" % (cy,) dispatcher.setHeartBeat(cy)
- dispatcher.send_raw(res) + self.queue_frame(resp) self._reactor.wakeup()
def _cmd_subscribe(self, dispatcher, frame): @@ -109,11 +130,12 @@ self._reactor = reactor self._messageHandler = None
+ self._aclient = aclient adisp = self._adisp = stomp.AsyncDispatcher(aclient) self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map)
def send_raw(self, msg): - self._adisp.send_raw(msg) + self._aclient.queue_frame(msg) self._reactor.wakeup()
def setTimeout(self, timeout): @@ -161,10 +183,15 @@
def send(self, message): self.log.debug("Sending response") - res = stomp.Frame(stomp.Command.MESSAGE, - {"destination": _DEFAULT_RESPONSE_DESTINATIOM, - "content-type": "application/json"}, - message) + res = stomp.Frame( + stomp.Command.MESSAGE, + { + stomp.Headers.DESTINATION: _DEFAULT_RESPONSE_DESTINATIOM, + stomp.Headers.SUBSCRIPTION: _FAKE_SUB_ID, + stomp.Headers.CONTENT_TYPE: "application/json", + }, + message + ) self._stompConn.send_raw(res)
def close(self): @@ -182,12 +209,13 @@ self._messageHandler = None self._socket = sock
- self._aclient = stomp.AsyncClient(self, "vdsm") + self._aclient = stomp.AsyncClient("vdsm") self._stompConn = _StompConnection( self._aclient, sock, reactor ) + self._aclient.handle_connect()
def setTimeout(self, timeout): self._stompConn.setTimeout(timeout) @@ -195,7 +223,7 @@ def connect(self): self._stompConn.connect()
- def handle_message(self, impl, frame): + def handle_message(self, sub, frame): if self._messageHandler is not None: self._messageHandler((self, frame.body))
@@ -207,11 +235,29 @@ if isinstance(self._socket, SSLSocket) and self._socket.pending() > 0: self._stompConn._dispatcher.handle_read()
- def send(self, message): + def subscribe( + self, + *args, + **kwargs + ): + return self._aclient.subscribe(*args, **kwargs) + + def send( + self, + message, + destination=None, + headers=None + ): + if destination is None: + destination = _DEFAULT_REQUEST_DESTINATION + self.log.debug("Sending response") - self._aclient.send(self._stompConn, _DEFAULT_REQUEST_DESTINATION, - message, - {"content-type": "application/json"}) + self._aclient.send( + destination, + message, + headers + ) + self._reactor.wakeup()
def close(self): self._stompConn.close()
oVirt Jenkins CI Server has posted comments on this change.
Change subject: broker_support ......................................................................
Patch Set 1:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_unit_tests_gerrit_el/13734/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/14691/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/14523/ : SUCCESS
Piotr Kliczewski has posted comments on this change.
Change subject: broker_support ......................................................................
Patch Set 1: Code-Review-1
(6 comments)
http://gerrit.ovirt.org/#/c/36368/1/lib/yajsonrpc/__init__.py File lib/yajsonrpc/__init__.py:
Line 98: method = obj.get("method") Line 99: if method is None: Line 100: raise JsonRpcInvalidRequestError("missing method header", obj) Line 101: Line 102: reqId = obj.get("id") JSON-RPC notifications are similar to requests, so we cannot check for the presence of an id here. Line 103: if not isinstance(reqId, (str, unicode, int)): Line 104: raise JsonRpcInvalidRequestError("missing request identifier", Line 105: obj) Line 106:
Line 164: Line 165: result = obj.get("result") Line 166: error = obj.get("error") Line 167: Line 168: reqId = obj.get("id") According to spec id member is REQUIRED. Line 169: Line 170: return JsonRpcResponse(result, error, reqId) Line 171: Line 172:
Line 340: ) Line 341: Line 342: ctx = self._runningRequests.pop(resp.id) Line 343: Line 344: ctx.addResponse(resp) At this stage we will get an AttributeError if resp.id is None. Line 345: Line 346: self._finalizeCtx(ctx) Line 347: Line 348: def _isResponse(self, obj):
http://gerrit.ovirt.org/#/c/36368/1/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 58: SUBSCRIPTION = "subscription" Line 59: DESTINATION = "destination" Line 60: ACCEPT_VERSION = "accept-version" Line 61: REPLY_TO = "reply-to" Line 62: HOST = "host" Can you give a description for headers which are not part of stomp spec? Line 63: Line 64: Line 65: COMMANDS = tuple([command for command in dir(Command) Line 66: if not command.startswith('_')])
Line 437: Line 438: def _process_message(self, frame, dispatcher): Line 439: sub_id = frame.headers.get(Headers.SUBSCRIPTION) Line 440: if sub_id is None: Line 441: self.log.warning( subscription header is not required by spec so we should not force it. Line 442: "Got message without a subscription" Line 443: ) Line 444: return Line 445:
http://gerrit.ovirt.org/#/c/36368/1/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 200: def get_local_address(self): Line 201: return self._socket.getsockname()[0] Line 202: Line 203: Line 204: class StompClient(object): How do you want to send events? I do not see a way to do so with the structure of this code. Line 205: log = logging.getLogger("jsonrpc.AsyncoreClient") Line 206: Line 207: def __init__(self, sock, reactor): Line 208: self._reactor = reactor
Piotr Kliczewski has posted comments on this change.
Change subject: broker_support ......................................................................
Patch Set 1:
(1 comment)
http://gerrit.ovirt.org/#/c/36368/1/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 27: _STATE_LEN = "Waiting for message length" Line 28: _STATE_MSG = "Waiting for message" Line 29: Line 30: Line 31: _DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses" This naming convention won't work for one of the considered broker topologies. We should not make assumptions like this. Line 32: _DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" Line 33: Line 34: _FAKE_SUB_ID = "__vdsm_fake_broker__" Line 35:
automation@ovirt.org has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 2:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 2:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/15976/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_unit_tests_gerrit_el/15175/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16146/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created_staging/952/ : FAILURE
automation@ovirt.org has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 3:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
automation@ovirt.org has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 4:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 4:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16337/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit_tests_gerrit_el/15537/ : FAILURE
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16507/ : FAILURE
automation@ovirt.org has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 5:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 5:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16445/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16616/ : ABORTED
http://jenkins.ovirt.org/job/vdsm_master_unit_tests_el_gerrit/15644/ : FAILURE
Piotr Kliczewski has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 5: Verified-1
(1 comment)
https://gerrit.ovirt.org/#/c/36368/5/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 409: self._outbox.append(frame) Line 410: Line 411: @property Line 412: def has_outgoing_messages(self): Line 413: return (self._outbox.count > 0) When running on el6:
AttributeError: 'collections.deque' object has no attribute 'count' Line 414: Line 415: def peek_message(self): Line 416: return self._outbox[0] Line 417:
Piotr Kliczewski has posted comments on this change.
Change subject: [WIP] broker_support ......................................................................
Patch Set 5:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/5/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 64: stomp.Command.UNSUBSCRIBE: self._cmd_unsubscribe} Line 65: Line 66: @property Line 67: def has_outgoing_messages(self): Line 68: return (self._outbox.count > 0) When running on el6: AttributeError: 'collections.deque' object has no attribute 'count' Line 69: Line 70: def peek_message(self): Line 71: return self._outbox[0] Line 72:
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 6:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 6: Verified-1
We do not send heartbeats anymore.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 7:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 7: Verified+1
Using engine 3.5 to host install vdsm and seeing no communication issues.
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 7:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16649/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16820/ : FAILURE
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 8:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 8: Verified+1
Rebased, verified with engine 3.5
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 8:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16749/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16921/ : FAILURE
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 9:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 9:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16780/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/16952/ : FAILURE
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 10:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Francesco Romani has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 10:
(4 comments)
initial review, more will come after
https://gerrit.ovirt.org/#/c/36368/10/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 288: class AsyncDispatcher(object): Line 289: log = logging.getLogger("stomp.AsyncDispatcher") Line 290: Line 291: def __init__(self, frame_handler, bufferSize=4096): Line 292: self._frame_handler = frame_handler unrelated (but nice) rename? Line 293: self._bufferSize = bufferSize Line 294: self._parser = Parser() Line 295: self._outbuf = None Line 296: self._outgoing_heartbeat_in_milis = 0
Line 302: self._update_outgoing_heartbeat() Line 303: self._outgoing_heartbeat_in_milis = outgoing Line 304: Line 305: def handle_connect(self, dispatcher): Line 306: self._outbuf = None why we do need this now? Line 307: self._frame_handler.handle_connect(self) Line 308: Line 309: def handle_read(self, dispatcher): Line 310: try:
https://gerrit.ovirt.org/#/c/36368/10/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 74: def pop_message(self): Line 75: return self._outbox.popleft() Line 76: Line 77: def queue_frame(self, frame): Line 78: self._outbox.append(frame) seems duplicated code. Any chance to factor it out? Line 79: Line 80: def _cmd_connect(self, dispatcher, frame): Line 81: self.log.info("Processing CONNECT request") Line 82: version = frame.headers.get("accept-version", None)
https://gerrit.ovirt.org/#/c/36368/10/tests/jsonRpcHelper.py File tests/jsonRpcHelper.py:
Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: Line 105: def client(client_socket): def? maybe something is missing here? Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID, Line 109: _FAKE_SUB_ID,
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 10:
(4 comments)
https://gerrit.ovirt.org/#/c/36368/10/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 288: class AsyncDispatcher(object): Line 289: log = logging.getLogger("stomp.AsyncDispatcher") Line 290: Line 291: def __init__(self, frame_handler, bufferSize=4096): Line 292: self._frame_handler = frame_handler
unrelated (but nice) rename?
not related to the change. Just cleaning up. Line 293: self._bufferSize = bufferSize Line 294: self._parser = Parser() Line 295: self._outbuf = None Line 296: self._outgoing_heartbeat_in_milis = 0
Line 302: self._update_outgoing_heartbeat() Line 303: self._outgoing_heartbeat_in_milis = outgoing Line 304: Line 305: def handle_connect(self, dispatcher): Line 306: self._outbuf = None
why we do need this now?
It is a leftover that should be in an older patch. Will move it there. Line 307: self._frame_handler.handle_connect(self) Line 308: Line 309: def handle_read(self, dispatcher): Line 310: try:
https://gerrit.ovirt.org/#/c/36368/10/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 74: def pop_message(self): Line 75: return self._outbox.popleft() Line 76: Line 77: def queue_frame(self, frame): Line 78: self._outbox.append(frame)
seems duplicated code. Any chance to factor it out?
which part of it is duplicated? I am happy to refactor if needed. Line 79: Line 80: def _cmd_connect(self, dispatcher, frame): Line 81: self.log.info("Processing CONNECT request") Line 82: version = frame.headers.get("accept-version", None)
https://gerrit.ovirt.org/#/c/36368/10/tests/jsonRpcHelper.py File tests/jsonRpcHelper.py:
Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: Line 105: def client(client_socket):
def? maybe something is missing here?
the code is intended to be like that. Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID, Line 109: _FAKE_SUB_ID,
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16956/
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11: Verified+1
Rebased, fixed comments, verified with latest engine (master).
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11:
Build Started (2/2) -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17129/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/16956/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17129/ : FAILURE
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11: Code-Review-1
(2 comments)
https://gerrit.ovirt.org/#/c/36368/11//COMMIT_MSG Commit Message:
Line 7: stomp: client side subscription Line 8: Line 9: When connecting to the server we need to send subscribe frame to receive Line 10: messages. Line 11: can you explain more what is done by this patch - removing client, jsonrpcClient - removing timeout (why?), adding outbox queue (how should it work?) and how asyncClient should work? Line 12: Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71
https://gerrit.ovirt.org/#/c/36368/11/lib/yajsonrpc/__init__.py File lib/yajsonrpc/__init__.py:
Line 164: Line 165: result = obj.get("result") Line 166: error = obj.get("error") Line 167: Line 168: reqId = obj.get("id") why don't you need to check here -
if not isinstance(reqId, (str, unicode, int)):
? Line 169: Line 170: return JsonRpcResponse(result, error, reqId) Line 171: Line 172:
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 11:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/11//COMMIT_MSG Commit Message:
Line 7: stomp: client side subscription Line 8: Line 9: When connecting to the server we need to send subscribe frame to receive Line 10: messages. Line 11:
can you explain more what is done by this patch - removing client, jsonrpcC
Will update the message. Line 12: Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71
https://gerrit.ovirt.org/#/c/36368/11/lib/yajsonrpc/__init__.py File lib/yajsonrpc/__init__.py:
Line 164: Line 165: result = obj.get("result") Line 166: error = obj.get("error") Line 167: Line 168: reqId = obj.get("id")
why don't you need to check here -
because notifications do not contain an id (according to the spec) and we want to use the same code to parse both. Line 169: Line 170: return JsonRpcResponse(result, error, reqId) Line 171: Line 172:
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 12:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 12:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/17577/
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 12: Verified+1
Verified by running with the latest engine. Host installed, NFS domain configured and vm provisioned.
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 12:
Build Started (2/2) -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17751/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 12:
Build Successful
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/17577/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17751/ : SUCCESS
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 13:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17790/
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 13:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 13:
Build Started (2/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/17616/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 13:
Build Successful
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/17616/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17790/ : SUCCESS
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 13: Verified+1
Verified by updating vdsm and seeing that there are no issues during communication.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 14:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 14:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/17778/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 14:
Build Started (2/2) -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/17949/
Yaniv Bronheim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 14: Code-Review-1
please split format changes and client removal from this patch. the changes to keep the subscription id and the reply-to header are quite rare - it will be much easier to review that way
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 14:
Will split this patch.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 15:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 15:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18042/
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 15: Verified+1
Patch split into several smaller ones.
Verified by deploying a host with vdsm containing all patches from the events topic. A storage domain was provisioned and a vm was created. The flow of events when changing vm state was observed.
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 15:
Build Started (2/2)
0 -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1271/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 15:
Build Failed
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18042/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1271/ : 0
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18089/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
Build Started (2/2)
0 -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1318/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
Build Successful
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18089/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1318/ : 0
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 74: def pop_message(self): Line 75: return self._outbox.popleft() Line 76: Line 77: def queue_frame(self, frame): Line 78: self._outbox.append(frame) this code is duplicated - i'd write stompOutbox class Line 79: Line 80: def _cmd_connect(self, dispatcher, frame): Line 81: self.log.info("Processing CONNECT request") Line 82: version = frame.headers.get("accept-version", None)
Line 361: response_queue Line 362: ): Line 363: sub_id = None Line 364: if request_queue == _FAKE_SUB_ID: Line 365: sub_id = _FAKE_SUB_ID sub_id = _FAKE_SUB_ID if request_queue == _FAKE_SUB_ID else None Line 366: Line 367: return JsonRpcClient( Line 368: ClientRpcTransportAdapter( Line 369: stomp_client.subscribe(response_queue, sub_id=sub_id),
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 74: def pop_message(self): Line 75: return self._outbox.popleft() Line 76: Line 77: def queue_frame(self, frame): Line 78: self._outbox.append(frame)
this code is duplicated - i'd write stompOutbox class
I am not sure what you are suggesting. Do you want to have a deque wrapper? Line 79: Line 80: def _cmd_connect(self, dispatcher, frame): Line 81: self.log.info("Processing CONNECT request") Line 82: version = frame.headers.get("accept-version", None)
Line 361: response_queue Line 362: ): Line 363: sub_id = None Line 364: if request_queue == _FAKE_SUB_ID: Line 365: sub_id = _FAKE_SUB_ID
sub_id = _FAKE_SUB_ID if request_queue == _FAKE_SUB_ID else None
Done Line 366: Line 367: return JsonRpcClient( Line 368: ClientRpcTransportAdapter( Line 369: stomp_client.subscribe(response_queue, sub_id=sub_id),
Francesco Romani has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
(7 comments)
initial review, I need (much) more time to properly understand this big and complex patch.
https://gerrit.ovirt.org/#/c/36368/16//COMMIT_MSG Commit Message:
Line 7: stomp: client side subscription Line 8: Line 9: In this patch we introduce concept of subscription for client Line 10: perspective. We move queuing functionality out of AsyncDispatcher to Line 11: frame_handler. New StompRpcClient class is repsonsible for sending typo: "repsonsible" vs "responsible" Line 12: subscriptions and ClientRpcTransportAdapter class adds 'reply-to' header Line 13: using subscriptions id. Line 14: Line 15:
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 51: ERROR = "ERROR" Line 52: RECEIPT = "RECEIPT" Line 53: Line 54: Line 55: class Headers: even though for enumeration-like classes there is no harm, we should always inherit from object until we switch to python 3. Line 56: CONTENT_LENGTH = "content-length" Line 57: CONTENT_TYPE = "content-type" Line 58: SUBSCRIPTION = "subscription" Line 59: DESTINATION = "destination"
Line 505: return sub Line 506: Line 507: def unsubscribe(self, sub): Line 508: self.queue_frame(Frame(Command.UNSUBSCRIBE, Line 509: {"id": sub.id})) Who cleans self._subscriptions ? Line 510: Line 511: Line 512: class _Subscription(object): Line 513:
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 97: cy = max(cy, 1000) Line 98: Line 99: # The server can send a heart-beat every cy ms and doesn't want Line 100: # to receive any heart-beat from the client. Line 101: resp.headers["heart-beat"] = "%d,0" % (cy,) minor nit: no need to use a tuple here for 'cy', this should work fine and save a few characters:
resp.headers["heart-beat"] = "%d,0" % cy
If you want to make this change, let's do it in a (far) future separate patch. Line 102: dispatcher.setHeartBeat(cy) Line 103: Line 104: self.queue_frame(resp) Line 105: self._reactor.wakeup()
Line 127: self._socket = sock Line 128: self._reactor = reactor Line 129: self._messageHandler = None Line 130: Line 131: self._aclient = aclient maybe _async_client is a bit more explanatory here Line 132: adisp = self._adisp = stomp.AsyncDispatcher(aclient) Line 133: self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map) Line 134: Line 135: def send_raw(self, msg):
Line 198: return self._socket.getsockname()[0] Line 199: Line 200: Line 201: class StompClient(object): Line 202: log = logging.getLogger("jsonrpc.AsyncoreClient") for a future patch: this seems to get a stale logger now. Line 203: Line 204: def __init__(self, sock, reactor): Line 205: self._reactor = reactor Line 206: self._messageHandler = None
Line 219: Line 220: def connect(self): Line 221: self._stompConn.connect() Line 222: Line 223: def handle_message(self, sub, frame): why renamed? moreover, it seems unused (also in the old code) Line 224: if self._messageHandler is not None: Line 225: self._messageHandler((self, frame.body)) Line 226: Line 227: def setMessageHandler(self, msgHandler):
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 16:
(7 comments)
https://gerrit.ovirt.org/#/c/36368/16//COMMIT_MSG Commit Message:
Line 7: stomp: client side subscription Line 8: Line 9: In this patch we introduce concept of subscription for client Line 10: perspective. We move queuing functionality out of AsyncDispatcher to Line 11: frame_handler. New StompRpcClient class is repsonsible for sending
typo: "repsonsible" vs "responsible"
Done Line 12: subscriptions and ClientRpcTransportAdapter class adds 'reply-to' header Line 13: using subscriptions id. Line 14: Line 15:
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 51: ERROR = "ERROR" Line 52: RECEIPT = "RECEIPT" Line 53: Line 54: Line 55: class Headers:
even though for enumeration-like classes is no harm, we should always innhe
Done Line 56: CONTENT_LENGTH = "content-length" Line 57: CONTENT_TYPE = "content-type" Line 58: SUBSCRIPTION = "subscription" Line 59: DESTINATION = "destination"
Line 505: return sub Line 506: Line 507: def unsubscribe(self, sub): Line 508: self.queue_frame(Frame(Command.UNSUBSCRIBE, Line 509: {"id": sub.id}))
Who cleans self._subscriptions ?
Cleanup is in separate patch. Line 510: Line 511: Line 512: class _Subscription(object): Line 513:
https://gerrit.ovirt.org/#/c/36368/16/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 97: cy = max(cy, 1000) Line 98: Line 99: # The server can send a heart-beat every cy ms and doesn't want Line 100: # to receive any heart-beat from the client. Line 101: resp.headers["heart-beat"] = "%d,0" % (cy,)
minor nit: no need of use a tuple here for 'cy', this should work fine and
Done Line 102: dispatcher.setHeartBeat(cy) Line 103: Line 104: self.queue_frame(resp) Line 105: self._reactor.wakeup()
Line 127: self._socket = sock Line 128: self._reactor = reactor Line 129: self._messageHandler = None Line 130: Line 131: self._aclient = aclient
maybe _async_client is a bit more explanatory here
Done Line 132: adisp = self._adisp = stomp.AsyncDispatcher(aclient) Line 133: self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map) Line 134: Line 135: def send_raw(self, msg):
Line 198: return self._socket.getsockname()[0] Line 199: Line 200: Line 201: class StompClient(object): Line 202: log = logging.getLogger("jsonrpc.AsyncoreClient")
for a future patch: this seems to get a stale logger now.
It is still used in send method. Line 203: Line 204: def __init__(self, sock, reactor): Line 205: self._reactor = reactor Line 206: self._messageHandler = None
Line 219: Line 220: def connect(self): Line 221: self._stompConn.connect() Line 222: Line 223: def handle_message(self, sub, frame):
why renamed? moreover, it seems unused (also in the old code)
It is removed in following patch. Line 224: if self._messageHandler is not None: Line 225: self._messageHandler((self, frame.body)) Line 226: Line 227: def setMessageHandler(self, msgHandler):
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17: Verified+1
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17:
Verified by installing a host, creating vm and seeing events being processed by the engine.
Some issues were noticed after suspending a vm, but they are not related to these changes.
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17:
Build Started (1/2) -> http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18274/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17:
Build Started (2/2)
0 -> http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1504/
oVirt Jenkins CI Server has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 17:
Build Successful
http://jenkins.ovirt.org/job/vdsm_master_pep8_gerrit/18274/ : SUCCESS
http://jenkins.ovirt.org/job/vdsm_master_unit-tests_created/1504/ : 0
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 18:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(3 comments)
https://gerrit.ovirt.org/#/c/36368/19/tests/jsonRpcHelper.py File tests/jsonRpcHelper.py:
Line 94: xml_handler = [h for h in acceptor._handlers if h.NAME == type] Line 95: for (method, name) in bridge.getBridgeMethods(): Line 96: xml_handler[0].xml_binding.server.register_function(method, Line 97: name) Line 98: client = create why don't you just return XMLClient? its the same.. Line 99: else: Line 100: for handler in acceptor._handlers: Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor
Line 100: for handler in acceptor._handlers: Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: this can be under the else where client is still not set anyway Line 105: def client(client_socket): Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID,
Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: Line 105: def client(client_socket): define the function outside and assign it as in the xml case. Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID, Line 109: _FAKE_SUB_ID,
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(3 comments)
https://gerrit.ovirt.org/#/c/36368/19/tests/jsonRpcHelper.py File tests/jsonRpcHelper.py:
Line 94: xml_handler = [h for h in acceptor._handlers if h.NAME == type] Line 95: for (method, name) in bridge.getBridgeMethods(): Line 96: xml_handler[0].xml_binding.server.register_function(method, Line 97: name) Line 98: client = create
why don't you just return XMLClient? its the same..
This code was part of 3.5 already. Let's change it to look better as you suggest. Line 99: else: Line 100: for handler in acceptor._handlers: Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor
Line 100: for handler in acceptor._handlers: Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client:
this can be under the else where client is still not set anyway
As above Line 105: def client(client_socket): Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID,
Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: Line 105: def client(client_socket):
define the function outside and assign it as in the xml case.
as above. Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID, Line 109: _FAKE_SUB_ID,
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/tests/jsonRpcHelper.py File tests/jsonRpcHelper.py:
Line 101: if handler.NAME == type: Line 102: reactor = handler._reactor Line 103: Line 104: if not client: Line 105: def client(client_socket):
as above.
We can't define it outside because the reactor instance is available only locally in constructClient. Line 106: return StompRpcClient( Line 107: reactor.createClient(client_socket), Line 108: _FAKE_SUB_ID, Line 109: _FAKE_SUB_ID,
Yeela Kaplan has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19: Code-Review-1
(6 comments)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 297: self shouldn't is_empty return True when deque is empty, meaning len==0?
Looks like it's doing the opposite...
Line 379: _frame_handler why not use pop_message in the first place?
Line 484: None I think 'if headers' might be enough...
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 234: def subscribe( Line 235: self, Line 236: *args, Line 237: **kwargs Line 238: ): Why are the lines for the arguments separated? That's different from all other vdsm modules, and much less readable...
I think it would be better to change it everywhere around this patch.. Line 239: return self._aclient.subscribe(*args, **kwargs) Line 240: Line 241: def send( Line 242: self,
Line 242: self, Line 243: message, Line 244: destination=None, Line 245: headers=None Line 246: ): same Line 247: if destination is None: Line 248: destination = _DEFAULT_REQUEST_DESTINATION Line 249: Line 250: self.log.debug("Sending response")
Line 357: def StompRpcClient( Line 358: stomp_client, Line 359: request_queue, Line 360: response_queue Line 361: ): same Line 362: sub_id = _FAKE_SUB_ID if request_queue == _FAKE_SUB_ID else None Line 363: Line 364: return JsonRpcClient( Line 365: ClientRpcTransportAdapter(
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(5 comments)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 297: self
shouldn't is_empty return True when deque is empty, meaning len==0?
Will fix
Line 379: _frame_handler
why not use pop_message in the first place?
There could be more than single message in a frame.
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 234: def subscribe( Line 235: self, Line 236: *args, Line 237: **kwargs Line 238: ):
Why are the lines for the arguments separated?
Will update. Line 239: return self._aclient.subscribe(*args, **kwargs) Line 240: Line 241: def send( Line 242: self,
Line 242: self, Line 243: message, Line 244: destination=None, Line 245: headers=None Line 246: ):
same
Done Line 247: if destination is None: Line 248: destination = _DEFAULT_REQUEST_DESTINATION Line 249: Line 250: self.log.debug("Sending response")
Line 357: def StompRpcClient( Line 358: stomp_client, Line 359: request_queue, Line 360: response_queue Line 361: ):
same
Done Line 362: sub_id = _FAKE_SUB_ID if request_queue == _FAKE_SUB_ID else None Line 363: Line 364: return JsonRpcClient( Line 365: ClientRpcTransportAdapter(
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 484: None
I think 'if headers' might be enough...
why?
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 332: self._destination = destination Line 333: self._client = client Line 334: self._message_handler = lambda arg: None Line 335: Line 336: def setMessageHandler(self, handler): quite weird.. you set the handler in __init__ and allow to set another handler afterwards but will never use the new one. I'm confused. Line 337: self._message_handler = handler Line 338: Line 339: def send(self, data): Line 340: headers = {
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 332: self._destination = destination Line 333: self._client = client Line 334: self._message_handler = lambda arg: None Line 335: Line 336: def setMessageHandler(self, handler):
quite weird.. you set the handler in __init__ and allow to set another hand
This is contract that we use for setting a message handler. Line 337: self._message_handler = handler Line 338: Line 339: def send(self, data): Line 340: headers = {
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19: Code-Review-1
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 332: self._destination = destination Line 333: self._client = client Line 334: self._message_handler = lambda arg: None Line 335: Line 336: def setMessageHandler(self, handler):
This is contract that we use for setting a message handler.
did you sign on that? at least call it from the constructor with none. don't you need to set this handler also to the subscription\sub (please change) ? it smells bad Line 337: self._message_handler = handler Line 338: Line 339: def send(self, data): Line 340: headers = {
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 379: _frame_handler
There could be more than single message in a frame.
so add comment and explain it above the peek_message call.
Line 547: self._message_handler = message_handler Line 548: self._destination = destination Line 549: Line 550: def handle_message(self, frame): Line 551: self._message_handler(self, frame) message_handler can be none as far as I see.
I really don't understand this message_handler pass over and rolling it from object to object. can't you have one singleton for each handler that we expose (what is it - json handler, stomp handler?)
it's impossible to follow which handler you pass where and whether you change it at runtime (I don't see any reason to change the handler during runtime). if something that handles messages has states and in each state it uses a different handler - the flow should be understandable by reading the class itself and not the usages which control its state from outside. Line 552: Line 553: def set_message_handler(self, handler): Line 554: self._message_handler = handler Line 555:
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 379: _frame_handler
so add comment and explain it above the peek_message call.
Done
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 547: self._message_handler = message_handler Line 548: self._destination = destination Line 549: Line 550: def handle_message(self, frame): Line 551: self._message_handler(self, frame)
message_handler can be none as far as I see.
Will provide docstring to give more understanding. Line 552: Line 553: def set_message_handler(self, handler): Line 554: self._message_handler = handler Line 555:
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 19:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/19/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 332: self._destination = destination Line 333: self._client = client Line 334: self._message_handler = lambda arg: None Line 335: Line 336: def setMessageHandler(self, handler):
did you sign on that? at least call it from the constructor with none. don'
I understand that it is not easy to follow which object is actually handling the messages; I will create a docstring to clarify. Line 337: self._message_handler = handler Line 338: Line 339: def send(self, data): Line 340: headers = {
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20: Verified+1
Fixed comments, rebased and verified by installing a host and configuring nfs storage domain.
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 533: """ Line 534: In order to process message we need to set message Line 535: handler which is responsible for processing jsonrpc Line 536: content of the message. Currently there are 2 handlers: Line 537: JsonRpcClient and JsonRpcServer. what do I pass , the class itself or the _handleMessage function of it? Line 538: """ Line 539: def set_message_handler(self, handler): Line 540: self._message_handler = handler Line 541:
https://gerrit.ovirt.org/#/c/36368/20/tests/jsonRpcTests.py File tests/jsonRpcTests.py:
Line 84 Line 85 Line 86 Line 87 Line 88 this is new.. was it intend?
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 533: """ Line 534: In order to process message we need to set message Line 535: handler which is responsible for processing jsonrpc Line 536: content of the message. Currently there are 2 handlers: Line 537: JsonRpcClient and JsonRpcServer.
what do I pass , the class itself or the _handleMessage function of it?
Above classes handle messages depending on which side of communication they are. I will update the docstring. Line 538: """ Line 539: def set_message_handler(self, handler): Line 540: self._message_handler = handler Line 541:
https://gerrit.ovirt.org/#/c/36368/20/tests/jsonRpcTests.py File tests/jsonRpcTests.py:
Line 84 Line 85 Line 86 Line 87 Line 88
this is new.. was it intend?
yes. this is no longer needed.
Dan Kenigsberg has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
(9 comments)
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/__init__.py File lib/yajsonrpc/__init__.py:
Line 275: Line 276: class JsonRpcClient(object): Line 277: def __init__(self, transport): Line 278: self.log = logging.getLogger("jsonrpc.JsonRpcClient") Line 279: transport.set_message_handler(self._handleMessage) can you place these pep8 renames in their own patch? Line 280: self._transport = transport Line 281: self._runningRequests = {} Line 282: self._lock = Lock() Line 283: self._eventcbs = []
Line 337: self.log.warning( Line 338: "Got an error from server without an ID (%s)", Line 339: resp.error, Line 340: ) Line 341: unrelated Line 342: ctx = self._runningRequests.pop(resp.id) Line 343: Line 344: ctx.addResponse(resp) Line 345:
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 40: "\n": "\n", Line 41: } Line 42: Line 43: Line 44: class Command(object): new-style is good - but could you place these cleanups in a separate thread Line 45: MESSAGE = "MESSAGE" Line 46: SEND = "SEND" Line 47: SUBSCRIBE = "SUBSCRIBE" Line 48: UNSUBSCRIBE = "UNSUBSCRIBE"
Line 287: except IndexError: Line 288: return None Line 289: Line 290: Line 291: class Outbox(object): can you place this refactor in its own patch? Line 292: Line 293: def __init__(self): Line 294: self._outbox = deque() Line 295:
Line 402: return int(round(monotonic_time() * 1000)) Line 403: Line 404: Line 405: class AsyncClient(object): Line 406: log = logging.getLogger("yajsonrpc.stomp.AsyncClient") unrelated log name change Line 407: Line 408: def __init__(self): Line 409: self._connected = False Line 410: self._outbox = Outbox()
Line 516: self.queue_frame(Frame(Command.UNSUBSCRIBE, Line 517: {"id": sub.id})) Line 518: Line 519: Line 520: class _Subscription(object): could you introduce this newly-used class in a separate patch? Line 521: Line 522: def __init__(self, client, destination, subid, ack, message_handler): Line 523: self._ack = ack Line 524: self._subid = subid
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 24: _STATE_LEN = "Waiting for message length" Line 25: _STATE_MSG = "Waiting for message" Line 26: Line 27: Line 28: _DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses" please move the change to the patch that handle 3.5, 3.6+, and non-engine requests. The introduction of "legacy mode" is unrelated to multiple subscriptions. Line 29: _DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" Line 30: Line 31: _FAKE_SUB_ID = "__vdsm_fake_broker__" Line 32:
Line 87: ) Line 88: else: Line 89: resp = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) Line 90: cx, cy = parseHeartBeatHeader( Line 91: frame.headers.get(stomp.Headers.HEARTEBEAT, "0,0") even introduce Headers constants is simpler if left to its own patch. Line 92: ) Line 93: Line 94: # Make sure the heart-beat interval is sane Line 95: if cy != 0:
https://gerrit.ovirt.org/#/c/36368/20/tests/jsonRpcTests.py File tests/jsonRpcTests.py:
Line 84 Line 85 Line 86 Line 87 Line 88
yes. this is no longer needed.
Done
Dan Kenigsberg has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
(1 comment)
https://gerrit.ovirt.org/#/c/36368/20//COMMIT_MSG Commit Message:
Line 9: In this patch we introduce concept of subscription for client Line 10: perspective. We move queuing functionality out of AsyncDispatcher to Line 11: frame_handler. New StompRpcClient class is responsible for sending Line 12: subscriptions and ClientRpcTransportAdapter class adds 'reply-to' header Line 13: using subscriptions id. In this patch we introduce the concept of client subscription, which allows multiple clients.
Each client subscribes to its own destination. When processed, the responses to this client's requests are posted to that destination only.
We move queuing functionality out of AsyncDispatcher to frame_handler. introduce new subscription classes (the existing one has never been used) ...
(try to rephrase and add info) Line 14: Line 15: Line 16: Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Line 17: Signed-off-by: Saggi Mizrahi smizrahi@redhat.com
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 20:
(9 comments)
https://gerrit.ovirt.org/#/c/36368/20//COMMIT_MSG Commit Message:
Line 9: In this patch we introduce concept of subscription for client Line 10: perspective. We move queuing functionality out of AsyncDispatcher to Line 11: frame_handler. New StompRpcClient class is responsible for sending Line 12: subscriptions and ClientRpcTransportAdapter class adds 'reply-to' header Line 13: using subscriptions id.
In this patch we introduce the concept of client subscription, which allows
Done Line 14: Line 15: Line 16: Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Line 17: Signed-off-by: Saggi Mizrahi smizrahi@redhat.com
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/__init__.py File lib/yajsonrpc/__init__.py:
Line 275: Line 276: class JsonRpcClient(object): Line 277: def __init__(self, transport): Line 278: self.log = logging.getLogger("jsonrpc.JsonRpcClient") Line 279: transport.set_message_handler(self._handleMessage)
can you place these pep8 renames in their own patch?
Done Line 280: self._transport = transport Line 281: self._runningRequests = {} Line 282: self._lock = Lock() Line 283: self._eventcbs = []
Line 337: self.log.warning( Line 338: "Got an error from server without an ID (%s)", Line 339: resp.error, Line 340: ) Line 341:
unrelated
Done Line 342: ctx = self._runningRequests.pop(resp.id) Line 343: Line 344: ctx.addResponse(resp) Line 345:
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 40: "\n": "\n", Line 41: } Line 42: Line 43: Line 44: class Command(object):
new-style is good - but could you place these cleanups in a separate thread
Done Line 45: MESSAGE = "MESSAGE" Line 46: SEND = "SEND" Line 47: SUBSCRIBE = "SUBSCRIBE" Line 48: UNSUBSCRIBE = "UNSUBSCRIBE"
Line 287: except IndexError: Line 288: return None Line 289: Line 290: Line 291: class Outbox(object):
can you place this refactor in its own patch?
Done Line 292: Line 293: def __init__(self): Line 294: self._outbox = deque() Line 295:
Line 402: return int(round(monotonic_time() * 1000)) Line 403: Line 404: Line 405: class AsyncClient(object): Line 406: log = logging.getLogger("yajsonrpc.stomp.AsyncClient")
unrelated log name change
Done Line 407: Line 408: def __init__(self): Line 409: self._connected = False Line 410: self._outbox = Outbox()
Line 516: self.queue_frame(Frame(Command.UNSUBSCRIBE, Line 517: {"id": sub.id})) Line 518: Line 519: Line 520: class _Subscription(object):
could you introduce this newly-used class in a separate patch?
Done Line 521: Line 522: def __init__(self, client, destination, subid, ack, message_handler): Line 523: self._ack = ack Line 524: self._subid = subid
https://gerrit.ovirt.org/#/c/36368/20/lib/yajsonrpc/stompReactor.py File lib/yajsonrpc/stompReactor.py:
Line 24: _STATE_LEN = "Waiting for message length" Line 25: _STATE_MSG = "Waiting for message" Line 26: Line 27: Line 28: _DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses"
please move the change to the patch that handle 3.5, 3.6+, and non-engine r
Done Line 29: _DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" Line 30: Line 31: _FAKE_SUB_ID = "__vdsm_fake_broker__" Line 32:
Line 87: ) Line 88: else: Line 89: resp = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) Line 90: cx, cy = parseHeartBeatHeader( Line 91: frame.headers.get(stomp.Headers.HEARTEBEAT, "0,0")
even introduce Headers constants is simpler if left to its own patch.
Done Line 92: ) Line 93: Line 94: # Make sure the heart-beat interval is sane Line 95: if cy != 0:
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 21:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 21: Verified+1
Scope reduced, rebased and verified by installing a host and running a vm to see sent events.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 22:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 23:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 23: Verified+1
Verified by updating vdsm and seeing that there are no issues with communication.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 24:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 24: Verified+1
Verified by running vdsm with the latest engine change. Tested vm status changes: - starting a vm - suspending a vm - resuming a vm
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 25:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 26:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 26: Verified-1
This patch set is broken due to wrong squash with Outbox class.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 27:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 27: Verified+1
Verified by running with latest engine containing event changes.
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 28:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 29:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 29: Code-Review+1
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 29:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/29/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 289: Line 290: Line 291: class AsyncDispatcher(object): Line 292: log = logging.getLogger("stomp.AsyncDispatcher") Line 293: comment also in later patch - please add comment what are the frame_handlers objects that we can use here Line 294: def __init__(self, frame_handler, bufferSize=4096): Line 295: self._frame_handler = frame_handler Line 296: self._bufferSize = bufferSize Line 297: self._parser = Parser()
Line 355: if numSent == len(data): Line 356: self._outbuf = None Line 357: # Throw away the frame that was sent to the server Line 358: # we do not want to do it for partially processed Line 359: # messages. this comment is redundant imo (hopefully I didn't ask for that). and I would say - "clean message after handling" and "update length for partial sent messages" Line 360: self._frame_handler.pop_message() Line 361: else: Line 362: self._outbuf = data[numSent:] Line 363:
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 29:
(2 comments)
https://gerrit.ovirt.org/#/c/36368/29/lib/yajsonrpc/stomp.py File lib/yajsonrpc/stomp.py:
Line 289: Line 290: Line 291: class AsyncDispatcher(object): Line 292: log = logging.getLogger("stomp.AsyncDispatcher") Line 293:
comment also in later patch - please add comment what are the frame_handler
Done Line 294: def __init__(self, frame_handler, bufferSize=4096): Line 295: self._frame_handler = frame_handler Line 296: self._bufferSize = bufferSize Line 297: self._parser = Parser()
Line 355: if numSent == len(data): Line 356: self._outbuf = None Line 357: # Throw away the frame that was sent to the server Line 358: # we do not want to do it for partially processed Line 359: # messages.
this comment is redundant imo (hopefully I didn't ask for that). and I woul
I think you asked for it :P Line 360: self._frame_handler.pop_message() Line 361: else: Line 362: self._outbuf = data[numSent:] Line 363:
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 30:
* Update tracker::IGNORE, no Bug-Url found * Check Bug-Url::WARN, no bug url found, make sure header matches 'Bug-Url: ' and is a valid url. * Check merged to previous::IGNORE, Not in stable branch (['ovirt-3.5', 'ovirt-3.4', 'ovirt-3.3'])
Piotr Kliczewski has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 30: Verified+1
Verified with latest engine containing event changes.
Yaniv Bronhaim has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 30: Code-Review+1
Dan Kenigsberg has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 30: Code-Review+2
Dan Kenigsberg has submitted this change and it was merged.
Change subject: stomp: client side subscription ......................................................................
stomp: client side subscription
In this patch we introduce concept of subscription for client perspective. We move queuing functionality out of AsyncDispatcher to frame_handler. New StompRpcClient class is responsible for sending subscriptions and ClientRpcTransportAdapter class adds 'reply-to' header using subscriptions id.
Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Signed-off-by: Saggi Mizrahi smizrahi@redhat.com Signed-off-by: pkliczewski piotr.kliczewski@gmail.com Reviewed-on: https://gerrit.ovirt.org/36368 Continuous-Integration: Jenkins CI Reviewed-by: Yaniv Bronhaim ybronhei@redhat.com Reviewed-by: Dan Kenigsberg danken@redhat.com --- M lib/yajsonrpc/__init__.py M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompreactor.py M tests/jsonRpcHelper.py M tests/jsonRpcTests.py 5 files changed, 216 insertions(+), 88 deletions(-)
Approvals: Piotr Kliczewski: Verified Yaniv Bronhaim: Looks good to me, but someone else must approve Jenkins CI: Passed CI tests Dan Kenigsberg: Looks good to me, approved
automation@ovirt.org has posted comments on this change.
Change subject: stomp: client side subscription ......................................................................
Patch Set 31:
* Update tracker::IGNORE, no Bug-Url found * Set MODIFIED::IGNORE, no Bug-Url found.
vdsm-patches@lists.fedorahosted.org