Author: tmckay Date: 2013-01-17 21:08:02 +0000 (Thu, 17 Jan 2013) New Revision: 5655
Modified: branches/tmckay/cumin/python/cumin/grid/job.py branches/tmckay/cumin/python/cumin/main.py branches/tmckay/mint/python/mint/aviary/endpoints.py branches/tmckay/mint/python/mint/aviary/session.py branches/tmckay/mint/python/mint/aviary/submissions.py branches/tmckay/mint/python/mint/main.py branches/tmckay/mint/python/mint/update.py branches/tmckay/sage/python/sage/aviary/aviaryoperations.py branches/tmckay/sage/python/sage/qmf/qmfoperations.py Log: Initial version with Aviary submissions working
Modified: branches/tmckay/cumin/python/cumin/grid/job.py =================================================================== --- branches/tmckay/cumin/python/cumin/grid/job.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/cumin/python/cumin/grid/job.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -106,10 +106,9 @@
def check_submission_membership(self, session, sub_id, job_id): # Make sure that the job belongs to the designated submission. - id = self.id.get(session) frame = None message = "" - submission, sched = self.get_submission_sched(session, id) + submission, sched = self.get_submission_sched(session, sub_id) summaries = self.app.model.get_submission_job_summaries(submission, sched.Name)
Modified: branches/tmckay/cumin/python/cumin/main.py =================================================================== --- branches/tmckay/cumin/python/cumin/main.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/cumin/python/cumin/main.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -206,7 +206,7 @@ self.aviary_job_servers or self.aviary_query_servers: try: from sage.aviary.aviaryoperations import \ - SudsLogging, AviaryOperationsFactory + SudsLogging, AviaryOperations except: imports_ok = False if imports_ok: @@ -224,7 +224,7 @@ # are empty strings. If locator is non empty, their actual # values will be overridden but the presence of a value will # still control enable/disable. - aviary_itf = AviaryOperationsFactory("aviary", aviary_dir, + aviary_itf = AviaryOperations("aviary", aviary_dir, self.aviary_locator, self.aviary_job_servers, self.aviary_query_servers,
Modified: branches/tmckay/mint/python/mint/aviary/endpoints.py =================================================================== --- branches/tmckay/mint/python/mint/aviary/endpoints.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/mint/python/mint/aviary/endpoints.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -18,7 +18,7 @@ def __init__(self, url, id, factory): super(SubmissionEndpoint, self).__init__(url, id) self.size = 4 # do we want to control size per endpoint record? - self.query_ops = factory.query_ops + self.app = factory.app
# Set up time spans for polling/history and initialize search indices self.init_query_indices(factory.poll_delta, @@ -58,11 +58,12 @@ QuerySubmissionsBefore(self),)
class EndpointFactory(object): - def __init__(self, query_ops, poll_delta, history_delta): + def __init__(self, app, poll_delta, history_delta):
+ self.app = app + self.poll_delta = poll_delta self.history_delta = history_delta - self.query_ops = query_ops
self.type_map = \ {("CUSTOM","QUERY_SERVER"): self._make_submission_endpoint}
Modified: branches/tmckay/mint/python/mint/aviary/session.py =================================================================== --- branches/tmckay/mint/python/mint/aviary/session.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/mint/python/mint/aviary/session.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -42,7 +42,7 @@
self.foreman = Foreman()
- endpoint_factory = EndpointFactory(self.app.aviary_ops, + endpoint_factory = EndpointFactory(self.app, poll_delta=timedelta(days=64), history_delta=timedelta(weeks=52))
Modified: branches/tmckay/mint/python/mint/aviary/submissions.py =================================================================== --- branches/tmckay/mint/python/mint/aviary/submissions.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/mint/python/mint/aviary/submissions.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -2,20 +2,36 @@ from util import now_minus_duration from datetime import datetime
+from mint.update import AviarySubmissionUpdate + +def max(stuff): + d = 0 + for i in stuff: + if i.qdate > d: + d = i.qdate + return d + +def min(stuff): + d = None + for i in stuff: + if d is None or i.qdate < d: + d = i.qdate + return d + class QuerySubmissionsAfter(Work): def __init__(self, ep): super(QuerySubmissionsAfter, self).__init__() self.ep = ep
def do_the_job(self, foreman): - ids = self.ep.query_ops.get_submission_ids_by_qdate( + ids = self.ep.app.aviary_ops.get_submission_ids_by_qdate( self.ep.url, self.ep.size, self.ep.after_index, "AFTER")
- print ("querying", self.ep.after_index, datetime.fromtimestamp(self.ep.after_index)) - print (ids.status, ids.got_data) + #print ("querying after", self.ep.after_index, datetime.fromtimestamp(self.ep.after_index)) + #print (ids.status, ids.got_data)
# if data.ids is empty (remaining == 0) # then we should reset qdate to some point in @@ -23,7 +39,7 @@ # yet. This is how we will accomplish polling, # just looping over the data continuously. if ids.status == 0 and ids.got_data: - print ids.data.ids + #print ids.data.ids
# ids.data.ids is a list of SubmissionIDs # Each of these ids needs to be polled to @@ -43,10 +59,11 @@ # back an empty list but sets remaining... # In that case just leave after_index as is if ids.data.ids: - self.ep.after_index = ids.data.ids[-1].qdate +# self.ep.after_index = ids.data.ids[-1].qdate + self.ep.after_index = max(ids.data.ids) else: - print "query after reset" - #self.ep.after_index = now_minus_duration(self.ep.poll_delta) + #print "query after reset" + self.ep.after_index = now_minus_duration(self.ep.poll_delta)
else: print "query after failed" @@ -63,14 +80,14 @@
# this should mark self.ep.after_queries_completed # or self.ep.before_queries_completed as necessary - ids = self.ep.query_ops.get_submission_ids_by_qdate( + ids = self.ep.app.aviary_ops.get_submission_ids_by_qdate( self.ep.url, self.ep.size, self.ep.before_index, "BEFORE")
- print ("querying", self.ep.before_index, datetime.fromtimestamp(self.ep.before_index)) - print (ids.status, ids.got_data) + #print ("querying before", self.ep.before_index, datetime.fromtimestamp(self.ep.before_index)) + #print (ids.status, ids.got_data)
# if data.ids is empty (remaining == 0) # then we should reset qdate to some point in @@ -91,18 +108,20 @@ # the qdate of the last item gives us the # starting index of the next query if ids.data.remaining: - self.ep.before_index = ids.data.ids[-1].qdate +# self.ep.before_index = ids.data.ids[-1].qdate + self.ep.before_index = min(ids.data.ids) + if self.ep.before_index <= self.ep.before_cutoff: - print ("hit before cutoff, finishing", - datetime.fromtimestamp(self.ep.before_index), - datetime.fromtimestamp(self.ep.before_cutoff)) + #print ("hit before cutoff, finishing", + # datetime.fromtimestamp(self.ep.before_index), + # datetime.fromtimestamp(self.ep.before_cutoff)) self.ep.before_queries_completed = True - #self.ep.before_index = now_minus_duration(self.ep.poll_delta) +# self.ep.before_index = now_minus_duration(self.ep.poll_delta)
else: - print "finished" + #print "query before reset" self.ep.before_queries_completed = True - #self.ep.before_index = now_minus_duration(self.ep.poll_delta) +# self.ep.before_index = now_minus_duration(self.ep.poll_delta)
else: print "query before failed" @@ -124,7 +143,7 @@ id.Name = id.name id.Owner = id.owner
- summaries = self.ep.query_ops.get_submission_summaries( + summaries = self.ep.app.aviary_ops.get_submission_summaries( self.ids, self.ep.url, include_job_summaries=False) @@ -141,7 +160,7 @@ for summary in summaries.data: summary.Name = str(summary.id.name) summary.Owner = str(summary.id.owner) - summary.Qdate = summary.id.qdate + summary.QDate = summary.id.qdate
summary.Idle = summary.idle summary.Running = summary.running @@ -151,8 +170,12 @@ summary.TransferringOutput = summary.transferring_output summary.Suspended = summary.suspended
- print summary + summary.endpoint_id = self.ep.id + #print summary.Name
+ a = AviarySubmissionUpdate(self.ep.app.model, summary) + self.ep.app.update_thread.enqueue(a) + # summary.statistics = {
# properties = Name, Owner, Qdate, jobserverRef, schedulerRef
Modified: branches/tmckay/mint/python/mint/main.py =================================================================== --- branches/tmckay/mint/python/mint/main.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/mint/python/mint/main.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -130,6 +130,7 @@ else: log.info("Imports failed for Aviary interface, disabling")
+ # This session type sets up discovery for data objects supported by Aviary if self.aviary_ops: self.aviary_session = MintAviarySession(self, self._aviary_classes)
Modified: branches/tmckay/mint/python/mint/update.py =================================================================== --- branches/tmckay/mint/python/mint/update.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/mint/python/mint/update.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -705,36 +705,37 @@ self.submission = submission self.cls = model.com_redhat_grid.Submission
- # the qmf type string for an attribute can be gotten - # from the cls.attr_name.type - # If you run that through the transforms, you can - # get the sql type - def do_process(self, cursor, stats): agent_id = self.get_agent_id() object_id = self.get_object_id() + + # For aviary stuff, the agent is inferred + # There really are no separate updates, so + # we just base the agent id on info in the submission try: agent = self.model.agents_by_id[agent_id] except KeyError: - raise UpdateDropped() + agent = MintAgent(self.model, agent_id) + stats.agents_created += 1
try: - obj = agent.get_object(cursor, cls, object_id) + obj = agent.get_object(cursor, self.cls, object_id) except RosemaryNotFound: - obj = self.create_object(cursor, stats, cls) + obj = self.create_object(cursor, stats, self.cls) return
self.update_object(cursor, stats, obj)
def get_agent_id(self): - return self.cls._package._name + ":" + self.cls._name \ - + ":" + selfself.submission._scheduler + return self.submission.endpoint_id.resource + ":" + \ + self.submission.endpoint_id.sub_type + ":" + \ + self.submission.endpoint_id.name
def get_object_id(self): - return self.submission.name + return self.submission.id.name
def create_object(self, cursor, stats, cls): - update_time = datetime.datetime.now() + update_time = datetime.now() create_time = update_time
obj = cls.create_object(cursor) @@ -773,8 +774,6 @@ sql = "; ".join(statements) self.execute_sql(cursor, sql, obj.__dict__)
- self.process_deferred_links(cursor, obj) - obj._save_time = datetime.now()
self.model.print_event(3, "Created %s", obj) @@ -784,21 +783,8 @@
return obj
- def process_deferred_links(self, cursor, obj): - agent = self.model.agents_by_id[obj._qmf_agent_id] - - if obj._qmf_object_id not in agent.deferred_links_by_id: - return - - links = agent.deferred_links_by_id[obj._qmf_object_id] - - for link in links: - link.realize(cursor, obj) - - del agent.deferred_links_by_id[obj._qmf_object_id] - def update_object(self, cursor, stats, obj): - update_time = datetime.datetime.now() + update_time = datetime.now() create_time = update_time
obj._qmf_update_time = update_time @@ -844,7 +830,12 @@ #stats.objects_updated_by_class[cls] += 1
def transform_value(self, prop, value): - t = qmf_type_code_by_string[attr.type] + # For some reason the timestamps in QMF values + # are in 10s of microseconds, so we need to + # handle our own conversion here + if prop.type == "absTime": + return datetime.fromtimestamp(value) + t = qmf_type_code_by_string[prop.type] return transformers[t](value)
def process_properties(self, obj, columns, cursor): @@ -859,22 +850,25 @@
class FakeOID(object): def __init__(self, agent, objname): - self.isV2 = True self.agentName = agent self.objectName = objname
- val = FakeOID("com.redhat.grid:scheduler:"+self.submission.id._scheduler, - self.submission.id._scheduler) + val = FakeOID("com.redhat.grid:scheduler:" + \ + self.submission.id._scheduler, + self.submission.id._scheduler) prop = FakeProp("schedulerRef") - col, nvalue = self.process_reference(obj, prop, val, cursor) - if nvalue != getattr(obj, col.name): - setattr(obj, col.name, nvalue) - columns.append(col) + try: + col, nvalue = self.process_reference(obj, prop, val, cursor) + if nvalue != getattr(obj, col.name): + setattr(obj, col.name, nvalue) + columns.append(col) + except MappingException, e: + log.debug(e)
for name, prop in cls._properties_by_name.iteritems(): value = getattr(self.submission, name) if value is not None: - value = transform_value(prop, value) + value = self.transform_value(prop, value) if value == getattr(obj, prop.sql_column.name): continue setattr(obj, prop.sql_column.name, value) @@ -889,44 +883,39 @@ if not ref.sql_column: raise MappingException("Reference %s has no column" % ref.name)
+ # Okay, for submissions coming from Aviary we may actually see + # a submission before we see its scheduler. So, let's go ahead + # and assume the agent and create a deferred link. + # If the agent never shows up, no harm done. We should probably + # add a timeout to the agent and delete it if it does not show up. value = None + try: + agent = self.model.agents_by_id[oid.agentName] + except KeyError: + agent = MintAgent(self.model, oid.agentName)
- if oid: - if oid.isV2: - agent_id = oid.agentName - else: - # Not much we can do but assume same agent - agent_id = self.get_agent_id() + object_id = oid.objectName + try: + that = agent.get_object(cursor, ref.that_cls, object_id) + except RosemaryNotFound: + link = DeferredLink(obj, ref) + agent.deferred_links_by_id[object_id].append(link)
- try: - agent = self.model.agents_by_id[agent_id] - except KeyError: - raise MappingException("Agent %s is unknown" % agent_id) + msg = "Deferring link to object %s %s" + raise MappingException(msg % (ref.that_cls, object_id))
- object_id = oid.objectName + value = that._id
- try: - that = agent.get_object(cursor, ref.that_cls, object_id) - except RosemaryNotFound: - link = DeferredLink(obj, ref) - agent.deferred_links_by_id[object_id].append(link) - - msg = "Deferring link to object %s %s" - raise MappingException(msg % (ref.that_cls, object_id)) - - value = that._id - return ref.sql_column, value
def process_statistics(self, obj, update_columns, insert_columns): build_columns = list() saw_change = False
- # get a list of statistic names from the cls # loop through the list of properties, look for # the value in the object - for name, stat in cls._statistics_by_name.iteritems(): + for name, stat in self.cls._statistics_by_name.iteritems(): value = getattr(self.submission, name) if value is not None: value = self.transform_value(stat, value) @@ -973,10 +962,9 @@ def __repr__(self): name = self.__class__.__name__ agent_id = self.get_agent_id() - cls = self.qmf_object.getClassKey().getClassName() obj_id = self.get_object_id()
- return "%s(%s,%s,%s)" % (name, agent_id, cls, obj_id) + return "%s(%s,%s,%s)" % (name, agent_id, self.cls, obj_id)
class UpdateDropped(Exception): pass @@ -1001,5 +989,5 @@ transformers[14] = str transformers[15] = transform_pickle
-def transform_value(attr, value): +def transform_value(attr, value): return transformers[attr.type](value)
Modified: branches/tmckay/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -771,7 +771,7 @@ self.client_pool.return_object(client) return res;
- def get_job_ad(self, job_server, job_id, scheduler_name, submission, + def get_job_ad(self, sched, job_id, submission, *args, **kwargs): ''' kwargs will be searched for 'default' and 'timeout' arguments. @@ -802,7 +802,7 @@ def my_process_results(result): data = None # Fix up the exception message if necessary - result = self._pretty_result(result, job_server.Machine) + result = self._pretty_result(result, sched.Machine) if isinstance(result, Exception): status = result else: @@ -819,14 +819,14 @@ client = self.client_pool.get_object() self._setup_client(client, self.servers, # server lookup object - job_server.Machine, # host we want + sched.Machine, # host we want "getJobDetails")
# Make a job id parameter (see job wsdl) jobId = client.factory.create('ns0:JobID') jobId.job = job_id - jobId.pool = job_server.Pool - jobId.scheduler = scheduler_name + jobId.pool = sched.Pool + jobId.scheduler = sched.Name jobId.submission.name = submission.Name jobId.submission.owner = submission.Owner
Modified: branches/tmckay/sage/python/sage/qmf/qmfoperations.py =================================================================== --- branches/tmckay/sage/python/sage/qmf/qmfoperations.py 2013-01-17 19:41:02 UTC (rev 5654) +++ branches/tmckay/sage/python/sage/qmf/qmfoperations.py 2013-01-17 21:08:02 UTC (rev 5655) @@ -164,21 +164,6 @@ assert callback return self._call(master, "Stop", callback, 0, 0, daemon)
-# submission operations - - def get_job_summaries(self, submission, callback, *args): - ''' - This method is asynchronous iff 'callback' is supplied. - - kwargs will be searched for 'callback', 'default' and 'timeout' arguments. - ''' - - callback = "callback" in kwargs and kwargs["callback"] or None - default = "default" in kwargs and kwargs["default"] or None - timeout = "timeout" in kwargs and kwargs["timeout"] or 5 - - return self._call(submission, "GetJobSummaries", callback, default, timeout) - # Secret private implementation stuff, don't look! def _call(self, obj, meth, cb, dflt, tout, *args):
cumin-developers@lists.fedorahosted.org