modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java
| 4
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
| 24 +++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchException.java
| 45 ++++++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchFailureListener.java
| 26 +++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/ProcessBatch.java
| 24 ++++-
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
| 32 +++++++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
| 2
7 files changed, 150 insertions(+), 7 deletions(-)
New commits:
commit a493f07e7c71b721777ee27e618668d60960cc94
Author: John Sanda <jsanda(a)redhat.com>
Date: Tue Feb 25 22:54:07 2014 -0500
adding some initial support for failure handling
This commit gives the capability to log stack traces when exceptions occur
while processing a batch of futures. More importantly, this is a step towards
providing fail fast behavior. Suppose we provide a configurable threshold for
failures, and when that threshold is passed, we abort the aggregation so that
it can be retried at some later time.
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java
index d6b8598..ca287b0 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageResultSetFuture.java
@@ -43,6 +43,10 @@ public class StorageResultSetFuture implements
ListenableFuture<ResultSet> {
return wrapperFuture.isDone();
}
+ public void setException(Throwable t) {
+ wrapperFuture.setException(t);
+ }
+
/**
* Delegates to {@link
com.datastax.driver.core.ResultSetFuture#getUninterruptibly()}
*/
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
index 95b9e07..f3ee9c3 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -4,7 +4,9 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -45,6 +47,13 @@ class Aggregator {
private TaskTracker taskTracker = new TaskTracker();
+ private AsyncFunction<BatchResult, ResultSet> deleteCachePartition = new
AsyncFunction<BatchResult, ResultSet>() {
+ @Override
+ public ListenableFuture<ResultSet> apply(BatchResult batchResult) throws
Exception {
+ return dao.deleteCacheEntries(aggregationType.getCacheTable(),
startTime.getMillis(), startScheduleId);
+ }
+ };
+
void setComputeMetric(ComputeMetric computeMetric) {
this.computeMetric = computeMetric;
}
@@ -131,7 +140,20 @@ class Aggregator {
@Override
public void onFailure(Throwable t) {
- LOG.warn("There was an unexpected error while processing a batch of
" + aggregationType);
+ if (t instanceof BatchException) {
+ BatchException exception = (BatchException) t;
+ LOG.warn("There were errors while processing a batch of " +
aggregationType + " with starting " +
+ "schedule id " + startScheduleId + ": " +
exception.getErrorMessages());
+ if (LOG.isDebugEnabled()) {
+ for (Throwable error : exception.getRootCauses()) {
+ LOG.debug("Root cause for batch error", error);
+ }
+ }
+ } else {
+ LOG.warn("There was an unexpected error while processing a batch
of " + aggregationType +
+ " with starting schedule id " + startScheduleId, t);
+ }
+ // TODO add some configurable strategy to determine whether or not to
abort
updateRemainingBatches();
}
};
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchException.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchException.java
new file mode 100644
index 0000000..46e005b
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchException.java
@@ -0,0 +1,45 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * @author John Sanda
+ */
+class BatchException extends Exception {
+
+ private int startScheduleId;
+
+ private List<Throwable> errors;
+
+ public BatchException(int startScheduleId, List<Throwable> errors) {
+ this.startScheduleId = startScheduleId;
+ this.errors = errors;
+ }
+
+ int getStartScheduleId() {
+ return startScheduleId;
+ }
+
+ List<Throwable> getErrors() {
+ return errors;
+ }
+
+ List<String> getErrorMessages() {
+ List<String> messages = new ArrayList<String>(errors.size());
+ for (Throwable error : errors) {
+ messages.add(ThrowableUtil.getRootMessage(error));
+ }
+ return messages;
+ }
+
+ List<Throwable> getRootCauses() {
+ List<Throwable> rootCauses = new
ArrayList<Throwable>(errors.size());
+ for (Throwable error : errors) {
+ rootCauses.add(ThrowableUtil.getRootCause(error));
+ }
+ return rootCauses;
+ }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchFailureListener.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchFailureListener.java
new file mode 100644
index 0000000..fef0a67
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchFailureListener.java
@@ -0,0 +1,26 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * @author John Sanda
+ */
+class BatchFailureListener implements FutureFallback<ResultSet> {
+
+ private List<Throwable> errors = new ArrayList<Throwable>();
+
+ @Override
+ public ListenableFuture<ResultSet> create(Throwable t) throws Exception {
+ errors.add(t);
+ return null;
+ }
+
+ List<Throwable> getErrors() {
+ return errors;
+ }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/ProcessBatch.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/ProcessBatch.java
index 5c8680f..a9519f5 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/ProcessBatch.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/ProcessBatch.java
@@ -41,6 +41,8 @@ class ProcessBatch implements AsyncFunction<ResultSet,
BatchResult> {
private int batchSize;
+ private BatchFailureListener failureListener = new BatchFailureListener();
+
public ProcessBatch(MetricsDAO dao, ComputeMetric computeMetric, int
startScheduleId,
DateTime timeSlice, AggregationType aggregationType, int batchSize) {
this.dao = dao;
@@ -57,7 +59,7 @@ class ProcessBatch implements AsyncFunction<ResultSet,
BatchResult> {
if (log.isDebugEnabled()) {
log.debug("Aggregating batch of " + aggregationType + " with
starting schedule id " + startScheduleId);
}
- List<StorageResultSetFuture> insertFutures = new
ArrayList<StorageResultSetFuture>(batchSize * 4);
+ List<ListenableFuture<ResultSet>> insertFutures = new
ArrayList<ListenableFuture<ResultSet>>(batchSize * 4);
try {
if (resultSet.isExhausted()) {
return Futures.immediateFuture(new BatchResult(timeSlice,
startScheduleId));
@@ -81,8 +83,8 @@ class ProcessBatch implements AsyncFunction<ResultSet,
BatchResult> {
}
mean.add(currentMetric.getAvg());
} else {
- insertFutures.addAll(computeMetric.execute(startScheduleId,
currentMetric.getScheduleId(), min, max,
- mean));
+
insertFutures.addAll(wrapWithFailureListener(computeMetric.execute(startScheduleId,
+ currentMetric.getScheduleId(), min, max, mean)));
currentMetric = nextMetric;
min = currentMetric.getMin();
@@ -91,12 +93,16 @@ class ProcessBatch implements AsyncFunction<ResultSet,
BatchResult> {
mean.add(currentMetric.getAvg());
}
}
- insertFutures.addAll(computeMetric.execute(startScheduleId,
currentMetric.getScheduleId(), min, max, mean));
+
insertFutures.addAll(wrapWithFailureListener(computeMetric.execute(startScheduleId,
+ currentMetric.getScheduleId(), min, max, mean)));
ListenableFuture<List<ResultSet>> insertsFuture =
Futures.successfulAsList(insertFutures);
return Futures.transform(insertsFuture, new
AsyncFunction<List<ResultSet>, BatchResult>() {
@Override
- public ListenableFuture<BatchResult> apply(final
List<ResultSet> resultSets) {
+ public ListenableFuture<BatchResult> apply(final
List<ResultSet> resultSets) throws Exception {
+ if (!failureListener.getErrors().isEmpty()) {
+ throw new BatchException(startScheduleId,
failureListener.getErrors());
+ }
StorageResultSetFuture deleteFuture = dao.deleteCacheEntries(
aggregationType.getCacheTable(), timeSlice.getMillis(),
startScheduleId);
return Futures.transform(deleteFuture, new Function<ResultSet,
BatchResult>() {
@@ -117,4 +123,12 @@ class ProcessBatch implements AsyncFunction<ResultSet,
BatchResult> {
}
}
    /**
     * Wraps each insert future with {@link Futures#withFallback(ListenableFuture, FutureFallback)}
     * so that a failed insert is recorded by {@code failureListener} instead of
     * failing the whole batch; the recorded errors are inspected later when the
     * batch result is computed.
     *
     * @param futures the insert futures produced for one metric's aggregate values
     * @return the wrapped futures, in the same order as the input
     */
    private List<ListenableFuture<ResultSet>> wrapWithFailureListener(List<StorageResultSetFuture> futures) {
        List<ListenableFuture<ResultSet>> wrappedFutures = new ArrayList<ListenableFuture<ResultSet>>(futures.size());
        for (StorageResultSetFuture future : futures) {
            wrappedFutures.add(Futures.withFallback(future, failureListener));
        }
        return wrappedFutures;
    }
}
diff --git
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
index a9919f1..f98e80d 100644
---
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
+++
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
@@ -26,6 +26,7 @@ import org.rhq.core.domain.measurement.MeasurementDataNumeric;
import org.rhq.server.metrics.aggregation.AggregationManager;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.RawNumericMetric;
/**
* @author John Sanda
@@ -271,6 +272,30 @@ public class AggregationTests extends MetricsTest {
purgeDB();
}
    /**
     * Verifies that when inserts in a batch fail, the cache partition for the
     * time slice is NOT deleted, leaving the raw entry in place (presumably so
     * the aggregation can be retried later — see the commit message's fail-fast
     * intent; TODO confirm retry behavior is implemented).
     */
    @Test(dependsOnMethods = "resetDBForFailureScenarios")
    public void doNotDeleteCachePartitionOnBatchFailure() throws Exception {
        currentHour = hour(5);
        DateTime time = hour(4).plusMinutes(20);
        insertRawData(hour(4), new MeasurementDataNumeric(time.getMillis(), schedule1.id, 3.0))
            .await("Failed to insert raw data");

        // DAO whose 1 hour inserts always fail, which triggers the batch
        // failure handling path in ProcessBatch.
        TestDAO testDAO = new TestDAO() {
            @Override
            public StorageResultSetFuture insertOneHourDataAsync(int scheduleId, long timestamp, AggregateType type,
                double value) {
                StorageResultSetFuture future = super.insertOneHourDataAsync(scheduleId, timestamp, type, value);
                // Force the returned future to fail after the real insert is issued
                future.setException(new Exception("An unexpected error occurred while inserting 1 hour data"));
                return future;
            }
        };

        AggregationManagerTestStub aggregationManager = new AggregationManagerTestStub(hour(4), testDAO);
        aggregationManager.run();

        // The raw entry must still be present in the 1 hour cache because the
        // failed batch must not delete its cache partition.
        assert1HourCacheEquals(hour(4), startScheduleId(schedule1.id), asList(new RawNumericMetric(schedule1.id,
            time.getMillis(), 3.0)));
    }
+
//@Test(dependsOnMethods = "resetDBForFailureScenarios")
// public void failToFetchRawDataIndexDuringAggregationForHour12() throws Exception {
// currentHour = hour(12);
@@ -401,6 +426,13 @@ public class AggregationTests extends MetricsTest {
Map<DateTime, AggregateNumericMetric> twentyFourHourData = new
HashMap<DateTime, AggregateNumericMetric>();
}
    /**
     * A {@link MetricsDAO} bound to this test's storage session and
     * configuration; tests subclass it anonymously and override individual
     * write methods to inject failures.
     */
    private class TestDAO extends MetricsDAO {

        public TestDAO() {
            super(storageSession, configuration);
        }
    }
+
private class FailedStorageResultSetFuture extends StorageResultSetFuture implements
ListenableFuture<ResultSet> {
private SettableFuture future;
diff --git
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
index 07500f1..5d3f304 100644
---
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
+++
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
@@ -91,7 +91,7 @@ public class MetricsDAOTest extends CassandraIntegrationTest {
session.execute("TRUNCATE " + MetricsTable.ONE_HOUR);
session.execute("TRUNCATE " + MetricsTable.SIX_HOUR);
session.execute("TRUNCATE " + MetricsTable.TWENTY_FOUR_HOUR);
- session.execute("TRUNCATE " + MetricsTable.INDEX);
+ session.execute("TRUNCATE " + MetricsTable.METRICS_CACHE);
}
@Test(enabled = ENABLED)