Author: croberts Date: 2012-12-17 17:01:11 +0000 (Mon, 17 Dec 2012) New Revision: 5607
Modified: branches/elephant/cumin/model/cumin.xml branches/elephant/cumin/model/rosemary.xml branches/elephant/cumin/python/cumin/gridhadoop/datanode.py branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py branches/elephant/cumin/python/cumin/gridhadoop/namenode.py branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py branches/elephant/sage/python/sage/aviary/aviaryoperations.py Log: More wiring for hadoop methods.
Modified: branches/elephant/cumin/model/cumin.xml =================================================================== --- branches/elephant/cumin/model/cumin.xml 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/model/cumin.xml 2012-12-17 17:01:11 UTC (rev 5607) @@ -47,7 +47,7 @@
<package name="com.redhat.cumin.grid.hadoop"> <class name="NameNode" storage="none"> - <property name="ID" type="sstr"/> + <property name="Id" type="sstr"/> <property name="Ipc" type="sstr"/> <property name="Submitted" type="sstr"/> <property name="State" type="sstr"/> @@ -56,25 +56,28 @@ <property name="Location" type="sstr"/> </class> <class name="DataNode" storage="none"> - <property name="ID" type="sstr"/> + <property name="Id" type="sstr"/> + <property name="Ipc" type="sstr"/> <property name="Submitted" type="sstr"/> - <property name="Status" type="sstr"/> + <property name="State" type="sstr"/> <property name="Uptime" type="sstr"/> <property name="Owner" type="sstr"/> <property name="NameNode" type="sstr"/> </class> <class name="TaskTracker" storage="none"> - <property name="ID" type="sstr"/> + <property name="Id" type="sstr"/> + <property name="Ipc" type="sstr"/> <property name="Submitted" type="sstr"/> - <property name="Status" type="sstr"/> + <property name="State" type="sstr"/> <property name="Uptime" type="sstr"/> <property name="Owner" type="sstr"/> <property name="JobTracker" type="sstr"/> </class> <class name="JobTracker" storage="none"> - <property name="ID" type="sstr"/> + <property name="Id" type="sstr"/> + <property name="Ipc" type="sstr"/> <property name="Submitted" type="sstr"/> - <property name="Status" type="sstr"/> + <property name="State" type="sstr"/> <property name="Uptime" type="sstr"/> <property name="Owner" type="sstr"/> <property name="Location" type="sstr"/>
Modified: branches/elephant/cumin/model/rosemary.xml =================================================================== --- branches/elephant/cumin/model/rosemary.xml 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/model/rosemary.xml 2012-12-17 17:01:11 UTC (rev 5607) @@ -37,7 +37,7 @@
<package name="com.redhat.cumin.grid.hadoop"> <class name="NameNode"> - <property name="ID"> + <property name="Id"> <title>Id</title> </property> <property name="Ipc"> @@ -61,56 +61,59 @@ </class>
<class name="DataNode"> - <property name="ID"> + <property name="Id"> <title>Id</title> </property> + <property name="Ipc"> + <title>Ipc</title> + </property> <property name="Submitted"> <title>Submitted</title> </property> - <property name="Status"> - <title>Status</title> + <property name="State"> + <title>State</title> </property> <property name="Uptime"> <title>Uptime</title> </property> <property name="Owner"> <title>Owner</title> - </property> - <property name="NameNode"> - <title>Name node</title> - </property> + </property> </class>
<class name="TaskTracker"> - <property name="ID"> + <property name="Id"> <title>Id</title> </property> + <property name="Ipc"> + <title>Ipc</title> + </property> <property name="Submitted"> <title>Submitted</title> </property> - <property name="Status"> - <title>Status</title> + <property name="State"> + <title>State</title> </property> <property name="Uptime"> <title>Uptime</title> </property> <property name="Owner"> <title>Owner</title> - </property> - <property name="JobTracker"> - <title>Job tracker</title> - </property> + </property> </class>
<class name="JobTracker"> - <property name="ID"> + <property name="Id"> <title>Id</title> </property> + <property name="Ipc"> + <title>Ipc</title> + </property> <property name="Submitted"> <title>Submitted</title> </property> - <property name="Status"> - <title>Status</title> + <property name="State"> + <title>State</title> </property> <property name="Uptime"> <title>Uptime</title>
Modified: branches/elephant/cumin/python/cumin/gridhadoop/datanode.py =================================================================== --- branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -11,15 +11,19 @@
from sage.util import *
- +class DataNodeAdapter(HadoopAdapter): + def get_sage_results(self, values): + results = self.app.remote.get_data_node_list() + return results + class DataNodeSelector(ObjectSelector): def __init__(self, app, name): cls = app.model.com_redhat_cumin_grid_hadoop.DataNode
super(DataNodeSelector, self).__init__(app, name, cls)
- self.add_search_filter(self.table.nn_col) - self.table.adapter = HadoopAdapter(app, cls) + self.add_search_filter(self.table.id_col) + self.table.adapter = DataNodeAdapter(app, cls)
task = DataNodeCreate(app) link = TaskLink(app, "dataNode_create", task) @@ -35,27 +39,19 @@ def render_title(self, session): return "HDFS data nodes"
- def get_qmf_results(self, session): - values = self.get_data_values(session) - return self.table.adapter.get_sage_results(values) - - class DataNodeTable(ObjectQmfSelectorTable): def __init__(self, app, name, cls): super(DataNodeTable, self).__init__(app, name, cls)
- self.nn_col = ObjectTableColumn(app, "nncol", cls.NameNode) - self.nn_col.width="20%" - self.id_col = ObjectTableColumn(app, "idcol", cls.ID) + self.id_col = ObjectTableColumn(app, "idcol", cls.Id) self.sub_col = ObjectTableColumn(app, "sub", cls.Submitted) - self.status_col = ObjectTableColumn(app, "statuscol", cls.Status) + self.state_col = ObjectTableColumn(app, "statecol", cls.State) self.uptime_col = ObjectTableColumn(app, "uptimecol", cls.Uptime) self.owner_col = ObjectTableColumn(app, "ownercol", cls.Owner)
- self.add_column(self.nn_col) self.add_column(self.id_col) self.add_column(self.sub_col) - self.add_column(self.status_col) + self.add_column(self.state_col) self.add_column(self.uptime_col) self.add_column(self.owner_col)
@@ -70,7 +66,7 @@ def do_invoke(self, invoc, id, args): self.invoc = invoc (hadoophost) = args - self.app.remote.stop_data_node(hadoophost, id, invoc.make_callback()) + self.app.remote.stop_data_node(hadoophost, [float(id)], invoc.make_callback())
def get_title(self, session): @@ -89,7 +85,7 @@
if not self.errors.get(session): #TODO get hosts associated with each id - hadoophost = "TODO" + hadoophost = "grid2.lab.bos.redhat.com:9090" self.task.invoke(session, ids, (hadoophost))
@@ -118,7 +114,8 @@ (binfile, owner, hadoophost, count, name_node) = args
try: - call_async(self.callback, self.fake_call, binfile, owner, hadoophost, count, name_node) + self.invoc = invoc + self.app.remote.start_data_node(hadoophost, name_node, binfile, owner, count, invoc.make_callback()) except Exception, e: invoc.status = invoc.FAILED log.debug("Creating data node failed", exc_info=True) @@ -173,61 +170,4 @@ return "Number of data nodes to start"
-class NameNodeField(ScalarField): - def __init__(self, app, name): - super(NameNodeField, self).__init__(app, name, None) - - self.param = IntegerParameter(app, "param") - self.add_parameter(self.param) - - cls = app.model.com_redhat_cumin_grid_hadoop.NameNode - - self.input = self.NameNodeOptions(app, "input", self.param) - self.add_child(self.input) - - def get(self, session): - return self.input.get(session) - - def validate(self, session): - super(NameNodeField, self).validate(session) - - name_nodes = self.input.get_items(session) - - if not name_nodes: - error = FormError("There is no name node to bind to") - self.form.errors.add(session, error) - - def render_title(self, session): - return "Name node" - - def render_help(self, session): - return "Attach data nodes to this name node" - - class NameNodeOptions(OptionInputSet): - def do_process(self, session): - cls = self.app.model.com_redhat_cumin_grid_hadoop.NameNode - id = self.param.get(session) - - if id is None: - items = self.get_items(session) - - if items: - self.param.set(session, items[0]) - - super(NameNodeField.NameNodeOptions, self).do_process \ - (session) - - def do_get_items(self, session): - cls = self.app.model.com_redhat_cumin_grid_hadoop.NameNode - name_nodes = [12345, 23456] - return name_nodes - - def render_item_value(self, session, item): - return item - - def render_item_content(self, session, item): - return xml_escape(item) - - def render_item_selected_attr(self, session, item): - if item == self.param.get(session): - return "selected=\"selected\"" \ No newline at end of file + \ No newline at end of file
Modified: branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py =================================================================== --- branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -29,23 +29,19 @@ results = MethodResult() results.status = "OK" results.data = {"node1":{"Name":"node1.lab.bos.redhat.com",\ - "ID":"12345", \ - "Submitted":"11/14/2012", \ - "Status":"FAKE*Running", \ - "Uptime":"2+00:54:56", \ - "Owner":"croberts", \ - "Location":"@http://mrg7.lab.bos.redhat.com:40222", \ - "NameNode":"796210 @ http://mrg16.lab.bos.redhat.com:50061", \ - "JobTracker":"799639 @ http://mrg44.lab.bos.redhat.com:57339"}, \ + "id":"12345", \ + "submitted":"11/14/2012", \ + "ref":{"ipc":"some fake ipc location", "id":"12345"}, \ + "state":"FAKE*Running", \ + "uptime":"2+00:54:56", \ + "owner":"croberts"}, \ "node2":{"Name":"node2.lab.bos.redhat.com",\ - "ID":"56789", \ - "Submitted":"11/15/2012", \ - "Status":"FAKE*Running", \ - "Uptime":"1+00:24:56", \ - "Owner":"croberts", \ - "Location":"http://mrg8.lab.bos.redhat.com:40222", \ - "NameNode":"720565 @ http://mrg14.lab.bos.redhat.com:44594", \ - "JobTracker":"799639 @ http://mrg44.lab.bos.redhat.com:57339"}} + "id":"56789", \ + "ref":{"ipc":"another fake ipc location", "id":"56789"}, \ + "submitted":"11/15/2012", \ + "state":"FAKE*Running", \ + "uptime":"1+00:24:56", \ + "owner":"croberts"}} return (results.status, results.data)
def do_get_data(self, values): @@ -60,11 +56,14 @@ field_data = list() for column in self.columns: try: - val = record[column.name] - except KeyError: - val = record["ID"] + val = record[column.name.lower()] + except AttributeError: + if (column.name == "Ipc"): + val = record.ref.ipc + else: + val = record.ref.id field_data.append(val) - return field_data + return field_data
class HadoopNodeDeleteForm(ObjectSelectorTaskForm): def __init__(self, app, name, task, cls): @@ -144,4 +143,66 @@
class Owner(StringField): def render_title(self, session): - return "Username of the owner" \ No newline at end of file + return "Username of the owner" + + +class NameNodeField(ScalarField): + def __init__(self, app, name): + super(NameNodeField, self).__init__(app, name, None) + + self.param = IntegerParameter(app, "param") + self.add_parameter(self.param) + + cls = app.model.com_redhat_cumin_grid_hadoop.NameNode + + self.input = self.NameNodeOptions(app, "input", self.param) + self.add_child(self.input) + + def get(self, session): + return self.input.get(session) + + def validate(self, session): + super(NameNodeField, self).validate(session) + + name_nodes = self.input.get_items(session) + + if not name_nodes: + error = FormError("There is no name node to bind to") + self.form.errors.add(session, error) + + def render_title(self, session): + return "Name node" + + def render_help(self, session): + return "Attach data nodes to this name node" + + class NameNodeOptions(OptionInputSet): + def do_process(self, session): + cls = self.app.model.com_redhat_cumin_grid_hadoop.NameNode + id = self.param.get(session) + + if id is None: + items = self.get_items(session) + + if items: + self.param.set(session, items[0]) + + super(NameNodeField.NameNodeOptions, self).do_process \ + (session) + + def do_get_items(self, session): + nodes = [] + (status, node_list) = self.app.remote.get_name_node_list() + for node in node_list: + nodes.append(node.ref.id) + return nodes + + def render_item_value(self, session, item): + return item + + def render_item_content(self, session, item): + return xml_escape(item) + + def render_item_selected_attr(self, session, item): + if item == self.param.get(session): + return "selected=\"selected\"" \ No newline at end of file
Modified: branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py =================================================================== --- branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -11,15 +11,19 @@
from sage.util import *
- +class JobTrackerAdapter(HadoopAdapter): + def get_sage_results(self, values): + results = self.app.remote.get_job_tracker_list() + return results + class JobTrackerSelector(ObjectSelector): def __init__(self, app, name): cls = app.model.com_redhat_cumin_grid_hadoop.JobTracker
super(JobTrackerSelector, self).__init__(app, name, cls)
- self.add_search_filter(self.table.loc_col) - self.table.adapter = HadoopAdapter(app, cls) + self.add_search_filter(self.table.id_col) + self.table.adapter = JobTrackerAdapter(app, cls)
task = JobTrackerCreate(app) link = TaskLink(app, "jobTracker_create", task) @@ -43,18 +47,18 @@ def __init__(self, app, name, cls): super(JobTrackerTable, self).__init__(app, name, cls)
- self.loc_col = ObjectTableColumn(app, "loccol", cls.Location) - self.loc_col.width = "20%" - self.id_col = ObjectTableColumn(app, "idcol", cls.ID) + self.id_col = ObjectTableColumn(app, "idcol", cls.Id) + self.ipc_col = ObjectTableColumn(app, "ipc", cls.Ipc) + self.ipc_col.width = "20%" self.sub_col = ObjectTableColumn(app, "sub", cls.Submitted) - self.status_col = ObjectTableColumn(app, "statuscol", cls.Status) + self.state_col = ObjectTableColumn(app, "statecol", cls.State) self.uptime_col = ObjectTableColumn(app, "uptimecol", cls.Uptime) self.owner_col = ObjectTableColumn(app, "ownercol", cls.Owner)
- self.add_column(self.loc_col) self.add_column(self.id_col) + self.add_column(self.ipc_col) self.add_column(self.sub_col) - self.add_column(self.status_col) + self.add_column(self.state_col) self.add_column(self.uptime_col) self.add_column(self.owner_col)
@@ -69,7 +73,7 @@ def do_invoke(self, invoc, id, args): self.invoc = invoc (hadoophost) = args - self.app.remote.stop_job_tracker(hadoophost, id, invoc.make_callback()) + self.app.remote.stop_job_tracker(hadoophost, [float(id)], invoc.make_callback())
def get_title(self, session): return "Remove job trackers" @@ -87,7 +91,7 @@
if not self.errors.get(session): #TODO get hosts associated with each id - hadoophost = "TODO" + hadoophost = "grid2.lab.bos.redhat.com:9090" self.task.invoke(session, ids, (hadoophost))
def render_title(self, session): @@ -111,10 +115,10 @@
def do_invoke(self, session, object, invoc, args): self.invoc = invoc - (binfile, owner, hadoophost) = args + (binfile, owner, hadoophost, name_node, count) = args
try: - call_async(self.callback, self.fake_call, binfile, owner, hadoophost) + self.app.remote.start_job_tracker(hadoophost, float(name_node), binfile, owner, count, invoc.make_callback()) except Exception, e: invoc.status = invoc.FAILED log.debug("Creating job tracker failed", exc_info=True) @@ -134,8 +138,14 @@ self.owner = Owner(app, "owner") self.add_field(self.owner)
+ self.nameNode = NameNodeField(app, "namenode") + self.add_field(self.nameNode) + self.hadoophost = HadoopHostField(app, "hadoophost") - self.add_field(self.hadoophost) + self.add_field(self.hadoophost) + + self.count = self.CountField(app, "count") + self.add_field(self.count)
def process_display(self, session): self.scheduler.validate(session) @@ -150,8 +160,14 @@ binfile = self.binfile.get(session) owner = self.owner.get(session) hadoophost = self.hadoophost.get(session) - self.task.invoke(session, None, (binfile, owner, hadoophost)) + name_node = self.nameNode.get(session) + count = self.count.get(session) + self.task.invoke(session, None, (binfile, owner, hadoophost, name_node, count)) self.task.exit_with_redirect(session, url)
def render_title(self, session): - return "Create a job tracker" \ No newline at end of file + return "Create a job tracker" + + class CountField(StringField): + def render_title(self, session): + return "Number of job trackers to start" \ No newline at end of file
Modified: branches/elephant/cumin/python/cumin/gridhadoop/namenode.py =================================================================== --- branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -14,19 +14,6 @@ def get_sage_results(self, values): results = self.app.remote.get_name_node_list() return results - - def process_record(self, key, record): - field_data = list() - for column in self.columns: - try: - val = record[column.name.lower()] - except AttributeError: - if (column.name == "Ipc"): - val = record.ref.ipc - else: - val = record.ref.id - field_data.append(val) - return field_data
class NameNodeSelector(ObjectSelector): def __init__(self, app, name): @@ -34,7 +21,7 @@
super(NameNodeSelector, self).__init__(app, name, cls)
- self.add_search_filter(self.table.ipc_col) + self.add_search_filter(self.table.id_col) self.table.adapter = NameNodeAdapter(app, cls)
task = NameNodeCreate(app) @@ -55,16 +42,16 @@ def __init__(self, app, name, cls): super(NameNodeTable, self).__init__(app, name, cls)
+ self.id_col = ObjectTableColumn(app, "idcol", cls.Id) self.ipc_col = ObjectTableColumn(app, "ipc", cls.Ipc) - self.ipc_col.width = "20%" - self.id_col = ObjectTableColumn(app, "idcol", cls.ID) + self.ipc_col.width = "20%" self.sub_col = ObjectTableColumn(app, "sub", cls.Submitted) self.state_col = ObjectTableColumn(app, "statecol", cls.State) self.uptime_col = ObjectTableColumn(app, "uptimecol", cls.Uptime) self.owner_col = ObjectTableColumn(app, "ownercol", cls.Owner)
- self.add_column(self.ipc_col) self.add_column(self.id_col) + self.add_column(self.ipc_col) self.add_column(self.sub_col) self.add_column(self.state_col) self.add_column(self.uptime_col)
Modified: branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py =================================================================== --- branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -11,15 +11,19 @@
from sage.util import *
- +class TaskTrackerAdapter(HadoopAdapter): + def get_sage_results(self, values): + results = self.app.remote.get_task_tracker_list() + return results + class TaskTrackerSelector(ObjectSelector): def __init__(self, app, name): cls = app.model.com_redhat_cumin_grid_hadoop.TaskTracker
super(TaskTrackerSelector, self).__init__(app, name, cls)
- self.add_search_filter(self.table.jt_col) - self.table.adapter = HadoopAdapter(app, cls) + self.add_search_filter(self.table.id_col) + self.table.adapter = TaskTrackerAdapter(app, cls)
task = TaskTrackerCreate(app) link = TaskLink(app, "taskTracker_create", task) @@ -43,18 +47,15 @@ def __init__(self, app, name, cls): super(TaskTrackerTable, self).__init__(app, name, cls)
- self.jt_col = ObjectTableColumn(app, "jtcol", cls.JobTracker) - self.jt_col.width="20%" - self.id_col = ObjectTableColumn(app, "idcol", cls.ID) + self.id_col = ObjectTableColumn(app, "idcol", cls.Id) self.sub_col = ObjectTableColumn(app, "sub", cls.Submitted) - self.status_col = ObjectTableColumn(app, "statuscol", cls.Status) + self.state_col = ObjectTableColumn(app, "statecol", cls.State) self.uptime_col = ObjectTableColumn(app, "uptimecol", cls.Uptime) self.owner_col = ObjectTableColumn(app, "ownercol", cls.Owner)
- self.add_column(self.jt_col) self.add_column(self.id_col) self.add_column(self.sub_col) - self.add_column(self.status_col) + self.add_column(self.state_col) self.add_column(self.uptime_col) self.add_column(self.owner_col)
@@ -69,7 +70,7 @@ def do_invoke(self, invoc, id, args): self.invoc = invoc (hadoophost) = args - self.app.remote.stop_task_tracker(hadoophost, id, invoc.make_callback()) + self.app.remote.stop_task_tracker(hadoophost, [float(id)], invoc.make_callback())
def get_title(self, session): return "Remove task trackers" @@ -87,7 +88,7 @@
if not self.errors.get(session): #TODO get hosts associated with each id - hadoophost = "TODO" + hadoophost = "grid2.lab.bos.redhat.com:9090" self.task.invoke(session, ids, (hadoophost))
def render_title(self, session): @@ -114,7 +115,7 @@ (binfile, owner, hadoophost, count, job_tracker) = args
try: - call_async(self.callback, self.fake_call, binfile, owner, hadoophost, count, job_tracker) + self.app.remote.start_data_node(hadoophost, float(job_tracker), binfile, owner, count, invoc.make_callback()) except Exception, e: invoc.status = invoc.FAILED log.debug("Creating task trackers failed", exc_info=True) @@ -214,9 +215,11 @@ (session)
def do_get_items(self, session): - cls = self.app.model.com_redhat_cumin_grid_hadoop.JobTracker - name_nodes = [98765, 54321] - return name_nodes + nodes = [] + (status, node_list) = self.app.remote.get_job_tracker_list() + for node in node_list: + nodes.append(node.ref.id) + return nodes
def render_item_value(self, session, item): return item
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-17 15:54:45 UTC (rev 5606) +++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-17 17:01:11 UTC (rev 5607) @@ -950,7 +950,7 @@ return self._operate_on_ids(host, ids, callback, "getDataNode")
def start_job_tracker(self, host, nn_id, bin_file, owner, count, callback): - return self._start_node(host, name_node_id, bin_file, owner, count, + return self._start_node(host, nn_id, bin_file, owner, count, "startJobTracker", callback)
def stop_job_tracker(self, host, ids, callback):
cumin-developers@lists.fedorahosted.org