Author: tmckay Date: 2011-03-03 15:20:16 +0000 (Thu, 03 Mar 2011) New Revision: 4570
Modified: trunk/cumin/bin/cumin trunk/cumin/bin/cumin-data trunk/cumin/bin/cumin-web trunk/cumin/python/cumin/config.py trunk/cumin/python/cumin/main.py trunk/cumin/python/cumin/util.py trunk/parsley/python/parsley/loggingex.py trunk/wooly/python/wooly/server.py Log: Set up an internal thread and pipes to funnel all IO to files in CUMIN_HOME/log with rollover control. This is a catch-all for reporting unhandled exceptions while running as a service. BZ680265 (further refinement)
Modified: trunk/cumin/bin/cumin =================================================================== --- trunk/cumin/bin/cumin 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/bin/cumin 2011-03-03 15:20:16 UTC (rev 4570) @@ -27,25 +27,26 @@ signal.signal(signal.SIGTERM, func)
def log_parse_errors(r): - msg = "" - r = os.fdopen(r, "r") - while True: - t = r.read() - if t == "": - break - msg += t - r.close() + msg = os.fdopen(r, "r").readlines() if len(msg) > 0: - msg = "\n"+msg - log.error("Error in options"+msg) + log.error("".join(msg)) return True
+def get_args(app, section, init_only, console): + args = [app, "--section="+section] + if init_only: + args.append("--init-only") + if not console: + args.append("--daemon") + prog_string = "".join([" "+x for x in args]) + return args, prog_string + def main():
# tuple indices, for clarity PROCESS = 0 - SECTION = 1 - PROGRAM = 2 + ARGS = 1 + PROG_STRING = 2
parser = OptionParser()
@@ -61,13 +62,8 @@ "\nEach value implies a separate cumin-data instance.")
parser.add_option("--console", dest="console", action="store_true", default=False, - help="Log to stderr rather than master.log, no other IO redirection.") + help="Log to stderr rather than master.log, no IO redirection for children.")
- parser.add_option("--devel", dest="devel", action="store_true", default=False, - help="Option is ignored if --console is set."\ - "\nRedirect stderr and stdout to files in "\ - "$CUMIN_HOME/log.") - # Trap exit from parser and save standard error for logging # Then put stderr back to original value r, w = os.pipe() @@ -80,27 +76,15 @@ sys.stderr = sys.__stderr__
# Parse may have failed, in which case make a quick check for options ourselves - if options != None: - console = options.console - devel = options.devel - else: - console = "--console" in sys.argv[1:] - devel = "--devel" in sys.argv[1:] - - # Set up logging and IO before we go any further - new_stdout = new_stderr = new_stdin = None + console = (options and options.console) or "--console" in sys.argv[1:] if console: - enable_logging("cumin.master", logging.INFO, sys.stderr) + log_dest = sys.stderr else: - enable_logging("cumin.master", logging.INFO, os.path.join(home, "log", "master.log")) - if devel: - new_stderr = open(os.path.join(home, "log", "master.stderr"), "a") - new_stdout = open(os.path.join(home, "log", "master.stdout"), "a") - sys.stderr = new_stderr - sys.stdout = new_stdout + log_dest = os.path.join(home, "log", "master.log") + enable_logging("cumin.master", logging.INFO, log_dest)
# Parser exited, either on --help or with errors - if options == None: + if not options: return log_parse_errors(r)
if len(args) != 0: @@ -110,48 +94,37 @@ # Get our list of cumin-web and data instances # create list elements to hold the process object, section arg, and app apps = [] - for app in options.webs.split(','): - apps.append([None, app, "cumin-web"]) + for instance in options.webs.split(','): + args, prog_string = get_args("cumin-web", instance, options.init_only, console) + apps.append([None, args, prog_string])
- for app in options.datas.split(','): - apps.append([None, app, "cumin-data"]) + for instance in options.datas.split(','): + args, prog_string = get_args("cumin-data", instance, options.init_only, console) + apps.append([None, args, prog_string])
# If we are just checking startup flags, invoke each instance - # with "--init-only" and return status to caller. Log failure with - # content of stderr from subprocess, if any. + # with "--init-only" and return status to caller. if options.init_only: for app in apps: - arg = [app[PROGRAM], "--init-only", "--section=" + app[SECTION]] - app[PROCESS] = subprocess.Popen(arg, stderr=subprocess.PIPE) - errors = app[PROCESS].communicate()[1] - # Indent any output we received - if len(errors) > 0: - errors = "\n " + errors.replace("\n","\n ") - result = app[PROCESS].wait() - if len(errors) > 0 or result != 0: - msg = "".join([" "+x for x in arg]) + errors - if result != 0: - log.error("Subprocess failed:" + msg) - else: - log.info("Subprocess output:" + msg) - return result + if subprocess.Popen(app[ARGS]).wait() != 0: + log.error("Subprocess failed init check:" + app[PROG_STRING]) + return 1 else: # Launch and babysit try: + for app in apps: + log.info("Starting:" + app[PROG_STRING]) + app[PROCESS] = subprocess.Popen(app[ARGS]) + while True: + sleep(5) for app in apps: - if not app[PROCESS] or app[PROCESS].poll(): - arg = [app[PROGRAM], "--section=" + app[SECTION]] - prog_string = "".join([" "+x for x in arg]) - if app[PROCESS]: - log.warn("Restarting:" + prog_string) - else: - log.info("Starting:" + prog_string) - app[PROCESS] = subprocess.Popen(arg, - stderr=new_stderr, - stdout=new_stdout) - sleep(30) - + return_code = app[PROCESS].poll() + if return_code != None: + if return_code != 0: + log.warn("Subprocess exited with status " + str(return_code)) + log.info("Restarting:" + app[PROG_STRING]) + app[PROCESS] = subprocess.Popen(app[ARGS]) finally: for app in apps: if app[PROCESS]:
Modified: trunk/cumin/bin/cumin-data =================================================================== --- trunk/cumin/bin/cumin-data 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/bin/cumin-data 2011-03-03 15:20:16 UTC (rev 4570) @@ -9,51 +9,90 @@ from cumin.config import * from cumin.util import * from mint import * +from parsley.loggingex import PipeLogThread
+def restore_IO(): + sys.stderr = sys.__stderr__ + sys.stdout = sys.__stdout__ + def main(): - setup_initial_logging()
- config, values, opts, args = get_configuration(CuminDataOptionParser(), CuminDataConfig) + # Do our own simple option check so we can redirect IO early + # without worrying about other options or the behavior of optParse + opts = check_for_options(["--section", "--daemon"], sys.argv[1:]) + if type(opts["--section"]) is str: + section_name = opts["--section"] + else: + section_name = "data"
- setup_operational_logging(opts) + # If the --daemon option has been set, redirect stderr and stdio through + # pipes. Launch a thread to read those pipes and direct the content to + # files with rollover control. + pipeThread = None + if opts["--daemon"]: + try: + cumin_home = os.environ.get("CUMIN_HOME", os.path.normpath("/usr/share/cumin")) + err_file = os.path.join(cumin_home, "log", section_name+".stderr") + out_file = os.path.join(cumin_home, "log", section_name+".stdout") + + # Open pipes and redirect + err_r, err_w = os.pipe() + out_r, out_w = os.pipe() + pipeThread = PipeLogThread(descriptors=[err_r, out_r], + paths=[err_file, out_file], + call_on_fail=restore_IO) + pipeThread.start() + sys.stderr = os.fdopen(err_w, "w", 0) + sys.stdout = os.fdopen(out_w, "w", 0) + except: + print_exc() + pipeThread = None + + try: + mint = None + return_code = 0 + setup_initial_logging()
- model_dir = os.path.join(config.home, "model") + config, values, opts, args = get_configuration(CuminDataOptionParser(), CuminDataConfig)
- broker_uris = [x.strip() for x in opts.brokers.split(",")] + setup_operational_logging(opts)
- mint = Mint(model_dir, broker_uris, opts.database) + model_dir = os.path.join(config.home, "model")
- mint.print_event_level = opts.print_events + broker_uris = [x.strip() for x in opts.brokers.split(",")]
- mint.expire_thread.interval = values.expire_interval - mint.expire_thread.threshold = values.expire_threshold + mint = Mint(model_dir, broker_uris, opts.database)
- mint.vacuum_thread.interval = values.vacuum_interval + mint.print_event_level = opts.print_events
- mint.check() - mint.init() + mint.expire_thread.interval = values.expire_interval + mint.expire_thread.threshold = values.expire_threshold
- if values.packages: - packages = list() + mint.vacuum_thread.interval = values.vacuum_interval
- for name in values.packages.split(","): - name = name.strip() + mint.check() + mint.init()
- try: - pkg = mint.model._packages_by_name[name] - except KeyError: - print "No package found for '%s'" % name + if values.packages: + packages = list()
- packages.append(pkg) + for name in values.packages.split(","): + name = name.strip()
- mint.session.qmf_packages = packages + try: + pkg = mint.model._packages_by_name[name] + except KeyError: + print "No package found for '%s'" % name
- if opts.init_only: - return + packages.append(pkg)
- mint.start() + mint.session.qmf_packages = packages
- try: + if opts.init_only: + return + + mint.start() + stats = mint.update_thread.stats count = 0
@@ -72,8 +111,20 @@ else: while True: sleep(86400) + + except (KeyboardInterrupt, SystemExit): + pass + + except: + print_exc() + return_code = 1 + finally: - mint.stop() + if mint: + mint.stop() + if pipeThread: + pipeThread.stop() + sys.exit(return_code)
if __name__ == "__main__": try:
Modified: trunk/cumin/bin/cumin-web =================================================================== --- trunk/cumin/bin/cumin-web 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/bin/cumin-web 2011-03-03 15:20:16 UTC (rev 4570) @@ -1,5 +1,4 @@ #!/usr/bin/python - import os import sys
@@ -9,45 +8,100 @@ from cumin import * from cumin.config import * from cumin.util import * +from parsley.loggingex import PipeLogThread
+def restore_IO(): + sys.stderr = sys.__stderr__ + sys.stdout = sys.__stdout__ + def main(): - setup_initial_logging() + # Do our own simple option check so we can redirect IO early + # without worrying about other options or the behavior of optParse + opts = check_for_options(["--section", "--daemon"], sys.argv[1:]) + if type(opts["--section"]) is str: + section_name = opts["--section"] + else: + section_name = "web"
- config, values, opts, args = get_configuration(CuminWebOptionParser(), CuminWebConfig) + # If the --daemon option has been set, redirect stderr and stdio through + # pipes. Launch a thread to read those pipes and direct the content to + # files with rollover control. + pipeThread = None + if opts["--daemon"]: + try: + cumin_home = os.environ.get("CUMIN_HOME", os.path.normpath("/usr/share/cumin")) + err_file = os.path.join(cumin_home, "log", section_name+".stderr") + out_file = os.path.join(cumin_home, "log", section_name+".stdout") + + # Open pipes and redirect + err_r, err_w = os.pipe() + out_r, out_w = os.pipe() + pipeThread = PipeLogThread(descriptors=[err_r, out_r], + paths=[err_file, out_file], + call_on_fail=restore_IO) + pipeThread.start() + sys.stderr = os.fdopen(err_w, "w", 0) + sys.stdout = os.fdopen(out_w, "w", 0) + except: + restore_IO() + print_exc() + pipeThread = None
- setup_operational_logging(opts) + try: + return_code = 0 + cumin = None
- broker_uris = [x.strip() for x in opts.brokers.split(",")] + setup_initial_logging()
- cumin = Cumin(config.get_home(), broker_uris, opts.database, - opts.host, opts.port) + config, values, opts, args = get_configuration(CuminWebOptionParser(), CuminWebConfig)
- cumin.debug = opts.debug - cumin.user = values.user - cumin.update_interval = values.update_interval - cumin.max_qmf_table_sort = values.max_qmf_table_sort + setup_operational_logging(opts)
- # set default values for form inputs - cumin.set_form_defaults(values.request_memory, - values.request_memory_vm, - values.request_disk, - values.request_disk_vm) + broker_uris = [x.strip() for x in opts.brokers.split(",")]
- cumin.check() - cumin.init() + cumin = Cumin(config.get_home(), broker_uris, opts.database, + opts.host, opts.port)
- if opts.init_only: - return + cumin.debug = opts.debug + cumin.user = values.user + cumin.update_interval = values.update_interval + cumin.max_qmf_table_sort = values.max_qmf_table_sort
- cumin.start() + # set default values for form inputs + cumin.set_form_defaults(values.request_memory, + values.request_memory_vm, + values.request_disk, + values.request_disk_vm)
- try: + cumin.check() + cumin.init() + + if opts.init_only: + return + + cumin.start() + while True: # print_threads() + sleep(1) + if not cumin.server_alive(): + print "web server has stopped, exiting..." + return_code = 1 + break
- sleep(5) + except (KeyboardInterrupt, SystemExit): + pass + + except: + print_exc() + return_code = 1 + finally: - cumin.stop() + if cumin: + cumin.stop() + if pipeThread: + pipeThread.stop() + sys.exit(return_code)
if __name__ == "__main__": try:
Modified: trunk/cumin/python/cumin/config.py =================================================================== --- trunk/cumin/python/cumin/config.py 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/python/cumin/config.py 2011-03-03 15:20:16 UTC (rev 4570) @@ -117,6 +117,7 @@ OptionParser.__init__(self)
self.add_option("--section", default=default_section) + self.add_option("--daemon", action="store_true", default=False)
# We don't know defaults yet... self.add_option("--database")
Modified: trunk/cumin/python/cumin/main.py =================================================================== --- trunk/cumin/python/cumin/main.py 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/python/cumin/main.py 2011-03-03 15:20:16 UTC (rev 4570) @@ -60,6 +60,9 @@
# self.model.sql_logging_enabled = True
+ def server_alive(self): + return self.server.server_alive() + def check(self): log.info("Checking %s", self)
Modified: trunk/cumin/python/cumin/util.py =================================================================== --- trunk/cumin/python/cumin/util.py 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/cumin/python/cumin/util.py 2011-03-03 15:20:16 UTC (rev 4570) @@ -200,6 +200,35 @@
return rows
+def check_for_options(options,search): + """ + Simple option checking. + + This is a rudimentary option checker that handles option strings of the form + 'option=value' or 'option'. + + The options parameter is a list of items for which to check. The search + parameter is a list of strings to match against the options list. + + A dictionary will be returned keyed on the elements of options. + If an option is not found in the search list, its value in the dictionary + will be None. If it is found, its value will be True if the search list + contains a string of the form 'option' or everything following '=' if the + form is 'option=value' + + """ + result = dict() + for opt in options: + result[opt] = None + for arg in search: + if arg.find(opt) == 0: + rem = arg[len(opt):] + if rem != "" and rem[0] == "=": + result[opt] = rem[1:] + else: + result[opt] = True + return result + def rgb_to_string(r, g, b): hr, hg, hb = [hex(min(int(n*255), 255))[2:] for n in(r, g, b)] hList = [] @@ -208,4 +237,3 @@ hList.append(n.rjust(2, '0').upper())
return "".join(hList) -
Modified: trunk/parsley/python/parsley/loggingex.py =================================================================== --- trunk/parsley/python/parsley/loggingex.py 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/parsley/python/parsley/loggingex.py 2011-03-03 15:20:16 UTC (rev 4570) @@ -1,8 +1,12 @@ import logging import os +import traceback +import select from logging.handlers import RotatingFileHandler from collectionsex import defaultdict +from threading import Thread
+ _levels_by_name = { "debug": logging.DEBUG, "info": logging.INFO, @@ -78,3 +82,95 @@
for handler in handlers: log.removeHandler(handler) + +class PipeLogThread(Thread): + """ + Thread class for reading multiple file descriptors and redirecting to files. + + This class takes a list of file descriptors and a parallel list of paths. + It opens the file descriptors for read and opens the paths for write. + Via select(), it waits for a descriptor to become readable and then + writes the content to the file at the corresponding path. + + If rollover_size is non-zero, the size of output files will be checked before each + write. If the file size is greater than or equal to the rollover_size, the file + will be closed and reopened for write as an empty file. + + If call_on_fail is callable, it will be called if the thread gets an + exception during the run() routine. This is useful for notifying other threads + that the thread has exited unexpectedly, potentially allowing them to clean up + the IO being handled by this thread. + """ + def __init__(self, descriptors, paths, call_on_fail=None, rollover_size = 1024*1024): + + assert len(descriptors) == len(paths) + assert type(descriptors) in (list,tuple) + assert type(paths) in (list,tuple) + assert self.check_unique(descriptors) + assert self.check_unique(paths) + assert type(rollover_size) is int + + super(PipeLogThread, self).__init__() + + self.name = self.__class__.__name__ + + self.rollover_size = rollover_size + self.call_on_fail = call_on_fail + + # Open an output file at the designated path for each input descriptor + self.descriptors = descriptors + self.files = dict() + for desc, path in zip(self.descriptors, paths): + creating = not os.path.exists(path) + f = open(path, "a") + if creating: + set_log_owner(path) + self.files[desc] = [f, path] + + # Open a pipe for exit signaling + self.stop_r, self.stop_w = os.pipe() + + def check_unique(self, values): + return len(set(values)) == len(values) + + def check_rollover(self, file): + if os.path.getsize(file[1]) >= self.rollover_size: + file[0].close() 
+ file[0] = open(file[1], "w") + + def run(self): + # The last thing we want is unhandled exceptions in this thread + # which could leave IO broken. + stop = False + try: + while not stop: + r, w, e = select.select(self.descriptors+[self.stop_r], [], []) + for desc in r: + if desc == self.stop_r: + stop = True + else: + msg = os.read(desc,1024) + if msg != "": + if self.rollover_size != 0: + self.check_rollover(self.files[desc]) + self.files[desc][0].write(msg) + self.files[desc][0].flush() + + except Exception, e: + if callable(self.call_on_fail): + self.call_on_fail() + traceback.print_exc() + + finally: + for key, value in self.files.items(): + value[0].close() + + def stop(self): + if self.isAlive(): + os.write(self.stop_w,"\n") + # This gives the thread a chance to write out + # anything pending on an input pipe. + for i in range(5): + if not self.isAlive(): + break + sleep(1)
Modified: trunk/wooly/python/wooly/server.py =================================================================== --- trunk/wooly/python/wooly/server.py 2011-03-03 14:34:57 UTC (rev 4569) +++ trunk/wooly/python/wooly/server.py 2011-03-03 15:20:16 UTC (rev 4570) @@ -23,6 +23,9 @@ self.client_sessions_by_id = dict() self.client_session_expire_thread = ClientSessionExpireThread(self)
+ def server_alive(self): + return self.dispatch_thread.is_alive() + def init(self): return # XXX urgh
cumin-developers@lists.fedorahosted.org