modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java | 22 ++++- modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java | 5 + modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregatorTest.java | 43 ++++++++-- 3 files changed, 60 insertions(+), 10 deletions(-)
New commits: commit 3ec94341a05c476c8d74cfea99b82e2e1b364f71 Author: John Sanda jsanda@redhat.com Date: Thu Oct 3 07:50:23 2013 -0400
delete 1 hr index entries when aggregation of time slice data is finished
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java index ef95516..15b3ff8 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java @@ -386,8 +386,7 @@ public class Aggregator { if (log.isDebugEnabled()) { log.debug("Finished aggregating raw data for time slice [" + startTime + "]"); } - allAggregationFinished.countDown(); - // TODO delete the row from metrics_index + deleteIndexEntries(MetricsTable.ONE_HOUR); } }
@@ -399,9 +398,24 @@ public class Aggregator { log.debug("Finished aggregating one hour data for time slice [startTime: " + start + ", endTime: " + end + "]"); } - allAggregationFinished.countDown(); - // TODO delete the row from metrics_index + deleteIndexEntries(MetricsTable.SIX_HOUR); } }
+ private void deleteIndexEntries(final MetricsTable table) { + StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, startTime.getMillis()); + Futures.addCallback(future, new FutureCallback<ResultSet>() { + @Override + public void onSuccess(ResultSet result) { + allAggregationFinished.countDown(); + } + + @Override + public void onFailure(Throwable t) { + log.warn("Failed to delete index entries for table " + table + " at time [" + startTime + "]"); + allAggregationFinished.countDown(); + } + }); + } + } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java index 40450b9..c0d7a67 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java @@ -298,4 +298,9 @@ public class MetricsDAO { BoundStatement statement = deleteIndexEntries.bind(table.getTableName(), new Date(timestamp)); storageSession.execute(statement); } + + public StorageResultSetFuture deleteMetricsIndexEntriesAsync(MetricsTable table, long timestamp) { + BoundStatement statement = deleteIndexEntries.bind(table.getTableName(), new Date(timestamp)); + return storageSession.executeAsync(statement); + } } diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregatorTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregatorTest.java index 283d6fb..ea6284b 100644 --- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregatorTest.java +++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregatorTest.java @@ -2,6 +2,7 @@ package org.rhq.server.metrics;
import static java.util.Arrays.asList; import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder; +import static org.testng.Assert.assertEquals;
import java.math.BigDecimal; import java.math.MathContext; @@ -131,16 +132,15 @@ public class AggregatorTest extends CassandraIntegrationTest { assertMetricsIndexEquals(MetricsTable.SIX_HOUR, hour9.minusHours(3).getMillis(), expected6HourIndex, "Failed to update index for " + MetricsTable.SIX_HOUR);
- // The 6 hour data should not get aggregated since the current 6 hour time slice - // has not passed yet. More specifically, the aggregation job is running at 09:00 - // which means that the current 6 hour slice is from 06:00 to 12:00. -// assert6HourDataEmpty(scheduleId); + // The 6 hour data should not get aggregated since the current 6 hour time slice, + // 06:00 - 12:00, has not yet passed. + assert6HourDataEmpty(scheduleId);
// verify that the 24 hour index is empty -// assert24HourMetricsIndexEmpty(scheduleId, hour0.getMillis()); + assert24HourMetricsIndexEmpty(scheduleId, hour0.getMillis());
// verify that the 1 hour queue has been purged -// assert1HourMetricsIndexEmpty(scheduleId, hour9.getMillis()); + assert1HourMetricsIndexEmpty(scheduleId, hour8.getMillis()); }
static double divide(double dividend, int divisor) { @@ -174,4 +174,35 @@ public class AggregatorTest extends CassandraIntegrationTest { expected, actual); }
+ private void assert6HourDataEmpty(int scheduleId) { + assertMetricDataEmpty(scheduleId, MetricsTable.SIX_HOUR); + } + + private void assert24HourDataEmpty(int scheduleId) { + assertMetricDataEmpty(scheduleId, MetricsTable.TWENTY_FOUR_HOUR); + } + + private void assertMetricDataEmpty(int scheduleId, MetricsTable columnFamily) { + List<AggregateNumericMetric> metrics = Lists.newArrayList(findAggregateMetrics(columnFamily, scheduleId)); + assertEquals(metrics.size(), 0, "Expected " + columnFamily + " to be empty for schedule id " + scheduleId + + " but found " + metrics); + } + + private void assert1HourMetricsIndexEmpty(int scheduleId, long timeSlice) { + assertMetricsIndexEmpty(scheduleId, MetricsTable.ONE_HOUR, timeSlice); + } + + private void assert6HourMetricsIndexEmpty(int scheduleId, long timeSlice) { + assertMetricsIndexEmpty(scheduleId, MetricsTable.SIX_HOUR, timeSlice); + } + + private void assert24HourMetricsIndexEmpty(int scheduleId, long timeSlice) { + assertMetricsIndexEmpty(scheduleId, MetricsTable.TWENTY_FOUR_HOUR, timeSlice); + } + + private void assertMetricsIndexEmpty(int scheduleId, MetricsTable table, long timeSlice) { + List<MetricsIndexEntry> index = Lists.newArrayList(dao.findMetricsIndexEntries(table, timeSlice)); + assertEquals(index.size(), 0, "Expected metrics index for " + table + " to be empty but found " + index); + } + }
rhq-commits@lists.fedorahosted.org