Author: tmckay
Date: 2011-06-24 17:40:40 +0000 (Fri, 24 Jun 2011)
New Revision: 4854
Added:
branches/sage_addition/sage/
branches/sage_addition/sage/python/
branches/sage_addition/sage/python/sage/
branches/sage_addition/sage/python/sage/__init__.py
branches/sage_addition/sage/python/sage/aviaryoperations.py
branches/sage_addition/sage/python/sage/qmfoperations.py
branches/sage_addition/sage/python/sage/remoteoperations.py
Modified:
branches/sage_addition/cumin/bin/cumin-web
branches/sage_addition/cumin/python/cumin/config.py
branches/sage_addition/cumin/python/cumin/grid/daemon.py
branches/sage_addition/cumin/python/cumin/grid/job.py
branches/sage_addition/cumin/python/cumin/grid/limit.py
branches/sage_addition/cumin/python/cumin/grid/negotiator.py
branches/sage_addition/cumin/python/cumin/grid/slot.py
branches/sage_addition/cumin/python/cumin/grid/submission.py
branches/sage_addition/cumin/python/cumin/main.py
branches/sage_addition/cumin/python/cumin/messaging/brokerlink.py
branches/sage_addition/cumin/python/cumin/messaging/connection.py
branches/sage_addition/cumin/python/cumin/messaging/queue.py
branches/sage_addition/cumin/python/cumin/model.py
branches/sage_addition/cumin/python/cumin/objecttask.py
branches/sage_addition/cumin/python/cumin/qmfadapter.py
branches/sage_addition/cumin/python/cumin/task.py
branches/sage_addition/cumin/python/cumin/widgets.py
branches/sage_addition/etc/Profile.bash
branches/sage_addition/rosemary/python/rosemary/model.py
Log:
Create sage module to wrap RPC operations of various flavors.
Modify all existing QMF calls to use sage.RemoteOperations.
Removed unused classes found along the way.
Create stub for AviaryOperations, to be filled in later.
Modified: branches/sage_addition/cumin/bin/cumin-web
===================================================================
--- branches/sage_addition/cumin/bin/cumin-web 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/bin/cumin-web 2011-06-24 17:40:40 UTC (rev 4854)
@@ -88,6 +88,8 @@
if type(values.sasl_mech_list) == str:
cumin.sasl_mech_list = values.sasl_mech_list.upper()
+ cumin.use_aviary = values.use_aviary
+
cumin.debug = opts.debug
cumin.user = values.user
cumin.update_interval = values.update_interval
Modified: branches/sage_addition/cumin/python/cumin/config.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/config.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/config.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -163,6 +163,9 @@
param = ConfigParameter(self, "debug", bool)
param.default = False
+ param = ConfigParameter(self, "use-aviary", bool)
+ param.default = True
+
class CuminOptionParser(OptionParser,object):
def __init__(self):
OptionParser.__init__(self)
@@ -199,7 +202,7 @@
#
# # start your business
-_logging_modules = "cumin", "mint", "parsley", "rosemary", "wooly"
+_logging_modules = "cumin", "mint", "parsley", "rosemary", "wooly, sage"
def setup_initial_logging():
for name in _logging_modules:
Modified: branches/sage_addition/cumin/python/cumin/grid/daemon.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/daemon.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/daemon.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -31,7 +31,7 @@
master = self.get_master(system_name, invoc)
if master:
- self.qmf_call(invoc, master, "Start", self.target)
+ self.app.remote.start(master, self.target, invoc.make_callback())
class DaemonStop(DaemonFrameTask):
def __init__(self, app, frame, target):
@@ -49,7 +49,7 @@
master = self.get_master(system_name, invoc)
if master:
- self.qmf_call(invoc, master, "Stop", self.target)
+ self.app.remote.stop(master, self.target, invoc.make_callback())
class DaemonSelectorTask(ObjectSelectorTask):
def get_master(self, system_name, invoc):
@@ -81,7 +81,7 @@
master = self.get_master(system_name, invoc)
if master:
- self.qmf_call(invoc, master, "Start", self.target)
+ self.app.remote.start(master, self.target, invoc.make_callback())
class DaemonSelectionStop(DaemonSelectorTask):
def __init__(self, app, selector, target):
@@ -101,4 +101,4 @@
master = self.get_master(system_name, invoc)
if master:
- self.qmf_call(invoc, master, "Stop", self.target)
+ self.app.remote.stop(master, self.target, invoc.make_callback())
Modified: branches/sage_addition/cumin/python/cumin/grid/job.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/job.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/job.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -10,7 +10,6 @@
from cumin.widgets import StaticColumnHeader, Wait, CuminForm,\
EditablePropertyRenderer, StateSwitch
from cumin.util import JobStatusInfo, strip_string_quotes, parse
-from cumin.model import QmfCall
from cumin.formats import fmt_datetime, fmt_link
from wooly import Widget, Parameter, Attribute
@@ -291,9 +290,7 @@
def do_invoke(self, invoc, job_id, scheduler, reason):
assert self.method
- action = QmfCall(self.app)
- results = action.execute(scheduler, self.method, job_id, reason)
-
+ results = self.method(scheduler, job_id, reason)
if results.error:
raise results.error
@@ -312,7 +309,7 @@
def __init__(self, app, selector, verb):
super(JobSelectionHold, self).__init__(app, selector, verb)
- self.method = "HoldJob"
+ self.method = self.app.remote.hold_job
def get_title(self, session):
return "Hold"
@@ -321,7 +318,7 @@
def __init__(self, app, selector, verb):
super(JobSelectionRelease, self).__init__(app, selector, verb)
- self.method = "ReleaseJob"
+ self.method = self.app.remote.release_job
def get_title(self, session):
return "Release"
@@ -330,7 +327,7 @@
def __init__(self, app, selector, verb):
super(JobSelectionRemove, self).__init__(app, selector, verb)
- self.method = "RemoveJob"
+ self.method = self.app.remote.remove_job
def get_title(self, session):
return "Remove"
@@ -416,6 +413,7 @@
1: "integer",
2: "float",
3: "string"}
+
def __init__(self, app, name):
super(JobAdsSet, self).__init__(app, name)
@@ -434,9 +432,7 @@
id = self.frame.id.get(session)
job_server = self.frame.get_job_server(session, id)
job_id = self.frame.job_id.get(session)
-
- action = QmfCall(self.app, {'JobAd': {}})
- results = action.execute(job_server, "GetJobAd", job_id)
+ results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}})
error = results.error
self.qmf_error.set(session, error)
ads = results.data['JobAd']
@@ -507,9 +503,7 @@
id = self.frame.id.get(session)
job_server = self.frame.get_job_server(session, id)
job_id = self.frame.job_id.get(session)
-
- action = QmfCall(self.app, {'JobAd': {}})
- results = action.execute(job_server, "GetJobAd", job_id)
+ results = self.app.remote.get_job_ad(job_server, job_id, {'JobAd': {}})
error = results.error
self.qmf_error.set(session, error)
ads = results.data['JobAd']
@@ -799,8 +793,9 @@
job_id = self.frame.job_id.get(session)
file, start, end = self.get_file_args(session)
if file:
- action = QmfCall(self.app, {'Data': ""})
- result = action.execute(job_server, "FetchJobData", job_id, file, start, end)
+ result = self.app.remote.fetch_job_data(job_server,
+ job_id,
+ file, start, end, {'Data': ""})
if result.error:
return result.status
return escape_entity(result.data['Data'])
@@ -962,23 +957,6 @@
self.add_state("t", "Tail", "Display end of file")
self.add_state("h", "Head", "Display beginning of file")
-class JobAction(ObjectFrameTask):
- def __init__(self, app, frame, verb):
- super(JobAction, self).__init__(app, frame)
-
- self.form = JobActionForm(app, self.name, self, verb)
-
- self.method = None
-
- def do_enter(self, session, osession):
- job_id = self.frame.job_id.get(osession)
- self.form.job_id.set(session, job_id)
-
- def do_invoke(self, invoc, scheduler, job_id, reason):
- assert self.method
-
- self.qmf_call(invoc, scheduler, self.method, job_id, reason)
-
class JobActionForm(ObjectFrameTaskForm):
def __init__(self, app, name, task, verb):
super(JobActionForm, self).__init__(app, name, task)
@@ -1019,21 +997,35 @@
def render_title(self, session):
return "Reason"
+class JobAction(ObjectFrameTask):
+ def __init__(self, app, frame, verb):
+ super(JobAction, self).__init__(app, frame)
+
+ self.form = JobActionForm(app, self.name, self, verb)
+
+ def do_enter(self, session, osession):
+ job_id = self.frame.job_id.get(osession)
+ self.form.job_id.set(session, job_id)
+
+ def do_invoke(self, invoc, scheduler, job_id, reason):
+ raise Exception("oops, not implemented!")
+
class JobHold(JobAction):
def __init__(self, app, frame):
super(JobHold, self).__init__(app, frame, "held")
- self.method = "HoldJob"
+ def do_invoke(self, invoc, scheduler, job_id, reason):
+ self.app.remote.hold_job(scheduler, job_id, reason, invoc.make_callback())
def get_title(self, session):
return "Hold Job"
-
class JobRelease(JobAction):
def __init__(self, app, frame):
- super(JobRelease, self).__init__(app, frame, "held")
+ super(JobRelease, self).__init__(app, frame, "released")
- self.method = "ReleaseJob"
+ def do_invoke(self, invoc, scheduler, job_id, reason):
+ self.app.remote.release_job(scheduler, job_id, reason, invoc.make_callback())
def get_title(self, session):
return "Release Job"
@@ -1042,7 +1034,8 @@
def __init__(self, app, frame):
super(JobRemove, self).__init__(app, frame, "removed")
- self.method = "RemoveJob"
+ def do_invoke(self, invoc, scheduler, job_id, reason):
+ self.app.remote.remove_job(scheduler, job_id, reason, invoc.make_callback())
def get_title(self, session):
return "Remove Job"
@@ -1073,5 +1066,6 @@
return "Edit Ad"
def do_invoke(self, invoc, scheduler, job_id, name, value):
- self.qmf_call(invoc, scheduler, "SetJobAttribute", job_id, name, value)
+ self.app.remote.set_job_attribute(scheduler, job_id, name, value,
+ invoc.make_callback())
Modified: branches/sage_addition/cumin/python/cumin/grid/limit.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/limit.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/limit.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -1,11 +1,9 @@
from cumin.formats import fmt_link
-from cumin.model import QmfCall
from cumin.objectframe import ObjectFrame, ObjectFrameTask, \
ObjectFrameTaskFeedbackForm
from cumin.objectselector import ObjectTableColumn, ObjectQmfSelector, \
ObjectQmfTable
from cumin.qmfadapter import ObjectQmfAdapter
-from cumin.widgets import CuminView
from wooly.forms import StringField, StringInput, IntegerField, RealField, \
FormButton, FormField, FormError, CheckboxInput
from wooly.util import StringCatalog
@@ -138,18 +136,13 @@
self.form.limit_max.set(session, self.form.limit_max.get(osession))
def do_invoke(self, invoc, negotiator, limit_name, limit_max):
- action = QmfCall(self.app)
limit_max = float(limit_max)
- results = action.execute(negotiator, "SetLimit", limit_name, limit_max)
-
+ results = self.app.remote.set_limit(negotiator, limit_name, limit_max)
if results.error:
raise results.error
invoc.status_code = results.status
-
- action = QmfCall(self.app)
- action.execute(negotiator, "Reconfig")
-
+ self.app.remote.reconfig(negotiator)
invoc.end()
class NegotiatorLimitForm(ObjectFrameTaskFeedbackForm):
@@ -273,12 +266,3 @@
if message:
self.form.errors.add(session, FormError(message))
-
-
-
-class LimitView(CuminView):
- def __init__(self, app, name, limit):
- super(LimitView, self).__init__(app, name, None)
-
- self.tabs = TabbedModeSet(app, "tabs")
- self.add_child(self.tabs)
Modified: branches/sage_addition/cumin/python/cumin/grid/negotiator.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/negotiator.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/negotiator.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -10,7 +10,6 @@
from wooly.parameters import ListParameter
from wooly.template import WidgetTemplate
from cumin.stat import StatFlashChart
-from cumin.model import QmfCall
strings = StringCatalog(__file__)
log = logging.getLogger("cumin.grid.negotiator")
@@ -401,10 +400,7 @@
self.app.main_page.main.grid.view.show(session)
def do_invoke(self, invoc, negotiator, group, value):
- # don't call self.qmf_call here since we need to
- # wait for the response before making the next QMF call
- action = QmfCall(self.app)
- result = action.execute(negotiator, "SetRawConfig", group, value)
+ result = self.app.remote.set_raw_config(negotiator, group, value)
if result.error:
raise result.error
@@ -418,8 +414,7 @@
invoc.end()
def reconfig(self, negotiator):
- action = QmfCall(self.app)
- action.execute(negotiator, "Reconfig")
+ self.app.remote.reconfig(negotiator)
class NegotiatorEditDynamicQuota(NegotiatorGroupTask):
def __init__(self, app, frame):
Modified: branches/sage_addition/cumin/python/cumin/grid/slot.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/slot.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/slot.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -36,19 +36,6 @@
return records
return recs
-class SlotView(CuminView):
- def __init__(self, app, name, slot):
- super(SlotView, self).__init__(app, name, slot)
-
- self.tabs = TabbedModeSet(app, "tabs")
- self.add_child(self.tabs)
-
- stats = SlotStats(app, "stats", slot)
- self.tabs.add_tab(stats)
-
- details = CuminDetails(app, "details", slot)
- self.tabs.add_tab(details)
-
class SlotLoadStatSet(StatSet):
def __init__(self, app, name, object):
super(SlotLoadStatSet, self).__init__(app, name, object)
Modified: branches/sage_addition/cumin/python/cumin/grid/submission.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/grid/submission.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/grid/submission.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -363,7 +363,7 @@
log_job_ad(ad)
- self.qmf_call(invoc, scheduler, "SubmitJob", ad)
+ self.app.remote.submit_job(scheduler, ad, invoc.make_callback())
class JobSubmitForm(ObjectTaskForm):
def __init__(self, app, name, task):
@@ -663,9 +663,9 @@
invoc.description = "Submit VM job '%s'" % description
log_job_ad(ad)
-
- self.qmf_call(invoc, scheduler, "SubmitJob", ad)
+ self.app.remote.submit_job(scheduler, ad, invoc.make_callback())
+
class VmJobSubmitForm(ObjectTaskForm):
def __init__(self, app, name, task):
cls = app.model.com_redhat_grid.Scheduler
@@ -799,7 +799,7 @@
log_job_ad(ad)
- self.qmf_call(invoc, scheduler, "SubmitJob", ad)
+ self.app.remote.submit_job(scheduler, ad, invoc.make_callback())
class DagJobSubmitForm(ObjectTaskForm):
def __init__(self, app, name, task):
Modified: branches/sage_addition/cumin/python/cumin/main.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/main.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/main.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -21,7 +21,9 @@
from user import *
from util import *
from widgets import *
-
+from sage.remoteoperations import RemoteOperations
+from sage.qmfoperations import QmfOperations
+from sage.aviaryoperations import AviaryOperations
from wooly import Session
from cumin.stat import PieChartPage
from cumin.widgets import AboutPage
@@ -40,6 +42,7 @@
self.model = CuminModel(self, model_dir)
self.session = CuminSession(self, broker_uris)
+
self.database = CuminDatabase(self, database_dsn)
self.server = CuminServer(self, host, port)
self.admin = CuminAdmin(self)
@@ -76,6 +79,10 @@
# Space separated list of sasl authentication
# mechanisms, according to the sasl documentation
self.sasl_mech_list = None
+
+ # Whether or not to use Aviary interface for
+ # queries and job submissions. May be set via cumin.conf
+ self.use_aviary = True
def server_alive(self):
return self.server.server_alive()
@@ -143,6 +150,14 @@
def init(self):
log.info("Initializing %s", self)
+ # Create RPC interfaces for QMF and aviary.
+ # Aviary takes precedence if added
+ self.remote = RemoteOperations()
+ ops = [QmfOperations("qmf", self.session)]
+ if self.use_aviary:
+ ops.insert(0, AviaryOperations("aviary"))
+ self.remote.add_mechanisms(ops)
+
self.model.init()
self.session.init()
self.database.init()
Modified: branches/sage_addition/cumin/python/cumin/messaging/brokerlink.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/messaging/brokerlink.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/messaging/brokerlink.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -52,7 +52,7 @@
self.app.main_page.main.messaging.broker.view.show(session)
def do_invoke(self, invoc, link):
- self.qmf_call(invoc, link, "close")
+ self.app.remote.close(link, invoc.make_callback())
class BrokerLinkSelector(ObjectSelector):
def __init__(self, app, name, vhost):
@@ -83,7 +83,7 @@
return "Remove broker link"
def do_invoke(self, invoc, link):
- self.qmf_call(invoc, link, "close")
+ self.app.remote.close(link, invoc.make_callback())
class RouteSelector(ObjectSelector):
def __init__(self, app, name, link):
@@ -110,7 +110,7 @@
return "Remove"
def do_invoke(self, invoc, route):
- self.qmf_call(invoc, route, "close")
+ self.app.remote.close(route, invoc.make_callback())
# XXX RouteFrame
@@ -122,20 +122,8 @@
self.app.main_page.main.messaging.broker.view.show(session)
def do_invoke(self, invoc, route):
- self.qmf_call(invoc, route, "close")
+ self.app.remote.close(route, invoc.make_callback())
-class LinkView(CuminView):
- def __init__(self, app, name, link):
- super(LinkView, self).__init__(app, name, link)
-
- self.tabs = TabbedModeSet(app, "tabs")
- self.add_child(self.tabs)
-
- #self.tabs.add_tab(LinkStats(app, "stats"))
-
- self.tabs.add_tab(RouteSet(app, "routes", link))
- self.tabs.add_tab(CuminDetails(app, "details", link))
-
class LinkGeneralStatSet(StatSet):
def __init__(self, app, name, object):
super(LinkGeneralStatSet, self).__init__(app, name, object)
@@ -185,18 +173,18 @@
exchanges = list()
link = self.form.link.get(session)
- vhost = link.vhost
- sortedExchanges = sorted_by(vhost.exchanges)
+ if link is not None:
+ vhost = link.vhost
+ sortedExchanges = sorted_by(vhost.exchanges)
- for exchange in sortedExchanges:
- if ExchangeInfo.is_builtin(exchange) or \
- (not exchange._get_qmfDeleteTime() and \
- not (self.state.is_active(session) and not is_active(exchange))):
- if not exchange.name in ["qpid.management", ""]:
- if not self.param.get(session):
- self.param.set(session, exchange.id)
- exchanges.append(exchange)
-
+ for exchange in sortedExchanges:
+ if ExchangeInfo.is_builtin(exchange) or \
+ (not exchange._get_qmfDeleteTime() and \
+ not (self.state.is_active(session) and not is_active(exchange))):
+ if not exchange.name in ["qpid.management", ""]:
+ if not self.param.get(session):
+ self.param.set(session, exchange.id)
+ exchanges.append(exchange)
return exchanges
def render_item_value(self, session, exchange):
@@ -236,10 +224,11 @@
def do_invoke(self, invoc, link, exchange, key, tag, dynamic, sync,
excludes):
- self.qmf_call(invoc, link, "bridge",
- link.durable, exchange.name, exchange.name,
- key, tag, excludes, False, False, dynamic, sync)
+ self.app.remote.bridge(link, link.durable, exchange.name, exchange.name,
+ key, tag, excludes, False, False, dynamic, sync,
+ invoc.make_callback())
+
class RouteAddForm(ObjectFrameTaskForm):
def __init__(self, app, name, task):
super(RouteAddForm, self).__init__(app, name, task)
@@ -334,10 +323,6 @@
dynamic, sync, excludes)
self.task.exit_with_redirect(session)
- def render_title(self, session):
- link = self.link.get(session)
- return self.task.get_title(session, link)
-
class RouteSetRemoveForm(CuminTaskForm):
def __init__(self, app, name, task):
super(RouteSetRemoveForm, self).__init__(app, name, task)
@@ -368,8 +353,8 @@
else:
mech = "PLAIN"
- self.qmf_call(invoc, obj, "connect",
- host, port, durable, mech, username, password, transport)
+ self.app.remote.connect(obj, host, port, durable, mech, username,
+ password, transport, invoc.make_callback())
class BrokerLinkAddForm(ObjectFrameTaskForm):
def __init__(self, app, name, task):
Modified: branches/sage_addition/cumin/python/cumin/messaging/connection.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/messaging/connection.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/messaging/connection.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -50,7 +50,7 @@
raise Exception \
("Cannot close management connection %s" % conn.address)
- self.qmf_call(invoc, conn, "close")
+ self.app.remote.close(conn, invoc.make_callback())
class ConnectionSelector(ObjectSelector):
def __init__(self, app, name, vhost):
@@ -96,7 +96,7 @@
raise Exception \
("Cannot close management connection %s" % conn.address)
- self.qmf_call(invoc, conn, "close")
+ self.app.remote.close(conn, invoc.make_callback())
def get_item_content(self, session, item):
args = (item.remoteProcessName, item.remotePid)
@@ -168,14 +168,14 @@
return "Close"
def do_invoke(self, invoc, sess):
- self.qmf_call(invoc, sess, "close")
+ self.app.remote.close(sess, invoc.make_callback())
class SessionDetach(ObjectFrameTask):
def get_title(self, session):
return "Detach"
def do_invoke(self, invoc, sess):
- self.qmf_call(invoc, sess, "detach")
+ self.app.remote.detach(sess, invoc.make_callback())
class SessionSelector(ObjectSelector):
def __init__(self, app, name, conn):
@@ -205,11 +205,11 @@
return "Close"
def do_invoke(self, invoc, sess):
- self.qmf_call(invoc, sess, "close")
+ self.app.remote.close(sess, invoc.make_callback())
class SessionSelectionDetach(ObjectSelectorTask):
def get_title(self, session):
return "Detach"
def do_invoke(self, invoc, sess):
- self.qmf_call(invoc, sess, "detach")
+ self.app.remote.detach(sess, invoc.make_callback())
Modified: branches/sage_addition/cumin/python/cumin/messaging/queue.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/messaging/queue.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/messaging/queue.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -101,8 +101,8 @@
def get_title(self, session):
return "Purge"
- def do_invoke(self, invoc, queue, count=0):
- self.qmf_call(invoc, queue, "purge", count)
+ def do_invoke(self, invoc, queue, count=0):
+ self.app.remote.purge_queue(queue, count, invoc.make_callback())
class QueueBindingSelector(BindingSelector):
def __init__(self, app, name, queue):
@@ -361,7 +361,7 @@
return "Purge"
def do_invoke(self, invoc, queue, count=0):
- self.qmf_call(invoc, queue, "purge", count)
+ self.app.remote.purge_queue(queue, count, invoc.make_callback())
class QueuePurgeForm(ObjectFrameTaskForm):
def __init__(self, app, name, task):
@@ -607,9 +607,8 @@
cls = self.app.model.org_apache_qpid_broker.Broker
broker = cls.get_object_by_id(cursor, vhost._brokerRef_id)
+ self.app.remote.queue_move_messages(broker, src, dst, count, invoc.make_callback())
- self.qmf_call(invoc, broker, "queueMoveMessages", src, dst, count)
-
class MoveQueueMessages(MoveMessagesBase):
def __init__(self, app, frame):
super(MoveQueueMessages, self).__init__(app, frame)
Modified: branches/sage_addition/cumin/python/cumin/model.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/model.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/model.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -8,11 +8,12 @@
from cumin.formats import fmt_datetime, fmt_dict, fmt_none_brief, fmt_none,\
fmt_duration
-from cumin.util import calc_rate, JobStatusInfo, secs, wait
+from cumin.util import calc_rate, JobStatusInfo, secs
from cumin.sqladapter import SqlAdapter
from rosemary.sqlfilter import SqlComparisonFilter
from rosemary.sqlquery import SqlQueryOptions
from rosemary.model import RosemaryModel
+from sage.util import SyncSet
log = logging.getLogger("cumin.model")
@@ -305,136 +306,6 @@
return retval
-class CuminAction(object):
- def __init__(self, cls, name):
- self.model = cls.model
- self.mint = self.model.mint
- self.cumin_class = cls
-
- self.name = name
- self.title = None
- self.summary = False
- self.navigable = True
- self.aggregate = False
-
- self.cumin_class.add_action(self)
-
- def init(self):
- pass
-
- def show(self, session, object):
- raise Exception("Not implemented")
-
- # XXX we could even cache this if it gets called a lot
- def get_href(self, session, object):
- branch = session.branch()
- self.show(branch, object)
- return branch.marshal()
-
- def get_title(self, session):
- if self.title:
- return self.title
- else:
- return self.name
-
- def get_summary(self, session, object):
- pred0 = self.get_title(session)
- pred1 = self.cumin_class.get_object_title(session, object)
-
- return "%s %s" % (pred0, pred1)
-
- def get_enabled(self, session, object):
- return True
-
- def get_verb(self, session):
- return None
-
- def get_description(self, session, object):
- verb = self.get_verb(session)
-
- if not verb:
- verb = self.get_title(session)
-
- cls = self.cumin_class
-
- if cls and object:
- obj = None
-
- if type(object) is list:
- if len(object) == 1:
- obj = object[0]
- else:
- obj = object
-
- if obj is None:
- sobject = "%i %s" % \
- (len(object), cls.get_plural_title(session))
- else:
- sobject = cls.get_object_title(session, obj)
- elif cls:
- sobject = cls.get_title(session)
- else:
- # XXX lame
- sobject = str(object)
-
- return "%s %s" % (verb, sobject)
-
- def invoke(self, object, args={}):
- invoc = self.begin(object)
-
- def completion(status, args=None):
- invoc.status = status
- invoc.args = args
- #invoc.prt()
-
- try:
- return self.do_invoke(object, args, completion)
- except Exception, e:
- invoc.status = "failed"
- invoc.exception = e
-
- log.exception(e)
-
- #invoc.prt()
-
- return invoc
-
- def do_invoke(self, object, args, completion):
- raise Exception("Not implemented")
-
- def begin(self, object):
- invoc = CuminActionInvocation(self, object)
- self.model.invocations.add(invoc)
- return invoc
-
-class CuminActionInvocation(object):
- def __init__(self, action, object):
- self.action = action
- self.object = object
- self.when = datetime.now()
- self.status = "pending"
- self.args = None
- self.exception = None
-
- def get_description(self, session):
- return self.action.get_description(session, self.object)
-
- def prt(self):
- print "action", self.action.name, self.object, self.when, \
- self.status, self.args, self.exception
-
-class CuminSetAction(CuminAction):
- def __init__(self, cls, name):
- super(CuminSetAction, self).__init__(cls, name)
-
- self.aggregate = True
-
- def get_summary(self, session, object):
- pred0 = self.get_title(session)
- pred1 = self.cumin_class.get_plural_title(session)
-
- return "%s %s" % (pred0, pred1)
-
class SamplesSqlAdapter(SqlAdapter):
qmf_update_col = '_qmf_update_time'
@@ -775,116 +646,21 @@
except KeyError:
return "Unknown (%s)" % str(value)
-class GetStartedAction(CuminAction):
- def get_xml_response(self, session, object, *args):
- updateTime = object.statsCurr and object.statsCurr.qmfUpdateTime \
- or object.qmfUpdateTime
- delta = secs(updateTime) - secs(datetime.now())
- conf = "<age>%i</age>" % -delta
- rect = "<updatetime>%s</updatetime>" % fmt_duration(delta)
+class FetchRawConfigSet(object):
+ def __init__(self, timeout=5):
+ self.syncs = SyncSet(log, timeout)
- return "%s%s" % (conf, rect)
-
-class QmfCall(object):
- def __init__(self, app, default=None, timeout=5):
- self.app = app
- self.data = default
- self.got_data = False
- self.error = False
- self.status = None
- self.timeout = timeout
-
- def get_completion(self):
- def completion(status, data):
- self.status = status
- if (status == 0) or (status == "OK"):
- self.data = data
- self.got_data = True
- else:
- self.error = Exception(status)
- msg = "QMF call returned with status of %s" % str(status)
- log.error(msg)
- return completion
-
- def do_wait(self):
- wait(self.done, timeout=self.timeout)
- return self
-
- def done(self):
- return self.got_data or self.error
-
- def execute(self, obj, method_name, *args):
- session = self.app.session
- try:
- session.call_method(self.get_completion(), obj, method_name, args)
- except Exception, e:
- self.error = e
- log.exception(e)
-
- results = self.do_wait()
- if not results.got_data and not results.error:
- results.error = Exception("Request timed out")
- msg = "QMF call %s timed out" % method_name
- log.error(msg)
-
- return results
-
-class QmfCallSet(object):
- def __init__(self, app):
- self.app = app
- self.calls = dict()
- self.timeout = 5
-
- def add_call(self, key, default=None):
- call = QmfCall(self.app, default)
- self.calls[key] = call
- return call
-
- def do_wait(self):
- def predicate(calls):
- done = 0
- for call in calls:
- if calls[call].done():
- done += 1
- return done == len(calls)
-
- wait(predicate, timeout=self.timeout, args=self.calls)
- return self.calls
-
-class FetchRawConfigSet(QmfCallSet):
- def execute(self, negotiator, groups, prepend=""):
+ def execute(self, remote, negotiator, groups, prepend=""):
default = {'Value': 0}
- session = self.app.session
for group in groups:
- call = self.add_call(group, default)
-
+ sync = self.syncs.add_sync(group, default)
try:
- session.call_method(call.get_completion(), negotiator, "GetRawConfig", (prepend+group,))
+ remote.get_raw_config(negotiator, prepend+group, sync.get_completion())
except Exception, e:
- call.error = e
+ sync.error = e
log.exception(e)
+ return self.syncs.do_wait()
- return self.do_wait()
-
-class FetchJobAd(QmfCall):
- def __init__(self, app):
- super(FetchJobAd, self).__init__(app)
- self.data = {'JobAd': {"":{"VALUE":"", "TYPE":0}}}
-
-class FetchJobOutput(QmfCall):
- def __init__(self, app):
- super(FetchJobOutput, self).__init__(app)
- self.data = {'Data': ""}
-
- def execute(self, job_server, jobId, file, start, end):
- session = self.app.session
- try:
- session.call_method(self.get_completion(), job_server, "Fetch", (jobId, file, start, end))
- except Exception, e:
- self.error = e
- log.exception(e)
- return self.do_wait()
-
class Pool(object):
def __init__(self, id):
self.id = id
@@ -971,8 +747,7 @@
except KeyError:
pass
- self.model.app.session.call_method \
- (completion, self.negotiator, "GetLimits", ())
+ self.model.app.remote.get_limits(self.negotiator, completion)
def delete(self):
del self.model.limits_by_negotiator[self.negotiator._qmf_agent_id]
@@ -993,8 +768,7 @@
except KeyError:
pass
- self.model.app.session.call_method \
- (completion, self.submission, "GetJobSummaries", ())
+ self.model.app.remote.get_job_summaries(self.submission, completion)
def delete(self):
del self.model.job_summaries_by_submission[self.submission._id]
@@ -1016,9 +790,9 @@
except KeyError:
pass
- self.model.app.session.call_method \
- (completion, self.negotiator, "GetRawConfig", ("GROUP_NAMES",))
-
+ self.model.app.remote.get_raw_config(self.negotiator, "GROUP_NAMES",
+ completion)
+
def delete(self):
del self.model.group_names_by_negotiator[self.negotiator._qmf_agent_id]
@@ -1050,8 +824,10 @@
def update(self, cursor):
for config in self.configs:
- action = FetchRawConfigSet(self.model.app)
- raw_configs = action.execute(self.negotiator, self.configs[config], config+"_")
+ action = FetchRawConfigSet()
+ raw_configs = action.execute(self.model.app.remote,
+ self.negotiator,
+ self.configs[config], config+"_")
#for group in raw_configs:
# qmfc = raw_configs[group]
# qmfc.data = {'Value': 0.1}
@@ -1062,7 +838,7 @@
def update_new(self, cursor):
for config in self.configs:
- action = FetchRawConfigSet(self.model.app)
+ action = FetchRawConfigSet()
new_configs = list()
for group in self.configs.keys():
if group not in self.data.keys() or \
@@ -1070,7 +846,8 @@
not self.data[config][group].data:
new_configs.append(group)
- raw_configs = action.execute(self.negotiator, new_configs, config+"_")
+ raw_configs = action.execute(self.model.app.remote,
+ self.negotiator, new_configs, config+"_")
for group in raw_configs:
self.data[config][group] = raw_configs[group]
@@ -1089,4 +866,4 @@
class NegotiatorStaticGroupConfigValuesStore(NegotiatorGroupConfigValuesStore):
def __init__(self, model, negotiator, groups, config):
- super(NegotiatorStaticGroupConfigValuesStore, self).__init__(model, negotiator, groups, config)
\ No newline at end of file
+ super(NegotiatorStaticGroupConfigValuesStore, self).__init__(model, negotiator, groups, config)
Modified: branches/sage_addition/cumin/python/cumin/objecttask.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/objecttask.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/objecttask.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -93,16 +93,6 @@
def do_invoke(self, invoc, obj, *args, **kwargs):
pass
- def qmf_call(self, invoc, obj, meth, *args):
- def completion(status_code, output_args):
- invoc.status_code = status_code
- invoc.output_args = output_args
-
- invoc.end()
-
- session = self.app.session
- session.call_method(completion, obj, meth, args)
-
def exception(self, invoc, e):
now = datetime.now()
Modified: branches/sage_addition/cumin/python/cumin/qmfadapter.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/qmfadapter.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/qmfadapter.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -1,28 +1,25 @@
from operator import itemgetter
-from cumin.model import QmfCall
from wooly.datatable import DataAdapter, DataAdapterField
import logging
log = logging.getLogger("cumin.qmfadapter")
class QmfAdapter(DataAdapter):
- def __init__(self, app, method):
+ def __init__(self, app):
super(QmfAdapter, self).__init__()
self.app = app
- self.method = method
self.default = list()
self.columns = list()
self.max_sortable_records = app.max_qmf_table_sort
def get_count(self, values):
- obj = values['obj']
- args = values['args']
+ # This used to make a Qmf call, but all descendent
+ # classes overloaded this method and the argument passed in to
+ # the constructor that named the method was not a Qmf method
+ # name in any case. Broken somewhere in history, but unused,
+ # hence removed.
+ raise Exception("Not implemented")
- action = QmfCall(self.app)
- results = action.execute(obj, self.method, args)
-
- return results and len(results) or 0
-
def get_data(self, values, options):
data = self.do_get_data(values)
@@ -40,17 +37,13 @@
return rows
def do_get_data(self, values):
- obj = values['obj']
- args = values['args']
+ # This used to make a Qmf call, but all descendent
+ # classes overloaded this method and the argument passed in to
+ # the constructor that named the method was not a Qmf method
+ # name in any case. Broken somewhere in history, but unused,
+ # hence removed.
+ raise Exception("Not implemented")
- action = QmfCall(self.app)
- results = action.execute(obj, self.method, args)
-
- if results.error:
- results.data = self.default
-
- return results.data
-
def process_results(self, results):
""" take the dict response from the qmf call and return a list of lists """
@@ -122,7 +115,7 @@
class ObjectQmfAdapter(QmfAdapter):
def __init__(self, app, cls):
- super(ObjectQmfAdapter, self).__init__(app, cls._name)
+ super(ObjectQmfAdapter, self).__init__(app)
self.cls = cls
self.default_field_cls = ObjectQmfField
Modified: branches/sage_addition/cumin/python/cumin/task.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/task.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/task.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -102,16 +102,6 @@
def do_invoke(self, session, obj, invoc, **kwargs):
pass
- def qmf_call(self, invoc, obj, meth, *args):
- def completion(status_code, output_args):
- invoc.status_code = status_code
- invoc.output_args = output_args
-
- invoc.end()
-
- session = self.app.session
- session.call_method(completion, obj, meth, args)
-
def exception(self, invoc, e):
now = datetime.now()
@@ -181,6 +171,13 @@
log.info("Ended %s", self.task)
+ def make_callback(self):
+ def completion(status_code, output_args):
+ self.status_code = status_code
+ self.output_args = output_args
+ self.end()
+ return completion
+
class TaskInvocationSet(Widget):
def __init__(self, app, name):
super(TaskInvocationSet, self).__init__(app, name)
Modified: branches/sage_addition/cumin/python/cumin/widgets.py
===================================================================
--- branches/sage_addition/cumin/python/cumin/widgets.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/cumin/python/cumin/widgets.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -175,57 +175,6 @@
def get_title(self, session):
return self.render_title(session)
-class CuminView(Widget):
- def __init__(self, app, name, object):
- super(CuminView, self).__init__(app, name)
-
- self.object = object
-
- summary = CuminSummary(app, "summary", self.object)
- self.add_child(summary)
-
- self.__frame_tmpl = WidgetTemplate(self, "frame_html")
- self.sticky_frame = None
-
- def render_script(self, session):
- return None
-
- def render_icon_resource(self, session):
- return "action-36.png"
-
- def render_frames(self, session):
- writer = Writer()
-
- for frame in reversed(self.frame.ancestors):
- if isinstance(frame, CuminFrame):
- self.__frame_tmpl.render(writer, session, frame)
-
- self.__frame_tmpl.render(writer, session, self.frame)
-
- return writer.to_string()
-
- def render_frame(self, session, frame):
- if frame == self.frame:
- html = frame.render_title(session)
- else:
- obj = None
-
- if frame.object:
- obj = frame.object.get(session)
-
- href = frame.get_href(session, obj)
- content = frame.render_title(session)
-
- html = fmt_link(href, content)
-
- return html
-
- def do_process(self, session):
- if self.sticky_frame:
- session.client_session.attributes["sticky_%s" % \
- self.sticky_frame.name] = self.frame.object.get(session)
- super(CuminView, self).do_process(session)
-
class BackgroundInclude(Widget):
def __init__(self, app, name):
super(BackgroundInclude, self).__init__(app, name)
@@ -466,57 +415,6 @@
def render_icon_href(self, session, *args):
return "resource?name=action-36.png"
-class CuminSummary(Widget):
- def __init__(self, app, name, object):
- super(CuminSummary, self).__init__(app, name)
-
- self.object = object
-
- props = self.SummaryProperties(app, "properties", self.object)
- self.add_child(props)
-
- tasks = self.SummaryTasks(app, "tasks", self.object)
- self.add_child(tasks)
-
- def render_title(self, session):
- obj = self.object.get(session)
- cls = self.app.model.get_class_by_object(obj)
-
- if cls:
- return xml_escape(cls.get_object_name(obj))
- elif obj:
- return obj.get_title()
-
- def render_icon_href(self, session):
- obj = self.object.get(session)
- cls = self.app.model.get_class_by_object(obj)
-
- if cls:
- return cls.get_icon_href(session)
-
- class SummaryProperties(CuminProperties):
- def do_get_items(self, session):
- obj = self.object.get(session)
- cls = self.app.model.get_class_by_object(obj)
-
- if cls:
- return [(x.get_title(session),
- x.value(session, obj),
- x.escape)
- for x in cls.properties if x.summary]
-
- class SummaryTasks(CuminTasks):
- def do_get_items(self, session):
- obj = self.object.get(session)
- cls = self.app.model.get_class_by_object(obj)
-
- if cls:
- return [(x.get_href(session, obj),
- x.get_title(session),
- x.is_enabled(session, obj))
- for x in cls.tasks
- if not x.aggregate and x.form and x.navigable]
-
class StateSwitch(ItemSet):
def __init__(self, app, name):
super(StateSwitch, self).__init__(app, name)
Modified: branches/sage_addition/etc/Profile.bash
===================================================================
--- branches/sage_addition/etc/Profile.bash 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/etc/Profile.bash 2011-06-24 17:40:40 UTC (rev 4854)
@@ -1,5 +1,5 @@
export DEVEL_HOME="$PWD"
-export DEVEL_MODULES="mint cumin basil parsley wooly rosemary"
+export DEVEL_MODULES="mint cumin basil parsley wooly rosemary sage"
if [[ -z "$DEVEL_ORIGINAL_PATH" ]]; then
export DEVEL_ORIGINAL_PATH="$PATH"
Modified: branches/sage_addition/rosemary/python/rosemary/model.py
===================================================================
--- branches/sage_addition/rosemary/python/rosemary/model.py 2011-06-24 15:31:24 UTC (rev 4853)
+++ branches/sage_addition/rosemary/python/rosemary/model.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -271,9 +271,9 @@
stat = RosemaryStatistic(self, child.get("name"))
stat.load(child)
- for child in elem.findall("method"):
- meth = RosemaryMethod(self, child.get("name"))
- meth.load(child)
+# for child in elem.findall("method"):
+# meth = RosemaryMethod(self, child.get("name"))
+# meth.load(child)
def extend(self, elem):
log.debug("Extending %s", self)
@@ -294,9 +294,9 @@
stat = self._statistics_by_name[child.get("name")]
stat.extend(child)
- for child in elem.findall("method"):
- meth = self._methods_by_name[child.get("name")]
- meth.extend(child)
+# for child in elem.findall("method"):
+# meth = self._methods_by_name[child.get("name")]
+# meth.extend(child)
for child in elem.findall("index"):
idx = RosemaryIndex(self, child.get("name"))
Added: branches/sage_addition/sage/python/sage/__init__.py
===================================================================
Added: branches/sage_addition/sage/python/sage/aviaryoperations.py
===================================================================
--- branches/sage_addition/sage/python/sage/aviaryoperations.py (rev 0)
+++ branches/sage_addition/sage/python/sage/aviaryoperations.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -0,0 +1,4 @@
+class AviaryOperations(object):
+ def __init__(self, name):
+
+ self.name = name
Added: branches/sage_addition/sage/python/sage/qmfoperations.py
===================================================================
--- branches/sage_addition/sage/python/sage/qmfoperations.py (rev 0)
+++ branches/sage_addition/sage/python/sage/qmfoperations.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -0,0 +1,181 @@
+import logging
+from sage.util import CallSync, wait
+
+log = logging.getLogger("sage.remoteoperations")
+
+class QmfOperations(object):
+ '''
+ Provides an interface to QMF remote procedure calls.
+ Public methods may implement synchronous or asynchronous
+ calling semantics, or may support either one depending on the parameters
+ passed.
+ If a method takes a 'callback' parameter and that parameter is not
+ None, the method will have asynchronous calling semantics. Otherwise,
+ the method will have synchronous calling semantics and the 'default' and
+ 'timeout' parameters will be used.
+ The 'callback' parameter if set is expected to be a reference to a
+ function that can be invoked by self.session.call_method() when an
+ operation is complete.
+ '''
+
+ def __init__(self, name, session):
+ '''
+ If this object is added to a RemoteOperations object as a mechanism,
+ the 'name' parameter will be used to create an attribute which points
+ to this object.
+ The 'session' parameter is expected to be an object that defines the
+ following method:
+
+ def call_method(callback, obj, meth, args)
+
+ session.call_method should be capable of invoking the 'meth' method
+ on the 'obj' object with the tuple 'args' as arguments. It should
+ be capable of passing results to the 'callback' function. Any
+ 'callback' parameter passed to a public method of a QmfOperations object
+ should have a signature that is compatible with that expected by
+ session.call_method().
+ '''
+
+ self.name = name
+ self.session = session
+
+ # Note, methods below were written to be synchronous, asynchronous, or
+ # selectable to reflect existing usage of QMF methods in cumin at
+ # the time 'sage' was created. Any of these methods may be changed to
+ # have synchronous or asynchronous calling semantics by using the callback,
+ # default, and timeout paramters and the self._call method.
+
+# broker operations
+
+ def queue_move_messages(self, broker, src, dst, count, callback):
+ assert callback
+ self._call(broker, "queueMoveMessages", callback, 0, 0, src, dst, count)
+
+# methods on various broker objects. close is implemented on multiple objects
+
+ def close(self, obj, callback):
+ assert callback
+ self._call(obj, "close", callback, 0, 0)
+
+ def detach(self, obj, callback):
+ assert callback
+ self._call(obj, "detach", callback, 0, 0)
+
+ def bridge(self, link, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic, sync, callback):
+ assert callback
+ self._call(link, "bridge", callback, 0, 0,
+ durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic, sync)
+
+ def connect(self, obj, host, port, durable, mech, username, password, transport, callback):
+ assert callback
+ self._call(obj, "connect", callback, 0, 0,
+ host, port, durable, mech, username, password, transport)
+
+# queue operations
+
+ def purge_queue(self, queue, count, callback):
+ assert callback
+ self._call(queue, "purge", callback, 0, 0, count)
+
+# scheduler operations
+
+ def set_job_attribute(self, scheduler, job_id, name, value, callback):
+ assert callback
+ self._call(scheduler, "SetJobAttribute", callback, 0, 0,
+ job_id, name, value)
+
+ def submit_job(self, scheduler, ad, callback):
+ assert callback
+ self._call(scheduler, "SubmitJob", callback, 0, 0, ad)
+
+ def hold_job(self, scheduler, job_id, reason,
+ callback=None, default=None, timeout=5):
+ '''
+ This method is asynchronous iff 'callback' is supplied.
+ '''
+ return self._call(scheduler, "HoldJob", callback, default,
+ timeout, job_id, reason)
+
+ def release_job(self, scheduler, job_id, reason,
+ callback=None, default=None, timeout=5):
+ '''
+ This method is asynchronous iff 'callback' is supplied.
+ '''
+ return self._call(scheduler, "ReleaseJob", callback, default,
+ timeout, job_id, reason)
+
+ def remove_job(self, scheduler, job_id, reason,
+ callback=None, default=None, timeout=5):
+ '''
+ This method is asynchronous iff 'callback' is supplied.
+ '''
+ return self._call(scheduler, "RemoveJob", callback, default,
+ timeout, job_id, reason)
+
+# jobserver operations
+
+ def get_job_ad(self, job_server, job_id, default=None, timeout=5):
+ return self._call(job_server, "GetJobAd", 0, default, timeout, job_id)
+
+ def fetch_job_data(self, job_server, job_id, file, start, end,
+ default=None, timeout=5):
+ return self._call(job_server, "FetchJobData", 0, default, timeout,
+ job_id, file, start, end)
+
+# negotiator operations
+
+ def set_limit(self, negotiator, lim_name, lim_max, default=None, timeout=5):
+ return self._call(negotiator, "SetLimit", 0, default, timeout,
+ lim_name, lim_max)
+
+ def get_limits(self, negotiator, callback):
+ assert callback
+ return self._call(negotiator, "GetLimits", callback, 0, 0)
+
+ def reconfig(self, negotiator, default=None, timeout=5):
+ return self._call(negotiator, "Reconfig", 0, default, timeout)
+
+ def set_raw_config(self, negotiator, name, value, default=None, timeout=5):
+ return self._call(negotiator, "SetRawConfig", 0, default, timeout,
+ name, value)
+
+ def get_raw_config(self, negotiator, name, callback):
+ assert callback
+ return self._call(negotiator, "GetRawConfig", callback, 0, 0, name)
+
+# master operations
+
+ def start(self, master, daemon, callback):
+ assert callback
+ return self._call(master, "Start", callback, 0, 0, daemon)
+
+ def stop(self, master, daemon, callback):
+ assert callback
+ return self._call(master, "Stop", callback, 0, 0, daemon)
+
+# submission operations
+
+ def get_job_summaries(self, submission, callback):
+ assert callback
+ return self._call(submission, "GetJobSummaries", callback, 0, 0)
+
+# Secret private implementation stuff, don't look!
+ def _call(self, obj, meth, cb, dflt, tout, *args):
+ if cb:
+ self.session.call_method(cb, obj, meth, args)
+ else:
+ try:
+ sync = CallSync(log, dflt)
+ self.session.call_method(sync.get_completion(),
+ obj, meth, args)
+ except Exception, e:
+ sync.error = e
+ log.exception(e)
+
+ wait(sync.done, timeout=tout)
+ if not sync.got_data and not sync.error:
+ sync.error = Exception("Request timed out")
+ msg = "QMF call %s timed out" % meth
+ log.error(msg)
+ return sync
+
Added: branches/sage_addition/sage/python/sage/remoteoperations.py
===================================================================
--- branches/sage_addition/sage/python/sage/remoteoperations.py (rev 0)
+++ branches/sage_addition/sage/python/sage/remoteoperations.py 2011-06-24 17:40:40 UTC (rev 4854)
@@ -0,0 +1,51 @@
+class RemoteOperations(object):
+
+ def __init__(self):
+ self.mechanisms = []
+
+ def add_mechanisms(self, mechs):
+ '''
+ Add an item or items to the list of supported mechanisms.
+ A mechanism is expected to be an instance of a class containing
+ methods that implement remote procedure calls.
+ When attribute access is done on an instance of RemoteOperations,
+ the mechanism list is searched in order for a class which contains
+ a method of the given name (assuming the attribute is not contained
+ directly in the RemoteOperations instance itself). This allows methods
+ on mechanism objects to be called as if they are attributes of
+ RemoteOperations, without the caller having knowledge of which mechanism
+ ultimately provides the method.
+ If a mechanism object has an attribute called 'name', a reference to the
+ object is added as an attribute of RemoteOperations named 'name'. This
+ allows a specific mechamism to be used if desired (for example,
+ the hold_job method might be invoked as remote.qmf.hold_job(), ensuring
+ that the QMF version is used, rather than remote.hold_job() which will
+ use the first hold_job method located in the mechanism list).
+ '''
+
+ # Allow mechanism to be specifially chosen by name
+ # rather than determined by precedence order
+ # remote.qmf.Op() vs remote.Op(), for example
+ def make_mech_attr(m):
+ if hasattr(m, "name"):
+ setattr(self, m.name, m)
+
+ if type(mechs) in (tuple, list):
+ for m in mechs:
+ self.mechanisms.append(m)
+ make_mech_attr(m)
+ else:
+ self.mechanisms.append(mechs)
+ make_mech_attr(mechs)
+
+ def __getattr__(self, name):
+ # Reminder, __getattr__ is called when an attribute
+ # cannot be found in the object.
+ for m in self.mechanisms:
+ if hasattr(m, name):
+ a = getattr(m, name)
+ if callable(a):
+ return a
+ raise AttributeError("No mechanism has callable %s" % name)
+
+