Skip to content

Commit fc40ee6

Browse files
authored
Memory usage improvement (#180)
1. Improve memory usage for route cache; 2. Enhance restore session management to reuse existing sessions for the same snapshot and handle leader changes appropriately; 3. Enhance snapshot handling to ignore obsolete snapshots based on committed progress; 4. Streamlining snapshot installation process in order to reduce memory overhead; 5. Reduce memory overhead during Split/Merge process; 6. Improved dist worker range lookup efficiency.
1 parent eaac04f commit fc40ee6

File tree

87 files changed

+2075
-634
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+2075
-634
lines changed

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVEngineIterator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ class RocksDBKVEngineIterator implements AutoCloseable {
3838
Snapshot snapshot,
3939
byte[] startKey,
4040
byte[] endKey) {
41-
ReadOptions readOptions = new ReadOptions().setPinData(true);
41+
this(db, cfHandle, snapshot, startKey, endKey, true);
42+
}
43+
44+
RocksDBKVEngineIterator(RocksDB db,
45+
ColumnFamilyHandle cfHandle,
46+
Snapshot snapshot,
47+
byte[] startKey,
48+
byte[] endKey,
49+
boolean fillCache) {
50+
ReadOptions readOptions = new ReadOptions().setPinData(true).setFillCache(fillCache);
4251
Slice lowerSlice = null;
4352
if (startKey != null) {
4453
lowerSlice = new Slice(startKey);

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpaceCheckpoint.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,16 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basekv.localengine.rocksdb;
2121

22+
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2223
import static org.apache.bifromq.basekv.localengine.IKVEngine.DEFAULT_NS;
2324
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
2425
import static org.apache.bifromq.basekv.localengine.rocksdb.RocksDBKVSpace.deleteDir;
25-
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2626

27-
import org.apache.bifromq.basekv.localengine.ISyncContext;
28-
import org.apache.bifromq.basekv.localengine.KVEngineException;
29-
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
3027
import com.google.protobuf.ByteString;
3128
import java.io.File;
3229
import java.io.IOException;
@@ -36,6 +33,11 @@
3633
import java.util.Optional;
3734
import java.util.function.Predicate;
3835
import java.util.function.Supplier;
36+
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
37+
import org.apache.bifromq.basekv.localengine.ISyncContext;
38+
import org.apache.bifromq.basekv.localengine.KVEngineException;
39+
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
40+
import org.apache.bifromq.basekv.proto.Boundary;
3941
import org.rocksdb.BlockBasedTableConfig;
4042
import org.rocksdb.ColumnFamilyDescriptor;
4143
import org.rocksdb.ColumnFamilyHandle;
@@ -138,6 +140,16 @@ public <T> T call(Supplier<T> supplier) {
138140
};
139141
}
140142

143+
@Override
144+
protected IKVSpaceIterator doNewIterator() {
145+
return new RocksDBKVSpaceIterator(db(), cfHandle(), null, Boundary.getDefaultInstance(), newRefresher(), false);
146+
}
147+
148+
@Override
149+
protected IKVSpaceIterator doNewIterator(Boundary subBoundary) {
150+
return new RocksDBKVSpaceIterator(db(), cfHandle(), null, subBoundary, newRefresher(), false);
151+
}
152+
141153
private record ClosableResources(
142154
String id,
143155
String cpId,

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpaceIterator.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@
1919

2020
package org.apache.bifromq.basekv.localengine.rocksdb;
2121

22+
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2223
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_END;
2324
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_START;
2425
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.fromDataKey;
2526
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
2627
import static org.apache.bifromq.basekv.utils.BoundaryUtil.endKeyBytes;
2728
import static org.apache.bifromq.basekv.utils.BoundaryUtil.startKeyBytes;
28-
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2929

30+
import com.google.protobuf.ByteString;
31+
import java.lang.ref.Cleaner;
3032
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
3133
import org.apache.bifromq.basekv.localengine.ISyncContext;
3234
import org.apache.bifromq.basekv.proto.Boundary;
33-
import com.google.protobuf.ByteString;
34-
import java.lang.ref.Cleaner;
3535
import org.rocksdb.ColumnFamilyHandle;
3636
import org.rocksdb.RocksDB;
3737
import org.rocksdb.Snapshot;
@@ -41,6 +41,7 @@ class RocksDBKVSpaceIterator implements IKVSpaceIterator {
4141
private final RocksDBKVEngineIterator rocksItr;
4242
private final ISyncContext.IRefresher refresher;
4343
private final Cleaner.Cleanable onClose;
44+
4445
public RocksDBKVSpaceIterator(RocksDB db,
4546
ColumnFamilyHandle cfHandle,
4647
Boundary boundary,
@@ -53,11 +54,20 @@ public RocksDBKVSpaceIterator(RocksDB db,
5354
Snapshot snapshot,
5455
Boundary boundary,
5556
ISyncContext.IRefresher refresher) {
57+
this(db, cfHandle, snapshot, boundary, refresher, true);
58+
}
59+
60+
public RocksDBKVSpaceIterator(RocksDB db,
61+
ColumnFamilyHandle cfHandle,
62+
Snapshot snapshot,
63+
Boundary boundary,
64+
ISyncContext.IRefresher refresher,
65+
boolean fillCache) {
5666
byte[] startKey = startKeyBytes(boundary);
5767
byte[] endKey = endKeyBytes(boundary);
5868
startKey = startKey != null ? toDataKey(startKey) : DATA_SECTION_START;
5969
endKey = endKey != null ? toDataKey(endKey) : DATA_SECTION_END;
60-
this.rocksItr = new RocksDBKVEngineIterator(db, cfHandle, snapshot, startKey, endKey);
70+
this.rocksItr = new RocksDBKVEngineIterator(db, cfHandle, snapshot, startKey, endKey, fillCache);
6171
this.refresher = refresher;
6272
onClose = CLEANER.register(this, new State(rocksItr));
6373
}

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpaceReader.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,24 @@
1919

2020
package org.apache.bifromq.basekv.localengine.rocksdb;
2121

22+
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
23+
import static java.util.Collections.singletonList;
2224
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_END;
2325
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.DATA_SECTION_START;
2426
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
2527
import static org.apache.bifromq.basekv.utils.BoundaryUtil.compare;
2628
import static org.apache.bifromq.basekv.utils.BoundaryUtil.isValid;
27-
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
28-
import static java.util.Collections.singletonList;
2929
import static org.rocksdb.SizeApproximationFlag.INCLUDE_FILES;
3030
import static org.rocksdb.SizeApproximationFlag.INCLUDE_MEMTABLES;
3131

32+
import com.google.protobuf.ByteString;
33+
import java.util.Optional;
3234
import org.apache.bifromq.basekv.localengine.AbstractKVSpaceReader;
3335
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
3436
import org.apache.bifromq.basekv.localengine.ISyncContext;
3537
import org.apache.bifromq.basekv.localengine.KVEngineException;
3638
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
3739
import org.apache.bifromq.basekv.proto.Boundary;
38-
import com.google.protobuf.ByteString;
39-
import java.util.Optional;
4040
import org.rocksdb.ColumnFamilyHandle;
4141
import org.rocksdb.Range;
4242
import org.rocksdb.RocksDB;
@@ -86,12 +86,12 @@ protected final Optional<ByteString> doGet(ByteString key) {
8686
}
8787

8888
@Override
89-
protected final IKVSpaceIterator doNewIterator() {
89+
protected IKVSpaceIterator doNewIterator() {
9090
return new RocksDBKVSpaceIterator(db(), cfHandle(), Boundary.getDefaultInstance(), newRefresher());
9191
}
9292

9393
@Override
94-
protected final IKVSpaceIterator doNewIterator(Boundary subBoundary) {
94+
protected IKVSpaceIterator doNewIterator(Boundary subBoundary) {
9595
assert isValid(subBoundary);
9696
return new RocksDBKVSpaceIterator(db(), cfHandle(), subBoundary, newRefresher());
9797
}

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpaceSnapshot.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,26 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.basekv.localengine.rocksdb;
2121

22+
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2223
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toDataKey;
2324
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
2425
import static org.apache.bifromq.basekv.utils.BoundaryUtil.isValid;
25-
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
2626

27+
import com.google.protobuf.ByteString;
28+
import java.lang.ref.Cleaner;
29+
import java.util.Optional;
30+
import java.util.function.Supplier;
2731
import org.apache.bifromq.basekv.localengine.AbstractKVSpaceReader;
2832
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
2933
import org.apache.bifromq.basekv.localengine.ISyncContext;
3034
import org.apache.bifromq.basekv.localengine.KVEngineException;
3135
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
3236
import org.apache.bifromq.basekv.proto.Boundary;
33-
import com.google.protobuf.ByteString;
34-
import java.lang.ref.Cleaner;
35-
import java.util.Optional;
36-
import java.util.function.Supplier;
3737
import org.rocksdb.ColumnFamilyHandle;
3838
import org.rocksdb.ReadOptions;
3939
import org.rocksdb.RocksDB;
@@ -119,8 +119,7 @@ protected Optional<ByteString> doGet(ByteString key) {
119119

120120
@Override
121121
protected IKVSpaceIterator doNewIterator() {
122-
return new RocksDBKVSpaceIterator(db, cfHandle, snapshot, Boundary.getDefaultInstance(), DUMMY_REFRESHER
123-
);
122+
return new RocksDBKVSpaceIterator(db, cfHandle, snapshot, Boundary.getDefaultInstance(), DUMMY_REFRESHER);
124123
}
125124

126125
@Override

base-kv/base-kv-local-engine/src/main/java/org/apache/bifromq/basekv/localengine/rocksdb/RocksDBKVSpaceWriter.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@
1919

2020
package org.apache.bifromq.basekv.localengine.rocksdb;
2121

22-
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
2322
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
23+
import static org.apache.bifromq.basekv.localengine.rocksdb.Keys.toMetaKey;
2424

25+
import com.google.protobuf.ByteString;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
import java.util.function.Consumer;
2529
import org.apache.bifromq.basekv.localengine.IKVSpaceIterator;
2630
import org.apache.bifromq.basekv.localengine.IKVSpaceMetadataWriter;
2731
import org.apache.bifromq.basekv.localengine.IKVSpaceWriter;
2832
import org.apache.bifromq.basekv.localengine.ISyncContext;
2933
import org.apache.bifromq.basekv.localengine.KVEngineException;
3034
import org.apache.bifromq.basekv.localengine.metrics.KVSpaceOpMeters;
3135
import org.apache.bifromq.basekv.proto.Boundary;
32-
import com.google.protobuf.ByteString;
33-
import java.util.Map;
34-
import java.util.Optional;
35-
import java.util.function.Consumer;
3636
import org.rocksdb.ColumnFamilyHandle;
3737
import org.rocksdb.RocksDB;
3838
import org.rocksdb.RocksDBException;
@@ -42,6 +42,8 @@
4242
class RocksDBKVSpaceWriter<E extends RocksDBKVEngine<E, T, C>, T extends
4343
RocksDBKVSpace<E, T, C>, C extends RocksDBKVEngineConfigurator<C>>
4444
extends RocksDBKVSpaceReader implements IKVSpaceWriter {
45+
private static final long MIGRATION_FLUSH_BYTES = 4L * 1024 * 1024;
46+
private static final int MIGRATION_FLUSH_OPS = 4096;
4547
private final RocksDB db;
4648
private final ColumnFamilyHandle cfHandle;
4749
private final ISyncContext syncContext;
@@ -135,12 +137,13 @@ public IKVSpaceWriter clear(Boundary boundary) {
135137
public IKVSpaceMetadataWriter migrateTo(String targetSpaceId, Boundary boundary) {
136138
try {
137139
RocksDBKVSpace<?, ?, ?> targetKVSpace = engine.createIfMissing(targetSpaceId);
138-
IKVSpaceWriter targetKVSpaceWriter = targetKVSpace.toWriter();
140+
RocksDBKVSpaceWriter<?, ?, ?> targetKVSpaceWriter = (RocksDBKVSpaceWriter<?, ?, ?>) targetKVSpace.toWriter();
139141
// move data
140142
int c = 0;
141143
try (IKVSpaceIterator itr = newIterator(boundary)) {
142144
for (itr.seekToFirst(); itr.isValid(); itr.next()) {
143145
targetKVSpaceWriter.put(itr.key(), itr.value());
146+
targetKVSpaceWriter.flushIfNeededForMigration();
144147
c++;
145148
}
146149
}
@@ -163,6 +166,7 @@ public IKVSpaceMetadataWriter migrateFrom(String fromSpaceId, Boundary boundary)
163166
try (IKVSpaceIterator itr = sourceKVSpace.newIterator(boundary)) {
164167
for (itr.seekToFirst(); itr.isValid(); itr.next()) {
165168
helper.put(cfHandle(), itr.key(), itr.value());
169+
flushIfNeededForMigration();
166170
}
167171
}
168172
// clear moved data in right range
@@ -180,7 +184,7 @@ public void done() {
180184
opMeters.writeBatchSizeSummary.record(helper.count());
181185
helper.done();
182186
writeStatsRecorder.stop();
183-
} catch (RocksDBException e) {
187+
} catch (Throwable e) {
184188
logger.error("Write Batch commit failed", e);
185189
throw new KVEngineException("Batch commit failed", e);
186190
}
@@ -217,6 +221,14 @@ protected ColumnFamilyHandle cfHandle() {
217221
return cfHandle;
218222
}
219223

224+
private void flushIfNeededForMigration() {
225+
// ensure metadata changes are flushed atomically
226+
if (!helper.hasPendingMetadata()
227+
&& (helper.count() >= MIGRATION_FLUSH_OPS || helper.dataSize() >= MIGRATION_FLUSH_BYTES)) {
228+
helper.flush();
229+
}
230+
}
231+
220232
@Override
221233
protected ISyncContext.IRefresher newRefresher() {
222234
return syncContext.refresher();

0 commit comments

Comments
 (0)