modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java | 2 modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java | 2 modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java | 2 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java | 6 modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java | 34 +- modules/common/cassandra-schema/src/main/resources/topology/0001.xml | 5 modules/common/cassandra-schema/src/main/resources/topology/0002.xml | 26 - modules/common/cassandra-schema/src/main/resources/topology/create/0001.xml | 5 modules/common/cassandra-schema/src/main/resources/topology/create/0002.xml | 26 + modules/common/cassandra-schema/src/main/resources/topology/update/0001.xml | 9 modules/common/cassandra-schema/src/main/resources/topology/update/0002.xml | 26 + modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java | 2 modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerBean.java | 20 - modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerLocal.java | 10 modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/StorageNodeMaintenanceJob.java | 164 +++++----- modules/plugins/cassandra/src/main/java/org/rhq/plugins/cassandra/CassandraNodeComponent.java | 2 16 files changed, 202 insertions(+), 139 deletions(-)
New commits: commit 95a9f222e3d047fe335fee7c2d305c7ffe18518b Author: John Sanda jsanda@redhat.com Date: Wed Jul 3 14:54:03 2013 -0400
big refactoring of the work that is done when a new storage node is committed to inventory
Here is a run down of the changes.
* fix endpoint comparisons There was a bug in StorageModeMaintenanceJob.waitForClustering where it was comparing the string form of an ip address against a PropertySimple.toString. It should be comparing the value of the property.
* schedule the addNodeMaintenance op as a group operation StorageNodeMaintenanceJob now schedules the operations for each storage node as a group operation instead of as individual operations. This allows us to remove a lot of code around waiting for operations to complete before scheduling the next one. More importantly, the previous implementation was not blocking until each operation completed which resulted in repair operations running across multiple nodes simultenously. We definitely want to run repair on the nodes serially. Scheduling the work as a group operation handles that for us.
Scheduling the work as a group operation required a slight change to StorageNodeManagerBean.linkResource. The resource has to be added to the group before the quartz job is scheduled. Logic needs to be added to verify that the node has actually joined to cluster before adding it to the group.
* add logic to detect when repair needs to run There is logic in place now to determine whether or not repair needs to run. Previously, we would run repair against each node whenever StorageNodeMaintenanceJob would run. We only want to run repair if and when we have to since it is a very resource intensive operation.
* Add logic back to update replication_factor of system_auth keyspace I had previously changed the logic in TopologyManager to *not* update the RF of the system_auth keyspace. For an multi-node installation, we would increase the RF of system_auth because the change was made after the rhqadmin user was created. Without running repair this results in inconsistent reads which in turns leads to failed authentication. When StorageNodeMaintenanceJob runs we do want to update the RF of both the system_auth and rhq keysapces. I have refactored TopologyManager so that system_auth gets updated.
diff --git a/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java b/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java index d307f0b..a86c49e 100644 --- a/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java +++ b/modules/common/cassandra-ccm/cassandra-ccm-arquillian/src/main/java/org/rhq/cassandra/ccm/arquillian/CCMSuiteDeploymentExtension.java @@ -159,7 +159,7 @@ public class CCMSuiteDeploymentExtension implements LoadableExtension { try { schemaManager.install(); clusterInitService.waitForSchemaAgreement(nodes); - schemaManager.updateTopology(); + schemaManager.updateTopology(true); } catch (Exception e) { if (null != ccm) { ccm.shutdownCluster(); diff --git a/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java b/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java index f50535c..b84018f 100644 --- a/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java +++ b/modules/common/cassandra-ccm/cassandra-ccm-maven-plugin/src/main/java/org/rhq/cassandra/ccm/maven/DeployMojo.java @@ -74,7 +74,7 @@ public class DeployMojo extends AbstractMojo {
try { schemaManager.install(); - schemaManager.updateTopology(); + schemaManager.updateTopology(true); } catch (Exception e) { throw new MojoExecutionException("Schema installation failed.", e); } diff --git a/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java b/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java index a9292f7..38d5337 100644 --- a/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java +++ b/modules/common/cassandra-ccm/cassandra-ccm-testng/src/main/java/org/rhq/cassandra/CCMTestNGListener.java @@ -129,7 +129,7 @@ public class CCMTestNGListener implements IInvokedMethodListener { if (annotation.waitForSchemaAgreement()) { clusterInitService.waitForSchemaAgreement(nodes); } - schemaManager.updateTopology(); + schemaManager.updateTopology(true); }
private void shutdownCluster() throws Exception { diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java index 8f8c47e..8f67ab3 100644 --- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java +++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/SchemaManager.java @@ -93,9 +93,9 @@ public class SchemaManager { version.drop(); }
- public boolean updateTopology() throws Exception { + public boolean updateTopology(boolean isNewSchema) throws Exception { TopologyManager topology = new TopologyManager(username, password, nodes); - return topology.updateTopology(); + return topology.updateTopology(isNewSchema); }
private static List<StorageNode> parseNodeInformation(String... nodes) { @@ -139,7 +139,7 @@ public class SchemaManager { } else if ("drop".equalsIgnoreCase(command)) { schemaManager.drop(); } else if ("topology".equalsIgnoreCase(command)) { - schemaManager.updateTopology(); + schemaManager.updateTopology(true); } else { throw new IllegalArgumentException(command + " not available."); } diff --git a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java index 850c383..fd987a1 100644 --- a/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java +++ b/modules/common/cassandra-schema/src/main/java/org/rhq/cassandra/schema/TopologyManager.java @@ -55,8 +55,12 @@ public class TopologyManager extends AbstractManager { this.file = file; }
- protected String getFile() { - return TOPOLOGY_BASE_FOLDER + "/" + this.file; + protected String getFile(boolean isNewSchema) { + if (isNewSchema) { + return TOPOLOGY_BASE_FOLDER + "/create/" + this.file; + } + + return TOPOLOGY_BASE_FOLDER + "/update/" + this.file; } }
@@ -64,14 +68,14 @@ public class TopologyManager extends AbstractManager { super(username, password, nodes); }
- public boolean updateTopology() throws Exception { + public boolean updateTopology(boolean isNewSchema) throws Exception { boolean result = false;
initCluster(); if (schemaExists()) { log.info("Applying topology updates..."); - result = this.updateReplicationFactor(nodes.size()); - this.updateGCGrace(nodes.size()); + result = this.updateReplicationFactor(isNewSchema, nodes.size()); + this.updateGCGrace(isNewSchema, nodes.size()); } else { log.info("Topology updates cannot be applied because the schema is not installed."); } @@ -80,7 +84,7 @@ public class TopologyManager extends AbstractManager { return result; }
- private boolean updateReplicationFactor(int numberOfNodes) throws Exception { + private boolean updateReplicationFactor(boolean isNewSchema, int numberOfNodes) throws Exception { log.info("Starting to execute " + Task.UpdateReplicationFactor + " task.");
int replicationFactor = 1; @@ -97,19 +101,19 @@ public class TopologyManager extends AbstractManager { return false; }
- log.info("Applying file " + Task.UpdateReplicationFactor.getFile() + " for " + Task.UpdateReplicationFactor - + " task."); - for (String query : this.getSteps(Task.UpdateReplicationFactor.getFile())) { + log.info("Applying file " + Task.UpdateReplicationFactor.getFile(isNewSchema) + " for " + + Task.UpdateReplicationFactor + " task."); + for (String query : this.getSteps(Task.UpdateReplicationFactor.getFile(isNewSchema))) { executedPreparedStatement(query, replicationFactor); } - log.info("File " + Task.UpdateReplicationFactor.getFile() + " applied for " + Task.UpdateReplicationFactor - + " task."); + log.info("File " + Task.UpdateReplicationFactor.getFile(isNewSchema) + " applied for " + + Task.UpdateReplicationFactor + " task.");
log.info("Successfully executed " + Task.UpdateReplicationFactor + " task."); return true; }
- private boolean updateGCGrace(int numberOfNodes) throws Exception { + private boolean updateGCGrace(boolean isNewSchema, int numberOfNodes) throws Exception { log.info("Starting to execute " + Task.UpdateGCGrace + " task.");
int gcGraceSeconds = 864000; @@ -120,11 +124,11 @@ public class TopologyManager extends AbstractManager { }
- log.info("Applying file " + Task.UpdateGCGrace.getFile() + " for " + Task.UpdateGCGrace + " task."); - for (String query : this.getSteps(Task.UpdateGCGrace.getFile())) { + log.info("Applying file " + Task.UpdateGCGrace.getFile(isNewSchema) + " for " + Task.UpdateGCGrace + " task."); + for (String query : this.getSteps(Task.UpdateGCGrace.getFile(isNewSchema))) { executedPreparedStatement(query, gcGraceSeconds); } - log.info("File " + Task.UpdateGCGrace.getFile() + " applied for " + Task.UpdateGCGrace + " task."); + log.info("File " + Task.UpdateGCGrace.getFile(isNewSchema) + " applied for " + Task.UpdateGCGrace + " task.");
log.info("Successfully executed " + Task.UpdateGCGrace + " task."); return true; diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0001.xml b/modules/common/cassandra-schema/src/main/resources/topology/0001.xml deleted file mode 100644 index 5cbd7eb..0000000 --- a/modules/common/cassandra-schema/src/main/resources/topology/0001.xml +++ /dev/null @@ -1,5 +0,0 @@ -<updatePlan> - <step> - ALTER KEYSPACE rhq WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %s}; - </step> -</updatePlan> \ No newline at end of file diff --git a/modules/common/cassandra-schema/src/main/resources/topology/0002.xml b/modules/common/cassandra-schema/src/main/resources/topology/0002.xml deleted file mode 100644 index d631030..0000000 --- a/modules/common/cassandra-schema/src/main/resources/topology/0002.xml +++ /dev/null @@ -1,26 +0,0 @@ -<updatePlan> - <step> - ALTER COLUMNFAMILY rhq.metrics_index WITH gc_grace_seconds = %s; - </step> - - <step> - ALTER COLUMNFAMILY rhq.raw_metrics WITH gc_grace_seconds = %s; - </step> - - <step> - ALTER COLUMNFAMILY rhq.one_hour_metrics WITH gc_grace_seconds = %s; - </step> - - <step> - ALTER COLUMNFAMILY rhq.six_hour_metrics WITH gc_grace_seconds = %s; - </step> - - <step> - ALTER COLUMNFAMILY rhq.twenty_four_hour_metrics WITH gc_grace_seconds = %s; - </step> - - <step> - ALTER COLUMNFAMILY rhq.schema_version WITH gc_grace_seconds = %s; - </step> - -</updatePlan> \ No newline at end of file diff --git a/modules/common/cassandra-schema/src/main/resources/topology/create/0001.xml b/modules/common/cassandra-schema/src/main/resources/topology/create/0001.xml new file mode 100644 index 0000000..5cbd7eb --- /dev/null +++ b/modules/common/cassandra-schema/src/main/resources/topology/create/0001.xml @@ -0,0 +1,5 @@ +<updatePlan> + <step> + ALTER KEYSPACE rhq WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %s}; + </step> +</updatePlan> \ No newline at end of file diff --git a/modules/common/cassandra-schema/src/main/resources/topology/create/0002.xml b/modules/common/cassandra-schema/src/main/resources/topology/create/0002.xml new file mode 100644 index 0000000..d631030 --- /dev/null +++ b/modules/common/cassandra-schema/src/main/resources/topology/create/0002.xml @@ -0,0 +1,26 @@ +<updatePlan> + <step> + ALTER COLUMNFAMILY rhq.metrics_index WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.raw_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.one_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.six_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.twenty_four_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.schema_version WITH gc_grace_seconds = %s; + </step> + +</updatePlan> \ No newline at end of file diff --git a/modules/common/cassandra-schema/src/main/resources/topology/update/0001.xml b/modules/common/cassandra-schema/src/main/resources/topology/update/0001.xml new file mode 100644 index 0000000..f2c0e57 --- /dev/null +++ b/modules/common/cassandra-schema/src/main/resources/topology/update/0001.xml @@ -0,0 +1,9 @@ +<updatePlan> + <step> + ALTER KEYSPACE system_auth WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %s}; + </step> + + <step> + ALTER KEYSPACE rhq WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %s}; + </step> +</updatePlan> \ No newline at end of file diff --git a/modules/common/cassandra-schema/src/main/resources/topology/update/0002.xml b/modules/common/cassandra-schema/src/main/resources/topology/update/0002.xml new file mode 100644 index 0000000..d631030 --- /dev/null +++ b/modules/common/cassandra-schema/src/main/resources/topology/update/0002.xml @@ -0,0 +1,26 @@ +<updatePlan> + <step> + ALTER COLUMNFAMILY rhq.metrics_index WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.raw_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.one_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.six_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.twenty_four_hour_metrics WITH gc_grace_seconds = %s; + </step> + + <step> + ALTER COLUMNFAMILY rhq.schema_version WITH gc_grace_seconds = %s; + </step> + +</updatePlan> \ No newline at end of file diff --git a/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java b/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java index fbd869b..4c87f70 100644 --- a/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java +++ b/modules/enterprise/server/installer/src/main/java/org/rhq/enterprise/server/installer/InstallerServiceImpl.java @@ -487,7 +487,7 @@ public class InstallerServiceImpl implements InstallerService { } log("Install RHQ schema along with updates to Cassandra."); storageNodeSchemaManager.install(); - storageNodeSchemaManager.updateTopology(); + storageNodeSchemaManager.updateTopology(true); } else { log("Ignoring Cassandra schema - installer will assume it exists and is already up-to-date."); } diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerBean.java index 90d9497..874be1e 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerBean.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerBean.java @@ -39,6 +39,7 @@ import javax.persistence.TypedQuery;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.quartz.JobDataMap; import org.quartz.SimpleTrigger; import org.quartz.Trigger;
@@ -200,7 +201,7 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN this.updateStorageNodes(storageNodeMap);
if (clusterMaintenanceNeeded) { - this.scheduleQuartzJob(); + this.scheduleQuartzJob(existingStorageNodes.size()); }
return new ArrayList<StorageNode>(storageNodeMap.values()); @@ -214,6 +215,10 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN String configAddress = resourceConfig.getSimpleValue(RHQ_STORAGE_ADDRESS_PROPERTY);
if (configAddress != null) { + // TODO Do not add the node to the group until we have verified it has joined the cluster + // StorageNodeMaintenanceJob currently determines if a new node has successfully joined the cluster. + addStorageNodeToGroup(resource); + boolean storageNodeFound = false; if (storageNodes != null) { for (StorageNode storageNode : storageNodes) { @@ -239,10 +244,8 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN
entityManager.persist(storageNode);
- scheduleQuartzJob(); + scheduleQuartzJob(storageNodes.size()); } - - addStorageNodeToGroup(resource); } }
@@ -306,13 +309,14 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN * @return The storage node resource group. * @throws IllegalStateException if the group is not found or does not exist. */ - private ResourceGroup getStorageNodeGroup() { + public ResourceGroup getStorageNodeGroup() { Subject overlord = subjectManager.getOverlord();
ResourceGroupCriteria criteria = new ResourceGroupCriteria(); criteria.addFilterResourceTypeName(STORAGE_NODE_RESOURCE_TYPE_NAME); criteria.addFilterPluginName(STORAGE_NODE_PLUGIN_NAME); criteria.addFilterName(STORAGE_NODE_GROUP_NAME); + criteria.fetchExplicitResources(true);
List<ResourceGroup> groups = resourceGroupManager.findResourceGroupsByCriteria(overlord, criteria);
@@ -472,7 +476,7 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN return newNodes; }
- private void scheduleQuartzJob() { + private void scheduleQuartzJob(int clusterSize) { String jobName = StorageNodeMaintenanceJob.class.getName(); String jobGroupName = StorageNodeMaintenanceJob.class.getName(); String triggerName = StorageNodeMaintenanceJob.class.getName(); @@ -482,6 +486,10 @@ public class StorageNodeManagerBean implements StorageNodeManagerLocal, StorageN trigger.setJobName(jobName); trigger.setJobGroup(jobGroupName); try { + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put(StorageNodeMaintenanceJob.JOB_DATA_PROPERTY_CLUSTER_SIZE, Integer.toString(clusterSize)); + trigger.setJobDataMap(jobDataMap); + quartzScheduler.scheduleJob(trigger); } catch (Throwable t) { log.warn("Unable to schedule storage node maintenance job", t); diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerLocal.java index 54646ec..52e2424 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerLocal.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cloud/StorageNodeManagerLocal.java @@ -27,6 +27,7 @@ import org.rhq.core.domain.cloud.StorageNode; import org.rhq.core.domain.cloud.StorageNodeLoadComposite; import org.rhq.core.domain.criteria.StorageNodeCriteria; import org.rhq.core.domain.resource.Resource; +import org.rhq.core.domain.resource.group.ResourceGroup; import org.rhq.core.domain.util.PageList;
@Local @@ -86,4 +87,13 @@ public interface StorageNodeManagerLocal { */ void runReadRepair();
+ /** + * This method assumes the storage node resource group already exists; as such, it should only be called from places + * in the code that are after the point(s) where the group has been created. + * + * @return The storage node resource group. + * @throws IllegalStateException if the group is not found or does not exist. + */ + ResourceGroup getStorageNodeGroup(); + } diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/StorageNodeMaintenanceJob.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/StorageNodeMaintenanceJob.java index f492fa0..6b1940d 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/StorageNodeMaintenanceJob.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/scheduler/jobs/StorageNodeMaintenanceJob.java @@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException;
@@ -36,13 +37,11 @@ import org.rhq.core.domain.configuration.PropertyList; import org.rhq.core.domain.configuration.PropertyMap; import org.rhq.core.domain.configuration.PropertySimple; import org.rhq.core.domain.criteria.ResourceCriteria; -import org.rhq.core.domain.operation.OperationRequestStatus; -import org.rhq.core.domain.operation.ResourceOperationHistory; -import org.rhq.core.domain.operation.bean.ResourceOperationSchedule; +import org.rhq.core.domain.operation.bean.GroupOperationSchedule; import org.rhq.core.domain.resource.Resource; -import org.rhq.core.domain.util.PageControl; -import org.rhq.core.domain.util.PageList; +import org.rhq.core.domain.resource.group.ResourceGroup; import org.rhq.core.util.StringUtil; +import org.rhq.enterprise.server.auth.SubjectManagerLocal; import org.rhq.enterprise.server.cloud.StorageNodeManagerLocal; import org.rhq.enterprise.server.operation.OperationManagerLocal; import org.rhq.enterprise.server.util.LookupUtil; @@ -58,6 +57,10 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob {
private final Log log = LogFactory.getLog(StorageNodeMaintenanceJob.class);
+ public static final String JOB_DATA_PROPERTY_CLUSTER_SIZE = "clusterSize"; + + public static final String JOB_DATA_PROPERTY_TOPOLOGY_CHANGED = "topologyChanged"; + private final static int MAX_ITERATIONS = 5; private final static int TIMEOUT = 10000; private final static String STORAGE_SERVICE = "Storage Service"; @@ -73,7 +76,10 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { private static final String PASSWORD_PROP = "rhq.cassandra.password";
@Override - public void executeJobCode(JobExecutionContext arg0) throws JobExecutionException { + public void executeJobCode(JobExecutionContext context) throws JobExecutionException { + JobDataMap jobDataMap = context.getMergedJobDataMap(); + int clusterSize = Integer.parseInt(jobDataMap.getString(JOB_DATA_PROPERTY_CLUSTER_SIZE)); + //1. Wait for resouces to be linked to node storage nodes waitForResouceLinks();
@@ -84,21 +90,50 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { //3. Wait for the all storage nodes to be part of the same cluster storageNodes = waitForClustering(storageNodes);
- //4. Update topology - boolean topologyUpdated = updateTopology(storageNodes); - - //5. Run repair operation on all the storage nodes if topology(replication factor was updated) - if (topologyUpdated) { - List<String> seedList = new ArrayList<String>(); - for (StorageNode storageNode : storageNodes) { - seedList.add(storageNode.getAddress()); + boolean isReadRepairNeeded; + + if (clusterSize >= 4) { + // At 4 nodes we increase the RF to 3. We are not increasing the RF beyond + // that for additional nodes; so, there is no need to run repair if we are + // expanding from a 4 node cluster since the RF remains the same. + isReadRepairNeeded = false; + } else if (clusterSize == 1) { + // The RF will increase since we are going from a single to a multi-node + // cluster; therefore, we want to run repair. + isReadRepairNeeded = true; + } else if (clusterSize == 2) { + if (storageNodes.size() > 3) { + // If we go from 2 to > 3 nodes we will increase the RF to 3; therefore + // we want to run repair. + isReadRepairNeeded = true; + } else { + // If we go from 2 to 3 nodes, we keep the RF at 2 so there is no need + // to run repair. + isReadRepairNeeded = false; } + } else if (clusterSize == 3) { + // We are increasing the cluster size > 3 which means the RF will be + // updated to 3; therefore, we want to run repair. + isReadRepairNeeded = true; + } else { + // If we cluster size of zero, then something is really screwed up. It + // should always be > 0. + log.error("The job data property [" + JOB_DATA_PROPERTY_CLUSTER_SIZE + "] should always be greater " + + "than zero. This may be a bug in the code that scheduled this job."); + isReadRepairNeeded = storageNodes.size() > 1; + }
- for (StorageNode storageNode : storageNodes) { - Resource resource = storageNode.getResource(); - runNodeMaintenance(resource, seedList); - } + if (isReadRepairNeeded) { + updateTopology(storageNodes); + } + + //5. run maintenance on each node + List<String> seedList = new ArrayList<String>(); + for (StorageNode storageNode : storageNodes) { + seedList.add(storageNode.getAddress()); } + + runNodeMaintenance(seedList, isReadRepairNeeded); }
private boolean updateTopology(List<StorageNode> storageNodes) throws JobExecutionException { @@ -106,9 +141,9 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { String password = getRequiredStorageProperty(PASSWORD_PROP); SchemaManager schemaManager = new SchemaManager(username, password, storageNodes); try{ - return schemaManager.updateTopology(); + return schemaManager.updateTopology(false); } catch (Exception e) { - log.error(e); + log.error("An error occurred while applying schema topology changes", e); }
return false; @@ -147,7 +182,7 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { List<Property> actualList = propertyList.getList(); for (Property property : actualList) { PropertyMap map = (PropertyMap) property; - endpoints.add(map.get(ENDPOINT_PROPERTY).toString()); + endpoints.add(map.getSimpleValue(ENDPOINT_PROPERTY, null)); } } catch (Exception e) { log.error("Error fetching live configuration for resource " + resource.getId()); @@ -157,7 +192,7 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { } } } catch (Exception e) { - log.error(e); + log.error("An exception occurred while waiting for nodes to cluster", e); }
Collections.sort(endpoints); @@ -184,67 +219,38 @@ public class StorageNodeMaintenanceJob extends AbstractStatefulJob { return storageNodes; }
- private void runNodeMaintenance(Resource resource, List<String> seedList) { + private void runNodeMaintenance(List<String> seedList, boolean runRepair) { OperationManagerLocal operationManager = LookupUtil.getOperationManager(); + StorageNodeManagerLocal storageNodeManager = LookupUtil.getStorageNodeManager(); + SubjectManagerLocal subjectManager = LookupUtil.getSubjectManager(); + + ResourceGroup storageNodeGroup = storageNodeManager.getStorageNodeGroup(); + + GroupOperationSchedule schedule = new GroupOperationSchedule(); + schedule.setGroup(storageNodeGroup); + schedule.setHaltOnFailure(false); + schedule.setExecutionOrder(new ArrayList<Resource>(storageNodeGroup.getExplicitResources())); + schedule.setJobTrigger(JobTrigger.createNowTrigger()); + schedule.setSubject(subjectManager.getOverlord()); + schedule.setOperationName(MAINTENANCE_OPERATION); + schedule.setDescription(MAINTENANCE_OPERATION_NOTE); + + List<Property> properties = new ArrayList<Property>(); + properties.add(new PropertySimple(RUN_REPAIR_PROPERTY, runRepair)); + properties.add(new PropertySimple(UPDATE_SEEDS_LIST, Boolean.TRUE)); + + PropertyList seedListProperty = new PropertyList(SEEDS_LIST); + for (String seed : seedList) { + seedListProperty.add(new PropertySimple("seed", seed)); + } + properties.add(seedListProperty);
- try { - ResourceOperationSchedule newSchedule = new ResourceOperationSchedule(); - newSchedule.setJobTrigger(JobTrigger.createNowTrigger()); - newSchedule.setResource(resource); - newSchedule.setOperationName(MAINTENANCE_OPERATION); - newSchedule.setDescription(MAINTENANCE_OPERATION_NOTE); - - List<Property> properties = new ArrayList<Property>(); - properties.add(new PropertySimple(RUN_REPAIR_PROPERTY, Boolean.TRUE)); - properties.add(new PropertySimple(UPDATE_SEEDS_LIST, Boolean.TRUE)); - - PropertyList seedListProperty = new PropertyList(SEEDS_LIST); - for (String seed : seedList) { - seedListProperty.add(new PropertySimple("seed", seed)); - } - properties.add(seedListProperty); - - Configuration config = new Configuration(); - config.setProperties(properties); - newSchedule.setParameters(config); - - long operationStartTime = System.currentTimeMillis(); - operationManager.scheduleResourceOperation(LookupUtil.getSubjectManager().getOverlord(), newSchedule); - - int iteration = 0; - boolean resultFound = false; - while (iteration < MAX_ITERATIONS && !resultFound) { - PageList<ResourceOperationHistory> results = operationManager.findCompletedResourceOperationHistories( - LookupUtil.getSubjectManager().getOverlord(), resource.getId(), operationStartTime, null, - PageControl.getUnlimitedInstance()); - - for (ResourceOperationHistory operationHistory : results) { - if (MAINTENANCE_OPERATION.equals(operationHistory.getOperationDefinition().getName())) { - if (OperationRequestStatus.SUCCESS.equals(operationHistory.getStatus())) { - Configuration operationResults = operationHistory.getResults(); - if ("true".equals(operationResults.getSimpleValue(SUCCEED_PROPERTY))) { - resultFound = true; - } - } - } - } - - if (resultFound) { - break; - } else { - try { - Thread.sleep(TIMEOUT); - } catch (Exception e) { - log.error(e); - } - } + Configuration config = new Configuration(); + config.setProperties(properties);
- iteration++; - } + schedule.setParameters(config);
- } catch (Exception e) { - log.error(e); - } + operationManager.scheduleGroupOperation(subjectManager.getOverlord(), schedule); }
private List<StorageNode> getOnlyResourceLinkedStorageNodes() {
commit a39e88e38e73d07f7ee7ac6374c78b566ea9c5b4 Author: John Sanda jsanda@redhat.com Date: Tue Jul 2 17:01:15 2013 -0400
use ProcessInfo.freshSnapshot during avail checks to avoid stale data
diff --git a/modules/plugins/cassandra/src/main/java/org/rhq/plugins/cassandra/CassandraNodeComponent.java b/modules/plugins/cassandra/src/main/java/org/rhq/plugins/cassandra/CassandraNodeComponent.java index 7d06cb2..93d758c 100644 --- a/modules/plugins/cassandra/src/main/java/org/rhq/plugins/cassandra/CassandraNodeComponent.java +++ b/modules/plugins/cassandra/src/main/java/org/rhq/plugins/cassandra/CassandraNodeComponent.java @@ -135,7 +135,7 @@ public class CassandraNodeComponent extends JMXServerComponent<ResourceComponent return UNKNOWN; } else { // It is safe to read prior snapshot as getNativeProcess always return a fresh instance - ProcessInfoSnapshot processInfoSnaphot = processInfo.priorSnaphot(); + ProcessInfoSnapshot processInfoSnaphot = processInfo.freshSnapshot(); if (processInfoSnaphot.isRunning()) { return UP; } else {
rhq-commits@lists.fedorahosted.org