@@ -210,7 +210,7 @@ private void runDirectAgentScanTimerTask() {
210210 scanDirectAgentToLoad ();
211211 }
212212
213- private void scanDirectAgentToLoad () {
213+ protected void scanDirectAgentToLoad () {
214214 logger .trace ("Begin scanning directly connected hosts" );
215215
216216 // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
@@ -231,11 +231,21 @@ private void scanDirectAgentToLoad() {
231231 logger .info ("{} is detected down, but we have a forward attache running, disconnect this one before launching the host" , host );
232232 removeAgent (agentattache , Status .Disconnected );
233233 } else {
234- continue ;
234+ logger .debug ("Host {} status is {} but has an AgentAttache which is not forForward, try to load directly" , host , host .getStatus ());
235+ Status hostStatus = investigate (agentattache );
236+ if (Status .Up == hostStatus ) {
237+ /* Got ping response from host, bring it back */
238+ logger .info ("After investigation, Agent for host {} is determined to be up and running" , host );
239+ agentStatusTransitTo (host , Event .Ping , _nodeId );
240+ } else {
241+ logger .debug ("After investigation, AgentAttache is not null but host status is {}, try to load directly {}" , hostStatus , host );
242+ loadDirectlyConnectedHost (host , false );
243+ }
235244 }
245+ } else {
246+ logger .debug ("AgentAttache is null, loading directly connected {}" , host );
247+ loadDirectlyConnectedHost (host , false );
236248 }
237- logger .debug ("Loading directly connected {}" , host );
238- loadDirectlyConnectedHost (host , false );
239249 } catch (final Throwable e ) {
240250 logger .warn (" can not load directly connected {} due to " , host , e );
241251 }
@@ -381,20 +391,20 @@ public void reconnect(final long hostId) throws CloudRuntimeException, AgentUnav
381391 return ;
382392 }
383393 if (!result ) {
384- throw new CloudRuntimeException ("Failed to propagate agent change request event:" + Event . ShutdownRequested + " to host:" + hostId );
394+ throw new CloudRuntimeException (String . format ( "Failed to propagate agent change request event: %s to host: %s" , Event . ShutdownRequested , hostId ) );
385395 }
386396 }
387397
388398 public void notifyNodesInCluster (final AgentAttache attache ) {
389399 logger .debug ("Notifying other nodes of to disconnect" );
390- final Command [] cmds = new Command [] {new ChangeAgentCommand (attache .getId (), Event .AgentDisconnected )};
400+ final Command [] cmds = new Command []{new ChangeAgentCommand (attache .getId (), Event .AgentDisconnected )};
391401 _clusterMgr .broadcast (attache .getId (), _gson .toJson (cmds ));
392402 }
393403
394404 // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
395405 public void notifyNodesInClusterToScheduleHostScanTask () {
396406 logger .debug ("Notifying other MS nodes to run host scan task" );
397- final Command [] cmds = new Command [] {new ScheduleHostScanTaskCommand ()};
407+ final Command [] cmds = new Command []{new ScheduleHostScanTaskCommand ()};
398408 _clusterMgr .broadcast (0 , _gson .toJson (cmds ));
399409 }
400410
@@ -435,7 +445,7 @@ public boolean routeToPeer(final String peer, final byte[] bytes) {
435445 }
436446 try {
437447 logD (bytes , "Routing to peer" );
438- Link .write (ch , new ByteBuffer [] {ByteBuffer .wrap (bytes )}, sslEngine );
448+ Link .write (ch , new ByteBuffer []{ByteBuffer .wrap (bytes )}, sslEngine );
439449 return true ;
440450 } catch (final IOException e ) {
441451 try {
@@ -954,7 +964,7 @@ protected void runInContext() {
954964 if (!_agentToTransferIds .isEmpty ()) {
955965 logger .debug ("Found {} agents to transfer" , _agentToTransferIds .size ());
956966 // for (Long hostId : _agentToTransferIds) {
957- for (final Iterator <Long > iterator = _agentToTransferIds .iterator (); iterator .hasNext ();) {
967+ for (final Iterator <Long > iterator = _agentToTransferIds .iterator (); iterator .hasNext (); ) {
958968 final Long hostId = iterator .next ();
959969 final AgentAttache attache = findAttache (hostId );
960970
@@ -1095,7 +1105,7 @@ protected void finishRebalance(final long hostId, final long futureOwnerId, fina
10951105 return ;
10961106 }
10971107
1098- final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache )attache ;
1108+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache ) attache ;
10991109
11001110 if (success ) {
11011111
@@ -1146,10 +1156,10 @@ protected boolean startRebalance(final long hostId) {
11461156 }
11471157
11481158 synchronized (_agents ) {
1149- final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache )_agents .get (hostId );
1159+ final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache ) _agents .get (hostId );
11501160 if (attache != null && attache .getQueueSize () == 0 && attache .getNonRecurringListenersSize () == 0 ) {
11511161 handleDisconnectWithoutInvestigation (attache , Event .StartAgentRebalance , true , true );
1152- final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache )createAttache (host );
1162+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache ) createAttache (host );
11531163 if (forwardAttache == null ) {
11541164 logger .warn ("Unable to create a forward attache for the host {} as a part of rebalance process" , host );
11551165 return false ;
@@ -1253,7 +1263,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12531263 }
12541264
12551265 if (cmds .length == 1 && cmds [0 ] instanceof ChangeAgentCommand ) { // intercepted
1256- final ChangeAgentCommand cmd = (ChangeAgentCommand )cmds [0 ];
1266+ final ChangeAgentCommand cmd = (ChangeAgentCommand ) cmds [0 ];
12571267
12581268 logger .debug ("Intercepting command for agent change: agent {} event: {}" , cmd .getAgentId (), cmd .getEvent ());
12591269 boolean result ;
@@ -1270,7 +1280,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12701280 answers [0 ] = new ChangeAgentAnswer (cmd , result );
12711281 return _gson .toJson (answers );
12721282 } else if (cmds .length == 1 && cmds [0 ] instanceof TransferAgentCommand ) {
1273- final TransferAgentCommand cmd = (TransferAgentCommand )cmds [0 ];
1283+ final TransferAgentCommand cmd = (TransferAgentCommand ) cmds [0 ];
12741284
12751285 logger .debug ("Intercepting command for agent rebalancing: agent: {}, event: {}, connection transfer: {}" , cmd .getAgentId (), cmd .getEvent (), cmd .isConnectionTransfer ());
12761286 boolean result ;
@@ -1289,7 +1299,7 @@ public String dispatch(final ClusterServicePdu pdu) {
12891299 answers [0 ] = new Answer (cmd , result , null );
12901300 return _gson .toJson (answers );
12911301 } else if (cmds .length == 1 && cmds [0 ] instanceof PropagateResourceEventCommand ) {
1292- final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand )cmds [0 ];
1302+ final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand ) cmds [0 ];
12931303
12941304 logger .debug ("Intercepting command to propagate event {} for host {} ({})" , () -> cmd .getEvent ().name (), cmd ::getHostId , () -> _hostDao .findById (cmd .getHostId ()));
12951305
@@ -1306,10 +1316,10 @@ public String dispatch(final ClusterServicePdu pdu) {
13061316 answers [0 ] = new Answer (cmd , result , null );
13071317 return _gson .toJson (answers );
13081318 } else if (cmds .length == 1 && cmds [0 ] instanceof ScheduleHostScanTaskCommand ) {
1309- final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand )cmds [0 ];
1319+ final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand ) cmds [0 ];
13101320 return handleScheduleHostScanTaskCommand (cmd );
13111321 } else if (cmds .length == 1 && cmds [0 ] instanceof BaseShutdownManagementServerHostCommand ) {
1312- final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand )cmds [0 ];
1322+ final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand ) cmds [0 ];
13131323 return handleShutdownManagementServerHostCommand (cmd );
13141324 }
13151325
@@ -1362,7 +1372,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13621372 try {
13631373 managementServerMaintenanceManager .prepareForShutdown ();
13641374 return "Successfully prepared for shutdown" ;
1365- } catch (CloudRuntimeException e ) {
1375+ } catch (CloudRuntimeException e ) {
13661376 return e .getMessage ();
13671377 }
13681378 }
@@ -1371,7 +1381,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13711381 try {
13721382 managementServerMaintenanceManager .triggerShutdown ();
13731383 return "Successfully triggered shutdown" ;
1374- } catch (CloudRuntimeException e ) {
1384+ } catch (CloudRuntimeException e ) {
13751385 return e .getMessage ();
13761386 }
13771387 }
@@ -1380,7 +1390,7 @@ private String handleShutdownManagementServerHostCommand(BaseShutdownManagementS
13801390 try {
13811391 managementServerMaintenanceManager .cancelShutdown ();
13821392 return "Successfully cancelled shutdown" ;
1383- } catch (CloudRuntimeException e ) {
1393+ } catch (CloudRuntimeException e ) {
13841394 return e .getMessage ();
13851395 }
13861396 }
0 commit comments