Skip to content

Commit 7910819

Browse files
author
李金松
committed
Implemented read-write separation based on JedisSentineled
1 parent affb536 commit 7910819

File tree

7 files changed

+346
-22
lines changed

7 files changed

+346
-22
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@
208208
<version>${resilience4j.version}</version>
209209
<optional>true</optional>
210210
</dependency>
211+
<dependency>
212+
<groupId>org.powermock</groupId>
213+
<artifactId>powermock-module-junit4</artifactId>
214+
<version>2.0.2</version>
215+
<scope>test</scope>
216+
</dependency>
211217
<dependency>
212218
<groupId>io.github.resilience4j</groupId>
213219
<artifactId>resilience4j-circuitbreaker</artifactId>

src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {
3333

3434
private final AuthXManager authXManager;
3535

36+
private final boolean fallbackToMaster;
37+
3638
private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
3739
this.redisProtocol = builder.redisProtocol;
3840
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
@@ -50,6 +52,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
5052
this.clientSetInfoConfig = builder.clientSetInfoConfig;
5153
this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas;
5254
this.authXManager = builder.authXManager;
55+
this.fallbackToMaster = builder.fallbackToMaster;
5356
}
5457

5558
@Override
@@ -143,6 +146,11 @@ public boolean isReadOnlyForRedisClusterReplicas() {
143146
return readOnlyForRedisClusterReplicas;
144147
}
145148

149+
@Override
150+
public boolean isFallbackToMaster() {
151+
return fallbackToMaster;
152+
}
153+
146154
public static Builder builder() {
147155
return new Builder();
148156
}
@@ -175,6 +183,8 @@ public static class Builder {
175183

176184
private AuthXManager authXManager = null;
177185

186+
private boolean fallbackToMaster = true;
187+
178188
private Builder() {
179189
}
180190

@@ -297,6 +307,11 @@ public Builder authXManager(AuthXManager authXManager) {
297307
return this;
298308
}
299309

310+
public Builder fallbackToMaster(boolean fallbackToMaster) {
311+
this.fallbackToMaster = fallbackToMaster;
312+
return this;
313+
}
314+
300315
public Builder from(JedisClientConfig instance) {
301316
this.redisProtocol = instance.getRedisProtocol();
302317
this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis();
@@ -314,6 +329,7 @@ public Builder from(JedisClientConfig instance) {
314329
this.clientSetInfoConfig = instance.getClientSetInfoConfig();
315330
this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas();
316331
this.authXManager = instance.getAuthXManager();
332+
this.fallbackToMaster = instance.isFallbackToMaster();
317333
return this;
318334
}
319335
}
@@ -376,6 +392,8 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
376392

377393
builder.authXManager(copy.getAuthXManager());
378394

395+
builder.fallbackToMaster(copy.isFallbackToMaster());
396+
379397
return builder.build();
380398
}
381399
}

src/main/java/redis/clients/jedis/JedisClientConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import javax.net.ssl.SSLSocketFactory;
77

88
import redis.clients.jedis.authentication.AuthXManager;
9+
import redis.clients.jedis.util.Commands;
910

1011
public interface JedisClientConfig {
1112

@@ -115,4 +116,21 @@ default boolean isReadOnlyForRedisClusterReplicas() {
115116
default ClientSetInfoConfig getClientSetInfoConfig() {
116117
return ClientSetInfoConfig.DEFAULT;
117118
}
119+
120+
/**
121+
* fallback when no replicas are healthy, default to master
122+
* @return {@code true} - to execute command by master. {@code false} - throw exception.
123+
*/
124+
default boolean isFallbackToMaster() {
125+
return true;
126+
}
127+
128+
/**
129+
* check a Command is READONLY
130+
* @param args
131+
* @return
132+
*/
133+
default boolean isReadCommand(CommandArguments args) {
134+
return Commands.ReadOnlyCommands.contains(args.getCommand());
135+
}
118136
}

src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java

Lines changed: 198 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.ArrayList;
44
import java.util.Collection;
55
import java.util.List;
6+
import java.util.Map;
67
import java.util.Set;
78
import java.util.concurrent.atomic.AtomicBoolean;
89
import java.util.concurrent.locks.Lock;
@@ -26,6 +27,15 @@
2627
import redis.clients.jedis.util.IOUtils;
2728

2829
public class SentineledConnectionProvider implements ConnectionProvider {
30+
class PoolInfo {
31+
public String host;
32+
public ConnectionPool pool;
33+
34+
public PoolInfo(String host, ConnectionPool pool) {
35+
this.host = host;
36+
this.pool = pool;
37+
}
38+
}
2939

3040
private static final Logger LOG = LoggerFactory.getLogger(SentineledConnectionProvider.class);
3141

@@ -51,6 +61,10 @@ public class SentineledConnectionProvider implements ConnectionProvider {
5161

5262
private final Lock initPoolLock = new ReentrantLock(true);
5363

64+
private final List<PoolInfo> slavePools = new ArrayList<>();
65+
66+
private int poolIndex;
67+
5468
public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig,
5569
Set<HostAndPort> sentinels, final JedisClientConfig sentinelClientConfig) {
5670
this(masterName, masterClientConfig, null, null, sentinels, sentinelClientConfig);
@@ -102,13 +116,52 @@ public SentineledConnectionProvider(String masterName, final JedisClientConfig m
102116
initMaster(master);
103117
}
104118

119+
private Connection getSlaveResource() {
120+
int startIdx;
121+
synchronized (slavePools) {
122+
poolIndex++;
123+
if (poolIndex >= slavePools.size()) {
124+
poolIndex = 0;
125+
}
126+
startIdx = poolIndex;
127+
}
128+
return _getSlaveResource(startIdx, 0);
129+
}
130+
131+
private Connection _getSlaveResource(int idx, int cnt) {
132+
PoolInfo poolInfo;
133+
synchronized (slavePools) {
134+
if (cnt >= slavePools.size()) {
135+
return null;
136+
}
137+
poolInfo = slavePools.get(idx % slavePools.size());
138+
}
139+
try {
140+
Connection jedis = poolInfo.pool.getResource();
141+
return jedis;
142+
} catch (Exception e) {
143+
LOG.error("get connection fail:", e);
144+
return _getSlaveResource(idx + 1, cnt + 1);
145+
}
146+
}
147+
105148
@Override
106149
public Connection getConnection() {
107150
return pool.getResource();
108151
}
109152

110153
@Override
111154
public Connection getConnection(CommandArguments args) {
155+
boolean readCommand = masterClientConfig.isReadCommand(args);
156+
if (readCommand) {
157+
Connection slaveConn = getSlaveResource();
158+
if (slaveConn != null) {
159+
return slaveConn;
160+
}
161+
if (!masterClientConfig.isFallbackToMaster()) {
162+
throw new JedisException("can not get Connection, all slave is invalid");
163+
}
164+
}
112165
return pool.getResource();
113166
}
114167

@@ -117,6 +170,10 @@ public void close() {
117170
sentinelListeners.forEach(SentinelListener::shutdown);
118171

119172
pool.close();
173+
174+
for (PoolInfo slavePool : slavePools) {
175+
slavePool.pool.close();
176+
}
120177
}
121178

122179
public HostAndPort getCurrentMaster() {
@@ -167,6 +224,79 @@ private ConnectionPool createNodePool(HostAndPort master) {
167224
}
168225
}
169226

227+
private void initSlaves(List<HostAndPort> slaves) {
228+
List<PoolInfo> removedSlavePools = new ArrayList<>();
229+
try {
230+
synchronized (slavePools) {
231+
Loop:
232+
for (int i = slavePools.size()-1; i >= 0; i--) {
233+
PoolInfo poolInfo = slavePools.get(i);
234+
for (HostAndPort slave : slaves) {
235+
String host = slave.toString();
236+
if (poolInfo.host.equals(host)) {
237+
continue Loop;
238+
}
239+
}
240+
removedSlavePools.add(slavePools.remove(i));
241+
}
242+
243+
for (HostAndPort slave : slaves) {
244+
addSlave(slave);
245+
}
246+
}
247+
} finally {
248+
if (!removedSlavePools.isEmpty() && clientSideCache != null) {
249+
clientSideCache.flush();
250+
}
251+
252+
for (PoolInfo removedSlavePool : removedSlavePools) {
253+
removedSlavePool.pool.destroy();
254+
}
255+
}
256+
}
257+
258+
private static boolean isHealthy(String flags) {
259+
for (String flag : flags.split(",")) {
260+
switch (flag.trim()) {
261+
case "s_down":
262+
case "o_down":
263+
case "disconnected":
264+
return false;
265+
}
266+
}
267+
return true;
268+
}
269+
270+
private void addSlave(HostAndPort slave) {
271+
String newSlaveHost = slave.toString();
272+
synchronized (this.slavePools) {
273+
for (int i = 0; i < this.slavePools.size(); i++) {
274+
PoolInfo poolInfo = this.slavePools.get(i);
275+
if (poolInfo.host.equals(newSlaveHost)) {
276+
return;
277+
}
278+
}
279+
slavePools.add(new PoolInfo(newSlaveHost, createNodePool(slave)));
280+
}
281+
}
282+
283+
private void removeSlave(HostAndPort slave) {
284+
String newSlaveHost = slave.toString();
285+
PoolInfo removed = null;
286+
synchronized (this.slavePools) {
287+
for (int i = 0; i < this.slavePools.size(); i++) {
288+
PoolInfo poolInfo = this.slavePools.get(i);
289+
if (poolInfo.host.equals(newSlaveHost)) {
290+
removed = slavePools.remove(i);
291+
break;
292+
}
293+
}
294+
}
295+
if (removed != null) {
296+
removed.pool.destroy();
297+
}
298+
}
299+
170300
private HostAndPort initSentinels(Set<HostAndPort> sentinels) {
171301

172302
HostAndPort master = null;
@@ -262,6 +392,24 @@ public void run() {
262392

263393
sentinelJedis = new Jedis(node, sentinelClientConfig);
264394

395+
List<Map<String, String>> slaveInfos = sentinelJedis.sentinelSlaves(masterName);
396+
397+
List<HostAndPort> slaves = new ArrayList<>();
398+
399+
for (int i = 0; i < slaveInfos.size(); i++) {
400+
Map<String, String> slaveInfo = slaveInfos.get(i);
401+
String flags = slaveInfo.get("flags");
402+
if (flags == null || !isHealthy(flags)) {
403+
continue;
404+
}
405+
String ip = slaveInfo.get("ip");
406+
int port = Integer.parseInt(slaveInfo.get("port"));
407+
HostAndPort slave = new HostAndPort(ip, port);
408+
slaves.add(slave);
409+
}
410+
411+
initSlaves(slaves);
412+
265413
// code for active refresh
266414
List<String> masterAddr = sentinelJedis.sentinelGetMasterAddrByName(masterName);
267415
if (masterAddr == null || masterAddr.size() != 2) {
@@ -275,24 +423,58 @@ public void run() {
275423
public void onMessage(String channel, String message) {
276424
LOG.debug("Sentinel {} published: {}.", node, message);
277425

278-
String[] switchMasterMsg = message.split(" ");
279-
280-
if (switchMasterMsg.length > 3) {
281-
282-
if (masterName.equals(switchMasterMsg[0])) {
283-
initMaster(toHostAndPort(switchMasterMsg[3], switchMasterMsg[4]));
284-
} else {
285-
LOG.debug(
286-
"Ignoring message on +switch-master for master {}. Our master is {}.",
287-
switchMasterMsg[0], masterName);
288-
}
289-
290-
} else {
291-
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
292-
node, message);
426+
String[] switchMsg = message.split(" ");
427+
String slaveIp;
428+
int slavePort;
429+
switch (channel) {
430+
case "+switch-master":
431+
if (switchMsg.length > 3) {
432+
if (masterName.equals(switchMsg[0])) {
433+
initMaster(toHostAndPort(switchMsg[3], switchMsg[4]));
434+
} else {
435+
LOG.debug(
436+
"Ignoring message on +switch-master for master {}. Our master is {}.",
437+
switchMsg[0], masterName);
438+
}
439+
} else {
440+
LOG.error("Invalid message received on sentinel {} on channel +switch-master: {}.",
441+
node, message);
442+
}
443+
break;
444+
case "+sdown":
445+
if (switchMsg[0].equals("master")) {
446+
return;
447+
}
448+
if (!masterName.equals(switchMsg[5])) {
449+
return;
450+
}
451+
slaveIp = switchMsg[2];
452+
slavePort = Integer.parseInt(switchMsg[3]);
453+
removeSlave(new HostAndPort(slaveIp, slavePort));
454+
break;
455+
case "-sdown":
456+
if (!masterName.equals(switchMsg[5])) {
457+
return;
458+
}
459+
slaveIp = switchMsg[2];
460+
slavePort = Integer.parseInt(switchMsg[3]);
461+
addSlave(new HostAndPort(slaveIp, slavePort));
462+
break;
463+
case "+slave":
464+
if (!masterName.equals(switchMsg[5])) {
465+
return;
466+
}
467+
slaveIp = switchMsg[2];
468+
slavePort = Integer.parseInt(switchMsg[3]);
469+
addSlave(new HostAndPort(slaveIp, slavePort));
470+
471+
String masterIp = switchMsg[6];
472+
int masterPort = Integer.parseInt(switchMsg[7]);
473+
removeSlave(new HostAndPort(masterIp, masterPort));
474+
break;
293475
}
294476
}
295-
}, "+switch-master");
477+
}, "+switch-master", "+sdown", "-sdown", "+slave");
296478

297479
} catch (JedisException e) {
298480

0 commit comments

Comments
 (0)