Author: tmckay
Date: 2011-07-27 20:41:25 +0000 (Wed, 27 Jul 2011)
New Revision: 4894
Modified:
trunk/cumin/bin/cumin-web
trunk/cumin/python/cumin/config.py
trunk/cumin/python/cumin/main.py
trunk/sage/python/sage/util.py
trunk/sage/python/sage/wallaby/wallabyoperations.py
Log:
More wallaby interface stuff.
Modified: trunk/cumin/bin/cumin-web
===================================================================
--- trunk/cumin/bin/cumin-web 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/bin/cumin-web 2011-07-27 20:41:25 UTC (rev 4894)
@@ -22,6 +22,14 @@
cumin.aviary_job_port = values.aviary_job_port
cumin.aviary_query_port = values.aviary_query_port
+
+def set_wallaby_configs(cumin, values, brokers):
+ if values.wallaby_broker == "":
+ cumin.wallaby_broker = brokers[0]
+ else:
+ cumin.wallaby_broker = values.wallaby_broker
+ cumin.wallaby_refresh = values.wallaby_refresh
+
def main():
# Do our own simple option check so we can redirect IO early
# without worrying about other options or the behavior of optParse
@@ -89,7 +97,6 @@
values.log_max_archives)
broker_uris = [x.strip() for x in opts.brokers.split(",")]
-
cumin = Cumin(config.get_home(), broker_uris, opts.database,
opts.host, opts.port, values.persona)
@@ -97,6 +104,8 @@
cumin.sasl_mech_list = values.sasl_mech_list.upper()
set_aviary_configs(cumin, values)
+ set_wallaby_configs(cumin, values, broker_uris)
+
cumin.debug = opts.debug
cumin.user = values.user
cumin.update_interval = values.update_interval
@@ -104,7 +113,6 @@
cumin.fast_view_attributes = [x.strip() for x in
values.fast_view_attributes.split(',')]
-
# set default values for form inputs
cumin.set_form_defaults(values.request_memory,
values.request_memory_vm,
Modified: trunk/cumin/python/cumin/config.py
===================================================================
--- trunk/cumin/python/cumin/config.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/python/cumin/config.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -149,6 +149,12 @@
# previous behavior
param = ConfigParameter(self, "sasl-mech-list", str)
+ param = ConfigParameter(self, "wallaby-broker", str)
+ param.default = ""
+
+ param = ConfigParameter(self, "wallaby-refresh", int)
+ param.default = 60
+
param = ConfigParameter(self, "use-aviary", bool)
param.default = True
Modified: trunk/cumin/python/cumin/main.py
===================================================================
--- trunk/cumin/python/cumin/main.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/cumin/python/cumin/main.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -91,6 +91,10 @@
self.aviary_job_port = None
self.aviary_query_port = None
+ self.wallaby = None
+ self.wallaby_broker = None
+ self.wallaby_refresh = 60
+
def server_alive(self):
return self.server.server_alive()
@@ -183,8 +187,13 @@
self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
- self.wallaby = WallabyOperations("wallaby", self.session)
-
+ # The Wallaby client API needs a broker connection. The broker host
+ # for Wallaby is not necessarily the same as the broker used by
+ # cumin-web for grid data, and flags set on the Session for cumin
+ # may not be appropriate for the Wallaby API. So, it gets its own.
+ self.wallaby = WallabyOperations(self.wallaby_broker,
+ self.wallaby_refresh)
+
self.model.init()
self.session.init()
self.database.init()
@@ -227,6 +236,7 @@
self.session.start()
self.server.start()
+ self.wallaby.start()
def stop(self):
log.info("Stopping %s", self)
Modified: trunk/sage/python/sage/util.py
===================================================================
--- trunk/sage/python/sage/util.py 2011-07-27 13:27:17 UTC (rev 4893)
+++ trunk/sage/python/sage/util.py 2011-07-27 20:41:25 UTC (rev 4894)
@@ -108,7 +108,7 @@
'''
Simple thread that executes result = routine(args).
This thread is useful for calling synchronous routines asynchronously.
- Result is returned by calling callback(result).
+ Result is returned by calling callback(result) if callback is not None.
If routine raises an exception, callback(exception) is called.
'''
def __init__(self, routine, callback, *args, **kwargs):
@@ -124,7 +124,8 @@
result = self.routine(*self.args, **self.kwargs)
except Exception, e:
result = e
- self.callback(result)
+ if self.callback is not None:
+ self.callback(result)
class ObjectPool(object):
'''
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py
===================================================================
--- trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-27 13:27:17 UTC (rev
4893)
+++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-07-27 20:41:25 UTC (rev
4894)
@@ -1,9 +1,236 @@
import logging
+import time
+from qmf.console import Session
+from sage.util import CallThread
+from threading import Lock, Condition
+
+imports_ok = True
+try:
+ import wallaby
+ from wallaby_collections import StorePatches
+except:
+ imports_ok = False
+
log = logging.getLogger("sage.wallaby")
-class WallabyOperations(object):
- def __init__(self, name, session):
+if imports_ok:
+ class WallabyOperations(object):
+ def __init__(self, broker_uri, refresh_interval=60):
+ self.broker_uri = broker_uri
- self.name = name
- self.session = session
+ # A wallaby Store object
+ self._store = None
+
+ # A QMF broker
+ self._broker = None
+
+ # The cache maintenance thread
+ self.maintain_cache = None
+
+ # Stop the maintenance thread
+ self._stop = False
+
+ # Length of time the maintenance thread will sleep
+ # (in seconds) before it next attempts to refresh cached
+ # objects after a successful attempt. Failed attempts
+ # result in a 5 second retry time.
+ self.refresh_interval = refresh_interval
+
+ # Cached node list
+ self.nodes = []
+
+ # Protect cached info, give the cache maintenance
+ # thread a condition to sleep on that it may be woken
+ # up early.
+ self.lock = Lock()
+ self.condition = Condition(self.lock)
+
+ def stop(self, wait=False, timeout=None):
+ '''
+ Wake the caching thread if asleep and cause it to exit.
+ If wait is True, the call will return after the thread
+ has exited or timeout seconds has passed if timeout is
+ not None.
+ '''
+ self._stop = True
+ self.refresh()
+ if wait:
+ self.maintain_cache.join(timeout)
+ log.debug("WallabyOperations: stopped cache maintenance thread")
+
+ def refresh(self):
+ '''
+ Wake the caching thread if asleep and cause it to iterate.
+ '''
+ self.condition.acquire()
+ self.condition.notify()
+ self.condition.release()
+
+ def start(self):
+ '''
+ Start the caching thread.
+
+ This thread will attempt to connect to the broker and retrieve
+ a wallaby Store object. If successful, it will periodically
+ retrieve and cache necessary wallaby objects.
+
+ Only one caching thread may run at a time. The thread may
+ be restarted if it has previously been stopped.
+ '''
+ # The connection to the broker can actually take a long
+ # time to complete. We don't want to hang a calling function,
+ # so we handle the connection and retrieval of the
+ # initial Store object from Wallaby in a thread.
+ # (There may need to be more work here if the broker or wallaby
+ # going away and coming back causes a problem, but with
+ # manageConnections=True and well-known agent/object ids for
+ # Wallaby it appears to recover on its own...)
+
+ # Similarly, getting node lists etc may take a long time
+ # especially over a slow network. So we use the same thread
+ # to retrieve things like node lists at defined intervals.
+
+ # 'self' here is really a term of art since this is a local
+ # function, but it refers to the WallabyOperations object
+ # so the code reads nicely
+ def maintain_cache(self):
+ # Get initinal connection and Store obect
+ self.session = Session(manageConnections=True)
+ self.broker = self.session.addBroker(self.broker_uri)
+ while not self._stop:
+ self._store = self._get_store()
+ if self._store is not None:
+ log.debug("WallabyOperations: found wallaby store
object")
+ break
+
+ # Allow us to be woken up early...
+ self.condition.acquire()
+ self.condition.wait(5)
+ self.condition.release()
+
+ # Okay, we have a Store object so now we can
+ # get node lists and such
+ while not self._stop:
+ # Uses API extensions to __getattr__ on the Store to
+ # retrieve a list of nodes from the Broker and wrap them
+ # in Wallaby proxy objects. This might take a while.
+ try:
+ nodes = self._store.nodes
+ except:
+ nodes = []
+ self._set_node_list(nodes)
+ if nodes in ([], None):
+ # Hmm, try again shortly
+ sleep = 5
+ else:
+ # We found some nodes so cache them and
+ # sleep for a normal interval
+ sleep = self.refresh_interval
+
+ self.condition.acquire()
+ self.condition.wait(sleep)
+ self.condition.release()
+
+ # And wipe out the node list if we have been
+ # stopped....
+ self._set_node_list([])
+
+ if self.maintain_cache is not None and \
+ self.maintain_cache.isAlive():
+ # No, you can't start another one.
+ return False
+
+ self._stop = False
+ self.maintain_cache = CallThread(maintain_cache, None, self)
+ self.maintain_cache.daemon = True
+ self.maintain_cache.start()
+ log.debug("WallabyOperations: start cache maintenance thread")
+ return True
+
+ def get_node_list(self):
+ '''
+ Return the latest list of wallaby.Node objects retrieved by
+ the caching thread.
+ '''
+ # Who really knows for sure how the GIL and byte code
+ # works in Python? Play it safe, bump the reference
+ # count on self.nodes inside the lock and return.
+ self.lock.acquire()
+ n = self.nodes
+ self.lock.release()
+ return n
+
+ # Super secret private implementation stuff. Don't look!
+
+ def _set_node_list(self, nodes):
+ self.lock.acquire()
+ self.nodes = nodes
+ self.lock.release()
+ log.debug("WallabyOperations: Node list updated (%s nodes)" %
len(nodes))
+
+ def _get_store(self):
+ # Ideally there should only be a single Store object from a single agent.
+ # We constrain the search a bit, hopefully more efficient
+ store = None
+ try:
+ agents = [agent for agent in self.session.getAgents() \
+ if "com.redhat.grid.config:Store" in agent.label]
+ for agent in agents:
+ s = self.session.getObjects(_agent = agent, _class =
"Store")
+ if len(s) > 0:
+ # And finally wrap the QMF object in a wallaby.Store wrapper
+ store = wallaby.Store(s[0], self.session)
+ break
+ except:
+ pass
+ return store
+else:
+ class WallabyOperations(object):
+ '''
+ Dummy object when wallaby imports fail
+ '''
+ def __init__(self, broker_uri, refresh_interval=60):
+ self.broker_uri = broker_uri
+ self.refresh_interval = refresh_interval
+
+ def stop(self, wait=False, timeout=None):
+ '''
+ Wake the caching thread if asleep and cause it to exit.
+ If wait is True, the call will return after the thread
+ has exited or timeout seconds has passed if timeout is
+ not None.
+ '''
+ pass
+
+ def refresh(self):
+ '''
+ Wake the caching thread if asleep and cause it to iterate.
+ '''
+ pass
+
+ def start(self):
+ '''
+ Start the caching thread.
+
+ This thread will attempt to connect to the broker and retrieve
+ a wallaby Store object. If successful, it will periodically
+ retrieve and cache necessary wallaby objects.
+
+ Only one caching thread may run at a time. The thread may
+ be restarted if it has previously been stopped.
+ '''
+ log.debug("WallabyOperations: using dummy implementation, imports
failed")
+ return False
+
+ def get_node_list(self):
+ '''
+ Return the latest list of wallaby.Node objects retrieved by
+ the caching thread.
+ '''
+ return []
+
+
+
+
+