Author: tmckay
Date: 2011-08-30 20:59:44 +0000 (Tue, 30 Aug 2011)
New Revision: 4940

Added:
   trunk/sage/python/sage/aviary/https.py
Modified:
   trunk/cumin/bin/cumin-web
   trunk/cumin/python/cumin/config.py
   trunk/cumin/python/cumin/main.py
   trunk/sage/python/sage/aviary/aviaryoperations.py
Log:
Add support for ssl connection to aviary.

Modified: trunk/cumin/bin/cumin-web
===================================================================
--- trunk/cumin/bin/cumin-web 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/bin/cumin-web 2011-08-30 20:59:44 UTC (rev 4940)
@@ -18,6 +18,8 @@
     cumin.use_aviary = values.use_aviary
     cumin.aviary_job_servers = values.aviary_job_servers
     cumin.aviary_query_servers = values.aviary_query_servers
+    cumin.aviary_key = values.aviary_key
+    cumin.aviary_cert = values.aviary_cert
 
 def set_wallaby_configs(cumin, values, brokers):
     if values.wallaby_broker == "":

Modified: trunk/cumin/python/cumin/config.py
===================================================================
--- trunk/cumin/python/cumin/config.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/python/cumin/config.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -164,6 +164,12 @@
         param = ConfigParameter(self, "aviary-query-servers", str)
         param.default = "http://localhost:9091"
 
+        param = ConfigParameter(self, "aviary-key", str)
+        param.default = ""
+
+        param = ConfigParameter(self, "aviary-cert", str)
+        param.default = ""
+
         self.log_file = ConfigParameter(self, "log-file", str)
 
         param = ConfigParameter(self, "log-level", str)

Modified: trunk/cumin/python/cumin/main.py
===================================================================
--- trunk/cumin/python/cumin/main.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/cumin/python/cumin/main.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -87,6 +87,8 @@
         self.use_aviary = True
         self.aviary_job_servers = ""
         self.aviary_query_servers = ""
+        self.aviary_key = ""
+        self.aviary_cert = ""
 
         self.wallaby = None
         self.wallaby_broker = None
@@ -172,7 +174,8 @@
 
                 ops.insert(0, AviaryOperations("aviary", aviary_dir,
                                                self.aviary_job_servers,
-                                               self.aviary_query_servers))
+                                               self.aviary_query_servers,
+                                               self.aviary_key, self.aviary_cert))
             self.remote.add_mechanisms(ops)
 
         # Create RPC interface for Wallaby

Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-29 18:00:52 UTC (rev 4939)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -9,8 +9,10 @@
 
 from suds import *
 from suds.client import Client
+from suds.transport.https import HttpAuthenticated
 from sage.util import CallSync, CallThread, ObjectPool, host_list
 from datetime import datetime
+from sage.aviary.https import *
 
 log = logging.getLogger("sage.aviary")
 
@@ -56,7 +58,8 @@
 '''
 
 class AviaryOperations(object):
-    def __init__(self, name, datadir, job_servers, query_servers):
+    def __init__(self, name, datadir, job_servers, query_servers,
+                 key="", cert=""):
         self.name = name
         self.datadir = datadir
 
@@ -87,29 +90,34 @@
         self.type_to_aviary = self._type_to_aviary()
         self.aviary_to_type = self._aviary_to_type()
 
+        self.key = key
+        self.cert = cert
+
     # job server operations
 
     def set_job_attribute(self, scheduler, job_id, name, value, callback, submission):
         assert callback
 
-        job_client = self.job_client_pool.get_object()
-
         def my_callback(result):
             self.job_client_pool.return_object(job_client)
             result = self._pretty_result(result, scheduler.Machine)
             # massage results for use by standard callback
             cb_args = self._cb_args_dataless(result)
             callback(*cb_args)
-
-        host = self._get_host(scheduler.Machine, self.job_servers)
-        if host == "":
-            self._raise_no_host(scheduler.Machine)
-
-        service = host + "setJobAttribute"
-
+
+#        job_client = self.job_client_pool.get_object()
+#        scheme, host = self._get_host(scheduler.Machine, self.job_servers)
+#        if host == "":
+#            self._raise_no_host(scheduler.Machine)
+#        service = host + "setJobAttribute"
         # Have to set the URL for the method. This might go away someday...
-        job_client.set_options(location=service)
+#        job_client.set_options(location=service)
 
+        job_client = self._setup_client(self.job_client_pool,
+                                        scheduler.Machine,
+                                        self.job_servers,
+                                        "setJobAttribute")
+
         # Make a job id parameter (see job wsdl)
         jobId = job_client.factory.create('ns0:JobID')
         jobId.job = job_id
@@ -130,8 +138,6 @@
     def submit_job(self, scheduler, ad, callback):
         assert callback
 
-        job_client = self.job_client_pool.get_object()
-
         def my_callback(result):
             # Turn this back off before we put it back in the pool
             # so allow_overrides isn't set for someone else...
@@ -151,15 +157,11 @@
                 id = None
             callback(status, id)
 
-        host = self._get_host(scheduler.Machine, self.job_servers)
-        if host == "":
-            self._raise_no_host(scheduler.Machine)
+        job_client = self._setup_client(self.job_client_pool,
+                                        scheduler.Machine,
+                                        self.job_servers,
+                                        "submitJob")
 
-        service = host + "submitJob"
-
-        # Have to set the URL for the method. This might go away someday...
-        job_client.set_options(location=service)
-
         # Set basic attributes in the order defined by aviary-job.wsdl.
         args = list()
         basic_attrs = ("Cmd", "Args", "Owner", "Iwd", "Submission")
@@ -277,8 +279,6 @@
         # Aviary doesn't use the file name as does QMF, instead it
         # specifies the file type and lets condor figure out the path.
 
-        client = self.query_client_pool.get_object()
-
         def my_process_results(result):
             # Fix up the exception message if necessary
             result = self._pretty_result(result, job_server.Machine)
@@ -295,16 +295,11 @@
                 data = None
             return (status, data)
 
-        host = self._get_host(job_server.Machine, self.query_servers)
-        if host == "":
-            self._raise_no_host(job_server.Machine)
+        client = self._setup_client(self.query_client_pool,
+                                    job_server.Machine,
+                                    self.query_servers,
+                                    "getJobData")
 
-        service = host + "getJobData"
-
-        # Have to set the URL for the method. This might go away someday...
-        #print "Aviary control job " + url
-        client.set_options(location=service)
-
         # Make a job data parameter (see query wsdl)
         jobData = client.factory.create('ns0:JobData')
         jobData.id.job = job_id
@@ -345,8 +340,6 @@
         default = "default" in kwargs and kwargs["default"] or None
         timeout = "timeout" in kwargs and kwargs["timeout"] or 5
 
-        client = self.query_client_pool.get_object()
-
         def make_tuple(attr):
             # Attempt to cast the value into the specified type
             if attr.type in self.aviary_to_type:
@@ -380,16 +373,11 @@
                 data = None
             return (status, data)
 
-        host = self._get_host(job_server.Machine, self.query_servers)
-        if host == "":
-            self._raise_no_host(job_server.Machine)
+        client = self._setup_client(self.query_client_pool,
+                                    job_server.Machine,
+                                    self.query_servers,
+                                    "getJobDetails")
 
-        service = host + "getJobDetails"
-
-        # Have to set the URL for the method. This might go away someday...
-        #print "Aviary control job " + url
-        client.set_options(location=service)
-
         # Make a job id parameter (see job wsdl)
         jobId = client.factory.create('ns0:JobID')
         jobId.job = job_id
@@ -406,8 +394,6 @@
     def get_job_summaries(self, submission, callback, machine_name):
         assert callback
 
-        query_client = self.query_client_pool.get_object()
-
         def to_int_seconds(dt):
             # Change a datetime.datetime into int seconds since epoch
             # Note, this works nicely if the datetime happens to include microseconds
@@ -459,16 +445,12 @@
             else:
                 data = {"Jobs": None}
             callback(status, data)
-
-        host = self._get_host(machine_name, self.query_servers)
-        if host == "":
-            self._raise_no_host(machine_name)
 
-        service = host + "getSubmissionSummary"
+        query_client = self._setup_client(self.query_client_pool,
+                                          machine_name,
+                                          self.query_servers,
+                                          "getSubmissionSummary")
 
-        # Have to set the URL for the method. This might go away someday...
-        query_client.set_options(location=service)
-
         # What we really want here is the job summaries from the
         # submission summary response. To get those, we have to
         # set an extra attribute on the client...
@@ -499,23 +481,51 @@
 
     @classmethod
     def _get_host(cls, name, servers):
+        scheme = ""
         host = ""
         if name in servers:
             urls = servers[name]
             if len(urls) > 0:
-                host = str(random.sample(urls, 1)[0])
+                url = random.sample(urls, 1)[0]
+                scheme = url.scheme
+                host = str(url)
                 # A particular method name is going to be appended to path,
                 # so ensure the file "/" here. Since we supply default path
                 # values when the host list is parsed, we know the last portion
                 # of the host string has to be a path.
                 if not host.endswith("/"):
                     host += "/"
-        return host
+        return scheme, host
 
     @classmethod
     def _raise_no_host(cls, name):
         raise Exception("No aviary job servers specified for %s, check config files" % name)
 
+    def _setup_client(self, client_pool, host_name, host_list, method):
+        # Get a client object from the client_pool. Use host_name as
+        # a lookup in the host_list to find an initial URL and append the
+        # method name. Set up the client to point at that URL and return.
+        client = client_pool.get_object()
+        scheme, host = self._get_host(host_name, host_list)
+        if host == "":
+            self._raise_no_host(host_name)
+        # Have to set the URL for the method. This might go away someday...
+        client.set_options(location=host+method)
+        # Since we pool the clients and reuse them for different requests
+        # and since its possible to be using servers with different schemes,
+        # we have to always reset the transport here.
+        if scheme == "https":
+            if not os.path.isfile(self.key):
+                raise Exception("File not found for aviary-key, check cumin.conf settings")
+            if not os.path.isfile(self.cert):
+                raise Exception("File not found for aviary-cert, check cumin.conf settings")
+            the_transport = HTTPSClientCertTransport(self.key, self.cert)
+        else:
+            # this is the default transport when none is specified
+            the_transport = HttpAuthenticated()
+        client.set_options(transport=the_transport)
+        return client
+
     @classmethod
     def _get_status(cls, result):
         # For Aviary operations, if the operation
@@ -560,19 +570,13 @@
                      meth_name, callback, default, timeout):
 
-        host = self._get_host(scheduler.Machine, self.job_servers)
-        if host == "":
-            self._raise_no_host(scheduler.Machine)
+        client = self._setup_client(self.job_client_pool,
+                                    scheduler.Machine,
+                                    self.job_servers,
+                                    meth_name)
 
-        service = host + meth_name
-
-        client = self.job_client_pool.get_object()
         meth = getattr(client.service, meth_name)
 
-        # Have to set the URL for the method. This might go away someday...
-        #print "Aviary control job " + url
-        client.set_options(location=service)
-
         # Make a job id parameter (see job wsdl)
         jobId = client.factory.create('ns0:JobID')
         jobId.job = job_id
 
Added: trunk/sage/python/sage/aviary/https.py
===================================================================
--- trunk/sage/python/sage/aviary/https.py (rev 0)
+++ trunk/sage/python/sage/aviary/https.py 2011-08-30 20:59:44 UTC (rev 4940)
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2009-2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# uses Suds - https://fedorahosted.org/suds/
+import urllib2 as u2
+from suds.transport.http import HttpTransport, Reply, TransportError
+import httplib
+
+class HTTPSClientAuthHandler(u2.HTTPSHandler):
+    def __init__(self, key, cert):
+        u2.HTTPSHandler.__init__(self)
+        self.key = key
+        self.cert = cert
+
+    def https_open(self, req):
+        #Rather than pass in a reference to a connection class, we pass in
+        # a reference to a function which, for all intents and purposes,
+        # will behave as a constructor
+        return self.do_open(self.getConnection, req)
+
+    def getConnection(self, host, timeout=300):
+        return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
+
+class HTTPSClientCertTransport(HttpTransport):
+    def __init__(self, key, cert, *args, **kwargs):
+        HttpTransport.__init__(self, *args, **kwargs)
+        self.key = key
+        self.cert = cert
+
+    def u2open(self, u2request):
+        """
+        Open a connection.
+        @param u2request: A urllib2 request.
+        @type u2request: urllib2.Request.
+        @return: The opened file-like urllib2 object.
+        @rtype: fp
+        """
+        tm = self.options.timeout
+        url = u2.build_opener(HTTPSClientAuthHandler(self.key, self.cert))
+        if self.u2ver() < 2.6:
+            socket.setdefaulttimeout(tm)
+            return url.open(u2request)
+        else:
+            return url.open(u2request, timeout=tm)