modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java     |  92 +++++--
modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java | 120 ++++++----
2 files changed, 154 insertions(+), 58 deletions(-)
New commits:
commit 845261576ac892a029eac88b66ad7fd44d37d4f5
Author: Jay Shaughnessy <jshaughn(a)redhat.com>
Date: Tue Dec 3 11:36:48 2013 -0500
Changes to AvailabilityProxy
- support test code tweaking the various configurable values (via overridable getters).
- save 3 bytes per proxy by making the consecutive sync timeout counter a byte type (the sync timeout limit becomes a byte to match, max 127)
- add some commented-out dev logging, to be removed later as desired
Work on AvailabilityProxyTest
- add testing for the new async timeout stuff
- add testing for the sync disable/enable
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
index 1bf380f..49a7452 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/AvailabilityProxy.java
@@ -62,9 +62,9 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
* Number of consecutive avail sync timeouts before we assume the resource's
avail checking can not meet the async
* timeout. At that point stop slowing things down waiting for the timeout and
instead, for this resource,
* rely only on the async results. In other words, stop trying to report live avail
if live avail checking is
- * consistently too slow.
+ * consistently too slow. Max = 127. We use a byte here to save space.
*/
- private static final int AVAIL_SYNC_TIMEOUT_LIMIT;
+ private static final byte AVAIL_SYNC_TIMEOUT_LIMIT;
/**
* How long to wait for an *async* future to return a resource availability (in ms).
@@ -86,14 +86,17 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
}
AVAIL_SYNC_TIMEOUT = syncAvailTimeout;
- int syncAvailTimeoutLimit;
+ byte syncAvailTimeoutLimit;
try {
// unlikely to be changed but back-door configurable
- syncAvailTimeoutLimit = Integer.parseInt(System.getProperty(
+ syncAvailTimeoutLimit = Byte.parseByte(System.getProperty(
"rhq.agent.plugins.availability-scan.sync-timeout-limit",
"5"));
} catch (Throwable t) {
syncAvailTimeoutLimit = 5;
}
+ if (syncAvailTimeoutLimit > 127) {
+ syncAvailTimeoutLimit = 127;
+ }
AVAIL_SYNC_TIMEOUT_LIMIT = syncAvailTimeoutLimit;
int asyncAvailTimeout;
@@ -124,7 +127,7 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
* returned synchronously (within the timeout period). There is currently no way to
'reset' this (short
* of agent restart) after it has triggered, meaning the resource will no longer try
to report live avail.
*/
- private int availAsyncConsecutiveTimeouts = 0;
+ private byte availSyncConsecutiveTimeouts = 0;
private final ClassLoader classLoader;
@@ -155,10 +158,11 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
* being computed.
*
* @throws TimeoutException
- * if timeout occurred during the second call to this method
+ * if an async check exceeds AVAIL_ASYNC_TIMEOUT
*/
@Override
public AvailabilityType getAvailability() {
+ // TODO take out DevDebug printlns when we're confident we don't need
them
AvailabilityType avail = UNKNOWN;
try {
@@ -168,27 +172,33 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
if (availabilityFuture.isDone()) {
// hold onto and report the last known value if necessary
avail = availabilityFuture.get();
+ // System.out.println("DevDebug 1 [" +
System.currentTimeMillis() + "] future done avail [" + avail.name() +
"]");
} else {
// We are still waiting on the previously submitted async avail check
- let's just return
// the last one we got. Note that if the future is not done after a
large amount of time,
// then it means this thread could somehow be hung or otherwise stuck
and not returning. Not good.
// In this case, throw a detailed exception to the avail checker.
- if ((System.currentTimeMillis() - lastSubmitTime) >
AVAIL_ASYNC_TIMEOUT) {
+ long elapsedTime = System.currentTimeMillis() - lastSubmitTime;
+ if (elapsedTime > getAsyncTimeout()) {
+ // System.out.println("DevDebug 2 [" +
System.currentTimeMillis() + "] async timeout");
+
Throwable t = new Throwable();
if (current != null) {
t.setStackTrace(current.getStackTrace());
}
- String msg = "Availability check running too long, canceled
for [" + resourceComponent
- + "]; Stack trace includes the timed out thread's
stack trace.";
+ String msg = "Availability check ran too long [" +
elapsedTime + "ms], canceled for ["
+ + resourceComponent + "]; Stack trace includes the timed
out thread's stack trace.";
availabilityFuture.cancel(true);
// try again, maybe the situation will resolve in time for the
next check
availabilityFuture = executor.submit(this);
lastSubmitTime = System.currentTimeMillis();
+ // System.out.println("DevDebug 3 [" +
System.currentTimeMillis() + "] async timeout submit");
throw new TimeoutException(msg, t);
} else {
+ // System.out.println("DevDebug 4 [" +
System.currentTimeMillis() + "] no async timeout, return lastAvail [" +
lastAvail.name() + "]");
return lastAvail;
}
}
@@ -197,26 +207,32 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
// request a thread to do an avail check
availabilityFuture = executor.submit(this);
lastSubmitTime = System.currentTimeMillis();
+ // System.out.println("DevDebug 5 [" + System.currentTimeMillis()
+ "] standard submit");
// if we have exceeded the timeout too many times in a row assume that this
is a slow
// resource and stop performing synchronous checks, which would likely fail
to return fast enough anyway.
- if (availAsyncConsecutiveTimeouts < AVAIL_SYNC_TIMEOUT_LIMIT) {
+ if (availSyncConsecutiveTimeouts < getSyncTimeoutLimit()) {
// attempt to get availability synchronously
- avail = availabilityFuture.get(AVAIL_SYNC_TIMEOUT,
TimeUnit.MILLISECONDS);
+ avail = availabilityFuture.get(getSyncTimeout(), TimeUnit.MILLISECONDS);
+ // System.out.println("DevDebug 6 [" +
System.currentTimeMillis() + "] sync avail [" + avail.name() + "]");
// success (failure will throw exception)
- availAsyncConsecutiveTimeouts = 0;
+ availSyncConsecutiveTimeouts = 0;
availabilityFuture = null;
- } else if (availAsyncConsecutiveTimeouts == AVAIL_SYNC_TIMEOUT_LIMIT) {
+ } else if (availSyncConsecutiveTimeouts == getSyncTimeoutLimit()) {
+ // System.out.println("DevDebug 7 [" +
System.currentTimeMillis() + "] sync disabled");
+
// log one time that we are disabling synchronous checks for this
resource
- ++availAsyncConsecutiveTimeouts;
+ ++availSyncConsecutiveTimeouts;
if (LOG.isDebugEnabled()) {
LOG.debug("Disabling synchronous availability collection for
[" + resourceComponent + "]; ["
- + AVAIL_SYNC_TIMEOUT_LIMIT + "] consective timeouts
exceeding [" + AVAIL_SYNC_TIMEOUT + "ms]");
+ + getSyncTimeoutLimit() + "] consecutive timeouts exceeding
[" + getSyncTimeout() + "ms]");
}
}
} catch (InterruptedException e) {
+ // System.out.println("DevDebug 8 [" + System.currentTimeMillis()
+ "] Interrupted");
+
LOG.debug("InterruptedException; shut down is (likely) in
progress.");
availabilityFuture.cancel(true);
availabilityFuture = null;
@@ -227,8 +243,10 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
throw new RuntimeException("Availability check failed",
e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
+ // System.out.println("DevDebug 9 [" + System.currentTimeMillis()
+ "] Sync Timeout");
+
// failed to get avail synchronously. next call to the future will return
availability (we hope)
- ++availAsyncConsecutiveTimeouts;
+ ++availSyncConsecutiveTimeouts;
}
return processAvail(avail);
@@ -253,29 +271,61 @@ public class AvailabilityProxy implements AvailabilityFacet, Callable<Availabili
// resource comes up we should give it a chance to respond quickly and provide
live avail.
if (result != lastAvail) {
if (result == UP) {
- if (availAsyncConsecutiveTimeouts >= AVAIL_SYNC_TIMEOUT_LIMIT) {
+ if (availSyncConsecutiveTimeouts >= getSyncTimeoutLimit()) {
+ // System.out.println("DevDebug 10 [" +
System.currentTimeMillis() + "] Enabling Sync");
+
if (LOG.isDebugEnabled()) {
LOG.debug("Enabling synchronous availability collection for
[" + resourceComponent
+ "]; Availability has just changed from [" +
lastAvail + "] to UP.");
}
}
- availAsyncConsecutiveTimeouts = 0;
+ availSyncConsecutiveTimeouts = 0;
}
lastAvail = result;
}
+ // System.out.println("DevDebug 11 [" + System.currentTimeMillis() +
"] returning processAvail [" + result.getName()+ "]");
+
return result;
}
/**
+ * Override point. Typically for testing.
+ * @return the async timeout in ms; this default returns AVAIL_ASYNC_TIMEOUT, an override may return another value.
+ */
+ protected long getAsyncTimeout() {
+ return AVAIL_ASYNC_TIMEOUT;
+ }
+
+ /**
+ * Override point. Typically for testing.
+ * @return the sync timeout in ms; this default returns AVAIL_SYNC_TIMEOUT, an override may return another value.
+ */
+ protected long getSyncTimeout() {
+ return AVAIL_SYNC_TIMEOUT;
+ }
+
+ /**
+ * Override point. Typically for testing.
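+ * @return the consecutive sync timeout limit; this default returns AVAIL_SYNC_TIMEOUT_LIMIT (system property setting), an override may return another value.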
+ */
+ protected byte getSyncTimeoutLimit() {
+ return AVAIL_SYNC_TIMEOUT_LIMIT;
+ }
+
+ protected boolean isSyncDisabled() {
+ return availSyncConsecutiveTimeouts >= getSyncTimeoutLimit();
+ }
+
+ /**
* Debug string.
*/
@Override
public String toString() {
return "AvailabilityProxy [resourceComponent=" + resourceComponent +
", lastAvail=" + lastAvail
- + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) +
", executor="
- + executor + ", availabilityFuture=" + availabilityFuture + ",
current=" + current + ", timeouts="
- + availAsyncConsecutiveTimeouts + "]";
+ + ", lastSubmitTime=" + new java.util.Date(lastSubmitTime) +
", executor=" + executor
+ + ", availabilityFuture=" + availabilityFuture + ",
current=" + current + ", timeouts="
+ + availSyncConsecutiveTimeouts + "]";
}
}
diff --git a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java
index e8dd78d..e861845 100644
--- a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java
+++ b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/inventory/AvailabilityProxyTest.java
@@ -19,7 +19,6 @@
package org.rhq.core.pc.inventory;
import static org.rhq.core.domain.measurement.AvailabilityType.DOWN;
-import static org.rhq.core.domain.measurement.AvailabilityType.UNKNOWN;
import static org.rhq.core.domain.measurement.AvailabilityType.UP;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.fail;
@@ -42,57 +41,70 @@ public class AvailabilityProxyTest implements AvailabilityFacet {
private final Log LOG = LogFactory.getLog(AvailabilityProxyTest.class);
private volatile int timeout = 1;
- private final AvailabilityType avail = UP;
+ private AvailabilityType returnedAvail = UP;
private final ExecutorService executor = Executors.newCachedThreadPool();
- private volatile boolean interrupted;
/**
* Run a test. Note this may not be 100% reliable, as it depends on thread execution
to
* happen according to our sleep schedule...
*/
public void test() throws InterruptedException {
- AvailabilityProxy ap = new AvailabilityProxy(this, executor,
getClass().getClassLoader());
+ TestAvailabilityProxy ap = new TestAvailabilityProxy(this, executor,
getClass().getClassLoader());
LOG.debug("proxy " + ap);
- assertEquals("should be up", avail, ap.getAvailability());
- assertEquals(false, interrupted);
- timeout = 1100;
- assertEquals("should be down", DOWN, ap.getAvailability());
- Thread.sleep(500); // waited 1.5 sec
- assertEquals("should be up now", UP, ap.getAvailability());
- Thread.sleep(1); // waited 1.001 seconds
+
+ assertEquals("should be up", UP, ap.getAvailability()); // waits 1ms
and returns synchronously
+ timeout = 1200;
+ assertEquals("should be down", DOWN, ap.getAvailability()); // waits 1s
and times out
+ Thread.sleep(300); // now waited total of 1s + .3s = 1.3 sec > 1.2s
+ assertEquals("should be up now", UP, ap.getAvailability()); // waits 1s
and returns last reported value (UP)
+
+ ap.setAsyncTimeout(1020L);
+ Thread.sleep(50); // waited 1.050 seconds
try {
- ap.getAvailability();
- fail("should timeout 1");
+ ap.getAvailability(); // this submits another which we need to let finish
+ fail("should timeout 1020, waited 1050");
} catch (TimeoutException e) {
}
- LOG.debug("proxy " + ap);
- assertEquals(true, interrupted);
-
- LOG.debug("force more timeouts");
- for (int i = 0; i < 5; ++i) {
- LOG.debug("timeout " + i);
- try {
- ap.getAvailability();
- fail("should timeout");
- } catch (TimeoutException e) {
- }
- }
+ // wait for the last submit to return
+ Thread.sleep(1210);
LOG.debug("proxy " + ap);
- timeout = 0;
- try {
+ // try disabling sync checks
+ // - start returning DOWN avail in order to perform a sync disable and then
re-enable
+ // - go back to default async timeout, we don't want it to trigger anymore
+ // short timeout but longer than the sync timeout to force several sync timeouts
+ returnedAvail = DOWN;
+ ap.setAsyncTimeout(null);
+ timeout = 75;
+ ap.setSyncTimeout(50L);
+
+ while (!ap.isSyncDisabled()) {
ap.getAvailability();
- fail("should be down, until we wait a little");
- } catch (TimeoutException e) {
+ Thread.sleep(50L);
}
- Thread.sleep(100);
- assertEquals("should be up now", UP, ap.getAvailability());
+ // go back to returning UP so we can re-enable sync checking
+ // make the sync check a half second so we can prove that sync checking is not
happening
+ returnedAvail = UP;
+ timeout = 500;
+ ap.setSyncTimeout(500L);
+ long start = System.currentTimeMillis();
+ assertEquals("should be DOWN", DOWN, ap.getAvailability());
+ assert System.currentTimeMillis() - start < 100 : "Should have been fast,
returning old avail";
+ // wait for the last submit to return
+ Thread.sleep(510);
+
+ // check for re-enable sync checks
+ assertEquals("should be UP", UP, ap.getAvailability());
+ assertEquals("should be enabled", false, ap.isSyncDisabled());
+ // wait for the last submit to return
+ Thread.sleep(510);
+
+ // test interrupt handling
LOG.debug("interrupt this thread");
Thread.currentThread().interrupt();
- timeout = 1000 * 5;
- assertEquals("cancelation", UNKNOWN, ap.getAvailability());
+ assertEquals("cancellation", AvailabilityType.UNKNOWN,
ap.getAvailability());
assertEquals(true, Thread.currentThread().isInterrupted());
}
@@ -102,10 +114,44 @@ public class AvailabilityProxyTest implements AvailabilityFacet {
LOG.debug("sleep " + timeout);
Thread.sleep(timeout);
} catch (InterruptedException e) {
- interrupted = true;
Thread.currentThread().interrupt();
}
- LOG.debug("return " + avail);
- return avail;
+ LOG.debug("return " + returnedAvail.getName());
+ return returnedAvail;
+ }
+
+ private class TestAvailabilityProxy extends AvailabilityProxy {
+
+ private Long asyncTimeout = null;
+ private Long syncTimeout = null;
+
+ public TestAvailabilityProxy(AvailabilityFacet resourceComponent, ExecutorService
executor,
+ ClassLoader classLoader) {
+ super(resourceComponent, executor, classLoader);
+ }
+
+ @Override
+ public AvailabilityType getAvailability() {
+ // System.out.println("DevDebug 0 [" + System.currentTimeMillis()
+ "] getAvail() timeout=[" + timeout + "], syncTimeout=[" +
syncTimeout + "], asyncTimeout=[" + asyncTimeout + "]");
+ return super.getAvailability();
+ }
+
+ public void setAsyncTimeout(Long asyncTimeout) {
+ this.asyncTimeout = asyncTimeout;
+ }
+
+ public void setSyncTimeout(Long syncTimeout) {
+ this.syncTimeout = syncTimeout;
+ }
+
+ @Override
+ protected long getSyncTimeout() {
+ return null == syncTimeout ? super.getSyncTimeout() : syncTimeout;
+ }
+
+ @Override
+ protected long getAsyncTimeout() {
+ return null == asyncTimeout ? super.getAsyncTimeout() : asyncTimeout;
+ }
}
-}
\ No newline at end of file
+}