modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java | 72 +++++++--- modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java | 17 +- 2 files changed, 65 insertions(+), 24 deletions(-)
New commits: commit 7fe7f7efb4d5754d8a0ebdccaa4604a450f3e0db Author: John Mazzitelli mazz@redhat.com Date: Wed Nov 27 12:44:18 2013 -0500
Get the avail proxy test to pass. We no longer assume that we need to abort when the future is not yet done the first time we check it. Instead, we check the time when the future was submitted. If it has been under a certain time (1m by default), then we just return the last avail known to have been returned. Otherwise, we time out.
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java index 4e14581..1bf380f 100644 --- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java +++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java @@ -52,7 +52,7 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili private static final Log LOG = LogFactory.getLog(AvailabilityProxy.class); // purposefully static, don't create one per proxy
/** - * How long to wait for a resource to return their availability immediately (in ms). + * How long to wait for a resource to return their availability *immediately* (in ms). * If a resource takes longer than this, then the number of timeouts is incremented, and then * the container will just assume availability will be returned asynchronously for this resource. */ @@ -66,6 +66,15 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili */ private static final int AVAIL_SYNC_TIMEOUT_LIMIT;
+ /** + * How long to wait for an *async* future to return a resource availability (in ms). + * If a resource takes longer than this during an async call (via a thread from the executor thread pool) + * and another request comes in for the availability, then that async call will be canceled and a new + * one will be resubmitted, restarting the clock. This just helps clean up any hung threads waiting + * for an availability that is just taking too much time to complete. + */ + private static final int AVAIL_ASYNC_TIMEOUT; + static { int syncAvailTimeout; try { @@ -86,6 +95,16 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili syncAvailTimeoutLimit = 5; } AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit; + + int asyncAvailTimeout; + try { + // unlikely to be changed but back-door configurable + asyncAvailTimeout = Integer.parseInt(System.getProperty( + "rhq.agent.plugins.availability-scan.async-timeout", "60000")); + } catch (Throwable t) { + asyncAvailTimeout = 60000; + } + AVAIL_ASYNC_TIMEOUT = asyncAvailTimeout; }
private final AvailabilityFacet resourceComponent; @@ -96,7 +115,9 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
private volatile Thread current;
- private AvailabilityType last = UNKNOWN; + private long lastSubmitTime = 0; + + private AvailabilityType lastAvail = UNKNOWN;
/** * Number of consecutive avail sync timeouts for the resource. This value is reset if availability is @@ -149,25 +170,33 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili avail = availabilityFuture.get();
} else { - // if the future is not done then it means this thread has been checking avail since the - // last scheduled avail check. Not good. Throw a detailed exception to the avail checker.. - Throwable t = new Throwable(); - if (current != null) { - t.setStackTrace(current.getStackTrace()); + // We are still waiting on the previously submitted async avail check - let's just return + // the last one we got. Note that if the future is not done after a large amount of time, + // then it means this thread could somehow be hung or otherwise stuck and not returning. Not good. + // In this case, throw a detailed exception to the avail checker. + if ((System.currentTimeMillis() - lastSubmitTime) > AVAIL_ASYNC_TIMEOUT) { + Throwable t = new Throwable(); + if (current != null) { + t.setStackTrace(current.getStackTrace()); + } + String msg = "Availability check running too long, canceled for [" + resourceComponent + + "]; Stack trace includes the timed out thread's stack trace."; + availabilityFuture.cancel(true); + + // try again, maybe the situation will resolve in time for the next check + availabilityFuture = executor.submit(this); + lastSubmitTime = System.currentTimeMillis(); + + throw new TimeoutException(msg, t); + } else { + return lastAvail; } - String msg = "Availability check running too long, canceled for " + resourceComponent - + "; Stack trace includes the timed out thread's stack trace."; - availabilityFuture.cancel(true); - - // try again, maybe the situation will resolve in time for the next check - availabilityFuture = executor.submit(this); - - throw new TimeoutException(msg, t); } }
// request a thread to do an avail check availabilityFuture = executor.submit(this); + lastSubmitTime = System.currentTimeMillis();
// if we have exceeded the timeout too many times in a row assume that this is a slow // resource and stop performing synchronous checks, which would likely fail to return fast enough anyway. @@ -213,7 +242,7 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili break; default: if (LOG.isDebugEnabled()) { - LOG.debug("ResourceComponent " + resourceComponent + " getAvailability() returned " + type + LOG.debug("ResourceComponent [" + resourceComponent + "] getAvailability() returned " + type + ". This is invalid and is being replaced with DOWN."); } result = DOWN; @@ -222,18 +251,18 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili // whenever changing to UP we reset the timeout counter. This is because DOWN resources often respond // slowly to getAvailability() calls (for example, waiting for a connection attempt to time out). When a // resource comes up we should give it a chance to respond quickly and provide live avail. - if (result != last) { + if (result != lastAvail) { if (result == UP) { if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) { if (LOG.isDebugEnabled()) { LOG.debug("Enabling synchronous availability collection for [" + resourceComponent - + "]; Availability has just changed from [" + last + "] to UP."); + + "]; Availability has just changed from [" + lastAvail + "] to UP."); } } availAsyncConsecutiveTimeouts = 0;
} - last = result; + lastAvail = result; }
return result; @@ -244,8 +273,9 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili */ @Override public String toString() { - return "AvailabilityProxy [resourceComponent=" + resourceComponent + ", executor=" + executor - + ", availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts=" + return "AvailabilityProxy [resourceComponent=" + resourceComponent + ", lastAvail=" + lastAvail + + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) + ", executor=" + + executor + ", availabilityFuture=" + availabilityFuture + ", current=" + current + ", timeouts=" + availAsyncConsecutiveTimeouts + "]"; } } diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java index f2880dd..06753ca 100644 --- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java +++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java @@ -26,15 +26,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger;
import org.testng.annotations.Test;
import org.rhq.core.domain.measurement.AvailabilityType; import org.rhq.core.pluginapi.availability.AvailabilityFacet;
-@Test(enabled = false) +@Test public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet {
+ private AtomicInteger numberOfFacetCalls = new AtomicInteger(-1); + public void testConcurrentAvailChecks() throws Exception { Thread.interrupted(); // clear any hanging around interrupt status
@@ -48,7 +51,7 @@ public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet { assert UP.equals(firstAvail) : "Can't even get our first avail correctly: " + firstAvail;
// create several threads that will concurrently call getAvailability - final int numThreads = 2; + final int numThreads = 10; final Hashtable<String, AvailabilityType> availResults = new Hashtable<String, AvailabilityType>(numThreads); final Hashtable<String, Date> dateResults = new Hashtable<String, Date>(numThreads); final Hashtable<String, Throwable> throwableResults = new Hashtable<String, Throwable>(numThreads); @@ -68,6 +71,7 @@ public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet { } } }; + numberOfFacetCalls.set(0); // this will count how many times the proxy actually calls the facet getAvail method for (int i = 0; i < numThreads; i++) { Thread t = new Thread(runnable, "t" + i); t.start(); @@ -86,6 +90,12 @@ public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet { for (AvailabilityType availtype : availResults.values()) { assert availtype.equals(UP) : "Failed, bad avail: availResults = " + availResults; } + + // make sure we actually tested the code we need to test - we should not be making + // individual facet calls for each request because we shotgun the requests so fast, + // and the facet sleeps so long, that the proxy should return the last avail rather + // than requiring a new facet call. + assert (numberOfFacetCalls.get()) < numThreads : numberOfFacetCalls; } finally { executor.shutdownNow(); } @@ -94,7 +104,8 @@ public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet { @Override public synchronized AvailabilityType getAvailability() { try { - Thread.sleep(750); + System.out.println("~~~AVAILABILITY FACET CALL #" + numberOfFacetCalls.incrementAndGet()); + Thread.sleep(250); // just make it slow enough so a few proxy calls are done concurrently while this method is running } catch (Exception e) { System.out.println("~~~AVAILABILITY SLEEP WAS ABORTED: " + e); }