Author: croberts Date: 2012-07-05 19:28:00 +0000 (Thu, 05 Jul 2012) New Revision: 5427
Modified: trunk/cumin/model/plumage/plumage.xml trunk/cumin/model/plumage/rosemary.xml trunk/mint/python/mint/plumage/session.py Log: Adding code to support loading the plumage Accountant samples data into the cumin database.
Modified: trunk/cumin/model/plumage/plumage.xml =================================================================== --- trunk/cumin/model/plumage/plumage.xml 2012-07-03 18:23:06 UTC (rev 5426) +++ trunk/cumin/model/plumage/plumage.xml 2012-07-05 19:28:00 UTC (rev 5427) @@ -17,4 +17,18 @@ <statistic name="availcpu" type="uint32" uint="uint32" /> <statistic name="totalcpu" type="uint32" uint="uint32" /> </class> + + <class name="Accountant"> + <property name="host" type="sstr" /> + <statistic name="user" type="sstr" /> + <statistic name="agroup" type="sstr" optional="y" /> + <statistic name="ts" type="absTime" unit="uint32" /> + <statistic name="prio" type="float" optional="y" /> + <statistic name="wresused" type="uint32" uint="uint32" optional="y" /> + <statistic name="resused" type="uint32" uint="uint32" optional="y" /> + <statistic name="cquota" type="float" optional="y" /> + <statistic name="equota" type="float" optional="y" /> + <statistic name="squota" type="float" optional="y" /> + <statistic name="wausage" type="uint32" uint="uint32" optional="y" /> + </class> </schema> \ No newline at end of file
Modified: trunk/cumin/model/plumage/rosemary.xml =================================================================== --- trunk/cumin/model/plumage/rosemary.xml 2012-07-03 18:23:06 UTC (rev 5426) +++ trunk/cumin/model/plumage/rosemary.xml 2012-07-05 19:28:00 UTC (rev 5427) @@ -68,8 +68,51 @@ <statistic name="totalcpu"> <title>Total</title> </statistic> + </class> + <class name="Accountant"> + <loading_class> + <name>AccountantLoader</name> + </loading_class> + + <source> + <database>condor_stats</database> + <collection>samples.accountant</collection> + </source> + + <statistic name="user" /> + <statistic name="ts" timestamp="y" /> + + <statistic name="agroup"> + <title>Accounting group</title> + </statistic> + + <statistic name="prio"> + <title>Priority</title> + </statistic>
- </class> + <statistic name="wresused"> + <title>Weighted resources used</title> + </statistic> + + <statistic name="resused"> + <title>Resources used</title> + </statistic> + + <statistic name="cquota"> + <title>Configured quota</title> + </statistic> + + <statistic name="equota"> + <title>Effective quota</title> + </statistic> + + <statistic name="squota"> + <title>Subtree quota</title> + </statistic> + + <statistic name="wausage"> + <title>Weighted accumulated usage</title> + </statistic> + </class> </package> - </model>
Modified: trunk/mint/python/mint/plumage/session.py =================================================================== --- trunk/mint/python/mint/plumage/session.py 2012-07-03 18:23:06 UTC (rev 5426) +++ trunk/mint/python/mint/plumage/session.py 2012-07-05 19:28:00 UTC (rev 5427) @@ -43,7 +43,7 @@ """)
-class OSUtil(object): +class RecordObject(object): pass
class PlumageSession(object): @@ -114,22 +114,17 @@ class ClassLoaders(object): ''' method for loading the com.redhat.grid.plumage.OSUtil data from plumage, name is found in rosemary.xml under package/class/loading_class ''' def OSUtilLoader(self, obj, cls): - obj.threads.append(CatchUpPlumageSessionThread( - obj.app, - obj.server_host, - obj.server_port, - cls)) - obj.threads.append(PlumageSessionThread( - obj.app, - obj.server_host, - obj.server_port, - cls)) - obj.threads.append(CurrentPlumageSessionThread( - obj.app, - obj.server_host, - obj.server_port, - cls)) + obj.threads.append(CatchUpPlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls)) + obj.threads.append(PlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls)) + obj.threads.append(CurrentPlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls))
+ ''' method for loading the com.redhat.grid.plumage.Accountant data from plumage, name is found in rosemary.xml under package/class/loading_class ''' + def AccountantLoader(self, obj, cls): + log.debug("AccountLoader called") + obj.threads.append(CatchupPlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls)) + obj.threads.append(PlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls)) + obj.threads.append(CurrentPlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls)) + class PlumageSessionThread(MintDaemonThread): def __init__(self, app, server_host, server_port, cls): super(PlumageSessionThread, self).__init__(app) @@ -174,9 +169,139 @@ sleep(1) if self.stop_requested: break +
+class PlumageAccountantSessionThread(PlumageSessionThread): + def fillAccountantStats(self, time, temptable, name): + record = RecordObject() + record.host= "%s:%s" % (self.connection.host, self.connection.port) + record.user = name + record.ts = time - UTC_DIFF + + userentry = self.collection.find({'ts':time, 'n':name})[0] + record.agroup = self.getEntry(userentry, "ag") + record.prio = self.getEntry(userentry, "prio") + record.wresused = self.getEntry(userentry, "wru") + record.resused = self.getEntry(userentry, "ru") + record.cquota = self.getEntry(userentry, "cq") + record.equota = self.getEntry(userentry, "eq") + record.squota = self.getEntry(userentry, "sq") + record.wausage = self.getEntry(userentry, "au") + + return record + + def getEntry(self, item, value): + result = None + try: + result = item[value] + except: + result = None + return result + + def run(self): + while True: + self._check_connection() + if self.stop_requested: + return + + try: + # Make sure we have a database + self._init() + + # We create objects here. Tag them with the right class, + # probably specified to us from a config option (with a corresponding + # query specification in the xml) + (oldest, newest) = self.app.update_thread.get_first_and_last_sample_timestamp(self.cls) + if oldest is None: + # if we have no oldest record (first run), start at "10 min ago" and start loading everything + oldest = datetime.now() - timedelta(seconds=600) + oldest = oldest + UTC_DIFF + + log.info("PlumageAccountantSessionThread--history: Loading records older than %s" % oldest) + times = sorted(self.collection.find({"ts": {'$lt': oldest}}).distinct('ts'), reverse=True) + sample_times = map(lambda i: times[i],filter(lambda i: i%5 == 0,range(len(times)))) + + for time in sample_times: + names = self.collection.find({'ts':time}).distinct('n') + for name in names: + record = self.fillAccountantStats(time, "history", name) + obj = ObjectUpdate(self.app.model, record, self.cls) + self.app.update_thread.enqueue(obj) + + log.info("PlumageAccountantSessionThread--history: run completed") + + except Exception, e: + log.info("%s got exception %s, exiting" % (self.__class__.__name__, str(e))) + #wake up once a day just in case there has been additional historical items added + sleep(86400) + +class CurrentPlumageAccountantSessionThread(PlumageAccountantSessionThread): + def run(self): + while True: + self._check_connection() + if self.stop_requested: + break + + # Make sure we have a db + try: + self._init() + + (oldest, newest) = self.app.update_thread.get_first_and_last_sample_timestamp(self.cls) + if newest is not None: + most_recent = max((datetime.now() - timedelta(seconds=600) + UTC_DIFF), (newest + UTC_DIFF)) + else: + most_recent = datetime.now() - timedelta(seconds=600) + UTC_DIFF + + log.info("CurrentPlumageAccountantThread--current: Loading records newer than %s" % most_recent) + + times = sorted(self.collection.find({"ts": {'$gt': most_recent}}).distinct('ts'), reverse=True) + sample_times = map(lambda i: times[i],filter(lambda i: i%5 == 0,range(len(times)))) + + for time in sample_times: + names = self.collection.find({'ts':time}).distinct('n') + for name in names: + record = self.fillAccountantStats(time, "current", name) + obj = ObjectUpdate(self.app.model, record, self.cls) + self.app.update_thread.enqueue(obj) + + log.info("CurrentPlumageAccountantThread--current: pass completed") + except Exception, e: + log.info("%s got exception %s, sleeping" % (self.__class__.__name__, str(e))) + sleep(600) + +class CatchupPlumageAccountantSessionThread(PlumageAccountantSessionThread): + def run(self): + self._check_connection() + if self.stop_requested: + return + + try: + # Make sure we have a db + self._init() + + (oldest, newest) = self.app.update_thread.get_first_and_last_sample_timestamp(self.cls) + if newest is not None: + log.info("CatchupPlumageAccountantThread: Starting for records newer than %s" % newest) + + times = sorted(self.collection.find({"ts": {'$gt': newest + UTC_DIFF, '$lt': datetime.now() - timedelta(seconds=600) + UTC_DIFF}}).distinct('ts'), reverse=True) + sample_times = map(lambda i: times[i],filter(lambda i: i%5 == 0,range(len(times)))) + + for time in sample_times: + names = self.collection.find({'ts':time}).distinct('n') + for name in names: + record = self.fillAccountantStats(time, "catchup", name) + obj = ObjectUpdate(self.app.model, record, self.cls) + self.app.update_thread.enqueue(obj) + + log.info("CatchupPlumageAccountantThread--catch-up: catch-up run completed for records newer than %s and older than %s" % (newest, datetime.now() - timedelta(seconds=300) + UTC_DIFF)) + else: + log.info("CatchUpPlumageSessionThread: Skipping catch-up, no records present (probably first-run)") + except Exception, e: + log.info("%s got exception %s, exiting" % (self.__class__.__name__, str(e))) + +class PlumageOSUtilSessionThread(PlumageSessionThread): def fillOSUtilStats(self, time, temptable): - record = OSUtil() + record = RecordObject() record.host= "%s:%s" % (self.connection.host, self.connection.port) record.total = len(self.collection.find({'ts':time}).distinct('n')) record.used = len(self.collection.find({'ts':time,'st':{'$nin':['Unclaimed','Owner']}}).distinct('n')) @@ -213,9 +338,7 @@ except: return 0 return itemcount -
- def run(self): while True: self._check_connection() @@ -252,7 +375,7 @@ #wake up once a day just in case there has been additional historical items added sleep(86400)
-class CurrentPlumageSessionThread(PlumageSessionThread): +class CurrentPlumageOSUtilSessionThread(PlumageOSUtilSessionThread): def run(self): while True: self._check_connection() @@ -290,7 +413,7 @@ database and then make a pass to load all records from that time forward (up to 5 min ago...those get picked-up by the currency thread. ''' -class CatchUpPlumageSessionThread(PlumageSessionThread): +class CatchUpPlumageOSUtilSessionThread(PlumageOSUtilSessionThread): def run(self): self._check_connection() if self.stop_requested:
cumin-developers@lists.fedorahosted.org