23
23
import static org .apache .bifromq .basekv .utils .DescriptorUtil .getEffectiveRoute ;
24
24
import static org .apache .bifromq .basekv .utils .DescriptorUtil .organizeByEpoch ;
25
25
26
+ import java .util .Collections ;
27
+ import java .util .Comparator ;
28
+ import java .util .HashMap ;
29
+ import java .util .Map ;
30
+ import java .util .NavigableMap ;
31
+ import java .util .Set ;
32
+ import java .util .SortedSet ;
33
+ import java .util .TreeSet ;
26
34
import org .apache .bifromq .basekv .balance .BalanceResult ;
27
35
import org .apache .bifromq .basekv .balance .NoNeedBalance ;
28
36
import org .apache .bifromq .basekv .balance .StoreBalancer ;
29
37
import org .apache .bifromq .basekv .proto .Boundary ;
30
38
import org .apache .bifromq .basekv .proto .KVRangeDescriptor ;
39
+ import org .apache .bifromq .basekv .proto .KVRangeId ;
31
40
import org .apache .bifromq .basekv .proto .KVRangeStoreDescriptor ;
32
41
import org .apache .bifromq .basekv .raft .proto .RaftNodeStatus ;
33
42
import org .apache .bifromq .basekv .utils .EffectiveEpoch ;
43
+ import org .apache .bifromq .basekv .utils .KVRangeIdUtil ;
34
44
import org .apache .bifromq .basekv .utils .LeaderRange ;
35
- import java .util .Collections ;
36
- import java .util .Map ;
37
- import java .util .NavigableMap ;
38
- import java .util .Set ;
39
45
40
46
/**
41
47
* The RedundantEpochRemovalBalancer is a specialized StoreBalancer designed to manage and remove redundant replicas
@@ -72,7 +78,7 @@ public BalanceResult balance() {
72
78
return NoNeedBalance .INSTANCE ;
73
79
}
74
80
if (latest .size () > 1 ) {
75
- // deal with higher epoch redundant replicas generated during bootstrap at startup time
81
+ // deal with epoch-conflict ranges
76
82
Set <KVRangeStoreDescriptor > storeDescriptors = latest .lastEntry ().getValue ();
77
83
for (KVRangeStoreDescriptor storeDescriptor : storeDescriptors ) {
78
84
if (!storeDescriptor .getId ().equals (localStoreId )) {
@@ -82,12 +88,33 @@ public BalanceResult balance() {
82
88
if (rangeDescriptor .getRole () != RaftNodeStatus .Leader ) {
83
89
continue ;
84
90
}
91
+ log .debug ("Remove Epoch-Conflict range: {} in store {}" ,
92
+ KVRangeIdUtil .toString (rangeDescriptor .getId ()),
93
+ storeDescriptor .getId ());
85
94
return quit (localStoreId , rangeDescriptor );
86
95
}
87
96
}
97
+ return NoNeedBalance .INSTANCE ;
88
98
}
89
- // deal with redundant replicas generated within the effective epoch but not be included in the effective route
90
99
Map .Entry <Long , Set <KVRangeStoreDescriptor >> oldestEntry = latest .firstEntry ();
100
+ Map <KVRangeId , SortedSet <LeaderRange >> conflictingRanges = findConflictingRanges (oldestEntry .getValue ());
101
+ if (!conflictingRanges .isEmpty ()) {
102
+ // deal with id-conflict ranges
103
+ for (KVRangeId rangeId : conflictingRanges .keySet ()) {
104
+ SortedSet <LeaderRange > leaderRanges = conflictingRanges .get (rangeId );
105
+ for (LeaderRange leaderRange : leaderRanges ) {
106
+ if (!leaderRange .ownerStoreDescriptor ().getId ().equals (localStoreId )) {
107
+ return NoNeedBalance .INSTANCE ;
108
+ }
109
+ log .debug ("Remove Id-Conflict range: {} in store {}" ,
110
+ KVRangeIdUtil .toString (leaderRange .descriptor ().getId ()),
111
+ leaderRange .ownerStoreDescriptor ().getId ());
112
+ return quit (localStoreId , leaderRange .descriptor ());
113
+ }
114
+ }
115
+ return NoNeedBalance .INSTANCE ;
116
+ }
117
+ // deal with boundary-conflict ranges
91
118
EffectiveEpoch effectiveEpoch = new EffectiveEpoch (oldestEntry .getKey (), oldestEntry .getValue ());
92
119
NavigableMap <Boundary , LeaderRange > effectiveLeaders = getEffectiveRoute (effectiveEpoch ).leaderRanges ();
93
120
for (KVRangeStoreDescriptor storeDescriptor : effectiveEpoch .storeDescriptors ()) {
@@ -101,10 +128,35 @@ public BalanceResult balance() {
101
128
Boundary boundary = rangeDescriptor .getBoundary ();
102
129
LeaderRange leaderRange = effectiveLeaders .get (boundary );
103
130
if (leaderRange == null || !leaderRange .descriptor ().getId ().equals (rangeDescriptor .getId ())) {
131
+ log .debug ("Remove Boundary-Conflict range: {} in store {}" ,
132
+ KVRangeIdUtil .toString (rangeDescriptor .getId ()),
133
+ storeDescriptor .getId ());
104
134
return quit (localStoreId , rangeDescriptor );
105
135
}
106
136
}
107
137
}
108
138
return NoNeedBalance .INSTANCE ;
109
139
}
140
+
141
+ private Map <KVRangeId , SortedSet <LeaderRange >> findConflictingRanges (Set <KVRangeStoreDescriptor > effectiveEpoch ) {
142
+ Map <KVRangeId , SortedSet <LeaderRange >> leaderRangesByRangeId = new HashMap <>();
143
+ Map <KVRangeId , SortedSet <LeaderRange >> conflictingRanges = new HashMap <>();
144
+ for (KVRangeStoreDescriptor storeDescriptor : effectiveEpoch ) {
145
+ for (KVRangeDescriptor rangeDescriptor : storeDescriptor .getRangesList ()) {
146
+ if (rangeDescriptor .getRole () != RaftNodeStatus .Leader ) {
147
+ continue ;
148
+ }
149
+ KVRangeId rangeId = rangeDescriptor .getId ();
150
+ SortedSet <LeaderRange > leaderRanges = leaderRangesByRangeId .computeIfAbsent (rangeId , k -> new TreeSet <>(
151
+ Comparator .comparing ((LeaderRange lr ) -> lr .ownerStoreDescriptor ().getId (), String ::compareTo )
152
+ .reversed ()));
153
+ leaderRanges .add (new LeaderRange (rangeDescriptor , storeDescriptor ));
154
+ if (leaderRanges .size () > 1 ) {
155
+ // More than one leader for the same range, add to conflicting ranges
156
+ conflictingRanges .put (rangeId , leaderRanges );
157
+ }
158
+ }
159
+ }
160
+ return conflictingRanges ;
161
+ }
110
162
}
0 commit comments