r4896 - trunk/sage/python/sage/wallaby
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-29 20:05:00 +0000 (Fri, 29 Jul 2011)
New Revision: 4896
Modified:
trunk/sage/python/sage/wallaby/wallabyoperations.py
Log:
Updates to WallabyOperations interface and functionality.
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-28 12:48:36 UTC (rev 4895)
+++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-29 20:05:00 UTC (rev 4896)
@@ -9,14 +9,47 @@
try:
import wallaby
from wallaby_collections import StorePatches
+ from tagging import StorePatches, NodePatches
except:
imports_ok = False
log = logging.getLogger("sage.wallaby")
+class WBTypes():
+ '''
+ Wrap symbolic names for data items tracked by WallabyOperations.
+ May be helpful in avoiding typos, good for interpreter help
+ '''
+ NODES = "nodes"
+ GROUPS = "groups"
+ TAGS = "tags"
+
if imports_ok:
class WallabyOperations(object):
+ '''
+ Wrapper around the Wallaby client library.
+ '''
def __init__(self, broker_uri, refresh_interval=None):
+ '''
+ Constructor.
+
+ broker_uri -- the URI used to connect to a QMF message broker
+ where a Wallaby agent is connected. The simplest URI is just a
+ hostname but a full URI can specify scheme://user/password@host:port
+ or a subset of those components as long as the host is included.
+ Examples:
+
+ localhost
+ localhost:5672
+ amqp://fred/flintsone@quarry.bedrock.com:1234
+
+ refresh_interval -- default refresh interval in seconds for all items
+ maintained by WallabyOperations' internal caching thread. A value of
+ None causes the caching thread to wait forever before refreshing an
+ item after a successful call unless the refresh() method is used.
+ The refresh interval may be set for items individually with the
+ set_interval() method.
+ '''
self.broker_uri = broker_uri
# A wallaby Store object
@@ -26,57 +59,40 @@
self._broker = None
# The cache maintenance thread
- self.maintain_cache = None
+ self._maintain_cache = None
# Stop the maintenance thread
self._stop = False
- # Length of time the maintenance thread will sleep
- # (in seconds) before it next attempts to refresh cached
- # objects after a successful attempt. Failed attempts
- # result in a 5 second retry time.
- self.refresh_interval = refresh_interval
+ # Cached data. Each of the keys in this dictionary is the name of
+ # an attribute on the Wallaby Store object.
+ self._cache = {WBTypes.NODES: self.CacheData(refresh_interval),
+ WBTypes.GROUPS: self.CacheData(refresh_interval)}
- # Cached node list
- self.nodes = []
+ # Lock is used for synchronization with the caching thread and
+ # for thread safety of any and all data that could be accessed
+ # by multiple threads.
+ self._lock = Lock()
+ self._condition = Condition(self._lock)
- # Protect cached info, give the cache maintenance
- # thread a condition to sleep on that it may be woken
- # up early.
- self.lock = Lock()
- self.condition = Condition(self.lock)
-
- def stop(self, wait=False, timeout=None):
+ def start(self, retry_secs=5):
'''
- Wake the caching thread if asleep and cause it to exit.
- If wait is True, the call will return after the thread
- has exited or timeout seconds has passed if timeout is
- not None.
- '''
- self._stop = True
- self.refresh()
- if wait:
- self.maintain_cache.join(timeout)
- log.debug("WallabyOperations: stopped cache maintenance thread")
-
- def refresh(self):
- '''
- Wake the caching thread if asleep and cause it to iterate.
- '''
- self.condition.acquire()
- self.condition.notify()
- self.condition.release()
-
- def start(self):
- '''
Start the caching thread.
This thread will attempt to connect to the broker and retrieve
- a wallaby Store object. If successful, it will periodically
- retrieve and cache necessary wallaby objects.
+ a Store object from the Wallaby agent. If successful, it will
+ periodically retrieve and cache data from Wallaby.
Only one caching thread may run at a time. The thread may
be restarted if it has previously been stopped.
+
+ Note, for the moment start() and stop() are not thread safe. They
+ should only be called from a single thread.
+
+ retry_secs -- how often the caching thread will retry failed
+ operations. This includes attempts to connect to the broker
+ and retrieve a Store object as well as calls to Wallaby that
+ return no data.
'''
# The connection to the broker can actually take a long
# time to complete. We don't want to hang a calling function,
@@ -95,6 +111,7 @@
# function, but it refers to the WallabyOperations object
# so the code reads nicely
def maintain_cache(self):
+
# Get initinal connection and Store obect
self.session = Session(manageConnections=True)
self.broker = self.session.addBroker(self.broker_uri)
@@ -104,71 +121,218 @@
log.debug("WallabyOperations: found wallaby store object")
break
- # Allow us to be woken up early...
- self.condition.acquire()
- self.condition.wait(5)
- self.condition.release()
+ # Check stop inside the lock to make sure that we don't miss
+ # a signal or a "stop" that was set while we were iterating.
+ self._condition.acquire()
+ if not self._stop:
+ self._condition.wait(retry_secs)
+ self._condition.release()
- # Okay, we have a Store object so now we can
- # get node lists and such
+ # Init remaining time til next update to 0 for each
+ # cached item in case the thread was restarted
+ for attr, val in self._cache.iteritems():
+ val.remaining = 0
+
+ # Okay, now we're ready to retrieve data
while not self._stop:
- # Uses API extensions to __getattr__ on the Store to
- # retrieve a list of nodes from the Broker and wrap them
- # in Wallaby proxy objects. This might take a while.
- try:
- nodes = self._store.nodes
- except:
- nodes = []
- self._set_node_list(nodes)
- if nodes in ([], None):
- # Hmm, try again shortly
- sleep = 5
- else:
- # We found some nodes so cache them and
- # sleep for a normal interval
- sleep = self.refresh_interval
- self.condition.acquire()
- self.condition.wait(sleep)
- self.condition.release()
+ start_processing = time.time()
+ for attr, val in self._cache.iteritems():
+ # val.remaining is the number of seconds left before
+ # the next update of this data item. None is "forever".
+ if val.remaining is not None and val.remaining <= 0:
+ log.debug("WallabyOperations: refreshing %s" % attr)
+ try:
+ # Wallaby API uses extensions to __getattr__ on
+ # the Store to retrieve objects from the Broker
+ # and return a list of proxy objects.
+ d = getattr(self._store, attr, [])
+ except:
+ d = []
- # And wipe out the node list if we have been
- # stopped....
- self._set_node_list([])
+ # If the data is empty, _set_cache will leave the
+ # remaining field set to 0 for the attribute so we
+ # will try to get it again on our next retry.
+ # Otherwise, remaining will be reset to the full
+ # interval for this attribute.
+ self._set_cache(attr, d)
- if self.maintain_cache is not None and \
- self.maintain_cache.isAlive():
+ log.debug("WallabyOperations: refresh processing time %s" \
+ % (time.time() - start_processing))
+
+ # Find out how long we should sleep for.
+ # Based on min remaining times for all items
+ # If minimum is 0 because we have items waiting
+ # for a retry, we fall back on retry_secs as a minimum.
+ sleep_time = self._find_min_remaining(min=retry_secs)
+
+ self._condition.acquire()
+ if not self._stop:
+ # Could be signaled, so track the actual sleep time
+ log.debug("WallabyOperations: cache thread sleeping for"\
+ " %s seconds" % sleep_time)
+ bed_time = time.time()
+ self._condition.wait(sleep_time)
+ slept = time.time() - bed_time
+ log.debug("WallabyOperations: cache thread slept for"\
+ " %s seconds" % slept)
+
+ # When we wake up from sleep here, we already
+ # have the lock so we might as well check refresh
+ # and adjust the "remaining" values
+ for attr, val in self._cache.iteritems():
+ if val.refresh: # Force an update
+ val.remaining = 0
+ val.refresh = False
+ elif val.remaining is not None:
+ val.remaining -= slept
+ self._condition.release()
+
+ # Clear cache if we have been stopped....
+ for attr in self._cache:
+ self._set_cache(attr, [])
+ #end maintain_cache
+
+ if self._maintain_cache is not None and \
+ self._maintain_cache.isAlive():
# No, you can't start another one.
return False
self._stop = False
- self.maintain_cache = CallThread(maintain_cache, None, self)
- self.maintain_cache.daemon = True
- self.maintain_cache.start()
+ self._maintain_cache = CallThread(maintain_cache, None, self)
+ self._maintain_cache.daemon = True
+ self._maintain_cache.start()
log.debug("WallabyOperations: start cache maintenance thread")
return True
- def get_node_list(self):
+ def stop(self, wait=False, timeout=None):
'''
- Return the latest list of wallaby.Node objects retrieved by
- the caching thread.
+ Stop the caching thread.
+
+ Wake the caching thread if asleep and cause it to exit.
+ The thread may be restarted again with a call to start()
+ once it has successfully exited. On exit, the thread will
+ null out cached data.
+
+ wait -- if True the call will block until the thread exits or
+ "timeout" seconds has passed if "timeout" is not None.
+
+ timeout -- how long to wait for the thread to exit if "wait" is True.
+ A value of None means wait forever.
+
+ Note, for the moment start() and stop() are not thread safe. They
+ should only be called from a single thread.
'''
- # Who really knows for sure how the GIL and byte code
- # works in Python? Play it safe, bump the reference
- # count on self.nodes inside the lock and return.
- self.lock.acquire()
- n = self.nodes
- self.lock.release()
- return n
+ if self.maintain_cache is not None:
+ self._condition.acquire()
+ self._stop = True
+ self._condition.notify()
+ self._condition.release()
+ if wait:
+ self._maintain_cache.join(timeout)
+ log.debug("WallabyOperations: stopped cache maintenance thread")
+ def refresh(self, items=()):
+ '''
+ Wake the caching thread if asleep and cause it to iterate.
+
+ items -- what data to refresh. If "items" is an empty
+ tuple, refresh all data otherwise refresh only the data specified.
+ Attributes of WBTypes define valid values for elements of "items"
+ '''
+ self._condition.acquire()
+ if len(items) == 0:
+ do_notify = True
+ for attr, val in self._cache.iteritems():
+ val.refresh = True
+ else:
+ do_notify = False
+ for attr in items:
+ if attr == WBTypes.TAGS:
+ # Tags are really just groups with a filter
+ # in get_data()
+ attr = WBTypes.GROUPS
+ if attr in self._cache:
+ do_notify = True
+ self._cache[attr].refresh = True
+
+ if do_notify:
+ self._condition.notify()
+ self._condition.release()
+
+ def get_data(self, which):
+ '''
+ Return the most recently cached value for the specified data.
+
+ which -- specifies the data item. Attributes of WBTypes
+ define valid values for "which"
+ '''
+ d = []
+ if which == WBTypes.TAGS:
+ # Tags are special for the time being.
+ # Tags are defined as non-internal groups with no parameters.
+ # Get groups and filter.
+ target = WBTypes.GROUPS
+ else:
+ target = which
+
+ if target in self._cache:
+ self._lock.acquire()
+ d = self._cache[target].data
+ self._lock.release()
+
+ if which == WBTypes.TAGS:
+ groups = d
+ d = []
+ for g in groups:
+ if g.getConfig() == {} and not g.name.startswith("+++"):
+ d.append(g)
+ return d
+
+ def set_interval(self, which, refresh):
+ '''
+ Set an individual refresh interval for a data item.
+
+ The interval set here will override the initial default
+ interval for the item created in WallabyOperations' constructor.
+
+ which -- specifies the data item. Attributes of WBTypes
+ define valid values for "which"
+
+ refresh -- the interval in seconds. If None, the specified item
+ will be updated iff refresh() is called.
+ '''
+ if which == WBTypes.TAGS:
+ which = WBTypes.GROUPS
+
+ if which in self._cache:
+ self._lock.acquire()
+ self._cache[which].interval = refresh
+ self._lock.release()
+
# Super secret private implementation stuff. Don't look!
- def _set_node_list(self, nodes):
- self.lock.acquire()
- self.nodes = nodes
- self.lock.release()
- log.debug("WallabyOperations: Node list updated (%s nodes)" % len(nodes))
+ def _find_min_remaining(self, min):
+ # None indicates forever, the biggest value
+ # Note though that None < int is True in Python!
+ t = None
+ for attr, val in self._cache.iteritems():
+ if t == None or \
+ (val.remaining is not None and val.remaining < t):
+ t = val.remaining
+ # Put a floor on this
+ if t is not None and t < min:
+ t = min
+ return t
+
+ def _set_cache(self, attr, data):
+ self._lock.acquire()
+ self._cache[attr].data = data
+ self._cache[attr].reset_remaining(len(data) == 0)
+ self._lock.release()
+ log.debug("WallabyOperations: %s list updated (%s items)" % (attr, len(data)))
+
def _get_store(self):
# Ideally there should only be a single Store object from a single agent.
# We constrain the search a bit, hopefully more efficient
@@ -185,52 +349,46 @@
except:
pass
return store
+
+ class CacheData(object):
+ def __init__(self, interval):
+ # This attribure is only referenced from the cache thread
+ # so it does not need to be protected
+ self.remaining = 0
+
+ # Use of these these items need to be protected by the lock
+ self.refresh = False
+ self.interval = interval
+ self.data = []
+
+ def reset_remaining(self, retry):
+ # If retry is True, we want to try to get the data
+ # at the next opportunity, otherwise we will wait.
+ if retry:
+ self.remaining = 0
+ else:
+ self.remaining = self.interval
+
else:
class WallabyOperations(object):
'''
Dummy object when wallaby imports fail
'''
- def __init__(self, broker_uri, refresh_interval=60):
- self.broker_uri = broker_uri
- self.refresh_interval = refresh_interval
+ def __init__(self, *args, **kwargs):
+ pass
- def stop(self, wait=False, timeout=None):
- '''
- Wake the caching thread if asleep and cause it to exit.
- If wait is True, the call will return after the thread
- has exited or timeout seconds has passed if timeout is
- not None.
- '''
+ def start(self, *args, **kwargs):
+ log.debug("WallabyOperations: uing dummy implementation, imports failed")
+ return False
+
+ def stop(self, *args, **kwargs):
pass
- def refresh(self):
- '''
- Wake the caching thread if asleep and cause it to iterate.
- '''
+ def refresh(self, *args, **kwargs):
pass
- def start(self):
- '''
- Start the caching thread.
-
- This thread will attempt to connect to the broker and retrieve
- a wallaby Store object. If successful, it will periodically
- retrieve and cache necessary wallaby objects.
-
- Only one caching thread may run at a time. The thread may
- be restarted if it has previously been stopped.
- '''
- log.debug("WallabyOperations: using dummy implementation, imports failed")
- return False
-
- def get_node_list(self):
- '''
- Return the latest list of wallaby.Node objects retrieved by
- the caching thread.
- '''
+ def get_data(self, *args, **kwargs):
return []
-
-
-
-
+ def set_interval(self, *args, **kwargs):
+ pass
12 years, 9 months
r4895 - in trunk: cumin/bin sage/python/sage/wallaby
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-28 12:48:36 +0000 (Thu, 28 Jul 2011)
New Revision: 4895
Modified:
trunk/cumin/bin/cumin-web
trunk/sage/python/sage/wallaby/wallabyoperations.py
Log:
Make refresh_interval None by default in WallabyOperations. Cache thread will block indefinitely after successful calls to Wallaby unless refresh() is called.
Modified: trunk/cumin/bin/cumin-web
===================================================================
--- trunk/cumin/bin/cumin-web 2011-07-27 20:41:25 UTC (rev 4894)
+++ trunk/cumin/bin/cumin-web 2011-07-28 12:48:36 UTC (rev 4895)
@@ -28,8 +28,13 @@
cumin.wallaby_broker = brokers[0]
else:
cumin.wallaby_broker = values.wallaby_broker
+ # Let 0 indicate "no timeout", since the timeout
+ # value is an int in the config and None can't be
+ # specified
cumin.wallaby_refresh = values.wallaby_refresh
-
+ if cumin.wallaby_refresh == 0:
+ cumin.wallaby_refresh = None
+
def main():
# Do our own simple option check so we can redirect IO early
# without worrying about other options or the behavior of optParse
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-27 20:41:25 UTC (rev 4894)
+++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-28 12:48:36 UTC (rev 4895)
@@ -16,7 +16,7 @@
if imports_ok:
class WallabyOperations(object):
- def __init__(self, broker_uri, refresh_interval=60):
+ def __init__(self, broker_uri, refresh_interval=None):
self.broker_uri = broker_uri
# A wallaby Store object
12 years, 9 months
r4894 - in trunk: cumin/bin cumin/python/cumin sage/python/sage sage/python/sage/wallaby
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-27 20:41:25 +0000 (Wed, 27 Jul 2011)
New Revision: 4894
Modified:
trunk/cumin/bin/cumin-web
trunk/cumin/python/cumin/config.py
trunk/cumin/python/cumin/main.py
trunk/sage/python/sage/util.py
trunk/sage/python/sage/wallaby/wallabyoperations.py
Log:
More wallaby interface stuff.
Modified: trunk/cumin/bin/cumin-web
===================================================================
--- trunk/cumin/bin/cumin-web 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/bin/cumin-web 2011-07-27 20:41:25 UTC (rev 4894)
@@ -22,6 +22,14 @@
cumin.aviary_job_port = values.aviary_job_port
cumin.aviary_query_port = values.aviary_query_port
+
+def set_wallaby_configs(cumin, values, brokers):
+ if values.wallaby_broker == "":
+ cumin.wallaby_broker = brokers[0]
+ else:
+ cumin.wallaby_broker = values.wallaby_broker
+ cumin.wallaby_refresh = values.wallaby_refresh
+
def main():
# Do our own simple option check so we can redirect IO early
# without worrying about other options or the behavior of optParse
@@ -89,7 +97,6 @@
values.log_max_archives)
broker_uris = [x.strip() for x in opts.brokers.split(",")]
-
cumin = Cumin(config.get_home(), broker_uris, opts.database,
opts.host, opts.port, values.persona)
@@ -97,6 +104,8 @@
cumin.sasl_mech_list = values.sasl_mech_list.upper()
set_aviary_configs(cumin, values)
+ set_wallaby_configs(cumin, values, broker_uris)
+
cumin.debug = opts.debug
cumin.user = values.user
cumin.update_interval = values.update_interval
@@ -104,7 +113,6 @@
cumin.fast_view_attributes = [x.strip() for x in values.fast_view_attributes.split(',')]
-
# set default values for form inputs
cumin.set_form_defaults(values.request_memory,
values.request_memory_vm,
Modified: trunk/cumin/python/cumin/config.py
===================================================================
--- trunk/cumin/python/cumin/config.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/python/cumin/config.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -149,6 +149,12 @@
# previous behavior
param = ConfigParameter(self, "sasl-mech-list", str)
+ param = ConfigParameter(self, "wallaby-broker", str)
+ param.default = ""
+
+ param = ConfigParameter(self, "wallaby-refresh", int)
+ param.default = 60
+
param = ConfigParameter(self, "use-aviary", bool)
param.default = True
Modified: trunk/cumin/python/cumin/main.py
===================================================================
--- trunk/cumin/python/cumin/main.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/python/cumin/main.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -91,6 +91,10 @@
self.aviary_job_port = None
self.aviary_query_port = None
+ self.wallaby = None
+ self.wallaby_broker = None
+ self.wallaby_refresh = 60
+
def server_alive(self):
return self.server.server_alive()
@@ -183,8 +187,13 @@
self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
- self.wallaby = WallabyOperations("wallaby", self.session)
-
+ # The Wallaby client API needs a broker connection. The broker host
+ # for Wallaby is not necessarily the same as the broker used by
+ # cumin-web for grid data, and flags set on the Session for cumin
+ # may not be appropriate for the Wallaby API. So, it gets its own.
+ self.wallaby = WallabyOperations(self.wallaby_broker,
+ self.wallaby_refresh)
+
self.model.init()
self.session.init()
self.database.init()
@@ -227,6 +236,7 @@
self.session.start()
self.server.start()
+ self.wallaby.start()
def stop(self):
log.info("Stopping %s", self)
Modified: trunk/sage/python/sage/util.py
===================================================================
--- trunk/sage/python/sage/util.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/sage/python/sage/util.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -108,7 +108,7 @@
'''
Simple thread that executes result = routine(args).
This thread is useful for calling synchronous routines asynchronously.
- Result is returned by calling callback(result).
+ Result is returned by calling callback(result) if callback is not None.
If routine raises an exception, callback(exception) is called.
'''
def __init__(self, routine, callback, *args, **kwargs):
@@ -124,7 +124,8 @@
result = self.routine(*self.args, **self.kwargs)
except Exception, e:
result = e
- self.callback(result)
+ if self.callback is not None:
+ self.callback(result)
class ObjectPool(object):
'''
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -1,9 +1,236 @@
import logging
+import time
+from qmf.console import Session
+from sage.util import CallThread
+from threading import Lock, Condition
+
+imports_ok = True
+try:
+ import wallaby
+ from wallaby_collections import StorePatches
+except:
+ imports_ok = False
+
log = logging.getLogger("sage.wallaby")
-class WallabyOperations(object):
- def __init__(self, name, session):
+if imports_ok:
+ class WallabyOperations(object):
+ def __init__(self, broker_uri, refresh_interval=60):
+ self.broker_uri = broker_uri
- self.name = name
- self.session = session
+ # A wallaby Store object
+ self._store = None
+
+ # A QMF broker
+ self._broker = None
+
+ # The cache maintenance thread
+ self.maintain_cache = None
+
+ # Stop the maintenance thread
+ self._stop = False
+
+ # Length of time the maintenance thread will sleep
+ # (in seconds) before it next attempts to refresh cached
+ # objects after a successful attempt. Failed attempts
+ # result in a 5 second retry time.
+ self.refresh_interval = refresh_interval
+
+ # Cached node list
+ self.nodes = []
+
+ # Protect cached info, give the cache maintenance
+ # thread a condition to sleep on that it may be woken
+ # up early.
+ self.lock = Lock()
+ self.condition = Condition(self.lock)
+
+ def stop(self, wait=False, timeout=None):
+ '''
+ Wake the caching thread if asleep and cause it to exit.
+ If wait is True, the call will return after the thread
+ has exited or timeout seconds has passed if timeout is
+ not None.
+ '''
+ self._stop = True
+ self.refresh()
+ if wait:
+ self.maintain_cache.join(timeout)
+ log.debug("WallabyOperations: stopped cache maintenance thread")
+
+ def refresh(self):
+ '''
+ Wake the caching thread if asleep and cause it to iterate.
+ '''
+ self.condition.acquire()
+ self.condition.notify()
+ self.condition.release()
+
+ def start(self):
+ '''
+ Start the caching thread.
+
+ This thread will attempt to connect to the broker and retrieve
+ a wallaby Store object. If successful, it will periodically
+ retrieve and cache necessary wallaby objects.
+
+ Only one caching thread may run at a time. The thread may
+ be restarted if it has previously been stopped.
+ '''
+ # The connection to the broker can actually take a long
+ # time to complete. We don't want to hang a calling function,
+ # so we handle the connection and retrieval of the
+ # initial Store object from Wallaby in a thread.
+ # (There may need to be more work here if the broker or wallaby
+ # going away and coming back causes a problem, but with
+ # manageConnections=True and well-known agent/object ids for
+ # Wallaby it appears to recover on its own...)
+
+ # Similarly, getting node lists etc may take a long time
+ # especially over a slow network. So we use the same thread
+ # to retrieve things like node lists at defined intervals.
+
+ # 'self' here is really a term of art since this is a local
+ # function, but it refers to the WallabyOperations object
+ # so the code reads nicely
+ def maintain_cache(self):
+ # Get initinal connection and Store obect
+ self.session = Session(manageConnections=True)
+ self.broker = self.session.addBroker(self.broker_uri)
+ while not self._stop:
+ self._store = self._get_store()
+ if self._store is not None:
+ log.debug("WallabyOperations: found wallaby store object")
+ break
+
+ # Allow us to be woken up early...
+ self.condition.acquire()
+ self.condition.wait(5)
+ self.condition.release()
+
+ # Okay, we have a Store object so now we can
+ # get node lists and such
+ while not self._stop:
+ # Uses API extensions to __getattr__ on the Store to
+ # retrieve a list of nodes from the Broker and wrap them
+ # in Wallaby proxy objects. This might take a while.
+ try:
+ nodes = self._store.nodes
+ except:
+ nodes = []
+ self._set_node_list(nodes)
+ if nodes in ([], None):
+ # Hmm, try again shortly
+ sleep = 5
+ else:
+ # We found some nodes so cache them and
+ # sleep for a normal interval
+ sleep = self.refresh_interval
+
+ self.condition.acquire()
+ self.condition.wait(sleep)
+ self.condition.release()
+
+ # And wipe out the node list if we have been
+ # stopped....
+ self._set_node_list([])
+
+ if self.maintain_cache is not None and \
+ self.maintain_cache.isAlive():
+ # No, you can't start another one.
+ return False
+
+ self._stop = False
+ self.maintain_cache = CallThread(maintain_cache, None, self)
+ self.maintain_cache.daemon = True
+ self.maintain_cache.start()
+ log.debug("WallabyOperations: start cache maintenance thread")
+ return True
+
+ def get_node_list(self):
+ '''
+ Return the latest list of wallaby.Node objects retrieved by
+ the caching thread.
+ '''
+ # Who really knows for sure how the GIL and byte code
+ # works in Python? Play it safe, bump the reference
+ # count on self.nodes inside the lock and return.
+ self.lock.acquire()
+ n = self.nodes
+ self.lock.release()
+ return n
+
+ # Super secret private implementation stuff. Don't look!
+
+ def _set_node_list(self, nodes):
+ self.lock.acquire()
+ self.nodes = nodes
+ self.lock.release()
+ log.debug("WallabyOperations: Node list updated (%s nodes)" % len(nodes))
+
+ def _get_store(self):
+ # Ideally there should only be a single Store object from a single agent.
+ # We constrain the search a bit, hopefully more efficient
+ store = None
+ try:
+ agents = [agent for agent in self.session.getAgents() \
+ if "com.redhat.grid.config:Store" in agent.label]
+ for agent in agents:
+ s = self.session.getObjects(_agent = agent, _class = "Store")
+ if len(s) > 0:
+ # And finally wrap the QMF object in a wallaby.Store wrapper
+ store = wallaby.Store(s[0], self.session)
+ break
+ except:
+ pass
+ return store
+else:
+ class WallabyOperations(object):
+ '''
+ Dummy object when wallaby imports fail
+ '''
+ def __init__(self, broker_uri, refresh_interval=60):
+ self.broker_uri = broker_uri
+ self.refresh_interval = refresh_interval
+
+ def stop(self, wait=False, timeout=None):
+ '''
+ Wake the caching thread if asleep and cause it to exit.
+ If wait is True, the call will return after the thread
+ has exited or timeout seconds has passed if timeout is
+ not None.
+ '''
+ pass
+
+ def refresh(self):
+ '''
+ Wake the caching thread if asleep and cause it to iterate.
+ '''
+ pass
+
+ def start(self):
+ '''
+ Start the caching thread.
+
+ This thread will attempt to connect to the broker and retrieve
+ a wallaby Store object. If successful, it will periodically
+ retrieve and cache necessary wallaby objects.
+
+ Only one caching thread may run at a time. The thread may
+ be restarted if it has previously been stopped.
+ '''
+ log.debug("WallabyOperations: using dummy implementation, imports failed")
+ return False
+
+ def get_node_list(self):
+ '''
+ Return the latest list of wallaby.Node objects retrieved by
+ the caching thread.
+ '''
+ return []
+
+
+
+
+
12 years, 9 months
r4893 - branches/clarity/cumin/python/cumin/usergrid
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-27 13:27:17 +0000 (Wed, 27 Jul 2011)
New Revision: 4893
Modified:
branches/clarity/cumin/python/cumin/usergrid/widgets.py
Log:
Merge revision 4891 from trunk.
BZ699732
Modified: branches/clarity/cumin/python/cumin/usergrid/widgets.py
===================================================================
--- branches/clarity/cumin/python/cumin/usergrid/widgets.py 2011-07-26 12:48:11 UTC (rev 4892)
+++ branches/clarity/cumin/python/cumin/usergrid/widgets.py 2011-07-27 13:27:17 UTC (rev 4893)
@@ -112,7 +112,7 @@
self.add_filter(self.user, self.cls.Owner, cls.name)
frame = "main.submissions.submission"
- col = ObjectLinkColumn(app, "name", self.cls.Name, self.cls._id, frame)
+ col = self.UserSubmissionObjectLinkColumn(app, "name", self.cls.Name, self.cls._id, frame)
self.insert_column(0, col)
self.add_search_filter(col)
@@ -126,6 +126,13 @@
self.links.add_child(link)
self.enable_csv_export(user)
+
+ class UserSubmissionObjectLinkColumn(ObjectLinkColumn):
+ def render_cell_content(self, session, record):
+ retval = len(record) > 0 and record[self.field.index] or ""
+ if(len(record[self.field.index]) > 50):
+ retval = record[self.field.index][:50] + "..." #indicate that we truncated the name
+ return retval
class UserJobStatSet(NewStatSet):
def __init__(self, app, name, user):
12 years, 9 months
r4892 - trunk/cumin/python/cumin
by croberts@fedoraproject.org
Author: croberts
Date: 2011-07-26 12:48:11 +0000 (Tue, 26 Jul 2011)
New Revision: 4892
Modified:
trunk/cumin/python/cumin/charts.py
trunk/cumin/python/cumin/stat.py
Log:
Changing the "Mega" m to M per BZ 725473.
Modified: trunk/cumin/python/cumin/charts.py
===================================================================
--- trunk/cumin/python/cumin/charts.py 2011-07-25 15:44:13 UTC (rev 4891)
+++ trunk/cumin/python/cumin/charts.py 2011-07-26 12:48:11 UTC (rev 4892)
@@ -366,7 +366,7 @@
value = int(round(value + self.y_min))
if value >= 1000000:
- svalue = "%.2fm" % (round(value / 1000000.0, 2))
+ svalue = "%.2fM" % (round(value / 1000000.0, 2))
elif value >= 10000:
svalue = "%ik" % int(round(value / 1000.0, -1))
else:
Modified: trunk/cumin/python/cumin/stat.py
===================================================================
--- trunk/cumin/python/cumin/stat.py 2011-07-25 15:44:13 UTC (rev 4891)
+++ trunk/cumin/python/cumin/stat.py 2011-07-26 12:48:11 UTC (rev 4892)
@@ -942,7 +942,7 @@
value = int(round(i * y_step, 0))
if value >= 1000000:
- svalue = "%.2fm" % (round(value / 1000000.0, 2))
+ svalue = "%.2fM" % (round(value / 1000000.0, 2))
elif value >= 10000:
svalue = "%ik" % int(round(value / 1000.0, -1))
else:
12 years, 9 months
r4891 - trunk/cumin/python/cumin/usergrid
by croberts@fedoraproject.org
Author: croberts
Date: 2011-07-25 15:44:13 +0000 (Mon, 25 Jul 2011)
New Revision: 4891
Modified:
trunk/cumin/python/cumin/usergrid/widgets.py
Log:
Truncating the submission name for the GridUser view of submissions per BZ 699732.
Modified: trunk/cumin/python/cumin/usergrid/widgets.py
===================================================================
--- trunk/cumin/python/cumin/usergrid/widgets.py 2011-07-22 20:40:31 UTC (rev 4890)
+++ trunk/cumin/python/cumin/usergrid/widgets.py 2011-07-25 15:44:13 UTC (rev 4891)
@@ -112,7 +112,7 @@
self.add_filter(self.user, self.cls.Owner, cls.name)
frame = "main.submissions.submission"
- col = ObjectLinkColumn(app, "name", self.cls.Name, self.cls._id, frame)
+ col = self.UserSubmissionObjectLinkColumn(app, "name", self.cls.Name, self.cls._id, frame)
self.insert_column(0, col)
self.add_search_filter(col)
@@ -126,6 +126,13 @@
self.links.add_child(link)
self.enable_csv_export(user)
+
+ class UserSubmissionObjectLinkColumn(ObjectLinkColumn):
+ def render_cell_content(self, session, record):
+ retval = len(record) > 0 and record[self.field.index] or ""
+ if(len(record[self.field.index]) > 50):
+ retval = record[self.field.index][:50] + "..." #indicate that we truncated the name
+ return retval
class UserJobStatSet(NewStatSet):
def __init__(self, app, name, user):
12 years, 9 months
r4890 - trunk/sage/python/sage/wallaby
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-22 20:40:31 +0000 (Fri, 22 Jul 2011)
New Revision: 4890
Added:
trunk/sage/python/sage/wallaby/wallabyoperations.py
Log:
Forgot operations.
Added: trunk/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- trunk/sage/python/sage/wallaby/wallabyoperations.py (rev 0)
+++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-22 20:40:31 UTC (rev 4890)
@@ -0,0 +1,9 @@
+import logging
+
+log = logging.getLogger("sage.wallaby")
+
+class WallabyOperations(object):
+ def __init__(self, name, session):
+
+ self.name = name
+ self.session = session
12 years, 9 months
r4889 - in trunk: cumin/python/cumin sage sage/python/sage sage/python/sage/aviary sage/python/sage/wallaby
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-22 20:39:38 +0000 (Fri, 22 Jul 2011)
New Revision: 4889
Added:
trunk/sage/python/sage/wallaby/
trunk/sage/python/sage/wallaby/__init__.py
Modified:
trunk/cumin/python/cumin/main.py
trunk/sage/Makefile
trunk/sage/python/sage/aviary/aviaryoperations.py
Log:
Initial wallaby stub for cumin. No functionality yet.
Modified: trunk/cumin/python/cumin/main.py
===================================================================
--- trunk/cumin/python/cumin/main.py 2011-07-22 20:11:15 UTC (rev 4888)
+++ trunk/cumin/python/cumin/main.py 2011-07-22 20:39:38 UTC (rev 4889)
@@ -23,6 +23,7 @@
from widgets import *
from sage.catalog import Catalog
from sage.qmf.qmfoperations import QmfOperations
+from sage.wallaby.wallabyoperations import WallabyOperations
from wooly import Session
from cumin.stat import PieChartPage
from cumin.widgets import AboutPage
@@ -157,7 +158,11 @@
log.info("Initializing %s", self)
# Create RPC interfaces for QMF and aviary.
- # Aviary takes precedence if added
+ # These service have overlapping functionality,
+ # so they are wrapped in a sage.Catalog object
+ # which allows both to supply operations. First
+ # service in the list takes precedence for any
+ # given op...
self.remote = Catalog()
ops = [QmfOperations("qmf", self.session)]
if self.use_aviary:
@@ -177,6 +182,9 @@
self.aviary_query_port))
self.remote.add_mechanisms(ops)
+ # Create RPC interface for Wallaby
+ self.wallaby = WallabyOperations("wallaby", self.session)
+
self.model.init()
self.session.init()
self.database.init()
Modified: trunk/sage/Makefile
===================================================================
--- trunk/sage/Makefile 2011-07-22 20:11:15 UTC (rev 4888)
+++ trunk/sage/Makefile 2011-07-22 20:39:38 UTC (rev 4889)
@@ -17,8 +17,10 @@
install python/sage/aviary/*.py python/sage/aviary/*.pyc ${lib}/aviary
install -d ${lib}/qmf
install python/sage/qmf/*.py python/sage/qmf/*.pyc ${lib}/qmf
+ install -d ${lib}/wallaby
+ install python/sage/wallaby/*.py python/sage/wallaby/*.pyc ${lib}/wallaby
install -d ${share}/rpc-defs/aviary
- install rpc-defs/aviary/*.xsd rpc-defs/aviary/*.wsdl rpc-defs/aviary/*.xml rpc-defs/aviary/*.xml.* ${share}/rpc-defs/aviary
+ install rpc-defs/aviary/* ${share}/rpc-defs/aviary
install -d ${doc}
install LICENSE COPYING ${doc}
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-07-22 20:11:15 UTC (rev 4888)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-07-22 20:39:38 UTC (rev 4889)
@@ -44,11 +44,15 @@
for job control functions. This would be extra stuff for QMF, but helpful with
Aviary.
-Add a summary comment at the top like QMF opreations
+- Add a summary comment at the top like QMF opreations
-can we use default/timeout with suds?
+- can we use default/timeout with suds?
yes, there is at least a timeout that can be set once,
it might be possible to change the timeout per call.
+
+- Add some nice error message that indicates the connection was
+refused to the aviary service. This probably means that aviary
+is not running on that port -- more helpful than "connection refulsed"
'''
class AviaryOperations(object):
Added: trunk/sage/python/sage/wallaby/__init__.py
===================================================================
12 years, 9 months
r4888 - trunk/wooly/python/wooly
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-22 20:11:15 +0000 (Fri, 22 Jul 2011)
New Revision: 4888
Modified:
trunk/wooly/python/wooly/server.py
Log:
Occasional errors when session.visited not set...
Modified: trunk/wooly/python/wooly/server.py
===================================================================
--- trunk/wooly/python/wooly/server.py 2011-07-22 19:26:57 UTC (rev 4887)
+++ trunk/wooly/python/wooly/server.py 2011-07-22 20:11:15 UTC (rev 4888)
@@ -252,7 +252,7 @@
count = 0
for session in self.server.client_sessions_by_id.values():
- if session.visited < when:
+ if session.visited is not None and session.visited < when:
del self.server.client_sessions_by_id[session.id]
count += 1
12 years, 9 months
r4887 - branches/clarity/cumin/python/cumin
by tmckay@fedoraproject.org
Author: tmckay
Date: 2011-07-22 19:26:57 +0000 (Fri, 22 Jul 2011)
New Revision: 4887
Modified:
branches/clarity/cumin/python/cumin/stat.py
Log:
Merge fixes from trunk
BZ709477
BZ724946
(svn merge -r 4881:4883 svn+ssh://svn.fedorahosted.org/svn/cumin/trunk .)
Modified: branches/clarity/cumin/python/cumin/stat.py
===================================================================
--- branches/clarity/cumin/python/cumin/stat.py 2011-07-22 19:10:33 UTC (rev 4886)
+++ branches/clarity/cumin/python/cumin/stat.py 2011-07-22 19:26:57 UTC (rev 4887)
@@ -1183,7 +1183,7 @@
self.alpha = 1
- def get_max_min(self, session, stats, samples):
+ def get_max_min(self, session, stats, samples, time_span, end_seconds_ago):
max_value = 0
min_value = 0
points = dict()
@@ -1321,8 +1321,8 @@
return chart_obj
class PercentAreaChart(AreaChart):
- def get_max_min(self, session, stats, samples):
- max_val, min_val = super(PercentAreaChart, self).get_max_min(session, stats, samples)
+ def get_max_min(self, session, stats, samples, time_span, end_seconds_ago):
+ max_val, min_val = super(PercentAreaChart, self).get_max_min(session, stats, samples, time_span, end_seconds_ago)
percent = self.page.percent_property.get(session)
total = self.page.get_object_property(session, percent)
12 years, 9 months