Author: tmckay Date: 2011-08-26 14:18:11 +0000 (Fri, 26 Aug 2011) New Revision: 4936
Modified: trunk/cumin/python/cumin/grid/job.py trunk/cumin/python/cumin/model.py trunk/cumin/python/cumin/util.py trunk/sage/python/sage/aviary/aviaryoperations.py trunk/sage/python/sage/qmf/qmfoperations.py Log: Add get_job_summaries in aviaryoperations, pass necessary objects to methods in aviaryoperations to fully qualify all id fields in arguments to aviary
Modified: trunk/cumin/python/cumin/grid/job.py =================================================================== --- trunk/cumin/python/cumin/grid/job.py 2011-08-17 18:17:46 UTC (rev 4935) +++ trunk/cumin/python/cumin/grid/job.py 2011-08-26 14:18:11 UTC (rev 4936) @@ -67,6 +67,17 @@ cls = self.app.model.com_redhat_grid.JobServer return cls.get_object(session.cursor, _id=submission._jobserverRef_id)
+ def get_trifecta(self, session, id): + # return submission, job_server, and scheduler too! + submission = self.get_submission(session, id) + + cls = self.app.model.com_redhat_grid.JobServer + js = cls.get_object(session.cursor, _id=submission._jobserverRef_id) + + cls = self.app.model.com_redhat_grid.Scheduler + sched = cls.get_object(session.cursor, _id=js._schedulerRef_id) + return (submission, js, sched) + def get_scheduler(self, session, id): job_server = self.get_job_server(session, id)
@@ -121,9 +132,13 @@ def get_qmf_results(self, values): session = values['session'] submission = self.submission.get(session) + try: + js = self.app.model.get_jobserver_from_submission(session, + submission).Machine + except: + js = "" + return self.app.model.get_submission_job_summaries(submission, js)
- return self.app.model.get_submission_job_summaries(submission) - def do_get_data(self, values): results = self.get_qmf_results(values) summaries = results.data @@ -285,10 +300,10 @@ self.form = JobObjectSelectorTaskForm(app, self.name, self, verb) self.method = None
- def do_invoke(self, invoc, job_id, scheduler, reason): + def do_invoke(self, invoc, job_id, scheduler, reason, submission): assert self.method
- results = self.method(scheduler, job_id, reason) + results = self.method(scheduler, job_id, reason, submission) if results.error: raise results.error
@@ -385,12 +400,12 @@ selection = self.selection.get(session) reason = self.get_reason(session, self.verb)
- scheduler = self.get_scheduler(session) + submission, scheduler = self.get_submission_sched(session)
- self.task.invoke(session, selection, scheduler, reason) + self.task.invoke(session, selection, scheduler, reason, submission) self.task.exit_with_redirect(session)
- def get_scheduler(self, session): + def get_submission_sched(self, session): submission_id = self.submission_id.get(session) cls = self.app.model.com_redhat_grid.Submission submission = cls.get_object_by_id(session.cursor, submission_id) @@ -399,7 +414,8 @@ job_server = cls.get_object(session.cursor, _id=submission._jobserverRef_id)
cls = self.app.model.com_redhat_grid.Scheduler - return cls.get_object(session.cursor, _id=job_server._schedulerRef_id) + sched = cls.get_object(session.cursor, _id=job_server._schedulerRef_id) + return (submission, sched)
def render_content(self, session, *args): content = super(JobObjectSelectorTaskForm, self).render_content(session, *args) @@ -428,9 +444,11 @@ if not ad_list and not error: ad_list = list() id = self.frame.id.get(session) - job_server = self.frame.get_job_server(session, id) + submission, job_server, sched = self.frame.get_trifecta(session, id) job_id = self.frame.job_id.get(session) - results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}}) + results = self.app.remote.get_job_ad(job_server, job_id, + sched.Name, submission, + default={'JobAd': {}}) error = results.error self.qmf_error.set(session, error) ads = results.data['JobAd'] @@ -514,9 +532,11 @@ if not ad_list and not error: ad_list = list() id = self.frame.id.get(session) - job_server = self.frame.get_job_server(session, id) + submission, job_server, sched = self.frame.get_trifecta(session, id) job_id = self.frame.job_id.get(session) - results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}}) + results = self.app.remote.get_job_ad(job_server, job_id, + sched.Name, submission, + default={'JobAd': {}}) error = results.error self.qmf_error.set(session, error) ads = results.data['JobAd'] @@ -844,10 +864,13 @@ id = self.frame.id.get(session) scheduler = self.frame.get_scheduler(session, id) job_id = self.frame.job_id.get(session) + submission = self.frame.get_submission(session, id)
task = self.frame.set_ad_task for field in just_ads: - task.invoke(session, scheduler, job_id, field, str(just_ads[field]), ads[field]["error"]) + task.invoke(session, scheduler, job_id, field, + str(just_ads[field]), ads[field]["error"], + submission) self.process_cancel(session)
class OutputFile(Widget): @@ -862,12 +885,16 @@
def render_content(self, session): id = self.frame.id.get(session) - job_server = self.frame.get_job_server(session, id) + submission, job_server, scheduler = self.frame.get_trifecta(session, id) job_id = self.frame.job_id.get(session) state, file, start, end = self.get_file_args(session) + if file: result = self.app.remote.fetch_job_data(job_server, job_id, state, - file, start, end, {'Data': ""}) + file, start, end, + scheduler.Name, + submission, + default={'Data': ""}) if result.error: return result.status return escape_entity(result.data['Data']) @@ -921,7 +948,8 @@ def add_path(path, filename): # If filename does not begin with an absolute # path, prepend the path value to filename - if not filename.startswith("/"): + if filename is not None and \ + not filename.startswith("/"): if not path.endswith("/"): path += "/" filename = path + filename @@ -1085,8 +1113,8 @@ scheduler = self.task.frame.get_scheduler(session, id) reason = self.get_reason(session, self.verb) job_id = self.job_id.get(session) - - self.task.invoke(session, scheduler, job_id, reason) + submission = self.task.frame.get_submission(session, id) + self.task.invoke(session, scheduler, job_id, reason, submission) self.task.exit_with_redirect(session)
class ReasonField(StringField): @@ -1103,15 +1131,16 @@ job_id = self.frame.job_id.get(osession) self.form.job_id.set(session, job_id)
- def do_invoke(self, invoc, scheduler, job_id, reason): + def do_invoke(self, invoc, scheduler, job_id, reason, submission): raise Exception("oops, not implemented!")
class JobHold(JobAction): def __init__(self, app, frame): super(JobHold, self).__init__(app, frame, "held")
- def do_invoke(self, invoc, scheduler, job_id, reason): - self.app.remote.hold_job(scheduler, job_id, reason, invoc.make_callback()) + def do_invoke(self, invoc, scheduler, job_id, reason, submission): + self.app.remote.hold_job(scheduler, job_id, reason, submission, + callback=invoc.make_callback())
def get_title(self, session): return "Hold Job" @@ -1120,8 +1149,9 @@ def __init__(self, app, frame): super(JobRelease, self).__init__(app, frame, "released")
- def do_invoke(self, invoc, scheduler, job_id, reason): - self.app.remote.release_job(scheduler, job_id, reason, invoc.make_callback()) + def do_invoke(self, invoc, scheduler, job_id, reason, submission): + self.app.remote.release_job(scheduler, job_id, reason, submission, + callback=invoc.make_callback())
def get_title(self, session): return "Release Job" @@ -1130,8 +1160,9 @@ def __init__(self, app, frame): super(JobRemove, self).__init__(app, frame, "removed")
- def do_invoke(self, invoc, scheduler, job_id, reason): - self.app.remote.remove_job(scheduler, job_id, reason, invoc.make_callback()) + def do_invoke(self, invoc, scheduler, job_id, reason, submission): + self.app.remote.remove_job(scheduler, job_id, reason, submission, + callback=invoc.make_callback())
def get_title(self, session): return "Remove Job" @@ -1161,12 +1192,14 @@ def get_description(self, session): return "Edit Ad"
- def do_invoke(self, invoc, scheduler, job_id, name, value, error): + def do_invoke(self, invoc, scheduler, job_id, name, value, error, submission): # Don't make the call, but we want to see the banner # so invoke the callback directly with the error as status if error: invoc.make_callback()(name + ": " + str(error), None) else: - self.app.remote.set_job_attribute(scheduler, job_id, name, value, - invoc.make_callback()) + self.app.remote.set_job_attribute(scheduler, + job_id, name, value, + invoc.make_callback(), + submission)
Modified: trunk/cumin/python/cumin/model.py =================================================================== --- trunk/cumin/python/cumin/model.py 2011-08-17 18:17:46 UTC (rev 4935) +++ trunk/cumin/python/cumin/model.py 2011-08-26 14:18:11 UTC (rev 4936) @@ -179,7 +179,7 @@ finally: self.lock.release()
- def get_submission_job_summaries(self, submission): + def get_submission_job_summaries(self, submission, machine_name=""): if not submission: store = SubmissionJobSummaryStore(self, None) store.exception = Exception("Missing Submission") @@ -192,7 +192,7 @@ store = self.job_summaries_by_submission[submission._id] store.extend_updates() except KeyError: - store = SubmissionJobSummaryStore(self, submission) + store = SubmissionJobSummaryStore(self, submission, machine_name) store.start_updates()
self.job_summaries_by_submission[submission._id] = store @@ -201,6 +201,15 @@ finally: self.lock.release()
+ def get_jobserver_from_submission(self, session, submission): + cls = self.com_redhat_grid.JobServer + job_server = cls.get_object(session.cursor, _id=submission._jobserverRef_id) + return job_server + + def get_scheduler_from_jobserver(self, session, job_server): + cls = self.app.model.com_redhat_grid.Scheduler + return cls.get_object(session.cursor, _id=job_server._schedulerRef_id) + class CuminProperty(object): def __init__(self, cls, name): self.model = cls.model @@ -757,20 +766,24 @@ super(NegotiatorLimitStore, self).delete()
class SubmissionJobSummaryStore(ObjectStore): - def __init__(self, model, submission): + def __init__(self, model, submission, machine_name=""): super(SubmissionJobSummaryStore, self).__init__(model)
self.submission = submission + self.machine_name = machine_name
def update(self, cursor): def completion(status, data): self.status = status try: - self.data = data["Jobs"] + if data is not None: + self.data = data["Jobs"] except KeyError: pass
- self.model.app.remote.get_job_summaries(self.submission, completion) + self.model.app.remote.get_job_summaries(self.submission, + completion, + self.machine_name)
def delete(self): del self.model.job_summaries_by_submission[self.submission._id]
Modified: trunk/cumin/python/cumin/util.py =================================================================== --- trunk/cumin/python/cumin/util.py 2011-08-17 18:17:46 UTC (rev 4935) +++ trunk/cumin/python/cumin/util.py 2011-08-26 14:18:11 UTC (rev 4936) @@ -1,6 +1,7 @@ import sys import os import logging +import string
from crypt import crypt from datetime import datetime, timedelta @@ -160,14 +161,20 @@ return dvalue
class JobStatusInfo(object): - stat_strings = ["Unexpanded", "Idle", "Running", "Removed", "Completed", "Held", "Submission Error"] + stat_strings = ["Unexpanded", "Idle", "Running", "Removed", "Completed", "Held", "Submission error"] stat_colors = ["red", "clear", "green", "black", "blue", "yellow", "red"] @classmethod def get_status_string(cls, stat): - try: - return cls.stat_strings[stat] - except: - return "" + # Allow strings to be passed in for flexibility. + # Enforcing cumin case should make strings + # match stat_strings for expected values + if type(stat) in (str, unicode): + return string.capitalize(stat) + else: + try: + return cls.stat_strings[stat] + except: + return "Unrecognized"
@classmethod def get_status_color(cls, stat):
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-17 18:17:46 UTC (rev 4935) +++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-26 14:18:11 UTC (rev 4936) @@ -5,12 +5,13 @@ import urllib2 import socket import string +import time
from suds import * from suds.client import Client from sage.util import CallSync, CallThread, ObjectPool, host_list +from datetime import datetime
- log = logging.getLogger("sage.aviary")
#f = open("./suds.client.log", 'a+') @@ -41,19 +42,17 @@
- Add logging to Aviary
-- See if we can fill in the scheduler/pool information. -What is cumin passing? (the scheduler object has this stuff, -but what about hierarchical collectors?) +- Test pool/schedd/stuff with hierarchical collectors
- get the submission objects passed down if possible -for job control functions. This would be extra stuff for QMF, but helpful with -Aviary. - - Add a summary comment at the top like QMF operations
- can we use default/timeout with suds? -yes, there is at least a timeout that can be set once, -it might be possible to change the timeout per call. +looks like the way to do this is through the Transport object that +is set in the client. The timeout is set on the Transport itself, +and the Transport can be changed on the client with set_options. +Alternatively, we could retain a reference to the Transport in +the clients that we pool so that we can call set_options on the +transport. '''
class AviaryOperations(object): @@ -62,7 +61,7 @@ self.datadir = datadir
# job_servers and query_servers are comma separated lists of - # network locations. See comments on host_port_list for format. + # network locations. See comments on host_list() for format. # Replace any occurrence of localhost with output of gethostname() # before parsing to match Machine fields of QMF objects later on. host = socket.gethostname() @@ -90,7 +89,7 @@
# job server operations
- def set_job_attribute(self, scheduler, job_id, name, value, callback): + def set_job_attribute(self, scheduler, job_id, name, value, callback, submission): assert callback
job_client = self.job_client_pool.get_object() @@ -116,6 +115,8 @@ jobId.job = job_id jobId.pool = scheduler.Pool jobId.scheduler = scheduler.Name + jobId.submission.name = submission.Name + jobId.submission.owner = submission.Owner
# Make attribute parameter from name and value aviary_attr = job_client.factory.create('ns0:Attribute') @@ -134,7 +135,7 @@ def my_callback(result): # Turn this back off before we put it back in the pool # so allow_overrides isn't set for someone else... - job_client.set_allow_overrides(False) + job_client.set_enable_attributes(False) self.job_client_pool.return_object(job_client) result = self._pretty_result(result, scheduler.Machine) if isinstance(result, Exception): @@ -144,7 +145,11 @@ # we'll pass it anyway even though Cumin does not care # at the present time status = AviaryOperations._get_status(result.status) - callback(status, result.id) + if status == "OK" and hasattr(result, "id"): + id = result.id + else: + id = None + callback(status, id)
host = self._get_host(scheduler.Machine, self.job_servers) if host == "": @@ -170,7 +175,8 @@ # "extras" fields and set allowOverrides to True. # (otherwise, Requirements will be limited to particular # resource constraint types defined by aviary) - job_client.set_allow_overrides(True) + job_client.set_enable_attributes(True) + job_client.set_attributes({"allowOverrides": True}) extras = list() for k, v in ad.iteritems(): # We don't need to send descriptors down to aviary @@ -198,37 +204,76 @@ t = CallThread(job_client.service.submitJob, my_callback, *args) t.start()
- def hold_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def hold_job(self, scheduler, job_id, reason, submission, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' - return self._control_job(scheduler, job_id, reason, + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + + return self._control_job(scheduler, job_id, reason, submission, "holdJob", callback, default, timeout)
- def release_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def release_job(self, scheduler, job_id, reason, submission, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' - return self._control_job(scheduler, job_id, reason, + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + + return self._control_job(scheduler, job_id, reason, submission, "releaseJob", callback, default, timeout)
- def remove_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def remove_job(self, scheduler, job_id, reason, submission, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' - return self._control_job(scheduler, job_id, reason, + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + + return self._control_job(scheduler, job_id, reason, submission, "removeJob", callback, default, timeout)
+######################### # query server ops +########################
def fetch_job_data(self, job_server, job_id, ftype, file, start, end, - default=None, timeout=5): + scheduler_name, submission, *args, **kwargs): + ''' + kwargs will be searched for 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. + ''' + + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + # Aviary doesn't use the file name as does QMF, instead it # specifies the file type and lets condor figure out the path.
@@ -242,9 +287,12 @@ data = None else: status = AviaryOperations._get_status(result.status) - # Match the format expected by Cumin. This is - # the format used by the QMF call... - data = {'Data': result.content} + if status == "OK" and hasattr(result, "content"): + # Match the format expected by Cumin. This is + # the format used by the QMF call... + data = {'Data': result.content} + else: + data = None return (status, data)
host = self._get_host(job_server.Machine, self.query_servers) @@ -261,6 +309,9 @@ jobData = client.factory.create('ns0:JobData') jobData.id.job = job_id jobData.id.pool = job_server.Pool + jobData.id.scheduler = scheduler_name + jobData.id.submission.name = submission.Name + jobData.id.submission.owner = submission.Owner
# Translate cumin file type to Aviary file type if ftype == "e": @@ -282,11 +333,22 @@ self.query_client_pool.return_object(client) return res;
- def get_job_ad(self, job_server, job_id, default=None, timeout=5): - + def get_job_ad(self, job_server, job_id, scheduler_name, submission, + *args, **kwargs): + ''' + kwargs will be searched for 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. + ''' + + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + client = self.query_client_pool.get_object()
def make_tuple(attr): + # Attempt to cast the value into the specified type if attr.type in self.aviary_to_type: try: v = self.aviary_to_type[attr.type](attr.value) @@ -307,12 +369,15 @@ data = None else: status = AviaryOperations._get_status(result[0].status) - # Match the format expected by Cumin. This is - # the format used by the QMF call. We have a list - # of attributes in attrs that we need to make into - # a dictionary - ads = make_dict(result[0].details.attrs) - data = {'JobAd': ads} + if status == "OK": + # Match the format expected by Cumin. This is + # the format used by the QMF call. We have a list + # of attributes in attrs that we need to make into + # a dictionary + ads = make_dict(result[0].details.attrs) + data = {'JobAd': ads} + else: + data = None return (status, data)
host = self._get_host(job_server.Machine, self.query_servers) @@ -329,15 +394,99 @@ jobId = client.factory.create('ns0:JobID') jobId.job = job_id jobId.pool = job_server.Pool - # still don't have the scheduler filled in here - # or the submission. Ditto with other jobId values in this module + jobId.scheduler = scheduler_name + jobId.submission.name = submission.Name + jobId.submission.owner = submission.Owner
res = self._call_sync(my_process_results, client.service.getJobDetails, jobId) self.query_client_pool.return_object(client) return res;
+ def get_job_summaries(self, submission, callback, machine_name): + assert callback + + query_client = self.query_client_pool.get_object() + + def to_int_seconds(dt): + # Change a datetime.datetime into int seconds since epoch + # Note, this works nicely if the datetime happens to include microseconds + # since the call to timetuple will drop them. Stuff coming back from + # condor should not have microseconds anyway. + return int(time.mktime(dt.timetuple())) + + def get_string(job, attr): + # Cast suds text types into str so we have standard Py types + # Handles optional strings as well + if hasattr(job, attr): + return str(getattr(job, attr)) + return "" + + def adapt(jobs): + # Make an aviary job summary look like the canonical form + # that cumin is expecting (actually the QMF form because of history). + result = list() + for job in jobs: + cluster, proc = job.id.job.split(".") + j = dict() + j["ClusterId"] = int(cluster) + j["Cmd"] = str(job.cmd) + j["EnteredCurrentStatus"] = to_int_seconds(job.last_update) + j["GlobalJobId"] = job.id.scheduler + \ + "#" + job.id.job + \ + "#" + str(to_int_seconds(job.queued)) + j["JobStatus"] = str(job.job_status) + j["ProcId"] = int(proc) + j["QDate"] = to_int_seconds(job.queued) + + # These may be null... 
+ j["Args"] = get_string(job, "args1") + j["ReleaseReason"] = get_string(job, "released") + j["HoldReason"] = get_string(job, "held") + result.append(j) + return result + + def my_callback(result): + query_client.set_enable_attributes(False) + self.query_client_pool.return_object(query_client) + result = self._pretty_result(result, machine_name) + if isinstance(result, Exception): + callback(result, None) + else: + status = AviaryOperations._get_status(result[0].status) + if status == "OK" and hasattr(result[0], "jobs"): + data = {"Jobs": adapt(result[0].jobs)} + else: + data = {"Jobs": None} + callback(status, data) + + host = self._get_host(machine_name, self.query_servers) + if host == "": + self._raise_no_host(machine_name) + + service = host + "getSubmissionSummary" + + # Have to set the URL for the method. This might go away someday... + query_client.set_options(location=service) + + # What we really want here is the job summaries from the + # submission summary response. To get those, we have to + # set an extra attribute on the client... + query_client.set_enable_attributes(True) + query_client.set_attributes({"includeJobSummaries": "true"}) + + # Make a submission id. (see query wsdl) + subId = query_client.factory.create('ns0:SubmissionID') + subId.name = submission.Name + subId.owner = submission.Owner + + t = CallThread(query_client.service.getSubmissionSummary, + my_callback, subId) + t.start() + +######################################################## # Secret private implementation stuff, don't look! +########################################################
@classmethod def _type_to_aviary(cls): @@ -407,7 +556,8 @@ sync.get_completion()(*cb_args) return sync
- def _control_job(self, scheduler, job_id, reason, meth_name, + def _control_job(self, scheduler, job_id, reason, submission, + meth_name, callback, default, timeout):
host = self._get_host(scheduler.Machine, self.job_servers) @@ -428,6 +578,8 @@ jobId.job = job_id jobId.pool = scheduler.Pool jobId.scheduler = scheduler.Name + jobId.submission.name = submission.Name + jobId.submission.owner = submission.Owner
if callback: def my_callback(result): @@ -461,16 +613,19 @@ to the suds message after marshalling. ''' def __init__(self): - self.allow = False + self.set_attributes = False + self.attributes = dict()
def marshalled(self, context): - if self.allow: + if self.set_attributes: sj_body = context.envelope.getChild('Body')[0] - sj_body.attributes.append(Attribute("allowOverrides", "true")) + for k,v in self.attributes.iteritems(): + sj_body.attributes.append(Attribute(k, v)) + except: class OverridesPlugin(object): def __init__(self): - self.allow = False + self.set_attributes = False
class OverrideClient(Client): ''' @@ -484,9 +639,28 @@ except: super(OverrideClient, self).__init__(*args)
- def set_allow_overrides(self, truth): - self.override.allow = truth + def set_enable_attributes(self, truth): + ''' + Set the flag that controls whether extra attributes are marshalled.
+ The attribute set is specified through the set_attributes() method. + If 'truth' is True, then service calls made through this client + will have the extra attributes appended to the attribute set of service + request. + ''' + self.override.set_attributes = truth + + def set_attributes(self, attrs): + ''' + Specify extra attributes to append to the attribute set + of service requests made through this client. + + The extra attributes will be added if set_enable_attributes() + has been called with a value of True sometime prior to the + time of a request. + ''' + self.override.attributes = attrs + class JobClientPool(ObjectPool): def __init__(self, job_wsdl, max_size): super(JobClientPool, self).__init__(max_size) @@ -501,4 +675,4 @@ self.query_wsdl = query_wsdl
def create_object(self): - return Client(self.query_wsdl) + return OverrideClient(self.query_wsdl)
Modified: trunk/sage/python/sage/qmf/qmfoperations.py =================================================================== --- trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-17 18:17:46 UTC (rev 4935) +++ trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-26 14:18:11 UTC (rev 4936) @@ -81,7 +81,7 @@
# scheduler operations
- def set_job_attribute(self, scheduler, job_id, name, value, callback): + def set_job_attribute(self, scheduler, job_id, name, value, callback, *args): assert callback self._call(scheduler, "SetJobAttribute", callback, 0, 0, job_id, name, value) @@ -90,37 +90,84 @@ assert callback self._call(scheduler, "SubmitJob", callback, 0, 0, ad)
- def hold_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def hold_job(self, scheduler, job_id, reason, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + return self._call(scheduler, "HoldJob", callback, default, timeout, job_id, reason)
- def release_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def release_job(self, scheduler, job_id, reason, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + return self._call(scheduler, "ReleaseJob", callback, default, timeout, job_id, reason)
- def remove_job(self, scheduler, job_id, reason, - callback=None, default=None, timeout=5): + def remove_job(self, scheduler, job_id, reason, *args, **kwargs): ''' This method is asynchronous iff 'callback' is supplied. + + kwargs will be searched for 'callback', 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. ''' + + callback = "callback" in kwargs and kwargs["callback"] or None + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + return self._call(scheduler, "RemoveJob", callback, default, timeout, job_id, reason)
# jobserver operations
- def get_job_ad(self, job_server, job_id, default=None, timeout=5): + def get_job_ad(self, job_server, job_id, *args, **kwargs): + ''' + kwargs will be searched for 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. + ''' + + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + return self._call(job_server, "GetJobAd", 0, default, timeout, job_id)
def fetch_job_data(self, job_server, job_id, ftype, file, start, end, - default=None, timeout=5): + *args, **kwargs): + ''' + kwargs will be searched for 'default' and 'timeout' arguments. + + These optional arguments were moved to kwargs for compatibility + with another implementation of the same routine. + ''' + + default = "default" in kwargs and kwargs["default"] or None + timeout = "timeout" in kwargs and kwargs["timeout"] or 5 + # QMF doesn't use the ftype value, it just uses the filename return self._call(job_server, "FetchJobData", 0, default, timeout, job_id, file, start, end) @@ -158,7 +205,7 @@
# submission operations
- def get_job_summaries(self, submission, callback): + def get_job_summaries(self, submission, callback, *args): assert callback return self._call(submission, "GetJobSummaries", callback, 0, 0)
cumin-developers@lists.fedorahosted.org