Author: tmckay Date: 2011-08-15 18:06:36 +0000 (Mon, 15 Aug 2011) New Revision: 4927
Modified: trunk/cumin/bin/cumin-web trunk/cumin/etc/cumin.conf trunk/cumin/python/cumin/config.py trunk/cumin/python/cumin/main.py trunk/sage/python/sage/aviary/aviaryoperations.py Log: Add aviary-job-servers and aviary-query-servers values to cumin.conf. Aviary operations will select an available server at random from the list of servers running on a particular host (the host is derived from context based on the operation, i.e., from the scheduler object associated with a job for job operations).
Modified: trunk/cumin/bin/cumin-web =================================================================== --- trunk/cumin/bin/cumin-web 2011-08-15 17:45:52 UTC (rev 4926) +++ trunk/cumin/bin/cumin-web 2011-08-15 18:06:36 UTC (rev 4927) @@ -16,17 +16,15 @@
def set_aviary_configs(cumin, values): cumin.use_aviary = values.use_aviary - cumin.aviary_host = values.aviary_host - cumin.aviary_job_host = values.aviary_job_host - cumin.aviary_query_host = values.aviary_query_host - cumin.aviary_job_port = values.aviary_job_port - cumin.aviary_query_port = values.aviary_query_port + cumin.aviary_job_servers = values.aviary_job_servers + cumin.aviary_query_servers = values.aviary_query_servers
- def set_wallaby_configs(cumin, values, brokers): if values.wallaby_broker == "": cumin.wallaby_broker = brokers[0] else: + if values.wallaby_broker == "None": + values.wallaby_broker = None cumin.wallaby_broker = values.wallaby_broker # Let 0 indicate "no timeout", since the timeout # value is an int in the config and None can't be
Modified: trunk/cumin/etc/cumin.conf =================================================================== --- trunk/cumin/etc/cumin.conf 2011-08-15 17:45:52 UTC (rev 4926) +++ trunk/cumin/etc/cumin.conf 2011-08-15 18:06:36 UTC (rev 4927) @@ -13,11 +13,12 @@ [common] # database: dbname=cumin user=cumin host=localhost # brokers: localhost:5672 -# sasl-mech-list: [allow all available mechanisms] +# sasl-mech-list: [default, allow all available mechanisms] +# wallaby-broker: [default, first item in 'brokers' list] +# wallaby-refresh: 60 # use-aviary: True -# aviary-host: localhost -# aviary-job-port: 9090 -# aviary-query-port: 9091 +# aviary-job-servers: http://localhost:9090 +# aviary-query-servers: http://localhost:9091 # log-level: info # log-max-mb: 10 # log-max-archives: 1 @@ -141,6 +142,43 @@
# [common]
+# wallaby-broker: [first item in 'brokers' list] +# The broker to use for interaction with a Wallaby agent. +# This is not necessarily the same broker used for grid +# and messaging information. The default value is the +# first item in the 'brokers' list if not specified. To +# turn off interaction with Wallaby, set wallaby-broker to +# the string 'None'. + +# wallaby-refresh: 60 +# How often in seconds to contact the Wallaby agent +# for updated information. + +# use-aviary: True +# Whether or not to use the Aviary services for +# remote procedure calls to condor. If this is +# set to False, the QMF interface will be used +# instead. + +# aviary-job-servers: http://localhost:9090 +# Specifies the URIs for aviary job servers. The value +# is a comma-separated list of URIs. A full URI has the +# form 'scheme://user/password@host:port/path'. The scheme +# will default to http if not specified, the port will +# default to 9090, and the path will default to +# /services/job/. User and password will be empty by +# default. As a convenience, a URI that explicitly +# sets a port number may be followed by one or more +# port numbers separated by commas to specify +# multiple job servers whose URIs differ only +# by port number. + +# aviary-query-servers: http://localhost:9091 +# Like aviary-job-servers but specifies URIs for aviary +# query servers. The port value defaults to 9091 and the +# path defaults to /services/query/. Other +# defaults are as noted for aviary-job-servers. + # log-max-mb: 10 # Maximum size in MB of *.log files created by cumin. # A log file reaching maximum size will be rolled over.
Modified: trunk/cumin/python/cumin/config.py =================================================================== --- trunk/cumin/python/cumin/config.py 2011-08-15 17:45:52 UTC (rev 4926) +++ trunk/cumin/python/cumin/config.py 2011-08-15 18:06:36 UTC (rev 4927) @@ -158,24 +158,12 @@ param = ConfigParameter(self, "use-aviary", bool) param.default = True
- param = ConfigParameter(self, "aviary-host", str) - param.default = "localhost" + param = ConfigParameter(self, "aviary-job-servers", str) + param.default = "http://localhost:9090"
- # Use aviary-host for all services, unless these are - # set explicitly as an override. This allows different - # services to be hosted on different machines if desired. - param = ConfigParameter(self, "aviary-job-host", str) - param.default = "" + param = ConfigParameter(self, "aviary-query-servers", str) + param.default = "http://localhost:9091"
- param = ConfigParameter(self, "aviary-query-host", str) - param.default = "" - - param = ConfigParameter(self, "aviary-job-port", int) - param.default = 9090 - - param = ConfigParameter(self, "aviary-query-port", int) - param.default = 9091 - self.log_file = ConfigParameter(self, "log-file", str)
param = ConfigParameter(self, "log-level", str)
Modified: trunk/cumin/python/cumin/main.py =================================================================== --- trunk/cumin/python/cumin/main.py 2011-08-15 17:45:52 UTC (rev 4926) +++ trunk/cumin/python/cumin/main.py 2011-08-15 18:06:36 UTC (rev 4927) @@ -85,11 +85,8 @@ # These values are expected to be filled in if # use_aviary is left True. self.use_aviary = True - self.aviary_host = None - self.aviary_job_host = None - self.aviary_query_host = None - self.aviary_job_port = None - self.aviary_query_port = None + self.aviary_job_servers = "" + self.aviary_query_servers = ""
self.wallaby = None self.wallaby_broker = None @@ -173,17 +170,9 @@ from sage.aviary.aviaryoperations import AviaryOperations aviary_dir = os.path.join(self.home, "rpc-defs/aviary")
- # If aviary_job_host or aviary_query_host are "" or - # None then they inherit the value of aviary_host. - if not self.aviary_job_host: - self.aviary_job_host = self.aviary_host - if not self.aviary_query_host: - self.aviary_query_host = self.aviary_host ops.insert(0, AviaryOperations("aviary", aviary_dir, - self.aviary_job_host, - self.aviary_job_port, - self.aviary_query_host, - self.aviary_query_port)) + self.aviary_job_servers, + self.aviary_query_servers)) self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-15 17:45:52 UTC (rev 4926) +++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-15 18:06:36 UTC (rev 4927) @@ -2,9 +2,11 @@
from suds import * from suds.client import Client -from sage.util import CallSync, CallThread, ObjectPool +from sage.util import CallSync, CallThread, ObjectPool, host_list import threading import logging +import random +import urllib2
log = logging.getLogger("sage.aviary")
@@ -49,29 +51,26 @@ - can we use default/timeout with suds? yes, there is at least a timeout that can be set once, it might be possible to change the timeout per call. - -- Add some nice error message that indicates the connection was -refused to the aviary service. This probably means that aviary -is not running on that port -- more helpful than "connection refulsed" '''
class AviaryOperations(object): - def __init__(self, name, datadir, job_host, job_port, query_host, query_port): - assert type(job_host) is str - assert type(query_host) is str - + def __init__(self, name, datadir, job_servers, query_servers): self.name = name self.datadir = datadir
- # Add http:// if there is no protocol... - if job_host.find("://") < 0: - job_host = "http://" + job_host - if query_host.find("://") < 0: - query_host = "http://" + query_host + # job_servers and query_servers are comma separated lists of + # network locations. See comments on host_port_list for format. + self.job_servers = host_list(job_servers, + default_scheme = "http", + default_port="9090", + default_path="/services/job/") + + + self.query_servers = host_list(job_servers, + default_scheme = "http", + default_port="9091", + default_path="/services/query/")
- self.job_url = job_host + ":" + str(job_port) + "/services/job/" - self.query_url = query_host + ":" + str(query_port) + "/services/query/" - job_wsdl = "file:" + os.path.join(self.datadir, "aviary-job.wsdl") query_wsdl = "file:" + os.path.join(self.datadir, "aviary-query.wsdl")
@@ -89,15 +88,19 @@
def my_callback(result): self.job_client_pool.return_object(job_client) + result = self._pretty_result(result, scheduler.Machine) # massage results for use by standard callback cb_args = self._cb_args_dataless(result) callback(*cb_args)
- url = self.job_url + "setJobAttribute" - #print "Aviary set job attribute " + url + host = self._get_host(scheduler.Machine, self.job_servers) + if host == "": + self._raise_no_host(scheduler.Machine)
+ service = host + "setJobAttribute" + # Have to set the URL for the method. This might go away someday... - job_client.set_options(location=url) + job_client.set_options(location=service)
# Make a job id parameter (see job wsdl) jobId = job_client.factory.create('ns0:JobID') @@ -124,6 +127,7 @@ # so allow_overrides isn't set for someone else... job_client.set_allow_overrides(False) self.job_client_pool.return_object(job_client) + result = self._pretty_result(result, scheduler.Machine) if isinstance(result, Exception): callback(result, None) else: @@ -133,11 +137,14 @@ status = AviaryOperations._get_status(result.status) callback(status, result.id)
- url = self.job_url + "submitJob" - #print "Aviary submit job " + url + host = self._get_host(scheduler.Machine, self.job_servers) + if host == "": + self._raise_no_host(scheduler.Machine)
+ service = host + "submitJob" + # Have to set the URL for the method. This might go away someday... - job_client.set_options(location=url) + job_client.set_options(location=service)
# Set basic attributes in the order defined by aviary-job.wsdl. args = list() @@ -217,6 +224,25 @@ return {int: "INTEGER", float: "FLOAT", str: "STRING", bool: "BOOLEAN"}
@classmethod + def _get_host(cls, name, servers): + host = "" + if name in servers: + urls = servers[name] + if len(urls) > 0: + host = str(random.sample(urls, 1)[0]) + # A particular method name is going to be appended to path, + # so ensure the file "/" here. Since we supply default path + # values when the host list is parsed, we know the last portion + # of the host string has to be a path. + if not host.endswith("/"): + host += "/" + return host + + @classmethod + def _raise_no_host(cls, name): + raise Exception("No aviary job servers specified for %s, check config files" % name) + + @classmethod def _get_status(cls, result): # For Aviary operations, if the operation # did not work the reason is in the text field. @@ -236,6 +262,12 @@ status = AviaryOperations._get_status(result) return (status, None)
+ @classmethod + def _pretty_result(cls, result, host): + if isinstance(result, urllib2.URLError): + return Exception("Trouble reaching host %s, %s" % (host, result.reason)) + return result + def _call_sync(self, process_results, meth, *meth_args, **meth_kwargs): # Common interface with QMF operations requires that a MethodResult # object (or something just like it) be returned for synchronous calls. @@ -253,13 +285,18 @@ def _control_job(self, scheduler, job_id, reason, meth_name, callback, default, timeout):
- url = self.job_url + meth_name + host = self._get_host(scheduler.Machine, self.job_servers) + if host == "": + self._raise_no_host(scheduler.Machine) + + service = host + meth_name + client = self.job_client_pool.get_object() meth = getattr(client.service, meth_name)
# Have to set the URL for the method. This might go away someday... #print "Aviary control job " + url - client.set_options(location=url) + client.set_options(location=service)
# Make a job id parameter (see job wsdl) jobId = client.factory.create('ns0:JobID') @@ -270,13 +307,20 @@ if callback: def my_callback(result): self.job_client_pool.return_object(client) + # Fix up the exception message if necessary + result = self._pretty_result(result, scheduler.Machine) cb_args = self._cb_args_dataless(result) callback(*cb_args)
t = CallThread(meth, my_callback, jobId, reason) t.start() else: - res = self._call_sync(self._cb_args_dataless, meth, jobId, reason) + def my_process_results(result): + # Fix up the exception message if necessary + result = self._pretty_result(result, scheduler.Machine) + return self._cb_args_dataless(result) + + res = self._call_sync(my_process_results, meth, jobId, reason) self.job_client_pool.return_object(client) return res;
cumin-developers@lists.fedorahosted.org