Author: tmckay
Date: 2011-08-26 14:18:11 +0000 (Fri, 26 Aug 2011)
New Revision: 4936
Modified:
trunk/cumin/python/cumin/grid/job.py
trunk/cumin/python/cumin/model.py
trunk/cumin/python/cumin/util.py
trunk/sage/python/sage/aviary/aviaryoperations.py
trunk/sage/python/sage/qmf/qmfoperations.py
Log:
Add get_job_summaries to aviaryoperations, and pass the necessary objects
(such as the submission and the scheduler name) to the methods in
aviaryoperations so that all id fields in arguments to Aviary are fully
qualified.
Modified: trunk/cumin/python/cumin/grid/job.py
===================================================================
--- trunk/cumin/python/cumin/grid/job.py 2011-08-17 18:17:46 UTC (rev 4935)
+++ trunk/cumin/python/cumin/grid/job.py 2011-08-26 14:18:11 UTC (rev 4936)
@@ -67,6 +67,17 @@
cls = self.app.model.com_redhat_grid.JobServer
return cls.get_object(session.cursor, _id=submission._jobserverRef_id)
+ def get_trifecta(self, session, id):
+ # return submission, job_server, and scheduler too!
+ submission = self.get_submission(session, id)
+
+ cls = self.app.model.com_redhat_grid.JobServer
+ js = cls.get_object(session.cursor, _id=submission._jobserverRef_id)
+
+ cls = self.app.model.com_redhat_grid.Scheduler
+ sched = cls.get_object(session.cursor, _id=js._schedulerRef_id)
+ return (submission, js, sched)
+
def get_scheduler(self, session, id):
job_server = self.get_job_server(session, id)
@@ -121,9 +132,13 @@
def get_qmf_results(self, values):
session = values['session']
submission = self.submission.get(session)
+ try:
+ js = self.app.model.get_jobserver_from_submission(session,
+ submission).Machine
+ except:
+ js = ""
+ return self.app.model.get_submission_job_summaries(submission, js)
- return self.app.model.get_submission_job_summaries(submission)
-
def do_get_data(self, values):
results = self.get_qmf_results(values)
summaries = results.data
@@ -285,10 +300,10 @@
self.form = JobObjectSelectorTaskForm(app, self.name, self, verb)
self.method = None
- def do_invoke(self, invoc, job_id, scheduler, reason):
+ def do_invoke(self, invoc, job_id, scheduler, reason, submission):
assert self.method
- results = self.method(scheduler, job_id, reason)
+ results = self.method(scheduler, job_id, reason, submission)
if results.error:
raise results.error
@@ -385,12 +400,12 @@
selection = self.selection.get(session)
reason = self.get_reason(session, self.verb)
- scheduler = self.get_scheduler(session)
+ submission, scheduler = self.get_submission_sched(session)
- self.task.invoke(session, selection, scheduler, reason)
+ self.task.invoke(session, selection, scheduler, reason, submission)
self.task.exit_with_redirect(session)
- def get_scheduler(self, session):
+ def get_submission_sched(self, session):
submission_id = self.submission_id.get(session)
cls = self.app.model.com_redhat_grid.Submission
submission = cls.get_object_by_id(session.cursor, submission_id)
@@ -399,7 +414,8 @@
job_server = cls.get_object(session.cursor, _id=submission._jobserverRef_id)
cls = self.app.model.com_redhat_grid.Scheduler
- return cls.get_object(session.cursor, _id=job_server._schedulerRef_id)
+ sched = cls.get_object(session.cursor, _id=job_server._schedulerRef_id)
+ return (submission, sched)
def render_content(self, session, *args):
content = super(JobObjectSelectorTaskForm, self).render_content(session, *args)
@@ -428,9 +444,11 @@
if not ad_list and not error:
ad_list = list()
id = self.frame.id.get(session)
- job_server = self.frame.get_job_server(session, id)
+ submission, job_server, sched = self.frame.get_trifecta(session, id)
job_id = self.frame.job_id.get(session)
- results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}})
+ results = self.app.remote.get_job_ad(job_server, job_id,
+ sched.Name, submission,
+ default={'JobAd': {}})
error = results.error
self.qmf_error.set(session, error)
ads = results.data['JobAd']
@@ -514,9 +532,11 @@
if not ad_list and not error:
ad_list = list()
id = self.frame.id.get(session)
- job_server = self.frame.get_job_server(session, id)
+ submission, job_server, sched = self.frame.get_trifecta(session, id)
job_id = self.frame.job_id.get(session)
- results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}})
+ results = self.app.remote.get_job_ad(job_server, job_id,
+ sched.Name, submission,
+ default={'JobAd': {}})
error = results.error
self.qmf_error.set(session, error)
ads = results.data['JobAd']
@@ -844,10 +864,13 @@
id = self.frame.id.get(session)
scheduler = self.frame.get_scheduler(session, id)
job_id = self.frame.job_id.get(session)
+ submission = self.frame.get_submission(session, id)
task = self.frame.set_ad_task
for field in just_ads:
- task.invoke(session, scheduler, job_id, field, str(just_ads[field]), ads[field]["error"])
+ task.invoke(session, scheduler, job_id, field,
+ str(just_ads[field]), ads[field]["error"],
+ submission)
self.process_cancel(session)
class OutputFile(Widget):
@@ -862,12 +885,16 @@
def render_content(self, session):
id = self.frame.id.get(session)
- job_server = self.frame.get_job_server(session, id)
+ submission, job_server, scheduler = self.frame.get_trifecta(session, id)
job_id = self.frame.job_id.get(session)
state, file, start, end = self.get_file_args(session)
+
if file:
result = self.app.remote.fetch_job_data(job_server, job_id, state,
- file, start, end, {'Data': ""})
+ file, start, end,
+ scheduler.Name,
+ submission,
+ default={'Data': ""})
if result.error:
return result.status
return escape_entity(result.data['Data'])
@@ -921,7 +948,8 @@
def add_path(path, filename):
# If filename does not begin with an absolute
# path, prepend the path value to filename
- if not filename.startswith("/"):
+ if filename is not None and \
+ not filename.startswith("/"):
if not path.endswith("/"):
path += "/"
filename = path + filename
@@ -1085,8 +1113,8 @@
scheduler = self.task.frame.get_scheduler(session, id)
reason = self.get_reason(session, self.verb)
job_id = self.job_id.get(session)
-
- self.task.invoke(session, scheduler, job_id, reason)
+ submission = self.task.frame.get_submission(session, id)
+ self.task.invoke(session, scheduler, job_id, reason, submission)
self.task.exit_with_redirect(session)
class ReasonField(StringField):
@@ -1103,15 +1131,16 @@
job_id = self.frame.job_id.get(osession)
self.form.job_id.set(session, job_id)
- def do_invoke(self, invoc, scheduler, job_id, reason):
+ def do_invoke(self, invoc, scheduler, job_id, reason, submission):
raise Exception("oops, not implemented!")
class JobHold(JobAction):
def __init__(self, app, frame):
super(JobHold, self).__init__(app, frame, "held")
- def do_invoke(self, invoc, scheduler, job_id, reason):
- self.app.remote.hold_job(scheduler, job_id, reason, invoc.make_callback())
+ def do_invoke(self, invoc, scheduler, job_id, reason, submission):
+ self.app.remote.hold_job(scheduler, job_id, reason, submission,
+ callback=invoc.make_callback())
def get_title(self, session):
return "Hold Job"
@@ -1120,8 +1149,9 @@
def __init__(self, app, frame):
super(JobRelease, self).__init__(app, frame, "released")
- def do_invoke(self, invoc, scheduler, job_id, reason):
- self.app.remote.release_job(scheduler, job_id, reason, invoc.make_callback())
+ def do_invoke(self, invoc, scheduler, job_id, reason, submission):
+ self.app.remote.release_job(scheduler, job_id, reason, submission,
+ callback=invoc.make_callback())
def get_title(self, session):
return "Release Job"
@@ -1130,8 +1160,9 @@
def __init__(self, app, frame):
super(JobRemove, self).__init__(app, frame, "removed")
- def do_invoke(self, invoc, scheduler, job_id, reason):
- self.app.remote.remove_job(scheduler, job_id, reason, invoc.make_callback())
+ def do_invoke(self, invoc, scheduler, job_id, reason, submission):
+ self.app.remote.remove_job(scheduler, job_id, reason, submission,
+ callback=invoc.make_callback())
def get_title(self, session):
return "Remove Job"
@@ -1161,12 +1192,14 @@
def get_description(self, session):
return "Edit Ad"
- def do_invoke(self, invoc, scheduler, job_id, name, value, error):
+ def do_invoke(self, invoc, scheduler, job_id, name, value, error, submission):
# Don't make the call, but we want to see the banner
# so invoke the callback directly with the error as status
if error:
invoc.make_callback()(name + ": " + str(error), None)
else:
- self.app.remote.set_job_attribute(scheduler, job_id, name, value,
- invoc.make_callback())
+ self.app.remote.set_job_attribute(scheduler,
+ job_id, name, value,
+ invoc.make_callback(),
+ submission)
Modified: trunk/cumin/python/cumin/model.py
===================================================================
--- trunk/cumin/python/cumin/model.py 2011-08-17 18:17:46 UTC (rev 4935)
+++ trunk/cumin/python/cumin/model.py 2011-08-26 14:18:11 UTC (rev 4936)
@@ -179,7 +179,7 @@
finally:
self.lock.release()
- def get_submission_job_summaries(self, submission):
+ def get_submission_job_summaries(self, submission, machine_name=""):
if not submission:
store = SubmissionJobSummaryStore(self, None)
store.exception = Exception("Missing Submission")
@@ -192,7 +192,7 @@
store = self.job_summaries_by_submission[submission._id]
store.extend_updates()
except KeyError:
- store = SubmissionJobSummaryStore(self, submission)
+ store = SubmissionJobSummaryStore(self, submission, machine_name)
store.start_updates()
self.job_summaries_by_submission[submission._id] = store
@@ -201,6 +201,15 @@
finally:
self.lock.release()
+ def get_jobserver_from_submission(self, session, submission):
+ cls = self.com_redhat_grid.JobServer
+ job_server = cls.get_object(session.cursor, _id=submission._jobserverRef_id)
+ return job_server
+
+ def get_scheduler_from_jobserver(self, session, job_server):
+ cls = self.app.model.com_redhat_grid.Scheduler
+ return cls.get_object(session.cursor, _id=job_server._schedulerRef_id)
+
class CuminProperty(object):
def __init__(self, cls, name):
self.model = cls.model
@@ -757,20 +766,24 @@
super(NegotiatorLimitStore, self).delete()
class SubmissionJobSummaryStore(ObjectStore):
- def __init__(self, model, submission):
+ def __init__(self, model, submission, machine_name=""):
super(SubmissionJobSummaryStore, self).__init__(model)
self.submission = submission
+ self.machine_name = machine_name
def update(self, cursor):
def completion(status, data):
self.status = status
try:
- self.data = data["Jobs"]
+ if data is not None:
+ self.data = data["Jobs"]
except KeyError:
pass
- self.model.app.remote.get_job_summaries(self.submission, completion)
+ self.model.app.remote.get_job_summaries(self.submission,
+ completion,
+ self.machine_name)
def delete(self):
del self.model.job_summaries_by_submission[self.submission._id]
Modified: trunk/cumin/python/cumin/util.py
===================================================================
--- trunk/cumin/python/cumin/util.py 2011-08-17 18:17:46 UTC (rev 4935)
+++ trunk/cumin/python/cumin/util.py 2011-08-26 14:18:11 UTC (rev 4936)
@@ -1,6 +1,7 @@
import sys
import os
import logging
+import string
from crypt import crypt
from datetime import datetime, timedelta
@@ -160,14 +161,20 @@
return dvalue
class JobStatusInfo(object):
- stat_strings = ["Unexpanded", "Idle", "Running", "Removed", "Completed", "Held", "Submission Error"]
+ stat_strings = ["Unexpanded", "Idle", "Running", "Removed", "Completed", "Held", "Submission error"]
stat_colors = ["red", "clear", "green", "black", "blue", "yellow", "red"]
@classmethod
def get_status_string(cls, stat):
- try:
- return cls.stat_strings[stat]
- except:
- return ""
+ # Allow strings to be passed in for flexibility.
+ # Enforcing cumin case should make strings
+ # match stat_strings for expected values
+ if type(stat) in (str, unicode):
+ return string.capitalize(stat)
+ else:
+ try:
+ return cls.stat_strings[stat]
+ except:
+ return "Unrecognized"
@classmethod
def get_status_color(cls, stat):
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-17 18:17:46 UTC (rev 4935)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-26 14:18:11 UTC (rev 4936)
@@ -5,12 +5,13 @@
import urllib2
import socket
import string
+import time
from suds import *
from suds.client import Client
from sage.util import CallSync, CallThread, ObjectPool, host_list
+from datetime import datetime
-
log = logging.getLogger("sage.aviary")
#f = open("./suds.client.log", 'a+')
@@ -41,19 +42,17 @@
- Add logging to Aviary
-- See if we can fill in the scheduler/pool information.
-What is cumin passing? (the scheduler object has this stuff,
-but what about hierarchical collectors?)
+- Test pool/schedd/stuff with hierarchical collectors
-- get the submission objects passed down if possible
-for job control functions. This would be extra stuff for QMF, but helpful with
-Aviary.
-
- Add a summary comment at the top like QMF opreations
- can we use default/timeout with suds?
-yes, there is at least a timeout that can be set once,
-it might be possible to change the timeout per call.
+looks like the way to do this is through the Transport object that
+is set in the client. The timeout is set on the Transport itself,
+and the Transport can be changed on the client with set_options.
+Alternatively, we could retain a reference to the Transport in
+the clients that we pool so that we can call set_options on the
+transport.
'''
class AviaryOperations(object):
@@ -62,7 +61,7 @@
self.datadir = datadir
# job_servers and query_servers are comma separated lists of
- # network locations. See comments on host_port_list for format.
+ # network locations. See comments on host_list() for format.
# Replace any occurrence of locahost with output of gethostname()
# before parsing to match Machine fields of QMF objects later on.
host = socket.gethostname()
@@ -90,7 +89,7 @@
# job server operations
- def set_job_attribute(self, scheduler, job_id, name, value, callback):
+ def set_job_attribute(self, scheduler, job_id, name, value, callback, submission):
assert callback
job_client = self.job_client_pool.get_object()
@@ -116,6 +115,8 @@
jobId.job = job_id
jobId.pool = scheduler.Pool
jobId.scheduler = scheduler.Name
+ jobId.submission.name = submission.Name
+ jobId.submission.owner = submission.Owner
# Make attribute parameter from name and value
aviary_attr = job_client.factory.create('ns0:Attribute')
@@ -134,7 +135,7 @@
def my_callback(result):
# Turn this back off before we put it back in the pool
# so allow_overrides isn't set for someone else...
- job_client.set_allow_overrides(False)
+ job_client.set_enable_attributes(False)
self.job_client_pool.return_object(job_client)
result = self._pretty_result(result, scheduler.Machine)
if isinstance(result, Exception):
@@ -144,7 +145,11 @@
# we'll pass it anyway even though Cumin does not care
# at the present time
status = AviaryOperations._get_status(result.status)
- callback(status, result.id)
+ if status == "OK" and hasattr(result, "id"):
+ id = result.id
+ else:
+ id = None
+ callback(status, id)
host = self._get_host(scheduler.Machine, self.job_servers)
if host == "":
@@ -170,7 +175,8 @@
# "extras" fields and set allowOverrides to True.
# (otherwise, Requirements will be limited to particular
# resource constraint types defined by aviary)
- job_client.set_allow_overrides(True)
+ job_client.set_enable_attributes(True)
+ job_client.set_attributes({"allowOverrides": True})
extras = list()
for k, v in ad.iteritems():
# We don't need to send descriptors down to aviary
@@ -198,37 +204,76 @@
t = CallThread(job_client.service.submitJob, my_callback, *args)
t.start()
- def hold_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def hold_job(self, scheduler, job_id, reason, submission, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
- return self._control_job(scheduler, job_id, reason,
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
+ return self._control_job(scheduler, job_id, reason, submission,
"holdJob",
callback, default, timeout)
- def release_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def release_job(self, scheduler, job_id, reason, submission, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
- return self._control_job(scheduler, job_id, reason,
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
+ return self._control_job(scheduler, job_id, reason, submission,
"releaseJob",
callback, default, timeout)
- def remove_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def remove_job(self, scheduler, job_id, reason, submission, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
- return self._control_job(scheduler, job_id, reason,
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
+ return self._control_job(scheduler, job_id, reason, submission,
"removeJob",
callback, default, timeout)
+#########################
# query server ops
+########################
def fetch_job_data(self, job_server, job_id, ftype, file, start, end,
- default=None, timeout=5):
+ scheduler_name, submission, *args, **kwargs):
+ '''
+ kwargs will be searched for 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
+ '''
+
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
# Aviary doesn't use the file name as does QMF, instead it
# specifies the file type and lets condor figure out the path.
@@ -242,9 +287,12 @@
data = None
else:
status = AviaryOperations._get_status(result.status)
- # Match the format expected by Cumin. This is
- # the format used by the QMF call...
- data = {'Data': result.content}
+ if status == "OK" and hasattr(result, "content"):
+ # Match the format expected by Cumin. This is
+ # the format used by the QMF call...
+ data = {'Data': result.content}
+ else:
+ data = None
return (status, data)
host = self._get_host(job_server.Machine, self.query_servers)
@@ -261,6 +309,9 @@
jobData = client.factory.create('ns0:JobData')
jobData.id.job = job_id
jobData.id.pool = job_server.Pool
+ jobData.id.scheduler = scheduler_name
+ jobData.id.submission.name = submission.Name
+ jobData.id.submission.owner = submission.Owner
# Translate cumin file type to Aviary file type
if ftype == "e":
@@ -282,11 +333,22 @@
self.query_client_pool.return_object(client)
return res;
- def get_job_ad(self, job_server, job_id, default=None, timeout=5):
-
+ def get_job_ad(self, job_server, job_id, scheduler_name, submission,
+ *args, **kwargs):
+ '''
+ kwargs will be searched for 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
+ '''
+
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
client = self.query_client_pool.get_object()
def make_tuple(attr):
+ # Attempt to cast the value into the specified type
if attr.type in self.aviary_to_type:
try:
v = self.aviary_to_type[attr.type](attr.value)
@@ -307,12 +369,15 @@
data = None
else:
status = AviaryOperations._get_status(result[0].status)
- # Match the format expected by Cumin. This is
- # the format used by the QMF call. We have a list
- # of attributes in attrs that we need to make into
- # a dictionary
- ads = make_dict(result[0].details.attrs)
- data = {'JobAd': ads}
+ if status == "OK":
+ # Match the format expected by Cumin. This is
+ # the format used by the QMF call. We have a list
+ # of attributes in attrs that we need to make into
+ # a dictionary
+ ads = make_dict(result[0].details.attrs)
+ data = {'JobAd': ads}
+ else:
+ data = None
return (status, data)
host = self._get_host(job_server.Machine, self.query_servers)
@@ -329,15 +394,99 @@
jobId = client.factory.create('ns0:JobID')
jobId.job = job_id
jobId.pool = job_server.Pool
- # still don't have the scheduler filled in here
- # or the submission. Ditto with other jobId values in this module
+ jobId.scheduler = scheduler_name
+ jobId.submission.name = submission.Name
+ jobId.submission.owner = submission.Owner
res = self._call_sync(my_process_results,
client.service.getJobDetails, jobId)
self.query_client_pool.return_object(client)
return res;
+ def get_job_summaries(self, submission, callback, machine_name):
+ assert callback
+
+ query_client = self.query_client_pool.get_object()
+
+ def to_int_seconds(dt):
+ # Change a datetime.datetime into int seconds since epoch
+ # Note, this works nicely if the datetime happens to include microseconds
+ # since the call to timetuple will drop them. Stuff coming back from
+ # condor should not have microseconds anyway.
+ return int(time.mktime(dt.timetuple()))
+
+ def get_string(job, attr):
+ # Cast suds text types into str so we have standard Py types
+ # Handles optional strings as well
+ if hasattr(job, attr):
+ return str(getattr(job, attr))
+ return ""
+
+ def adapt(jobs):
+ # Make an aviary job summary look like the canonical form
+ # that cumin is expecting (actually the QMF form because of history).
+ result = list()
+ for job in jobs:
+ cluster, proc = job.id.job.split(".")
+ j = dict()
+ j["ClusterId"] = int(cluster)
+ j["Cmd"] = str(job.cmd)
+ j["EnteredCurrentStatus"] = to_int_seconds(job.last_update)
+ j["GlobalJobId"] = job.id.scheduler + \
+ "#" + job.id.job + \
+ "#" + str(to_int_seconds(job.queued))
+ j["JobStatus"] = str(job.job_status)
+ j["ProcId"] = int(proc)
+ j["QDate"] = to_int_seconds(job.queued)
+
+ # These may be null...
+ j["Args"] = get_string(job, "args1")
+ j["ReleaseReason"] = get_string(job, "released")
+ j["HoldReason"] = get_string(job, "held")
+ result.append(j)
+ return result
+
+ def my_callback(result):
+ query_client.set_enable_attributes(False)
+ self.query_client_pool.return_object(query_client)
+ result = self._pretty_result(result, machine_name)
+ if isinstance(result, Exception):
+ callback(result, None)
+ else:
+ status = AviaryOperations._get_status(result[0].status)
+ if status == "OK" and hasattr(result[0], "jobs"):
+ data = {"Jobs": adapt(result[0].jobs)}
+ else:
+ data = {"Jobs": None}
+ callback(status, data)
+
+ host = self._get_host(machine_name, self.query_servers)
+ if host == "":
+ self._raise_no_host(machine_name)
+
+ service = host + "getSubmissionSummary"
+
+ # Have to set the URL for the method. This might go away someday...
+ query_client.set_options(location=service)
+
+ # What we really want here is the job summaries from the
+ # submission summary response. To get those, we have to
+ # set an extra attribute on the client...
+ query_client.set_enable_attributes(True)
+ query_client.set_attributes({"includeJobSummaries": "true"})
+
+ # Make a submission id. (see query wsdl)
+ subId = query_client.factory.create('ns0:SubmissionID')
+ subId.name = submission.Name
+ subId.owner = submission.Owner
+
+ t = CallThread(query_client.service.getSubmissionSummary,
+ my_callback, subId)
+ t.start()
+
+########################################################
# Secret private implementation stuff, don't look!
+########################################################
@classmethod
def _type_to_aviary(cls):
@@ -407,7 +556,8 @@
sync.get_completion()(*cb_args)
return sync
- def _control_job(self, scheduler, job_id, reason, meth_name,
+ def _control_job(self, scheduler, job_id, reason, submission,
+ meth_name,
callback, default, timeout):
host = self._get_host(scheduler.Machine, self.job_servers)
@@ -428,6 +578,8 @@
jobId.job = job_id
jobId.pool = scheduler.Pool
jobId.scheduler = scheduler.Name
+ jobId.submission.name = submission.Name
+ jobId.submission.owner = submission.Owner
if callback:
def my_callback(result):
@@ -461,16 +613,19 @@
to the suds message after marshalling.
'''
def __init__(self):
- self.allow = False
+ self.set_attributes = False
+ self.attributes = dict()
def marshalled(self, context):
- if self.allow:
+ if self.set_attributes:
sj_body = context.envelope.getChild('Body')[0]
- sj_body.attributes.append(Attribute("allowOverrides", "true"))
+ for k,v in self.attributes.iteritems():
+ sj_body.attributes.append(Attribute(k, v))
+
except:
class OverridesPlugin(object):
def __init__(self):
- self.allow = False
+ self.set_attributes = False
class OverrideClient(Client):
'''
@@ -484,9 +639,28 @@
except:
super(OverrideClient, self).__init__(*args)
- def set_allow_overrides(self, truth):
- self.override.allow = truth
+ def set_enable_attributes(self, truth):
+ '''
+ Set the flag that controls whether extra attributes are marshalled.
+ The attribute set is specified through the set_attributes() method.
+ If 'truth' is True, then service calls made through this client
+ will have the extra attributes appended to the attribute set of service
+ request.
+ '''
+ self.override.set_attributes = truth
+
+ def set_attributes(self, attrs):
+ '''
+ Specify extra attributes to append to the attribute set
+ of service requests made through this client.
+
+ The extra attributes will be added if set_enable_attributes()
+ has been called with a value of True sometime prior to the
+ time of a request.
+ '''
+ self.override.attributes = attrs
+
class JobClientPool(ObjectPool):
def __init__(self, job_wsdl, max_size):
super(JobClientPool, self).__init__(max_size)
@@ -501,4 +675,4 @@
self.query_wsdl = query_wsdl
def create_object(self):
- return Client(self.query_wsdl)
+ return OverrideClient(self.query_wsdl)
Modified: trunk/sage/python/sage/qmf/qmfoperations.py
===================================================================
--- trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-17 18:17:46 UTC (rev 4935)
+++ trunk/sage/python/sage/qmf/qmfoperations.py 2011-08-26 14:18:11 UTC (rev 4936)
@@ -81,7 +81,7 @@
# scheduler operations
- def set_job_attribute(self, scheduler, job_id, name, value, callback):
+ def set_job_attribute(self, scheduler, job_id, name, value, callback, *args):
assert callback
self._call(scheduler, "SetJobAttribute", callback, 0, 0,
job_id, name, value)
@@ -90,37 +90,84 @@
assert callback
self._call(scheduler, "SubmitJob", callback, 0, 0, ad)
- def hold_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def hold_job(self, scheduler, job_id, reason, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
return self._call(scheduler, "HoldJob", callback, default,
timeout, job_id, reason)
- def release_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def release_job(self, scheduler, job_id, reason, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
return self._call(scheduler, "ReleaseJob", callback, default,
timeout, job_id, reason)
- def remove_job(self, scheduler, job_id, reason,
- callback=None, default=None, timeout=5):
+ def remove_job(self, scheduler, job_id, reason, *args, **kwargs):
'''
This method is asynchronous iff 'callback' is supplied.
+
+ kwargs will be searched for 'callback', 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
'''
+
+ callback = "callback" in kwargs and kwargs["callback"] or None
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
return self._call(scheduler, "RemoveJob", callback, default,
timeout, job_id, reason)
# jobserver operations
- def get_job_ad(self, job_server, job_id, default=None, timeout=5):
+ def get_job_ad(self, job_server, job_id, *args, **kwargs):
+ '''
+ kwargs will be searched for 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
+ '''
+
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
return self._call(job_server, "GetJobAd", 0, default, timeout, job_id)
def fetch_job_data(self, job_server, job_id, ftype, file, start, end,
- default=None, timeout=5):
+ *args, **kwargs):
+ '''
+ kwargs will be searched for 'default' and 'timeout' arguments.
+
+ These optional arguments were moved to kwargs for compatibility
+ with another implementation of the same routine.
+ '''
+
+ default = "default" in kwargs and kwargs["default"] or None
+ timeout = "timeout" in kwargs and kwargs["timeout"] or 5
+
# QMF doesn't use the ftype value, it just uses the filename
return self._call(job_server, "FetchJobData", 0, default, timeout,
job_id, file, start, end)
@@ -158,7 +205,7 @@
# submission operations
- def get_job_summaries(self, submission, callback):
+ def get_job_summaries(self, submission, callback, *args):
assert callback
return self._call(submission, "GetJobSummaries", callback, 0, 0)