Author: tmckay
Date: 2010-12-21 20:15:51 +0000 (Tue, 21 Dec 2010)
New Revision: 4443
Modified:
branches/scale_testing/cumin/bin/cumin-data
branches/scale_testing/cumin/python/cumin/config.py
branches/scale_testing/mint/python/mint/main.py
branches/scale_testing/mint/python/mint/session.py
branches/scale_testing/mint/python/mint/update.py
Log:
Changes to collect stats in CSV files for scale testing.
Modified: branches/scale_testing/cumin/bin/cumin-data
===================================================================
--- branches/scale_testing/cumin/bin/cumin-data 2010-12-20 14:12:38 UTC (rev 4442)
+++ branches/scale_testing/cumin/bin/cumin-data 2010-12-21 20:15:51 UTC (rev 4443)
@@ -16,6 +16,8 @@
parser = CuminOptionParser(values)
parser.add_option("--print-stats", action="store_true")
+ parser.add_option("--scale-stats", action="store_true")
+ parser.add_option("--object-stats", action="store_true")
parser.add_option("--print-events", type="int", default=0,
metavar="LEVEL")
opts, args = parser.parse_args()
@@ -26,7 +28,7 @@
broker_uris = [x.strip() for x in opts.brokers.split(",")]
- mint = Mint(model_dir, broker_uris, opts.database)
+ mint = Mint(model_dir, broker_uris, opts.database, values.queue_size)
mint.print_event_level = opts.print_events
@@ -65,19 +67,40 @@
if opts.print_stats:
print "[Starred columns are the number of events per second]"
+ # set up for scale stats here
+ if opts.scale_stats:
+ stats.init_stats_path(mint, config.home)
+
+ if opts.print_stats or \
+ opts.scale_stats or \
+ opts.object_stats:
while True:
- if count % 20 == 0:
- stats.print_headings()
+ stats.capture()
+ if opts.print_stats:
+ if count % 20 == 0:
+ stats.print_headings()
+ count += 1
+ stats.print_values()
+
+ if opts.object_stats:
+ stats.print_values_by_class(mint)
- count += 1
+ if opts.scale_stats:
+ stats.write_values_by_class(mint, time.localtime())
+ stats.write_values(time.localtime())
- stats.print_values()
-
sleep(5)
else:
while True:
sleep(86400)
+ except Exception as inst:
+ print inst
+
finally:
+ # close the scale stats CSV file here:
+ if opts.scale_stats:
+ pass
+
mint.stop()
if __name__ == "__main__":
Modified: branches/scale_testing/cumin/python/cumin/config.py
===================================================================
--- branches/scale_testing/cumin/python/cumin/config.py 2010-12-20 14:12:38 UTC (rev 4442)
+++ branches/scale_testing/cumin/python/cumin/config.py 2010-12-21 20:15:51 UTC (rev 4443)
@@ -56,6 +56,9 @@
param = ConfigParameter(data, "vacuum-interval", int)
param.default = 60 * 60 # 1 hour
+ param = ConfigParameter(data, "queue-size", int)
+ param.default = 1000
+
def parse(self):
paths = list()
Modified: branches/scale_testing/mint/python/mint/main.py
===================================================================
--- branches/scale_testing/mint/python/mint/main.py 2010-12-20 14:12:38 UTC (rev 4442)
+++ branches/scale_testing/mint/python/mint/main.py 2010-12-21 20:15:51 UTC (rev 4443)
@@ -10,14 +10,14 @@
log = logging.getLogger("mint.main")
class Mint(object):
- def __init__(self, model_dir, broker_uris, database_dsn):
+ def __init__(self, model_dir, broker_uris, database_dsn, queue_max=1000):
self.model = MintModel(self, model_dir)
self.model.sql_logging_enabled = False
self.session = MintSession(self, broker_uris)
self.database = MintDatabase(self, database_dsn)
- self.update_thread = UpdateThread(self)
+ self.update_thread = UpdateThread(self,queue_max)
self.expire_enabled = True
self.expire_thread = ExpireThread(self)
Modified: branches/scale_testing/mint/python/mint/session.py
===================================================================
--- branches/scale_testing/mint/python/mint/session.py 2010-12-20 14:12:38 UTC (rev 4442)
+++ branches/scale_testing/mint/python/mint/session.py 2010-12-21 20:15:51 UTC (rev 4443)
@@ -64,14 +64,17 @@
def brokerConnected(self, qmf_broker):
message = "Broker %s:%i is connected"
self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
+ self.model.app.update_thread.ignore()
def brokerInfo(self, qmf_broker):
message = "Broker info from %s:%i"
self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
+ self.model.app.update_thread.ignore()
def brokerDisconnected(self, qmf_broker):
message = "Broker %s:%i is disconnected"
self.model.print_event(1, message, qmf_broker.host, qmf_broker.port)
+ self.model.app.update_thread.ignore()
def newAgent(self, qmf_agent):
self.model.print_event(3, "Creating %s", qmf_agent)
@@ -94,9 +97,11 @@
def newPackage(self, name):
self.model.print_event(2, "New package %s", name)
+ self.model.app.update_thread.ignore()
def newClass(self, kind, classKey):
self.model.print_event(2, "New class %s", classKey)
+ self.model.app.update_thread.ignore()
def objectProps(self, broker, qmf_object):
up = ObjectUpdate(self.model, qmf_object)
@@ -108,6 +113,7 @@
def event(self, broker, event):
self.model.print_event(4, "New event %s from %s", broker, event)
+ self.model.app.update_thread.ignore()
def methodResponse(self, broker, seq, response):
message = "Method response for request %i received from %s"
@@ -121,3 +127,4 @@
callback(response.text, response.outArgs)
finally:
self.model.lock.release()
+ self.model.app.update_thread.method_response()
Modified: branches/scale_testing/mint/python/mint/update.py
===================================================================
--- branches/scale_testing/mint/python/mint/update.py 2010-12-20 14:12:38 UTC (rev 4442)
+++ branches/scale_testing/mint/python/mint/update.py 2010-12-21 20:15:51 UTC (rev 4443)
@@ -1,6 +1,8 @@
import copy
import resource
import pickle
+import os.path
+import csv
from psycopg2 import IntegrityError, TimestampFromTicks
from psycopg2.extensions import cursor as Cursor
@@ -15,10 +17,10 @@
sample_window_max = 60 * 5
class UpdateThread(MintDaemonThread):
- def __init__(self, app):
+ def __init__(self, app, queue_max=1000):
super(UpdateThread, self).__init__(app)
- self.updates = ConcurrentQueue(maxsize=1000)
+ self.updates = ConcurrentQueue(maxsize=queue_max)
self.stats = UpdateStats(self.app)
self.conn = None
@@ -33,24 +35,37 @@
self.cursor.stats = self.stats
def enqueue(self, update):
+ update.enqueue_time = time.time()
+
self.updates.put(update)
-
+
self.stats.enqueued += 1
+ def ignore(self):
+ self.stats.ignore += 1
+
+ def method_response(self):
+ self.stats.method_response +=1
+
def run(self):
- while True:
- if self.stop_requested:
- break
+ try:
+ while True:
+ if self.stop_requested:
+ break
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
+ try:
+ update = self.updates.get(True) #, 1)
+ except Empty:
+ continue
+ update.dequeue_time = time.time()
+ self.stats.dequeued += 1
- self.stats.dequeued += 1
+ update.process(self)
+ self.conn.commit()
+
+ except:
+ print "uh oh"
- update.process(self)
-
class UpdateStats(object):
group_names = ("Updates", "Agents", "Objects")
groups = "%43s | %32s | %32s |" % group_names
@@ -60,6 +75,7 @@
"*Created", "*Updated", "*Deleted",
"*Created", "*Updated", "*Deleted",
"*Sql Ops", "Errors", "Cpu (%)", "Mem (M)")
+
headings_fmt = \
"%10s %10s %10s %10s | " + \
"%10s %10s %10s | " + \
@@ -67,7 +83,6 @@
"%10s %10s %10s %10s"
headings = headings_fmt % heading_names
-
values_fmt = \
"%10i %10.1f %10.1f %10.1f | " + \
"%10.1f %10.1f %10.1f | " + \
@@ -81,6 +96,8 @@
self.enqueued = 0
self.dequeued = 0
self.dropped = 0
+ self.ignore = 0
+ self.method_response = 0
self.agents_created = 0
self.agents_updated = 0
@@ -93,6 +110,7 @@
self.objects_created_by_class = defaultdict(int)
self.objects_updated_by_class = defaultdict(int)
self.objects_deleted_by_class = defaultdict(int)
+ self.objects_dropped_by_class = defaultdict(int)
self.sql_ops = 0
self.errors = 0
@@ -101,11 +119,41 @@
self.cpu = 0
self.memory = 0
+ self.init_update_times()
+
+ def init_update_times(self):
+ # Track average time of update from arrival to
+ # processing complete. Refreshed per interval.
+ self.update_queue_duration = 0
+ self.update_process_duration = 0
+ self.queue_time = 0
+ self.initial_process_time = 0
+ self.commit_time = 0
+ self.record_samples = 0
+ self.init_update_flag = False
+
+ def record_update_times(self,update):
+ try:
+ if self.init_update_flag:
+ self.init_update_times()
+ self.record_samples += 1
+ self.update_queue_duration += \
+ update.start_process - update.create_time
+ self.update_process_duration += \
+ update.finish_process - update.start_process
+ self.queue_time += \
+ update.dequeue_time - update.enqueue_time
+ self.initial_process_time += \
+ update.after_process - update.start_process
+ self.commit_time += \
+ update.after_commit - update.after_process
+ except:
+ print "record update exception"
+
def capture(self):
now = copy.copy(self)
now.time = time.time()
-
rusage = resource.getrusage(resource.RUSAGE_SELF)
now.cpu = rusage[0] + rusage[1]
@@ -113,6 +161,7 @@
UpdateStats.then = UpdateStats.now
UpdateStats.now = now
+ self.init_update_flag = True
def get_resident_pages(self):
try:
@@ -126,12 +175,7 @@
print self.groups
print self.headings
- def print_values(self):
- self.capture()
-
- if not self.then:
- return
-
+ def _gather_values(self):
values = [self.now.enqueued - self.then.enqueued,
self.now.dequeued - self.then.dequeued,
self.now.dropped - self.then.dropped,
@@ -153,23 +197,118 @@
values.append(self.errors)
values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
values.append(self.now.memory / 1000000.0)
+ return values
- print self.values_fmt % tuple(values)
+ def print_values(self):
+# moved capture call to cumin-data so that write_values can
+# be run as well
+# self.capture()
+ if not self.then:
+ return
- def print_values_by_class(self):
- names = ("Class", "Created", "Updated", "Deleted")
- print "%20s %10s %10s %10s" % names
+ print self.values_fmt % tuple(self._gather_values())
+ def print_values_by_class(self,mint):
+ names = ("Class", "Created", "Updated", "Deleted", "Dropped")
+ print "%20s %10s %10s %10s %10s" % names
+
for pkg in mint.model._packages:
for cls in pkg._classes:
- created = stats.created_by_class[cls]
- updated = stats.updated_by_class[cls]
- deleted = stats.deleted_by_class[cls]
+ created = self.objects_created_by_class[cls]
+ updated = self.objects_updated_by_class[cls]
+ deleted = self.objects_deleted_by_class[cls]
+ dropped = self.objects_dropped_by_class[cls]
+ if created or updated or deleted or dropped:
+ args = (cls._name, created, updated, deleted, dropped)
+ print "%-20s %10i %10i %10i %10i" % args
+
+ def init_stats_path(self, mint, path):
+ p = os.path.join(path, \
+ "stats_"+time.strftime("%m%d%y_%H%M%S",time.localtime()))
+
+ if not os.path.exists(p):
+ os.makedirs(p)
+
+ # init the path for general stats
+ self.stats_path = os.path.join(p, "stats.csv")
+
+ # init the paths for stats by class
+ self.class_stats_paths = {}
+ for pkg in mint.model._packages:
+ for cls in pkg._classes:
+ self.class_stats_paths[cls._name] = \
+ os.path.join(p, cls._name+".csv")
+
+ def write_values_by_class(self, mint, localtime_):
+ names = ("Time", "Created", "Updated", "Deleted", "Dropped")
+
+ for pkg in mint.model._packages:
+ for cls in pkg._classes:
+ created = self.objects_created_by_class[cls]
+ updated = self.objects_updated_by_class[cls]
+ deleted = self.objects_deleted_by_class[cls]
+ dropped = self.objects_dropped_by_class[cls]
+
if created or updated or deleted:
- args = (cls._name, created, updated, deleted)
- print "%-20s %10i %10i %10i" % args
-
+ fname = self.class_stats_paths[cls._name]
+ write_header = not os.path.isfile(fname)
+ db = csv.writer(open(fname, "a+"))
+ if write_header:
+ db.writerow(names)
+ db.writerow((time.strftime("%H:%M:%S",localtime_),
+ created, updated, deleted, dropped))
+
+ def get_update_durations(self):
+ total = self.now.record_samples
+ if total == 0:
+ create_dur = proc_dur = que_dur = init_proc_time = commit_time = 0
+ else:
+ create_dur = self.now.update_queue_duration / total
+ proc_dur = self.now.update_process_duration / total
+ que_dur = self.now.queue_time / total
+ init_proc_time = self.now.initial_process_time / total
+ commit_time = self.now.commit_time / total
+ return (create_dur, proc_dur, que_dur, init_proc_time, commit_time)
+
+
+ def write_values(self, localtime_):
+ stats_headings = \
+ ("Time", "Total Msgs",
+ "Depth", "*Avg Que", "*Avg Create", "*Avg Proc",
+ "*Init Proc", "*Commit",
+ "*Enqueued", "*Dequeued", "*Dropped",
+ "*Create Agent", "*Update Agent", "*Delete Agent",
+ "*Create", "*Update", "*Delete",
+ "*Sql Ops", "Errors", "Cpu (%)", "Mem (M)")
+
+ if not self.then:
+ return
+ try:
+ values = self._gather_values()
+ values.insert(0, time.strftime("%H:%M:%S",localtime_))
+ values.insert(1, self.now.enqueued)
+
+ # include object update times
+ avg_create_dur,avg_process_dur,avg_que_dur, \
+ avg_init_proc,avg_commit = self.get_update_durations()
+
+ values.insert(3, avg_que_dur)
+ values.insert(4, avg_create_dur)
+ values.insert(5, avg_process_dur)
+ values.insert(6, avg_init_proc)
+ values.insert(7, avg_commit)
+
+ write_header = not os.path.isfile(self.stats_path)
+ db = csv.writer(open(self.stats_path, "a+"))
+ if write_header:
+ db.writerow(stats_headings)
+ db.writerow(values)
+
+ except Exception as inst:
+ print "exception in write values"
+ print inst
+
class UpdateCursor(Cursor):
def execute(self, sql, args=None):
super(UpdateCursor, self).execute(sql, args)
@@ -179,35 +318,50 @@
class Update(object):
def __init__(self, model):
self.model = model
+ self.create_time = time.time()
def process(self, thread):
+ self.start_process = time.time()
log.debug("Processing %s", self)
try:
self.do_process(thread.cursor, thread.stats)
-
+ self.after_process = time.time()
thread.conn.commit()
+ self.after_commit = time.time()
except UpdateDropped:
log.debug("Update dropped")
thread.conn.rollback()
thread.stats.dropped += 1
+ try:
+ cls = self.get_class()
+ except:
+ cls = None
+ if cls != None:
+ thread.stats.objects_dropped_by_class[cls] += 1
+ self.after_commit = time.time()
+
except:
log.exception("Update failed")
thread.conn.rollback()
-
thread.stats.errors += 1
- #print_exc()
+ print_exc()
if thread.halt_on_error:
raise
+ self.finish_process = time.time()
+ thread.stats.record_update_times(self)
def do_process(self, cursor, stats):
raise Exception("Not implemented")
+ def get_class(self):
+ return None
+
def __repr__(self):
return self.__class__.__name__
@@ -346,7 +500,7 @@
self.model.print_event(3, "Created %s", obj)
stats.objects_created += 1
- #stats.objects_created_by_class[cls] += 1
+ stats.objects_created_by_class[cls] += 1
return obj
@@ -380,7 +534,9 @@
# force a write if it's been too long, even if the values match
if object_columns \
- or obj._save_time < obj._qmf_update_time - timedelta(hours=1):
+ or (obj._save_time != None and \
+ obj._qmf_update_time != None and \
+ obj._save_time < obj._qmf_update_time - timedelta(hours=1)):
object_columns.append(cls.sql_table._qmf_update_time)
sql = cls.sql_update_object.emit(object_columns)
@@ -405,7 +561,7 @@
self.model.print_event(4, "Updated %s", obj)
stats.objects_updated += 1
- #stats.objects_updated_by_class[cls] += 1
+ stats.objects_updated_by_class[cls] += 1
def delete_object(self, cursor, stats, obj):
obj.delete(cursor)
@@ -413,7 +569,7 @@
self.model.print_event(3, "Deleted %s", obj)
stats.objects_deleted += 1
- #stats.objects_deleted_by_class[obj._class] += 1
+ stats.objects_deleted_by_class[obj._class] += 1
def process_properties(self, obj, columns, cursor):
cls = obj._class
@@ -605,7 +761,7 @@
count = cls.delete_selection(cursor, _qmf_agent_id=agent.id)
stats.objects_deleted += count
- #stats.objects_deleted_by_class[cls] += count
+ stats.objects_deleted_by_class[cls] += count
cursor.connection.commit()