.classpath | 2
modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java | 77 ++++++-
modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/CassandraClusterManager.java | 65 ++++--
modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/ClusterInitService.java | 101 ++++++----
modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java | 6
modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java | 17 -
modules/common/cassandra-schema/pom.xml | 21 --
modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java | 27 +-
modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java | 72 +++----
modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java | 48 ++--
modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java | 6
modules/common/cassandra-util/src/main/java/org/rhq/cassandra/util/ConfigEditor.java | 41 +++-
modules/common/cassandra-util/src/test/java/org/rhq/cassandra/util/ConfigEditorTest.java | 47 ++++
modules/common/drift/pom.xml | 22 +-
modules/common/filetemplate-bundle/pom.xml | 6
modules/common/jboss-as/pom.xml | 7
modules/common/pom.xml | 6
modules/core/dbutils/pom.xml | 5
modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java | 27 ++
modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java | 8
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java | 51 +----
modules/plugins/rhq-storage/src/test/java/org/rhq/plugins/storage/StorageNodeComponentITest.java | 6
22 files changed, 422 insertions(+), 246 deletions(-)
New commits:
commit d5660806b9af04345bbc7e54e3d21edebed7d641
Author: Stefan Negrea <snegrea(a)redhat.com>
Date: Mon Aug 19 15:55:38 2013 -0500
[BZ 998049] Remove all core domain dependencies from Cassandra common modules.
This was causing maven build problems because core domain would need to be built before database util would run and setup the database.
diff --git a/.classpath b/.classpath
index d590a22..f489722 100644
--- a/.classpath
+++ b/.classpath
@@ -218,6 +218,8 @@
<classpathentry kind="src" path="modules/helpers/ldap-tool/src/main/java"/>
<classpathentry kind="src" path="modules/common/cassandra-schema/src/test/java"/>
<classpathentry kind="src" path="modules/plugins/rhq-storage/src/test/java"/>
+ <classpathentry kind="src" path="modules/helpers/metrics-simulator/src/main/java"/>
+ <classpathentry kind="src" path="modules/common/cassandra-util/src/test/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/apache/httpcomponents/httpclient/4.2.3/httpclient-4.2.3.jar" sourcepath="M2_REPO/org/apache/httpcomponents/httpclient/4.2.3/httpclient-4.2.3-sources.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1.jar" sourcepath="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1-sources.jar"/>
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java b/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java
index 1aeef43..7c59114 100644
--- a/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java
+++ b/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java
@@ -26,7 +26,6 @@
package org.rhq.cassandra.ccm.arquillian;
import java.io.File;
-import java.util.List;
import java.util.concurrent.Callable;
import org.jboss.arquillian.config.descriptor.api.ArquillianDescriptor;
@@ -62,7 +61,6 @@ import org.rhq.cassandra.ClusterInitService;
import org.rhq.cassandra.DeploymentOptions;
import org.rhq.cassandra.DeploymentOptionsFactory;
import org.rhq.cassandra.schema.SchemaManager;
-import org.rhq.core.domain.cloud.StorageNode;
/**
* @author John Sanda
@@ -114,7 +112,10 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension {
SchemaManager schemaManager;
ClusterInitService clusterInitService = new ClusterInitService();
- List<StorageNode> nodes = null;
+
+ String[] nodes = null;
+ int[] jmxPorts = null;
+ int cqlPort = -1;
if (!Boolean.valueOf(System.getProperty("itest.use-external-storage-node", "false"))) {
@@ -131,13 +132,17 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension {
options.setStartRpc(true);
ccm = new CassandraClusterManager(options);
- nodes = ccm.createCluster();
+ ccm.createCluster();
+
+ nodes = ccm.getNodes();
+ jmxPorts = ccm.getJmxPorts();
+ cqlPort = ccm.getCqlPort();
ccm.startCluster(false);
try {
- clusterInitService.waitForClusterToStart(nodes, nodes.size(), 1500, 20, 5);
- schemaManager = new SchemaManager("rhqadmin", "rhqadmin", nodes);
+ clusterInitService.waitForClusterToStart(nodes, jmxPorts, nodes.length, 20, 5, 1500);
+ schemaManager = new SchemaManager("rhqadmin", "rhqadmin", nodes, cqlPort);
} catch (Exception e) {
if (null != ccm) {
@@ -148,7 +153,10 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension {
} else {
try {
String seed = System.getProperty("rhq.cassandra.seeds", "127.0.0.1|7299|9042");
- schemaManager = new SchemaManager("rhqadmin", "rhqadmin", seed);
+ nodes = parseNodeAddresses(seed);
+ cqlPort = parseNodeCqlPort(seed);
+ jmxPorts = parseNodeJmxPorts(seed);
+ schemaManager = new SchemaManager("rhqadmin", "rhqadmin", nodes, cqlPort);
} catch (Exception e) {
throw new RuntimeException("External Cassandra initialization failed", e);
@@ -157,7 +165,7 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension {
try {
schemaManager.install();
- clusterInitService.waitForSchemaAgreement(nodes);
+ clusterInitService.waitForSchemaAgreement(nodes, jmxPorts);
schemaManager.updateTopology();
} catch (Exception e) {
if (null != ccm) {
@@ -260,5 +268,58 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension {
throw new RuntimeException("Could not load defined deploymentClass: " + className, e);
}
}
+
+ private String[] parseNodeAddresses(String s) {
+ String[] unparsedNodes = s.split(",");
+
+ String[] nodes = new String[unparsedNodes.length];
+
+ for (int index = 0; index < unparsedNodes.length; index++) {
+ String[] params = unparsedNodes[index].split("\\|");
+ if (params.length != 3) {
+ throw new IllegalArgumentException(
+ "Expected string of the form, hostname|jmxPort|nativeTransportPort: [" + s + "]");
+ }
+
+ nodes[index] = params[0];
+ }
+
+ return nodes;
+ }
+
+ private int[] parseNodeJmxPorts(String s) {
+ String[] unparsedNodes = s.split(",");
+
+ int[] jmxPorts = new int[unparsedNodes.length];
+
+ for (int index = 0; index < unparsedNodes.length; index++) {
+ String[] params = unparsedNodes[index].split("\\|");
+ if (params.length != 3) {
+ throw new IllegalArgumentException(
+ "Expected string of the form, hostname|jmxPort|nativeTransportPort: [" + s + "]");
+ }
+
+ jmxPorts[index] = Integer.parseInt(params[1]);
+ }
+
+ return jmxPorts;
+ }
+
+ private int parseNodeCqlPort(String s) {
+ String[] unparsedNodes = s.split(",");
+
+ for (String unparsedNode : unparsedNodes) {
+ String[] params = unparsedNode.split("\\|");
+ if (params.length != 3) {
+ throw new IllegalArgumentException(
+ "Expected string of the form, hostname|jmxPort|nativeTransportPort: [" + s + "]");
+ }
+
+ return Integer.parseInt(params[2]);
+ }
+
+ throw new IllegalArgumentException("Seed property is not valid [" + s + "]");
+ }
+
}
}
\ No newline at end of file
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/CassandraClusterManager.java b/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/CassandraClusterManager.java
index c8bb2ef..4a02e1d 100644
--- a/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/CassandraClusterManager.java
+++ b/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/CassandraClusterManager.java
@@ -46,7 +46,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.rhq.core.domain.cloud.StorageNode;
import org.rhq.core.pluginapi.util.ProcessExecutionUtility;
import org.rhq.core.system.OperatingSystemType;
import org.rhq.core.system.ProcessExecution;
@@ -68,6 +67,11 @@ public class CassandraClusterManager {
private List<File> installedNodeDirs = new ArrayList<File>();
private Map<Integer, Process> nodeProcessMap = new HashMap<Integer, Process>();
+ private String[] nodes;
+ private int[] jmxPorts;
+ private int cqlPort;
+
+
public CassandraClusterManager() {
this(new DeploymentOptionsFactory().newDeploymentOptions());
}
@@ -90,7 +94,28 @@ public class CassandraClusterManager {
}
}
- public List<StorageNode> createCluster() {
+ /**
+ * @return addresses of storage cluster nodes
+ */
+ public String[] getNodes() {
+ return nodes;
+ }
+
+ /**
+ * @return the JMX ports
+ */
+ public int[] getJmxPorts() {
+ return jmxPorts;
+ }
+
+ /**
+ * @return the CQL Port
+ */
+ public int getCqlPort() {
+ return cqlPort;
+ }
+
+ public void createCluster() {
if (log.isDebugEnabled()) {
log.debug("Installing embedded " + deploymentOptions.getNumNodes() + " node cluster to "
+ deploymentOptions.getClusterDir());
@@ -104,11 +129,10 @@ public class CassandraClusterManager {
if (installedMarker.exists()) {
log.info("It appears that the cluster already exists in " + clusterDir);
log.info("Skipping cluster creation.");
- return calculateNodes();
+ getStorageClusterConfiguration(); return;
}
FileUtil.purge(clusterDir, false);
- List<StorageNode> nodes = new ArrayList<StorageNode>(deploymentOptions.getNumNodes());
String seeds = collectionToString(calculateLocalIPAddresses(deploymentOptions.getNumNodes()));
Set<InetAddress> ipAddresses = null;
@@ -118,6 +142,10 @@ public class CassandraClusterManager {
throw new RuntimeException("Failed to get cluster IP addresses", e);
}
+ this.nodes = new String[deploymentOptions.getNumNodes()];
+ this.jmxPorts = new int[deploymentOptions.getNumNodes()];
+ this.cqlPort = deploymentOptions.getNativeTransportPort();
+
for (int i = 0; i < deploymentOptions.getNumNodes(); ++i) {
File basedir = new File(deploymentOptions.getClusterDir(), "node" + i);
String address = getLocalIPAddress(i + 1);
@@ -142,15 +170,11 @@ public class CassandraClusterManager {
deployer.unzipDistro();
deployer.applyConfigChanges();
deployer.updateFilePerms();
-
- StorageNode storageNode = new StorageNode();
- storageNode.setAddress(address);
- storageNode.setJmxPort(deploymentOptions.getJmxPort() + i);
- storageNode.setCqlPort(nodeOptions.getNativeTransportPort());
- nodes.add(storageNode);
-
deployer.updateStorageAuthConf(ipAddresses);
+ this.nodes[i] = address;
+ this.jmxPorts[i] = deploymentOptions.getJmxPort() + i;
+
installedNodeDirs.add(basedir);
} catch (Exception e) {
log.error("Failed to install node at " + basedir);
@@ -162,7 +186,6 @@ public class CassandraClusterManager {
} catch (IOException e) {
log.warn("Failed to write installed file marker to " + installedMarker, e);
}
- return nodes;
}
private void updateStorageAuthConf(File basedir) {
@@ -210,16 +233,14 @@ public class CassandraClusterManager {
return ipAddresses;
}
- private List<StorageNode> calculateNodes() {
- List<StorageNode> nodes = new ArrayList<StorageNode>(deploymentOptions.getNumNodes());
+ private void getStorageClusterConfiguration() {
+ this.nodes = new String[deploymentOptions.getNumNodes()]; this.jmxPorts = new int[deploymentOptions.getNumNodes()];
for (int i = 0; i < deploymentOptions.getNumNodes(); ++i) {
- StorageNode storageNode = new StorageNode();
- storageNode.setAddress(getLocalIPAddress(i + 1));
- storageNode.setJmxPort(deploymentOptions.getJmxPort() + i);
- storageNode.setCqlPort(deploymentOptions.getNativeTransportPort());
- nodes.add(storageNode);
+ this.nodes[i] = getLocalIPAddress(i + 1);
+ this.jmxPorts[i] = deploymentOptions.getJmxPort() + i;
}
- return nodes;
+
+ this.cqlPort = deploymentOptions.getNativeTransportPort();
}
public void startCluster() {
@@ -230,9 +251,9 @@ public class CassandraClusterManager {
startCluster(getNodeIds());
if (waitForClusterToStart) {
- List<StorageNode> nodes = calculateNodes();
+ getStorageClusterConfiguration();
ClusterInitService clusterInitService = new ClusterInitService();
- clusterInitService.waitForClusterToStart(nodes, nodes.size(), 20);
+ clusterInitService.waitForClusterToStart(this.nodes, this.jmxPorts, this.nodes.length, 20);
}
}
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/ClusterInitService.java b/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/ClusterInitService.java
index 83851c5..cbbfad5 100644
--- a/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/ClusterInitService.java
+++ b/modules/common/cassandra-ccm/cassandra-ccm-core/src/main/java/org/rhq/cassandra/ClusterInitService.java
@@ -28,7 +28,6 @@ package org.rhq.cassandra;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -42,8 +41,6 @@ import javax.management.remote.JMXServiceURL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.rhq.core.domain.cloud.StorageNode;
-
/**
* This class provides operations to ensure a cluster is initialized and in a consistent
* state. It does not offer functionality for initializing a cluster but rather to make
@@ -56,13 +53,25 @@ public final class ClusterInitService {
private final Log log = LogFactory.getLog(ClusterInitService.class);
- public boolean ping(List<StorageNode> storageNodes, int numHosts) {
+ private static final String JMX_CONNECTION_STRING = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
+
+ /**
+ * Pings the storage nodes to verify if they are available and native transport
+ * is running.
+ *
+ * @param storageNodes storage node addresses
+ * @param jmxPorts JMX ports
+ * @param numHosts minimum number of active hosts
+ *
+ * @return [true] cluster available with at least minimum number of hosts available, [false] otherwise
+ */
+ public boolean ping(String[] storageNodes, int[] jmxPorts, int numHosts) {
int connections = 0;
long sleep = 100;
- for (StorageNode host : storageNodes) {
+ for (int index = 0; index < jmxPorts.length; index++) {
try {
- boolean isNativeTransportRunning = this.isNativeTransportRunning(host);
+ boolean isNativeTransportRunning = this.isNativeTransportRunning(storageNodes[index], jmxPorts[index]);
if (isNativeTransportRunning) {
++connections;
}
@@ -71,7 +80,8 @@ public final class ClusterInitService {
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
- log.debug("Unable to open JMX connection to cassandra node [" + host + "]", e);
+ log.debug("Unable to open JMX connection on port [" + jmxPorts[index] + "] to cassandra node ["
+ + storageNodes[index] + "]", e);
}
return false;
}
@@ -89,12 +99,12 @@ public final class ClusterInitService {
* hosts. A runtime exception will be thrown after 10 failed retries.
* <br/><br/>
* After connecting to all nodes, this method will then sleep for a fixed delay.
- * See {@link #waitForClusterToStart(java.util.List, int, int)} for details.
- *
- * @param hosts The cluster nodes to which a connection should be made
+ * See {@link #waitForClusterToStart(int, java.util.List, int)} for details.
+ * @param storageNodes The cluster nodes to which a connection should be made
+ * @param jmxPorts JMX port for each cluster node address
*/
- public void waitForClusterToStart(List<StorageNode> storageNodes) {
- waitForClusterToStart(storageNodes, storageNodes.size(), 10);
+ public void waitForClusterToStart(String[] storageNodes, int jmxPorts[]) {
+ waitForClusterToStart(storageNodes, jmxPorts, storageNodes.length, 10);
}
/**
@@ -109,15 +119,14 @@ public final class ClusterInitService {
* schema and to create the cassandra super user. Cassandra has a hard-coded delay of
* 10 sceonds before it creates the super user, which means the rhq schema cannot be
* created before that.
- *
- * @param hosts The cluster nodes to which a connection should be made
* @param numHosts The number of hosts to which a successful connection has to be made
* before returning.
* @param retries The number of times to retry connecting. A runtime exception will be
* thrown when the number of failed connections exceeds this value.
+ * @param hosts The cluster nodes to which a connection should be made
*/
- public void waitForClusterToStart(List<StorageNode> storageNodes, int numHosts, int retries) {
- waitForClusterToStart(storageNodes, numHosts, 250, retries, 1);
+ public void waitForClusterToStart(String[] storageNodes, int jmxPorts[], int numHosts, int retries) {
+ waitForClusterToStart(storageNodes, jmxPorts, numHosts, 250, retries, 1);
}
/**
@@ -132,17 +141,16 @@ public final class ClusterInitService {
* schema and to create the cassandra super user. Cassandra has a hard-coded delay of
* 10 sceonds before it creates the super user, which means the rhq schema cannot be
* created before that.
- *
- * @param hosts The cluster nodes to which a connection should be made
* @param numHosts The number of hosts to which a successful connection has to be made
* before returning.
* @param delay The amount of time wait between attempts to make a connection
* @param retries The number of times to retry connecting. A runtime exception will be
* thrown when the number of failed connections exceeds this value.
* @param initialWait The amount of seconds before first try.
+ * @param hosts The cluster nodes to which a connection should be made
*/
- public void waitForClusterToStart(List<StorageNode> storageNodes, int numHosts, long delay, int retries,
- int initialWait) {
+ public void waitForClusterToStart(String[] storageNodes, int jmxPorts[], int numHosts, long delay,
+ int retries, int initialWait) {
if (initialWait > 0) {
try {
if (log.isDebugEnabled()) {
@@ -155,23 +163,28 @@ public final class ClusterInitService {
int connections = 0;
int failedConnections = 0;
- Queue<StorageNode> queue = new LinkedList<StorageNode>(storageNodes);
- StorageNode storageNode = queue.poll();
+ Queue<Integer> queue = new LinkedList<Integer>();
+ for (int index = 0; index < storageNodes.length; index++) {
+ queue.add(index);
+ }
+
+ Integer storageNodeIndex = queue.poll();
- while (storageNode != null) {
+ while (storageNodeIndex != null) {
if (failedConnections >= retries) {
throw new RuntimeException("Unable to verify that cluster nodes have started after "
+ failedConnections + " failed attempts");
}
try {
- boolean isNativeTransportRunning = this.isNativeTransportRunning(storageNode);
+ boolean isNativeTransportRunning = isNativeTransportRunning(storageNodes[storageNodeIndex],
+ jmxPorts[storageNodeIndex]);
if (log.isDebugEnabled() && isNativeTransportRunning) {
- log.debug("Successfully connected to cassandra node [" + storageNode + "]");
+ log.debug("Successfully connected to cassandra node [" + storageNodes[storageNodeIndex] + "]");
}
if (isNativeTransportRunning) {
++connections;
} else {
- queue.offer(storageNode);
+ queue.offer(storageNodeIndex);
}
if (connections == numHosts) {
if (log.isDebugEnabled()) {
@@ -186,9 +199,10 @@ public final class ClusterInitService {
}
} catch (Exception e) {
++failedConnections;
- queue.offer(storageNode);
+ queue.offer(storageNodeIndex);
if (log.isDebugEnabled()) {
- log.debug("Unable to open JMX connection to cassandra node [" + storageNode + "].", e);
+ log.debug("Unable to open JMX connection on port [" + jmxPorts[storageNodeIndex]
+ + "] to cassandra node [" + storageNodes[storageNodeIndex] + "].", e);
} else if (log.isInfoEnabled()) {
log.debug("Unable to open connection to cassandra node.");
}
@@ -197,7 +211,7 @@ public final class ClusterInitService {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
- storageNode = queue.poll();
+ storageNodeIndex = queue.poll();
}
}
@@ -209,8 +223,8 @@ public final class ClusterInitService {
*
* @param hosts The cluster nodes
*/
- public void waitForSchemaAgreement(List<StorageNode> storageNodes) throws Exception {
- if (storageNodes == null) {
+ public void waitForSchemaAgreement(String[] storageNodes, int[] jmxPorts) throws Exception {
+ if (storageNodes == null || storageNodes.length == 0) {
return;
}
@@ -219,8 +233,8 @@ public final class ClusterInitService {
while (!schemaInAgreement) {
Set<String> schemaVersions = new HashSet<String>();
- for (StorageNode host : storageNodes) {
- String otherSchchemaVersion = getSchemaVersionForNode(host);
+ for (int index = 0; index < storageNodes.length; index++) {
+ String otherSchchemaVersion = getSchemaVersionForNode(storageNodes[index], jmxPorts[index]);
if (otherSchchemaVersion != null) {
schemaVersions.add(otherSchchemaVersion);
}
@@ -256,9 +270,9 @@ public final class ClusterInitService {
}
}
- public boolean isNativeTransportRunning(StorageNode storageNode) throws Exception {
+ public boolean isNativeTransportRunning(String storageNode, int jmxPort) throws Exception {
Boolean nativeTransportRunning = false;
- String url = storageNode.getJMXConnectionURL();
+ String url = getJMXConnectionURL(storageNode, jmxPort);
JMXServiceURL serviceURL = new JMXServiceURL(url);
Map<String, String> env = new HashMap<String, String>();
JMXConnector connector = null;
@@ -292,8 +306,8 @@ public final class ClusterInitService {
return nativeTransportRunning;
}
- private String getSchemaVersionForNode(StorageNode storageNode) throws Exception {
- String url = storageNode.getJMXConnectionURL();
+ private String getSchemaVersionForNode(String storageNode, int jmxPort) throws Exception {
+ String url = this.getJMXConnectionURL(storageNode, jmxPort);
JMXServiceURL serviceURL = new JMXServiceURL(url);
Map<String, String> env = new HashMap<String, String>();
JMXConnector connector = null;
@@ -326,4 +340,17 @@ public final class ClusterInitService {
}
return null;
}
+
+ /**
+ * Constructs the JMX connection URL based on the node address and
+ * JMX port
+ *
+ * @param address
+ * @param jmxPort
+ * @return
+ */
+ private String getJMXConnectionURL(String address, int jmxPort) {
+ // String.format fills both %s placeholders directly; splitting the template on "%s" is fragile.
+ return String.format(JMX_CONNECTION_STRING, address, jmxPort);
+ }
}
\ No newline at end of file
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java b/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java
index f50535c..310d7a2 100644
--- a/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java
+++ b/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java
@@ -26,7 +26,6 @@
package org.rhq.cassandra.ccm.maven;
import java.io.File;
-import java.util.List;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
@@ -38,7 +37,6 @@ import org.rhq.cassandra.CassandraClusterManager;
import org.rhq.cassandra.DeploymentOptions;
import org.rhq.cassandra.DeploymentOptionsFactory;
import org.rhq.cassandra.schema.SchemaManager;
-import org.rhq.core.domain.cloud.StorageNode;
/**
* @author John Sanda
@@ -63,14 +61,14 @@ public class DeployMojo extends AbstractMojo {
long start = System.currentTimeMillis();
getLog().info("Creating " + numNodes + " cluster in " + clusterDir);
- List<StorageNode> nodes = ccm.createCluster();
+ ccm.createCluster();
getLog().info("Starting cluster nodes");
ccm.startCluster();
getLog().info("Installing RHQ schema");
SchemaManager schemaManager = new SchemaManager(deploymentOptions.getUsername(),
- deploymentOptions.getPassword(), nodes);
+ deploymentOptions.getPassword(), ccm.getNodes(), ccm.getCqlPort());
try {
schemaManager.install();
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java b/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java
index a9292f7..48d047d 100644
--- a/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java
+++ b/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java
@@ -27,7 +27,6 @@ package org.rhq.cassandra;
import java.io.File;
import java.lang.reflect.Method;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,7 +35,6 @@ import org.testng.IInvokedMethodListener;
import org.testng.ITestResult;
import org.rhq.cassandra.schema.SchemaManager;
-import org.rhq.core.domain.cloud.StorageNode;
/**
* @author John Sanda
@@ -105,13 +103,15 @@ public class CCMTestNGListener implements IInvokedMethodListener {
// we cannot initialize ccm here.
ccm = new CassandraClusterManager(deploymentOptions);
ClusterInitService clusterInitService = new ClusterInitService();
+ ccm.createCluster();
- List<StorageNode> nodes = ccm.createCluster();
+ String[] nodes = ccm.getNodes();
+ int[] jmxPorts = ccm.getJmxPorts();
if (System.getProperty("rhq.cassandra.cluster.skip-shutdown") == null) {
- for (StorageNode node : nodes) {
+ for (int index = 0; index < nodes.length; index++) {
try {
- if (clusterInitService.isNativeTransportRunning(node)) {
+ if (clusterInitService.isNativeTransportRunning(nodes[index], jmxPorts[index])) {
throw new RuntimeException("A cluster is already running on the same ports.");
}
} catch (Exception e) {
@@ -122,12 +122,13 @@ public class CCMTestNGListener implements IInvokedMethodListener {
ccm.startCluster(false);
- clusterInitService.waitForClusterToStart(nodes, nodes.size(), 1500, 20, 2);
+ clusterInitService.waitForClusterToStart(nodes, jmxPorts, nodes.length, 20, 2, 1500);
- SchemaManager schemaManager = new SchemaManager(annotation.username(), annotation.password(), nodes);
+ SchemaManager schemaManager = new SchemaManager(annotation.username(), annotation.password(), nodes,
+ ccm.getCqlPort());
schemaManager.install();
if (annotation.waitForSchemaAgreement()) {
- clusterInitService.waitForSchemaAgreement(nodes);
+ clusterInitService.waitForSchemaAgreement(nodes, jmxPorts);
}
schemaManager.updateTopology();
}
diff --git a/modules/common/cassandra-schema/pom.xml b/modules/common/cassandra-schema/pom.xml
index 6fb2915..077cdb9 100644
--- a/modules/common/cassandra-schema/pom.xml
+++ b/modules/common/cassandra-schema/pom.xml
@@ -13,16 +13,10 @@
<name>RHQ Cassandra Schema</name>
<dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>rhq-cassandra-ccm-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>rhq-cassandra-util</artifactId>
- <version>${project.version}</version>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rhq-cassandra-util</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
@@ -36,13 +30,6 @@
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>rhq-core-domain</artifactId>
- <version>${project.version}</version>
- </dependency>
-
-
- <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.driver.version}</version>
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
index 7b8c520..7dcef1b 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/AbstractManager.java
@@ -40,8 +40,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.rhq.cassandra.util.ClusterBuilder;
-import org.rhq.core.domain.cloud.StorageNode;
-import org.rhq.core.util.StringUtil;
/**
* @author Stefan Negrea
@@ -71,13 +69,15 @@ abstract class AbstractManager {
private Session session;
private final String username;
private final String password;
- private List<StorageNode> nodes = new ArrayList<StorageNode>();
+ private final int cqlPort;
+ private final String[] nodes;
private final UpdateFile managementTasks;
- protected AbstractManager(String username, String password, List<StorageNode> nodes) {
+ protected AbstractManager(String username, String password, String[] nodes, int cqlPort) {
try {
this.username = username;
this.password = password;
+ this.cqlPort = cqlPort;
this.nodes = nodes;
} catch (NoHostAvailableException e) {
throw new RuntimeException("Unable create storage node session.", e);
@@ -108,15 +108,11 @@ abstract class AbstractManager {
protected void initClusterSession(String username, String password) {
shutdownClusterConnection();
- String[] hostNames = new String[nodes.size()];
- for (int i = 0; i < hostNames.length; ++i) {
- hostNames[i] = nodes.get(i).getAddress();
- }
- log.info("Initializing session to connect to " + StringUtil.arrayToString(hostNames));
+ log.info("Initializing storage node session.");
- Cluster cluster = new ClusterBuilder().addContactPoints(hostNames).withCredentials(username, password)
- .withPort(nodes.get(0).getCqlPort()).withCompression(Compression.NONE).build();
+ Cluster cluster = new ClusterBuilder().addContactPoints(nodes).withCredentials(username, password)
+ .withPort(this.getCqlPort()).withCompression(Compression.NONE).build();
log.info("Cluster connection configured.");
@@ -140,7 +136,7 @@ abstract class AbstractManager {
* @return cluster size
*/
protected int getClusterSize() {
- return nodes.size();
+ return nodes.length;
}
/**
@@ -158,6 +154,13 @@ abstract class AbstractManager {
}
/**
+ * @return the cqlPort
+ */
+ protected int getCqlPort() {
+ return cqlPort;
+ }
+
+ /**
* Runs a CQL query to check the existence of the RHQ user on the storage cluster.
*
* @return true if the RHQ user exists, false otherwise
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
index 1a82779..fdad697 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java
@@ -25,7 +25,6 @@
package org.rhq.cassandra.schema;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -34,8 +33,6 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import org.rhq.core.domain.cloud.StorageNode;
-
/**
* @author John Sanda
*/
@@ -51,19 +48,29 @@ public class SchemaManager {
*/
private final String password;
- private final List<StorageNode> nodes = new ArrayList<StorageNode>();
+ /**
+ * Node addresses
+ */
+ private final String[] nodes;
/**
*
- * @param username The username RHQ will use to connect to the storage cluster.
- * @param password The password RHQ will use to connect to the storage cluster.
- * @param nodes A list of seeds nodes that are assumed to be already running and
- * clustered prior to apply schema changes. The format for each node
- * should be address|jmx_port|cql_port,address|jmx_port|cql_port.
- * Each node consists of three fields that are pipe-delimited.
*/
- public SchemaManager(String username, String password, String... nodes) {
- this(username, password, parseNodeInformation(nodes));
+ private final int cqlPort;
+
+    /**
+     * Creates a schema manager for a set of seed nodes that are assumed to be
+     * already running and clustered prior to applying schema changes.
+     *
+     * @param username The username RHQ will use to connect to the storage cluster
+     * @param password The password RHQ will use to connect to the storage cluster
+     * @param nodes Addresses of the seed nodes; the array is copied defensively
+     * @param cqlPort The native CQL port for the storage cluster
+     */
+    public SchemaManager(String username, String password, String[] nodes, int cqlPort) {
+        this.username = username;
+        this.password = password;
+        this.cqlPort = cqlPort;
+        // Snapshot the array so later mutation by the caller cannot change this
+        // manager's view of the cluster, matching the List-based constructor
+        // which also copies its input into a fresh array.
+        this.nodes = nodes.clone();
+    }
/**
@@ -72,11 +79,13 @@ public class SchemaManager {
* @param password The password RHQ will use to connect to the storage cluster.
* @param nodes A list of seeds nodes that are assumed to be already running and
* clustered prior to apply schema changes.
+ * @param cqlPort The native CQL port for the storage cluster
*/
- public SchemaManager(String username, String password, List<StorageNode> nodes) {
+ public SchemaManager(String username, String password, List<String> nodes, int cqlPort) {
this.username = username;
this.password = password;
- this.nodes.addAll(nodes);
+ this.cqlPort = cqlPort;
+ this.nodes = nodes.toArray(new String[nodes.size()]);
}
/**
@@ -85,7 +94,7 @@ public class SchemaManager {
* @throws Exception
*/
public void install() throws Exception {
- VersionManager version = new VersionManager(username, password, nodes);
+ VersionManager version = new VersionManager(username, password, nodes, cqlPort);
version.install();
}
@@ -96,7 +105,7 @@ public class SchemaManager {
* @throws Exception
*/
public void checkCompatibility() throws Exception {
- VersionManager version = new VersionManager(username, password, nodes);
+ VersionManager version = new VersionManager(username, password, nodes, cqlPort);
version.checkCompatibility();
}
@@ -106,7 +115,7 @@ public class SchemaManager {
* @throws Exception
*/
public void drop() throws Exception {
- VersionManager version = new VersionManager(username, password, nodes);
+ VersionManager version = new VersionManager(username, password, nodes, cqlPort);
version.drop();
}
@@ -118,7 +127,7 @@ public class SchemaManager {
* @throws Exception
*/
public void updateTopology() throws Exception {
- TopologyManager topology = new TopologyManager(username, password, nodes);
+ TopologyManager topology = new TopologyManager(username, password, nodes, cqlPort);
topology.updateTopology();
}
@@ -127,28 +136,11 @@ public class SchemaManager {
*
* @return list of storage nodes
*/
- public List<StorageNode> getStorageNodes() {
+ protected String[] getStorageNodes() {
return nodes;
}
/**
- * Parse raw string that contains the list of storage nodes.
- *
- * @param nodes list of storage nodes
- * @return
- */
- private static List<StorageNode> parseNodeInformation(String... nodes) {
- List<StorageNode> parsedNodes = new ArrayList<StorageNode>();
- for (String node : nodes) {
- StorageNode storageNode = new StorageNode();
- storageNode.parseNodeInformation(node);
- parsedNodes.add(storageNode);
- }
-
- return parsedNodes;
- }
-
- /**
* A main runner used for direct usage of the schema manager.
*
* @param args arguments
@@ -164,19 +156,19 @@ public class SchemaManager {
migratorLogging.setLevel(Level.ALL);
if (args.length < 4) {
- System.out.println("Usage : command username password nodes...");
+ System.out.println("Usage : command username password cqlPort nodes...");
System.out.println("\n");
System.out.println("Commands : install | drop | topology");
- System.out.println("Node format: hostname|jmxPort|cqlPort");
return;
}
String command = args[0];
String username = args[1];
String password = args[2];
- String[] hosts = Arrays.copyOfRange(args, 3, args.length);
+ int cqlPort = Integer.parseInt(args[3]);
+ String[] hosts = Arrays.copyOfRange(args, 4, args.length);
- SchemaManager schemaManager = new SchemaManager(username, password, hosts);
+ SchemaManager schemaManager = new SchemaManager(username, password, hosts, cqlPort);
if ("install".equalsIgnoreCase(command)) {
schemaManager.install();
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
index 6c08faa..481c006 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java
@@ -1,37 +1,33 @@
/*
*
- * * RHQ Management Platform
- * * Copyright (C) 2005-2012 Red Hat, Inc.
- * * All rights reserved.
- * *
- * * This program is free software; you can redistribute it and/or modify
- * * it under the terms of the GNU General Public License, version 2, as
- * * published by the Free Software Foundation, and/or the GNU Lesser
- * * General Public License, version 2.1, also as published by the Free
- * * Software Foundation.
- * *
- * * This program is distributed in the hope that it will be useful,
- * * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * * GNU General Public License and the GNU Lesser General Public License
- * * for more details.
- * *
- * * You should have received a copy of the GNU General Public License
- * * and the GNU Lesser General Public License along with this program;
- * * if not, write to the Free Software Foundation, Inc.,
- * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * RHQ Management Platform
+ * Copyright (C) 2005-2012 Red Hat, Inc.
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2, as
+ * published by the Free Software Foundation, and/or the GNU Lesser
+ * General Public License, version 2.1, also as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License and the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * and the GNU Lesser General Public License along with this program;
+ * if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
package org.rhq.cassandra.schema;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.rhq.core.domain.cloud.StorageNode;
-
/**
* @author Stefan Negrea
*/
@@ -56,8 +52,8 @@ class TopologyManager extends AbstractManager {
}
}
- public TopologyManager(String username, String password, List<StorageNode> nodes) {
- super(username, password, nodes);
+ public TopologyManager(String username, String password, String[] nodes, int cqlPort) {
+ super(username, password, nodes, cqlPort);
}
/**
diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
index fe6ddf9..05cee25 100644
--- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
+++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/VersionManager.java
@@ -25,7 +25,6 @@
package org.rhq.cassandra.schema;
-import java.util.List;
import java.util.Properties;
import java.util.UUID;
@@ -37,7 +36,6 @@ import org.apache.commons.logging.LogFactory;
import org.rhq.cassandra.schema.exception.InstalledSchemaTooAdvancedException;
import org.rhq.cassandra.schema.exception.InstalledSchemaTooOldException;
import org.rhq.cassandra.schema.exception.SchemaNotInstalledException;
-import org.rhq.core.domain.cloud.StorageNode;
/**
* @author Stefan Negrea
@@ -64,8 +62,8 @@ class VersionManager extends AbstractManager {
}
}
- public VersionManager(String username, String password, List<StorageNode> nodes) throws Exception {
- super(username, password, nodes);
+ public VersionManager(String username, String password, String[] nodes, int cqlPort) throws Exception {
+ super(username, password, nodes, cqlPort);
}
/**
diff --git a/modules/common/cassandra-util/src/main/java/org/rhq/cassandra/util/ConfigEditor.java b/modules/common/cassandra-util/src/main/java/org/rhq/cassandra/util/ConfigEditor.java
index 9fc389c..42e531f 100644
--- a/modules/common/cassandra-util/src/main/java/org/rhq/cassandra/util/ConfigEditor.java
+++ b/modules/common/cassandra-util/src/main/java/org/rhq/cassandra/util/ConfigEditor.java
@@ -1,8 +1,11 @@
package org.rhq.cassandra.util;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
@@ -11,9 +14,6 @@ import java.util.Map;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
-import org.rhq.core.util.StringUtil;
-import org.rhq.core.util.file.FileUtil;
-
/**
* @author John Sanda
*/
@@ -57,7 +57,7 @@ public class ConfigEditor {
public void restore() {
try {
- FileUtil.copyFile(backupFile, configFile);
+ this.copyFile(backupFile, configFile);
backupFile.delete();
yaml = null;
config = null;
@@ -70,7 +70,7 @@ public class ConfigEditor {
private void createBackup() {
backupFile = new File(configFile.getParent(), "." + configFile.getName() + ".bak");
try {
- FileUtil.copyFile(configFile, backupFile);
+ this.copyFile(configFile, backupFile);
} catch (IOException e) {
throw new ConfigEditorException("Failed to create " + backupFile, e);
}
@@ -113,7 +113,16 @@ public class ConfigEditor {
Map seedProvider = (Map) seedProviderList.get(0);
List paramsList = (List) seedProvider.get("parameters");
Map params = (Map) paramsList.get(0);
- params.put("seeds", StringUtil.arrayToString(seeds));
+
+ StringBuilder seedsString = new StringBuilder();
+ for (int i = 0; i < seeds.length; i++) {
+ if (i > 0) {
+ seedsString.append(",");
+ }
+
+ seedsString.append(seeds[i]);
+ }
+ params.put("seeds", seedsString.toString());
}
public Integer getNativeTransportPort() {
@@ -132,4 +141,24 @@ public class ConfigEditor {
config.put("storage_port", port);
}
+    /**
+     * Copies the contents of inFile to outFile, overwriting outFile if it exists.
+     * Both streams are closed even when the copy fails.
+     *
+     * @param inFile the source file
+     * @param outFile the destination file
+     * @throws FileNotFoundException if inFile does not exist or outFile cannot be created
+     * @throws IOException if reading or writing fails
+     */
+    public static void copyFile(File inFile, File outFile) throws FileNotFoundException, IOException {
+        int bufferSize = 32768;
+        // Buffer once at construction time; the original wrapped the stream in
+        // BufferedInputStream twice, which only added overhead.
+        BufferedInputStream is = new BufferedInputStream(new FileInputStream(inFile), bufferSize);
+        try {
+            BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(outFile), bufferSize);
+            try {
+                byte[] buffer = new byte[bufferSize];
+                for (int bytesRead = is.read(buffer); bytesRead != -1; bytesRead = is.read(buffer)) {
+                    os.write(buffer, 0, bytesRead);
+                }
+                os.flush();
+            } finally {
+                os.close();
+            }
+        } finally {
+            // Opened before os, so it is closed even if the FileOutputStream
+            // constructor throws (the original leaked it in that case).
+            is.close();
+        }
+    }
+
}
diff --git a/modules/common/cassandra-util/src/test/java/org/rhq/cassandra/util/ConfigEditorTest.java b/modules/common/cassandra-util/src/test/java/org/rhq/cassandra/util/ConfigEditorTest.java
index 9c3cc16..d101fc2 100644
--- a/modules/common/cassandra-util/src/test/java/org/rhq/cassandra/util/ConfigEditorTest.java
+++ b/modules/common/cassandra-util/src/test/java/org/rhq/cassandra/util/ConfigEditorTest.java
@@ -3,10 +3,14 @@ package org.rhq.cassandra.util;
import static java.util.Arrays.asList;
import static org.testng.Assert.assertEquals;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.Method;
import org.apache.cassandra.config.Config;
@@ -17,9 +21,6 @@ import org.yaml.snakeyaml.Loader;
import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;
-import org.rhq.core.util.file.FileUtil;
-import org.rhq.core.util.stream.StreamUtil;
-
/**
* @author John Sanda
*/
@@ -33,14 +34,14 @@ public class ConfigEditorTest {
public void initTestDir(Method test) throws Exception {
File dir = new File(getClass().getResource(".").toURI());
basedir = new File(dir, getClass().getSimpleName() + "/" + test.getName());
- FileUtil.purge(basedir, true);
+ purge(basedir, true);
basedir.mkdirs();
configFile = new File(basedir, "cassandra.yaml");
InputStream inputStream = getClass().getResourceAsStream("/cassandra.yaml");
FileOutputStream outputStream = new FileOutputStream(configFile);
- StreamUtil.copy(inputStream, outputStream);
+ copyStreams(inputStream, outputStream);
}
@Test
@@ -119,4 +120,40 @@ public class ConfigEditorTest {
return (Config) yaml.load(inputStream);
}
+    /**
+     * Recursively deletes the contents of dir and, when deleteIt is true, dir itself.
+     * A null or non-existent file is a no-op; deletion failures are silently ignored,
+     * which is acceptable for this test-fixture cleanup.
+     */
+    private static void purge(File dir, boolean deleteIt) {
+        if (dir == null) {
+            return;
+        }
+        if (dir.isDirectory()) {
+            File[] doomedFiles = dir.listFiles();
+            if (doomedFiles != null) {
+                for (File doomedFile : doomedFiles) {
+                    purge(doomedFile, true); // children are always deleted, regardless of deleteIt
+                }
+            }
+        }
+        if (deleteIt) {
+            dir.delete();
+        }
+    }
+
+    /**
+     * Copies all bytes from is to os, flushing os when done. Both streams are
+     * closed on exit, even when the copy fails.
+     *
+     * @param is the stream to read from; buffered internally
+     * @param os the stream to write to
+     * @throws IOException if reading or writing fails
+     */
+    public static void copyStreams(InputStream is, OutputStream os) throws FileNotFoundException, IOException {
+        int bufferSize = 32768;
+        try {
+            is = new BufferedInputStream(is, bufferSize);
+            byte[] buffer = new byte[bufferSize];
+            for (int bytesRead = is.read(buffer); bytesRead != -1; bytesRead = is.read(buffer)) {
+                os.write(buffer, 0, bytesRead);
+            }
+            // Propagate IOException directly: the method declares it, so wrapping
+            // in RuntimeException (as before) hid failures from callers.
+            os.flush();
+        } finally {
+            os.close();
+            is.close();
+        }
+    }
+
}
diff --git a/modules/common/drift/pom.xml b/modules/common/drift/pom.xml
index 9500e8d..b1347ab 100644
--- a/modules/common/drift/pom.xml
+++ b/modules/common/drift/pom.xml
@@ -16,15 +16,23 @@
<dependencies>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>test-utils</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
+
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rhq-core-domain</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
</dependencies>
<profiles>
diff --git a/modules/common/filetemplate-bundle/pom.xml b/modules/common/filetemplate-bundle/pom.xml
index b6c587e..8e60b26 100644
--- a/modules/common/filetemplate-bundle/pom.xml
+++ b/modules/common/filetemplate-bundle/pom.xml
@@ -15,6 +15,12 @@
<description>A library with the code common to the agent and server plugins for File Template Bundles</description>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rhq-core-domain</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>gnu-getopt</groupId>
diff --git a/modules/common/jboss-as/pom.xml b/modules/common/jboss-as/pom.xml
index a5d3255..16dd176 100644
--- a/modules/common/jboss-as/pom.xml
+++ b/modules/common/jboss-as/pom.xml
@@ -16,6 +16,13 @@
<dependencies>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rhq-core-domain</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>ant</groupId>
<artifactId>ant</artifactId>
<version>1.6.5</version>
diff --git a/modules/common/pom.xml b/modules/common/pom.xml
index 45ba5c6..f957b0e 100644
--- a/modules/common/pom.xml
+++ b/modules/common/pom.xml
@@ -15,12 +15,6 @@
<description>parent POM for all RHQ common plugin libraries</description>
<dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>rhq-core-domain</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope> <!-- by PC -->
- </dependency>
</dependencies>
diff --git a/modules/core/dbutils/pom.xml b/modules/core/dbutils/pom.xml
index 1c66dd6..70783f4 100644
--- a/modules/core/dbutils/pom.xml
+++ b/modules/core/dbutils/pom.xml
@@ -204,11 +204,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>rhq-core-domain</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.1.3</version>
diff --git a/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java b/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java
index 5c8002a..56e2df0 100644
--- a/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java
+++ b/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
@@ -39,6 +40,7 @@ import org.rhq.common.jbossas.client.controller.DatasourceJBossASClient;
import org.rhq.common.jbossas.client.controller.DeploymentJBossASClient;
import org.rhq.common.jbossas.client.controller.WebJBossASClient;
import org.rhq.core.db.DatabaseTypeFactory;
+import org.rhq.core.domain.cloud.StorageNode;
import org.rhq.core.util.PropertiesFileUpdate;
import org.rhq.core.util.exception.ThrowableUtil;
import org.rhq.enterprise.server.installer.ServerInstallUtil.ExistingSchemaOption;
@@ -501,7 +503,7 @@ public class InstallerServiceImpl implements InstallerService {
ServerInstallUtil.storeServerDetails(serverProperties, clearTextDbPassword, serverDetails);
ServerInstallUtil.persistStorageNodesIfNecessary(serverProperties, clearTextDbPassword,
- storageNodeSchemaManager.getStorageNodes());
+ parseNodeInformation(serverProperties));
}
@Override
@@ -1154,12 +1156,31 @@ public class InstallerServiceImpl implements InstallerService {
}
}
+    /**
+     * Builds the list of storage nodes from the "rhq.cassandra.seeds" server property.
+     * Each comma-separated entry is expected in the pipe-delimited form
+     * address|jmx_port|cql_port, which StorageNode.parseNodeInformation decodes.
+     *
+     * NOTE(review): assumes the "rhq.cassandra.seeds" property is always present and
+     * non-blank; a missing property would cause an NPE on the split() call below --
+     * confirm the installer validates it before reaching this point.
+     */
+    private List<StorageNode> parseNodeInformation(HashMap<String, String> serverProps) {
+        String[] nodes = serverProps.get("rhq.cassandra.seeds").split(",");
+
+        List<StorageNode> parsedNodes = new ArrayList<StorageNode>();
+        for (String node : nodes) {
+            StorageNode storageNode = new StorageNode();
+            storageNode.parseNodeInformation(node);
+            parsedNodes.add(storageNode);
+        }
+
+        return parsedNodes;
+    }
+
private SchemaManager createStorageNodeSchemaManager(HashMap<String, String> serverProps) {
- String[] hosts = serverProps.get("rhq.cassandra.seeds").split(",");
String username = serverProps.get("rhq.cassandra.username");
String password = serverProps.get("rhq.cassandra.password");
- return new SchemaManager(username, password, hosts);
+ List<StorageNode> storageNodes = this.parseNodeInformation(serverProps);
+ String[] nodes = new String[storageNodes.size()];
+ for (int index = 0; index < storageNodes.size(); index++) {
+ nodes[index] = storageNodes.get(index).getAddress();
+ }
+ int cqlPort = storageNodes.get(0).getCqlPort();
+
+ return new SchemaManager(username, password, nodes, cqlPort);
}
private void writeInstalledFileMarker() throws Exception {
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
index dbd599a..799abcc 100644
--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/storage/StorageClientManagerBean.java
@@ -121,7 +121,13 @@ public class StorageClientManagerBean {
* @param storageNodes storage nodes
*/
private void checkSchemaCompability(String username, String password, List<StorageNode> storageNodes) {
- SchemaManager schemaManager = new SchemaManager(username, password, storageNodes);
+ String[] nodes = new String[storageNodes.size()];
+ for (int index = 0; index < storageNodes.size(); index++) {
+ nodes[index] = storageNodes.get(index).getAddress();
+ }
+ int cqlPort = storageNodes.get(0).getCqlPort();
+
+ SchemaManager schemaManager = new SchemaManager(username, password, nodes, cqlPort);
try {
schemaManager.checkCompatibility();
} catch (Exception e) {
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
index 40e00bf..a5a3994 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
@@ -28,7 +28,6 @@ package org.rhq.metrics.simulator;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
-import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -52,7 +51,6 @@ import org.rhq.cassandra.DeploymentOptions;
import org.rhq.cassandra.DeploymentOptionsFactory;
import org.rhq.cassandra.schema.SchemaManager;
import org.rhq.cassandra.util.ClusterBuilder;
-import org.rhq.core.domain.cloud.StorageNode;
import org.rhq.metrics.simulator.plan.ClusterConfig;
import org.rhq.metrics.simulator.plan.ScheduleGroup;
import org.rhq.metrics.simulator.plan.SimulationPlan;
@@ -84,17 +82,16 @@ public class Simulator implements ShutdownManager {
}
});
- List<StorageNode> nodes = initCluster(plan);
-
- createSchema(nodes);
+ initCluster(plan);
+ createSchema();
Session session;
if (plan.getClientCompression() == null) {
- session = createSession(nodes);
+ session = createSession();
} else {
ProtocolOptions.Compression compression = Enum.valueOf(ProtocolOptions.Compression.class,
plan.getClientCompression().toUpperCase());
- session = createSession(nodes, compression);
+ session = createSession(compression);
}
StorageSession storageSession = new StorageSession(session);
@@ -172,17 +169,16 @@ public class Simulator implements ShutdownManager {
log.info("Shut down complete");
}
- private List<StorageNode> initCluster(SimulationPlan plan) {
+ private void initCluster(SimulationPlan plan) {
try {
- List<StorageNode> nodes = deployCluster(plan.getClusterConfig());
- waitForClusterToInitialize(nodes);
- return nodes;
+ deployCluster(plan.getClusterConfig());
+ waitForClusterToInitialize();
} catch (Exception e) {
throw new RuntimeException("Failed to start simulator. Cluster initialization failed.", e);
}
}
- private List<StorageNode> deployCluster(ClusterConfig clusterConfig) throws IOException {
+ private void deployCluster(ClusterConfig clusterConfig) throws IOException {
File clusterDir = new File(clusterConfig.getClusterDir(), "cassandra");
log.info("Deploying cluster to " + clusterDir);
clusterDir.mkdirs();
@@ -200,10 +196,8 @@ public class Simulator implements ShutdownManager {
deploymentOptions.load();
ccm = new CassandraClusterManager(deploymentOptions);
- List<StorageNode> nodes = ccm.createCluster();
+ ccm.createCluster();
ccm.startCluster(false);
-
- return nodes;
}
private void shutdownCluster() {
@@ -211,26 +205,25 @@ public class Simulator implements ShutdownManager {
ccm.shutdownCluster();
}
- private void waitForClusterToInitialize(List<StorageNode> nodes) {
+ private void waitForClusterToInitialize() {
log.info("Waiting for cluster to initialize");
ClusterInitService clusterInitService = new ClusterInitService();
- clusterInitService.waitForClusterToStart(nodes, nodes.size(), 1500, 20, 2);
+ clusterInitService.waitForClusterToStart(ccm.getNodes(), ccm.getJmxPorts(), ccm.getNodes().length, 20, 2, 1500);
}
- private void createSchema(List<StorageNode> nodes) {
+ private void createSchema() {
try {
log.info("Creating schema");
- SchemaManager schemaManager = new SchemaManager("rhqadmin", "rhqadmin", nodes);
+ SchemaManager schemaManager = new SchemaManager("rhqadmin", "rhqadmin", ccm.getNodes(), ccm.getCqlPort());
schemaManager.install();
} catch (Exception e) {
throw new RuntimeException("Failed to start simulator. An error occurred during schema creation.", e);
}
}
- private Session createSession(List<StorageNode> nodes) throws NoHostAvailableException {
+ private Session createSession() throws NoHostAvailableException {
try {
- Cluster cluster = new ClusterBuilder()
- .addContactPoints(getHostNames(nodes))
+ Cluster cluster = new ClusterBuilder().addContactPoints(ccm.getNodes()).withPort(ccm.getCqlPort())
.withCredentials("rhqadmin", "rhqadmin")
.build();
@@ -244,13 +237,12 @@ public class Simulator implements ShutdownManager {
}
}
- private Session createSession(List<StorageNode> nodes, ProtocolOptions.Compression compression)
+ private Session createSession(ProtocolOptions.Compression compression)
throws NoHostAvailableException {
try {
log.debug("Creating session using " + compression.name() + " compression");
- Cluster cluster = new ClusterBuilder()
- .addContactPoints(getHostNames(nodes))
+ Cluster cluster = new ClusterBuilder().addContactPoints(ccm.getNodes()).withPort(ccm.getCqlPort())
.withCredentials("cassandra", "cassandra")
.withCompression(compression)
.build();
@@ -264,6 +256,7 @@ public class Simulator implements ShutdownManager {
}
}
+ @SuppressWarnings("deprecation")
private Session initSession(Cluster cluster) {
NodeFailureListener listener = new NodeFailureListener();
for (Host host : cluster.getMetadata().getAllHosts()) {
@@ -273,14 +266,6 @@ public class Simulator implements ShutdownManager {
return cluster.connect("rhq");
}
- private String[] getHostNames(List<StorageNode> nodes) {
- String[] hostnames = new String[nodes.size()];
- for (int i = 0; i < hostnames.length; ++i) {
- hostnames[i] = nodes.get(i).getAddress();
- }
- return hostnames;
- }
-
private Set<Schedule> initSchedules(ScheduleGroup scheduleSet) {
long nextCollection = System.currentTimeMillis();
Set<Schedule> schedules = new HashSet<Schedule>();
diff --git a/modules/plugins/rhq-storage/src/test/java/org/rhq/plugins/storage/StorageNodeComponentITest.java b/modules/plugins/rhq-storage/src/test/java/org/rhq/plugins/storage/StorageNodeComponentITest.java
index 63517e1..50f8156 100644
--- a/modules/plugins/rhq-storage/src/test/java/org/rhq/plugins/storage/StorageNodeComponentITest.java
+++ b/modules/plugins/rhq-storage/src/test/java/org/rhq/plugins/storage/StorageNodeComponentITest.java
@@ -150,9 +150,11 @@ public class StorageNodeComponentITest {
storageNode.parseNodeInformation("127.0.0.1|7399|9142");
ClusterInitService clusterInitService = new ClusterInitService();
- clusterInitService.waitForClusterToStart(asList(storageNode));
+ clusterInitService.waitForClusterToStart(new String[] { storageNode.getAddress() },
+ new int[] { storageNode.getJmxPort() });
- SchemaManager schemaManager = new SchemaManager("rhqadmin", "rhqadmin", "127.0.0.1|7399|9142");
+ SchemaManager schemaManager = new SchemaManager("rhqadmin", "rhqadmin",
+ new String[] { storageNode.getAddress() }, storageNode.getCqlPort());
schemaManager.install();
schemaManager.updateTopology();
}