modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
| 52 +++++++-
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
| 6 -
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java
| 7 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java
| 31 +++++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
| 37 +++---
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
| 59 +++++-----
6 files changed, 141 insertions(+), 51 deletions(-)
New commits:
commit 43892b18eb56638b31eb01f59a37f9263b92a877
Author: John Sanda <jsanda(a)redhat.com>
Date: Tue Nov 6 21:38:17 2012 -0500
adding support for setting column meta data when inserting raw metrics
Each column in cassandra has a time to live (TTL) and timestamp attributes. If
the TTL is not set, then the column will not expire unless explicitly deleted.
If the TTL is set, Cassandra will delete the column after the specified number
of seconds has passed. The timestamp attribute is basically the insertion time.
It is used for conflict resolution between replicas. The TTL for raw metrics is
set to seven days.
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
index 383a15b..fb7b1db 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsDAO.java
@@ -63,8 +63,14 @@ public class MetricsDAO {
"FROM " + RAW_METRICS_TABLE + " " +
"WHERE schedule_id = ? AND time >= ? AND time < ?";
+ private static final String RAW_METRICS_WITH_METADATA_QUERY =
+ "SELECT schedule_id, time, value, ttl(value), writetime(value) " +
+ "FROM " + RAW_METRICS_TABLE + " " +
+ "WHERE schedule_id = ? AND time >= ? AND time < ?";
+
private static final String INSERT_RAW_METRICS =
- "INSERT INTO raw_metrics (schedule_id, time, value) VALUES (?, ?, ?)";
+ "INSERT INTO raw_metrics (schedule_id, time, value) " +
+ "VALUES (?, ?, ?) USING TTL ? AND TIMESTAMP ?";
private static final String METRICS_INDEX_QUERY =
"SELECT time, schedule_id " +
@@ -75,23 +81,21 @@ public class MetricsDAO {
private static final String UPDATE_METRICS_INDEX =
"INSERT INTO " + METRICS_INDEX_TABLE + " (bucket, time,
schedule_id, null_col) VALUES (?, ?, ?, ?)";
- private static interface ConnectionCallback {
- void invoke(Connection connection);
- }
-
private DataSource dataSource;
public MetricsDAO(DataSource dataSource) {
this.dataSource = dataSource;
}
- public Set<MeasurementDataNumeric>
insertRawMetrics(Set<MeasurementDataNumeric> dataSet) {
+ public Set<MeasurementDataNumeric>
insertRawMetrics(Set<MeasurementDataNumeric> dataSet, int ttl, long timestamp) {
Set<MeasurementDataNumeric> insertedMetrics = new
HashSet<MeasurementDataNumeric>();
Connection connection = null;
PreparedStatement statement = null;
try {
+ String sql = "INSERT INTO raw_metrics (schedule_id, time, value) VALUES
(?, ?, ?) " +
+ "USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
connection = dataSource.getConnection();
- statement = connection.prepareStatement(INSERT_RAW_METRICS);
+ statement = connection.prepareStatement(sql);
for (MeasurementDataNumeric data : dataSet) {
statement.setInt(1, data.getScheduleId());
@@ -177,6 +181,40 @@ public class MetricsDAO {
}
}
+ public List<RawNumericMetric> findRawMetrics(int scheduleId, DateTime
startTime, DateTime endTime,
+ boolean includeMetadata) {
+
+ if (!includeMetadata) {
+ return findRawMetrics(scheduleId, startTime, endTime);
+ }
+
+ Connection connection = null;
+ PreparedStatement statement = null;
+ ResultSet resultSet = null;
+ try {
+ connection = dataSource.getConnection();
+ statement = connection.prepareStatement(RAW_METRICS_WITH_METADATA_QUERY);
+ statement.setInt(1, scheduleId);
+ statement.setDate(2, new java.sql.Date(startTime.getMillis()));
+ statement.setDate(3, new java.sql.Date(endTime.getMillis()));
+
+ resultSet = statement.executeQuery();
+ List<RawNumericMetric> metrics = new
ArrayList<RawNumericMetric>();
+ ResultSetMapper<RawNumericMetric> resultSetMapper = new
RawNumericMetricMapper(true);
+
+ while (resultSet.next()) {
+ metrics.add(resultSetMapper.map(resultSet));
+ }
+ return metrics;
+ } catch (SQLException e) {
+ throw new CQLException(e);
+ } finally {
+ JDBCUtil.safeClose(resultSet);
+ JDBCUtil.safeClose(statement);
+ JDBCUtil.safeClose(connection);
+ }
+ }
+
public List<AggregatedNumericMetric> findAggregateMetrics(String bucket, int
scheduleId) {
Connection connection = null;
Statement statement = null;
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 96d8903..63504f3 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -47,6 +47,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
+import org.joda.time.Days;
import org.joda.time.Minutes;
import org.rhq.core.domain.auth.Subject;
@@ -76,6 +77,8 @@ public class MetricsServer {
private static final int DEFAULT_PAGE_SIZE = 200;
+ public static int RAW_TTL = Days.days(7).toStandardSeconds().getSeconds();
+
private final Log log = LogFactory.getLog(MetricsServer.class);
private Cluster cluster;
@@ -274,7 +277,8 @@ public class MetricsServer {
public void addNumericData(Set<MeasurementDataNumeric> dataSet) {
MetricsDAO dao = new MetricsDAO(cassandraDS);
- Set<MeasurementDataNumeric> updates = dao.insertRawMetrics(dataSet);
+ long timestamp = System.currentTimeMillis();
+ Set<MeasurementDataNumeric> updates = dao.insertRawMetrics(dataSet,
RAW_TTL, timestamp);
updateMetricsIndex(updates);
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java
index 7d3142e..d8546aa 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetric.java
@@ -47,6 +47,13 @@ public class RawNumericMetric {
this.timestamp = timestamp;
}
+ public RawNumericMetric(int scheduleId, long timestamp, double value, ColumnMetadata
metadata) {
+ this.scheduleId = scheduleId;
+ this.value = value;
+ this.timestamp = timestamp;
+ columnMetadata = metadata;
+ }
+
public int getScheduleId() {
return scheduleId;
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java
index cb81503..2d88075 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/RawNumericMetricMapper.java
@@ -33,8 +33,37 @@ import java.sql.SQLException;
*/
public class RawNumericMetricMapper implements ResultSetMapper<RawNumericMetric> {
+ private ResultSetMapper<RawNumericMetric> mapper;
+
+ public RawNumericMetricMapper() {
+ this(false);
+ }
+
+ public RawNumericMetricMapper(boolean metaDataIncluded) {
+ if (metaDataIncluded) {
+ mapper = new ResultSetMapper<RawNumericMetric>() {
+ @Override
+ public RawNumericMetric map(ResultSet resultSet) throws SQLException {
+ RawNumericMetric rawMetric = new
RawNumericMetric(resultSet.getInt(1),
+ resultSet.getDate(2).getTime(), resultSet.getDouble(3));
+ ColumnMetadata metadata = new ColumnMetadata(resultSet.getInt(4),
resultSet.getLong(5));
+ rawMetric.setColumnMetadata(metadata);
+ return rawMetric;
+ }
+ };
+ } else {
+ mapper = new ResultSetMapper<RawNumericMetric>() {
+ @Override
+ public RawNumericMetric map(ResultSet resultSet) throws SQLException {
+ return new RawNumericMetric(resultSet.getInt(1),
resultSet.getDate(2).getTime(),
+ resultSet.getDouble(3));
+ }
+ };
+ }
+ }
+
@Override
public RawNumericMetric map(ResultSet resultSet) throws SQLException {
- return new RawNumericMetric(resultSet.getInt(1), resultSet.getDate(2).getTime(),
resultSet.getDouble(3));
+ return mapper.map(resultSet);
}
}
diff --git
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
index 20f6ecc..2e22d0e 100644
---
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
+++
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsDAOTest.java
@@ -41,12 +41,11 @@ import java.util.Map;
import java.util.Set;
import org.joda.time.DateTime;
+import org.joda.time.Hours;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.rhq.core.domain.measurement.DataType;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
-import org.rhq.core.domain.measurement.MeasurementScheduleRequest;
/**
* @author John Sanda
@@ -77,31 +76,39 @@ public class MetricsDAOTest extends CassandraIntegrationTest {
DateTime twoMinutesAgo = currentTime.minusMinutes(2);
DateTime oneMinuteAgo = currentTime.minusMinutes(1);
- String scheduleName = getClass().getName() + "_SCHEDULE";
- long interval = MINUTE * 10;
- boolean enabled = true;
- DataType dataType = DataType.MEASUREMENT;
- MeasurementScheduleRequest request = new MeasurementScheduleRequest(scheduleId,
scheduleName, interval,
- enabled, dataType);
-
Set<MeasurementDataNumeric> data = new
HashSet<MeasurementDataNumeric>();
- data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), request, 3.2));
- data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), request, 3.9));
- data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), request, 2.6));
+ data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), scheduleId,
3.2));
+ data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), scheduleId,
3.9));
+ data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), scheduleId, 2.6));
MetricsDAO dao = new MetricsDAO(dataSource);
- Set<MeasurementDataNumeric> actualUpdates = dao.insertRawMetrics(data);
+ int ttl = Hours.ONE.toStandardSeconds().getSeconds();
+ long timestamp = System.currentTimeMillis();
+ Set<MeasurementDataNumeric> actualUpdates = dao.insertRawMetrics(data, ttl,
timestamp);
assertEquals(actualUpdates, data, "The updates do not match expected
value.");
- List<RawNumericMetric> actualMetrics = dao.findRawMetrics(scheduleId,
currentHour, currentHour.plusHours(1));
+ List<RawNumericMetric> actualMetrics = dao.findRawMetrics(scheduleId,
currentHour, currentHour.plusHours(1));
List<RawNumericMetric> expectedMetrics = asList(
new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2),
new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9),
new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6)
);
-
assertEquals(actualMetrics, expectedMetrics, "Failed to find raw
metrics");
+
+ // Now verify that the column meta data was set. We do this separately in order
to
+ // exercise both versions of the findRawMetrics method. In production code (so
far),
+ // we have no need to retrieve meta data when retrieving raw metrics, but we
need
+ // to verify that the meta data is in fact set.
+ List<RawNumericMetric> actualMetricsWithMetadata =
dao.findRawMetrics(scheduleId, currentHour,
+ currentHour.plusHours(1), true) ;
+ List<RawNumericMetric> expectedMetricsWithMetadata = asList(
+ new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2, new
ColumnMetadata(ttl, timestamp)),
+ new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9, new
ColumnMetadata(ttl, timestamp)),
+ new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6, new
ColumnMetadata(ttl, timestamp))
+ );
+ assertEquals(actualMetricsWithMetadata, expectedMetricsWithMetadata, "Column
meta data does not match " +
+ "expected values");
}
@Test
diff --git
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
index 60d086f..149a16d 100644
---
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
+++
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/MetricsServerTest.java
@@ -36,10 +36,12 @@ import static
org.rhq.server.metrics.MetricsDAO.ONE_HOUR_METRICS_TABLE;
import static org.rhq.server.metrics.MetricsDAO.RAW_METRICS_TABLE;
import static org.rhq.server.metrics.MetricsDAO.SIX_HOUR_METRICS_TABLE;
import static org.rhq.server.metrics.MetricsDAO.TWENTY_FOUR_HOUR_METRICS_TABLE;
+import static org.rhq.server.metrics.MetricsServer.RAW_TTL;
import static org.rhq.server.metrics.MetricsServer.divide;
import static org.rhq.test.AssertUtils.assertCollectionMatchesNoOrder;
import static org.rhq.test.AssertUtils.assertPropertiesMatch;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -110,6 +112,8 @@ public class MetricsServerTest extends CassandraIntegrationTest {
private Keyspace keyspace;
+ private MetricsDAO dao;
+
private static class MetricsServerStub extends MetricsServer {
private DateTime currentHour;
@@ -142,6 +146,9 @@ public class MetricsServerTest extends CassandraIntegrationTest {
metricsServer.setTraitsCF(TRAITS_CF);
metricsServer.setResourceTraitsCF(RESOURCE_TRAITS_CF);
metricsServer.setCassandraDS(dataSource);
+
+ dao = new MetricsDAO(dataSource);
+
purgeDB();
}
@@ -172,31 +179,23 @@ public class MetricsServerTest extends CassandraIntegrationTest {
DateTime twoMinutesAgo = currentTime.minusMinutes(2);
DateTime oneMinuteAgo = currentTime.minusMinutes(1);
- String scheduleName = getClass().getName() + "_SCHEDULE";
- long interval = MINUTE * 10;
- boolean enabled = true;
- DataType dataType = DataType.MEASUREMENT;
- MeasurementScheduleRequest request = new MeasurementScheduleRequest(scheduleId,
scheduleName, interval,
- enabled, dataType);
-
Set<MeasurementDataNumeric> data = new
HashSet<MeasurementDataNumeric>();
- data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), request, 3.2));
- data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), request, 3.9));
- data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), request, 2.6));
+ data.add(new MeasurementDataNumeric(threeMinutesAgo.getMillis(), scheduleId,
3.2));
+ data.add(new MeasurementDataNumeric(twoMinutesAgo.getMillis(), scheduleId,
3.9));
+ data.add(new MeasurementDataNumeric(oneMinuteAgo.getMillis(), scheduleId, 2.6));
+ long timestamp = System.currentTimeMillis();
metricsServer.addNumericData(data);
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery("SELECT * FROM raw_metrics
WHERE schedule_id = " + scheduleId);
+ List<RawNumericMetric> actual = dao.findRawMetrics(scheduleId,
hour0.plusHours(4), hour0.plusHours(5));
+ List<RawNumericMetric> expected = asList(
+ new RawNumericMetric(scheduleId, threeMinutesAgo.getMillis(), 3.2),
+ new RawNumericMetric(scheduleId, twoMinutesAgo.getMillis(), 3.9),
+ new RawNumericMetric(scheduleId, oneMinuteAgo.getMillis(), 2.6)
+ );
- Set<MeasurementDataNumeric> actual = new
HashSet<MeasurementDataNumeric>();
- while (resultSet.next()) {
- actual.add(new MeasurementDataNumeric(resultSet.getDate(2).getTime(),
resultSet.getInt(1),
- resultSet.getDouble(3)));
- }
- resultSet.close();
-
- assertCollectionMatchesNoOrder("Failed to retrieve raw metric data",
data, actual, "name");
+ assertEquals(actual, expected, "Failed to retrieve raw metric data");
+ assertColumnMetadataEquals(scheduleId, hour0.plusHours(4), hour0.plusHours(5),
RAW_TTL, timestamp);
List<MetricsIndexEntry> expectedIndex = asList(new
MetricsIndexEntry(ONE_HOUR_METRIC_DATA_CF,
hour0.plusHours(4), scheduleId));
@@ -292,8 +291,8 @@ public class MetricsServerTest extends CassandraIntegrationTest {
rawMetrics.add(new MeasurementDataNumeric(secondMetricTime.getMillis(),
scheduleId, secondValue));
rawMetrics.add(new MeasurementDataNumeric(thirdMetricTime.getMillis(),
scheduleId, thirdValue));
- MetricsDAO dao = new MetricsDAO(dataSource);
- Set<MeasurementDataNumeric> insertedRawMetrics =
dao.insertRawMetrics(rawMetrics);
+ long timestamp = System.currentTimeMillis();
+ Set<MeasurementDataNumeric> insertedRawMetrics =
dao.insertRawMetrics(rawMetrics, RAW_TTL, timestamp);
metricsServer.updateMetricsIndex(insertedRawMetrics);
// insert raw data to be aggregated
@@ -705,6 +704,17 @@ public class MetricsServerTest extends CassandraIntegrationTest {
return composite;
}
+ private void assertColumnMetadataEquals(int scheduleId, DateTime startTime, DateTime
endTime, Integer ttl,
+ long timestamp) {
+ List<RawNumericMetric> metrics = dao.findRawMetrics(scheduleId, startTime,
endTime, true);
+ for (RawNumericMetric metric : metrics) {
+ assertEquals(metric.getColumnMetadata().getTtl(), ttl, "The TTL does not
match the expected value for " +
+ metric);
+ assertTrue(metric.getColumnMetadata().getWriteTime() >= timestamp,
"The column timestamp for " + metric +
+ " should be >= " + timestamp + " but it is " +
metric.getColumnMetadata().getWriteTime());
+ }
+ }
+
private void assert1HourMetricsQueueEquals(List<HColumn<Composite,
Integer>> expected) {
assertMetricsQueueEquals(ONE_HOUR_METRIC_DATA_CF, expected);
}
@@ -812,9 +822,7 @@ public class MetricsServerTest extends CassandraIntegrationTest {
}
private void assertMetricDataEquals(String columnFamily, int scheduleId,
List<AggregatedNumericMetric> expected) {
- MetricsDAO dao = new MetricsDAO(dataSource);
List<AggregatedNumericMetric> actual =
dao.findAggregateMetrics(columnFamily, scheduleId);
-
assertCollectionMatchesNoOrder(expected, actual, "Metric data for schedule
id " + scheduleId +
" in table " + columnFamily + " does not match expected
values");
}
@@ -853,9 +861,7 @@ public class MetricsServerTest extends CassandraIntegrationTest {
// }
//
// assertEquals(actual.size(), 0, prefix + " Expected the row to be
empty.");
- MetricsDAO dao = new MetricsDAO(dataSource);
List<AggregatedNumericMetric> metrics =
dao.findAggregateMetrics(columnFamily, scheduleId);
-
assertEquals(metrics.size(), 0, "Expected " + columnFamily + " to
be empty for schedule id " + scheduleId +
" but found " + metrics);
}
@@ -899,7 +905,6 @@ public class MetricsServerTest extends CassandraIntegrationTest {
// assertEquals(actual.size(), 0, "Expected the " + queueName + "
queue to be empty for schedule id " +
// scheduleId);
- MetricsDAO dao = new MetricsDAO(dataSource);
List<MetricsIndexEntry> index = dao.findMetricsIndexEntries(table);
assertEquals(index.size(), 0, "Expected metrics index for " + table +
" to be empty but found " + index);
}