Author: tmckay Date: 2013-01-02 18:06:09 +0000 (Wed, 02 Jan 2013) New Revision: 5626
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py Log: Allow Hadoop "get" methods to filter results by owner. The owner argument may be a single owner or a list (or tuple) of owners.
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2013-01-02 16:27:36 UTC (rev 5625) +++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2013-01-02 18:06:09 UTC (rev 5626) @@ -936,8 +936,8 @@ assert callback self._operate_on_ids(host, ids, callback, "stopNameNode")
- def get_name_node(self, host, ids, callback=None): - return self._operate_on_ids(host, ids, callback, "getNameNode") + def get_name_node(self, host, ids, owner=None, callback=None): + return self._query_ids(host, ids, owner, callback, "getNameNode")
def start_data_node(self, host, nn_id, bin_file, owner, count, callback): return self._start_node(host, nn_id, bin_file, owner, count, @@ -946,8 +946,8 @@ def stop_data_node(self, host, ids, callback): self._operate_on_ids(host, ids, callback, "stopDataNode")
- def get_data_node(self, host, ids, callback=None): - return self._operate_on_ids(host, ids, callback, "getDataNode") + def get_data_node(self, host, ids, owner=None, callback=None): + return self._query_ids(host, ids, owner, callback, "getDataNode")
def start_job_tracker(self, host, nn_id, bin_file, owner, count, callback): return self._start_node(host, nn_id, bin_file, owner, count, @@ -956,8 +956,8 @@ def stop_job_tracker(self, host, ids, callback): self._operate_on_ids(host, ids, callback, "stopJobTracker")
- def get_job_tracker(self, host, ids, callback=None): - return self._operate_on_ids(host, ids, callback, "getJobTracker") + def get_job_tracker(self, host, ids, owner=None, callback=None): + return self._query_ids(host, ids, owner, callback, "getJobTracker")
def stop_task_tracker(self, host, ids, callback): self._operate_on_ids(host, ids, callback, "stopTaskTracker") @@ -966,20 +966,20 @@ return self._start_node(host, nn_id, bin_file, owner, count, "startTaskTracker", callback)
- def get_task_tracker(self, host, ids, callback=None): - return self._operate_on_ids(host, ids, callback, "getTaskTracker") + def get_task_tracker(self, host, ids, owner=None, callback=None): + return self._query_ids(host, ids, owner, callback, "getTaskTracker")
- def get_name_node_list(self, callback=None): - return self._get_node_list(self.get_name_node, callback) + def get_name_node_list(self, owner=None, callback=None): + return self._get_node_list(self.get_name_node, owner, callback)
- def get_data_node_list(self, callback=None): - return self._get_node_list(self.get_data_node, callback) + def get_data_node_list(self, owner=None, callback=None): + return self._get_node_list(self.get_data_node, owner, callback)
- def get_job_tracker_list(self, callback=None): - return self._get_node_list(self.get_job_tracker, callback) + def get_job_tracker_list(self, owner=None, callback=None): + return self._get_node_list(self.get_job_tracker, owner, callback)
- def get_task_tracker_list(self, callback=None): - return self._get_node_list(self.get_task_tracker, callback) + def get_task_tracker_list(self, owner=None, callback=None): + return self._get_node_list(self.get_task_tracker, owner, callback)
def _make_id(self, client, val):
@@ -1038,13 +1038,13 @@ client, meth_name, ref, bin_file, owner, count) t.start()
- def _get_node_list(self, proc, callback): + def _get_node_list(self, proc, owner, callback): try: status = "OK" hosts = self.get_hosts(self.resource, self.subtype) data = [] for h in hosts: - s, n = proc(h, []) + s, n = proc(h, [], owner) if s == "OK": data.extend(n) except Exception, e: @@ -1054,8 +1054,28 @@ callback(status, data) else: return (status, data) - - def _operate_on_ids(self, host, ids, callback, meth_name): + + def _query_ids(self, host, ids, owner, callback, meth_name): + # Allow query results to be constrained by owner + def filter_results(results): + if type(owner) not in (list, tuple): + o = [owner] + else: + o = owner + res = [] + for r in results: + if r.owner in o: + res.append(r) + return res + + if owner is None: + return self._operate_on_ids(host, ids, callback, meth_name) + else: + return self._operate_on_ids(host, ids, callback, meth_name, + filter_results) + + + def _operate_on_ids(self, host, ids, callback, meth_name, filter_results=None): if callback: assert callable(callback)
@@ -1079,6 +1099,8 @@ status = _AviaryCommon._get_status(result.status) if status == "OK" and hasattr(result, "results"): data = result.results + if filter_results: + data = filter_results(data) return (status, data)
if callback:
cumin-developers@lists.fedorahosted.org