modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
| 72 +++++++---
modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
| 17 +-
2 files changed, 65 insertions(+), 24 deletions(-)
New commits:
commit 7fe7f7efb4d5754d8a0ebdccaa4604a450f3e0db
Author: John Mazzitelli <mazz(a)redhat.com>
Date: Wed Nov 27 12:44:18 2013 -0500
Get the avail proxy test to pass. We no longer assume that we must abort when the first
check finds that the future is not yet done.
Instead, we check the time when the future was submitted. If it has been less than a
certain time (1m by default), then
we just return the last avail known to have been returned. Otherwise, we time out.
diff --git
a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
index 4e14581..1bf380f 100644
---
a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
+++
b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
@@ -52,7 +52,7 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
private static final Log LOG = LogFactory.getLog(AvailabilityProxy.class); //
purposefully static, don't create one per proxy
/**
- * How long to wait for a resource to return their availability immediately (in ms).
+ * How long to wait for a resource to return their availability *immediately* (in
ms).
* If a resource takes longer than this, then the number of timeouts is incremented,
and then
* the container will just assume availability will be returned asynchronously for
this resource.
*/
@@ -66,6 +66,15 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
*/
private static final int AVAIL_SYNC_TIMEOUT_LIMIT;
+ /**
+ * How long to wait for an *async* future to return a resource availability (in ms).
+ * If a resource takes longer than this during an async call (via a thread from the
executor thread pool)
+ * and another request comes in for the availability, then that async call will be
canceled and a new
+ * one will be resubmitted, restarting the clock. This just helps clean up any hung
threads waiting
+ * for an availability that is just taking too much time to complete.
+ */
+ private static final int AVAIL_ASYNC_TIMEOUT;
+
static {
int syncAvailTimeout;
try {
@@ -86,6 +95,16 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
syncAvailTimeoutLimit = 5;
}
AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit;
+
+ int asyncAvailTimeout;
+ try {
+ // unlikely to be changed but back-door configurable
+ asyncAvailTimeout = Integer.parseInt(System.getProperty(
+ "rhq.agent.plugins.availability-scan.async-timeout",
"60000"));
+ } catch (Throwable t) {
+ asyncAvailTimeout = 60000;
+ }
+ AVAIL_ASYNC_TIMEOUT = asyncAvailTimeout;
}
private final AvailabilityFacet resourceComponent;
@@ -96,7 +115,9 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
private volatile Thread current;
- private AvailabilityType last = UNKNOWN;
+ private long lastSubmitTime = 0;
+
+ private AvailabilityType lastAvail = UNKNOWN;
/**
* Number of consecutive avail sync timeouts for the resource. This value is reset if
availability is
@@ -149,25 +170,33 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
avail = availabilityFuture.get();
} else {
- // if the future is not done then it means this thread has been
checking avail since the
- // last scheduled avail check. Not good. Throw a detailed exception
to the avail checker..
- Throwable t = new Throwable();
- if (current != null) {
- t.setStackTrace(current.getStackTrace());
+ // We are still waiting on the previously submitted async avail check
- let's just return
+ // the last one we got. Note that if the future is not done after a
large amount of time,
+ // then it means this thread could somehow be hung or otherwise stuck
and not returning. Not good.
+ // In this case, throw a detailed exception to the avail checker.
+ if ((System.currentTimeMillis() - lastSubmitTime) >
AVAIL_ASYNC_TIMEOUT) {
+ Throwable t = new Throwable();
+ if (current != null) {
+ t.setStackTrace(current.getStackTrace());
+ }
+ String msg = "Availability check running too long, canceled
for [" + resourceComponent
+ + "]; Stack trace includes the timed out thread's
stack trace.";
+ availabilityFuture.cancel(true);
+
+ // try again, maybe the situation will resolve in time for the
next check
+ availabilityFuture = executor.submit(this);
+ lastSubmitTime = System.currentTimeMillis();
+
+ throw new TimeoutException(msg, t);
+ } else {
+ return lastAvail;
}
- String msg = "Availability check running too long, canceled for
" + resourceComponent
- + "; Stack trace includes the timed out thread's stack
trace.";
- availabilityFuture.cancel(true);
-
- // try again, maybe the situation will resolve in time for the next
check
- availabilityFuture = executor.submit(this);
-
- throw new TimeoutException(msg, t);
}
}
// request a thread to do an avail check
availabilityFuture = executor.submit(this);
+ lastSubmitTime = System.currentTimeMillis();
// if we have exceeded the timeout too many times in a row assume that this
is a slow
// resource and stop performing synchronous checks, which would likely fail
to return fast enough anyway.
@@ -213,7 +242,7 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
break;
default:
if (LOG.isDebugEnabled()) {
- LOG.debug("ResourceComponent " + resourceComponent + "
getAvailability() returned " + type
+ LOG.debug("ResourceComponent [" + resourceComponent + "]
getAvailability() returned " + type
+ ". This is invalid and is being replaced with DOWN.");
}
result = DOWN;
@@ -222,18 +251,18 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
// whenever changing to UP we reset the timeout counter. This is because DOWN
resources often respond
// slowly to getAvailability() calls (for example, waiting for a connection
attempt to time out). When a
// resource comes up we should give it a chance to respond quickly and provide
live avail.
- if (result != last) {
+ if (result != lastAvail) {
if (result == UP) {
if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) {
if (LOG.isDebugEnabled()) {
LOG.debug("Enabling synchronous availability collection for
[" + resourceComponent
- + "]; Availability has just changed from [" + last
+ "] to UP.");
+ + "]; Availability has just changed from [" +
lastAvail + "] to UP.");
}
}
availAsyncConsecutiveTimeouts = 0;
}
- last = result;
+ lastAvail = result;
}
return result;
@@ -244,8 +273,9 @@ public class AvailabilityProxy implements AvailabilityFacet,
Callable<Availabili
*/
@Override
public String toString() {
- return "AvailabilityProxy [resourceComponent=" + resourceComponent +
", executor=" + executor
- + ", availabilityFuture=" + availabilityFuture + ",
current=" + current + ", timeouts="
+ return "AvailabilityProxy [resourceComponent=" + resourceComponent +
", lastAvail=" + lastAvail
+ + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) +
", executor="
+ + executor + ", availabilityFuture=" + availabilityFuture + ",
current=" + current + ", timeouts="
+ availAsyncConsecutiveTimeouts + "]";
}
}
diff --git
a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
index f2880dd..06753ca 100644
---
a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
+++
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyConcurrencyTest.java
@@ -26,15 +26,18 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.testng.annotations.Test;
import org.rhq.core.domain.measurement.AvailabilityType;
import org.rhq.core.pluginapi.availability.AvailabilityFacet;
-@Test(enabled = false)
+@Test
public class AvailabilityProxyConcurrencyTest implements AvailabilityFacet {
+ private AtomicInteger numberOfFacetCalls = new AtomicInteger(-1);
+
public void testConcurrentAvailChecks() throws Exception {
Thread.interrupted(); // clear any hanging around interrupt status
@@ -48,7 +51,7 @@ public class AvailabilityProxyConcurrencyTest implements
AvailabilityFacet {
assert UP.equals(firstAvail) : "Can't even get our first avail
correctly: " + firstAvail;
// create several threads that will concurrently call getAvailability
- final int numThreads = 2;
+ final int numThreads = 10;
final Hashtable<String, AvailabilityType> availResults = new
Hashtable<String, AvailabilityType>(numThreads);
final Hashtable<String, Date> dateResults = new Hashtable<String,
Date>(numThreads);
final Hashtable<String, Throwable> throwableResults = new
Hashtable<String, Throwable>(numThreads);
@@ -68,6 +71,7 @@ public class AvailabilityProxyConcurrencyTest implements
AvailabilityFacet {
}
}
};
+ numberOfFacetCalls.set(0); // this will count how many times the proxy
actually calls the facet getAvail method
for (int i = 0; i < numThreads; i++) {
Thread t = new Thread(runnable, "t" + i);
t.start();
@@ -86,6 +90,12 @@ public class AvailabilityProxyConcurrencyTest implements
AvailabilityFacet {
for (AvailabilityType availtype : availResults.values()) {
assert availtype.equals(UP) : "Failed, bad avail: availResults =
" + availResults;
}
+
+ // make sure we actually tested the code we need to test - we should not be
making
+ // individual facet calls for each request because we shotgun the requests so
fast,
+ // and the facet sleeps so long, that the proxy should return the last avail
rather
+ // than requiring a new facet call.
+ assert (numberOfFacetCalls.get()) < numThreads : numberOfFacetCalls;
} finally {
executor.shutdownNow();
}
@@ -94,7 +104,8 @@ public class AvailabilityProxyConcurrencyTest implements
AvailabilityFacet {
@Override
public synchronized AvailabilityType getAvailability() {
try {
- Thread.sleep(750);
+ System.out.println("~~~AVAILABILITY FACET CALL #" +
numberOfFacetCalls.incrementAndGet());
+ Thread.sleep(250); // just make it slow enough so a few proxy calls are done
concurrently while this method is running
} catch (Exception e) {
System.out.println("~~~AVAILABILITY SLEEP WAS ABORTED: " + e);
}