From: Jiri Pirko <jiri(a)mellanox.com>
So far, each resource is checked and loaded one by one. That results
in many RPC calls. On my 20ms line, this takes ~4secs for one slave.
Bulk the checks and loads together, which allows it to be done in ~1sec.
Signed-off-by: Jiri Pirko <jiri(a)mellanox.com>
---
rfc->v1:
- avoid duplicates
- have rpc methods accept either list or a single entry
- store has_resource return value and sync accordingly
- compute res_hash in _get_module_info
---
lnst/Controller/Machine.py | 78 +++++++++++++++++++++++++++++-----------------
lnst/Slave/NetTestSlave.py | 33 +++++++++++++++-----
2 files changed, 75 insertions(+), 36 deletions(-)
diff --git a/lnst/Controller/Machine.py b/lnst/Controller/Machine.py
index 38a3646..e8e74d5 100644
--- a/lnst/Controller/Machine.py
+++ b/lnst/Controller/Machine.py
@@ -257,18 +257,17 @@ class Machine(object):
for cls_name, cls in device_classes:
classes.extend(reversed(self._get_base_classes(cls)))
+ minfos = []
for cls in classes:
if cls is object:
continue
- module_name = cls.__module__
- module = sys.modules[module_name]
- filename = module.__file__
-
- if filename[-3:] == "pyc":
- filename = filename[:-1]
+ name = cls.__module__
+ if any(minfo['name'] == name for minfo in minfos):
+ continue
+ minfos.append(self._get_module_info(name))
- res_hash = self.sync_resource(module_name, filename)
- self.rpc_call("load_cached_module", module_name, res_hash)
+ self.sync_resources(minfos)
+ self.rpc_call("load_cached_module", minfos)
for cls_name, cls in device_classes:
module_name = cls.__module__
@@ -348,15 +347,9 @@ class Machine(object):
for cls in reversed(classes):
if cls is object or cls is BaseTestModule:
continue
- m_name = cls.__module__
- m = sys.modules[m_name]
- filename = m.__file__
- if filename[-3:] == "pyc":
- filename = filename[:-1]
-
- res_hash = self.sync_resource(m_name, filename)
-
- self.rpc_call("load_cached_module", m_name, res_hash)
+ minfo = self._get_module_info(cls.__module__)
+ self.sync_resource(minfo)
+ self.rpc_call("load_cached_module", minfo)
logging.info("Host %s executing job %d: %s" %
(self._id, job.id, str(job)))
@@ -518,19 +511,48 @@ class Machine(object):
local_file.close()
self.rpc_call("finish_copy_from", remote_path)
- def sync_resource(self, res_name, file_path):
- digest = sha256sum(file_path)
+ def _get_module_info(self, name):
+ module = sys.modules[name]
+ filepath = module.__file__
+
+ if filepath[-3:] == "pyc":
+ filepath = filepath[:-1]
- if not self.rpc_call("has_resource", digest):
- msg = "Transfering %s to machine %s as '%s'" % (file_path,
- self.get_id(),
- res_name)
- logging.debug(msg)
+ res_hash = sha256sum(filepath)
- remote_path = self.copy_file_to_machine(file_path)
- self.rpc_call("add_resource_to_cache",
- "file", remote_path, res_name)
- return digest
+ return {"name": name, "filepath": filepath, "res_hash": res_hash}
+
+ def _sync_resource(self, minfo):
+ if minfo["has_resource"]:
+ return
+ msg = "Transfering %s to machine %s as '%s'" % (minfo["filepath"],
+ self.get_id(),
+ minfo["name"])
+ logging.debug(msg)
+ remote_path = self.copy_file_to_machine(minfo["filepath"])
+ self.rpc_call("add_resource_to_cache", "file",
+ remote_path, minfo["name"])
+
+ def sync_resource(self, minfo):
+ minfo["has_resource"] = self.rpc_call("has_resource",
+ minfo["res_hash"])
+ self._sync_resource(minfo)
+
+ def sync_resources(self, minfos):
+ res_hashes = list(map(lambda minfo: minfo["res_hash"], minfos))
+
+ # To avoid asking per-resource, ask for all resources in one call.
+ has_resources = self.rpc_call("has_resource", res_hashes)
+
+ from pprint import pprint
+ pprint(has_resources)
+ minfos = list(map(lambda minfo, has_resource:
+ dict(minfo.items() + {"has_resource": has_resource}.items()),
+ minfos, has_resources))
+ pprint(minfos)
+
+ for minfo in minfos:
+ self._sync_resource(minfo)
# def enable_nm(self):
# return self._rpc_call("enable_nm")
diff --git a/lnst/Slave/NetTestSlave.py b/lnst/Slave/NetTestSlave.py
index 908e85c..02a9d18 100644
--- a/lnst/Slave/NetTestSlave.py
+++ b/lnst/Slave/NetTestSlave.py
@@ -148,13 +148,21 @@ class SlaveMethods:
setattr(Devices, cls_name, cls)
- def load_cached_module(self, module_name, res_hash):
- self._cache.renew_entry(res_hash)
- if module_name in self._dynamic_modules:
+ def _load_cached_module(self, minfo):
+ self._cache.renew_entry(minfo["res_hash"])
+ if minfo["name"] in self._dynamic_modules:
return
- module_path = self._cache.get_path(res_hash)
- module = imp.load_source(module_name, module_path)
- self._dynamic_modules[module_name] = module
+ module_path = self._cache.get_path(minfo["res_hash"])
+ module = imp.load_source(minfo["name"], module_path)
+ self._dynamic_modules[minfo["name"]] = module
+
+ def load_cached_module(self, minfos):
+ if isinstance(minfos, list):
+ for minfo in minfos:
+ self._load_cached_module(minfo)
+ elif isinstance(minfos, dict):
+ minfo = minfos
+ self._load_cached_module(minfo)
def init_if_manager(self):
self._if_manager = InterfaceManager(self._server_handler)
@@ -401,12 +409,21 @@ class SlaveMethods:
self._remove_capture_files()
return True
- def has_resource(self, res_hash):
+ def _has_resource(self, res_hash):
if self._cache.query(res_hash):
return True
-
return False
+ def has_resource(self, res_hashes):
+ if isinstance(res_hashes, list):
+ retvals = []
+ for res_hash in res_hashes:
+ retvals.append(self._has_resource(res_hash))
+ return retvals
+ elif isinstance(res_hashes, str):
+ res_hash = res_hashes
+ return self._has_resource(res_hash)
+
def add_resource_to_cache(self, res_type, local_path, name):
if res_type == "file":
self._cache.add_file_entry(local_path, name)
--
2.9.5