[rhq] Branch 'feature/embeddedagent' - .classpath modules/enterprise
by mazz
.classpath | 8 ++--
modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentService.java | 17 +++++++++-
modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemAdd.java | 6 +++
modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemDefinition.java | 8 ++++
modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemExtension.java | 6 ++-
modules/enterprise/server/embeddedagent/src/main/resources/module/main/module.xml | 3 +
modules/enterprise/server/embeddedagent/src/main/resources/org/rhq/embeddedagent/extension/LocalDescriptions.properties | 1
modules/enterprise/server/embeddedagent/src/main/resources/schema/embeddedagent.xsd | 1
modules/enterprise/server/embeddedagent/src/test/java/org/rhq/embeddedagent/extension/SubsystemParsingTestCase.java | 1
modules/enterprise/server/embeddedagent/src/test/resources/org/rhq/embeddedagent/extension/subsystem.xml | 2 +
10 files changed, 47 insertions(+), 6 deletions(-)
New commits:
commit de47fe216953166ed41e6efefaca243703c8282f
Author: John Mazzitelli <mazz(a)redhat.com>
Date: Mon Feb 3 13:43:15 2014 -0500
add ability to define agent binding addr/port in EAP config xml
diff --git a/.classpath b/.classpath
index 455f739..4916f67 100644
--- a/.classpath
+++ b/.classpath
@@ -334,9 +334,11 @@
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-ejb3/7.2.0.Alpha1-redhat-4/jboss-as-ejb3-7.2.0.Alpha1-redhat-4.jar" sourcepath="/M2_REPO/org/jboss/as/jboss-as-ejb3/7.2.0.Alpha1-redhat-4/jboss-as-ejb3-7.2.0.Alpha1-redhat-4-sources.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/staxmapper/1.1.0.Final/staxmapper-1.1.0.Final.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/modules/jboss-modules/1.1.1.GA/jboss-modules-1.1.1.GA.jar"/>
- <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-subsystem-test/7.1.1.Final/jboss-as-subsystem-test-7.1.1.Final.jar"/>
- <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-controller/7.1.1.Final/jboss-as-controller-7.1.1.Final.jar" sourcepath="/M2_REPO/org/jboss/as/jboss-as-controller/7.2.0.Alpha1-redhat-4/jboss-as-controller-7.2.0.Alpha1-redhat-4-sources.jar"/>
- <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-controller-client/7.1.1.Final/jboss-as-controller-client-7.1.1.Final.jar"/>
+ <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-subsystem-test/7.2.0.Alpha1-redhat-4/jboss-as-subsystem-test-7.2.0.Alpha1-redhat-4.jar" sourcepath="/M2_REPO/org/jboss/as/jboss-as-subsystem-test/7.2.0.Alpha1-redhat-4/jboss-as-subsystem-test-7.2.0.Alpha1-redhat-4-sources.jar"/>
+ <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-controller/7.2.0.Alpha1-redhat-4/jboss-as-controller-7.2.0.Alpha1-redhat-4.jar" sourcepath="/M2_REPO/org/jboss/as/jboss-as-controller/7.2.0.Alpha1-redhat-4/jboss-as-controller-7.2.0.Alpha1-redhat-4-sources.jar"/>
+ <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-controller-client/7.2.0.Alpha1-redhat-4/jboss-as-controller-client-7.2.0.Alpha1-redhat-4.jar"/>
+ <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-model-test/7.2.0.Alpha1-redhat-4/jboss-as-model-test-7.2.0.Alpha1-redhat-4.jar"/>
+ <classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/as/jboss-as-network/7.2.0.Alpha1-redhat-4/jboss-as-network-7.2.0.Alpha1-redhat-4.jar" sourcepath="/M2_REPO/org/jboss/as/jboss-as-network/7.2.0.Alpha1-redhat-4/jboss-as-network-7.2.0.Alpha1-redhat-4-sources.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/resteasy/resteasy-links/2.3.5.Final/resteasy-links-2.3.5.Final.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/resteasy/resteasy-jaxrs/2.3.5.Final/resteasy-jaxrs-2.3.5.Final.jar"/>
<classpathentry exported="true" kind="var" path="M2_REPO/org/jboss/resteasy/resteasy-jackson-provider/2.3.5.Final/resteasy-jackson-provider-2.3.5.Final.jar"/>
diff --git a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentService.java b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentService.java
index 018519d..651f557 100644
--- a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentService.java
+++ b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentService.java
@@ -6,6 +6,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import org.jboss.as.network.SocketBinding;
import org.jboss.as.server.ServerEnvironment;
import org.jboss.logging.Logger;
import org.jboss.modules.Module;
@@ -18,6 +19,7 @@ import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
import org.rhq.enterprise.agent.AgentMain;
+import org.rhq.enterprise.communications.ServiceContainerConfigurationConstants;
public class AgentService implements Service<AgentService> {
@@ -31,7 +33,13 @@ public class AgentService implements Service<AgentService> {
* This service gives us information about the server, like the install directory, data directory, etc.
* Package-scoped so the add-step handler can access this.
*/
- InjectedValue<ServerEnvironment> envServiceValue = new InjectedValue<ServerEnvironment>();
+ final InjectedValue<ServerEnvironment> envServiceValue = new InjectedValue<ServerEnvironment>();
+
+ /**
+ * Our subsystem add-step handler will inject this as a dependency for us.
+ * This object will provide the binding address and port for the agent listener.
+ */
+ final InjectedValue<SocketBinding> agentListenerBinding = new InjectedValue<SocketBinding>();
/**
* This service can be configured to be told explicitly about certain plugins to be
@@ -131,10 +139,17 @@ public class AgentService implements Service<AgentService> {
log.info("Starting the embedded agent now");
try {
// make sure we pre-configure the agent with some settings taken from our runtime environment
+ SocketBinding agentListenerBindingValue = agentListenerBinding.getValue();
+ String agentBindAddress = agentListenerBindingValue.getAddress().getHostAddress();
+ String agentBindPort = String.valueOf(agentListenerBindingValue.getAbsolutePort());
+ configOverrides.put(ServiceContainerConfigurationConstants.CONNECTOR_BIND_ADDRESS, agentBindAddress);
+ configOverrides.put(ServiceContainerConfigurationConstants.CONNECTOR_BIND_PORT, agentBindPort);
+
ServerEnvironment env = envServiceValue.getValue();
boolean resetConfigurationAtStartup = true;
AgentConfigurationSetup configSetup = new AgentConfigurationSetup(
getExportedResource("conf/agent-configuration.xml"), resetConfigurationAtStartup, configOverrides, env);
+
// prepare the agent logging first thing so the agent logs messages using this config
configSetup.prepareLogConfigFile(getExportedResource("conf/log4j.xml"));
configSetup.preConfigureAgent();
diff --git a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemAdd.java b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemAdd.java
index c2cc801..937af70 100644
--- a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemAdd.java
+++ b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemAdd.java
@@ -9,6 +9,7 @@ import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.OperationContext;
import org.jboss.as.controller.OperationFailedException;
import org.jboss.as.controller.ServiceVerificationHandler;
+import org.jboss.as.network.SocketBinding;
import org.jboss.as.server.ServerEnvironment;
import org.jboss.as.server.ServerEnvironmentService;
import org.jboss.dmr.ModelNode;
@@ -41,6 +42,7 @@ class AgentSubsystemAdd extends AbstractAddStepHandler {
AgentSubsystemDefinition.SERVER_BIND_ADDRESS_ATTRIBDEF.validateAndSet(operation, model);
AgentSubsystemDefinition.SERVER_TRANSPORT_PARAMS_ATTRIBDEF.validateAndSet(operation, model);
AgentSubsystemDefinition.SERVER_ALIAS_ATTRIBDEF.validateAndSet(operation, model);
+ AgentSubsystemDefinition.SOCKET_BINDING_ATTRIBDEF.validateAndSet(operation, model);
log.info("Populating the embedded agent subsystem model: " + operation + "=" + model);
}
@@ -87,10 +89,14 @@ class AgentSubsystemAdd extends AbstractAddStepHandler {
service.setConfigurationOverrides(overrides);
// install the service
+ String binding = AgentSubsystemDefinition.SOCKET_BINDING_ATTRIBDEF.resolveModelAttribute(context, model)
+ .asString();
ServiceName name = AgentService.SERVICE_NAME;
ServiceController<AgentService> controller = context.getServiceTarget() //
.addService(name, service) //
.addDependency(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class, service.envServiceValue) //
+ .addDependency(SocketBinding.JBOSS_BINDING_NAME.append(binding), SocketBinding.class,
+ service.agentListenerBinding) //
.addListener(verificationHandler) //
.setInitialMode(Mode.ACTIVE) //
.install();
diff --git a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemDefinition.java b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemDefinition.java
index dec1a26..ad50c4e 100644
--- a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemDefinition.java
+++ b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemDefinition.java
@@ -9,6 +9,7 @@ import org.jboss.as.controller.SimpleAttributeDefinitionBuilder;
import org.jboss.as.controller.SimpleResourceDefinition;
import org.jboss.as.controller.descriptions.DefaultOperationDescriptionProvider;
import org.jboss.as.controller.operations.common.GenericSubsystemDescribeHandler;
+import org.jboss.as.controller.operations.validation.StringLengthValidator;
import org.jboss.as.controller.registry.AttributeAccess;
import org.jboss.as.controller.registry.ManagementResourceRegistration;
import org.jboss.as.controller.registry.OperationEntry;
@@ -69,6 +70,12 @@ public class AgentSubsystemDefinition extends SimpleResourceDefinition {
.setXmlName(AgentSubsystemExtension.ATTRIB_SERVER_ALIAS)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES).setAllowNull(true).build();
+ protected static final SimpleAttributeDefinition SOCKET_BINDING_ATTRIBDEF = new SimpleAttributeDefinitionBuilder(
+ AgentSubsystemExtension.ATTRIB_SOCKET_BINDING, ModelType.STRING)
+ .setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
+ .setDefaultValue(new ModelNode("embeddedagent")).setValidator(new StringLengthValidator(1)).setAllowNull(false)
+ .build();
+
private AgentSubsystemDefinition() {
super(AgentSubsystemExtension.SUBSYSTEM_PATH, AgentSubsystemExtension.getResourceDescriptionResolver(null),
AgentSubsystemAdd.INSTANCE, AgentSubsystemRemove.INSTANCE);
@@ -85,6 +92,7 @@ public class AgentSubsystemDefinition extends SimpleResourceDefinition {
registerReloadRequiredWriteAttributeHandler(rr, SERVER_BIND_ADDRESS_ATTRIBDEF);
registerReloadRequiredWriteAttributeHandler(rr, SERVER_TRANSPORT_PARAMS_ATTRIBDEF);
registerReloadRequiredWriteAttributeHandler(rr, SERVER_ALIAS_ATTRIBDEF);
+ registerReloadRequiredWriteAttributeHandler(rr, SOCKET_BINDING_ATTRIBDEF);
}
private void registerReloadRequiredWriteAttributeHandler(ManagementResourceRegistration rr, AttributeDefinition def) {
diff --git a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemExtension.java b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemExtension.java
index a74bafb..4374c7e 100644
--- a/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemExtension.java
+++ b/modules/enterprise/server/embeddedagent/src/main/java/org/rhq/embeddedagent/extension/AgentSubsystemExtension.java
@@ -62,11 +62,10 @@ public class AgentSubsystemExtension implements Extension {
protected static final String ATTRIB_SERVER_BIND_ADDRESS = AgentConfigurationConstants.SERVER_BIND_ADDRESS;
protected static final String ATTRIB_SERVER_TRANSPORT_PARAMS = AgentConfigurationConstants.SERVER_TRANSPORT_PARAMS;
protected static final String ATTRIB_SERVER_ALIAS = AgentConfigurationConstants.SERVER_ALIAS;
+ protected static final String ATTRIB_SOCKET_BINDING = "socket-binding";
protected static final PathElement SUBSYSTEM_PATH = PathElement.pathElement(SUBSYSTEM, SUBSYSTEM_NAME);
-
-
static StandardResourceDescriptionResolver getResourceDescriptionResolver(final String keyPrefix) {
String prefix = SUBSYSTEM_NAME + (keyPrefix == null ? "" : "." + keyPrefix);
return new StandardResourceDescriptionResolver(prefix, RESOURCE_NAME,
@@ -133,6 +132,8 @@ public class AgentSubsystemExtension implements Extension {
opAdd.get(ATTRIB_SERVER_TRANSPORT_PARAMS).set(reader.getElementText());
} else if (elementName.equals(ATTRIB_SERVER_ALIAS)) {
opAdd.get(ATTRIB_SERVER_ALIAS).set(reader.getElementText());
+ } else if (elementName.equals(ATTRIB_SOCKET_BINDING)) {
+ opAdd.get(ATTRIB_SOCKET_BINDING).set(reader.getElementText());
} else {
throw ParseUtils.unexpectedElement(reader);
}
@@ -188,6 +189,7 @@ public class AgentSubsystemExtension implements Extension {
writeElement(writer, node, ATTRIB_SERVER_BIND_ADDRESS);
writeElement(writer, node, ATTRIB_SERVER_TRANSPORT_PARAMS);
writeElement(writer, node, ATTRIB_SERVER_ALIAS);
+ writeElement(writer, node, ATTRIB_SOCKET_BINDING);
// <plugins>
writer.writeStartElement(PLUGINS_ELEMENT);
diff --git a/modules/enterprise/server/embeddedagent/src/main/resources/module/main/module.xml b/modules/enterprise/server/embeddedagent/src/main/resources/module/main/module.xml
index 56419c3..faa971f 100644
--- a/modules/enterprise/server/embeddedagent/src/main/resources/module/main/module.xml
+++ b/modules/enterprise/server/embeddedagent/src/main/resources/module/main/module.xml
@@ -48,5 +48,8 @@
<!-- the standalone agent had this in its endorsed dir, but the embedded agent adds it as a dependency -->
<module name="javax.xml.bind.api"/>
+
+ <!-- other dependencies we need -->
+ <module name="org.jboss.as.network"/>
</dependencies>
</module>
diff --git a/modules/enterprise/server/embeddedagent/src/main/resources/org/rhq/embeddedagent/extension/LocalDescriptions.properties b/modules/enterprise/server/embeddedagent/src/main/resources/org/rhq/embeddedagent/extension/LocalDescriptions.properties
index 80ea9e5..e2badfc 100644
--- a/modules/enterprise/server/embeddedagent/src/main/resources/org/rhq/embeddedagent/extension/LocalDescriptions.properties
+++ b/modules/enterprise/server/embeddedagent/src/main/resources/org/rhq/embeddedagent/extension/LocalDescriptions.properties
@@ -6,6 +6,7 @@ embeddedagent.stop=Stops the RHQ Agent if it is running.
embeddedagent.status=Tells you if the RHQ Agent is currently started or stopped.
embeddedagent.enabled=When true, the RHQ Agent will be deployed and started. Otherwise, it will be disabled.
embeddedagent.plugins=Indicates what plugins should be enabled or disabled.
+embeddedagent.socket-binding=Determines the binding address and port the agent listens to for incoming server messages.
embeddedagent.rhq.agent.name=Name to uniquely identify this agent among all other agents in the environment
embeddedagent.rhq.agent.disable-native-system=The RHQ Agent has a native system on certain supported platforms to help the \n\
plugin container perform discovery of native components on those platforms. \n\
diff --git a/modules/enterprise/server/embeddedagent/src/main/resources/schema/embeddedagent.xsd b/modules/enterprise/server/embeddedagent/src/main/resources/schema/embeddedagent.xsd
index b9a3689..d587e23 100644
--- a/modules/enterprise/server/embeddedagent/src/main/resources/schema/embeddedagent.xsd
+++ b/modules/enterprise/server/embeddedagent/src/main/resources/schema/embeddedagent.xsd
@@ -17,6 +17,7 @@
<xs:element name="rhq.agent.server.bind-address" type="xs:string" use="optional"/>
<xs:element name="rhq.agent.server.transport-params" type="xs:string" />
<xs:element name="rhq.agent.server.alias" type="xs:string" use="optional"/>
+ <xs:element name="socket-binding" type="xs:string"/>
<xs:element name="plugins" type="pluginsType"/>
</xs:all>
</xs:complexType>
diff --git a/modules/enterprise/server/embeddedagent/src/test/java/org/rhq/embeddedagent/extension/SubsystemParsingTestCase.java b/modules/enterprise/server/embeddedagent/src/test/java/org/rhq/embeddedagent/extension/SubsystemParsingTestCase.java
index 0a8758f..0f3a8fa 100644
--- a/modules/enterprise/server/embeddedagent/src/test/java/org/rhq/embeddedagent/extension/SubsystemParsingTestCase.java
+++ b/modules/enterprise/server/embeddedagent/src/test/java/org/rhq/embeddedagent/extension/SubsystemParsingTestCase.java
@@ -198,6 +198,7 @@ public class SubsystemParsingTestCase extends SubsystemBaseParsingTestCase {
List<Property> attributes = content.get("attributes").asPropertyList();
List<String> expectedAttributes = Arrays.asList( //
+ AgentSubsystemExtension.ATTRIB_SOCKET_BINDING, //
AgentSubsystemExtension.ATTRIB_SERVER_TRANSPORT, //
AgentSubsystemExtension.ATTRIB_SERVER_BIND_PORT, //
AgentSubsystemExtension.ATTRIB_SERVER_BIND_ADDRESS, //
diff --git a/modules/enterprise/server/embeddedagent/src/test/resources/org/rhq/embeddedagent/extension/subsystem.xml b/modules/enterprise/server/embeddedagent/src/test/resources/org/rhq/embeddedagent/extension/subsystem.xml
index e8641f1..09da3f6 100644
--- a/modules/enterprise/server/embeddedagent/src/test/resources/org/rhq/embeddedagent/extension/subsystem.xml
+++ b/modules/enterprise/server/embeddedagent/src/test/resources/org/rhq/embeddedagent/extension/subsystem.xml
@@ -7,6 +7,8 @@
<rhq.agent.server.transport-params>test-transport-params</rhq.agent.server.transport-params>
<!-- <rhq.agent.server.alias>test-alias</rhq.agent.server.alias> -->
+ <socket-binding>embeddedagent</socket-binding>
+
<plugins>
<plugin name="platform" enabled="true" />
<plugin name="blah" enabled="false" />
10 years, 3 months
[rhq] 2 commits - modules/core
by Jay Shaughnessy
modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationCheckExecutor.java | 77 +++++-----
modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationManager.java | 8 -
modules/core/plugin-container/src/main/java/org/rhq/core/pc/drift/SnapshotGenerator.java | 17 +-
modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java | 21 ++
4 files changed, 69 insertions(+), 54 deletions(-)
New commits:
commit ea1f3b883340dd4db2724f01e80484a497c4f048
Author: Jay Shaughnessy <jshaughn(a)redhat.com>
Date: Mon Feb 3 12:29:48 2014 -0500
[1059932] PC classes must use data directory configured in PC, not just use "data"
Change things around to use the proper data directory, as configured for the
plugin container (via InventoryManager)
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationCheckExecutor.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationCheckExecutor.java
index da70b18..666281f 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationCheckExecutor.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationCheckExecutor.java
@@ -39,6 +39,7 @@ import org.rhq.core.domain.measurement.AvailabilityType;
import org.rhq.core.domain.resource.InventoryStatus;
import org.rhq.core.domain.resource.Resource;
import org.rhq.core.domain.resource.ResourceType;
+import org.rhq.core.pc.PluginContainer;
import org.rhq.core.pc.inventory.InventoryManager;
import org.rhq.core.pc.inventory.ResourceContainer;
import org.rhq.core.pc.util.FacetLockType;
@@ -52,13 +53,10 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
private static final Log log = LogFactory.getLog(ConfigurationCheckExecutor.class);
private ConfigurationServerService configurationServerService;
- private InventoryManager inventoryManager;
private static final long CONFIGURATION_CHECK_TIMEOUT = 30000L;
- public ConfigurationCheckExecutor(ConfigurationServerService configurationServerService,
- InventoryManager inventoryManager) {
+ public ConfigurationCheckExecutor(ConfigurationServerService configurationServerService) {
this.configurationServerService = configurationServerService;
- this.inventoryManager = inventoryManager;
}
public void run() {
@@ -70,14 +68,16 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
long start = System.currentTimeMillis();
CountTime countTime;
- countTime = checkConfigurations(this.inventoryManager.getPlatform(), true);
- log.info("Configuration update check for [" + countTime.count + "] resources completed in " +
- (System.currentTimeMillis() - start)/1000 + "s wall time, " + countTime.time + "ms check time");
+ InventoryManager inventoryManager = PluginContainer.getInstance().getInventoryManager();
+ Resource platform = inventoryManager.getPlatform();
+ countTime = checkConfigurations(inventoryManager, platform, true);
+ log.info("Configuration update check for [" + countTime.count + "] resources completed in "
+ + (System.currentTimeMillis() - start) / 1000 + "s wall time, " + countTime.time + "ms check time");
return null;
}
- public CountTime checkConfigurations(Resource resource, boolean checkChildren) {
- ResourceContainer resourceContainer = this.inventoryManager.getResourceContainer(resource.getId());
+ public CountTime checkConfigurations(InventoryManager inventoryManager, Resource resource, boolean checkChildren) {
+ ResourceContainer resourceContainer = inventoryManager.getResourceContainer(resource.getId());
ConfigurationFacet resourceComponent = null;
ResourceType resourceType = resource.getResourceType();
@@ -93,7 +93,7 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
FacetLockType.NONE, CONFIGURATION_CHECK_TIMEOUT, true, false, true);
} catch (PluginContainerException e) {
// Expecting when the resource does not support configuration management
- // Should never happen after above check
+ // Should never happen after above check
}
}
@@ -104,7 +104,6 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
long t1 = System.currentTimeMillis();
-
if (debugEnabled) {
log.debug("Checking for updated Resource configuration for " + resource + "...");
}
@@ -117,7 +116,8 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
.getResourceConfigurationDefinition();
// Normalize and validate the config.
- ConfigurationUtility.normalizeConfiguration(liveConfiguration, configurationDefinition,true,true);
+ ConfigurationUtility.normalizeConfiguration(liveConfiguration, configurationDefinition,
+ true, true);
List<String> errorMessages = ConfigurationUtility.validateConfiguration(liveConfiguration,
configurationDefinition);
for (String errorMessage : errorMessages) {
@@ -126,10 +126,10 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
+ errorMessage);
}
- Configuration original = getResourceConfiguration(resource);
+ Configuration original = getResourceConfiguration(inventoryManager, resource);
- if (original==null) {
- original = loadConfigurationFromFile(resource.getId());
+ if (original == null) {
+ original = loadConfigurationFromFile(inventoryManager, resource.getId());
}
if (!liveConfiguration.equals(original)) {
@@ -138,8 +138,9 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
}
this.configurationServerService.persistUpdatedResourceConfiguration(resource.getId(),
liveConfiguration);
-// resource.setResourceConfiguration(liveConfiguration);
- boolean persisted = persistConfigurationToFile(resource.getId(),liveConfiguration, log);
+ // resource.setResourceConfiguration(liveConfiguration);
+ boolean persisted = persistConfigurationToFile(inventoryManager, resource.getId(),
+ liveConfiguration, log);
if (persisted) {
resource.setResourceConfiguration(null);
}
@@ -151,22 +152,22 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
}
long now = System.currentTimeMillis();
- countTime.add(1,(now-t1));
+ countTime.add(1, (now - t1));
// Give the agent some time to breathe
try {
Thread.sleep(750);
} catch (InterruptedException e) {
; // We don't care
+ }
}
}
- }
if (checkChildren) {
- for (Resource child : this.inventoryManager.getContainerChildren(resource, resourceContainer)) {
+ for (Resource child : inventoryManager.getContainerChildren(resource, resourceContainer)) {
try {
- CountTime inner = checkConfigurations(child, true);
- countTime.add(inner.count,inner.time);
+ CountTime inner = checkConfigurations(inventoryManager, child, true);
+ countTime.add(inner.count, inner.time);
} catch (Exception e) {
log.error("Failed to check Resource configuration for " + child + ".", e);
}
@@ -176,20 +177,22 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
return countTime;
}
- public static Configuration getResourceConfiguration(Resource resource) {
+ static public Configuration getResourceConfiguration(InventoryManager inventoryManager, Resource resource) {
Configuration result = resource.getResourceConfiguration();
if (null == result) {
- result = loadConfigurationFromFile(resource.getId());
+ result = loadConfigurationFromFile(inventoryManager, resource.getId());
}
return result;
}
- public static boolean persistConfigurationToFile(int resourceId, Configuration liveConfiguration, Log log) {
+ static public boolean persistConfigurationToFile(InventoryManager inventoryManager, int resourceId,
+ Configuration liveConfiguration, Log log) {
boolean success = true;
try {
- String pathname = "data/rc/" + String.valueOf(resourceId/1000); // Don't put too many files into one data dir
- File dataDir = new File(pathname);
+ File baseDataDir = inventoryManager.getDataDirectory();
+ String pathname = "rc/" + String.valueOf(resourceId / 1000); // Don't put too many files into one data dir
+ File dataDir = new File(baseDataDir, pathname);
if (!dataDir.exists()) {
success = dataDir.mkdirs();
if (!success) {
@@ -213,9 +216,10 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
}
- static private Configuration loadConfigurationFromFile(int resourceId) {
- String pathname = "data/rc/" + String.valueOf(resourceId/1000); // Don't put too many files into one data dir
- File dataDir = new File(pathname);
+ static private Configuration loadConfigurationFromFile(InventoryManager inventoryManager, int resourceId) {
+ File baseDataDir = inventoryManager.getDataDirectory();
+ String pathname = "rc/" + String.valueOf(resourceId / 1000); // Don't put too many files into one data dir
+ File dataDir = new File(baseDataDir, pathname);
File file = new File(dataDir, String.valueOf(resourceId));
if (!file.exists()) {
log.error("File " + file.getAbsolutePath() + " does not exist");
@@ -239,21 +243,18 @@ public class ConfigurationCheckExecutor implements Runnable, Callable {
}
private static class CountTime {
- private long count=0;
- private long time=0;
+ private long count = 0L;
+ private long time = 0L;
private void add(long count, long time) {
- this.count +=count;
- this.time +=time;
+ this.count += count;
+ this.time += time;
}
@Override
public String toString() {
- return "CountTime{" +
- "count=" + count +
- ", time=" + time +
- '}';
+ return "CountTime{" + "count=" + count + ", time=" + time + '}';
}
}
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationManager.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationManager.java
index 393d392..dfb6412 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationManager.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/configuration/ConfigurationManager.java
@@ -81,13 +81,13 @@ public class ConfigurationManager extends AgentService implements ContainerServi
threadPool = new ScheduledThreadPoolExecutor(1, threadFactory);
ConfigurationCheckExecutor configurationChecker = new ConfigurationCheckExecutor(
- getConfigurationServerService(), PluginContainer.getInstance().getInventoryManager());
+ getConfigurationServerService());
if (pluginContainerConfiguration.getConfigurationDiscoveryPeriod() > 0
&& pluginContainerConfiguration.isInsideAgent()) {
- threadPool.scheduleAtFixedRate(configurationChecker, pluginContainerConfiguration
- .getConfigurationDiscoveryInitialDelay(), pluginContainerConfiguration
- .getConfigurationDiscoveryPeriod(), TimeUnit.SECONDS);
+ threadPool.scheduleAtFixedRate(configurationChecker,
+ pluginContainerConfiguration.getConfigurationDiscoveryInitialDelay(),
+ pluginContainerConfiguration.getConfigurationDiscoveryPeriod(), TimeUnit.SECONDS);
}
}
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
index ca940cb..aed76d9 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/inventory/InventoryManager.java
@@ -1864,6 +1864,10 @@ public class InventoryManager extends AgentService implements ContainerService,
+ "] as it is already in the process of starting.");
return false;
+ default:
+ log.error("Unexpected state [" + state.name() + "], returning false...");
+
+ return false;
}
} else {
if (log.isTraceEnabled()) {
@@ -2038,6 +2042,13 @@ public class InventoryManager extends AgentService implements ContainerService,
}
}
+ /**
+ * @return The location for [plugins] to write data files
+ */
+ public File getDataDirectory() {
+ return this.configuration.getDataDirectory();
+ }
+
private <T extends ResourceComponent<?>> ResourceContext<T> createResourceContext(Resource resource,
T parentComponent, ResourceContext<?> parentResourceContext, ResourceDiscoveryComponent<T> discoveryComponent) {
@@ -2047,7 +2058,7 @@ public class InventoryManager extends AgentService implements ContainerService,
discoveryComponent, // the discovery component (this is actually the proxy to it)
SystemInfoFactory.createSystemInfo(), // for native access
this.configuration.getTemporaryDirectory(), // location for plugin to write temp files
- this.configuration.getDataDirectory(), // location for plugin to write data files
+ this.configuration.getDataDirectory(), // base location for plugin to write data files
this.configuration.getContainerName(), // the name of the agent/PC
getEventContext(resource), // for event access
getOperationContext(resource), // for operation manager access
@@ -3073,7 +3084,7 @@ public class InventoryManager extends AgentService implements ContainerService,
if (resourceConfiguration != null) {
resourceConfiguration.cleanoutRawConfiguration();
- boolean persisted = ConfigurationCheckExecutor.persistConfigurationToFile(resource.getId(),
+ boolean persisted = ConfigurationCheckExecutor.persistConfigurationToFile(this, resource.getId(),
resourceConfiguration, log);
if (persisted) {
resource.setResourceConfiguration(null);
@@ -3082,7 +3093,8 @@ public class InventoryManager extends AgentService implements ContainerService,
}
public static Configuration getResourceConfiguration(Resource agentSideResource) {
- return ConfigurationCheckExecutor.getResourceConfiguration(agentSideResource);
+ return ConfigurationCheckExecutor.getResourceConfiguration(PluginContainer.getInstance().getInventoryManager(),
+ agentSideResource);
}
private void compactConfiguration(Configuration config) {
@@ -3487,6 +3499,9 @@ public class InventoryManager extends AgentService implements ContainerService,
}
break;
}
+ default:
+ // nothing to do for other states
+ break;
}
container.setSynchronizationState(ResourceContainer.SynchronizationState.SYNCHRONIZED);
commit 5bd9673b5d08f51e933438132e63d7620354de9c
Author: Jay Shaughnessy <jshaughn(a)redhat.com>
Date: Mon Feb 3 12:00:11 2014 -0500
Fix: was logging as the wrong class.
diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/drift/SnapshotGenerator.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/drift/SnapshotGenerator.java
index e6c4d07..bdc84f4 100644
--- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/drift/SnapshotGenerator.java
+++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/drift/SnapshotGenerator.java
@@ -1,10 +1,6 @@
package org.rhq.core.pc.drift;
-import org.apache.commons.io.DirectoryWalker;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import static java.io.File.separator;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -21,14 +17,17 @@ import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
-import org.rhq.core.pc.configuration.ConfigurationCheckExecutor;
-import org.rhq.core.util.MessageDigestGenerator;
+import org.apache.commons.io.DirectoryWalker;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-import static java.io.File.separator;
+import org.rhq.core.util.MessageDigestGenerator;
public class SnapshotGenerator extends DirectoryWalker {
- private final Log log = LogFactory.getLog(ConfigurationCheckExecutor.class);
+ private final Log log = LogFactory.getLog(SnapshotGenerator.class);
private MessageDigestGenerator digestGenerator = new MessageDigestGenerator(MessageDigestGenerator.SHA_256);
10 years, 3 months
[rhq] Branch 'jsanda/throttling' - modules/enterprise
by John Sanda
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java | 6
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java | 72 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java | 133 ----
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java | 91 ---
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java | 85 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java | 137 ----
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java | 144 ++--
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java | 25
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java | 289 +---------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java | 111 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java | 108 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java | 78 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java | 47 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java | 47 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java | 46 +
15 files changed, 620 insertions(+), 799 deletions(-)
New commits:
commit af7af8fc5f453246637a2a6aea9e9e297abb7147
Author: John Sanda <jsanda(a)redhat.com>
Date: Mon Feb 3 11:30:00 2014 -0500
[BZ 1045589] major refactoring of aggregation to limit concurrent reads
Even with really high throttling like only allowing 5k requests/sec, it was
still easily possible to do enough concurrent reads during aggregation that
could result in OOMs. Prior to this commit, several hundred or thousand
queries could be basically executed in parallel to fetch raw data. The max
size of the payload coming in the form of a StorageResultSetFuture object is
about 30 KB.
With this commit, the number of concurrent reads that is done during
aggregation has a fixed, configurable upper bound. It can be configured with the
following system properties,
* rhq.metrics.aggregation.batch-size
* rhq.metrics.aggregation.parallelism
Data for measurement schedules is fetched and computed in chunks or batches.
The batch-size property specifies the number of schedules for which data will
be fetched. The parallelism property specifies the number of batches that can
be processed in parallel. In terms of implementation, this is all handled using
a Semaphore. The number of semaphore permits is determined by
batch-size * parallelism. So for example, if batch-size is 25 and parallelism
is 4, then this will yield 100 permits, meaning at most 100 concurrent metric
data queries are allowed.
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
index 2d0d190..edcb7b7 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
@@ -11,6 +11,8 @@ public class SignalingCountDownLatch {
private CountDownLatch latch;
+ private String msg;
+
public SignalingCountDownLatch(CountDownLatch latch) {
this.latch = latch;
}
@@ -18,11 +20,11 @@ public class SignalingCountDownLatch {
public void await() throws InterruptedException, AbortedException {
latch.await();
if (aborted) {
- throw new AbortedException();
+ throw new AbortedException(msg);
}
}
- public void abort() {
+ public void abort(String msg) {
aborted = true;
latch.countDown();
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
index d61f380..41cdc92 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
@@ -4,7 +4,6 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
@@ -35,7 +34,7 @@ public class StorageSession implements Host.StateListener {
private int minRequestLimit = Integer.parseInt(System.getProperty("rhq.storage.request-limit.min", "1000"));
private RateLimiter permits = RateLimiter.create(Double.parseDouble(
- System.getProperty("rhq.storage.request-limit", "50000")), 3, TimeUnit.MINUTES);
+ System.getProperty("rhq.storage.request-limit", "10000")), 3, TimeUnit.MINUTES);
private int requestLimitDelta;
@@ -43,10 +42,6 @@ public class StorageSession implements Host.StateListener {
private long permitsChangeWindow = 1000 * 10;
- private boolean permitsChanging;
-
- private ReentrantReadWriteLock permitsLock = new ReentrantReadWriteLock();
-
public StorageSession(Session wrappedSession) {
this.wrappedSession = wrappedSession;
this.wrappedSession.getCluster().register(this);
@@ -98,8 +93,7 @@ public class StorageSession implements Host.StateListener {
public ResultSet execute(String query) {
try {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.execute(query);
} catch (NoHostAvailableException e) {
handleNoHostAvailable(e);
@@ -109,8 +103,7 @@ public class StorageSession implements Host.StateListener {
public ResultSet execute(Query query) {
try {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.execute(query);
} catch (NoHostAvailableException e) {
handleNoHostAvailable(e);
@@ -119,22 +112,19 @@ public class StorageSession implements Host.StateListener {
}
public StorageResultSetFuture executeAsync(String query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}
public StorageResultSetFuture executeAsync(Query query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}
public PreparedStatement prepare(String query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.prepare(query);
}
@@ -195,54 +185,30 @@ public class StorageSession implements Host.StateListener {
void handleNoHostAvailable(NoHostAvailableException e) {
log.warn("Encountered " + NoHostAvailableException.class.getSimpleName() + " due to following error(s): " +
e.getErrors());
+ boolean isClientTimeout = false;
for (InetAddress address : e.getErrors().keySet()) {
String error = e.getErrors().get(address);
if (error != null && error.contains("Timeout during read")) {
- try {
- permitsLock.writeLock().lock();
- if (System.currentTimeMillis() - permitsLastChanged > permitsChangeWindow) {
- permitsChanging = true;
- int newRate = (int) permits.getRate() - requestLimitDelta;
- if (newRate < minRequestLimit) {
- newRate = minRequestLimit;
- }
- log.warn("The request timed out. Decreasing request throughput to " + newRate);
-// permits.setRate(newRate);
- permits = RateLimiter.create(newRate, 2, TimeUnit.MINUTES);
- permitsLastChanged = System.currentTimeMillis();
- requestLimitDelta = requestLimitDelta * 2;
+ if (System.currentTimeMillis() - permitsLastChanged > permitsChangeWindow) {
+ int newRate = (int) permits.getRate() - requestLimitDelta;
+ if (newRate < minRequestLimit) {
+ newRate = minRequestLimit;
}
- } finally {
- permitsChanging = false;
- permitsLock.writeLock().unlock();
+ log.warn("The request timed out. Decreasing request throughput to " + newRate);
+ permits.setRate(newRate);
+ permitsLastChanged = System.currentTimeMillis();
+ requestLimitDelta = requestLimitDelta * 2;
}
-
+ isClientTimeout = true;
break;
}
}
-// for (String error : e.getErrors().values()) {
-// if (error.contains("Timeout during read")) {
-// log.warn("The request timed out. Decreasing request throughput.");
-// permits.setRate(permits.getRate() - requestLimitDelta);
-// break;
-// }
-// }
- }
-
- private void acquirePermit() {
- if (permitsChanging) {
- try {
- permitsLock.readLock().lock();
- permits.acquire();
- } finally {
- permitsLock.readLock().unlock();
- }
- } else {
- permits.acquire();
+ if (!isClientTimeout) {
+ fireClusterDownEvent(e);
}
}
- void fireClusterDownEvent(NoHostAvailableException e) {
+ private void fireClusterDownEvent(NoHostAvailableException e) {
isClusterAvailable = false;
for (StorageStateListener listener : listeners) {
listener.onStorageClusterDown(e);
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
deleted file mode 100644
index 702b64f..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.AbortedException;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted for the batch, aggregation of 6 hour
- * data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute6HourData
- * @author John Sanda
- */
-class Aggregate1HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate1HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
- log.error("An error occurred while fetching one hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
- state.getCompute6HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- stopwatch.stop();
- log.debug("Finished aggregating 1 hour data for " + result.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- start6HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
- "unexpected error occurred.", t);
- } else {
- log.warn("Failed to aggregate 1 hour data for " + scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
- }
- start6HourDataAggregationIfNecessary();
- }
- });
- }
-
- private void start6HourDataAggregationIfNecessary() {
- try {
- if (state.is24HourTimeSliceFinished()) {
- update6HourIndexEntries();
- List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds, queryFutures));
- }
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for 6 hour data index entries. Aborting data aggregation",
- e);
- } else {
- log.info("An interrupt occurred while waiting for 6 hour data index entries. Aborting data " +
- "aggregation: " + e.getMessage());
- }
- } finally {
- int remainingSchedules = state.getRemaining1HourData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining schedules with 1 hr data to be aggregated");
- }
- }
- }
-
- private void update6HourIndexEntries() throws InterruptedException {
- try {
- state.getSixHourIndexEntriesArrival().await();
- try {
- state.getSixHourIndexEntriesLock().writeLock().lock();
- state.getSixHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 6 hour data because we do not need the index
- // here since we already have 6 hour data to aggregate along with the
- // schedule ids.
- log.debug("The wait for 6 hour index entries has been aborted. Proceeding with scheduling computation of " +
- "1 hour aggregates for previously assigned schedules.");
- state.getRemaining6HourData().addAndGet(scheduleIds.size());
- }
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
deleted file mode 100644
index 836e070..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 24 hour data for a batch of 1 hour data futures. After data is inserted for the batch, aggregation of 6
- * hour data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute24HourData
- * @author John Sanda
- */
-class Aggregate6HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate6HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
- log.error("An error occurred while fetching 6 hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
- state.getCompute24HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- stopwatch.stop();
- log.debug("Finished aggregating 6 hour data for " + result.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- updateRemaining6HrData();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
- "unexpected error occurred.", t);
- } else {
- log.warn("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
- }
- updateRemaining6HrData();
- }
- });
- }
-
- private void updateRemaining6HrData() {
- int remainingSchedules = state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining schedules with 6 hr data to be aggregated");
- }
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
deleted file mode 100644
index f8c5318..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.SignalingCountDownLatch;
-
-/**
-* @author John Sanda
-*/
-class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
-
- private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
-
- private Set<Integer> indexEntries;
-
- private AtomicInteger remainingData;
-
- private SignalingCountDownLatch indexEntriesArrival;
-
- private Stopwatch stopwatch;
-
- private String src;
-
- private String dest;
-
- public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger remainingData,
- SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src, String dest) {
- this.indexEntries = indexEntries;
- this.remainingData = remainingData;
- this.indexEntriesArrival = indexEntriesArrival;
- this.stopwatch = stopwatch;
- this.src = src;
- this.dest = dest;
- }
-
- @Override
- public void onSuccess(ResultSet resultSet) {
- for (Row row : resultSet) {
- indexEntries.add(row.getInt(1));
- }
-
- // Even if indexInetries is empty, it is possible (though unlikely) that a subsequent query could return a
- // non-empty result set. This might happen if the index was updated at the very end of the last time slice, and
- // we do an inconsistent read. When it is empty, abort() is called to let the AggregateXXXData objects know that
- // the should update remainingData.
- if (indexEntries.isEmpty()) {
- indexEntriesArrival.abort();
- } else {
- remainingData.set(indexEntries.size());
- indexEntriesArrival.countDown();
- }
-
- stopwatch.stop();
-
- if (log.isDebugEnabled()) {
- log.debug("Finished loading " + indexEntries.size() + " " + src + " index entries in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- }
-
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- log.debug("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
- "retrieving " + src + " index entries", t);
- } else {
- log.warn("Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
- "retrieving " + src + " index entries: " + ThrowableUtil.getRootMessage(t));
- }
- remainingData.set(0);
- indexEntriesArrival.abort();
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
deleted file mode 100644
index 0402f9e..0000000
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.AbortedException;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 1 hour data for a batch of raw data futures. After data is inserted for the batch, aggregation of 1 hour
- * data will start immediately for the batch if the 6 hour time slice has finished.
- *
- * @see Compute1HourData
- * @author John Sanda
- */
-class AggregateRawData implements Runnable {
-
- private final Log log = LogFactory.getLog(AggregateRawData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- try {
- ListenableFuture<List<ResultSet>> rawDataFutures = Futures.successfulAsList(queryFutures);
- final ListenableFuture<List<ResultSet>> insert1HourDataFutures = Futures.transform(rawDataFutures,
- state.getCompute1HourData(), state.getAggregationTasks());
- Futures.addCallback(insert1HourDataFutures, new FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> resultSets) {
- stopwatch.stop();
- log.debug("Finished aggregating raw data for " + scheduleIds.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- start1HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An unexpected " +
- "error occurred.", t);
- } else {
- log.warn("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
- }
- start1HourDataAggregationIfNecessary();
- }
- }, state.getAggregationTasks());
- } catch (Exception e) {
- log.error("An unexpected error occurred while aggregating raw data", e);
- }
- }
-
- private void start1HourDataAggregationIfNecessary() {
- try {
- if (state.is6HourTimeSliceFinished()) {
- update1HourIndexEntries();
- List<StorageResultSetFuture> oneHourDataQueryFutures = new ArrayList<StorageResultSetFuture>(
- scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
- state.getSixHourTimeSlice().getMillis(), state.getSixHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
- oneHourDataQueryFutures));
- }
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for 1 hour data index entries. Aborting data aggregation",
- e);
- } else {
- log.info("An interrupt occurred while waiting for 1 hour data index entries. Aborting data " +
- "aggregation: " + e.getMessage());
- }
- } finally {
- int remainingSchedules = state.getRemainingRawData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining schedules with raw data to be aggregated");
- }
-
- }
- }
-
- private void update1HourIndexEntries() throws InterruptedException {
- try {
- // Wait for the arrival so that we can remove the schedules ids in this
- // batch from the one hour index entries. This will prevent duplicate tasks
- // being submitted to process the same 1 hour data.
- log.debug("Waiting for arrival of 1 hour index entries");
- state.getOneHourIndexEntriesArrival().await();
- try {
- state.getOneHourIndexEntriesLock().writeLock().lock();
- state.getOneHourIndexEntries().removeAll(scheduleIds);
- log.debug("Finished updating state.oneHourIndexEntries");
- } finally {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 1 hour data because we do not need the index
- // here since we already have 1 hour data to aggregate along with the
- // schedule ids.
- log.debug("The wait for 1 hour index entries has been aborted. Proceeding with scheduling computation of " +
- "1 hour aggregates for previously assigned schedules.");
- state.getRemaining1HourData().addAndGet(scheduleIds.size());
- }
- }
-}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
index 345e53a..78f1bc4 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
@@ -1,13 +1,13 @@
package org.rhq.server.metrics.aggregation;
-import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.joda.time.DateTime;
+import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.SignalingCountDownLatch;
/**
@@ -15,28 +15,32 @@ import org.rhq.server.metrics.SignalingCountDownLatch;
*/
class AggregationState {
- private ListeningExecutorService aggregationTasks;
+ private DateTime startTime;
- private SignalingCountDownLatch oneHourIndexEntriesArrival;
+ private int batchSize;
- private SignalingCountDownLatch sixHourIndexEntriesArrival;
+ private MetricsDAO dao;
- private AtomicInteger remainingRawData;
+ private ListeningExecutorService aggregationTasks;
- private AtomicInteger remaining1HourData;
+ private Semaphore permits;
- private AtomicInteger remaining6HourData;
+ private SignalingCountDownLatch rawAggregationDone;
- private Set<Integer> oneHourIndexEntries;
+ private SignalingCountDownLatch oneHourAggregationDone;
- private Set<Integer> sixHourIndexEntries;
+ private SignalingCountDownLatch sixHourAggregationDone;
- private ReentrantReadWriteLock oneHourIndexEntriesLock;
+ private AtomicInteger remainingRawData;
+
+ private AtomicInteger remaining1HourData;
- private ReentrantReadWriteLock sixHourIndexEntriesLock;
+ private AtomicInteger remaining6HourData;
private DateTime oneHourTimeSlice;
+ private DateTime oneHourTimeSliceEnd;
+
private DateTime sixHourTimeSlice;
private DateTime sixHourTimeSliceEnd;
@@ -55,6 +59,33 @@ class AggregationState {
private Compute24HourData compute24HourData;
+ DateTime getStartTime() {
+ return startTime;
+ }
+
+ AggregationState setStartTime(DateTime startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ int getBatchSize() {
+ return batchSize;
+ }
+
+ AggregationState setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ MetricsDAO getDao() {
+ return dao;
+ }
+
+ AggregationState setDao(MetricsDAO dao) {
+ this.dao = dao;
+ return this;
+ }
+
public ListeningExecutorService getAggregationTasks() {
return aggregationTasks;
}
@@ -64,29 +95,39 @@ class AggregationState {
return this;
}
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 1 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
- return oneHourIndexEntriesArrival;
+ Semaphore getPermits() {
+ return permits;
}
- public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch oneHourIndexEntriesArrival) {
- this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
+ AggregationState setPermits(Semaphore permits) {
+ this.permits = permits;
return this;
}
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries for schedules with 6 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
- return sixHourIndexEntriesArrival;
+ SignalingCountDownLatch getRawAggregationDone() {
+ return rawAggregationDone;
}
- public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch sixHourIndexEntriesArrival) {
- this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
+ AggregationState setRawAggregationDone(SignalingCountDownLatch rawAggregationDone) {
+ this.rawAggregationDone = rawAggregationDone;
+ return this;
+ }
+
+ SignalingCountDownLatch getOneHourAggregationDone() {
+ return oneHourAggregationDone;
+ }
+
+ AggregationState setOneHourAggregationDone(SignalingCountDownLatch oneHourAggregationDone) {
+ this.oneHourAggregationDone = oneHourAggregationDone;
+ return this;
+ }
+
+ SignalingCountDownLatch getSixHourAggregationDone() {
+ return sixHourAggregationDone;
+ }
+
+ AggregationState setSixHourAggregationDone(SignalingCountDownLatch sixHourAggregationDone) {
+ this.sixHourAggregationDone = sixHourAggregationDone;
return this;
}
@@ -126,51 +167,21 @@ class AggregationState {
return this;
}
- /**
- * @return The schedule ids with 1 hour data to be aggregated
- */
- public Set<Integer> getOneHourIndexEntries() {
- return oneHourIndexEntries;
- }
-
- public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) {
- this.oneHourIndexEntries = oneHourIndexEntries;
- return this;
- }
-
- public Set<Integer> getSixHourIndexEntries() {
- return sixHourIndexEntries;
- }
-
- public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) {
- this.sixHourIndexEntries = sixHourIndexEntries;
- return this;
- }
-
- public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
- return oneHourIndexEntriesLock;
- }
-
- public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock oneHourIndexEntriesLock) {
- this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
- return this;
- }
-
- public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
- return sixHourIndexEntriesLock;
+ public DateTime getOneHourTimeSlice() {
+ return oneHourTimeSlice;
}
- public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock sixHourIndexEntriesLock) {
- this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
+ public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
+ this.oneHourTimeSlice = oneHourTimeSlice;
return this;
}
- public DateTime getOneHourTimeSlice() {
- return oneHourTimeSlice;
+ DateTime getOneHourTimeSliceEnd() {
+ return oneHourTimeSliceEnd;
}
- public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
- this.oneHourTimeSlice = oneHourTimeSlice;
+ AggregationState setOneHourTimeSliceEnd(DateTime oneHourTimeSliceEnd) {
+ this.oneHourTimeSliceEnd = oneHourTimeSliceEnd;
return this;
}
@@ -254,4 +265,5 @@ class AggregationState {
this.compute24HourData = compute24HourData;
return this;
}
+
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
new file mode 100644
index 0000000..4a6c886
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
@@ -0,0 +1,25 @@
+package org.rhq.server.metrics.aggregation;
+
+/**
+ * @author John Sanda
+ */
+public enum AggregationType {
+
+ RAW("raw data"),
+
+ ONE_HOUR("one hour data"),
+
+ SIX_HOUR("six hour data");
+
+ private String type;
+
+ private AggregationType(String type) {
+ this.type = type;
+ }
+
+
+ @Override
+ public String toString() {
+ return type;
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
index d86379e..c1ed47a 100644
--- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -1,18 +1,17 @@
package org.rhq.server.metrics.aggregation;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -58,19 +57,11 @@ public class Aggregator {
private DateTime startTime;
- /**
- * Signals when raw data index entries (in metrics_index) can be deleted. We cannot delete the row in metrics_index
- * until we know that it has been read, successfully or otherwise.
- */
- private SignalingCountDownLatch rawDataIndexEntriesArrival;
-
- private int batchSize;
-
private AggregationState state;
private Set<AggregateNumericMetric> oneHourData;
- private AtomicInteger remainingIndexEntries;
+ private int maxParallelism = Integer.parseInt(System.getProperty("rhq.metrics.aggregation.parallelism", "5"));
public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, MetricsConfiguration configuration,
DateTimeService dtService, DateTime startTime, int batchSize) {
@@ -78,17 +69,22 @@ public class Aggregator {
this.configuration = configuration;
this.dtService = dtService;
this.startTime = startTime;
- this.batchSize = batchSize;
oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
- rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
- remainingIndexEntries = new AtomicInteger(1);
DateTime sixHourTimeSlice = get6HourTimeSlice();
DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
state = new AggregationState()
+ .setDao(dao)
+ .setStartTime(startTime)
+ .setBatchSize(batchSize)
.setAggregationTasks(aggregationTasks)
+ .setPermits(new Semaphore(maxParallelism * batchSize, true))
+ .setRawAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1)))
+ .setOneHourAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1)))
+ .setSixHourAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1)))
.setOneHourTimeSlice(startTime)
+ .setOneHourTimeSliceEnd(startTime.plus(configuration.getRawTimeSliceDuration()))
.setSixHourTimeSlice(sixHourTimeSlice)
.setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
.setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
@@ -101,27 +97,7 @@ public class Aggregator {
configuration.getSixHourTimeSliceDuration()))
.setRemainingRawData(new AtomicInteger(0))
.setRemaining1HourData(new AtomicInteger(0))
- .setRemaining6HourData(new AtomicInteger(0))
- .setOneHourIndexEntries(new TreeSet<Integer>())
- .setSixHourIndexEntries(new TreeSet<Integer>())
- .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
- .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
-
- if (state.is6HourTimeSliceFinished()) {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
- state.setRemaining1HourData(new AtomicInteger(0));
- }
-
- if (state.is24HourTimeSliceFinished()) {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
- state.setRemaining6HourData(new AtomicInteger(0));
- }
+ .setRemaining6HourData(new AtomicInteger(0));
}
private DateTime get24HourTimeSlice() {
@@ -143,214 +119,63 @@ public class Aggregator {
public Set<AggregateNumericMetric> run() {
log.info("Starting aggregation for time slice " + startTime);
- StorageResultSetFuture rawFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
- startTime.getMillis());
- Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet result) {
- int schedules = 0;
- List<Row> rows = result.all();
- log.debug("Retrieved " + rows.size() + " schedule ids from raw data index");
- state.getRemainingRawData().set(rows.size());
- rawDataIndexEntriesArrival.countDown();
-
- Stopwatch stopwatch = new Stopwatch().start();
-
- final DateTime endTime = startTime.plus(configuration.getRawTimeSliceDuration());
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- List<StorageResultSetFuture> rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
- for (final Row row : rows) {
- scheduleIds.add(row.getInt(1));
- rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), startTime.getMillis(),
- endTime.getMillis()));
- if (rawDataFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
- rawDataFutures));
- schedules += rawDataFutures.size();
- rawDataFutures = new ArrayList<StorageResultSetFuture>();
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!rawDataFutures.isEmpty()) {
- state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
- rawDataFutures));
- schedules += rawDataFutures.size();
- }
-
- if (log.isDebugEnabled()) {
- stopwatch.stop();
- log.debug("Finished scheduling raw data aggregation tasks for " + schedules + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- log.debug("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
- "unexpected error while retrieving raw data index entries.", t);
- } else {
- log.warn("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
- "unexpected error while retrieving raw data index entries: " + ThrowableUtil.getRootMessage(t));
- }
- state.setRemainingRawData(new AtomicInteger(0));
- rawDataIndexEntriesArrival.abort();
- deleteIndexEntries(MetricsTable.ONE_HOUR);
- }
- }, state.getAggregationTasks());
-
- if (state.is6HourTimeSliceFinished()) {
- log.debug("Fetching 1 hour index entries");
- Stopwatch stopwatch = new Stopwatch().start();
- StorageResultSetFuture oneHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
- state.getSixHourTimeSlice().getMillis());
- Futures.addCallback(oneHourFuture, new AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
- state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), stopwatch, "1 hour", "6 hour"),
- state.getAggregationTasks());
- }
-
- if (state.is24HourTimeSliceFinished()) {
- log.debug("Fetching 6 hour index entries");
+ try {
Stopwatch stopwatch = new Stopwatch().start();
- StorageResultSetFuture sixHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
- state.getTwentyFourHourTimeSlice().getMillis());
- Futures.addCallback(sixHourFuture, new AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
- state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), stopwatch, "6 hour", "24 hour"),
- state.getAggregationTasks());
- }
+ List<MetricsTable> indexUpdates = new ArrayList<MetricsTable>(3);
+ indexUpdates.add(MetricsTable.ONE_HOUR);
+ StorageResultSetFuture rawIndexFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
+ startTime.getMillis());
+ Futures.addCallback(rawIndexFuture, new RawDataScheduler(state), state.getAggregationTasks());
- try {
- try {
- rawDataIndexEntriesArrival.await();
- } catch (AbortedException e) {
- }
- deleteIndexEntries(MetricsTable.ONE_HOUR);
+ state.getRawAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating raw data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
if (state.is6HourTimeSliceFinished()) {
- boolean is1HourIndexWaitAborted = false;
- waitFor(state.getRemainingRawData());
- try {
- state.getOneHourIndexEntriesArrival().await();
-
- List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getOneHourIndexEntriesLock().writeLock().lock();
- log.debug("Preparing to submit 1 hour data aggregation tasks for " +
- state.getOneHourIndexEntries().size() + " schedules");
- for (Integer scheduleId : state.getOneHourIndexEntries()) {
- queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
- state.getSixHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- is1HourIndexWaitAborted = true;
- if (log.isDebugEnabled()) {
- log.debug("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
- "loading 1 hour index entries", e);
- } else {
- log.warn("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
- "loading 1 hour index entries: " + ThrowableUtil.getRootMessage(e));
- }
- } finally {
- deleteIndexEntries(MetricsTable.SIX_HOUR);
- if (!is1HourIndexWaitAborted) {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- }
+ log.info("Starting aggregation of 1 hour data");
+ stopwatch.reset().start();
+ indexUpdates.add(MetricsTable.SIX_HOUR);
+ StorageResultSetFuture oneHourIndexFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
+ state.getSixHourTimeSlice().getMillis());
+ Futures.addCallback(oneHourIndexFuture, new OneHourDataScheduler(state),
+ state.getAggregationTasks());
+
+ state.getOneHourAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating one hour data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
if (state.is24HourTimeSliceFinished()) {
- boolean is6HourIndexWaitAborted = false;
- waitFor(state.getRemaining1HourData());
- try {
- state.getSixHourIndexEntriesArrival().await();
-
- List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getSixHourIndexEntriesLock().writeLock().lock();
- log.debug("Preparing to submit 6 hour data aggregation tasks for " +
- state.getSixHourIndexEntries().size() + " schedules");
- for (Integer scheduleId : state.getSixHourIndexEntries()) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- is6HourIndexWaitAborted = true;
- if (log.isDebugEnabled()) {
- log.debug("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
- "loading 6 hour index entries", e);
- } else {
- log.warn("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
- "loading 6 hour index entries: " + ThrowableUtil.getRootMessage(e));
- }
- } finally {
- deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
- if (!is6HourIndexWaitAborted) {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- }
+ log.info("Starting aggregation of 6 hour data");
+ stopwatch.reset().start();
+ indexUpdates.add(MetricsTable.TWENTY_FOUR_HOUR);
+ StorageResultSetFuture sixHourIndexFuture = dao.findMetricsIndexEntriesAsync(
+ MetricsTable.TWENTY_FOUR_HOUR, state.getTwentyFourHourTimeSlice().getMillis());
+ Futures.addCallback(sixHourIndexFuture, new SixHourDataScheduler(state),
+ state.getAggregationTasks());
+
+ state.getSixHourAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating six hour data in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
- long lastUpdated = System.currentTimeMillis();
- while (!isAggregationFinished()) {
- if (log.isDebugEnabled() && ((System.currentTimeMillis() - lastUpdated) >= 30000)) {
- log.debug("Remaining aggregation:\n" +
- "raw data - " + state.getRemainingRawData().get() + "\n" +
- "1 hour data - " + state.getRemaining1HourData().get() + "\n" +
- "6 hour data - " + state.getRemaining6HourData().get() + "\n");
- lastUpdated = System.currentTimeMillis();
- }
- Thread.sleep(3000);
+ CountDownLatch updateIndexSignal = new CountDownLatch(indexUpdates.size());
+ for (MetricsTable table : indexUpdates) {
+ deleteIndexEntries(table, updateIndexSignal);
}
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work.", e);
- } else {
- log.warn("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work: " +
- ThrowableUtil.getRootMessage(e));
- }
- log.warn("An interrupt occurred while waiting for aggregation to finish", e);
- }
- return oneHourData;
- }
+ updateIndexSignal.await();
- private void waitFor(AtomicInteger remainingData) throws InterruptedException {
- while (remainingData.get() > 0) {
- Thread.sleep(50);
+ return oneHourData;
+ } catch (InterruptedException e) {
+ log.info("There was an interrupt while waiting for aggregation to finish. Aggregation will be aborted.");
+ return Collections.emptySet();
+ } catch (AbortedException e) {
+ log.warn("Aggregation has been aborted: " + e.getMessage());
+ return Collections.emptySet();
}
}
- private boolean isAggregationFinished() {
- return state.getRemainingRawData().get() <= 0 && state.getRemaining1HourData().get() <= 0 &&
- state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() <= 0;
- }
-
- private void deleteIndexEntries(final MetricsTable table) {
+ private void deleteIndexEntries(final MetricsTable table, final CountDownLatch doneSignal) {
final DateTime time;
switch (table) {
case ONE_HOUR:
@@ -368,7 +193,7 @@ public class Aggregator {
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
- remainingIndexEntries.decrementAndGet();
+ doneSignal.countDown();
}
@Override
@@ -380,7 +205,7 @@ public class Aggregator {
log.warn("Failed to delete index entries for table " + table + " at time [" + time + "]. An " +
"unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
}
- remainingIndexEntries.decrementAndGet();
+ doneSignal.countDown();
}
});
}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
new file mode 100644
index 0000000..415c677
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
@@ -0,0 +1,111 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+abstract class BatchAggregationScheduler implements FutureCallback<ResultSet> {
+
+ private final Log log = LogFactory.getLog(BatchAggregationScheduler.class);
+
+ protected AggregationState state;
+
+ public BatchAggregationScheduler(AggregationState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void onSuccess(ResultSet indexResultSet) {
+ Stopwatch stopwatch = new Stopwatch().start();
+ Stopwatch batchStopwatch = new Stopwatch().start();
+ List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(state.getBatchSize());
+ int numSchedules = 0;
+ try {
+ for (Row row : indexResultSet) {
+ state.getPermits().acquire();
+ ++numSchedules;
+ getRemainingSchedules().incrementAndGet();
+ queryFutures.add(findMetricData(row.getInt(1)));
+ if (queryFutures.size() == state.getBatchSize()) {
+ state.getAggregationTasks().submit(new BatchAggregator(createBatchAggregationState(queryFutures,
+ batchStopwatch)));
+ queryFutures = new ArrayList<StorageResultSetFuture>(state.getBatchSize());
+ batchStopwatch = new Stopwatch().start();
+ }
+ }
+ if (!queryFutures.isEmpty()) {
+ state.getAggregationTasks().submit(new BatchAggregator(createBatchAggregationState(queryFutures,
+ batchStopwatch)));
+ }
+ if (numSchedules == 0) {
+ getAggregationDoneSignal().countDown();
+ }
+ stopwatch.stop();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished scheduling " + getAggregationType() + " aggregation tasks for " + numSchedules +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+ } catch (InterruptedException e) {
+ log.info("There was an interrupt while scheduling aggregation tasks for " + getAggregationType() + ": " +
+ e.getMessage());
+ log.info("Aggregation will be aborted");
+ getAggregationDoneSignal().abort("There was an interrupt while scheduling aggregation tasks for " +
+ getAggregationType() + ": " + e.getMessage());
+ }
+ }
+
+ private BatchAggregationState createBatchAggregationState(List<StorageResultSetFuture> queryFutures,
+ Stopwatch batchStopwatch) {
+ return new BatchAggregationState()
+ .setAggregationTasks(state.getAggregationTasks())
+ .setAggregationType(getAggregationType())
+ .setComputeAggregates(getComputeAggregates())
+ .setDoneSignal(getAggregationDoneSignal())
+ .setPermits(state.getPermits())
+ .setQueryFutures(queryFutures)
+ .setRemainingSchedules(getRemainingSchedules())
+ .setStopwatch(batchStopwatch);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("Aggregation for time slice [" + state.getStartTime() + "] cannot proceed. There was an " +
+ "unexpected error while retrieving " + getAggregationType() + " index entries.", t);
+ } else {
+ log.warn("Aggregation for time slice [" + state.getStartTime() + "] cannot proceed. There was an " +
+ "unexpected error while retrieving " + getAggregationType() + " index entries: " +
+ ThrowableUtil.getRootMessage(t));
+ }
+ getAggregationDoneSignal().abort("There was an error while retrieving " + getAggregationType() +
+ " index entries: " + ThrowableUtil.getRootMessage(t));
+ }
+
+ protected abstract SignalingCountDownLatch getAggregationDoneSignal();
+
+ protected abstract AggregationType getAggregationType();
+
+ protected abstract StorageResultSetFuture findMetricData(int scheduleId);
+
+ protected abstract AsyncFunction<List<ResultSet>, List<ResultSet>> getComputeAggregates();
+
+ protected abstract AtomicInteger getRemainingSchedules();
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
new file mode 100644
index 0000000..4e7985e
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
@@ -0,0 +1,108 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class BatchAggregationState {
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ private AsyncFunction<List<ResultSet>, List<ResultSet>> computeAggregates;
+
+ private ListeningExecutorService aggregationTasks;
+
+ private Semaphore permits;
+
+ private AtomicInteger remainingSchedules;
+
+ private SignalingCountDownLatch doneSignal;
+
+ private AggregationType aggregationType;
+
+ private Stopwatch stopwatch;
+
+ AggregationType getAggregationType() {
+ return aggregationType;
+ }
+
+ BatchAggregationState setAggregationType(AggregationType aggregationType) {
+ this.aggregationType = aggregationType;
+ return this;
+ }
+
+ List<StorageResultSetFuture> getQueryFutures() {
+ return queryFutures;
+ }
+
+ BatchAggregationState setQueryFutures(List<StorageResultSetFuture> queryFutures) {
+ this.queryFutures = queryFutures;
+ return this;
+ }
+
+ AsyncFunction<List<ResultSet>, List<ResultSet>> getComputeAggregates() {
+ return computeAggregates;
+ }
+
+ BatchAggregationState setComputeAggregates(AsyncFunction<List<ResultSet>, List<ResultSet>> computeAggregates) {
+ this.computeAggregates = computeAggregates;
+ return this;
+ }
+
+ ListeningExecutorService getAggregationTasks() {
+ return aggregationTasks;
+ }
+
+ BatchAggregationState setAggregationTasks(ListeningExecutorService aggregationTasks) {
+ this.aggregationTasks = aggregationTasks;
+ return this;
+ }
+
+ Semaphore getPermits() {
+ return permits;
+ }
+
+ BatchAggregationState setPermits(Semaphore permits) {
+ this.permits = permits;
+ return this;
+ }
+
+ AtomicInteger getRemainingSchedules() {
+ return remainingSchedules;
+ }
+
+ BatchAggregationState setRemainingSchedules(AtomicInteger remainingSchedules) {
+ this.remainingSchedules = remainingSchedules;
+ return this;
+ }
+
+ SignalingCountDownLatch getDoneSignal() {
+ return doneSignal;
+ }
+
+ BatchAggregationState setDoneSignal(SignalingCountDownLatch doneSignal) {
+ this.doneSignal = doneSignal;
+ return this;
+ }
+
+ Stopwatch getStopwatch() {
+ return stopwatch;
+ }
+
+ BatchAggregationState setStopwatch(Stopwatch stopwatch) {
+ this.stopwatch = stopwatch;
+ return this;
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
new file mode 100644
index 0000000..363a6f2
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
@@ -0,0 +1,78 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * @author John Sanda
+ */
+class BatchAggregator implements Runnable {
+
+ private final Log log = LogFactory.getLog(BatchAggregator.class);
+
+ private BatchAggregationState state;
+
+ public BatchAggregator(BatchAggregationState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void run() {
+ ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(state.getQueryFutures());
+ ListenableFuture<List<ResultSet>> insertFutures = Futures.transform(queriesFuture,
+ state.getComputeAggregates(), state.getAggregationTasks());
+ Futures.addCallback(insertFutures, new FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> result) {
+ updateRemainingSchedules();
+ state.getStopwatch().stop();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Finished aggregating " + state.getAggregationType() + " for " +
+ state.getQueryFutures().size() + " schedules in " +
+ state.getStopwatch().elapsed(TimeUnit.MILLISECONDS) + " ms");
+ }
+
+ state.getPermits().release(state.getQueryFutures().size());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("There was an error during " + state.getAggregationType() + " aggregation",
+ ThrowableUtil.getRootCause(t));
+ } else {
+ log.warn("There was an error during " + state.getAggregationType() + " aggregation: " +
+ ThrowableUtil.getRootMessage(t));
+ }
+ state.getPermits().release(state.getQueryFutures().size());
+ updateRemainingSchedules();
+ }
+ }, state.getAggregationTasks());
+ }
+
+ private void updateRemainingSchedules() {
+ int count = state.getRemainingSchedules().addAndGet(-state.getQueryFutures().size());
+ if (log.isDebugEnabled()) {
+ log.debug("There are " + count + " remaining schedules with " + state.getAggregationType() +
+ " to be aggregated");
+ }
+ if (count == 0) {
+ state.getDoneSignal().countDown();
+ } else if (count < 0) {
+ log.warn("The number of remaining schedules should never be less than zero.");
+ state.getDoneSignal().abort("There are " + count + " remaining schedules with " +
+ state.getAggregationType() + " to be aggregated. The count should never be less than zero.");
+ }
+ }
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
new file mode 100644
index 0000000..d172f08
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
@@ -0,0 +1,47 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class OneHourDataScheduler extends BatchAggregationScheduler {
+
+ public OneHourDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getOneHourAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.ONE_HOUR;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
+ state.getSixHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>> getComputeAggregates() {
+ return state.getCompute6HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemaining1HourData();
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
new file mode 100644
index 0000000..cd20aad
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
@@ -0,0 +1,47 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class RawDataScheduler extends BatchAggregationScheduler {
+
+ public RawDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getRawAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.RAW;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findRawMetricsAsync(scheduleId, state.getOneHourTimeSlice().getMillis(),
+ state.getOneHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>> getComputeAggregates() {
+ return state.getCompute1HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemainingRawData();
+ }
+
+}
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
new file mode 100644
index 0000000..1dc17a5
--- /dev/null
+++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
@@ -0,0 +1,46 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class SixHourDataScheduler extends BatchAggregationScheduler {
+
+ public SixHourDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getSixHourAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.SIX_HOUR;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findSixHourMetricsAsync(scheduleId, state.getTwentyFourHourTimeSlice().getMillis(),
+ state.getTwentyFourHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>> getComputeAggregates() {
+ return state.getCompute24HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemaining6HourData();
+ }
+}
10 years, 3 months
[rhq] modules/enterprise
by Heiko W. Rupp
modules/enterprise/gui/coregui/src/main/resources/org/rhq/coregui/client/Messages.properties | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
New commits:
commit b5a6da4f2560725799fde0e033cf51e424bc9e5d
Author: Heiko W. Rupp <hwr(a)redhat.com>
Date: Mon Feb 3 09:38:57 2014 +0100
BZ 1039188 - fix a typo.
diff --git a/modules/enterprise/gui/coregui/src/main/resources/org/rhq/coregui/client/Messages.properties b/modules/enterprise/gui/coregui/src/main/resources/org/rhq/coregui/client/Messages.properties
index 686b9bb..6be2e1b 100644
--- a/modules/enterprise/gui/coregui/src/main/resources/org/rhq/coregui/client/Messages.properties
+++ b/modules/enterprise/gui/coregui/src/main/resources/org/rhq/coregui/client/Messages.properties
@@ -749,14 +749,14 @@ view_adminTopology_storageNodes_clusterSettings_deployments = New Deployment Set
view_adminTopology_storageNodes_clusterSettings_deployments_autoDeploy = If this is set, the newly installed Storage Nodes will be automatically deployed to the Storage Cluster. It only applies to new installations.
view_adminTopology_storageNodes_clusterSettings_deployments_autoDeploy_title = Automatic Deployment
view_adminTopology_storageNodes_clusterSettings_deployments_desc = Only applies to new installations.
-view_adminTopology_storageNodes_clusterSettings_message_cantLoad = Unable to load common Storage Cluster configuration:
+view_adminTopology_storageNodes_clusterSettings_message_cantLoad = Unable to load common Storage Cluster configuration:
view_adminTopology_storageNodes_clusterSettings_message_confirmation = Changing the cluster wide configuration will eventually affect all the Storage Nodes. Do you want to continue?
view_adminTopology_storageNodes_clusterSettings_message_updateFail = Unable to update the Storage Node settings.
view_adminTopology_storageNodes_clusterSettings_message_updateSuccess = Storage Cluster settings were successfully updated.
view_adminTopology_storageNodes_detail_associatedResource = Associated Resource
view_adminTopology_storageNodes_detail_chart = Chart
view_adminTopology_storageNodes_detail_configuration = Configuration
-view_adminTopology_storageNodes_detail_errorAlertFetch = Unable to fetch alerts for storage node with id {0}. Caused by:
+view_adminTopology_storageNodes_detail_errorAlertFetch = Unable to fetch alerts for storage node with id {0}. Caused by:
view_adminTopology_storageNodes_detail_errorDeployment = Deployment error
view_adminTopology_storageNodes_detail_errorFailedDeployOp = Failed deployment operation
view_adminTopology_storageNodes_detail_errorFailedUneployOp = Failed undeployment operation
@@ -1338,7 +1338,7 @@ view_bundle_createWizard_groupsStep_assigned = The new bundle version is for an
view_bundle_createWizard_groupsStep_failedAssign = Failed to assigned initial bundle groups to a bundle named [{0}] with a version of [{1}]. Please cancel the create wizard and notify your administrator.
view_bundle_createWizard_groupsStep_failedGetAssignable = Failed to determine assignable bundle groups. Please cancel the create wizard and notify your administrator.
view_bundle_createWizard_groupsStep_help = A new bundle is created when uploading the first version for that bundle. The new bundle is then assigned to its initial bundle groups. A user can only assign the new bundle to bundle groups for which he has Create Bundles permission, either global or at the bundle group level. At least one bundle group must be assigned unless the user has global Create and global View Bundles permission, in which case it can be left unassigned.
-view_bundle_createWizard_groupsStep_leaveUnassigned = Leave the new bundle unsassigned.
+view_bundle_createWizard_groupsStep_leaveUnassigned = Leave the new bundle unassigned.
view_bundle_createWizard_groupsStep_noAssignable = Unable to create initial bundle version because the user has no bundle groups to which it can be assigned. Please cancel the create wizard and notify your administrator.
view_bundle_createWizard_groupsStep_noneAssigned = The new bundle version must be assigned to at least one bundle group!
view_bundle_createWizard_groupsStep_radioTitle = Initial bundle group assignment for the new bundle
10 years, 3 months
[rhq] modules/core modules/plugins
by Jay Shaughnessy
modules/core/arquillian-integration/container/src/main/java/org/rhq/test/arquillian/FakeServerInventory.java | 8 -
modules/core/plugin-test-util/src/main/java/org/rhq/core/plugin/testutil/AbstractAgentPluginTest.java | 73 +++++++++-
modules/plugins/jboss-as-7/pom.xml | 47 ++++--
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/AbstractJBossAS7PluginTest.java | 1
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java | 41 +++--
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java | 2
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java | 3
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/ResourcesStandaloneServerTest.java | 57 ++++---
modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/TemplatedResourcesTest.java | 6
9 files changed, 173 insertions(+), 65 deletions(-)
New commits:
commit 746acfd0473f74a33f47b734f036a76659f1ee65
Author: Jay Shaughnessy <jshaughn(a)redhat.com>
Date: Fri Jan 31 19:50:23 2014 -0500
AS7 i-test work, continued...
- break up the test runs into a few different executions, deployment tests
seem to benefit from running independently
- add new waitForAsyncDiscoveryToStabilize() mechanism which is based on
tree size stabilizing as opposed to tree depth being reached. For large
discoveries (like an AS-7) tree depth is reached well before the whole
tree is discovered. Started using this as needed.
- fix various tests in various ways, see diffs
diff --git a/modules/core/arquillian-integration/container/src/main/java/org/rhq/test/arquillian/FakeServerInventory.java b/modules/core/arquillian-integration/container/src/main/java/org/rhq/test/arquillian/FakeServerInventory.java
index a553a5d..36fb392 100644
--- a/modules/core/arquillian-integration/container/src/main/java/org/rhq/test/arquillian/FakeServerInventory.java
+++ b/modules/core/arquillian-integration/container/src/main/java/org/rhq/test/arquillian/FakeServerInventory.java
@@ -99,12 +99,12 @@ public class FakeServerInventory {
synchronized (sync) {
if (!depthReached) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for the discovery to complete on " + this);
+ LOG.debug("Waiting for the discovery depth to be reached on " + this);
}
sync.wait(timeoutMillis);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Discovery already complete... no need to wait on " + this);
+ LOG.debug("Discovery depth already reached... no need to wait on " + this);
}
}
@@ -112,6 +112,10 @@ public class FakeServerInventory {
}
}
+ public int getExpectedDepth() {
+ return expectedDepth;
+ }
+
private void setDepth(int resourceTreeDepth) {
synchronized (sync) {
if (LOG.isDebugEnabled()) {
diff --git a/modules/core/plugin-test-util/src/main/java/org/rhq/core/plugin/testutil/AbstractAgentPluginTest.java b/modules/core/plugin-test-util/src/main/java/org/rhq/core/plugin/testutil/AbstractAgentPluginTest.java
index c7a2333..e904c56 100644
--- a/modules/core/plugin-test-util/src/main/java/org/rhq/core/plugin/testutil/AbstractAgentPluginTest.java
+++ b/modules/core/plugin-test-util/src/main/java/org/rhq/core/plugin/testutil/AbstractAgentPluginTest.java
@@ -194,20 +194,89 @@ public abstract class AbstractAgentPluginTest extends Arquillian {
protected abstract int getTypeHierarchyDepth();
+ /**
+ * Note - this is still a bit weak. Waiting for a discovery depth to be reached does not mean that discovery
+ * is actually complete. For many simple test hierarchies it is sufficient, but for a large scale integration
+ * test, requiring full discovery of an AS-7 server (for example), it can reach the target depth well before
+ * the entire tree is discovered and populated. In cases where you want to better ensure complete discovery,
+ * try making a call to {@link #waitForAsyncDiscoveryToStabilize(Resource)}.
+ *
+ * @throws Exception
+ */
@AfterDiscovery
protected void waitForAsyncDiscoveries() throws Exception {
try {
+ System.out.println("\n====== Waiting for Discovery Depth [" + discoveryCompleteChecker.getExpectedDepth()
+ + "] to be Reached...");
discoveryCompleteChecker.waitForDiscoveryComplete(12000);
- System.out.println("\n====== Discovery completed.");
+ System.out.println("\n====== Discovery Depth Reached.");
} catch (InterruptedException e) {
- throw new RuntimeException("Discovery did not complete within 12 seconds.");
+ throw new RuntimeException("Discovery depth not reached within 12 seconds.");
}
+
// Wait a while longer to give all Resource components a chance to start.
// TODO: Do this more intelligently so we don't sleep longer than needed.
Thread.sleep(10000);
}
/**
+ * Note - this is stronger than {@link #waitForAsyncDiscoveries()} but can be slower. It waits until the
+ * discovered tree size stabilizes, which may take longer than hitting a target tree depth.
+ * Tree depth may be sufficient for many simple test hierarchies but for a large scale integration
+ * test, requiring full discovery of an AS-7 server (for example), it can reach the target depth well before
+ * the entire tree is discovered and populated.
+ * <p>
+ * This is equivalent to {{waitForAsyncDiscoveryToStabilize(root, 5000L, 5)}}.
+ *
+ * @throws Exception
+ */
+ protected void waitForAsyncDiscoveryToStabilize(Resource root) throws Exception {
+ waitForAsyncDiscoveryToStabilize(root, 5000L, 5);
+ }
+
+ /**
+ * @param root
+ * @param checkInterval how long between checks of the tree size
+ * @param stableCount how many checks must be the same before we're convinced we're stable
+ * @throws Exception
+ */
+ protected void waitForAsyncDiscoveryToStabilize(Resource root, long checkInterval, int stableCount)
+ throws Exception {
+ int startResCount = 0;
+ int endResCount = getResCount(root);
+ int numStableChecks = 0;
+ log.info("waitForAsyncDiscoveryToStabilize: ResourceCount Start=" + endResCount);
+ do {
+ startResCount = endResCount;
+ try {
+ Thread.sleep(checkInterval);
+ } catch (InterruptedException e) {
+ //
+ }
+ endResCount = getResCount(root);
+
+ if (startResCount == endResCount) {
+ ++numStableChecks;
+ } else {
+ numStableChecks = 0;
+ }
+ } while (startResCount < endResCount || numStableChecks < stableCount);
+ log.info("waitForAsyncDiscoveryToStabilize: ResourceCount Stable at=" + endResCount);
+ }
+
+ private int getResCount(Resource resource) {
+ int size = 1;
+ Set<Resource> children = resource.getChildResources();
+ if (null != children && !children.isEmpty()) {
+ HashSet<Resource> safeChildren = new HashSet<Resource>(children);
+ for (Resource r : safeChildren) {
+ size += getResCount(r);
+ }
+ }
+ return size;
+ }
+
+ /**
* Get availability for a Resource synchronously, with a 5 second timeout.
*
* @param resource the Resource
diff --git a/modules/plugins/jboss-as-7/pom.xml b/modules/plugins/jboss-as-7/pom.xml
index 7827000..401f21c 100644
--- a/modules/plugins/jboss-as-7/pom.xml
+++ b/modules/plugins/jboss-as-7/pom.xml
@@ -625,19 +625,6 @@
<configuration>
<skipTests>false</skipTests>
<skipITs>false</skipITs>
- <includes>
- <!-- only include integration tests; normal unit tests are handled above by surefire plugin -->
- <include>org/rhq/modules/plugins/jbossas7/itest/**/*Test.java</include>
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/standalone/StandaloneServerComponentTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/standalone/InterruptibleOperationsTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/standalone/WebConnectorComponentTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/domain/DomainServerComponentTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/domain/ManagedServerTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java</include>-->
- <!--<include>org/rhq/modules/plugins/jbossas7/itest/nonpc/ManagementConnectionPersistenceTest.java</include>-->
- </includes>
<properties>
<property>
<name>listener</name>
@@ -658,7 +645,41 @@
</systemPropertyVariables>
</configuration>
<executions>
+ <!-- Some of these test classes seem to do better when run by themselves. So, use different executions... -->
<execution>
+ <id>DeploymentRuntimeResourcesTest</id>
+ <configuration>
+ <includes>
+ <include>org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java</include>
+ </includes>
+ </configuration>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>DeploymentTest</id>
+ <configuration>
+ <includes>
+ <include>org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java</include>
+ </includes>
+ </configuration>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>everything-else</id>
+ <configuration>
+ <includes>
+ <include>org/rhq/modules/plugins/jbossas7/itest/**/*Test.java</include>
+ </includes>
+ <excludes>
+ <exclude>org/rhq/modules/plugins/jbossas7/itest/standalone/Deployment*Test.java</exclude>
+ </excludes>
+ </configuration>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/AbstractJBossAS7PluginTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/AbstractJBossAS7PluginTest.java
index 780d398..db7e35a 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/AbstractJBossAS7PluginTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/AbstractJBossAS7PluginTest.java
@@ -72,7 +72,6 @@ public abstract class AbstractJBossAS7PluginTest extends AbstractAgentPluginTest
protected void installManagementUsers() throws PluginContainerException, Exception {
waitForAsyncDiscoveries();
- System.out.println("\n=== Discovery scan completed.");
if (!createdManagementUsers) {
System.out.println("====== Installing management users...");
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java
index 182ef36..ead0224 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/domain/SecurityModuleOptionsTest.java
@@ -79,6 +79,9 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
private static String SECURITY_RESOURCE_TYPE = "Security";
private static String SECURITY_RESOURCE_KEY = "subsystem=security";
private static String SECURITY_DOMAIN_RESOURCE_KEY = "security-domain";
+ // Out of box:
+ // The full-ha profile is associated with other-server-group
+ // server-three is in other-server-group and not started
private static String PROFILE = "profile=full-ha";
private static String SECURITY_DOMAIN_RESOURCE_TYPE = "Security Domain";
private static String AUTH_CLASSIC_RESOURCE_TYPE = "Authentication (Classic)";
@@ -119,9 +122,10 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
"[{\"code\":\"Test\", \"type\":\"attribute\", \"module-options\":{\"mapping\":\"module\", \"mapping1\":\"module1\"}}]");
jsonMap.put("provider-modules",
"[{\"code\":\"Providers\", \"module-options\":{\"provider\":\"module\", \"provider1\":\"module1\"}}]");
- jsonMap
- .put("acl-modules",
- "[{\"flag\":\"sufficient\", \"code\":\"ACL\", \"module-options\":{\"acl\":\"module\", \"acl1\":\"module1\"}}]");
+ // (jshaughn) commenting out, this caused an NPE (EAP 6.0.1), not sure why...
+ //jsonMap
+ // .put("acl-modules",
+ // "[{\"flag\":\"sufficient\", \"code\":\"ACL\", \"module-options\":{\"acl\":\"module\", \"acl1\":\"module1\"}}]");
jsonMap
.put("trust-modules",
"[{\"flag\":\"optional\", \"code\":\"TRUST\", \"module-options\":{\"trust\":\"module\", \"trust1\":\"module1\"}}]");
@@ -156,9 +160,6 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
mapper = new ObjectMapper();
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- //Adjust discovery depth to support deeper hierarchy depth of Module Option elements
- setMaxDiscoveryDepthOverride(12);
-
//create new Security Domain
Address destination = new Address(PROFILE);
destination.addSegment(SECURITY_RESOURCE_KEY);
@@ -200,6 +201,7 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
assert false : "An unknown attribute '" + attribute
+ "' was found. Is there a new type to be supported?";
}
+
//build the operation to add the component
////Load json map into ModuleOptionType
List<Value> moduleTypeValue = new ArrayList<Value>();
@@ -222,6 +224,8 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
moduleTypeValue);
//submit the command
result = connection.execute(op);
+ assert result.getOutcome().equals("success") : "Add ModuleOptionType has failed: "
+ + result.getFailureDescription();
}
}
@@ -236,9 +240,8 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
assertNotNull(platform);
assertEquals(platform.getInventoryStatus(), InventoryStatus.COMMITTED);
- //Have thread sleep longer to discover deeper resource types.
- //spinder 6/29/12: up this number if the resources are not being discovered.
- Thread.sleep(240 * 1000L); // delay so that PC gets a chance to scan for resources
+ // ensure the entire EAP inventory is discovered before continuing, we need deep resources in inventory
+ waitForAsyncDiscoveryToStabilize(platform);
}
/** This test method exercises a number of things:
@@ -273,8 +276,7 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
//check the configuration for the Module Option Type Ex. 'Acl (Profile)' Resource. Should be able to verify components
Resource moduleOptionsTypeResource = getModuleOptionTypeResource(attribute);
//assert non-zero id returned
- assert moduleOptionsTypeResource.getId() > 0 : "The resource was not properly initialized. Expected id >0 but got:"
- + moduleOptionsTypeResource.getId();
+ assert moduleOptionsTypeResource.getId() != 0 : "The resource was not properly initialized. Expected id != 0";
//Now request the resource complete with resource config
Configuration loadedConfiguration = testConfigurationManager
@@ -331,8 +333,7 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
//Ex. Module Options for (Acl Modules - Profile)
Resource moduleOptionsResource = getModuleOptionsResource(moduleOptionsTypeResource, attribute);
//assert non-zero id returned
- assert moduleOptionsResource.getId() > 0 : "The resource was not properly initialized. Expected id > 0 but got:"
- + moduleOptionsResource.getId();
+ assert moduleOptionsResource.getId() != 0 : "The resource was not properly initialized. Expected id != 0";
//fetch configuration for module options
//Now request the resource complete with resource config
Configuration loadedOptionsConfiguration = testConfigurationManager
@@ -487,19 +488,28 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
if (testSecurityDomain == null) {
InventoryManager im = pluginContainer.getInventoryManager();
Resource platform = im.getPlatform();
+ if (platform != null)
+ System.out.println("*** Found Platform [" + platform.getResourceKey() + "]");
//host controller
Resource hostController = getResourceByTypeAndKey(platform, DomainServerComponentTest.RESOURCE_TYPE,
DomainServerComponentTest.RESOURCE_KEY);
+ if (hostController != null)
+ System.out.println("*** Found Host Controller [" + hostController.getResourceKey() + "]");
+
//profile=full-ha
ResourceType profileType = new ResourceType("Profile", PLUGIN_NAME, ResourceCategory.SERVICE, null);
String key = PROFILE;
Resource profile = getResourceByTypeAndKey(hostController, profileType, key);
+ if (profile != null)
+ System.out.println("*** Found Profile [" + platform.getResourceKey() + "]");
//Security (Profile)
ResourceType securityType = new ResourceType("Security (Profile)", PLUGIN_NAME, ResourceCategory.SERVICE,
null);
key += "," + SECURITY_RESOURCE_KEY;
Resource security = getResourceByTypeAndKey(profile, securityType, key);
+ if (security != null)
+ System.out.println("*** Found Security [" + security.getResourceKey() + "]");
//Security Domain (Profile)
ResourceType domainType = new ResourceType("Security Domain (Profile)", PLUGIN_NAME,
@@ -507,6 +517,8 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
key += "," + securityDomainId;
testSecurityDomainKey = key;
testSecurityDomain = getResourceByTypeAndKey(security, domainType, key);
+ if (testSecurityDomain != null)
+ System.out.println("*** Found Domain [" + testSecurityDomain.getResourceKey() + "]");
}
//acl=classic
@@ -546,8 +558,11 @@ public class SecurityModuleOptionsTest extends AbstractJBossAS7PluginTest {
}
moduleOptionResource = getResourceByTypeAndKey(testSecurityDomain, moduleOptionType, moduleOptionTypeKey);
+ if (moduleOptionResource != null)
+ System.out.println("*** Found ModuleOption [" + moduleOptionResource.getResourceKey() + "]");
return moduleOptionResource;
+
}
/** Automates hierarchy creation for Module Option type resources and their parents
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java
index 3747821..1f915d5 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentRuntimeResourcesTest.java
@@ -65,7 +65,7 @@ import org.rhq.test.arquillian.RunDiscovery;
*
* @author Thomas Segismont
*/
-@Test
+@Test(groups = { "integration", "pc", "standalone" }, singleThreaded = true)
public class DeploymentRuntimeResourcesTest extends AbstractJBossAS7PluginTest {
private static final Log LOG = LogFactory.getLog(DeploymentRuntimeResourcesTest.class);
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java
index fbdc02b..c3a8ec4 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/DeploymentTest.java
@@ -127,7 +127,6 @@ public class DeploymentTest extends AbstractJBossAS7PluginTest {
serverResource = standaloneResources.iterator().next();
}
- //(dependsOnMethods = "assignServerResource")
@Test(priority = 11)
public void testDeploy() throws Exception {
ResourcePackageDetails packageDetails = getTestDeploymentPackageDetails(TestDeployments.DEPLOYMENT_1);
@@ -215,7 +214,6 @@ public class DeploymentTest extends AbstractJBossAS7PluginTest {
}
@Test(priority = 15)
- @RunDiscovery
public void testWebRuntimeMetricsHaveNonNullValues() throws Exception {
assertTrue(webRuntimeResources != null && !webRuntimeResources.isEmpty(),
"Web Runtime resource should have been discovered");
@@ -237,7 +235,6 @@ public class DeploymentTest extends AbstractJBossAS7PluginTest {
}
@Test(priority = 16)
- @RunDiscovery
public void testUndeploy() throws Exception {
Resource deployment = deploymentResources.iterator().next();
DeleteResourceRequest request = new DeleteResourceRequest(0, deployment.getId());
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/ResourcesStandaloneServerTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/ResourcesStandaloneServerTest.java
index 8d391a1..e918d50 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/ResourcesStandaloneServerTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/ResourcesStandaloneServerTest.java
@@ -55,16 +55,35 @@ public class ResourcesStandaloneServerTest extends AbstractJBossAS7PluginTest {
assertNotNull(platform);
assertEquals(platform.getInventoryStatus(), InventoryStatus.COMMITTED);
- Thread.sleep(20 * 1000L);
-
ResourceContainer platformContainer = inventoryManager.getResourceContainer(platform);
Resource server = getResourceByTypeAndKey(platform, StandaloneServerComponentTest.RESOURCE_TYPE,
StandaloneServerComponentTest.RESOURCE_KEY);
inventoryManager.activateResource(server, platformContainer, false);
-
- Thread.sleep(40 * 1000L);
}
+ @Test(priority = 11)
+ public void standaloneExecuteNoArgOperations() throws Exception {
+ List<String> ignoredSubsystems = new ArrayList<String>();
+
+ //ignored because mod_cluster is not setup in default server configuration
+ //to be more specific, there is no server to fail-over to
+ ignoredSubsystems.add("ModCluster Standalone Service");
+
+ List<String> ignoredOperations = new ArrayList<String>();
+ //ignored because there is no other server to fail-over to
+ ignoredOperations.add("subsystem:force-failover");
+ //ignored because this is not a true operation, it is handled
+ //internally by a configuration property change
+ ignoredOperations.add("enable");
+ //ignored because the Osgi subsystem not configured out of box
+ ignoredOperations.add("subsystem:activate");
+
+ Resource platform = this.pluginContainer.getInventoryManager().getPlatform();
+ Resource server = getResourceByTypeAndKey(platform, StandaloneServerComponentTest.RESOURCE_TYPE,
+ StandaloneServerComponentTest.RESOURCE_KEY);
+
+ executeNoArgOperations(server, ignoredSubsystems, ignoredOperations);
+ }
@Test(priority = 12)
public void loadUpdateResourceConfiguration() throws Exception {
@@ -89,6 +108,13 @@ public class ResourcesStandaloneServerTest extends AbstractJBossAS7PluginTest {
ignoredResources.add("Memory Pool");
ignoredResources.add("Periodic Rotating File Handler");
+ //created BZ 1059882 for failures related to:
+ // attribute discovery-group-name (mutually exclusive issue?)
+ ignoredResources.add("Pooled Connection Factory");
+ ignoredResources.add("Connection Factory");
+ // attribute static-connectors (nullable list issue?)
+ ignoredResources.add("Cluster Connection");
+
Resource platform = this.pluginContainer.getInventoryManager().getPlatform();
Resource server = getResourceByTypeAndKey(platform, StandaloneServerComponentTest.RESOURCE_TYPE,
StandaloneServerComponentTest.RESOURCE_KEY);
@@ -97,28 +123,5 @@ public class ResourcesStandaloneServerTest extends AbstractJBossAS7PluginTest {
Assert.assertEquals(errorCount, 0);
}
- @Test(priority = 11)
- public void standaloneExecuteNoArgOperations() throws Exception {
- List<String> ignoredSubsystems = new ArrayList<String>();
-
- //ignored because mod_cluster is not setup in default server configuration
- //to be more specific, there is no server to fail-over to
- ignoredSubsystems.add("ModCluster Standalone Service");
-
- List<String> ignoredOperations = new ArrayList<String>();
- //ignored because there is no other server to fail-over to
- ignoredOperations.add("subsystem:force-failover");
- //ignored because this is not a true operation, it is handled
- //internally by a configuration property change
- ignoredOperations.add("enable");
- //ignored because the Osgi subsystem not configured out of box
- ignoredOperations.add("subsystem:activate");
-
- Resource platform = this.pluginContainer.getInventoryManager().getPlatform();
- Resource server = getResourceByTypeAndKey(platform, StandaloneServerComponentTest.RESOURCE_TYPE,
- StandaloneServerComponentTest.RESOURCE_KEY);
-
- executeNoArgOperations(server, ignoredSubsystems, ignoredOperations);
- }
}
diff --git a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/TemplatedResourcesTest.java b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/TemplatedResourcesTest.java
index 908faff..cbd5ef8 100644
--- a/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/TemplatedResourcesTest.java
+++ b/modules/plugins/jboss-as-7/src/test/java/org/rhq/modules/plugins/jbossas7/itest/standalone/TemplatedResourcesTest.java
@@ -58,7 +58,7 @@ public class TemplatedResourcesTest extends AbstractJBossAS7PluginTest {
assertNotNull(platform);
assertEquals(platform.getInventoryStatus(), InventoryStatus.COMMITTED);
- Thread.sleep(40 * 1000L);
+ //Thread.sleep(40 * 1000L);
}
@Test(priority = 11)
@@ -73,7 +73,7 @@ public class TemplatedResourcesTest extends AbstractJBossAS7PluginTest {
StandaloneServerComponentTest.RESOURCE_KEY);
inventoryManager.activateResource(server, platformContainer, false);
- Thread.sleep(30 * 1000L);
+ //Thread.sleep(30 * 1000L);
for (ResourceData resourceData : testResourceData) {
ResourceType resourceType = new ResourceType(resourceData.resourceTypeName, PLUGIN_NAME,
@@ -100,7 +100,7 @@ public class TemplatedResourcesTest extends AbstractJBossAS7PluginTest {
for (Resource resourceUnderTest : foundResources) {
log.info(foundResources);
- assert resourceUnderTest.getId() > 0 : "Resource not properly initialized. Id = 0. Try extending sleep after discovery.";
+ assert resourceUnderTest.getId() != 0 : "Resource not properly initialized. Id = 0. Try extending sleep after discovery.";
Configuration resourceUnderTestConfig = configurationManager
.loadResourceConfiguration(resourceUnderTest.getId());
10 years, 3 months