Adam Litke has uploaded a new change for review.
Change subject: clusterlock: Add reference counting
......................................................................
clusterlock: Add reference counting
The clusterLock can now be acquired by multiple threads of execution
since it is used by SDM verbs now. We need reference counting to ensure
that one thread does not release the clusterLock while another thread
still needs it.
Change-Id: I846116ae16e88a51bdce20f97ddf22859dea3086
Signed-off-by: Adam Litke <alitke(a)redhat.com>
---
M vdsm/storage/clusterlock.py
1 file changed, 55 insertions(+), 45 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/78/40378/1
diff --git a/vdsm/storage/clusterlock.py b/vdsm/storage/clusterlock.py
index 86d47ad..96bba2b 100644
--- a/vdsm/storage/clusterlock.py
+++ b/vdsm/storage/clusterlock.py
@@ -211,6 +211,7 @@
def __init__(self, sdUUID, idsPath, leasesPath, *args):
self._lock = threading.Lock()
+ self._clusterLockUsers = 0
self._sdUUID = sdUUID
self._idsPath = idsPath
self._leasesPath = leasesPath
@@ -302,16 +303,22 @@
# ClusterLock. We could consider to remove it in the future but keeping it
# for logging purpose is desirable.
def acquireClusterLock(self, hostId):
- self.log.info("Acquiring cluster lock for domain %s (id: %s)",
- self._sdUUID, hostId)
- self._acquire(SDM_LEASE_NAME, self.getLockDisk())
- self.log.debug("Cluster lock for domain %s successfully acquired "
- "(id: %s)", self._sdUUID, hostId)
+ with nested(self._lock, SANLock._sanlock_lock):
+ self.log.info("Acquiring cluster lock for domain %s (id: %s)",
+ self._sdUUID, hostId)
+ if self._clusterLockUsers == 0:
+ self._acquire(SDM_LEASE_NAME, self.getLockDisk())
+ self._clusterLockUsers = self._clusterLockUsers + 1
+ self.log.debug("Cluster lock for domain %s successfully acquired "
+ "(id: %s, users: %i)", self._sdUUID, hostId,
+ self._clusterLockUsers)
def acquireResource(self, resource, lockDisk, shared=False):
- self.log.info("Acquiring resource lock for %s", resource)
- self._acquire(resource, lockDisk, shared)
- self.log.debug("Resource lock for %s successfully acquired", resource)
+ with nested(self._lock, SANLock._sanlock_lock):
+ self.log.info("Acquiring resource lock for %s", resource)
+ self._acquire(resource, lockDisk, shared)
+ self.log.debug("Resource lock for %s successfully acquired",
+ resource)
def inquireClusterLock(self):
resource, owners = self._inquire(SDM_LEASE_NAME, self.getLockDisk())
@@ -330,43 +337,47 @@
[owner.get("host_id") for owner in owners])
def releaseClusterLock(self):
- self.log.info("Releasing cluster lock for domain %s", self._sdUUID)
- self._release(SDM_LEASE_NAME, self.getLockDisk())
- self.log.debug("Cluster lock for domain %s successfully released",
- self._sdUUID)
+ with self._lock:
+ self.log.info("Releasing cluster lock for domain %s", self._sdUUID)
+ if self._clusterLockUsers == 1:
+ self._release(SDM_LEASE_NAME, self.getLockDisk())
+ self._clusterLockUsers = self._clusterLockUsers - 1
+ self.log.debug("Cluster lock for domain %s successfully released "
+ "(users: %i)", self._sdUUID, self._clusterLockUsers)
def releaseResource(self, resource, lockDisk):
- self.log.info("Releasing resource lock for %s", resource)
- self._release(resource, lockDisk)
- self.log.debug("Resource lock for %s successfully released", resource)
+ with self._lock:
+ self.log.info("Releasing resource lock for %s", resource)
+ self._release(resource, lockDisk)
+ self.log.debug("Resource lock for %s successfully released",
+ resource)
def _acquire(self, resource, lockDisk, shared=False):
- with nested(self._lock, SANLock._sanlock_lock):
- self.log.info("Acquiring resource %s, shared=%s", resource, shared)
+ self.log.info("Acquiring resource %s, shared=%s", resource, shared)
- while True:
- if SANLock._sanlock_fd is None:
- try:
- SANLock._sanlock_fd = sanlock.register()
- except sanlock.SanlockException as e:
- raise se.AcquireLockFailure(
- self._sdUUID, e.errno,
- "Cannot register to sanlock", str(e))
-
+ while True:
+ if SANLock._sanlock_fd is None:
try:
- sanlock.acquire(self._sdUUID, resource, lockDisk,
- slkfd=SANLock._sanlock_fd, shared=shared)
+ SANLock._sanlock_fd = sanlock.register()
except sanlock.SanlockException as e:
- if e.errno != os.errno.EPIPE:
- raise se.AcquireLockFailure(
- self._sdUUID, e.errno,
- "Cannot acquire sanlock resource", str(e))
- SANLock._sanlock_fd = None
- continue
+ raise se.AcquireLockFailure(
+ self._sdUUID, e.errno, "Cannot register to sanlock",
+ str(e))
- break
+ try:
+ sanlock.acquire(self._sdUUID, resource, lockDisk,
+ slkfd=SANLock._sanlock_fd, shared=shared)
+ except sanlock.SanlockException as e:
+ if e.errno != os.errno.EPIPE:
+ raise se.AcquireLockFailure(
+ self._sdUUID, e.errno,
+ "Cannot acquire sanlock resource", str(e))
+ SANLock._sanlock_fd = None
+ continue
- self.log.debug("Resource %s successfully acquired", resource)
+ break
+
+ self.log.debug("Resource %s successfully acquired", resource)
def _inquire(self, resource, lockDisk):
res = sanlock.read_resource(*lockDisk[0])
@@ -374,17 +385,16 @@
return res, owners
def _release(self, resource, lockDisk):
- with self._lock:
- self.log.info("Releasing resource %s", resource)
+ self.log.info("Releasing resource %s", resource)
- try:
- sanlock.release(self._sdUUID, resource, lockDisk,
- slkfd=SANLock._sanlock_fd)
- except sanlock.SanlockException as e:
- raise se.ReleaseLockFailure(resource, e)
+ try:
+ sanlock.release(self._sdUUID, resource, lockDisk,
+ slkfd=SANLock._sanlock_fd)
+ except sanlock.SanlockException as e:
+ raise se.ReleaseLockFailure(resource, e)
- self._sanlockfd = None
- self.log.debug("Resource %s successfully released", resource)
+ self._sanlockfd = None
+ self.log.debug("Resource %s successfully released", resource)
class LocalLock(object):
--
To view, visit
https://gerrit.ovirt.org/40378
To unsubscribe, visit
https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I846116ae16e88a51bdce20f97ddf22859dea3086
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <alitke(a)redhat.com>