modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java | 52 +++++++- modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java | 6 - modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java | 7 + modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java | 31 +++++ modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java | 37 +++--- modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java | 59 +++++----- 6 files changed, 141 insertions(+), 51 deletions(-)
New commits: commit 43892b18eb56638b31eb01f59a37f9263b92a877 Author: John Sanda jsanda@redhat.com Date: Tue Nov 6 21:38:17 2012 -0500
adding support for setting column meta data when inserting raw metrics
Each column in cassandra has a time to live (TTL) and timestamp attributes. If the TTL is not set, then the column will not expire unless explicitly deleted. If the TTL is set, Cassandra will delete the column after the specified number of seconds has passed. The timestamp attribute is basically the insertion time. It is used for conflict resolution between replicas. The TTL for raw metrics is set to seven days.
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java index 383a15b..fb7b1db 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java @@ -63,8 +63,14 @@ public class MetricsDAO { "FROM " + RAW_METRICS_TABLE + " " + "WHERE schedule_id = ? AND time >= ? AND time < ?";
+ private static final String RAW_METRICS_WITH_METADATA_QUERY = + "SELECT schedule_id, time, value, ttl(value), writetime(value) " + + "FROM " + RAW_METRICS_TABLE + " " + + "WHERE schedule_id = ? AND time >= ? AND time < ?"; + private static final String INSERT_RAW_METRICS = - "INSERT INTO raw_metrics (schedule_id, time, value) VALUES (?, ?, ?)"; + "INSERT INTO raw_metrics (schedule_id, time, value) " + + "VALUES (?, ?, ?) USING TTL ? AND TIMESTAMP ?";
private static final String METRICS_INDEX_QUERY = "SELECT time, schedule_id " + @@ -75,23 +81,21 @@ public class MetricsDAO { private static final String UPDATE_METRICS_INDEX = "INSERT INTO " + METRICS_INDEX_TABLE + " (bucket, time, schedule_id, null_col) VALUES (?, ?, ?, ?)";
- private static interface ConnectionCallback { - void invoke(Connection connection); - } - private DataSource dataSource;
public MetricsDAO(DataSource dataSource) { this.dataSource = dataSource; }
- public Set<MeasurementDataNumeric> insertRawMetrics(Set<MeasurementDataNumeric> dataSet) { + public Set<MeasurementDataNumeric> insertRawMetrics(Set<MeasurementDataNumeric> dataSet, int ttl, long timestamp) { Set<MeasurementDataNumeric> insertedMetrics = new HashSet<MeasurementDataNumeric>(); Connection connection = null; PreparedStatement statement = null; try { + String sql = "INSERT INTO raw_metrics (schedule_id, time, value) VALUES (?, ?, ?) " + + "USING TTL " + ttl + " AND TIMESTAMP " + timestamp; connection = dataSource.getConnection(); - statement = connection.prepareStatement(INSERT_RAW_METRICS); + statement = connection.prepareStatement(sql);
for (MeasurementDataNumeric data : dataSet) { statement.setInt(1, data.getScheduleId()); @@ -177,6 +181,40 @@ public class MetricsDAO { } }
+ public List<RawNumericMetric> findRawMetrics(int scheduleId, DateTime startTime, DateTime endTime, + boolean includeMetadata) { + + if (!includeMetadata) { + return findRawMetrics(scheduleId, startTime, endTime); + } + + Connection connection = null; + PreparedStatement statement = null; + ResultSet resultSet = null; + try { + connection = dataSource.getConnection(); + statement = connection.prepareStatement(RAW_METRICS_WITH_METADATA_QUERY); + statement.setInt(1, scheduleId); + statement.setDate(2, new java.sql.Date(startTime.getMillis())); + statement.setDate(3, new java.sql.Date(endTime.getMillis())); + + resultSet = statement.executeQuery(); + List<RawNumericMetric> metrics = new ArrayList<RawNumericMetric>(); + ResultSetMapper<RawNumericMetric> resultSetMapper = new RawNumericMetricMapper(true); + + while (resultSet.next()) { + metrics.add(resultSetMapper.map(resultSet)); + } + return metrics; + } catch (SQLException e) { + throw new CQLException(e); + } finally { + JDBCUtil.safeClose(resultSet); + JDBCUtil.safeClose(statement); + JDBCUtil.safeClose(connection); + } + } + public List<AggregatedNumericMetric> findAggregateMetrics(String bucket, int scheduleId) { Connection connection = null; Statement statement = null; diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java index 96d8903..63504f3 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java @@ -47,6 +47,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.joda.time.DateTimeComparator; +import org.joda.time.Days; import org.joda.time.Minutes;
import org.rhq.core.domain.auth.Subject; @@ -76,6 +77,8 @@ public class MetricsServer {
private static final int DEFAULT_PAGE_SIZE = 200;
+ public static int RAW_TTL = Days.days(7).toStandardSeconds().getSeconds(); + private final Log log = LogFactory.getLog(MetricsServer.class);
private Cluster cluster; @@ -274,7 +277,8 @@ public class MetricsServer {
public void addNumericData(Set<MeasurementDataNumeric> dataSet) { MetricsDAO dao = new MetricsDAO(cassandraDS); - Set<MeasurementDataNumeric> updates = dao.insertRawMetrics(dataSet); + long timestamp = System.currentTimeMillis(); + Set<MeasurementDataNumeric> updates = dao.insertRawMetrics(dataSet, RAW_TTL, timestamp); updateMetricsIndex(updates); }
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java index 7d3142e..d8546aa 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java @@ -47,6 +47,13 @@ public class RawNumericMetric { this.timestamp = timestamp; }
+ public RawNumericMetric(int scheduleId, long timestamp, double value, ColumnMetadata metadata) { + this.scheduleId = scheduleId; + this.value = value; + this.timestamp = timestamp; + columnMetadata = metadata; + } + public int getScheduleId() { return scheduleId; } diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java index cb81503..2d88075 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java @@ -33,8 +33,37 @@ import java.sql.SQLException; */ public class RawNumericMetricMapper implements ResultSetMapper<RawNumericMetric> {
+ private ResultSetMapper<RawNumericMetric> mapper; + + public RawNumericMetricMapper() { + this(false); + } + + public RawNumericMetricMapper(boolean metaDataIncluded) { + if (metaDataIncluded) { + mapper = new ResultSetMapper<RawNumericMetric>() { + @Override + public RawNumericMetric map(ResultSet resultSet) throws SQLException { + RawNumericMetric rawMetric = new RawNumericMetric(resultSet.getInt(1), + resultSet.getDate(2).getTime(), resultSet.getDouble(3)); + ColumnMetadata metadata = new ColumnMetadata(resultSet.getInt(4), resultSet.getLong(5)); + rawMetric.setColumnMetadata(metadata); + return rawMetric; + } + }; + } else { + mapper = new ResultSetMapper<RawNumericMetric>() { + @Override + public RawNumericMetric map(ResultSet resultSet) throws SQLException { + return new RawNumericMetric(resultSet.getInt(1), resultSet.getDate(2).getTime(), + resultSet.getDouble(3)); + } + }; + } + } + @Override public RawNumericMetric map(ResultSet resultSet) throws SQLException { - return new RawNumericMetric(resultSet.getInt(1), resultSet.getDate(2).getTime(), resultSet.getDouble(3)); + return mapper.map(resultSet); } } diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java index 20f6ecc..2e22d0e 100644 --- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java +++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java @@ -41,12 +41,11 @@ import java.util.Map; import java.util.Set;
import org.joda.time.DateTime; +import org.joda.time.Hours; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test;
-import org.rhq.core.domain.measurement.DataType; import org.rhq.core.domain.measurement.MeasurementDataNumeric; -import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
/** * @author John Sanda @@ -77,31 +76,39 @@ public class MetricsDAOTest extends CassandraIntegrationTest { DateTime twoMinutesAgo = currentTime.minusMinutes(2); DateTime oneMinuteAgo = currentTime.minusMinutes(1);
- String scheduleName = getClass().getName() + "_SCHEDULE"; - long interval = MINUTE * 10; - boolean enabled = true; - DataType dataType = DataType.MEASUREMENT; - MeasurementScheduleRequest request = new MeasurementScheduleRequest(scheduleId, scheduleName, interval, - enabled, dataType); - Set<MeasurementDataNumeric> data = new HashSet<MeasurementDataNumeric>(); - data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), request, 3.2)); - data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), request, 3.9)); - data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), request, 2.6)); + data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), scheduleId, 3.2)); + data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), scheduleId, 3.9)); + data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), scheduleId, 2.6));
MetricsDAO dao = new MetricsDAO(dataSource); - Set<MeasurementDataNumeric> actualUpdates = dao.insertRawMetrics(data); + int ttl = Hours.ONE.toStandardSeconds().getSeconds(); + long timestamp = System.currentTimeMillis(); + Set<MeasurementDataNumeric> actualUpdates = dao.insertRawMetrics(data, ttl, timestamp);
assertEquals(actualUpdates, data, "The updates do not match expected value.");
- List<RawNumericMetric> actualMetrics = dao.findRawMetrics(scheduleId, currentHour, currentHour.plusHours(1)); + List<RawNumericMetric> actualMetrics = dao.findRawMetrics(scheduleId, currentHour, currentHour.plusHours(1)); List<RawNumericMetric> expectedMetrics = asList( new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2), new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9), new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6) ); - assertEquals(actualMetrics, expectedMetrics, "Failed to find raw metrics"); + + // Now verify that the column meta data was set. We do this separately in order to + // exercise both versions of the findRawMetrics method. In production code (so far), + // we have no need to retrieve meta data when retrieving raw metrics, but we need + // to verify that the meta data is in fact set. + List<RawNumericMetric> actualMetricsWithMetadata = dao.findRawMetrics(scheduleId, currentHour, + currentHour.plusHours(1), true) ; + List<RawNumericMetric> expectedMetricsWithMetadata = asList( + new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2, new ColumnMetadata(ttl, timestamp)), + new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9, new ColumnMetadata(ttl, timestamp)), + new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6, new ColumnMetadata(ttl, timestamp)) + ); + assertEquals(actualMetricsWithMetadata, expectedMetricsWithMetadata, "Column meta data does not match " + + "expected values"); }
@Test diff --git a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java index 60d086f..149a16d 100644 --- a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java +++ b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java @@ -36,10 +36,12 @@ import static org.rhq.server.metrics.MetricsDAO.ONE_HOUR_METRICS_TABLE; import static org.rhq.server.metrics.MetricsDAO.RAW_METRICS_TABLE; import static org.rhq.server.metrics.MetricsDAO.SIX_HOUR_METRICS_TABLE; import static org.rhq.server.metrics.MetricsDAO.TWENTY_FOUR_HOUR_METRICS_TABLE; +import static org.rhq.server.metrics.MetricsServer.RAW_TTL; import static org.rhq.server.metrics.MetricsServer.divide; import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder; import static org.rhq.test.AssertUtils.assertPropertiesMatch; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue;
import java.sql.ResultSet; import java.sql.SQLException; @@ -110,6 +112,8 @@ public class MetricsServerTest extends CassandraIntegrationTest {
private Keyspace keyspace;
+ private MetricsDAO dao; + private static class MetricsServerStub extends MetricsServer { private DateTime currentHour;
@@ -142,6 +146,9 @@ public class MetricsServerTest extends CassandraIntegrationTest { metricsServer.setTraitsCF(TRAITS_CF); metricsServer.setResourceTraitsCF(RESOURCE_TRAITS_CF); metricsServer.setCassandraDS(dataSource); + + dao = new MetricsDAO(dataSource); + purgeDB(); }
@@ -172,31 +179,23 @@ public class MetricsServerTest extends CassandraIntegrationTest { DateTime twoMinutesAgo = currentTime.minusMinutes(2); DateTime oneMinuteAgo = currentTime.minusMinutes(1);
- String scheduleName = getClass().getName() + "_SCHEDULE"; - long interval = MINUTE * 10; - boolean enabled = true; - DataType dataType = DataType.MEASUREMENT; - MeasurementScheduleRequest request = new MeasurementScheduleRequest(scheduleId, scheduleName, interval, - enabled, dataType); - Set<MeasurementDataNumeric> data = new HashSet<MeasurementDataNumeric>(); - data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), request, 3.2)); - data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), request, 3.9)); - data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), request, 2.6)); + data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), scheduleId, 3.2)); + data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), scheduleId, 3.9)); + data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), scheduleId, 2.6));
+ long timestamp = System.currentTimeMillis(); metricsServer.addNumericData(data);
- Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery("SELECT * FROM raw_metrics WHERE schedule_id = " + scheduleId); + List<RawNumericMetric> actual = dao.findRawMetrics(scheduleId, hour0.plusHours(4), hour0.plusHours(5)); + List<RawNumericMetric> expected = asList( + new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2), + new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9), + new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6) + );
- Set<MeasurementDataNumeric> actual = new HashSet<MeasurementDataNumeric>(); - while (resultSet.next()) { - actual.add(new MeasurementDataNumeric(resultSet.getDate(2).getTime(), resultSet.getInt(1), - resultSet.getDouble(3))); - } - resultSet.close(); - - assertCollectionMatchesNoOrder("Failed to retrieve raw metric data", data, actual, "name"); + assertEquals(actual, expected, "Failed to retrieve raw metric data"); + assertColumnMetadataEquals(scheduleId, hour0.plusHours(4), hour0.plusHours(5), RAW_TTL, timestamp);
List<MetricsIndexEntry> expectedIndex = asList(new MetricsIndexEntry(ONE_HOUR_METRIC_DATA_CF, hour0.plusHours(4), scheduleId)); @@ -292,8 +291,8 @@ public class MetricsServerTest extends CassandraIntegrationTest { rawMetrics.add(new MeasurementDataNumeric(secondMetricTime.getMillis(), scheduleId, secondValue)); rawMetrics.add(new MeasurementDataNumeric(thirdMetricTime.getMillis(), scheduleId, thirdValue));
- MetricsDAO dao = new MetricsDAO(dataSource); - Set<MeasurementDataNumeric> insertedRawMetrics = dao.insertRawMetrics(rawMetrics); + long timestamp = System.currentTimeMillis(); + Set<MeasurementDataNumeric> insertedRawMetrics = dao.insertRawMetrics(rawMetrics, RAW_TTL, timestamp); metricsServer.updateMetricsIndex(insertedRawMetrics);
// insert raw data to be aggregated @@ -705,6 +704,17 @@ public class MetricsServerTest extends CassandraIntegrationTest { return composite; }
+ private void assertColumnMetadataEquals(int scheduleId, DateTime startTime, DateTime endTime, Integer ttl, + long timestamp) { + List<RawNumericMetric> metrics = dao.findRawMetrics(scheduleId, startTime, endTime, true); + for (RawNumericMetric metric : metrics) { + assertEquals(metric.getColumnMetadata().getTtl(), ttl, "The TTL does not match the expected value for " + + metric); + assertTrue(metric.getColumnMetadata().getWriteTime() >= timestamp, "The column timestamp for " + metric + + " should be >= " + timestamp + " but it is " + metric.getColumnMetadata().getWriteTime()); + } + } + private void assert1HourMetricsQueueEquals(List<HColumn<Composite, Integer>> expected) { assertMetricsQueueEquals(ONE_HOUR_METRIC_DATA_CF, expected); } @@ -812,9 +822,7 @@ public class MetricsServerTest extends CassandraIntegrationTest { }
private void assertMetricDataEquals(String columnFamily, int scheduleId, List<AggregatedNumericMetric> expected) { - MetricsDAO dao = new MetricsDAO(dataSource); List<AggregatedNumericMetric> actual = dao.findAggregateMetrics(columnFamily, scheduleId); - assertCollectionMatchesNoOrder(expected, actual, "Metric data for schedule id " + scheduleId + " in table " + columnFamily + " does not match expected values"); } @@ -853,9 +861,7 @@ public class MetricsServerTest extends CassandraIntegrationTest { // } // // assertEquals(actual.size(), 0, prefix + " Expected the row to be empty."); - MetricsDAO dao = new MetricsDAO(dataSource); List<AggregatedNumericMetric> metrics = dao.findAggregateMetrics(columnFamily, scheduleId); - assertEquals(metrics.size(), 0, "Expected " + columnFamily + " to be empty for schedule id " + scheduleId + " but found " + metrics); } @@ -899,7 +905,6 @@ public class MetricsServerTest extends CassandraIntegrationTest {
// assertEquals(actual.size(), 0, "Expected the " + queueName + " queue to be empty for schedule id " + // scheduleId); - MetricsDAO dao = new MetricsDAO(dataSource); List<MetricsIndexEntry> index = dao.findMetricsIndexEntries(table); assertEquals(index.size(), 0, "Expected metrics index for " + table + " to be empty but found " + index); }
rhq-commits@lists.fedorahosted.org