|
17 | 17 | */ |
18 | 18 | package org.apache.cassandra.gms; |
19 | 19 |
|
| 20 | +import java.io.ByteArrayInputStream; |
| 21 | +import java.io.DataInputStream; |
| 22 | +import java.io.IOException; |
20 | 23 | import java.net.UnknownHostException; |
21 | 24 | import java.util.ArrayList; |
22 | 25 | import java.util.Arrays; |
|
67 | 70 | import org.apache.cassandra.config.CassandraRelevantProperties; |
68 | 71 | import org.apache.cassandra.config.DatabaseDescriptor; |
69 | 72 | import org.apache.cassandra.db.SystemKeyspace; |
| 73 | +import org.apache.cassandra.dht.IPartitioner; |
70 | 74 | import org.apache.cassandra.dht.Token; |
71 | 75 | import org.apache.cassandra.locator.InetAddressAndPort; |
72 | 76 | import org.apache.cassandra.net.Message; |
@@ -1027,6 +1031,130 @@ else if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat) |
1027 | 1031 | }); |
1028 | 1032 | } |
1029 | 1033 |
|
| 1034 | + public void reviveEndpoint(String address) throws UnknownHostException |
| 1035 | + { |
| 1036 | + InetAddressAndPort endpoint = InetAddressAndPort.getByName(address); |
| 1037 | + EndpointState epState = endpointStateMap.get(endpoint); |
| 1038 | + logger.warn("Reviving {} via gossip", endpoint); |
| 1039 | + |
| 1040 | + if (epState == null) |
| 1041 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": no endpoint-state"); |
| 1042 | + |
| 1043 | + int generation = epState.getHeartBeatState().getGeneration(); |
| 1044 | + int heartbeat = epState.getHeartBeatState().getHeartBeatVersion(); |
| 1045 | + |
| 1046 | + logger.info("Have endpoint-state for {}: status={}, generation={}, heartbeat={}", |
| 1047 | + endpoint, epState.getStatus(), generation, heartbeat); |
| 1048 | + |
| 1049 | + if (!isSilentShutdownState(epState)) |
| 1050 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": not in a (silent) shutdown state: " + epState.getStatus()); |
| 1051 | + |
| 1052 | + if (FailureDetector.instance.isAlive(endpoint)) |
| 1053 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": still alive (failure-detector)"); |
| 1054 | + |
| 1055 | + logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY_MILLIS, endpoint); |
| 1056 | + Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY_MILLIS, TimeUnit.MILLISECONDS); |
| 1057 | + // make sure the endpoint state did not change |
| 1058 | + EndpointState newState = endpointStateMap.get(endpoint); |
| 1059 | + if (newState == null) |
| 1060 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": endpoint-state disappeared"); |
| 1061 | + if (newState.getHeartBeatState().getGeneration() != generation) |
| 1062 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": still alive, generation changed while trying to reviving it"); |
| 1063 | + if (newState.getHeartBeatState().getHeartBeatVersion() != heartbeat) |
| 1064 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": still alive, heartbeat changed while trying to reviving it"); |
| 1065 | + |
| 1066 | + epState.updateTimestamp(); // make sure we don't evict it too soon |
| 1067 | + epState.getHeartBeatState().forceNewerGenerationUnsafe(); |
| 1068 | + |
| 1069 | + // using the tokens from the endpoint-state as that is the real source of truth |
| 1070 | + Collection<Token> tokens = getTokensFromEndpointState(epState, DatabaseDescriptor.getPartitioner()); |
| 1071 | + if (tokens == null || tokens.isEmpty()) |
| 1072 | + throw new RuntimeException("Cannot revive endpoint " + endpoint + ": no tokens from TokenMetadata"); |
| 1073 | + |
| 1074 | + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.normal(tokens)); |
| 1075 | + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.normal(tokens)); |
| 1076 | + handleMajorStateChange(endpoint, epState); |
| 1077 | + Uninterruptibles.sleepUninterruptibly(intervalInMillis * 4, TimeUnit.MILLISECONDS); |
| 1078 | + logger.warn("Finished reviving {}, status={}, generation={}, heartbeat={}", |
| 1079 | + endpoint, epState.getStatus(), generation, heartbeat); |
| 1080 | + } |
| 1081 | + |
| 1082 | + public void unsafeSetEndpointState(String address, String status) throws UnknownHostException |
| 1083 | + { |
| 1084 | + logger.warn("Forcibly changing gossip status of " + address + " to " + status); |
| 1085 | + |
| 1086 | + InetAddressAndPort endpoint = InetAddressAndPort.getByName(address); |
| 1087 | + EndpointState epState = endpointStateMap.get(endpoint); |
| 1088 | + |
| 1089 | + if (epState == null) |
| 1090 | + throw new RuntimeException("No state for endpoint " + endpoint); |
| 1091 | + |
| 1092 | + int generation = epState.getHeartBeatState().getGeneration(); |
| 1093 | + int heartbeat = epState.getHeartBeatState().getHeartBeatVersion(); |
| 1094 | + |
| 1095 | + logger.info("Have endpoint-state for {}: status={}, generation={}, heartbeat={}", |
| 1096 | + endpoint, epState.getStatus(), generation, heartbeat); |
| 1097 | + |
| 1098 | + if (FailureDetector.instance.isAlive(endpoint)) |
| 1099 | + throw new RuntimeException("Cannot update status for endpoint " + endpoint + ": still alive (failure-detector)"); |
| 1100 | + |
| 1101 | + Collection<Token> tokens = getTokensFromEndpointState(epState, DatabaseDescriptor.getPartitioner()); |
| 1102 | + |
| 1103 | + VersionedValue newStatus; |
| 1104 | + switch (status.toLowerCase()) |
| 1105 | + { |
| 1106 | + case "hibernate": |
| 1107 | + newStatus = StorageService.instance.valueFactory.hibernate(true); |
| 1108 | + break; |
| 1109 | + case "normal": |
| 1110 | + newStatus = StorageService.instance.valueFactory.normal(tokens); |
| 1111 | + break; |
| 1112 | + case "left": |
| 1113 | + newStatus = StorageService.instance.valueFactory.left(tokens, computeExpireTime()); |
| 1114 | + break; |
| 1115 | + case "shutdown": |
| 1116 | + newStatus = StorageService.instance.valueFactory.shutdown(true); |
| 1117 | + break; |
| 1118 | + default: |
| 1119 | + throw new IllegalArgumentException("Unknown status '" + status + '\''); |
| 1120 | + } |
| 1121 | + |
| 1122 | + epState.updateTimestamp(); // make sure we don't evict it too soon |
| 1123 | + epState.getHeartBeatState().forceNewerGenerationUnsafe(); |
| 1124 | + |
| 1125 | + epState.addApplicationState(ApplicationState.STATUS, newStatus); |
| 1126 | + epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, newStatus); |
| 1127 | + |
| 1128 | + handleMajorStateChange(endpoint, epState); |
| 1129 | + |
| 1130 | + logger.warn("Forcibly changed gossip status of " + endpoint + " to " + newStatus); |
| 1131 | + } |
| 1132 | + |
| 1133 | + public Collection<Token> getTokensFor(InetAddressAndPort endpoint, IPartitioner partitioner) |
| 1134 | + { |
| 1135 | + EndpointState state = getEndpointStateForEndpoint(endpoint); |
| 1136 | + if (state == null) |
| 1137 | + return Collections.emptyList(); |
| 1138 | + |
| 1139 | + return getTokensFromEndpointState(state, partitioner); |
| 1140 | + } |
| 1141 | + |
| 1142 | + private Collection<Token> getTokensFromEndpointState(EndpointState state, IPartitioner partitioner) |
| 1143 | + { |
| 1144 | + try |
| 1145 | + { |
| 1146 | + VersionedValue versionedValue = state.getApplicationState(ApplicationState.TOKENS); |
| 1147 | + if (versionedValue == null) |
| 1148 | + return Collections.emptyList(); |
| 1149 | + |
| 1150 | + return TokenSerializer.deserialize(partitioner, new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes()))); |
| 1151 | + } |
| 1152 | + catch (IOException e) |
| 1153 | + { |
| 1154 | + throw new RuntimeException(e); |
| 1155 | + } |
| 1156 | + } |
| 1157 | + |
1030 | 1158 | public int getCurrentGenerationNumber(InetAddressAndPort endpoint) |
1031 | 1159 | { |
1032 | 1160 | return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration(); |
|
0 commit comments