Repository :
http://git.fedorahosted.org/cgit/copr.git
On branch : master
---------------------------------------------------------------
commit d05eb48433da4dabbe67d58a7fdeb91c2177c1be
Merge: 5fdd930 7ae3974
Author: Seth Vidal <skvidal(a)fedoraproject.org>
Date: Mon Dec 10 17:02:09 2012 -0500
Merge branch 'skvidal-backend'
* skvidal-backend: (63 commits)
rename file to README and explain where to look
...
---------------------------------------------------------------
TODO-backend | 19 ++
backend/__init__.py | 7 +
backend/dispatcher.py | 300 +++++++++++++++++++++++
backend/errors.py | 12 +
backend/mockremote.py | 645 +++++++++++++++++++++++++++++++++++++++++++++++++
copr-be.conf.example | 16 ++
copr-be.py | 263 ++++++++++++++++++++
7 files changed, 1262 insertions(+), 0 deletions(-)
diff --git a/TODO-backend b/TODO-backend
new file mode 100644
index 0000000..4cc5ca1
--- /dev/null
+++ b/TODO-backend
@@ -0,0 +1,19 @@
+
+- change instance type by build request for more mem/procs/extend timeouts
+ - use extra-vars?
+ - need ansible 0.9?
+- auto-timer/cleanup script for old instances that may have been orphaned
+- prune out builders when we drop the number of them active
+- LOADS of fixme and catching weird conditions
+- make logging from mockremote more sane and consistent
+- mock configs should be pushed to instances at creation time
+ - single url to repos, not mirrorlists
+- consider making each worker return job to a completed queue so the primary
+ process can do other kinds of notification
+- email notifications from backend?
+- refactor mockremote/dispatcher.worker together?
+- work on a way to find and cancel a specific build that's happening other than just
killing the instance
+- determine if it is properly checking the timeout from a dead instance
+- maybe dump out the PID of the worker that is running so we know which one to kill?
+- failure/success not being returned correctly. Should check for 'fail' in the
directories and return based on
+ that. also anything lacking success is a failure.
diff --git a/backend/__init__.py b/backend/__init__.py
new file mode 100644
index 0000000..e25106e
--- /dev/null
+++ b/backend/__init__.py
@@ -0,0 +1,7 @@
+# part of copr backend
+# skvidal(a)fedoraproject.org - seth vidal
+# (c) copyright Red Hat, Inc 2012
+# gplv2+
+
+__version__ = "0.1"
+__author__ = "Seth Vidal"
diff --git a/backend/dispatcher.py b/backend/dispatcher.py
new file mode 100644
index 0000000..9b0399d
--- /dev/null
+++ b/backend/dispatcher.py
@@ -0,0 +1,300 @@
+#!/usr/bin/python -tt
+
+
+import os
+import sys
+import multiprocessing
+import time
+import Queue
+import json
+import mockremote
+from bunch import Bunch
+import errors
+import ansible
+import ansible.playbook
+import ansible.errors
+from ansible import callbacks
+import requests
+
+
+
+
class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):
    '''playbook callbacks - quietly!

    Forwards every playbook event to any registered ansible callback
    modules but produces no terminal output of its own (except for the
    vars_prompt warning, which cannot be honored non-interactively).
    '''

    def __init__(self, verbose=False):
        # NOTE(review): the base PlaybookCallbacks.__init__ is deliberately
        # not called here; presumably fine for these ansible-0.x callbacks,
        # but confirm if the ansible version is bumped.
        self.verbose = verbose

    def on_start(self):
        callbacks.call_callback_module('playbook_on_start')

    def on_notify(self, host, handler):
        callbacks.call_callback_module('playbook_on_notify', host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module('playbook_on_no_hosts_matched')

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module('playbook_on_no_hosts_remaining')

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module('playbook_on_task_start', name,
                                       is_conditional)

    def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):
        result = None
        print("***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****")
        # bug fix: forward the caller's salt instead of hard-coding salt=None,
        # which silently dropped the argument
        callbacks.call_callback_module('playbook_on_vars_prompt', varname,
                                       private=private, prompt=prompt,
                                       encrypt=encrypt, confirm=confirm,
                                       salt_size=salt_size, salt=salt)
        return result

    def on_setup(self):
        callbacks.call_callback_module('playbook_on_setup')

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module('playbook_on_import_for_host', host,
                                       imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module('playbook_on_not_import_for_host', host,
                                       missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module('playbook_on_play_start', pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module('playbook_on_stats', stats)
+
+
class WorkerCallback(object):
    """Logs worker events to a per-worker logfile.

    When ``logfile`` is None logging is a no-op.  Write failures are
    reported to stderr but never raised, so logging can never kill the
    worker process.
    """

    def __init__(self, logfile=None):
        # logfile: path to append timestamped log lines to; None disables
        self.logfile = logfile

    def log(self, msg):
        """Append a timestamped msg to the logfile (best-effort)."""
        if not self.logfile:
            return

        now = time.strftime('%F %T')
        try:
            # context manager so the handle is closed even if the write
            # fails (previous version leaked the file object)
            with open(self.logfile, 'a') as lf:
                lf.write(str(now) + ': ' + msg + '\n')
        except (IOError, OSError) as e:
            sys.stderr.write('Could not write to logfile %s - %s\n' %
                             (self.logfile, str(e)))
+
+
class Worker(multiprocessing.Process):
    """One build-worker process.

    Pulls job files off the shared jobs queue, optionally spawns a builder
    instance via the spawn ansible playbook, runs the build for each chroot
    through mockremote, reports start/end status to the frontend and then
    terminates the instance.
    """

    def __init__(self, opts, jobs, worker_num, ip=None, create=True, callback=None):

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        self.worker_num = worker_num
        self.ip = ip                    # static builder ip (None -> spawn dynamically)
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create            # whether to spawn/terminate instances
        if not self.callback:
            self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log('creating worker: %s' % ip)
        else:
            self.callback.log('creating worker: dynamic ip')

    def spawn_instance(self):
        """call the spawn playbook to startup/provision a building instance

        Returns the new instance ip, or None when nothing usable came back.
        """
        self.callback.log('spawning instance begin')
        start = time.time()

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        # fixme - extra_vars to include ip as a var if we need to specify ips
        # also to include info for instance type to handle the memory
        # requirements of builds
        play = ansible.playbook.PlayBook(stats=stats, playbook=self.opts.spawn_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('spawning instance end')
        self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start))

        if self.ip:
            return self.ip

        # the setup cache should contain exactly one non-local host: the
        # freshly spawned builder
        for i in play.SETUP_CACHE:
            if i == 'localhost':
                continue
            return i

        # if we get here we're in trouble
        self.callback.log('No IP back from spawn_instance - dumping cache output')
        self.callback.log(str(play.SETUP_CACHE))
        self.callback.log(str(play.stats.summarize('localhost')))
        self.callback.log('Test spawn_instance playbook manually')

        return None

    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log('terminate instance begin')

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        play = ansible.playbook.PlayBook(host_list=[ip], stats=stats,
                                         playbook=self.opts.terminate_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('terminate instance end')

    def parse_job(self, jobfile):
        """Read the job json and return a Bunch with the fields we need."""
        # close the handle deterministically (previous version leaked it)
        with open(jobfile) as fobj:
            build = json.load(fobj)
        jobdata = Bunch()
        jobdata.pkgs = build['pkgs'].split(' ')
        jobdata.repos = [r for r in build['repos'].split(' ') if r.strip()]
        jobdata.chroots = build['chroots'].split(' ')
        jobdata.memory_reqs = build['memory_reqs']
        jobdata.timeout = build['timeout']
        jobdata.destdir = self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.build_id = build['id']
        jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.copr_id = build['copr']['id']
        jobdata.user_id = build['user_id']
        return jobdata

    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend; returns True on success, False otherwise"""

        headers = {'content-type': 'application/json'}
        url = '%s/update_builds/' % self.opts.frontend_url
        auth = ('user', self.opts.frontend_auth)

        msg = None
        try:
            r = requests.post(url, data=json.dumps(data), auth=auth,
                              headers=headers)
            if r.status_code != 200:
                msg = 'Failed to submit to frontend: %s: %s' % (r.status_code, r.text)
        except requests.RequestException as e:
            msg = 'Post request failed: %s' % e

        if msg:
            self.callback.log(msg)
            return False

        return True

    # maybe we move this to the callback?
    def mark_started(self, job):
        """Tell the frontend this build has started."""
        build = {'id': job.build_id,
                 'started_on': job.started_on,
                 'results': job.results,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError("Could not communicate to front end to submit status info")

    # maybe we move this to the callback?
    def return_results(self, job):
        """Report final status to the frontend and remove the job file."""
        self.callback.log('%s status %s. Took %s seconds' %
                          (job.build_id, job.status, job.ended_on - job.started_on))

        build = {'id': job.build_id,
                 'ended_on': job.ended_on,
                 'status': job.status,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError("Could not communicate to front end to submit results")

        os.unlink(job.jobfile)

    def run(self):
        # worker should startup and check if it can function
        # for each job it takes from the jobs queue
        # run opts.setup_playbook to create the instance
        # do the build (mockremote)
        # terminate the instance

        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # bug fix: default to the statically assigned ip so that workers
            # running with create=False do not hit a NameError below
            ip = self.ip

            # spin up our build instance
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError("No IP found from creating instance")

                except ansible.errors.AnsibleError as e:
                    self.callback.log('failure to setup instance: %s' % e)
                    raise

            status = 1
            job.started_on = time.time()
            self.mark_started(job)

            for chroot in job.chroots:

                chroot_destdir = job.destdir + '/' + chroot
                # setup our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError) as e:
                        msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e))
                        self.callback.log(msg)
                        status = 0
                        continue

                # FIXME
                # need a plugin hook or some mechanism to check random
                # info about the pkgs
                # this should use ansible to download the pkg on the remote system
                # and run a series of checks on the package before we
                # start the build - most importantly license checks.

                self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' %
                                  (job.build_id, ip, job.timeout, job.destdir, chroot, str(job.repos)))
                self.callback.log('building pkgs: %s' % ' '.join(job.pkgs))
                try:
                    chrootlogfile = chroot_destdir + '/mockremote.log'
                    mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
                                               destdir=job.destdir, chroot=chroot,
                                               cont=True, recurse=True,
                                               repos=job.repos,
                                               callback=mockremote.CliLogCallBack(quiet=True, logfn=chrootlogfile))
                    mr.build_pkgs(job.pkgs)
                except mockremote.MockRemoteError as e:
                    # record and break
                    self.callback.log('%s - %s' % (ip, e))
                    status = 0  # failure
                self.callback.log('Finished build: builder=%r timeout=%r destdir=%r chroot=%r repos=%r' %
                                  (ip, job.timeout, job.destdir, chroot, str(job.repos)))

            job.ended_on = time.time()
            job.status = status
            self.return_results(job)
            self.callback.log('worker finished build: %s' % ip)
            # clean up the instance
            if self.create:
                self.terminate_instance(ip)
+
diff --git a/backend/errors.py b/backend/errors.py
new file mode 100644
index 0000000..ae8ac34
--- /dev/null
+++ b/backend/errors.py
@@ -0,0 +1,12 @@
# copr error/exceptions
class CoprBackendError(Exception):
    """Base error for the copr backend; stringifies to the stored message."""

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return self.msg


class CoprWorkerError(CoprBackendError):
    """Error raised from within a build worker process."""
    pass
+
diff --git a/backend/mockremote.py b/backend/mockremote.py
new file mode 100755
index 0000000..2c641c8
--- /dev/null
+++ b/backend/mockremote.py
@@ -0,0 +1,645 @@
+#!/usr/bin/python -tt
+# by skvidal
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Library General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+# copyright 2012 Red Hat, Inc.
+
+
+# take list of pkgs
+# take single hostname
+# send 1 pkg at a time to host
+# build in remote w/mockchain
+# rsync results back
+# repeat
+# take args from mockchain (more or less)
+
+
+import os
+import sys
+import subprocess
+
+import ansible.runner
+import optparse
+from operator import methodcaller
+import time
+import socket
+import traceback
+
+# where we should execute mockchain from on the remote
+mockchain='/usr/bin/mockchain'
+# rsync path
+rsync='/usr/bin/rsync'
+
+DEF_REMOTE_BASEDIR='/var/tmp'
+DEF_TIMEOUT=3600
+DEF_REPOS = []
+DEF_CHROOT= None
+DEF_USER = 'mockbuilder'
+DEF_DESTDIR = os.getcwd()
+
class SortedOptParser(optparse.OptionParser):
    """OptionParser variant whose --help output lists options alphabetically."""

    def format_help(self, formatter=None):
        # order by each option's canonical opt string before rendering
        self.option_list.sort(key=lambda opt: opt.get_opt_string())
        return optparse.OptionParser.format_help(self, formatter=None)
+
+
def createrepo(path):
    """Run createrepo on path and return (returncode, stdout, stderr).

    Uses --update for an incremental run when repodata already exists.
    """
    comm = ['/usr/bin/createrepo']
    if os.path.exists(path + '/repodata/repomd.xml'):
        comm.append('--update')
    comm.append(path)
    proc = subprocess.Popen(comm,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate()
    return proc.returncode, out, err
+
def read_list_from_file(fn):
    """Return the lines of file fn as a stripped list.

    Lines starting with '#' are skipped.  Blank lines are preserved
    (historical behavior).  The file handle is closed deterministically.
    """
    lst = []
    # context manager fixes the leaked file handle in the original
    with open(fn, 'r') as f:
        for line in f:
            line = line.replace('\n', '')
            line = line.strip()
            if line.startswith('#'):
                continue
            lst.append(line)

    return lst
+
def log(lf, msg):
    """Append msg (prefixed with a unix timestamp) to logfile lf, then echo it.

    lf may be falsy to skip file logging.  Write failures are reported but
    never raised.
    """
    if lf:
        now = time.time()
        try:
            # context manager fixes the leaked file handle in the original
            with open(lf, 'a') as fobj:
                fobj.write(str(now) + ':' + msg + '\n')
        except (IOError, OSError) as e:
            print('Could not write to logfile %s - %s' % (lf, str(e)))
    print(msg)
+
def get_ans_results(results, hostname):
    """Return the per-host result dict for hostname.

    'dark' (unreachable) entries take precedence over 'contacted' ones;
    an empty dict is returned when the host appears in neither.
    """
    for bucket in ('dark', 'contacted'):
        if hostname in results[bucket]:
            return results[bucket][hostname]
    return {}
+
def _create_ans_conn(hostname, username, timeout):
    """Build a single-host, single-fork ansible Runner connecting as username."""
    return ansible.runner.Runner(remote_user=username,
                                 host_list=[hostname],
                                 pattern=hostname,
                                 forks=1,
                                 timeout=timeout)
+
def check_for_ans_error(results, hostname, err_codes=(), success_codes=(0,),
                        return_on_error=('stdout', 'stderr')):
    """Inspect an ansible results dict for errors on hostname.

    Returns (error, err_results): error is a bool; when True, err_results
    contains 'msg' and may contain 'rc' plus any keys named in
    return_on_error that were present in the host's results.

    Defaults changed from mutable lists to tuples (same values) to avoid
    the shared-mutable-default pitfall; callers are unaffected.
    """
    err_results = {}

    # an unreachable/uncontactable host shows up under 'dark'
    if 'dark' in results and hostname in results['dark']:
        err_results['msg'] = "Error: Could not contact/connect to %s." % hostname
        return (True, err_results)

    error = False

    if err_codes or success_codes:
        if hostname in results['contacted']:
            if 'rc' in results['contacted'][hostname]:
                rc = int(results['contacted'][hostname]['rc'])
                err_results['rc'] = rc
                # check for err codes first
                if rc in err_codes:
                    error = True
                    err_results['msg'] = 'rc %s matched err_codes' % rc
                elif rc not in success_codes:
                    error = True
                    err_results['msg'] = 'rc %s not in success_codes' % rc
            elif 'failed' in results['contacted'][hostname] and results['contacted'][hostname]['failed']:
                error = True
                err_results['msg'] = 'results included failed as true'

        if error:
            # copy over any requested diagnostic fields that are present
            for item in return_on_error:
                if item in results['contacted'][hostname]:
                    err_results[item] = results['contacted'][hostname][item]

    return error, err_results
+
+
class MockRemoteError(Exception):
    """Error raised for mockremote failures; stringifies to the stored message."""

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return self.msg


class BuilderError(MockRemoteError):
    """Raised when the remote builder host fails a sanity/setup check."""
    pass
+
class DefaultCallBack(object):
    """Build-progress callback that simply prints messages (unless quiet).

    The start/end hooks are no-ops here; subclasses override them to
    report per-package progress.
    """

    def __init__(self, **kwargs):
        # quiet: suppress terminal output; logfn: logfile path for subclasses
        self.quiet = kwargs.get('quiet', False)
        self.logfn = kwargs.get('logfn', None)

    def start_build(self, pkg):
        pass

    def end_build(self, pkg):
        pass

    def start_download(self, pkg):
        pass

    def end_download(self, pkg):
        pass

    def error(self, msg):
        self.log("Error: %s" % msg)

    def log(self, msg):
        if not self.quiet:
            print(msg)
+
class CliLogCallBack(DefaultCallBack):
    """Callback that logs build progress to a file and/or the terminal."""

    def __init__(self, **kwargs):
        DefaultCallBack.__init__(self, **kwargs)

    def start_build(self, pkg):
        msg = "Start build: %s" % pkg
        self.log(msg)

    def end_build(self, pkg):
        msg = "End Build: %s" % pkg
        self.log(msg)

    def start_download(self, pkg):
        msg = "Start retrieve results for: %s" % pkg
        self.log(msg)

    def end_download(self, pkg):
        msg = "End retrieve results for: %s" % pkg
        self.log(msg)

    def error(self, msg):
        self.log("Error: %s" % msg)

    def log(self, msg):
        """Append timestamped msg to self.logfn (if set); echo unless quiet."""
        if self.logfn:
            now = time.time()
            try:
                # context manager fixes the leaked file handle
                with open(self.logfn, 'a') as lf:
                    lf.write(str(now) + ':' + msg + '\n')
            except (IOError, OSError) as e:
                # bug fix: original referenced nonexistent self.lf here,
                # raising AttributeError and masking the real I/O error
                sys.stderr.write('Could not write to logfile %s - %s\n' %
                                 (self.logfn, str(e)))
        if not self.quiet:
            print(msg)
+
class Builder(object):
    """Wraps a single remote builder host.

    On construction the host is sanity-checked over ansible (check());
    builds are then run there with mockchain (build()) and the results
    rsynced back (download()).  All remote commands go through a single
    ansible Runner stored in self.conn.
    """

    def __init__(self, hostname, username, timeout, mockremote):
        self.hostname = hostname
        self.username = username
        self.timeout = timeout
        # chroot/repos are taken from the owning MockRemote instance
        self.chroot = mockremote.chroot
        self.repos = mockremote.repos
        self.mockremote = mockremote
        self.checked = False
        self._tempdir = None
        # check out the host - make sure it can build/be contacted/etc
        # (raises BuilderError on failure, so construction fails fast)
        self.check()
        # if we're at this point we've connected and done stuff on the host
        self.conn = _create_ans_conn(self.hostname, self.username, self.timeout)

    @property
    def remote_build_dir(self):
        # where mockchain is told to put its results on the remote host
        return self.tempdir + '/build/'

    @property
    def tempdir(self):
        """Remote scratch dir: pinned by MockRemote.remote_tempdir if set,
        otherwise lazily created once with mktemp and cached."""
        if self.mockremote.remote_tempdir:
            return self.mockremote.remote_tempdir

        if self._tempdir:
            return self._tempdir

        cmd = '/bin/mktemp -d %s/%s-XXXXX' % (self.mockremote.remote_basedir, 'mockremote')
        self.conn.module_name = "shell"
        self.conn.module_args = str(cmd)
        results = self.conn.run()
        tempdir = None
        # the runner targets a single host, so this picks up the one result
        for hn, resdict in results['contacted'].items():
            tempdir = resdict['stdout']

        # if still nothing then we've broken
        if not tempdir:
            raise BuilderError('Could not make tmpdir on %s' % self.hostname)

        # make the dir world-readable so rsync-as-user can fetch results
        cmd = "/bin/chmod 755 %s" % tempdir
        self.conn.module_args = str(cmd)
        self.conn.run()
        self._tempdir = tempdir

        return self._tempdir

    @tempdir.setter
    def tempdir(self, value):
        self._tempdir = value

    def _get_remote_pkg_dir(self, pkg):
        # the pkg will build into a dir by mockchain named:
        # $tempdir/build/results/$chroot/$packagename
        s_pkg = os.path.basename(pkg)
        pdn = s_pkg.replace('.src.rpm', '')
        remote_pkg_dir = self.remote_build_dir + '/results/' + self.chroot + '/' + pdn
        return remote_pkg_dir

    def build(self, pkg):
        """Build one pkg (local path or url) on the remote host.

        Returns (success_bool, stdout, stderr) of the mockchain run.
        """
        # build the pkg passed in
        # add pkg to various lists
        # check for success/failure of build
        # return success/failure,stdout,stderr of build command
        # returns success_bool, out, err

        success = False

        # check if pkg is local or http
        dest = None
        if os.path.exists(pkg):
            # local file: copy it up to the builder's tempdir first
            dest = self.tempdir + '/' + os.path.basename(pkg)
            self.conn.module_name = "copy"
            margs = 'src=%s dest=%s' % (pkg, dest)
            self.conn.module_args = str(margs)
            self.mockremote.callback.log("Sending %s to %s to build" %
                                         (os.path.basename(pkg), self.hostname))

            # FIXME should probably check this but <shrug>
            self.conn.run()
        else:
            # a url: mockchain will fetch it itself on the remote side
            dest = pkg

        # construct the mockchain command
        buildcmd = '%s -r %s -l %s ' % (mockchain, self.chroot, self.remote_build_dir)
        for r in self.repos:
            buildcmd += '-a %s ' % r

        buildcmd += dest

        #print ' Running %s on %s' % (buildcmd, hostname)
        # run the mockchain command async
        # this runs it sync - FIXME
        self.conn.module_name = "shell"
        self.conn.module_args = str(buildcmd)
        results = self.conn.run()

        is_err, err_results = check_for_ans_error(results, self.hostname, success_codes=[0],
                                                  return_on_error=['stdout', 'stderr'])
        if is_err:
            return success, err_results.get('stdout', ''), err_results.get('stderr', '')

        # we know the command ended successfully but not if the pkg built successfully
        myresults = get_ans_results(results, self.hostname)
        out = myresults.get('stdout', '')
        err = myresults.get('stderr', '')

        # mockchain drops a 'success' marker file in the result dir on success
        successfile = self._get_remote_pkg_dir(pkg) + '/success'
        testcmd = '/usr/bin/test -f %s' % successfile
        self.conn.module_args = str(testcmd)
        results = self.conn.run()
        is_err, err_results = check_for_ans_error(results, self.hostname, success_codes=[0])
        if not is_err:
            success = True

        return success, out, err

    def download(self, pkg, destdir):
        # download the pkg to destdir using rsync + ssh
        # return success/failure, stdout, stderr

        success = False
        rpd = self._get_remote_pkg_dir(pkg)
        destdir = "'" + destdir.replace("'", "'\\''") + "'" # make spaces work w/our rsync command below :(
        # build rsync command line from the above
        remote_src = '%s@%s:%s' % (self.username, self.hostname, rpd)
        ssh_opts = "'ssh -o PasswordAuthentication=no -o StrictHostKeyChecking=no'"
        command = "%s -avH -e %s %s %s/" % (rsync, ssh_opts, remote_src, destdir)
        cmd = subprocess.Popen(command, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # rsync results into opts.destdir
        out, err = cmd.communicate()
        if cmd.returncode:
            success = False
        else:
            success = True

        return success, out, err

    def check(self):
        # do check of host
        # set checked if successful
        # return success/failure, errorlist
        # NOTE(review): despite the comment above, any failure raises
        # BuilderError instead of returning an error list

        if self.checked:
            return True, []

        errors = []

        try:
            socket.gethostbyname(self.hostname)
        except socket.gaierror:
            raise BuilderError('%s could not be resolved' % self.hostname)

        # connect as user
        ans = ansible.runner.Runner(host_list=[self.hostname], pattern='*',
                                    remote_user=self.username, forks=1, timeout=20)
        ans.module_name = "shell"
        ans.module_args = str("/bin/rpm -q mock rsync")
        res = ans.run()
        # check for mock/rsync from results
        is_err, err_results = check_for_ans_error(res, self.hostname, success_codes=[0])
        if is_err:
            # a returncode means the command ran but the pkgs are missing;
            # no returncode means we could not even run the command
            if 'rc' in err_results:
                errors.append('Warning: %s does not have mock or rsync installed' % self.hostname)
            else:
                errors.append(err_results['msg'])

        # test for path existence for mockchain and chroot config for this chroot
        ans.module_name = "shell"
        ans.module_args = str("/usr/bin/test -f %s && /usr/bin/test -f /etc/mock/%s.cfg" % (mockchain, self.chroot))
        res = ans.run()

        is_err, err_results = check_for_ans_error(res, self.hostname, success_codes=[0])
        if is_err:
            if 'rc' in err_results:
                errors.append('Warning: %s lacks mockchain or the chroot %s' % (self.hostname, self.chroot))
            else:
                errors.append(err_results['msg'])

        if not errors:
            self.checked = True
        else:
            msg = '\n'.join(errors)
            raise BuilderError(msg)
+
+
class MockRemote(object):
    """Drive remote builds: ship each pkg to a builder host via Builder,
    build it there with mockchain, rsync the results back into destdir and
    keep a local repo updated with createrepo.

    With recurse=True failed pkgs are retried as long as each pass builds
    at least one pkg (handles build-order dependencies).
    """

    def __init__(self, builder=None, user=DEF_USER, timeout=DEF_TIMEOUT,
                 destdir=DEF_DESTDIR, chroot=DEF_CHROOT, cont=False, recurse=False,
                 repos=DEF_REPOS, callback=None,
                 remote_basedir=DEF_REMOTE_BASEDIR, remote_tempdir=None):

        self.destdir = destdir
        self.chroot = chroot
        self.repos = repos
        self.cont = cont                  # continue past per-pkg failures
        self.recurse = recurse            # retry failed pkgs in later passes
        self.callback = callback
        self.remote_basedir = remote_basedir
        self.remote_tempdir = remote_tempdir

        if not self.callback:
            self.callback = DefaultCallBack()

        self.callback.log("Setting up builder: %s" % builder)
        # Builder.__init__ sanity-checks the host and raises BuilderError on failure
        self.builder = Builder(builder, user, timeout, self)

        if not self.chroot:
            raise MockRemoteError("No chroot specified!")

        self.failed = []
        self.finished = []
        self.pkg_list = []

    def _get_pkg_destpath(self, pkg):
        """Local results dir for pkg: destdir/chroot/<pkg-name-without-.src.rpm>."""
        s_pkg = os.path.basename(pkg)
        pdn = s_pkg.replace('.src.rpm', '')
        resdir = '%s/%s/%s' % (self.destdir, self.chroot, pdn)
        resdir = os.path.normpath(resdir)
        return resdir

    def build_pkgs(self, pkgs=None):
        """Build pkgs (defaults to self.pkg_list), retrying failures when
        recurse is set.  Raises MockRemoteError on failure unless cont."""

        if not pkgs:
            pkgs = self.pkg_list

        built_pkgs = []
        # NOTE(review): never populated below; kept only for the final
        # failure report fallback - candidate for removal
        downloaded_pkgs = {}

        try_again = True
        to_be_built = pkgs
        while try_again:
            self.failed = []
            just_built = []
            for pkg in to_be_built:
                if pkg in just_built:
                    self.callback.log("skipping duplicate pkg in this list: %s" % pkg)
                    continue
                else:
                    just_built.append(pkg)

                p_path = self._get_pkg_destpath(pkg)

                # check the destdir to see if these pkgs need to be built
                if os.path.exists(p_path):
                    if os.path.exists(p_path + '/success'):
                        self.callback.log("Skipping already built pkg %s" % os.path.basename(pkg))
                        continue
                    # if we're asking to build it and it is marked as fail - nuke
                    # the failure and try rebuilding it
                    elif os.path.exists(p_path + '/fail'):
                        os.unlink(p_path + '/fail')

                # off to the builder object
                # building
                self.callback.start_build(pkg)
                b_status, b_out, b_err = self.builder.build(pkg)
                self.callback.end_build(pkg)

                # downloading
                self.callback.start_download(pkg)
                # mockchain makes things with the chroot appended - so suck down
                # that pkg subdir from w/i that location
                d_ret, d_out, d_err = self.builder.download(pkg, self.destdir + '/' + self.chroot)
                if not d_ret:
                    msg = "Failure to download %s: %s" % (pkg, d_out + d_err)
                    if not self.cont:
                        raise MockRemoteError(msg)
                    self.callback.error(msg)

                self.callback.end_download(pkg)
                # write out whatever came from the builder call into the destdir/chroot
                if not os.path.exists(self.destdir + '/' + self.chroot):
                    os.makedirs(self.destdir + '/' + self.chroot)
                # context manager closes the log even if a write fails
                # (previous version left the handle open on error)
                with open(self.destdir + '/' + self.chroot + '/mockchain.log', 'a') as r_log:
                    r_log.write('%s\n' % pkg)
                    r_log.write(b_out)
                    if b_err:
                        r_log.write('\nstderr\n')
                        r_log.write(b_err)

                # checking where to stick stuff
                if not b_status:
                    if self.recurse:
                        self.failed.append(pkg)
                        self.callback.error("Error building %s, will try again" % os.path.basename(pkg))
                    else:
                        # message fix: was "logs/resultsin" (missing space)
                        msg = "Error building %s\nSee logs/results in %s" % (os.path.basename(pkg), self.destdir)
                        if not self.cont:
                            raise MockRemoteError(msg)
                        self.callback.error(msg)

                else:
                    self.callback.log("Success building %s" % os.path.basename(pkg))
                    built_pkgs.append(pkg)
                    # createrepo with the new pkgs
                    rc, out, err = createrepo(self.destdir)
                    if err.strip():
                        self.callback.error("Error making local repo: %s" % self.destdir)
                        self.callback.error("%s" % err)
                        # FIXME - maybe clean up .repodata and .olddata here?

            if self.failed:
                if len(self.failed) != len(to_be_built):
                    # made progress this pass - retry only the failures
                    to_be_built = self.failed
                    try_again = True
                    self.callback.log('Trying to rebuild %s failed pkgs' % len(self.failed))
                else:
                    # no progress: report and stop retrying
                    self.callback.log("Tried twice - following pkgs could not be successfully built:")
                    for pkg in self.failed:
                        msg = pkg
                        if pkg in downloaded_pkgs:
                            msg = downloaded_pkgs[pkg]
                        self.callback.log(msg)

                    try_again = False
            else:
                try_again = False
+
+
+
def parse_args(args):
    """Parse the mockremote command line.

    Returns (opts, pkgs); exits with status 1 on missing/invalid arguments.
    """
    parser = SortedOptParser("mockremote -b hostname -u user -r chroot pkg pkg pkg")

    # (flags, keyword arguments) table, applied in order below
    option_table = [
        (('-r', '--root'),
         dict(default=DEF_CHROOT, dest='chroot',
              help="chroot config name/base to use in the mock build")),
        (('-c', '--continue'),
         dict(default=False, action='store_true', dest='cont',
              help="if a pkg fails to build, continue to the next one")),
        (('-a', '--addrepo'),
         dict(default=DEF_REPOS, action='append', dest='repos',
              help="add these repo baseurls to the chroot's yum config")),
        (('--recurse',),
         dict(default=False, action='store_true',
              help="if more than one pkg and it fails to build, try to build the rest and come back to it")),
        (('--log',),
         dict(default=None, dest='logfile',
              help="log to the file named by this option, defaults to not logging")),
        (('-b', '--builder'),
         dict(dest='builder', default=None, help="builder to use")),
        (('-u',),
         dict(dest="user", default=DEF_USER,
              help="user to run as/connect as on builder systems")),
        (('-t', '--timeout'),
         dict(dest="timeout", type="int", default=DEF_TIMEOUT,
              help="maximum time in seconds a build can take to run")),
        (('--destdir',),
         dict(dest="destdir", default=DEF_DESTDIR,
              help="place to download all the results/packages")),
        (('--packages',),
         dict(dest="packages_file", default=None,
              help="file to read list of packages from")),
        (('-q', '--quiet'),
         dict(dest="quiet", default=False, action="store_true",
              help="output very little to the terminal")),
    ]
    for flags, kwargs in option_table:
        parser.add_option(*flags, **kwargs)

    opts, args = parser.parse_args(args)

    if not opts.builder:
        print("Must specify a system to build on")
        sys.exit(1)

    # a packages file extends the positional pkg list
    if opts.packages_file and os.path.exists(opts.packages_file):
        args.extend(read_list_from_file(opts.packages_file))

    # args = list(set(args))  # poor man's 'unique' - this also changes the order :(

    if not args:
        print("Must specify at least one pkg to build")
        sys.exit(1)

    if not opts.chroot:
        print("Must specify a mock chroot")
        sys.exit(1)

    for url in opts.repos:
        if not (url.startswith('http') or url.startswith('file://')):
            print("Only http[s] or file urls allowed for repos")
            sys.exit(1)

    return opts, args
+
+
+#FIXME
+# play with createrepo run at the end of each build
+# need to output the things that actually worked :)
+
+
def main(args):
    """CLI entry point: parse args, run the remote build, report errors."""

    # parse args
    opts, pkgs = parse_args(args)

    if not os.path.exists(opts.destdir):
        os.makedirs(opts.destdir)

    try:
        # setup our callback
        callback = CliLogCallBack(logfn=opts.logfile, quiet=opts.quiet)
        # our mockremote instance
        mr = MockRemote(builder=opts.builder,
                        user=opts.user,
                        timeout=opts.timeout,
                        destdir=opts.destdir,
                        chroot=opts.chroot,
                        cont=opts.cont,
                        recurse=opts.recurse,
                        repos=opts.repos,
                        callback=callback)

        # FIXMES - things to think about doing:
        #   output the remote tempdir when you start up
        #   output the number of pkgs and where you're writing things to
        #   consider option to sync over destdir to the remote system to use
        #   as a local repo for the build

        if not opts.quiet:
            print("Building %s pkgs" % len(pkgs))

        mr.build_pkgs(pkgs)

        if not opts.quiet:
            print("Output written to: %s" % mr.destdir)

    except MockRemoteError as e:
        sys.stderr.write("Error on build:\n")
        sys.stderr.write(str(e) + '\n')
        return
+
+
if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except Exception, e:
        # catch-all so any unexpected failure is reported with a full
        # traceback before exiting non-zero
        print "ERROR: %s - %s" % (str(type(e)), str(e))
        traceback.print_exc()
        sys.exit(1)
diff --git a/copr-be.conf.example b/copr-be.conf.example
new file mode 100644
index 0000000..ffeb235
--- /dev/null
+++ b/copr-be.conf.example
@@ -0,0 +1,16 @@
+[backend]
+results_baseurl=http://copr-be.cloud.fedoraproject.org/results
+frontend_url=http://copr-fe.cloud.fedoraproject.org/backend
+frontend_auth=backend_password_from_fe_config
+spawn_playbook=/srv/copr-work/provision/builderpb.yml
+terminate_playbook=/srv/copr-work/provision/terminatepb.yml
+jobsdir=/srv/copr-work/jobs
+destdir=/srv/copr-repo/results
+sleeptime=30
+num_workers=5
+logfile=/srv/copr-work/logs/copr.log
+worker_logdir=/srv/copr-work/logs/workers/
+
+
+[builder]
+timeout=3600
diff --git a/copr-be.py b/copr-be.py
new file mode 100644
index 0000000..6631ac4
--- /dev/null
+++ b/copr-be.py
@@ -0,0 +1,263 @@
+#!/usr/bin/python -tt
+
+
+import sys
+import os
+import glob
+import time
+import multiprocessing
+from backend.dispatcher import Worker
+from backend import errors
+from bunch import Bunch
+import ConfigParser
+import optparse
+import json
+import requests
+
+def _get_conf(cp, section, option, default):
+ """to make returning items from config parser less
irritating"""
+ if cp.has_section(section) and cp.has_option(section,option):
+ return cp.get(section, option)
+ return default
+
+
+class CoprBackend(object):
+ def __init__(self, config_file=None, ext_opts=None):
+ # read in config file
+ # put all the config items into a single self.opts bunch
+
+ if not config_file:
+ raise errors.CoprBackendError, "Must specify config_file"
+
+ self.config_file = config_file
+ self.ext_opts = ext_opts # to stow our cli options for read_conf()
+ self.opts = self.read_conf()
+
+ logdir = os.path.dirname(self.opts.logfile)
+ if not os.path.exists(logdir):
+ os.makedirs(logdir, mode=0750)
+
+ if not os.path.exists(self.opts.destdir):
+ os.makedirs(self.opts.destdir, mode=0755)
+
+ # setup a log file to write to
+ self.logfile = self.opts.logfile
+ self.log("Starting up new copr-be instance")
+
+
+ if not os.path.exists(self.opts.worker_logdir):
+ os.makedirs(self.opts.worker_logdir, mode=0750)
+
+ self.jobs = multiprocessing.Queue()
+ self.workers = []
+ self.added_jobs = []
+
+
+ def read_conf(self):
+ "read in config file - return Bunch of config data"
+ opts = Bunch()
+ cp = ConfigParser.ConfigParser()
+ try:
+ cp.read(self.config_file)
+ opts.results_baseurl = _get_conf(cp,'backend',
'results_baseurl', 'http://copr')
+ opts.frontend_url = _get_conf(cp, 'backend', 'frontend_url',
'http://coprs/rest/api')
+ opts.frontend_auth = _get_conf(cp,'backend', 'frontend_auth',
'PASSWORDHERE')
+ opts.spawn_playbook =
_get_conf(cp,'backend','spawn_playbook',
'/etc/copr/builder_playbook.yml')
+ opts.terminate_playbook =
_get_conf(cp,'backend','terminate_playbook',
'/etc/copr/terminate_playbook.yml')
+ opts.jobsdir = _get_conf(cp, 'backend', 'jobsdir', None)
+ opts.destdir = _get_conf(cp, 'backend', 'destdir', None)
+ opts.daemonize = _get_conf(cp, 'backend', 'daemonize', True)
+ opts.exit_on_worker = _get_conf(cp, 'backend',
'exit_on_worker', False)
+ opts.sleeptime = int(_get_conf(cp, 'backend', 'sleeptime',
10))
+ opts.num_workers = int(_get_conf(cp, 'backend',
'num_workers', 8))
+ opts.timeout = int(_get_conf(cp, 'builder', 'timeout',
1800))
+ opts.logfile = _get_conf(cp, 'backend', 'logfile',
'/var/log/copr/backend.log')
+ opts.worker_logdir = _get_conf(cp, 'backend',
'worker_logdir', '/var/log/copr/worker/')
+ # thoughts for later
+ # ssh key for connecting to builders?
+ # cloud key stuff?
+ #
+ except ConfigParser.Error, e:
+ raise errors.CoprBackendError, 'Error parsing config file: %s: %s' %
(self.config_file, e)
+
+
+ if not opts.jobsdir or not opts.destdir:
+ raise errors.CoprBackendError, "Incomplete Config - must specify jobsdir
and destdir in configuration"
+
+ if self.ext_opts:
+ for v in self.ext_opts:
+ setattr(opts, v, self.ext_opts.get(v))
+ return opts
+
+
+ def log(self, msg):
+ now = time.strftime('%F %T')
+ output = str(now) + ': ' + msg
+ if not self.opts.daemonize:
+ print output
+
+ try:
+ open(self.logfile, 'a').write(output + '\n')
+ except (IOError, OSError), e:
+ print >>sys.stderr, 'Could not write to logfile %s - %s' %
(self.logfile, str(e))
+
+
+ def fetch_jobs(self):
+ self.log('fetching jobs')
+ try:
+ r = requests.get('%s/waiting_builds/' % self.opts.frontend_url) #
auth stuff here? maybe/maybenot
+ except requests.RequestException, e:
+ self.log('Error retrieving jobs from %s: %s' %
(self.opts.frontend_url, e))
+ else:
+ r_json = json.loads(r.content) # using old requests on el6 :(
+ if 'builds' in r_json:
+ self.log('%s jobs returned' % len(r_json['builds']))
+ count = 0
+ for b in r_json['builds']:
+ if 'id' in b:
+ jobfile = self.opts.jobsdir + '/%s.json' %
b['id']
+ if not os.path.exists(jobfile) and b['id'] not in
self.added_jobs:
+ count += 1
+ open(jobfile, 'w').write(json.dumps(b))
+ self.log('Wrote job: %s' % b['id'])
+ self.log('New jobs: %s' % count)
+
+ def run(self):
+
+ abort = False
+ while not abort:
+ self.fetch_jobs()
+ for f in sorted(glob.glob(self.opts.jobsdir + '/*.json')):
+ n = os.path.basename(f).replace('.json', '')
+ if n not in self.added_jobs:
+ self.jobs.put(f)
+ self.added_jobs.append(n)
+ self.log('adding to work queue id %s' % n)
+
+ # re-read config into opts
+ self.opts = self.read_conf()
+
+ if self.jobs.qsize():
+ self.log("# jobs in queue: %s" % self.jobs.qsize())
+ # this handles starting/growing the number of workers
+ if len(self.workers) < self.opts.num_workers:
+ self.log("Spinning up more workers for jobs")
+ for i in range(self.opts.num_workers - len(self.workers)):
+ worker_num = len(self.workers) + 1
+ w = Worker(self.opts, self.jobs, worker_num)
+ self.workers.append(w)
+ w.start()
+ self.log("Finished starting worker processes")
+ # FIXME - prune out workers
+ #if len(self.workers) > self.opts.num_workers:
+ # killnum = len(self.workers) - self.opts.num_workers
+ # for w in self.workers[:killnum]:
+ # #insert a poison pill? Kill after something? I dunno.
+ # FIXME - if a worker bombs out - we need to check them
+ # and startup a new one if it happens
+ # check for dead workers and abort
+ for w in self.workers:
+ if not w.is_alive():
+ self.log('Worker %d died unexpectedly' % w.worker_num)
+ if self.opts.exit_on_worker:
+ raise errors.CoprBackendError, "Worker died unexpectedly,
exiting"
+ else:
+ self.workers.remove(w) # it is not working anymore
+ w.terminate() # kill it with a fire
+
+ time.sleep(self.opts.sleeptime)
+
+# lifted from certmaster
def daemonize(pidfile=None):
    """
    Daemonize this process with the UNIX double-fork trick.
    Writes the new PID to the provided file name if not None.
    """

    # read the current umask without changing it: umask() both sets and
    # returns, so set a throwaway value and immediately restore it
    cur_umask = os.umask(077)
    os.umask(cur_umask)

    # first fork: the original caller's process exits, the child carries on
    pid = os.fork()
    if pid > 0:
        sys.exit(0)
    os.chdir("/")
    os.setsid()  # become session leader, detach from the controlling tty
    os.umask(cur_umask)
    # second fork: the session leader exits at the bottom of this function;
    # the grandchild is the daemon and can never reacquire a terminal
    pid = os.fork()

    # close the inherited standard streams before re-opening them below
    os.close(0)
    os.close(1)
    os.close(2)

    # The standard I/O file descriptors are redirected to /dev/null by default.
    if (hasattr(os, "devnull")):
        REDIRECT_TO = os.devnull
    else:
        REDIRECT_TO = "/dev/null"

    # based on http://code.activestate.com/recipes/278731/
    os.open(REDIRECT_TO, os.O_RDWR) # standard input (0)

    os.dup2(0, 1) # standard output (1)
    os.dup2(0, 2) # standard error (2)

    # the second-fork parent records the daemon's pid and exits;
    # only the daemon process actually returns from this function
    if pid > 0:
        if pidfile is not None:
            open(pidfile, "w").write(str(pid))
        sys.exit(0)
+
+def parse_args(args):
+ parser = optparse.OptionParser('\ncopr-be [options]')
+ parser.add_option('-c', '--config',
default='/etc/copr-be.conf', dest='config_file',
+ help="config file to use for copr-be run")
+ parser.add_option('-d','--daemonize', default=False,
dest='daemonize',
+ action='store_true', help="daemonize or not")
+ parser.add_option('-p', '--pidfile', default='copr-be.pid',
dest='pidfile',
+ help="pid file to use for copr-be if daemonized")
+ parser.add_option('-x', '--exit', default=False,
dest='exit_on_worker',
+ action='store_true', help="exit on worker failure")
+
+ opts, args = parser.parse_args(args)
+ if not os.path.exists(opts.config_file):
+ print "No config file found at: %s" % opts.config_file
+ sys.exit(1)
+ opts.config_file = os.path.abspath(opts.config_file)
+
+ ret_opts = Bunch()
+ for o in ('daemonize', 'exit_on_worker', 'pidfile',
'config_file'):
+ setattr(ret_opts, o, getattr(opts, o))
+
+ return ret_opts
+
+
+
+def main(args):
+ opts = parse_args(args)
+
+ try:
+ cbe = CoprBackend(opts.config_file, ext_opts=opts)
+ if opts.daemonize:
+ daemonize(opts.pidfile)
+ cbe.run()
+ except Exception, e:
+ print 'Killing/Dying'
+ if 'cbe' in locals():
+ for w in cbe.workers:
+ w.terminate()
+ raise
+ except KeyboardInterrupt, e:
+ pass
+
+if __name__ == '__main__':
+ try:
+ main(sys.argv[1:])
+ except Exception, e:
+ print "ERROR: %s - %s" % (str(type(e)), str(e))
+ # FIXME - maybe check on daemonize and do this as a syslog.syslog() call?
+ sys.exit(1)
+ except KeyboardInterrupt, e:
+ print "\nUser cancelled, may need cleanup\n"
+ sys.exit(0)
+