Skip to content

Commit bd803d6

Browse files
committed
Reduce the memory consumption in:
1. the read/write path of base-kv 2. inbox service helper's frequently-used method
1 parent b6f4597 commit bd803d6

File tree

51 files changed

+608
-451
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+608
-451
lines changed

base-kv/base-kv-local-engine/src/main/java/com/baidu/bifromq/basekv/localengine/AbstractKVEngine.java

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,43 @@
1313

1414
package com.baidu.bifromq.basekv.localengine;
1515

16+
import static com.google.common.collect.Lists.newArrayList;
17+
18+
import com.baidu.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
1619
import com.baidu.bifromq.logger.SiftLogger;
20+
import com.google.common.collect.Iterables;
1721
import io.micrometer.core.instrument.Gauge;
1822
import io.micrometer.core.instrument.Metrics;
23+
import io.micrometer.core.instrument.Tags;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.concurrent.ConcurrentHashMap;
1928
import java.util.concurrent.atomic.AtomicReference;
2029
import org.slf4j.Logger;
2130

22-
public abstract class AbstractKVEngine<T extends IKVSpace> implements IKVEngine<T> {
23-
protected enum State {
24-
INIT, STARTING, STARTED, FATAL_FAILURE, STOPPING, STOPPED
25-
}
26-
31+
/**
32+
* The abstract class of KVEngine.
33+
*
34+
* @param <T> the type of KV space created by the engine
35+
* @param <C> the type of configurator
36+
*/
37+
public abstract class AbstractKVEngine<T extends IKVSpace, C extends IKVEngineConfigurator> implements IKVEngine<T> {
2738
protected final String overrideIdentity;
39+
protected final C configurator;
40+
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
41+
private final Map<String, T> kvSpaceMap = new ConcurrentHashMap<>();
2842
protected Logger log;
2943
protected String[] metricTags;
30-
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
3144
private Gauge gauge;
3245

33-
public AbstractKVEngine(String overrideIdentity) {
46+
public AbstractKVEngine(String overrideIdentity, C configurator) {
3447
this.overrideIdentity = overrideIdentity;
35-
}
36-
37-
protected State state() {
38-
return state.get();
48+
this.configurator = configurator;
3949
}
4050

4151
@Override
42-
public void start(String... tags) {
52+
public final void start(String... tags) {
4353
if (state.compareAndSet(State.INIT, State.STARTING)) {
4454
try {
4555
log = SiftLogger.getLogger(this.getClass(), tags);
@@ -64,7 +74,7 @@ protected void afterStart() {
6474
}
6575

6676
@Override
67-
public void stop() {
77+
public final void stop() {
6878
assertStarted();
6979
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
7080
try {
@@ -76,9 +86,54 @@ public void stop() {
7686
}
7787
}
7888

79-
protected abstract void doStop();
89+
protected void doStop() {
90+
kvSpaceMap.values().forEach(IKVSpace::close);
91+
}
8092

8193
protected void assertStarted() {
8294
assert state.get() == State.STARTED : "Not started";
8395
}
96+
97+
@Override
98+
public final Map<String, T> spaces() {
99+
assertStarted();
100+
return Collections.unmodifiableMap(kvSpaceMap);
101+
}
102+
103+
@Override
104+
public final T createIfMissing(String spaceId) {
105+
assertStarted();
106+
return kvSpaceMap.computeIfAbsent(spaceId,
107+
k -> {
108+
T space = buildKVSpace(spaceId, configurator, () -> kvSpaceMap.remove(spaceId), metricTags);
109+
space.open();
110+
return space;
111+
});
112+
}
113+
114+
protected final void load(String spaceId) {
115+
T space = buildKVSpace(spaceId, configurator, () -> kvSpaceMap.remove(spaceId), metricTags);
116+
space.open();
117+
T prev = kvSpaceMap.put(spaceId, space);
118+
assert prev == null;
119+
}
120+
121+
private T buildKVSpace(String spaceId, C configurator, Runnable onDestroy, String... tags) {
122+
String[] tagList =
123+
newArrayList(Iterables.concat(List.of(tags), List.of("spaceId", spaceId))).toArray(String[]::new);
124+
KVSpaceOpMeters opMeters = new KVSpaceOpMeters(spaceId, Tags.of(tagList));
125+
Logger logger = SiftLogger.getLogger("space.logger", tagList);
126+
return doBuildKVSpace(spaceId, configurator, onDestroy, opMeters, logger, tagList);
127+
}
128+
129+
protected abstract T doBuildKVSpace(String spaceId,
130+
C configurator,
131+
Runnable onDestroy,
132+
KVSpaceOpMeters opMeters,
133+
Logger logger,
134+
String... tags);
135+
136+
private enum State {
137+
INIT, STARTING, STARTED, FATAL_FAILURE, STOPPING, STOPPED
138+
}
84139
}

base-kv/base-kv-local-engine/src/main/java/com/baidu/bifromq/basekv/localengine/AbstractKVSpaceReader.java

Lines changed: 19 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,21 @@
1313

1414
package com.baidu.bifromq.basekv.localengine;
1515

16-
import static com.baidu.bifromq.basekv.localengine.metrics.KVSpaceMeters.getTimer;
17-
18-
import com.baidu.bifromq.basekv.localengine.metrics.KVSpaceMetric;
16+
import com.baidu.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
1917
import com.baidu.bifromq.basekv.proto.Boundary;
20-
import com.baidu.bifromq.logger.SiftLogger;
2118
import com.google.protobuf.ByteString;
22-
import io.micrometer.core.instrument.Tags;
23-
import io.micrometer.core.instrument.Timer;
24-
import java.util.ArrayList;
25-
import java.util.Arrays;
26-
import java.util.List;
2719
import java.util.Optional;
2820
import org.slf4j.Logger;
2921

3022
public abstract class AbstractKVSpaceReader implements IKVSpaceReader {
31-
protected final Logger log;
3223
protected final String id;
33-
protected final String[] metricTags;
34-
private final MetricManager metricMgr;
24+
protected final KVSpaceOpMeters opMeters;
25+
protected final Logger logger;
3526

36-
protected AbstractKVSpaceReader(String id, String... tags) {
27+
protected AbstractKVSpaceReader(String id, KVSpaceOpMeters opMeters, Logger logger) {
3728
this.id = id;
38-
List<String> allTags = new ArrayList<>(Arrays.asList(tags));
39-
allTags.add("spaceId");
40-
allTags.add(id);
41-
this.metricTags = allTags.toArray(String[]::new);
42-
this.metricMgr = new MetricManager(Tags.of(metricTags));
43-
this.log = SiftLogger.getLogger(getClass(), allTags.toArray(String[]::new));
29+
this.opMeters = opMeters;
30+
this.logger = logger;
4431
}
4532

4633
@Override
@@ -50,7 +37,7 @@ public final String id() {
5037

5138
@Override
5239
public final Optional<ByteString> metadata(ByteString metaKey) {
53-
return metricMgr.metadataCallTimer.record(() -> doMetadata(metaKey));
40+
return opMeters.metadataCallTimer.record(() -> doMetadata(metaKey));
5441
}
5542

5643
protected abstract Optional<ByteString> doMetadata(ByteString metaKey);
@@ -62,35 +49,35 @@ public final long size() {
6249

6350
@Override
6451
public final long size(Boundary boundary) {
65-
return metricMgr.sizeCallTimer.record(() -> doSize(boundary));
52+
return opMeters.sizeCallTimer.record(() -> doSize(boundary));
6653
}
6754

6855
protected abstract long doSize(Boundary boundary);
6956

7057
@Override
7158
public final boolean exist(ByteString key) {
72-
return metricMgr.existCallTimer.record(() -> doExist(key));
59+
return opMeters.existCallTimer.record(() -> doExist(key));
7360
}
7461

7562
protected abstract boolean doExist(ByteString key);
7663

7764
@Override
7865
public final Optional<ByteString> get(ByteString key) {
79-
return metricMgr.getCallTimer.record(() -> doGet(key));
66+
return opMeters.getCallTimer.record(() -> doGet(key));
8067
}
8168

8269
protected abstract Optional<ByteString> doGet(ByteString key);
8370

8471
@Override
8572
public final IKVSpaceIterator newIterator() {
86-
return metricMgr.iterNewCallTimer.record(() -> new MonitoredKeyRangeIterator(doNewIterator()));
73+
return opMeters.iterNewCallTimer.record(() -> new MonitoredKeyRangeIterator(doNewIterator()));
8774
}
8875

8976
protected abstract IKVSpaceIterator doNewIterator();
9077

9178
@Override
9279
public final IKVSpaceIterator newIterator(Boundary subBoundary) {
93-
return metricMgr.iterNewCallTimer.record(() -> new MonitoredKeyRangeIterator(doNewIterator(subBoundary)));
80+
return opMeters.iterNewCallTimer.record(() -> new MonitoredKeyRangeIterator(doNewIterator(subBoundary)));
9481
}
9582

9683
protected abstract IKVSpaceIterator doNewIterator(Boundary subBoundary);
@@ -119,74 +106,42 @@ public boolean isValid() {
119106

120107
@Override
121108
public void next() {
122-
metricMgr.iterNextCallTimer.record(delegate::next);
109+
opMeters.iterNextCallTimer.record(delegate::next);
123110
}
124111

125112
@Override
126113
public void prev() {
127-
metricMgr.iterPrevCallTimer.record(delegate::prev);
114+
opMeters.iterPrevCallTimer.record(delegate::prev);
128115
}
129116

130117
@Override
131118
public void seekToFirst() {
132-
metricMgr.iterSeekToFirstCallTimer.record(delegate::seekToFirst);
119+
opMeters.iterSeekToFirstCallTimer.record(delegate::seekToFirst);
133120
}
134121

135122
@Override
136123
public void seekToLast() {
137-
metricMgr.iterSeekToLastCallTimer.record(delegate::seekToLast);
124+
opMeters.iterSeekToLastCallTimer.record(delegate::seekToLast);
138125
}
139126

140127
@Override
141128
public void seek(ByteString target) {
142-
metricMgr.iterSeekCallTimer.record(() -> delegate.seek(target));
129+
opMeters.iterSeekCallTimer.record(() -> delegate.seek(target));
143130
}
144131

145132
@Override
146133
public void seekForPrev(ByteString target) {
147-
metricMgr.iterSeekForPrevCallTimer.record(() -> delegate.seekForPrev(target));
134+
opMeters.iterSeekForPrevCallTimer.record(() -> delegate.seekForPrev(target));
148135
}
149136

150137
@Override
151138
public void refresh() {
152-
metricMgr.iterRefreshTimer.record(delegate::refresh);
139+
opMeters.iterRefreshTimer.record(delegate::refresh);
153140
}
154141

155142
@Override
156143
public void close() {
157144
delegate.close();
158145
}
159146
}
160-
161-
private class MetricManager {
162-
private final Timer metadataCallTimer;
163-
private final Timer sizeCallTimer;
164-
private final Timer existCallTimer;
165-
private final Timer getCallTimer;
166-
private final Timer iterNewCallTimer;
167-
private final Timer iterSeekCallTimer;
168-
private final Timer iterSeekForPrevCallTimer;
169-
private final Timer iterSeekToFirstCallTimer;
170-
private final Timer iterSeekToLastCallTimer;
171-
private final Timer iterNextCallTimer;
172-
private final Timer iterPrevCallTimer;
173-
private final Timer iterRefreshTimer;
174-
175-
MetricManager(Tags tags) {
176-
metadataCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "metadata"));
177-
178-
sizeCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "size"));
179-
existCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "exist"));
180-
getCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "get"));
181-
iterNewCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "newitr"));
182-
183-
iterSeekCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "seek"));
184-
iterSeekForPrevCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "pseek"));
185-
iterSeekToFirstCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "fseek"));
186-
iterSeekToLastCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "lseek"));
187-
iterNextCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "next"));
188-
iterPrevCallTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "prev"));
189-
iterRefreshTimer = getTimer(id, KVSpaceMetric.CallTimer, tags.and("op", "refresh"));
190-
}
191-
}
192147
}

base-kv/base-kv-local-engine/src/main/java/com/baidu/bifromq/basekv/localengine/ICPableKVSpace.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515

1616
import java.util.Optional;
1717

18+
/**
19+
* KV space supports checkpoint.
20+
*/
1821
public interface ICPableKVSpace extends IKVSpace {
1922
/**
20-
* Make a checkpoint of the current range state
23+
* Make a checkpoint of the current range state.
2124
*
2225
* @return global unique id of the checkpoint
2326
*/
@@ -31,5 +34,5 @@ public interface ICPableKVSpace extends IKVSpace {
3134
* @param checkpointId the checkpoint id
3235
* @return the range object for accessing the checkpoint
3336
*/
34-
Optional<IKVSpaceCheckpoint> open(String checkpointId);
37+
Optional<IKVSpaceCheckpoint> openCheckpoint(String checkpointId);
3538
}

base-kv/base-kv-local-engine/src/main/java/com/baidu/bifromq/basekv/localengine/IKVEngine.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,46 @@
1515

1616
import java.util.Map;
1717

18+
/**
19+
* The interface of kv engine.
20+
*
21+
* @param <T> the type of kv space created by the engine
22+
*/
1823
public interface IKVEngine<T extends IKVSpace> {
1924

2025
String DEFAULT_NS = "default";
2126

2227
/**
23-
* The unique identifier of the engine
28+
* The unique identifier of the engine.
2429
*
2530
* @return id
2631
*/
2732
String id();
2833

2934
/**
30-
* Find all currently available kv spaces
35+
* Find all currently available kv spaces.
3136
*
3237
* @return the kv space list
3338
*/
3439
Map<String, T> spaces();
3540

3641
/**
37-
* Create a new key range with specified spaceId and boundary or get existing key range
42+
* Create a new key range with specified spaceId and boundary or get existing key range.
3843
*
3944
* @param spaceId the space id
4045
* @return the key range created
4146
*/
4247
T createIfMissing(String spaceId);
4348

4449
/**
45-
* Start the kv engine and specifying additional tags for generated metrics
50+
* Start the kv engine and specifying additional tags for generated metrics.
4651
*
4752
* @param metricTags the additional metric tags
4853
*/
4954
void start(String... metricTags);
5055

5156
/**
52-
* Stop the engine
57+
* Stop the engine.
5358
*/
5459
void stop();
5560
}

0 commit comments

Comments
 (0)