Skip to content

Commit 00f52dc

Browse files
BorysTheDevmkaruza
andauthored
fix(evicition): Don't accumulate deleted bytes if there is no RSS evi… (#5908)
fix(evicition): Don't accumulate deleted bytes if there is no RSS eviction (#5894) We start to accumulate number of deleted bytes only if there is expected that we evict on RSS threshold. Once we stop expecting to evict and we have cleared number of tracked deleted bytes we stop with tracking. Fixes #5891 Co-authored-by: mkaruza <[email protected]>
1 parent cae4e15 commit 00f52dc

File tree

3 files changed

+74
-14
lines changed

3 files changed

+74
-14
lines changed

src/server/engine_shard.cc

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -690,17 +690,19 @@ void EngineShard::RetireExpiredAndEvict() {
690690
auto [evicted_items, evicted_bytes] =
691691
db_slice.FreeMemWithEvictionStepAtomic(i, starting_segment_id, eviction_goal);
692692

693-
DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
694-
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
695-
<< " bytes. Max eviction per heartbeat: "
696-
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
693+
VLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
694+
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
695+
<< " bytes. Max eviction per heartbeat: "
696+
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
697697

698698
deleted_bytes += evicted_bytes;
699699
eviction_goal -= std::min(eviction_goal, evicted_bytes);
700700
}
701701
}
702702

703-
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
703+
// Track deleted bytes only if we expect to lower memory
704+
if (eviction_state_.track_deleted_bytes)
705+
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
704706
}
705707

706708
size_t EngineShard::CalculateEvictionBytes() {
@@ -718,9 +720,9 @@ size_t EngineShard::CalculateEvictionBytes() {
718720
size_t goal_bytes =
719721
CalculateHowManyBytesToEvictOnShard(limit, global_used_memory, shard_memory_budget_threshold);
720722

721-
VLOG_IF_EVERY_N(1, goal_bytes > 0, 50)
722-
<< "Used memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
723-
<< ", memory limit: " << max_memory_limit;
723+
VLOG_IF(2, goal_bytes > 0) << "Used memory goal bytes: " << goal_bytes
724+
<< ", used memory: " << global_used_memory
725+
<< ", memory limit: " << max_memory_limit;
724726

725727
// Check for `enable_heartbeat_rss_eviction` flag since it dynamic. And reset
726728
// state if flag has changed.
@@ -739,26 +741,45 @@ size_t EngineShard::CalculateEvictionBytes() {
739741
auto decrease_delete_bytes_before_rss_update =
740742
std::min(deleted_bytes_before_rss_update,
741743
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
742-
VLOG_EVERY_N(1, 50) << "deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update
743-
<< " decrease_delete_bytes_before_rss_update: "
744-
<< decrease_delete_bytes_before_rss_update;
744+
VLOG(2) << "deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update
745+
<< " decrease_delete_bytes_before_rss_update: "
746+
<< decrease_delete_bytes_before_rss_update;
745747
deleted_bytes_before_rss_update -= decrease_delete_bytes_before_rss_update;
746748
}
747749

748750
global_rss_memory_at_prev_eviction = global_used_rss_memory;
749751

752+
LOG_IF(DFATAL, global_used_rss_memory < (deleted_bytes_before_rss_update * shards_count))
753+
<< "RSS evicition underflow "
754+
<< "global_used_rss_memory: " << global_used_rss_memory
755+
<< " deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
756+
757+
// If we underflow use limit as used_memory
758+
size_t used_rss_memory_with_deleted_bytes =
759+
std::min(global_used_rss_memory - deleted_bytes_before_rss_update * shards_count, limit);
760+
750761
// Try to evict more bytes if we are close to the rss memory limit
751762
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
752-
limit, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
753-
shard_memory_budget_threshold);
763+
limit, used_rss_memory_with_deleted_bytes, shard_memory_budget_threshold);
764+
765+
// RSS evictions starts so we should start tracking deleted_bytes
766+
if (rss_goal_bytes) {
767+
eviction_state_.track_deleted_bytes = true;
768+
} else {
769+
// There is no RSS eviction goal and we have cleared tracked deleted bytes
770+
if (!deleted_bytes_before_rss_update) {
771+
eviction_state_.track_deleted_bytes = false;
772+
}
773+
}
754774

755-
VLOG_IF_EVERY_N(1, rss_goal_bytes > 0, 50)
775+
VLOG_IF(2, rss_goal_bytes > 0)
756776
<< "Rss memory goal bytes: " << rss_goal_bytes
757777
<< ", rss used memory: " << global_used_rss_memory << ", rss memory limit: " << limit
758778
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
759779

760780
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
761781
}
782+
762783
return goal_bytes;
763784
}
764785

src/server/engine_shard.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ class EngineShard {
227227

228228
struct EvictionTaskState {
229229
bool rss_eviction_enabled_ = true;
230+
bool track_deleted_bytes = false;
230231
size_t deleted_bytes_before_rss_update = 0;
231232
size_t global_rss_memory_at_prev_eviction = 0;
232233
};

tests/dragonfly/memory_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,44 @@ async def test_eviction_on_rss_treshold(df_factory: DflyInstanceFactory, heartbe
243243
assert stats_info_after["evicted_keys"] == 0
244244

245245

246+
# Github issue #5891
247+
async def test_no_rss_eviction_overflow_on_expired_keys(df_factory: DflyInstanceFactory):
248+
max_memory = 256 * 1024**2 # 256MB
249+
df_server = df_factory.create(
250+
proactor_threads=1, cache_mode="yes", maxmemory=max_memory, vmodule="engine_shard=2"
251+
)
252+
df_server.start()
253+
client = df_server.client()
254+
255+
data_fill_size = int(0.20 * max_memory) # 20% of max_memory
256+
257+
val_size = 1024 * 50 # 50 kb for key
258+
num_keys = data_fill_size // val_size
259+
260+
for i in range(0, 5):
261+
pipe = client.pipeline(transaction=False)
262+
step_keys = num_keys + i * 10
263+
await pipe.execute_command("DEBUG", "POPULATE", step_keys, "key_1", val_size)
264+
await pipe.execute_command("DEBUG", "POPULATE", step_keys + i * 10, "key_2", val_size)
265+
for i in range(step_keys):
266+
if i % 2 == 0:
267+
await pipe.execute_command(f"EXPIRE key_1:{i} 1")
268+
else:
269+
await pipe.execute_command(f"EXPIRE key_2:{i} 1")
270+
await pipe.execute()
271+
await asyncio.sleep(2)
272+
273+
await client.execute_command("FLUSHALL")
274+
275+
# New keys should be added
276+
await client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
277+
# Wait so heartbeat eviction
278+
await asyncio.sleep(5)
279+
280+
keyspace_info = await client.info("keyspace")
281+
assert keyspace_info["db0"]["keys"] == num_keys
282+
283+
246284
@pytest.mark.asyncio
247285
async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
248286
df = df_factory.create(

0 commit comments

Comments
 (0)