Author: tmckay Date: 2012-12-11 21:16:54 +0000 (Tue, 11 Dec 2012) New Revision: 5584
Modified: branches/elephant/cumin/bin/cumin-web branches/elephant/cumin/python/cumin/config.py branches/elephant/cumin/python/cumin/main.py branches/elephant/sage/python/sage/aviary/aviaryoperations.py Log: Add initial stub for Aviary hadoop operations to aviaryoperations.
Modified: branches/elephant/cumin/bin/cumin-web =================================================================== --- branches/elephant/cumin/bin/cumin-web 2012-12-11 20:32:57 UTC (rev 5583) +++ branches/elephant/cumin/bin/cumin-web 2012-12-11 21:16:54 UTC (rev 5584) @@ -21,6 +21,7 @@ def set_aviary_configs(cumin, values): cumin.aviary_job_servers = values.aviary_job_servers cumin.aviary_query_servers = values.aviary_query_servers + cumin.aviary_hadoop_servers = values.aviary_hadoop_servers cumin.aviary_locator = values.aviary_locator cumin.aviary_key = values.aviary_key cumin.aviary_cert = values.aviary_cert
Modified: branches/elephant/cumin/python/cumin/config.py =================================================================== --- branches/elephant/cumin/python/cumin/config.py 2012-12-11 20:32:57 UTC (rev 5583) +++ branches/elephant/cumin/python/cumin/config.py 2012-12-11 21:16:54 UTC (rev 5584) @@ -196,6 +196,9 @@ param = ConfigParameter(self, "aviary-job-servers", str) param.default = "http://localhost:9090"
+ param = ConfigParameter(self, "aviary-hadoop-servers", str) + param.default = "http://localhost:9090" + param = ConfigParameter(self, "aviary-query-servers", str) param.default = "http://localhost:9091"
Modified: branches/elephant/cumin/python/cumin/main.py =================================================================== --- branches/elephant/cumin/python/cumin/main.py 2012-12-11 20:32:57 UTC (rev 5583) +++ branches/elephant/cumin/python/cumin/main.py 2012-12-11 21:16:54 UTC (rev 5584) @@ -104,6 +104,7 @@ # Aviary operations for that server type will not be used. self.aviary_job_servers = "" self.aviary_query_servers = "" + self.aviary_hadoop_servers = "" self.aviary_key = "" self.aviary_cert = "" self.aviary_root_cert = "" @@ -202,45 +203,53 @@ # given op... self.remote = Catalog() ops = [QmfOperations("qmf", self.session)] + try: + from sage.aviary.aviaryoperations \ + import SudsLogging, AviaryOperationsFactory + imports_ok = True + except: + imports_ok = False
- imports_ok = True - if self.aviary_job_servers or self.aviary_query_servers: - try: - from sage.aviary.aviaryoperations import \ - SudsLogging, AviaryOperationsFactory - except: - imports_ok = False - if imports_ok: - SudsLogging.set(self.aviary_suds_logs, self.home) + # Turn off hadoop operations config if the module isn't loaded + hadoop_loaded = "gridhadoop" in self.authorizator.get_enabled_modules() + if not hadoop_loaded: + self.aviary_hadoop_servers = ""
- # By default Cumin uses /var/lib/condor/aviary/services for - # aviary wsdl files if it exists. - aviary_dir = ["/var/lib/condor/aviary/services", - os.path.join(self.home, "rpc-defs/aviary")] - if not self.aviary_prefer_condor: - aviary_dir = [aviary_dir[1], aviary_dir[0]] + if imports_ok: + SudsLogging.set(self.aviary_suds_logs, self.home)
- # The factory will choose an impl that gives us jobs, queries, - # or both depending on whether job_servers and query_servers - # are empty strings. If locator is non empty, their actual - # values will be overridden but the presence of a value will - # still control enable/disable. - aviary_itf = AviaryOperationsFactory("aviary", aviary_dir, - self.aviary_locator, - self.aviary_job_servers, - self.aviary_query_servers, - key=self.aviary_key, - cert=self.aviary_cert, - root_cert=self.aviary_root_cert, - domain_verify=self.aviary_domain_verify) + # By default Cumin uses /var/lib/condor/aviary/services for + # aviary wsdl files if it exists. + aviary_dir = ["/var/lib/condor/aviary/services", + os.path.join(self.home, "rpc-defs/aviary")] + if not self.aviary_prefer_condor: + aviary_dir = [aviary_dir[1], aviary_dir[0]] + + # The factory will choose an impls that give us services + # based on the values of the 'X_servers' arguments. + # Empty/non-empty for a 'X_servers' value controls enable/disable. + (aviary_itf, + hadoop_itf) = AviaryOperationsFactory("aviary", aviary_dir, + self.aviary_locator, + self.aviary_job_servers, + self.aviary_query_servers, + self.aviary_hadoop_servers, + key=self.aviary_key, + cert=self.aviary_cert, + root_cert=self.aviary_root_cert, + domain_verify=self.aviary_domain_verify) + if hadoop_itf: + ops.insert(0, hadoop_itf) + if aviary_itf: ops.insert(0, aviary_itf) - else: - log.info("Imports failed for Aviary interface, disabling") + else: + log.info("Imports failed for Aviary interface, disabling")
log.info("%s Aviary locator interface" % \ ((self.aviary_locator and \ (self.aviary_job_servers or \ - self.aviary_query_servers) and \ + self.aviary_query_servers or + self.aviary_hadoop_servers) and \ imports_ok) and "Enabled" or "Disabled"))
log.info("%s Aviary interface for job submission and control." % \ @@ -249,6 +258,10 @@ log.info("%s Aviary interface for query operations." % \ ((self.aviary_query_servers and imports_ok) and "Enabled" or "Disabled"))
+ if hadoop_loaded: + log.info("%s Aviary interface for hadoop operations." % \ + ((self.aviary_hadoop_servers and imports_ok) and "Enabled" or "Disabled")) + self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py =================================================================== --- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 20:32:57 UTC (rev 5583) +++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 21:16:54 UTC (rev 5584) @@ -632,7 +632,24 @@ query_client, "getSubmissionSummary", subId) t.start()
+class _AviaryHadoopMethods(object):
+ # Do this here rather than __init__ so we don't have to worry about + # matching parameter lists in multiple inheritance cases with super + def init(self, datadir, hadoop_servers): + + if self.locator: + self.hadoop_servers = ServerList(self.locator, "SCHEDULER", "HADOOP") + else: + self.hadoop_servers = FixedServerList(hadoop_servers, + "9090", + "/services/hadoop/", + "HADOOP") + + hadoop_wsdl = "file:" + os.path.join(get_datadir(datadir, "hadoop"), + "aviary-hadoop.wsdl") + self.hadoop_client_pool = ClientPool(hadoop_wsdl, None) + class _AviaryCommon(object): def __init__(self, name, locator, key="", cert="", root_cert="", domain_verify=True): @@ -783,12 +800,25 @@
_AviaryQueryMethods.init(self, datadir, query_servers)
+class AviaryHadoopOperations(_AviaryCommon, _AviaryHadoopMethods): + def __init__(self, name, datadir, locator, hadoop_servers, + key="", cert="", root_cert="", domain_verify=True): + + super(AviaryHadoopOperations, self).__init__(name, locator, + key, cert, root_cert, + domain_verify) + + _AviaryHadoopMethods.init(self, datadir, hadoop_servers) + def AviaryOperationsFactory(name, datadir, locator_uri, - job_servers, query_servers, + job_servers, query_servers, hadoop_servers, key="", cert="", root_cert="", domain_verify=True):
# If locator uri has not been specified, it's disabled and we will - # use the specified job_servers and query_servers values + # use the specified job_servers and query_servers values. + # These operations are historically selectable because originally + # they were an alternative implementation to QMF methods. + # At some point in the future this may change. if locator_uri: locator = AviaryLocator(datadir, locator_uri, key, cert, root_cert, domain_verify) @@ -796,14 +826,23 @@ locator = None
if job_servers and query_servers: - res = AviaryOperations(name, datadir, locator, + ops = AviaryOperations(name, datadir, locator, job_servers, query_servers, key, cert, root_cert, domain_verify) elif job_servers: - res = AviaryJobOperations(name, datadir, locator,job_servers, + ops = AviaryJobOperations(name, datadir, locator,job_servers, key, cert, root_cert, domain_verify) elif query_servers: - res = AviaryQueryOperations(name, datadir, locator,query_servers, + ops = AviaryQueryOperations(name, datadir, locator,query_servers, key, cert, root_cert, domain_verify) - return res + else: + ops = None
+ if hadoop_servers: + hadoop = AviaryHadoopOperations(name, datadir, locator, hadoop_servers, + key, cert, root_cert, domain_verify) + else: + hadoop = None + + return ops, hadoop +
cumin-developers@lists.fedorahosted.org