From: Ondrej Lichtner <olichtne(a)redhat.com>
This patch implements parallel pool machine availability checking.
Availability is checked by creating a nonblocking TCP socket and
connecting to the slave. This way we can connect to all the slaves at
once instead of one-by-one.
This patch also introduces proper support for multiple pool directories.
Until now all slaves were put in a single pool, this could cause name
conflicts and network label conflicts, leading to confusion for users.
From now we clearly separate individual directories and machines from
two different pool directories can't be present in a single match.
Signed-off-by: Ondrej Lichtner <olichtne(a)redhat.com>
---
lnst/Controller/SlavePool.py | 152 ++++++++++++++++++++++++++++++-------------
1 file changed, 105 insertions(+), 47 deletions(-)
diff --git a/lnst/Controller/SlavePool.py b/lnst/Controller/SlavePool.py
index 9c54629..82737a4 100644
--- a/lnst/Controller/SlavePool.py
+++ b/lnst/Controller/SlavePool.py
@@ -17,6 +17,8 @@ import logging
import os
import re
import copy
+import socket
+import select
from xml.dom import minidom
from lnst.Common.Config import lnst_config
from lnst.Common.NetUtils import normalize_hwaddr
@@ -35,6 +37,7 @@ class SlavePool:
"""
def __init__(self, pool_dirs, pool_checks=True):
self._map = {}
+ self._pool_dirs = {}
self._pool = {}
self._machine_matches = []
@@ -50,33 +53,73 @@ class SlavePool:
logging.info("Checking machine pool availability.")
for pool_dir in pool_dirs:
+ self._pool_dirs[pool_dir] = {}
self.add_dir(pool_dir)
+ if len(self._pool_dirs[pool_dir]) == 0:
+ del self._pool_dirs[pool_dir]
- self._mapper.set_pool(self._pool)
+ self._mapper.set_pool_dirs(self._pool_dirs)
- def add_dir(self, pool_dir):
- logging.info("Processing pool dir '%s'" % pool_dir)
- dentries = os.listdir(pool_dir)
+ def add_dir(self, dir_path):
+ logging.info("Processing pool dir '%s'" % dir_path)
- res = []
+ pool_dir = self._pool_dirs[dir_path]
+
+ dentries = os.listdir(dir_path)
for dirent in dentries:
- m_info = self.add_file("%s/%s" % (pool_dir, dirent))
- if m_info != None:
- res.append(m_info)
+ m_id, m = self.add_file(dir_path, dirent)
+ if m_id != None and m != None:
+ pool_dir[m_id] = m
- if len(res) == 0:
+ if len(pool_dir) == 0:
logging.warn("No machines found in this directory")
max_len = 0
- for m_id, _ in res:
+ for m_id in pool_dir.keys():
if len(m_id) > max_len:
max_len = len(m_id)
- for m_id, available in res:
- if available:
- machine_spec = self._pool[m_id]
- if 'libvirt_domain' in machine_spec['params']:
+
+ if self._pool_checks:
+ check_sockets = {}
+ for m_id, m in pool_dir.iteritems():
+ hostname = m["params"]["hostname"]
+ if "rpc_port" in m["params"]:
+ port = m["params"]["rpc_port"]
+ else:
+ port = lnst_config.get_option('environment',
'rpcport')
+
+ logging.debug("Querying machine '%s': %s:%s" %\
+ (m_id, hostname, port))
+
+ s = socket.socket()
+ s.settimeout(0)
+ try:
+ s.connect((hostname, port))
+ except:
+ pass
+ check_sockets[s] = m_id
+
+ while len(check_sockets) > 0:
+ rl, wl, el = select.select([], check_sockets.keys(), [])
+ for s in wl:
+ err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ m_id = check_sockets[s]
+ if err == 0:
+ pool_dir[m_id]["available"] = True
+ del check_sockets[s]
+ else:
+ pool_dir[m_id]["available"] = False
+ del check_sockets[s]
+ else:
+ for m_id in pool_dir.keys():
+ pool_dir[m_id]["available"] = True
+
+ for m_id in list(pool_dir.keys()):
+ m = pool_dir[m_id]
+ if m["available"]:
+ if 'libvirt_domain' in m['params']:
libvirt_msg = " libvirt_domain: %s" %\
-
machine_spec['params']['libvirt_domain']
+ m['params']['libvirt_domain']
else:
libvirt_msg = ""
msg = "%s%s [%s] %s" % (m_id, (max_len - len(m_id)) * "
",
@@ -85,10 +128,13 @@ class SlavePool:
else:
msg = "%s%s [%s]" % (m_id, (max_len - len(m_id)) * "
",
decorate_with_preset("DOWN",
"fail"))
+ del pool_dir[m_id]
logging.info(msg)
- def add_file(self, filepath):
+ def add_file(self, dir_path, dirent):
+ filepath = dir_path + "/" + dirent
+ pool_dir = self._pool_dirs[dir_path]
if os.path.isfile(filepath) and re.search("\.xml$", filepath, re.I):
dirname, basename = os.path.split(filepath)
m_id = re.sub("\.[xX][mM][lL]$", "", basename)
@@ -97,9 +143,16 @@ class SlavePool:
xml_data = parser.parse()
machine_spec = self._process_machine_xml_data(m_id, xml_data)
+ if 'libvirt_domain' in machine_spec['params'] and \
+ not self._allow_virt:
+ logging.debug("libvirtd not running or allow_virtual "\
+ "disabled. Removing libvirt_domain from "\
+ "machine '%s'" % m_id)
+ del machine_spec['params']['libvirt_domain']
+
# Check if there isn't any machine with the same
# hostname or libvirt_domain already in the pool
- for pm_id, m in self._pool.iteritems():
+ for pm_id, m in pool_dir.iteritems():
pm = m["params"]
rm = machine_spec["params"]
if pm["hostname"] == rm["hostname"]:
@@ -113,32 +166,8 @@ class SlavePool:
"your pool ('%s' and '%s')." % (m_id,
pm_id)
raise SlaveMachineError(msg)
- if self._pool_checks:
- available = False
-
- hostname = machine_spec["params"]["hostname"]
- if "rpc_port" in machine_spec["params"]:
- port = machine_spec["params"]["rpc_port"]
- else:
- port = lnst_config.get_option('environment',
'rpcport')
-
- logging.debug("Querying machine '%s': %s:%s" %\
- (m_id, hostname, port))
- if test_tcp_connection(hostname, port):
- available = True
-
- if 'libvirt_domain' in machine_spec['params'] and \
- not self._allow_virt:
- logging.debug("libvirtd not running or allow_virtual "\
- "disabled. Removing libvirt_domain from
"\
- "machine '%s'" % m_id)
- del machine_spec['params']['libvirt_domain']
- else:
- available = True
-
- if available:
- self._pool[m_id] = machine_spec
- return (m_id, available)
+ return (m_id, machine_spec)
+ return (None, None)
def _process_machine_xml_data(self, m_id, machine_xml_data):
machine_spec = {"interfaces": {}, "params":{}}
@@ -239,7 +268,10 @@ class SlavePool:
self._map = {}
if self._map == {}:
+ self._pool = {}
return False
+ else:
+ self._pool = self._pool_dirs[self._map["pool_dir"]]
if self._map["virtual"]:
mreqs = self._mreqs
@@ -335,7 +367,10 @@ class MapperError(Exception):
class SetupMapper(object):
def __init__(self):
+ self._pool_dirs = {}
+ self._pool_dir_stack = []
self._pool = {}
+ self._pool_dir = None
self._mreqs = {}
self._unmatched_req_machines = []
self._matched_pool_machines = []
@@ -346,8 +381,8 @@ class SetupMapper(object):
def set_requirements(self, mreqs):
self._mreqs = mreqs
- def set_pool(self, pool):
- self._pool = pool
+ def set_pool_dirs(self, pool_dirs):
+ self._pool_dirs = pool_dirs
def set_virtual(self, virt_value):
self._virtual_matching = virt_value
@@ -372,6 +407,12 @@ class SetupMapper(object):
self._machine_stack = []
self._unmatched_req_machines = self._mreqs.keys()
+ self._pool_dir_stack = list(self._pool_dirs.keys())
+ if len(self._pool_dir_stack) > 0:
+ self._pool_dir = self._pool_dir_stack.pop()
+ self._pool = self._pool_dirs[self._pool_dir]
+ logging.info("Using pool dir: %s" % self._pool_dir)
+
self._unmatched_pool_machines = []
for p_id, p_machine in self._pool.iteritems():
if self._virtual_matching:
@@ -380,7 +421,7 @@ class SetupMapper(object):
else:
self._unmatched_pool_machines.append(p_id)
- if self._pool is not None or self._mreqs is not None:
+ if len(self._pool) > 0 and len(self._mreqs) > 0:
self._push_machine_stack()
def match(self):
@@ -427,6 +468,22 @@ class SetupMapper(object):
continue
else:
self._pop_machine_stack()
+ if len(self._machine_stack) == 0 and\
+ len(self._pool_dir_stack) > 0:
+ self._pool_dir = self._pool_dir_stack.pop()
+ self._pool = self._pool_dirs[self._pool_dir]
+ logging.info("Using pool dir: %s" % self._pool_dir)
+
+ self._unmatched_pool_machines = []
+ for p_id, p_machine in self._pool.iteritems():
+ if self._virtual_matching:
+ if "libvirt_domain" in
p_machine["params"]:
+ self._unmatched_pool_machines.append(p_id)
+ else:
+ self._unmatched_pool_machines.append(p_id)
+
+ if len(self._pool) > 0 and len(self._mreqs) > 0:
+ self._push_machine_stack()
continue
return False
@@ -544,7 +601,8 @@ class SetupMapper(object):
return True
def get_mapping(self):
- mapping = {"machines": {}, "networks": {},
"virtual": False}
+ mapping = {"machines": {}, "networks": {},
"virtual": False,
+ "pool_dir": self._pool_dir}
for req_label, label_map in self._net_label_mapping.iteritems():
mapping["networks"][req_label] = label_map[0]
--
1.9.3