Skip to content

Commit f62f6d3

Browse files
authored
Improved tenant route cache refresh efficiency (#177)
* Improved tenant route cache refresh efficiency and reduced memory overhead accordingly * Reduce heap usage during matching and routing
1 parent 18e33d6 commit f62f6d3

File tree

60 files changed

+4012
-830
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+4012
-830
lines changed

bifromq-dist/bifromq-dist-coproc-proto/pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
<groupId>org.apache.bifromq</groupId>
3535
<artifactId>bifromq-common-type</artifactId>
3636
</dependency>
37+
<dependency>
38+
<groupId>com.github.ben-manes.caffeine</groupId>
39+
<artifactId>caffeine</artifactId>
40+
<version>${caffeine.version}</version>
41+
</dependency>
3742
<dependency>
3843
<groupId>org.apache.bifromq</groupId>
3944
<artifactId>bifromq-dist-worker-schema</artifactId>
@@ -47,6 +52,10 @@
4752
<groupId>org.openjdk.jmh</groupId>
4853
<artifactId>jmh-generator-annprocess</artifactId>
4954
</dependency>
55+
<dependency>
56+
<groupId>org.awaitility</groupId>
57+
<artifactId>awaitility</artifactId>
58+
</dependency>
5059
<dependency>
5160
<groupId>org.apache.logging.log4j</groupId>
5261
<artifactId>log4j-api</artifactId>
@@ -71,4 +80,4 @@
7180
</plugin>
7281
</plugins>
7382
</build>
74-
</project>
83+
</project>

bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/ITopicFilterIterator.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.trie;
@@ -29,7 +29,19 @@
2929
*
3030
* @param <V> the value type for topic associated value
3131
*/
32-
public interface ITopicFilterIterator<V> {
32+
public interface ITopicFilterIterator<V> extends AutoCloseable {
33+
/**
34+
* Init the iterator with the given root node.
35+
*
36+
* @param root the root node of the topic trie
37+
*/
38+
void init(TopicTrieNode<V> root);
39+
40+
/**
41+
* Reset the iterator after using.
42+
*/
43+
void close();
44+
3345
/**
3446
* Seek to the given topic filter levels, so that the next topic filter is greater or equals to the given topic
3547
* filter levels.

bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/MTopicFilterTrieNode.java

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,105 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.trie;
2121

2222
import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
23-
import static java.util.Collections.emptySet;
24-
import static java.util.Collections.singleton;
2523

26-
import com.google.common.collect.Sets;
24+
import com.github.benmanes.caffeine.cache.Cache;
25+
import com.github.benmanes.caffeine.cache.Caffeine;
26+
import com.github.benmanes.caffeine.cache.RemovalCause;
27+
import com.github.benmanes.caffeine.cache.Scheduler;
28+
import com.github.benmanes.caffeine.cache.Ticker;
29+
import java.util.HashSet;
2730
import java.util.NoSuchElementException;
2831
import java.util.Set;
32+
import java.util.concurrent.ConcurrentLinkedDeque;
33+
import java.util.concurrent.atomic.AtomicLong;
2934

3035
/**
3136
* Multi-level topic filter trie node.
3237
*
3338
* @param <V> value type
3439
*/
3540
final class MTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
36-
private final Set<TopicTrieNode<V>> backingTopics;
41+
private static final ConcurrentLinkedDeque<Long> KEYS = new ConcurrentLinkedDeque<>();
42+
private static final AtomicLong SEQ = new AtomicLong();
43+
private static volatile Ticker TICKER = Ticker.systemTicker();
44+
private static final Cache<Long, MTopicFilterTrieNode<?>> POOL = Caffeine.newBuilder()
45+
.expireAfterAccess(EXPIRE_AFTER)
46+
.recordStats()
47+
.scheduler(Scheduler.systemScheduler())
48+
.ticker(() -> TICKER.read())
49+
.removalListener((Long key, MTopicFilterTrieNode<?> value, RemovalCause cause) -> {
50+
KEYS.remove(key);
51+
if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
52+
value.recycle();
53+
}
54+
})
55+
.build();
56+
57+
private final Set<TopicTrieNode<V>> backingTopics = new HashSet<>();
58+
59+
MTopicFilterTrieNode() {
60+
}
61+
62+
static <V> MTopicFilterTrieNode<V> borrow(TopicFilterTrieNode<V> parent,
63+
Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
64+
while (true) {
65+
Long key = KEYS.pollFirst();
66+
if (key == null) {
67+
break;
68+
}
69+
@SuppressWarnings("unchecked")
70+
MTopicFilterTrieNode<V> pooled = (MTopicFilterTrieNode<V>) POOL.asMap().remove(key);
71+
if (pooled != null) {
72+
return pooled.init(parent, siblingTopicTrieNodes);
73+
}
74+
}
75+
MTopicFilterTrieNode<V> node = new MTopicFilterTrieNode<>();
76+
return node.init(parent, siblingTopicTrieNodes);
77+
}
78+
79+
static void release(MTopicFilterTrieNode<?> node) {
80+
node.recycle();
81+
long key = SEQ.incrementAndGet();
82+
KEYS.offerLast(key);
83+
POOL.put(key, node);
84+
}
85+
86+
// test hooks (package-private)
87+
static void poolClear() {
88+
POOL.invalidateAll();
89+
POOL.cleanUp();
90+
KEYS.clear();
91+
}
92+
93+
static void poolCleanUp() {
94+
POOL.cleanUp();
95+
}
96+
97+
static int poolApproxSize() {
98+
return KEYS.size();
99+
}
100+
101+
static void setTicker(Ticker ticker) {
102+
TICKER = ticker != null ? ticker : Ticker.systemTicker();
103+
}
37104

38-
MTopicFilterTrieNode(TopicFilterTrieNode<V> parent, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
39-
super(parent);
40-
Set<TopicTrieNode<V>> topics = parent != null ? parent.backingTopics() : emptySet();
105+
MTopicFilterTrieNode<V> init(TopicFilterTrieNode<V> parent, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
106+
assert siblingTopicTrieNodes != null;
107+
this.parent = parent;
108+
backingTopics.clear();
109+
if (parent != null) {
110+
backingTopics.addAll(parent.backingTopics());
111+
}
41112
for (TopicTrieNode<V> sibling : siblingTopicTrieNodes) {
42-
topics = collectTopics(sibling, topics);
113+
collectTopics(sibling);
43114
}
44-
backingTopics = topics;
115+
return this;
45116
}
46117

47118
@Override
@@ -54,17 +125,15 @@ Set<TopicTrieNode<V>> backingTopics() {
54125
return backingTopics;
55126
}
56127

57-
private Set<TopicTrieNode<V>> collectTopics(TopicTrieNode<V> node, Set<TopicTrieNode<V>> topics) {
128+
private void collectTopics(TopicTrieNode<V> node) {
58129
if (node.isUserTopic()) {
59-
topics = Sets.union(topics, singleton(node));
130+
backingTopics.add(node);
60131
}
61132
for (TopicTrieNode<V> child : node.children().values()) {
62-
topics = collectTopics(child, topics);
133+
collectTopics(child);
63134
}
64-
return topics;
65135
}
66136

67-
68137
@Override
69138
void seekChild(String childLevelName) {
70139

@@ -104,4 +173,9 @@ void prevChild() {
104173
TopicFilterTrieNode<V> childNode() {
105174
throw new NoSuchElementException();
106175
}
176+
177+
private void recycle() {
178+
parent = null;
179+
backingTopics.clear();
180+
}
107181
}

bifromq-dist/bifromq-dist-coproc-proto/src/main/java/org/apache/bifromq/dist/trie/NTopicFilterTrieNode.java

Lines changed: 99 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,19 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.dist.trie;
2121

2222
import static org.apache.bifromq.util.TopicConst.MULTI_WILDCARD;
2323
import static org.apache.bifromq.util.TopicConst.SINGLE_WILDCARD;
2424

25+
import com.github.benmanes.caffeine.cache.Cache;
26+
import com.github.benmanes.caffeine.cache.Caffeine;
27+
import com.github.benmanes.caffeine.cache.RemovalCause;
28+
import com.github.benmanes.caffeine.cache.Scheduler;
29+
import com.github.benmanes.caffeine.cache.Ticker;
2530
import java.util.HashSet;
2631
import java.util.Map;
2732
import java.util.NavigableMap;
@@ -30,31 +35,98 @@
3035
import java.util.Set;
3136
import java.util.TreeMap;
3237
import java.util.TreeSet;
38+
import java.util.concurrent.ConcurrentLinkedDeque;
39+
import java.util.concurrent.atomic.AtomicLong;
3340

3441
/**
3542
* Normal level topic filter trie node.
3643
*
3744
* @param <V> value type
3845
*/
3946
final class NTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
40-
private final String levelName;
41-
private final NavigableSet<String> subLevelNames;
42-
private final NavigableMap<String, Set<TopicTrieNode<V>>> subTopicTrieNodes;
43-
private final Set<TopicTrieNode<V>> subWildcardMatchableTopicTrieNodes;
44-
private final Set<TopicTrieNode<V>> backingTopics;
45-
47+
private static final ConcurrentLinkedDeque<Long> KEYS = new ConcurrentLinkedDeque<>();
48+
private static final AtomicLong SEQ = new AtomicLong();
49+
private static volatile Ticker TICKER = Ticker.systemTicker();
50+
private static final Cache<Long, NTopicFilterTrieNode<?>> POOL = Caffeine.newBuilder()
51+
.expireAfterAccess(EXPIRE_AFTER)
52+
.recordStats()
53+
.scheduler(Scheduler.systemScheduler())
54+
.ticker(() -> TICKER.read())
55+
.removalListener((Long key, NTopicFilterTrieNode<?> value, RemovalCause cause) -> {
56+
KEYS.remove(key);
57+
if (cause == RemovalCause.EXPIRED || cause == RemovalCause.SIZE) {
58+
value.recycle();
59+
}
60+
})
61+
.build();
62+
63+
private final NavigableSet<String> subLevelNames = new TreeSet<>();
64+
private final NavigableMap<String, Set<TopicTrieNode<V>>> subTopicTrieNodes = new TreeMap<>();
65+
private final Set<TopicTrieNode<V>> subWildcardMatchableTopicTrieNodes = new HashSet<>();
66+
private final Set<TopicTrieNode<V>> backingTopics = new HashSet<>();
67+
private String levelName;
4668
// point to the sub node during iteration
4769
private String subLevelName;
4870

49-
NTopicFilterTrieNode(TopicFilterTrieNode<V> parent, String levelName, Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
50-
super(parent);
71+
NTopicFilterTrieNode() {
72+
}
73+
74+
static <V> NTopicFilterTrieNode<V> borrow(TopicFilterTrieNode<V> parent,
75+
String levelName,
76+
Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
77+
while (true) {
78+
Long key = KEYS.pollFirst();
79+
if (key == null) {
80+
break;
81+
}
82+
@SuppressWarnings("unchecked")
83+
NTopicFilterTrieNode<V> pooled = (NTopicFilterTrieNode<V>) POOL.asMap().remove(key);
84+
if (pooled != null) {
85+
return pooled.init(parent, levelName, siblingTopicTrieNodes);
86+
}
87+
}
88+
NTopicFilterTrieNode<V> node = new NTopicFilterTrieNode<>();
89+
return node.init(parent, levelName, siblingTopicTrieNodes);
90+
}
91+
92+
static void release(NTopicFilterTrieNode<?> node) {
93+
node.recycle();
94+
long key = SEQ.incrementAndGet();
95+
KEYS.offerLast(key);
96+
POOL.put(key, node);
97+
}
98+
99+
// test hooks (package-private)
100+
static void poolClear() {
101+
POOL.invalidateAll();
102+
POOL.cleanUp();
103+
KEYS.clear();
104+
}
105+
106+
static void poolCleanUp() {
107+
POOL.cleanUp();
108+
}
109+
110+
static int poolApproxSize() {
111+
return KEYS.size();
112+
}
113+
114+
static void setTicker(Ticker ticker) {
115+
TICKER = ticker != null ? ticker : Ticker.systemTicker();
116+
}
117+
118+
NTopicFilterTrieNode<V> init(TopicFilterTrieNode<V> parent, String levelName,
119+
Set<TopicTrieNode<V>> siblingTopicTrieNodes) {
51120
assert levelName != null;
121+
assert siblingTopicTrieNodes != null;
52122
assert siblingTopicTrieNodes.stream().allMatch(node -> node.levelName().equals(levelName));
53-
this.subTopicTrieNodes = new TreeMap<>();
54-
this.subLevelNames = new TreeSet<>();
55-
this.subWildcardMatchableTopicTrieNodes = new HashSet<>();
123+
this.parent = parent;
56124
this.levelName = levelName;
57-
this.backingTopics = new HashSet<>();
125+
subLevelName = null;
126+
subLevelNames.clear();
127+
subTopicTrieNodes.clear();
128+
subWildcardMatchableTopicTrieNodes.clear();
129+
backingTopics.clear();
58130
for (TopicTrieNode<V> sibling : siblingTopicTrieNodes) {
59131
if (sibling.isUserTopic()) {
60132
backingTopics.add(sibling);
@@ -77,6 +149,7 @@ final class NTopicFilterTrieNode<V> extends TopicFilterTrieNode<V> {
77149
subLevelNames.add(SINGLE_WILDCARD);
78150
}
79151
seekChild("");
152+
return this;
80153
}
81154

82155
@Override
@@ -142,9 +215,19 @@ TopicFilterTrieNode<V> childNode() {
142215
throw new NoSuchElementException();
143216
}
144217
return switch (subLevelName) {
145-
case MULTI_WILDCARD -> new MTopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
146-
case SINGLE_WILDCARD -> new STopicFilterTrieNode<>(this, subWildcardMatchableTopicTrieNodes);
147-
default -> new NTopicFilterTrieNode<>(this, subLevelName, subTopicTrieNodes.get(subLevelName));
218+
case MULTI_WILDCARD -> MTopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
219+
case SINGLE_WILDCARD -> STopicFilterTrieNode.borrow(this, subWildcardMatchableTopicTrieNodes);
220+
default -> NTopicFilterTrieNode.borrow(this, subLevelName, subTopicTrieNodes.get(subLevelName));
148221
};
149222
}
223+
224+
private void recycle() {
225+
parent = null;
226+
levelName = null;
227+
subLevelName = null;
228+
subLevelNames.clear();
229+
subTopicTrieNodes.clear();
230+
subWildcardMatchableTopicTrieNodes.clear();
231+
backingTopics.clear();
232+
}
150233
}

0 commit comments

Comments
 (0)