Author: tmckay Date: 2012-12-17 15:54:45 +0000 (Mon, 17 Dec 2012) New Revision: 5606
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py Log: Add start methods for all node/trackers
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-17 15:04:46 UTC (rev 5605) +++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-17 15:54:45 UTC (rev 5606) @@ -939,12 +939,20 @@ def get_name_node(self, host, ids, callback=None): return self._operate_on_ids(host, ids, callback, "getNameNode")
def start_data_node(self, host, nn_id, bin_file, owner, count, callback):
    """Request the start of data node(s) via the "startDataNode" operation.

    All arguments are forwarded unchanged to ``_start_node``; ``nn_id`` is
    passed as the reference id.  Returns whatever ``_start_node`` returns.
    """
    operation = "startDataNode"
    return self._start_node(host, nn_id, bin_file, owner, count,
                            operation, callback)

def stop_data_node(self, host, ids, callback):
    """Request the "stopDataNode" operation for ``ids`` on ``host``.

    Delegates to ``_operate_on_ids``; this method itself returns None.
    """
    operation = "stopDataNode"
    self._operate_on_ids(host, ids, callback, operation)
def get_data_node(self, host, ids, callback=None):
    """Run the "getDataNode" operation for ``ids`` on ``host``.

    Delegates to ``_operate_on_ids`` and returns its result.
    """
    operation = "getDataNode"
    outcome = self._operate_on_ids(host, ids, callback, operation)
    return outcome
def start_job_tracker(self, host, nn_id, bin_file, owner, count, callback):
    """Request the start of job tracker(s) via the "startJobTracker" operation.

    All arguments are forwarded unchanged to ``_start_node``; ``nn_id`` is
    passed as the reference id.  Returns whatever ``_start_node`` returns.
    """
    # The reference id is the ``nn_id`` parameter.  The previous code passed
    # the undefined name ``name_node_id`` and raised NameError on every call.
    return self._start_node(host, nn_id, bin_file, owner, count,
                            "startJobTracker", callback)

def stop_job_tracker(self, host, ids, callback):
    """Request the "stopJobTracker" operation for ``ids`` on ``host``.

    Delegates to ``_operate_on_ids``; this method itself returns None.
    """
    self._operate_on_ids(host, ids, callback, "stopJobTracker")
@@ -954,6 +962,10 @@ def stop_task_tracker(self, host, ids, callback): self._operate_on_ids(host, ids, callback, "stopTaskTracker")
def start_task_tracker(self, host, nn_id, bin_file, owner, count, callback):
    """Request the start of task tracker(s) via the "startTaskTracker" operation.

    All arguments are forwarded unchanged to ``_start_node``; ``nn_id`` is
    passed as the reference id.  Returns whatever ``_start_node`` returns.
    """
    # The reference id is the ``nn_id`` parameter.  The previous code passed
    # the undefined name ``name_node_id`` and raised NameError on every call.
    return self._start_node(host, nn_id, bin_file, owner, count,
                            "startTaskTracker", callback)

def get_task_tracker(self, host, ids, callback=None):
    """Run the "getTaskTracker" operation for ``ids`` on ``host``.

    Delegates to ``_operate_on_ids`` and returns its result.
    """
    return self._operate_on_ids(host, ids, callback, "getTaskTracker")
@@ -969,6 +981,44 @@ def get_task_tracker_list(self, callback=None): return self._get_node_list(self.get_task_tracker, callback)
def _start_node(self, host, ref_id, bin_file, owner, count,
                meth_name, callback):
    """Invoke a "start" operation on a worker thread and report via callback.

    A client is taken from ``self.client_pool`` and prepared with
    ``_setup_client``.  An ``ns1:HadoopID`` object is created with its
    ``id`` set to ``str(ref_id)``.  ``meth_name`` is then called through
    ``self.call_client_retry`` on a ``CallThread`` with the arguments
    ``(ref, bin_file, owner, count)``.

    When the call completes, the client is returned to the pool and
    ``callback(status, data)`` is invoked.  ``status`` is the Exception
    produced by ``_pretty_result`` if there is one, otherwise the value of
    ``_AviaryCommon._get_status(result.status)``.  ``data`` is
    ``result.ref`` when the status is "OK" and the result has a ``ref``
    attribute, otherwise None.

    Raises:
        AssertionError: if ``callback`` is not callable.
        Exception: "Invalid HadoopID value" if the id cannot be assigned,
            or whatever client setup or thread creation raises.  In all
            these cases the client is returned to the pool first.
    """
    assert callable(callback)

    def result_tuple(result, host):
        data = None
        result = self._pretty_result(result, host)
        if isinstance(result, Exception):
            status = result
        else:
            status = _AviaryCommon._get_status(result.status)
            if status == "OK" and hasattr(result, "ref"):
                data = result.ref
        return (status, data)

    def my_callback(result):
        # The call has finished, so the client can go back to the pool
        # before the caller's callback runs.
        self.client_pool.return_object(client)
        callback(*result_tuple(result, host))

    client = self.client_pool.get_object()
    try:
        self._setup_client(client,
                           self.servers,
                           host,
                           meth_name)

        ref = client.factory.create("ns1:HadoopID")
        try:
            # In the future, if we would like to use URLs as ids,
            # we can test for a URL value and set ref.ipc instead
            # of ref.id.
            ref.id = str(ref_id)
        except Exception:
            raise Exception("Invalid HadoopID value")

        t = CallThread(self.call_client_retry, my_callback,
                       client, meth_name, ref, bin_file, owner, count)
        t.start()
    except Exception:
        # If anything fails before the worker thread is running,
        # my_callback will never fire, so return the client here
        # rather than leaking it from the pool.
        self.client_pool.return_object(client)
        raise

# Unchanged diff context that followed this hunk (truncated in the
# original message, so it is preserved here only as a comment):
#     def _get_node_list(self, proc, callback):
#         try:
#             status = "OK"
cumin-developers@lists.fedorahosted.org