Author: tmckay Date: 2011-08-02 15:23:08 +0000 (Tue, 02 Aug 2011) New Revision: 4900
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py Log: Additional routines added to the API, internal tweaks.
Modified: trunk/sage/python/sage/wallaby/wallabyoperations.py =================================================================== --- trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-08-01 20:13:05 UTC (rev 4899) +++ trunk/sage/python/sage/wallaby/wallabyoperations.py 2011-08-02 15:23:08 UTC (rev 4900) @@ -67,14 +67,15 @@ self._stop = False
# Cached data. Each of the keys in this dictionary is the name of - # an attribute on the Wallaby Store object. + # an attribute on the Wallaby Store object, with the exception of + # WBTypes.TAGS. The TAGS data is a subset of the GROUPS produced + # in this module. self._cache = {WBTypes.NODES: self.CacheData(refresh_interval), - WBTypes.GROUPS: self.CacheData(refresh_interval)} + WBTypes.GROUPS: self.CacheData(refresh_interval), + WBTypes.TAGS: self.CacheData(refresh_interval, + synthetic=self._generate_tag_data)}
- # Tags are special. They are just a subset of groups, but it - # takes time to filter the list so it is handled in a separate - # thread when group data is received. - self._tags = [] + # Cache a list of nodes that are members of a tag self._nodes_by_tag = dict()
# Lock is used for synchronization with the caching thread and @@ -143,24 +144,14 @@
# Okay, now we're ready to retrieve data while not self._stop: - start_processing = time.time() for attr, val in self._cache.iteritems(): # val.remaining is the number of seconds left before # the next update of this data item. None is "forever". - if val.remaining is not None and val.remaining <= 0: - log.debug("WallabyOperations: refreshing %s" % attr) - try: - # Wallaby API uses extensions to __getattr__ on - # the Store to retrieve objects from the Broker - # and return a list of proxy objects. - start = time.time() - d = getattr(self._store, attr, []) - except: - d = [] - delta = time.time() - start - log.debug("WallabyOperations: %s seconds to refresh %s" % (delta, attr)) - + # Synthetic items are not retreived from the store. + if not val.synthetic and \ + val.remaining is not None and val.remaining <= 0: + d = get_values(attr, getattr, self._store, attr, []) # If the data is empty, _set_cache will leave the # remaining field set to 0 for the attribute so we # will try to get it again on our next retry. @@ -168,6 +159,13 @@ # interval for this attribute. self._set_cache(attr, d)
+ # Now handle the synthetics. val.synthetic generates + # and stores it's own results. + for attr, val in self._cache.iteritems(): + if val.synthetic and \ + val.remaining is not None and val.remaining <= 0: + get_values(attr, val.synthetic, *val.args) + log.debug("WallabyOperations: total refresh processing time %s" \ % (time.time() - start_processing))
@@ -204,6 +202,20 @@ self._set_cache(attr, []) #end maintain_cache
+ def get_values(attr, call, *args): + log.debug("WallabyOperations: refreshing %s" % attr) + try: + # Wallaby API uses extensions to __getattr__ on + # the Store to retrieve objects from the Broker + # and return a list of proxy objects. + start = time.time() + d = call(*args) + except: + d = [] + delta = time.time() - start + log.debug("WallabyOperations: %s seconds to refresh %s" % (delta, attr)) + return d + if self._maintain_cache is not None and \ self._maintain_cache.isAlive(): # No, you can't start another one. @@ -261,10 +273,6 @@ else: do_notify = False for attr in items: - if attr == WBTypes.TAGS: - # Tags are really just groups with a filter - # in get_data() - attr = WBTypes.GROUPS if attr in self._cache: do_notify = True self._cache[attr].refresh = True @@ -275,20 +283,65 @@
def get_data(self, which): ''' - Return the most recently cached value for the specified data. + Return a list of cached values for the specified category.
- which -- specifies the data item. Attributes of WBTypes + The values returned will be proxy objects constructed by + the Wallaby client library. + + which -- specifies the category. Attributes of WBTypes define valid values for "which" ''' d = [] self._lock.acquire() if which in self._cache: - d = self._cache[which].data - elif which == WBTypes.TAGS: - d = self._tags + d = self._cache[which].data.values() self._lock.release() return d
+ def get_names(self, which): + ''' + Return a list of cached names for the specified category. + + The values returned will be the names of objects constructed + by the Wallaby client library. + + which -- specifies the category. Attributes of WBTypes + define valid values for "which" + ''' + d = [] + self._lock.acquire() + if which in self._cache: + d = self._cache[which].data.keys() + self._lock.release() + return d + + def get_node_by_name(self, name): + ''' + Return a cached wallaby.Node object by name. + + If name does not designate a currently cached + object, None is returned. + ''' + return self._lookup_by_name(WBTypes.NODES, name) + + def get_group_by_name(self, name): + ''' + Return a cached wallaby.Group object by name. + + If name does not designate a currently cached + object, None is returned. + ''' + return self._lookup_by_name(WBTypes.GROUPS, name) + + def get_tag_by_name(self, name): + ''' + Return a cached wallaby.Tag object by name. + + If name does not designate a currently cached + object, None is returned. + ''' + return self._lookup_by_name(WBTypes.TAGS, name) + def get_node_names(self, tag): ''' Return a list of node names associated with the tag. @@ -320,9 +373,6 @@ refresh -- the interval in seconds. If None, the specified item will be updated iff refresh() is called. ''' - if which == WBTypes.TAGS: - which = WBTypes.GROUPS - if which in self._cache: self._lock.acquire() self._cache[which].interval = refresh @@ -343,41 +393,7 @@ if t is not None and t < min: t = min return t - - def _set_cache(self, attr, data): - self._lock.acquire() - self._cache[attr].data = data - self._cache[attr].reset_remaining(len(data) == 0) - self._lock.release() - log.debug("WallabyOperations: %s list updated (%s items)" % (attr, len(data))) - - # Filter groups for the subset of tags - if attr == WBTypes.GROUPS: - self._generate_tag_list(data) - - def _generate_tag_list(self, data):
- def process_tags(groups): - - # get the tag list - tags = [] - nodes_by_tag = dict() - for g in groups: - if g.getConfig() == {} and not g.name.startswith("+++"): - tags.append(g) - nodes = g.membership() - nodes_by_tag[g.name] = nodes - self._store_tag_list(tags, nodes_by_tag) - - t = CallThread(process_tags, None, data) - t.start() - - def _store_tag_list(self, tags, nodes_by_tag): - self._lock.acquire() - self._tags = tags - self._nodes_by_tag = nodes_by_tag - self._lock.release() - def _get_store(self): # Ideally there should only be a single Store object from a single agent. # We constrain the search a bit, hopefully more efficient @@ -395,16 +411,54 @@ pass return store
+ def _generate_tag_data(self): + # figure out the tag list and nodes per tag + groups = self.get_data(WBTypes.GROUPS) + tags = [] + nodes_by_tag = dict() + for g in groups: + if not g.name.startswith("+++") and g.getConfig() == {}: + tags.append(g) + nodes = g.membership() + nodes_by_tag[g.name] = nodes + + self._lock.acquire() + self._cache[WBTypes.TAGS].data = self._to_dict(tags) + self._cache[WBTypes.TAGS].reset_remaining(len(tags) == 0) + self._nodes_by_tag = nodes_by_tag + self._lock.release() + log.debug("WallabyOperations: %s list updated (%s items)" % (WBTypes.TAGS, len(tags))) + + def _set_cache(self, attr, data): + self._lock.acquire() + self._cache[attr].data = self._to_dict(data) + self._cache[attr].reset_remaining(len(data) == 0) + self._lock.release() + log.debug("WallabyOperations: %s list updated (%s items)" % (attr, len(data))) + + def _to_dict(self, data): + return dict([(x.name, x) for x in data]) + + def _lookup_by_name(self, which, name): + n = None + self._lock.acquire() + if name in self._cache[which].data: + n = self._cache[which].data[name] + self._lock.release() + return n + class CacheData(object): - def __init__(self, interval): - # This attribure is only referenced from the cache thread - # so it does not need to be protected + def __init__(self, interval, synthetic=False, args=()): + # These attributes are only referenced from the cache thread + # so they do not need to be protected self.remaining = 0 + self.synthetic = synthetic + self.args = args
# Use of these these items need to be protected by the lock self.refresh = False self.interval = interval - self.data = [] + self.data = {}
def reset_remaining(self, retry): # If retry is True, we want to try to get the data @@ -435,5 +489,11 @@ def get_data(self, *args, **kwargs): return []
+ def get_names(self, *args, **kwargs): + return [] + + def get_node_names(self, *args, **kwargs): + return [] + def set_interval(self, *args, **kwargs): pass
cumin-developers@lists.fedorahosted.org