Repository :
http://git.fedorahosted.org/cgit/
On branch : master
---------------------------------------------------------------
commit e812c19a8202a92cdcfbfcac8d6792454e5ff878
Author: Matt Domsch <matt(a)domsch.com>
Date: Fri Jul 19 15:14:34 2013 -0500
crawler: rethink child reaping, other fixes
---------------------------------------------------------------
server/crawler | 117 +++++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 82 insertions(+), 35 deletions(-)
diff --git a/server/crawler b/server/crawler
index 9842ae5..d0d89f1 100755
--- a/server/crawler
+++ b/server/crawler
@@ -4,9 +4,11 @@ import pkg_resources
pkg_resources.require("TurboGears")
import datetime as dt
+import errno
import logging
from optparse import OptionParser
import os
+import signal
from subprocess import Popen
import sys
import time
@@ -26,69 +28,102 @@ logger = None
class ForkingMaster:
def __init__(self, max_children = 10):
- self.active_children = None
+ self.active_children = []
self.max_children = max_children
self.devnull = open('/dev/null', 'rw')
self.timings = {}
+
+ def check_timedout_children(self):
+ now = dt.datetime.utcnow()
+ for child in self.active_children:
+ if child.kill_time < now:
+ try:
+ os.kill(child.pid, signal.SIGKILL) # SIGTERM wasn't enough
+ logger.info('Killed process %d' % child.pid)
+ except: # the process could be gone
+ pass
+ else:
+ # items lower on this list are newer, no need to check
+ break
+ return None
+
def collect_children(self):
"""Internal routine to wait for died children."""
- while self.active_children:
- if len(self.active_children) < self.max_children:
- options = os.WNOHANG
- else:
- # If the maximum number of children are already
- # running, block while waiting for a child to exit
- options = 0
+ options = os.WNOHANG
+ while True:
try:
pid, status = os.waitpid(0, options)
- except os.error:
+ except OSError as e:
+ logger.debug(str(e))
+ if e.errno == errno.ECHILD:
+                    logger.debug("Got ECHILD. Number of active children: %d/%d" % (len(self.active_children), self.max_children))
+ return False
pid = None
- if not pid: break
- self.active_children.remove(pid)
- self.stop_time(pid)
+
+ if not pid:
+ # no child was ready, see if any should be killed
+ self.check_timedout_children()
+ return
+ # a child should be reaped
+ for p in self.active_children:
+ if p.pid == pid:
+ self.stop_time(p)
+ logger.debug("Removing child pid %d" % p.pid)
+ self.active_children.remove(p)
+ return True
def process_request(self, command, args, host):
"""Fork a new subprocess to process the
request."""
- self.collect_children()
- logging.info("Starting crawler %s: %s" % (host.name, args))
+ logger.info("Starting crawler %s: %s" % (host.name, args))
p = Popen(args, executable=command, stdin=self.devnull, stdout=self.devnull,
stderr=self.devnull, close_fds=True)
- self.start_time(p.pid, host.id)
- # Parent process
- if self.active_children is None:
- self.active_children = []
- self.active_children.append(p.pid)
+ self.start_time(p, host.id)
+ logger.debug("Adding child pid %d" % p.pid)
+ self.active_children.append(p)
+        logger.debug("Number of active children now: %d" % len(self.active_children))
+
+ def wait_for_available_slot(self):
+        logger.debug("Waiting for a slot: Number of active children: %d/%d" % (len(self.active_children), self.max_children))
+ while len(self.active_children) >= self.max_children:
+ self.collect_children()
+ time.sleep(1)
def wait_for_completion(self):
self.max_children = 0
- self.collect_children()
+ self.wait_for_available_slot()
- def start_time(self, pid, hostid):
- self.timings[pid] = dict()
- self.timings[pid]['start'] = dt.datetime.utcnow()
- self.timings[pid]['hostid'] = hostid
+ def start_time(self, p, hostid):
+ now = dt.datetime.utcnow()
+ p.kill_time = now + dt.timedelta(seconds=(options.timeout_minutes * 60))
+ self.timings[p.pid] = dict()
+ self.timings[p.pid]['start'] = now
+ self.timings[p.pid]['hostid'] = hostid
- def stop_time(self, pid):
- self.timings[pid]['stop'] = dt.datetime.utcnow()
+ def stop_time(self, p):
+ self.timings[p.pid]['stop'] = dt.datetime.utcnow()
- diff = self.timings[pid]['stop'] - self.timings[pid]['start']
- host = Host.get(self.timings[pid]['hostid'])
+        diff = self.timings[p.pid]['stop'] - self.timings[p.pid]['start']
+ host = Host.get(self.timings[p.pid]['hostid'])
        logger.info('Host %s (id=%s) crawl time %s' % (host.name, host.id, str(diff)))
- del self.timings[pid]
+ del self.timings[p.pid]
+
+
def doit():
master = ForkingMaster(max_children=options.threads)
- command = '/usr/share/mirrormanager/server/crawler_perhost'
-    commonargs = [ command, '-c', options.config, '--timeout-minutes', '90']
+    commonargs = [ options.crawler_perhost, '-c', options.config, '--debug']
if options.canary:
commonargs.append('--canary')
-
numhosts = Host.selectBy(private=False).count()
i = 0
- for h in Host.selectBy(private=False).orderBy('id'):
+ for h in list(Host.selectBy(private=False).orderBy('id')):
i += 1
try:
+ if h.id < options.startid: continue
+ if h.id >= options.stopid: break
+ master.wait_for_available_slot()
+
if not (h.admin_active and h.user_active and h.site.user_active and
h.site.admin_active):
continue
if h.site.private:
@@ -100,7 +135,7 @@ def doit():
hostargs.extend(['--logfile', logfilename])
args = commonargs + hostargs
        logger.debug('starting crawler for host %s (id=%d) %d/%d' % (h.name, h.id, i, numhosts))
- master.process_request(command, args, h)
+ master.process_request(options.crawler_perhost, args, h)
except AssertionError: # someone deleted our host while we were looking at it
continue
@@ -125,7 +160,7 @@ def main():
parser = OptionParser(usage=sys.argv[0] + " [options]")
parser.add_option("-c", "--config",
- dest="config", default='dev.cfg',
+                      dest="config", default='/etc/mirrormanager/prod.cfg',
help="TurboGears config file to use")
parser.add_option("--include-private",
@@ -138,9 +173,21 @@ def main():
parser.add_option("-l", "--logdir", type="string",
metavar="DIR",
dest="logdir",
default='/var/log/mirrormanager/crawler',
help="write individual host logfiles to DIR")
+ parser.add_option("--timeout-minutes", type="int",
+ dest="timeout_minutes", default=90,
+ help="per-host timeout, in minutes")
parser.add_option("--logfile", type="string",
metavar="FILE",
dest="logfile",
default='/var/log/mirrormanager/crawler.log',
help="write logfile to FILE")
+    parser.add_option("--startid", type="int", metavar="ID",
+ dest="startid", default=0,
+ help="Start crawling at host ID (default=0)")
+    parser.add_option("--stopid", type="int", metavar="ID",
+ dest="stopid", default=sys.maxint,
+ help="Stop crawling before host ID (default=maxint)")
+    parser.add_option("--crawler_perhost", type="string", metavar="FILE",
+                      dest="crawler_perhost", default='/usr/share/mirrormanager/server/crawler_perhost',
+                      help="Per-host crawler executable (default=/usr/share/mirrormanager/server/crawler_perhost")
parser.add_option("--canary",
dest="canary", action="store_true",
default=False,