modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/core/PingRequest.java | 19 - modules/core/dbutils/pom.xml | 2 modules/core/plugin-container/src/main/java/org/rhq/core/pc/PluginContainer.java | 13 modules/enterprise/agent/src/main/java/org/rhq/enterprise/agent/AgentMain.java | 144 ++++++---- modules/enterprise/comm/src/main/java/org/rhq/enterprise/communications/command/client/ClientCommandSender.java | 16 - modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/AgentManagerBean.java | 8 6 files changed, 129 insertions(+), 73 deletions(-)
New commits: commit e76a89b5b670e1a40e237b933b9118e1e4b0dc1d Author: Jay Shaughnessy jshaughn@redhat.com Date: Thu Feb 2 17:34:20 2012 -0500
- Make PingRequest pass the agent name instead of the agent id, which we don't have. - Make PingRequest Serializable so it can be passed between agent and server. - Fix the schema version to be 2.119. - Fix PluginContainer.addShutdownListener so it does not immediately invoke the listener if the PC is down. - Leave the updated listeners in PluginContainer because they are a little more robust, but change the strategy around calling the new PC listeners; as it turns out, calling them is not really necessary. Instead: * Ping from agent start to agent stop. This gives us clock sync at all times. * The ping avail update is only performed when the PC is up, which can be determined by a quick call to the PC instance. * Use polling only when we need to determine sending mode: - start polling at agent start - in ping, stop polling if in sending mode, then just ping - resume polling if the ping fails. In other words, poll when we're not pinging. - Added the ping thread and ping runnable. - Added a method to determine whether the sender is currently polling.
diff --git a/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/core/PingRequest.java b/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/core/PingRequest.java index a19fd9a..10f2f61 100644 --- a/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/core/PingRequest.java +++ b/modules/core/client-api/src/main/java/org/rhq/core/clientapi/server/core/PingRequest.java @@ -20,24 +20,29 @@
package org.rhq.core.clientapi.server.core;
+import java.io.Serializable; + /** * A simple POJO for requesting actions or data from the ping. * * @author Jay Shaughnessy */ -public class PingRequest { - private int agentId; +public class PingRequest implements Serializable { + private static final long serialVersionUID = 1L; + + private String agentName; private boolean requestUpdateAvailability; private boolean requestServerTimestamp;
private boolean replyUpdateAvailability; private Long replyServerTimestamp;
- public PingRequest(int agentId) { - this(agentId, true, true); + public PingRequest(String agentName) { + this(agentName, true, true); }
- public PingRequest(int agentId, boolean requestUpdateAvailability, boolean requestServerTimestamp) { + public PingRequest(String agentName, boolean requestUpdateAvailability, boolean requestServerTimestamp) { + this.agentName = agentName; this.requestUpdateAvailability = requestUpdateAvailability; this.requestServerTimestamp = requestServerTimestamp; } @@ -54,8 +59,8 @@ public class PingRequest { return replyUpdateAvailability; }
- public int getAgentId() { - return agentId; + public String getAgentName() { + return agentName; }
public void setReplyUpdateAvailability(boolean replyUpdateAvailability) { diff --git a/modules/core/dbutils/pom.xml b/modules/core/dbutils/pom.xml index c18ffe1..29b1f2e 100644 --- a/modules/core/dbutils/pom.xml +++ b/modules/core/dbutils/pom.xml @@ -15,7 +15,7 @@ <description>Database schema setup, upgrade and other utilities</description>
<properties> - <db.schema.version>2.118</db.schema.version> + <db.schema.version>2.119</db.schema.version> <rhq.ds.type-mapping>${rhq.test.ds.type-mapping}</rhq.ds.type-mapping> <rhq.ds.db-name>${rhq.test.ds.db-name}</rhq.ds.db-name> <rhq.ds.connection-url>${rhq.test.ds.connection-url}</rhq.ds.connection-url> diff --git a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/PluginContainer.java b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/PluginContainer.java index 768450b..140e3fd 100644 --- a/modules/core/plugin-container/src/main/java/org/rhq/core/pc/PluginContainer.java +++ b/modules/core/plugin-container/src/main/java/org/rhq/core/pc/PluginContainer.java @@ -75,8 +75,8 @@ import org.rhq.core.pluginapi.util.FileUtils; * {@link #setConfiguration(PluginContainerConfiguration)}. If this is not done, a default configuration will be * created.</p> * - * @author Greg Hinkle * @author John Mazzitelli + * @author Greg Hinkle */ public class PluginContainer implements ContainerService { private static final PluginContainer INSTANCE = new PluginContainer(); @@ -661,6 +661,7 @@ public class PluginContainer implements ContainerService { /** * Add the callback listener to notify when the plugin container is initialized. If this method is invoked and * the PC is already initialized, then <code>listener</code> will be invoked immediately. + * * @param name associated with the listener * @param listener The callback object to notify. If a listener with the supplied name is registered, it * will be replaced with the newly supplied listner. @@ -676,8 +677,10 @@ public class PluginContainer implements ContainerService { }
/** - * Add the callback listener to notify when the plugin container is shutdown. If this method is invoked and - * the PC is already shutdown, then <code>listener</code> will be invoked immediately. + * Add the callback listener to notify when the plugin container is shutdown. Unlike + * {@link #addInitializationListener(String, InitializationListener)} the <code>listener</code> will + * not be invoked immediately if the PC is already shutdown. It will only be invoked on future shutdowns. + * * @param name associated with the listener * @param listener The callback object to notify. If a listener with the supplied name is registered, it * will be replaced with the newly supplied listner. @@ -685,10 +688,6 @@ public class PluginContainer implements ContainerService { public void addShutdownListener(String name, ShutdownListener listener) { synchronized (shutdownListenersLock) { shutdownListeners.put(name, listener); - - if (!started) { - listener.shutdown(); - } } }
diff --git a/modules/enterprise/agent/src/main/java/org/rhq/enterprise/agent/AgentMain.java b/modules/enterprise/agent/src/main/java/org/rhq/enterprise/agent/AgentMain.java index 928e38e..8536de2 100644 --- a/modules/enterprise/agent/src/main/java/org/rhq/enterprise/agent/AgentMain.java +++ b/modules/enterprise/agent/src/main/java/org/rhq/enterprise/agent/AgentMain.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -81,6 +82,7 @@ import org.rhq.core.clientapi.server.core.AgentVersion; import org.rhq.core.clientapi.server.core.ConnectAgentRequest; import org.rhq.core.clientapi.server.core.ConnectAgentResults; import org.rhq.core.clientapi.server.core.CoreServerService; +import org.rhq.core.clientapi.server.core.PingRequest; import org.rhq.core.clientapi.server.discovery.DiscoveryServerService; import org.rhq.core.clientapi.server.drift.DriftServerService; import org.rhq.core.clientapi.server.event.EventServerService; @@ -95,6 +97,7 @@ import org.rhq.core.pc.RebootRequestListener; import org.rhq.core.pc.ServerServices; import org.rhq.core.pc.inventory.InventoryManager; import org.rhq.core.pc.plugin.FileSystemPluginFinder; +import org.rhq.core.pc.util.LoggingThreadFactory; import org.rhq.core.system.SystemInfoFactory; import org.rhq.core.util.ObjectNameFactory; import org.rhq.core.util.exception.ThrowableUtil; @@ -150,11 +153,7 @@ import org.rhq.enterprise.communications.command.client.ClientRemotePojoFactory; import org.rhq.enterprise.communications.command.client.CommandPreprocessor; import org.rhq.enterprise.communications.command.client.JBossRemotingRemoteCommunicator; import org.rhq.enterprise.communications.command.client.OutgoingCommandTrace; -import 
org.rhq.enterprise.communications.command.client.PollingListener; import org.rhq.enterprise.communications.command.client.RemoteCommunicator; -import org.rhq.enterprise.communications.command.impl.identify.Identification; -import org.rhq.enterprise.communications.command.impl.identify.IdentifyCommand; -import org.rhq.enterprise.communications.command.impl.identify.IdentifyCommandResponse; import org.rhq.enterprise.communications.command.impl.remotepojo.RemotePojoInvocationCommand; import org.rhq.enterprise.communications.command.server.CommandListener; import org.rhq.enterprise.communications.command.server.IncomingCommandTrace; @@ -190,6 +189,11 @@ public class AgentMain { */ private static final String JAVA_UTIL_LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.properties";
+ // Ensure only one instance of the ping job runs by using a pool size of 1 + private static final String PING_THREAD_POOL_NAME = "RHQ Agent Ping Thread"; + private static final int PING_THREAD_POOL_CORE_POOL_SIZE = 1; + private static final long PING_INTERVAL_MINIMUM = 60000L; + static final String PROMPT_INPUT_THREAD_NAME = "RHQ Agent Prompt Input Thread";
/** @@ -373,9 +377,14 @@ public class AgentMain { private boolean m_disableNativeSystem;
/** + * Thread used to repeatedly ping the server for connectivity, agent avail update, and clock sync + */ + private ScheduledThreadPoolExecutor m_pingThreadPoolExecutor; + + /** * Tracks whether we already logged a warning to let the user know SIGAR support isn't available. */ - private boolean loggedNativeSystemInfoUnavailableWarning; + private boolean m_loggedNativeSystemInfoUnavailableWarning;
/** * The main method that starts the whole thing. @@ -488,6 +497,7 @@ public class AgentMain { m_serverFailoverList = null; m_primaryServerSwitchoverThread = null; m_vmHealthCheckThread = null; + m_pingThreadPoolExecutor = null;
if (args == null) { args = new String[0]; @@ -656,6 +666,15 @@ public class AgentMain { // now that our plugin container has been initialized, it can begin to receive incoming commands latch.allowAllCommands(m_commServices);
+ // Now that we are allowing commands to be passed back and forth with the server and + // the PC is likely up, start our Ping service + m_pingThreadPoolExecutor = new ScheduledThreadPoolExecutor(PING_THREAD_POOL_CORE_POOL_SIZE, + new LoggingThreadFactory(PING_THREAD_POOL_NAME, true)); + long pingInterval = m_configuration.getClientSenderServerPollingInterval(); + pingInterval = (pingInterval < PING_INTERVAL_MINIMUM) ? PING_INTERVAL_MINIMUM : pingInterval; + m_pingThreadPoolExecutor.scheduleWithFixedDelay(new PingExecutor(), 0L, pingInterval, + TimeUnit.MILLISECONDS); + // prepare our shutdown hook m_shutdownHook = new AgentShutdownHook(this); Runtime.getRuntime().addShutdownHook(m_shutdownHook); @@ -710,6 +729,18 @@ public class AgentMain { // We want to keep going to ensure we attempt to try to shutdown everything.
/////// + // stop the thread that pings the server + try { + if (null != m_pingThreadPoolExecutor) { + m_pingThreadPoolExecutor.shutdownNow(); + m_pingThreadPoolExecutor = null; + } + } catch (Throwable ignore) { + LOG.warn(AgentI18NResourceKeys.FAILED_TO_SHUTDOWN_COMPONENT, "Server Ping Thread", + ThrowableUtil.getAllMessages(ignore)); + } + + /////// // stop the thread that checks the VM health try { if (m_vmHealthCheckThread != null) { @@ -2233,20 +2264,20 @@ public class AgentMain { SystemInfoFactory.disableNativeSystemInfo(); LOG.info(AgentI18NResourceKeys.NATIVE_SYSTEM_DISABLED); } - this.loggedNativeSystemInfoUnavailableWarning = false; + this.m_loggedNativeSystemInfoUnavailableWarning = false; } else { if (!SystemInfoFactory.isNativeSystemInfoAvailable()) { - if (!this.loggedNativeSystemInfoUnavailableWarning) { + if (!this.m_loggedNativeSystemInfoUnavailableWarning) { Throwable t = SystemInfoFactory.getNativeLibraryLoadThrowable(); if (LOG.isDebugEnabled()) { LOG.debug(AgentI18NResourceKeys.NATIVE_SYSINFO_UNAVAILABLE_DEBUG, t); } else { LOG.warn(AgentI18NResourceKeys.NATIVE_SYSINFO_UNAVAILABLE); } - this.loggedNativeSystemInfoUnavailableWarning = true; + this.m_loggedNativeSystemInfoUnavailableWarning = true; } } else { - this.loggedNativeSystemInfoUnavailableWarning = false; + this.m_loggedNativeSystemInfoUnavailableWarning = false; } }
@@ -2287,32 +2318,21 @@ public class AgentMain { // initialize and start the server-side services so we can process incoming commands m_commServices.start(m_configuration.getPreferences(), m_configuration.getClientCommandSenderConfiguration());
- // TODO: I think this can removed altogether. We don't want to do polling at the comm layer - // for server detection because we are now pinging the server at a higher level. If that ping (or any - // server service fails) the sender status will be set to down. The higher level ping is now used - // for agent avail, clock sync, and server status checking. - // prime the sender so it can be prepared to start sending messages. - // if auto-discovery is enabled, then the auto-discovery listener will tell the sender when its OK to start sending. - // if polling is enabled, then we start polling now - the poller will tell the sender when its OK to start sending. - // if both auto-discovery and polling is enabled, at least one of them will tell the sender when its OK to start sending. - // if neither is enabled, we have to blindly tell the sender that its OK to start sending now. - //if (m_configuration.getClientSenderServerPollingInterval() <= 0) { - // if (m_autoDiscoveryListener == null) { - // LOG.info(AgentI18NResourceKeys.NO_AUTO_DETECT); - // m_clientSender.startSending(); - // } - //} else { - // m_clientSender.startServerPolling(); - // - // // must do this after we start polling, otherwise, the listener is never really added - // ClockCheckPollingListener clockCheckPollingListener = new ClockCheckPollingListener(); - // m_clientSender.addPollingListener(clockCheckPollingListener); - //} + // if auto-discovery is enabled, then the auto-discovery listener will tell the sender when its OK to start + // sending. Otherwise start polling and let the poller tell the sender when it is ok to start sending. + if (!isAutoDiscoveryEnabled()) { + LOG.info(AgentI18NResourceKeys.NO_AUTO_DETECT); + m_clientSender.startServerPolling(); + }
return; }
+ private boolean isAutoDiscoveryEnabled() { + return m_autoDiscoveryListener != null; + } + /** * This will prepare the auto-discovery listener, if server auto-detection is enabled. * @@ -3418,24 +3438,6 @@ public class AgentMain { }
/** - * Listener that is told about the results of all server polls (if polling is enabled). - * Because we know the poll thread uses the {@link IdentifyCommand}, this listener will - * simply use the results of that command to track our synchronicity with the server clock. - */ - private class ClockCheckPollingListener implements PollingListener { - public void pollResponse(CommandResponse response) { - if (response instanceof IdentifyCommandResponse && response.isSuccessful()) { - IdentifyCommandResponse id_response = (IdentifyCommandResponse) response; - Identification id = id_response.getIdentification(); - if (id != null) { - AgentMain.this.serverClockNotification(id.getTimestamp()); - } - } - return; - } - } - - /** * Listener that will register the agent as soon as the sender has started (which means we should be able to connect * to the RHQ Server and send the register command). */ @@ -3644,4 +3646,50 @@ public class AgentMain { return this.serverEndpoint + "@" + new Date(this.timestamp); } } + + private class PingExecutor implements Runnable { + + @Override + public void run() { + try { + // if we can't send to the server ignore the ping + if (!m_clientSender.isSending()) { + // An unlikely state, but if we're not sending, not polling and not performing autoDiscovery + // (multicast), then start polling to we eventually get out of this state. 
+ if (!(m_clientSender.isServerPolling() || isAutoDiscoveryEnabled())) { + LOG.info("Starting polling to determine sender status"); + m_clientSender.startServerPolling(); + } + + return; + } + + // we are in sending mode, so make sure the poller is off + if (m_clientSender.isServerPolling()) { + LOG.info("Stopping polling and resuming pinging"); + m_clientSender.stopServerPolling(); + } + + boolean updateAvail = PluginContainer.getInstance().isStarted(); + PingRequest request = new PingRequest(getConfiguration().getAgentName(), updateAvail, true); + + ClientRemotePojoFactory factory = m_clientSender.getClientRemotePojoFactory(); + CoreServerService server = factory.getRemotePojo(CoreServerService.class); + request = server.ping(request); + + // take this opportunity to check the agent-server clock sync + serverClockNotification(request.getReplyServerTimestamp()); + + } catch (Throwable t) { + // If the ping fails, typically do to a CannotConnectException, and we're not using autodiscovery, + // then start the poller to have sending mode re-established when the connection resumes. 
+ if (!(m_clientSender.isServerPolling() || isAutoDiscoveryEnabled())) { + LOG.info("Starting polling to determine sender status", t); + m_clientSender.startServerPolling(); + } else { + LOG.warn("Server Ping failed", t); + } + } + } + } } diff --git a/modules/enterprise/comm/src/main/java/org/rhq/enterprise/communications/command/client/ClientCommandSender.java b/modules/enterprise/comm/src/main/java/org/rhq/enterprise/communications/command/client/ClientCommandSender.java index ed9f20f..745f755 100644 --- a/modules/enterprise/comm/src/main/java/org/rhq/enterprise/communications/command/client/ClientCommandSender.java +++ b/modules/enterprise/comm/src/main/java/org/rhq/enterprise/communications/command/client/ClientCommandSender.java @@ -228,8 +228,8 @@ public class ClientCommandSender { public ClientCommandSender(RemoteCommunicator remote_communicator, ClientCommandSenderConfiguration config) throws IllegalArgumentException { if (remote_communicator == null) { - throw new IllegalArgumentException(LOG - .getMsgString(CommI18NResourceKeys.CLIENT_COMMAND_SENDER_NULL_REMOTE_COMM)); + throw new IllegalArgumentException( + LOG.getMsgString(CommI18NResourceKeys.CLIENT_COMMAND_SENDER_NULL_REMOTE_COMM)); }
if (config == null) { @@ -384,8 +384,8 @@ public class ClientCommandSender { */ public void setRemoteCommunicator(RemoteCommunicator remote_communicator) throws IllegalArgumentException { if (remote_communicator == null) { - throw new IllegalArgumentException(LOG - .getMsgString(CommI18NResourceKeys.CLIENT_COMMAND_SENDER_NULL_REMOTE_COMM)); + throw new IllegalArgumentException( + LOG.getMsgString(CommI18NResourceKeys.CLIENT_COMMAND_SENDER_NULL_REMOTE_COMM)); }
// we don't need to synchronize on this. As per JLS, section 17.7: @@ -768,6 +768,10 @@ public class ClientCommandSender { return; }
+ public boolean isServerPolling() { + return (null != m_serverPollingThread); + } + public void addPollingListener(PollingListener listener) { ServerPollingThread thread = m_serverPollingThread; if (thread != null) { @@ -1159,8 +1163,8 @@ public class ClientCommandSender { sendAsynch(cnc.getCommand(), cnc.getCallback()); } } catch (Exception e) { - LOG.error(e, CommI18NResourceKeys.CLIENT_COMMAND_SENDER_RETRY_FAILURE, m_remoteCommunicator, cnc - .getCommand()); + LOG.error(e, CommI18NResourceKeys.CLIENT_COMMAND_SENDER_RETRY_FAILURE, m_remoteCommunicator, + cnc.getCommand()); } finally { m_shuttingDownTasksLock.readLock().unlock(); } diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/AgentManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/AgentManagerBean.java index 5dc7b93..8cfaf4e 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/AgentManagerBean.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/core/AgentManagerBean.java @@ -561,7 +561,7 @@ public class AgentManagerBean implements AgentManagerLocal { long now = System.currentTimeMillis();
if (request.isRequestUpdateAvailability()) { - updateLastAvailabilityPing(request.getAgentId(), now); + updateLastAvailabilityPing(request.getAgentName(), now); request.setReplyUpdateAvailability(true); }
@@ -572,7 +572,7 @@ public class AgentManagerBean implements AgentManagerLocal { return request; }
- private void updateLastAvailabilityPing(int agentId, long now) { + private void updateLastAvailabilityPing(String agentName, long now) { /* * since we already know we have to update the agent row with the last avail ping time, might as well * set the backfilled to false here (as opposed to called agentManager.setBackfilled(agentId, false) @@ -580,11 +580,11 @@ public class AgentManagerBean implements AgentManagerLocal { String updateStatement = "" // + "UPDATE Agent " // + " SET lastAvailabilityPing = :now, backFilled = FALSE " // - + " WHERE id = :agentId "; + + " WHERE name = :agentName ";
Query query = entityManager.createQuery(updateStatement); query.setParameter("now", now); - query.setParameter("agentId", agentId); + query.setParameter("agentName", agentName);
query.executeUpdate(); }
rhq-commits@lists.fedorahosted.org