Repository : http://git.fedorahosted.org/cgit/copr.git
On branch : master
---------------------------------------------------------------
commit f814691f3895a7fa24818d9e020b7479a350b800
Author: Ralph Bean <rbean(a)redhat.com>
Date: Tue Mar 5 10:21:09 2013 -0500
First pass at adding fedmsg hooks.
---------------------------------------------------------------
backend/dispatcher.py | 59 ++++++++++++++++++++++++++++++++++++++++++++-----
copr-be.conf.example | 5 ++++
copr-be.py | 2 +-
copr.spec | 1 +
4 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/backend/dispatcher.py b/backend/dispatcher.py
index 8ad5b70..4817eeb 100644
--- a/backend/dispatcher.py
+++ b/backend/dispatcher.py
@@ -13,6 +13,11 @@ import ansible.errors
from ansible import callbacks
import requests
+try:
+ import fedmsg
+except ImportError:
+ pass # fedmsg is optional
+
@@ -95,18 +100,40 @@ class Worker(multiprocessing.Process):
if ip:
self.callback.log('creating worker: %s' % ip)
- self.event('creating worker: %s' % ip)
+ self.event('worker.create', 'creating worker: {ip}', dict(ip=ip))
else:
self.callback.log('creating worker: dynamic ip')
- self.event('creating worker: dynamic ip')
+ self.event('worker.create', 'creating worker: dynamic ip')
+
+ def event(self, topic, template, content=None):
+ """ Multi-purpose logging method.
+
+ Logs messages to two different destinations:
+ - The internal "events" queue for communicating back to the
+ dispatcher.
+ - The fedmsg bus. Messages are posted asynchronously to a
+ zmq.PUB socket.
+
+ """
+
+ content = content or {}
+ what = template.format(**content)
- def event(self, what):
if self.ip:
who = 'worker-%s-%s' % (self.worker_num, self.ip)
else:
who = 'worker-%s' % (self.worker_num)
self.events.put({'when':time.time(), 'who':who, 'what':what})
+ try:
+ content['who'] = who
+ content['what'] = what
+ if self.opts.fedmsg_enabled:
+ fedmsg.publish(modname="copr", topic=topic, msg=content)
+ except Exception, e:
+ # XXX - Maybe log traceback as well with traceback.format_exc()
+ self.callback.log('failed to publish message: %s' % e)
+ pass # But continue on happily.
def spawn_instance(self):
"""call the spawn playbook to startup/provision a building instance"""
@@ -261,14 +288,30 @@ class Worker(multiprocessing.Process):
self.callback.log('failure to setup instance: %s' % e)
raise
+ # This assumes there are certs and a fedmsg config on disk
+ try:
+ if self.opts.fedmsg_enabled:
+ fedmsg.init(name="relay_inbound", cert_prefix="copr", active=True)
+ except Exception, e:
+ self.callback.log('failed to initialize fedmsg: %s' % e)
+ pass # But continue on happily.
+
status = 1
job.started_on = time.time()
self.mark_started(job)
- self.event('build start: user:%s copr:%s build:%s ip:%s pid:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid))
+ template = 'build start: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
+ content = dict(user=job.user_name, copr=job.copr_name,
+ build=job.build_id, ip=ip, pid=self.pid)
+ self.event('build.start', template, content)
for chroot in job.chroots:
- self.event('chroot start: chroot:%s user:%s copr:%s build:%s ip:%s pid:%s' % (chroot, job.user_name, job.copr_name, job.build_id, ip, self.pid))
+ template = 'chroot start: chroot:{chroot} user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid}'
+ content = dict(chroot=chroot, user=job.user_name,
+ copr=job.copr_name, build=job.build_id,
+ ip=ip, pid=self.pid)
+ self.event('chroot.start', template, content)
+
chroot_destdir = job.destdir + '/' + chroot
# setup our target dir locally
if not os.path.exists(chroot_destdir):
@@ -314,7 +357,11 @@ class Worker(multiprocessing.Process):
job.status = status
self.return_results(job)
self.callback.log('worker finished build: %s' % ip)
- self.event('build end: user:%s copr:%s build:%s ip:%s pid:%s status:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid, job.status))
+ template = 'build end: user:{user} copr:{copr} build:{build} ip:{ip} pid:{pid} status:{status}'
+ content = dict(user=job.user_name, copr=job.copr_name,
+ build=job.build_id, ip=ip, pid=self.pid,
+ status=job.status)
+ self.event('build.end', template, content)
# clean up the instance
if self.create:
self.terminate_instance(ip)
diff --git a/copr-be.conf.example b/copr-be.conf.example
index f109cae..3d8cdf1 100644
--- a/copr-be.conf.example
+++ b/copr-be.conf.example
@@ -51,6 +51,11 @@ worker_logdir=/var/log/copr/workers/
# default is true
#daemonize=true
+# publish fedmsg notifications from workers if true
+# default is false
+#fedmsg_enabled=false
+
+
[builder]
# default is 1800
timeout=3600
diff --git a/copr-be.py b/copr-be.py
index 1bac8ff..c836592 100755
--- a/copr-be.py
+++ b/copr-be.py
@@ -166,6 +166,7 @@ class CoprBackend(object):
opts.destdir = _get_conf(cp, 'backend', 'destdir', None)
opts.daemonize = _get_conf(cp, 'backend', 'daemonize', True)
opts.exit_on_worker = _get_conf(cp, 'backend', 'exit_on_worker', False)
+ opts.fedmsg_enabled = _get_conf(cp, 'backend', 'fedmsg_enabled', False)
opts.sleeptime = int(_get_conf(cp, 'backend', 'sleeptime', 10))
opts.num_workers = int(_get_conf(cp, 'backend', 'num_workers', 8))
opts.timeout = int(_get_conf(cp, 'builder', 'timeout', 1800))
@@ -319,4 +320,3 @@ if __name__ == '__main__':
except KeyboardInterrupt, e:
print "\nUser cancelled, may need cleanup\n"
sys.exit(0)
-
diff --git a/copr.spec b/copr.spec
index 4a90921..4e90fa3 100644
--- a/copr.spec
+++ b/copr.spec
@@ -55,6 +55,7 @@ Requires: createrepo
Requires: python-bunch
Requires: python-requests
Requires: logrotate
+Requires: fedmsg
%description backend
COPR is lightweight build system. It allows you to create new project in WebUI,