diff --git a/src/defrag.c b/src/defrag.c index f25e102d51d..4766e16370b 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -8,8 +8,13 @@ * Copyright (c) 2020-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ #include "server.h" @@ -18,15 +23,106 @@ #ifdef HAVE_DEFRAG -typedef struct defragCtx { - void *privdata; +#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */ + +typedef enum { DEFRAG_NOT_DONE = 0, + DEFRAG_DONE = 1 } doneStatus; + +/* + * Defragmentation is performed in stages. Each stage is serviced by a stage function + * (defragStageFn). The stage function is passed a context (void *) describing what to defrag. The contents of that + * context are unique to the particular stage - and may even be NULL for some stage functions. The + * same stage function can be used multiple times (for different stages), each having a different + * context. + * + * Parameters: + * endtime - This is the monotonic time by which the function should end and return. This ensures + * a bounded latency due to defrag. + * ctx - A pointer to context which is unique to the stage function. + * + * Returns: + * - DEFRAG_DONE if the stage is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime); + +/* Function pointer type for freeing context in defragmentation stages. */ +typedef void (*defragStageContextFreeFn)(void *ctx); +typedef struct { + defragStageFn stage_fn; /* The function to be invoked for the stage */ + defragStageContextFreeFn ctx_free_fn; /* Function to free the context */ + void *ctx; /* Context, unique to the stage function */ +} StageDescriptor; + +/* Globals needed for the main defrag processing logic. + * Doesn't include variables specific to a stage or type of data. */ +struct DefragContext { + monotime start_cycle; /* Time of beginning of defrag cycle */ + long long start_defrag_hits; /* server.stat_active_defrag_hits captured at beginning of cycle */ + long long start_defrag_misses; /* server.stat_active_defrag_misses captured at beginning of cycle */ + float start_frag_pct; /* Fragmentation percent at the beginning of the defrag cycle */ + float decay_rate; /* Defrag speed decay rate */ + + list *remaining_stages; /* List of stages which remain to be processed */ + listNode *current_stage; /* The list node of stage that's currently being processed */ + + long long timeproc_id; /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */ + monotime timeproc_end_time; /* Ending time of previous timerproc execution */ + long timeproc_overage_us; /* A correction value if over target CPU percent */ +}; +static struct DefragContext defrag = {0, 0, 0, 0, 1.0f}; + +/* There are a number of stages which process a kvstore. To simplify this, a stage helper function + * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It + * uses these definitions. + */ +/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN + * with a kvstoreIterState (or be passed as NULL).
*/ +#define KVS_SLOT_DEFRAG_LUT -2 +#define KVS_SLOT_UNASSIGNED -1 +typedef struct { + kvstore *kvs; int slot; -} defragCtx; + unsigned long cursor; +} kvstoreIterState; +#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), KVS_SLOT_DEFRAG_LUT, 0}) + +/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the + * main dictionary, large items are set aside and processed by this function before continuing with + * iteration over the kvstore. + * endtime - This is the monotonic time that the function should end and return. + * ctx - Context for functions invoked by the helper. If provided in the call to + * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) + * will be updated with the current kvstore iteration status. + * + * Returns: + * - DEFRAG_DONE if the pre-continue work is complete + * - DEFRAG_NOT_DONE if there is more work to do + */ +typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime); -typedef struct defragPubSubCtx { - kvstore *pubsub_channels; - dict *(*clientPubSubChannels)(client*); +typedef struct { + kvstoreIterState kvstate; + int dbid; + + /* When scanning a main kvstore, large elements are queued for later handling rather than + * causing a large latency spike while processing a hash table bucket. This list is only used + * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being + * defragged. + * Note that this is a list of key names. It's possible that the key may be deleted or modified + * before "later" and we will search by key name to find the entry when we defrag the item later. */ + list *defrag_later; + unsigned long defrag_later_cursor; +} defragKeysCtx; +static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); + +/* Context for pubsub kvstores */ +typedef dict *(*getClientChannelsFn)(client *); +typedef struct { + kvstoreIterState kvstate; + getClientChannelsFn getPubSubChannels; } defragPubSubCtx; +static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ @@ -336,36 +432,6 @@ void activeDefragHfieldDict(dict *d) { } /* Defrag a list of ptr, sds or robj string values */ -void activeDefragList(list *l, int val_type) { - listNode *ln, *newln; - for (ln = l->head; ln; ln = ln->next) { - if ((newln = activeDefragAlloc(ln))) { - if (newln->prev) - newln->prev->next = newln; - else - l->head = newln; - if (newln->next) - newln->next->prev = newln; - else - l->tail = newln; - ln = newln; - } - if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sds newsds, sdsele = ln->value; - if ((newsds = activeDefragSds(sdsele))) - ln->value = newsds; - } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = ln->value; - if ((newele = activeDefragStringOb(ele))) - ln->value = newele; - } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = ln->value; - if ((newptr = activeDefragAlloc(ptr))) - ln->value = newptr; - } - } -} - void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; unsigned char *newzl; @@ -395,13 +461,18 @@ void activeDefragQuickListNodes(quicklist *ql) { /* when the value has lots of elements, we want to handle it later and not as * part of the main dictionary scan. 
this is needed in order to prevent latency * spikes when handling large items */ -void defragLater(redisDb *db, dictEntry *kde) { +void defragLater(defragKeysCtx *ctx, dictEntry *kde) { + if (!ctx->defrag_later) { + ctx->defrag_later = listCreate(); + listSetFreeMethod(ctx->defrag_later, sdsfreegeneric); + ctx->defrag_later_cursor = 0; + } sds key = sdsdup(dictGetKey(kde)); - listAddNodeTail(db->defrag_later, key); + listAddNodeTail(ctx->defrag_later, key); } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { +long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) { quicklist *ql = ob->ptr; quicklistNode *node; long iterations = 0; @@ -427,7 +498,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { activeDefragQuickListNode(ql, &node); server.stat_active_defrag_scanned++; if (++iterations > 128 && !bookmark_failed) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { if (!quicklistBookmarkCreate(&ql, "_AD", node)) { bookmark_failed = 1; } else { @@ -495,19 +566,19 @@ void scanLaterHash(robj *ob, unsigned long *cursor) { *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); } -void defragQuicklist(redisDb *db, dictEntry *kde) { +void defragQuicklist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); quicklist *ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragQuickListNodes(ql); } -void defragZsetSkiplist(redisDb *db, dictEntry *kde) { +void defragZsetSkiplist(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); zset *zs = (zset*)ob->ptr; zset *newzs; @@ -523,7 +594,7 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else { dictIterator *di = dictGetIterator(zs->dict); while((de = dictNext(di)) != NULL) { @@ -536,13 +607,13 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) { zs->dict = newdict; } -void defragHash(redisDb *db, dictEntry *kde) { +void defragHash(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragHfieldDict(d); /* defrag the dict struct and tables */ @@ -550,13 +621,13 @@ void defragHash(redisDb *db, dictEntry *kde) { ob->ptr = newd; } -void defragSet(redisDb *db, dictEntry *kde) { +void defragSet(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); d = ob->ptr; if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(db, kde); + defragLater(ctx, kde); else activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* defrag the dict struct and tables */ @@ -576,7 +647,7 @@ int defragRaxNode(raxNode **noderef) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) { +int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) { static unsigned char last[sizeof(streamID)]; raxIterator ri; long iterations = 0; @@ -613,7 +684,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) raxSetData(ri.node, ri.data=newdata); server.stat_active_defrag_scanned++; if (++iterations > 128) { - if (ustime() > endtime) { + if (getMonotonicUs() > endtime) { serverAssert(ri.key_len==sizeof(last)); memcpy(last,ri.key,ri.key_len); raxStop(&ri); @@ -703,7 +774,7 @@ void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { return NULL; } -void defragStream(redisDb *db, dictEntry *kde) { +void defragStream(defragKeysCtx *ctx, dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; @@ -716,7 +787,7 @@ void defragStream(redisDb *db, dictEntry *kde) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) s->rax = newrax; - defragLater(db, kde); + defragLater(ctx, kde); } else defragRadixTree(&s->rax, 1, NULL, NULL); @@ -727,24 +798,25 @@ void defragStream(redisDb *db, dictEntry *kde) { /* Defrag a module key. This is either done immediately or scheduled * for later. Returns then number of pointers defragged. */ -void defragModule(redisDb *db, dictEntry *kde) { +void defragModule(defragKeysCtx *ctx, redisDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); robj keyobj; initStaticStringObject(keyobj, dictGetKey(kde)); if (!moduleDefragValue(&keyobj, obj, db->id)) - defragLater(db, kde); + defragLater(ctx, kde); } /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. */ -void defragKey(defragCtx *ctx, dictEntry *de) { +void defragKey(defragKeysCtx *ctx, dictEntry *de) { + redisDb *db = &server.db[ctx->dbid]; + int slot = ctx->kvstate.slot; sds keysds = dictGetKey(de); robj *newob, *ob = dictGetVal(de); unsigned char *newzl; sds newsds; - redisDb *db = ctx->privdata; - int slot = ctx->slot; + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) { @@ -781,7 +853,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) { /* Already handled in activeDefragStringOb. 
*/ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(db, de); + defragQuicklist(ctx, de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; @@ -790,7 +862,7 @@ } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(db, de); + defragSet(ctx, de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { @@ -805,7 +877,7 @@ if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(db, de); + defragZsetSkiplist(ctx, de); } else { serverPanic("Unknown sorted set encoding"); } @@ -820,23 +892,23 @@ if ((newzl = activeDefragAlloc(lpt->lp))) lpt->lp = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(db, de); + defragHash(ctx, de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragStream(db, de); + defragStream(ctx, de); } else if (ob->type == OBJ_MODULE) { - defragModule(db, de); + defragModule(ctx, db, de); } else { serverPanic("Unknown object type"); } } /* Defrag scan callback for the main db dictionary. */ -void defragScanCallback(void *privdata, const dictEntry *de) { +static void dbKeysScanCallback(void *privdata, const dictEntry *de) { long long hits_before = server.stat_active_defrag_hits; - defragKey((defragCtx*)privdata, (dictEntry*)de); + defragKey((defragKeysCtx *)privdata, (dictEntry *)de); if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else @@ -880,9 +952,8 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { /* Defrag scan callback for the pubsub dictionary. */ void defragPubsubScanCallback(void *privdata, const dictEntry *de) { - defragCtx *ctx = privdata; - defragPubSubCtx *pubsub_ctx = ctx->privdata; - kvstore *pubsub_channels = pubsub_ctx->pubsub_channels; + defragPubSubCtx *ctx = privdata; + kvstore *pubsub_channels = ctx->kvstate.kvs; robj *newchannel, *channel = dictGetKey(de); dict *newclients, *clients = dictGetVal(de); @@ -890,7 +961,7 @@ serverAssert(channel->refcount == (int)dictSize(clients) + 1); newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); if (newchannel) { - kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry*)de, newchannel); + kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); /* The channel name is shared by the client's pubsub(shard) and server's * pubsub(shard), after defraging the channel name, we need to update @@ -899,36 +970,24 @@ dictEntry *clientde; while((clientde = dictNext(di)) != NULL) { client *c = dictGetKey(clientde); - dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel); + dict *client_channels = ctx->getPubSubChannels(c); + dictEntry *pubsub_channel = dictFind(client_channels, newchannel); serverAssert(pubsub_channel); - dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel); + dictSetKey(client_channels, pubsub_channel, newchannel); } dictReleaseIterator(di); } /* Try to defrag the dictionary of clients that is stored as the value part.
*/ if ((newclients = dictDefragTables(clients))) - kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry*)de, newclients); + kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients); server.stat_active_defrag_scanned++; } -/* We may need to defrag other globals, one small allocation can hold a full allocator run. - * so although small, it is still important to defrag these */ -void defragOtherGlobals(void) { - - /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. - * but we assume most of these are short lived, we only need to defrag allocations - * that remain static for a long time */ - activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - moduleDefragGlobals(); - kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables); - kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables); -} - /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { +int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { @@ -942,9 +1001,10 @@ } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { + /* endtime stays monotonic: moduleLateDefrag() and RM_DefragShouldStop() now compare against getMonotonicUs(), so no wall-clock conversion is needed. */ robj keyobj; initStaticStringObject(keyobj, dictGetKey(de)); return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -954,78 +1014,55 @@ return 0; } -/* static variables serving defragLaterStep to continue scanning a key from were we stopped last time. */ -static sds defrag_later_current_key = NULL; -static unsigned long defrag_later_cursor = 0; +static int defragIsRunning(void) { + return (defrag.timeproc_id > 0); +} + +/* A kvstoreHelperPreContinueFn */ +static doneStatus defragLaterStep(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed.
*/ -int defragLaterStep(redisDb *db, int slot, long long endtime) { unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long key_defragged; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_later_cursor) { - listNode *head = listFirst(db->defrag_later); - - /* Move on to next key */ - if (defrag_later_current_key) { - serverAssert(defrag_later_current_key == head->value); - listDelNode(db->defrag_later, head); - defrag_later_cursor = 0; - defrag_later_current_key = NULL; - } + while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) { + listNode *head = listFirst(defrag_keys_ctx->defrag_later); + sds key = head->value; + dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key); + + long long key_defragged = server.stat_active_defrag_hits; + int timeout = (defragLaterItem(de, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1); + if (key_defragged != server.stat_active_defrag_hits) { + server.stat_active_defrag_key_hits++; + } else { + server.stat_active_defrag_key_misses++; + } - /* stop if we reached the last one. */ - head = listFirst(db->defrag_later); - if (!head) - return 0; + if (timeout) break; - /* start a new key */ - defrag_later_current_key = head->value; - defrag_later_cursor = 0; + if (defrag_keys_ctx->defrag_later_cursor == 0) { + /* the item is finished, move on */ + listDelNode(defrag_keys_ctx->defrag_later, head); } - /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key); - key_defragged = server.stat_active_defrag_hits; - do { - int quit = 0; - if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id)) - quit = 1; /* time is up, we didn't finish all the work */ - - /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields - * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. */ - if (quit || (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (quit || ustime() > endtime) { - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - return 1; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while(defrag_later_cursor); - if(key_defragged != server.stat_active_defrag_hits) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - } while(1); + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() > endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + } + + return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; } #define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? 
max: (y))) /* decide if defrag is needed, and at what CPU effort to invest in it */ -void computeDefragCycles(float decay_rate) { +void computeDefragCycles(void) { size_t frag_bytes; float frag_pct = getAllocatorFragmentation(&frag_bytes); /* If we're not already running, and below the threshold, exit. */ @@ -1041,7 +1078,7 @@ void computeDefragCycles(float decay_rate) { server.active_defrag_threshold_upper, server.active_defrag_cycle_min, server.active_defrag_cycle_max); - cpu_pct *= decay_rate; + cpu_pct *= defrag.decay_rate; cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max); @@ -1052,246 +1089,451 @@ void computeDefragCycles(float decay_rate) { if (cpu_pct > server.active_defrag_running || server.active_defrag_configuration_changed) { - server.active_defrag_running = cpu_pct; server.active_defrag_configuration_changed = 0; - serverLog(LL_VERBOSE, - "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); + if (defragIsRunning()) { + serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } else { + serverLog(LL_VERBOSE, + "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", + frag_pct, frag_bytes, cpu_pct); + } + server.active_defrag_running = cpu_pct; } } -/* Perform incremental defragmentation work from the serverCron. - * This works in a similar way to activeExpireCycle, in the sense that - * we do incremental work across calls. */ -void activeDefragCycle(void) { - static int slot = -1; - static int current_db = -1; - static int defrag_later_item_in_progress = 0; - static int defrag_stage = 0; - static unsigned long defrag_cursor = 0; - static redisDb *db = NULL; - static long long start_scan, start_hits, start_misses; - static float start_frag_pct; - static float decay_rate = 1.0f; +/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if + * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this + * function during the iteration. */ +static doneStatus defragStageKvstoreHelper(monotime endtime, + void *ctx, + dictScanFunction scan_fn, + kvstoreHelperPreContinueFn precontinue_fn, + dictDefragFunctions *defragfns) +{ unsigned int iterations = 0; unsigned long long prev_defragged = server.stat_active_defrag_hits; unsigned long long prev_scanned = server.stat_active_defrag_scanned; - long long start, timelimit, endtime; - mstime_t latency; - int all_stages_finished = 0; - int quit = 0; + kvstoreIterState *state = (kvstoreIterState*)ctx; - if (!server.active_defrag_enabled) { - if (server.active_defrag_running) { - /* if active defrag was disabled mid-run, start from fresh next time. 
*/ - server.active_defrag_running = 0; - server.active_defrag_configuration_changed = 0; - if (db) - listEmpty(db->defrag_later); - defrag_later_current_key = NULL; - defrag_later_cursor = 0; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - moduleDefragEnd(); - goto update_metrics; + if (state->slot == KVS_SLOT_DEFRAG_LUT) { + /* Before we start scanning the kvstore, handle the main structures */ + do { + state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables); + if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; + } while (state->cursor != 0); + state->slot = KVS_SLOT_UNASSIGNED; + } + + while (1) { + if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { + if (getMonotonicUs() >= endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; } - return; + + if (precontinue_fn) { + if (precontinue_fn(ctx, endtime) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; + } + + if (!state->cursor) { + /* If there's no cursor, we're ready to begin a new kvstore slot. */ + if (state->slot == KVS_SLOT_UNASSIGNED) { + state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs); + } else { + state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot); + } + + if (state->slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE; + } + + /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */ + state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor, + scan_fn, defragfns, ctx); } - if (hasActiveChildProcess()) - return; /* Defragging memory while there's a fork will just do damage. */ + return DEFRAG_NOT_DONE; +} - /* Once a second, check if the fragmentation justfies starting a scan - * or making it more aggressive. */ - run_with_period(1000) { - computeDefragCycles(decay_rate); +static doneStatus defragStageDbKeys(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; } - /* Normally it is checked once a second, but when there is a configuration - * change, we want to check it as soon as possible. */ - if (server.active_defrag_configuration_changed) { - computeDefragCycles(decay_rate); - server.active_defrag_configuration_changed = 0; + /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if + * the main DB entry has been moved. */ + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by dbKeysScanCallback */ + .defragVal = NULL, /* Handled by dbKeysScanCallback */ + }; + + return defragStageKvstoreHelper(endtime, ctx, + dbKeysScanCallback, defragLaterStep, &defragfns); +} + +static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. 
*/ + return DEFRAG_DONE; } - if (!server.active_defrag_running) - return; + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Not needed for expires (just a ref) */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + return defragStageKvstoreHelper(endtime, ctx, + scanCallbackCountScanned, NULL, &defragfns); +} - /* See activeExpireCycle for how timelimit is handled. */ - start = ustime(); - timelimit = 1000000*server.active_defrag_running/server.hz/100; - if (timelimit <= 0) timelimit = 1; - endtime = start + timelimit; - latencyStartMonitor(latency); +static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) { + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by defragPubsubScanCallback */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; - do { - /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!defrag_stage && !defrag_cursor && (slot < 0)) { - /* finish any leftovers from previous db before moving to the next one */ - if (db && defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } + return defragStageKvstoreHelper(endtime, ctx, + defragPubsubScanCallback, NULL, &defragfns); +} - if (current_db == -1) { - moduleDefragStart(); - } +static doneStatus defragLuaScripts(void *ctx, monotime endtime) { + UNUSED(endtime); + UNUSED(ctx); + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + return DEFRAG_DONE; +} - /* Move on to next database, and stop if we reached the last one. */ - if (++current_db >= server.dbnum) { - /* defrag other items not part of the db / keys */ - defragOtherGlobals(); - - long long now = ustime(); - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - serverLog(LL_VERBOSE, - "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", - (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_hits), frag_pct, frag_bytes); - - start_scan = now; - current_db = -1; - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; - db = NULL; - server.active_defrag_running = 0; - - long long last_hits = server.stat_active_defrag_hits - start_hits; - long long last_misses = server.stat_active_defrag_misses - start_misses; - float last_frag_pct_change = start_frag_pct - frag_pct; - /* When defragmentation efficiency is low, we gradually reduce the - * speed for the next cycle to avoid CPU waste. However, in the - * following two cases, we keep the normal speed: - * 1) If the fragmentation percentage has increased or decreased by more than 2%. - * 2) If the fragmentation percentage decrease is small, but hits are above 1%, - * we still keep the normal speed. 
*/ - if (fabs(last_frag_pct_change) > 2 || - (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) - { - decay_rate = 1.0f; - } else { - decay_rate *= 0.9; - } +static doneStatus defragModuleGlobals(void *ctx, monotime endtime) { + UNUSED(endtime); + UNUSED(ctx); + moduleDefragGlobals(); + return DEFRAG_DONE; +} - moduleDefragEnd(); +static void freeDefragKeysContext(void *ctx) { + defragKeysCtx *defrag_keys_ctx = ctx; + if (defrag_keys_ctx->defrag_later) { + listRelease(defrag_keys_ctx->defrag_later); + } + zfree(defrag_keys_ctx); +} - computeDefragCycles(decay_rate); /* if another scan is needed, start it right away */ - if (server.active_defrag_running != 0 && ustime() < endtime) - continue; - break; - } - else if (current_db==0) { - /* Start a scan from the first database. */ - start_scan = ustime(); - start_hits = server.stat_active_defrag_hits; - start_misses = server.stat_active_defrag_misses; - start_frag_pct = getAllocatorFragmentation(NULL); - } +static void freeDefragContext(void *ptr) { + StageDescriptor *stage = ptr; + if (stage->ctx_free_fn) + stage->ctx_free_fn(stage->ctx); + zfree(stage); +} + +static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { + StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); + stage->stage_fn = stage_fn; + stage->ctx_free_fn = ctx_free_fn; + stage->ctx = ctx; + listAddNodeTail(defrag.remaining_stages, stage); +} + +/* Updates the defrag decay rate based on the observed effectiveness of the defrag process. + * The decay rate is used to gradually slow down defrag when it's not being effective. */ +static void updateDefragDecayRate(float frag_pct) { + long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits; + long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses; + float last_frag_pct_change = defrag.start_frag_pct - frag_pct; + /* When defragmentation efficiency is low, we gradually reduce the + * speed for the next cycle to avoid CPU waste. However, in the + * following two cases, we keep the normal speed: + * 1) If the fragmentation percentage has increased or decreased by more than 2%. + * 2) If the fragmentation percentage decrease is small, but hits are above 1%, + * we still keep the normal speed. */ + if (fabs(last_frag_pct_change) > 2 || + (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) + { + defrag.decay_rate = 1.0f; + } else { + defrag.decay_rate *= 0.9; + } +} + +/* Called at the end of a complete defrag cycle, or when defrag is terminated */ +static void endDefragCycle(int normal_termination) { + if (normal_termination) { + /* For normal termination, we expect... */ + serverAssert(!defrag.current_stage); + serverAssert(listLength(defrag.remaining_stages) == 0); + } else { + /* Defrag is being terminated abnormally */ + aeDeleteTimeEvent(server.el, defrag.timeproc_id); - db = &server.db[current_db]; - kvstoreDictLUTDefrag(db->keys, dictDefragTables); - kvstoreDictLUTDefrag(db->expires, dictDefragTables); - defrag_stage = 0; - defrag_cursor = 0; - slot = -1; - defrag_later_item_in_progress = 0; + if (defrag.current_stage) { + listDelNode(defrag.remaining_stages, defrag.current_stage); + defrag.current_stage = NULL; } + } + defrag.timeproc_id = AE_DELETED_EVENT_ID; - /* This array of structures holds the parameters for all defragmentation stages. 
*/ - typedef struct defragStage { - kvstore *kvs; - dictScanFunction *scanfn; - void *privdata; - } defragStage; - defragStage defrag_stages[] = { - {db->keys, defragScanCallback, db}, - {db->expires, scanCallbackCountScanned, NULL}, - {server.pubsub_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsub_channels, getClientPubSubChannels}}, - {server.pubsubshard_channels, defragPubsubScanCallback, - &(defragPubSubCtx){server.pubsubshard_channels, getClientPubSubShardChannels}}, - }; - do { - int num_stages = sizeof(defrag_stages) / sizeof(defrag_stages[0]); - serverAssert(defrag_stage < num_stages); - defragStage *current_stage = &defrag_stages[defrag_stage]; - - /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ - if (defragLaterStep(db, slot, endtime)) { - quit = 1; /* time is up, we didn't finish all the work */ - break; /* this will exit the function and we'll continue on the next cycle */ - } + listRelease(defrag.remaining_stages); + defrag.remaining_stages = NULL; - if (!defrag_later_item_in_progress) { - /* Continue defragmentation from the previous stage. - * If slot is -1, it means this stage starts from the first non-empty slot. */ - if (slot == -1) slot = kvstoreGetFirstNonEmptyDictIndex(current_stage->kvs); - defrag_cursor = kvstoreDictScanDefrag(current_stage->kvs, slot, defrag_cursor, - current_stage->scanfn, &defragfns, &(defragCtx){current_stage->privdata, slot}); - } + size_t frag_bytes; + float frag_pct = getAllocatorFragmentation(&frag_bytes); + serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", + (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits), + frag_pct, frag_bytes); - if (!defrag_cursor) { - /* Move to the next slot only if regular and large item scanning has been completed. */ - if (listLength(db->defrag_later) > 0) { - defrag_later_item_in_progress = 1; - continue; - } + server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); + server.stat_last_active_defrag_time = 0; + server.active_defrag_running = 0; - /* Move to the next slot in the current stage. If we've reached the end, move to the next stage. */ - if ((slot = kvstoreGetNextNonEmptyDictIndex(current_stage->kvs, slot)) == -1) - defrag_stage++; - defrag_later_item_in_progress = 0; - } + updateDefragDecayRate(frag_pct); + moduleDefragEnd(); - /* Check if all defragmentation stages have been processed. - * If so, mark as finished and reset the stage counter to move on to next database. */ - if (defrag_stage == num_stages) { - all_stages_finished = 1; - defrag_stage = 0; - } - - /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys - * (if we have a lot of pointers in one hash bucket or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new db in this loop, this is because after - * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (all_stages_finished || - ++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64) - { - /* Quit if all stages were finished or timeout. 
*/ - if (all_stages_finished || ustime() > endtime) { - quit = 1; - break; - } - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } while(!all_stages_finished && !quit); - } while(!quit); + /* Immediately check to see if we should start another defrag cycle. */ + activeDefragCycle(); +} + +/* Must be called at the start of the timeProc as it measures the delay from the end of the previous + * timeProc invocation when performing the computation. */ +static int computeDefragCycleUs(void) { + long dutyCycleUs; + + int targetCpuPercent = server.active_defrag_running; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + static int prevCpuPercent = 0; /* STATIC - this persists */ + if (targetCpuPercent != prevCpuPercent) { + /* If the targetCpuPercent changes, the value might be different from when the last wait + * time was computed. In this case, don't consider wait time. (This is really only an + * issue in crazy tests that dramatically increase CPU while defrag is running.) */ + defrag.timeproc_end_time = 0; + prevCpuPercent = targetCpuPercent; + } + + /* Given when the last duty cycle ended, compute time needed to achieve the desired percentage. */ + if (defrag.timeproc_end_time == 0) { + /* Either the first call to the timeProc, or we were paused for some reason. */ + defrag.timeproc_overage_us = 0; + dutyCycleUs = DEFRAG_CYCLE_US; + } else { + long waitedUs = getMonotonicUs() - defrag.timeproc_end_time; + /* Given the elapsed wait time between calls, compute the necessary duty time needed to + * achieve the desired CPU percentage. + * With: D = duty time, W = wait time, P = percent + * Solve: D / (D + W) = P / 100 + * Solving for D: + * D = P * W / (100 - P) + * + * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate + * with a proportionately long duty-cycle. This won't significantly affect perceived + * latency, because clients are already being impacted by the long cycle time which caused + * the starvation of the timer. */ + dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent); + + /* Also adjust for any accumulated overage. */ + dutyCycleUs -= defrag.timeproc_overage_us; + defrag.timeproc_overage_us = 0; + + if (dutyCycleUs < DEFRAG_CYCLE_US) { + /* We never reduce our cycle time, that would increase overhead. Instead, we track this + * as part of the overage, and increase wait time between cycles. */ + defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs; + dutyCycleUs = DEFRAG_CYCLE_US; + } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) { + /* Add a time limit for the defrag duty cycle to prevent excessive latency. + * When latency is already high (indicated by a long time between calls), + * we don't want to make it worse by running defrag for too long. */ + dutyCycleUs = DEFRAG_CYCLE_US * 10; + } + } + return dutyCycleUs; +} + +/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next + * computeDefragCycleUs computation. */ +static int computeDelayMs(monotime intendedEndtime) { + defrag.timeproc_end_time = getMonotonicUs(); + long overage = defrag.timeproc_end_time - intendedEndtime; + defrag.timeproc_overage_us += overage; /* track over/under desired CPU */ + /* Allow negative overage (underage) to count against existing overage, but don't allow + * underage (from short stages) to be accumulated.
*/ + if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0; + + int targetCpuPercent = server.active_defrag_running; + serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); + + /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that? */ + /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */ + /* For example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */ + /* Instead, if we run for 1ms, our total time should be 10ms. So the delay is only 9ms. */ + long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent; + long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US; + /* Only increase delay by the fraction of the overage that would be non-duty-cycle */ + delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100; + if (delayUs < 0) delayUs = 0; + long delayMs = delayUs / 1000; /* round down */ + return delayMs; +} + +/* An independent time proc for defrag. While defrag is running, this is called much more often + * than the server cron. Frequent short calls provide low latency impact. */ +static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { + UNUSED(eventLoop); + UNUSED(id); + UNUSED(clientData); + + /* This timer shouldn't be registered unless there's work to do. */ + serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0); + + if (!server.active_defrag_enabled) { + /* Defrag has been disabled while running */ + endDefragCycle(0); + return AE_NOMORE; + } + + if (hasActiveChildProcess()) { + /* If there's a child process, pause the defrag, polling until the child completes. */ + defrag.timeproc_end_time = 0; /* prevent starvation recovery */ + return 100; + } + + monotime starttime = getMonotonicUs(); + int dutyCycleUs = computeDefragCycleUs(); + monotime endtime = starttime + dutyCycleUs; + int haveMoreWork = 1; + + /* Increment server.cronloops so that run_with_period works. */ + long hz_ms = 1000 / server.hz; + int cronloops = (server.mstime - server.blocked_last_cron + (hz_ms - 1)) / hz_ms; /* rounding up */ + server.blocked_last_cron += cronloops * hz_ms; + server.cronloops += cronloops; + + mstime_t latency; + latencyStartMonitor(latency); + + do { + if (!defrag.current_stage) { + defrag.current_stage = listFirst(defrag.remaining_stages); + } + + StageDescriptor *stage = listNodeValue(defrag.current_stage); + doneStatus status = stage->stage_fn(stage->ctx, endtime); + if (status == DEFRAG_DONE) { + listDelNode(defrag.remaining_stages, defrag.current_stage); + defrag.current_stage = NULL; + } + + haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0); + /* If we've completed a stage early, and still have a standard time allotment remaining, + * we'll start another stage. This can happen when defrag is running infrequently, and + * starvation protection has increased the duty-cycle.
*/ + } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US); latencyEndMonitor(latency); - latencyAddSampleIfNeeded("active-defrag-cycle",latency); - -update_metrics: - if (server.active_defrag_running > 0) { - if (server.stat_last_active_defrag_time == 0) - elapsedStart(&server.stat_last_active_defrag_time); - } else if (server.stat_last_active_defrag_time != 0) { - server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); - server.stat_last_active_defrag_time = 0; + latencyAddSampleIfNeeded("active-defrag-cycle", latency); + + if (haveMoreWork) { + return computeDelayMs(endtime); + } else { + endDefragCycle(1); + return AE_NOMORE; /* Ends the timer proc */ } } +/* During long running scripts, or while loading, there is a periodic function for handling other + * actions. This interface allows defrag to continue running, avoiding a single long defrag step + * after the long operation completes. */ +void defragWhileBlocked(void) { + /* This is called infrequently, while timers are not active. We might need to start defrag. */ + if (!defragIsRunning()) activeDefragCycle(); + + if (!defragIsRunning()) return; + + /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */ + long long timeproc_id = defrag.timeproc_id; + + /* Simulate a single call of the timer proc */ + long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL); + if (reschedule_delay == AE_NOMORE) { + /* If it's done, deregister the timer */ + aeDeleteTimeEvent(server.el, timeproc_id); + } + /* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the + * event loop can process timers again. */ +} + +static void beginDefragCycle(void) { + serverAssert(!defragIsRunning()); + + moduleDefragStart(); + + serverAssert(defrag.remaining_stages == NULL); + defrag.remaining_stages = listCreate(); + listSetFreeMethod(defrag.remaining_stages, freeDefragContext); + + for (int dbid = 0; dbid < server.dbnum; dbid++) { + redisDb *db = &server.db[dbid]; + + /* Add stage for keys. */ + defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx)); + defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); + defrag_keys_ctx->dbid = dbid; + addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx); + + /* Add stage for expires. */ + defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx)); + defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); + defrag_expires_ctx->dbid = dbid; + addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx); + } + + /* Add stage for pubsub channels. */ + defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels); + defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels; + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx); + + /* Add stage for pubsubshard channels. 
*/ + defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx)); + defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels); + defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels; + addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); + + addDefragStage(defragLuaScripts, NULL, NULL); + addDefragStage(defragModuleGlobals, NULL, NULL); + + defrag.current_stage = NULL; + defrag.start_cycle = getMonotonicUs(); + defrag.start_defrag_hits = server.stat_active_defrag_hits; + defrag.start_defrag_misses = server.stat_active_defrag_misses; + defrag.start_frag_pct = getAllocatorFragmentation(NULL); + defrag.timeproc_end_time = 0; + defrag.timeproc_overage_us = 0; + defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL); + + elapsedStart(&server.stat_last_active_defrag_time); +} + +void activeDefragCycle(void) { + if (!server.active_defrag_enabled) return; + + /* Defrag gets paused while a child process is active. So there's no point in starting a new + * cycle or adjusting the CPU percentage for an existing cycle. */ + if (hasActiveChildProcess()) return; + + computeDefragCycles(); + + if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle(); +} + #else /* HAVE_DEFRAG */ void activeDefragCycle(void) { @@ -1318,4 +1560,7 @@ robj *activeDefragStringOb(robj *ob) { return NULL; } +void defragWhileBlocked(void) { +} + #endif diff --git a/src/kvstore.c b/src/kvstore.c index 6a4d123ad1c..fdb9b61a683 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -12,9 +12,15 @@ * Copyright (c) 2011-Present, Redis Ltd. and contributors. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ + #include "fmacros.h" #include @@ -802,10 +808,14 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dic * within dict, it only reallocates the memory used by the dict structure itself using * the provided allocation function. This feature was added for the active defrag feature. * - * The 'defragfn' callback is called with a reference to the dict - * that callback can reallocate. */ -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { + * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time + * to execute. A "cursor" is used to perform the operation iteratively. When first called, a + * cursor value of 0 should be provided. The return value is an updated cursor which should be + * provided on the next iteration. The operation is complete when 0 is returned. + * + * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. 
*/ +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) { + for (int didx = cursor; didx < kvs->num_dicts; didx++) { dict **d = kvstoreGetDictRef(kvs, didx), *newd; if (!*d) continue; @@ -818,7 +828,9 @@ void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) if (metadata->rehashing_node) metadata->rehashing_node->value = *d; } + return (didx + 1); } + return 0; } uint64_t kvstoreGetHash(kvstore *kvs, const void *key) @@ -1059,13 +1071,14 @@ int kvstoreTest(int argc, char **argv, int flags) { } TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") { + unsigned long cursor = 0; kvstore *kvs = kvstoreCreate(&KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); for (i = 0; i < 256; i++) { de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL); if (listLength(kvs->rehashing)) break; } assert(listLength(kvs->rehashing)); - kvstoreDictLUTDefrag(kvs, defragLUTTestCallback); + while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {} while (kvstoreIncrementallyRehash(kvs, 1000)) {} kvstoreRelease(kvs); } diff --git a/src/kvstore.h b/src/kvstore.h index 9e2e4afe0d2..8b9fd7348f8 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -1,3 +1,16 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of the Redis Source Available License 2.0 + * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + #ifndef DICTARRAY_H_ #define DICTARRAY_H_ @@ -78,7 +91,7 @@ unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, uns int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size); unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); typedef dict *(kvstoreDictLUTDefragFunction)(dict *d); -void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn); +unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn); void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key); dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key); dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing); diff --git a/src/module.c b/src/module.c index 12c7788da3c..1f4deb969c4 100644 --- a/src/module.c +++ b/src/module.c @@ -2,8 +2,13 @@ * Copyright (c) 2016-Present, Redis Ltd. * All rights reserved. * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ /* -------------------------------------------------------------------------- @@ -13782,7 +13787,7 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { * defrag callback. */ struct RedisModuleDefragCtx { - long long int endtime; + monotime endtime; unsigned long *cursor; struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ int dbid; /* The dbid of the key being processed, -1 when unknown. 
*/ @@ -13821,7 +13826,7 @@ int RM_RegisterDefragCallbacks(RedisModuleCtx *ctx, RedisModuleDefragFunc start, * so it generally makes sense to do small batches of work in between calls. */ int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) { - return (ctx->endtime != 0 && ctx->endtime < ustime()); + return (ctx->endtime != 0 && ctx->endtime <= getMonotonicUs()); } /* Store an arbitrary cursor value for future re-use. @@ -13929,7 +13934,7 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo * Returns a zero value (and initializes the cursor) if no more needs to be done, * or a non-zero value otherwise. */ -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) { +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; diff --git a/src/server.c b/src/server.c index 5f20900e053..fd819d5b3b8 100644 --- a/src/server.c +++ b/src/server.c @@ -1637,25 +1637,7 @@ void whileBlockedCron(void) { mstime_t latency; latencyStartMonitor(latency); - /* In some cases we may be called with big intervals, so we may need to do - * extra work here. This is because some of the functions in serverCron rely - * on the fact that it is performed every 10 ms or so. For instance, if - * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we - * need to call it multiple times. */ - long hz_ms = 1000/server.hz; - while (server.blocked_last_cron < server.mstime) { - - /* Defrag keys gradually. */ - activeDefragCycle(); - - server.blocked_last_cron += hz_ms; - - /* Increment cronloop so that run_with_period works. */ - server.cronloops++; - } - - /* Other cron jobs do not need to be done in a loop. No need to check - * server.blocked_last_cron since we have an early exit at the top. */ + defragWhileBlocked(); /* Update memory stats during loading (excluding blocked scripts) */ if (server.loading) cronUpdateMemoryStats(); @@ -2756,8 +2738,6 @@ void initServer(void) { server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; server.db[j].avg_ttl = 0; - server.db[j].defrag_later = listCreate(); - listSetFreeMethod(server.db[j].defrag_later, sdsfreegeneric); } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ /* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which diff --git a/src/server.h b/src/server.h index 72a6f779dbd..052de222e74 100644 --- a/src/server.h +++ b/src/server.h @@ -1051,7 +1051,6 @@ typedef struct redisDb { int id; /* Database ID */ long long avg_ttl; /* Average TTL, just for stats */ unsigned long expires_cursor; /* Cursor of the active expire cycle. */ - list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. 
} redisDb; /* forward declaration for functions ctx */ @@ -2672,7 +2671,7 @@ size_t moduleGetFreeEffort(robj *key, robj *val, int dbid); size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); int moduleDefragValue(robj *key, robj *obj, int dbid); -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, monotime endtime, int dbid); void moduleDefragGlobals(void); void moduleDefragStart(void); void moduleDefragEnd(void); @@ -3264,6 +3263,7 @@ void enterExecutionUnit(int update_cached_time, long long us); void exitExecutionUnit(void); void resetServerStats(void); void activeDefragCycle(void); +void defragWhileBlocked(void); unsigned int getLRUClock(void); unsigned int LRU_CLOCK(void); const char *evictPolicyToString(void); diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 92c1f572cfd..15b00e767e4 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -1,3 +1,16 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + proc test_memory_efficiency {range} { r flushall set rd [redis_deferring_client] @@ -37,15 +50,19 @@ start_server {tags {"memefficiency external:skip"}} { } run_solo {defrag} { - proc wait_for_defrag_stop {maxtries delay} { + proc wait_for_defrag_stop {maxtries delay {expect_frag 0}} { wait_for_condition $maxtries $delay { - [s active_defrag_running] eq 0 + [s active_defrag_running] eq 0 && ($expect_frag == 0 || [s allocator_frag_ratio] <= $expect_frag) } else { after 120 ;# serverCron only updates the info once in 100ms puts [r info memory] puts [r info stats] puts [r memory malloc-stats] - fail "defrag didn't stop." + if {$expect_frag != 0} { + fail "defrag didn't stop or failed to achieve expected frag ratio ([s allocator_frag_ratio] > $expect_frag)" + } else { + fail "defrag didn't stop." + } } } @@ -102,7 +119,7 @@ r config set active-defrag-cycle-max 75 # Wait for the active defrag to stop working. - wait_for_defrag_stop 2000 100 + wait_for_defrag_stop 2000 100 1.1 # Test the fragmentation is lower. after 120 ;# serverCron only updates the info once in 100ms @@ -124,7 +141,6 @@ puts [r latency latest] puts [r latency history active-defrag-cycle] } - assert {$frag < 1.1} # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher if {!$::no_latency} { @@ -142,6 +158,11 @@ # reset stats and load the AOF file r config resetstat r config set key-load-delay -25 ;# sleep on average 1/25 usec + # Note: This test is checking if defrag is working DURING AOF loading (while + # timers are not active). So we don't give any extra time, and we deactivate + # defrag immediately after the AOF loading is complete. During loading, + # defrag will get invoked less often, which triggers the starvation protection + # (longer duty cycles). We should expect longer latency measurements.
        r debug loadaof
        r config set activedefrag no
        # measure hits and misses right after aof loading
@@ -246,7 +267,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.05
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -256,7 +277,6 @@ run_solo {defrag} {
             puts "frag [s allocator_frag_ratio]"
             puts "frag_bytes [s allocator_frag_bytes]"
         }
-        assert_lessthan_equal [s allocator_frag_ratio] 1.05
     }
     # Flush all script to make sure we don't crash after defragging them
     r script flush sync
@@ -362,7 +382,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.1
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -384,7 +404,6 @@ run_solo {defrag} {
            puts [r latency latest]
            puts [r latency history active-defrag-cycle]
        }
-       assert {$frag < 1.1}
        # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
        # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
        if {!$::no_latency} {
@@ -464,7 +483,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.05
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -474,7 +493,6 @@ run_solo {defrag} {
             puts "frag [s allocator_frag_ratio]"
             puts "frag_bytes [s allocator_frag_bytes]"
         }
-        assert_lessthan_equal [s allocator_frag_ratio] 1.05
     }
 
     # Publishes some message to all the pubsub clients to make sure that
@@ -572,7 +590,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.5
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -582,7 +600,6 @@ run_solo {defrag} {
             puts "frag [s allocator_frag_ratio]"
             puts "frag_bytes [s allocator_frag_bytes]"
         }
-        assert_lessthan_equal [s allocator_frag_ratio] 1.5
     }
 }
@@ -682,7 +699,13 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        if {$io_threads == 1} {
+            wait_for_defrag_stop 500 100 1.05
+        } else {
+            # TODO: When multithreading is enabled, argv may be created in the io thread
+            # and kept in the main thread, which can cause fragmentation to become worse.
+            wait_for_defrag_stop 500 100 1.1
+        }
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -692,14 +715,6 @@ run_solo {defrag} {
             puts "frag [s allocator_frag_ratio]"
             puts "frag_bytes [s allocator_frag_bytes]"
         }
-
-        if {$io_threads == 1} {
-            assert_lessthan_equal [s allocator_frag_ratio] 1.05
-        } else {
-            # TODO: When multithreading is enabled, argv may be created in the io thread
-            # and kept in the main thread, which can cause fragmentation to become worse.
-            assert_lessthan_equal [s allocator_frag_ratio] 1.1
-        }
     }
 }
@@ -763,7 +778,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.1
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -789,7 +804,6 @@ run_solo {defrag} {
            puts [r latency latest]
            puts [r latency history active-defrag-cycle]
            puts [r memory malloc-stats]
        }
-       assert {$frag < 1.1}
        # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
        # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
        if {!$::no_latency} {
@@ -884,7 +898,7 @@ run_solo {defrag} {
         }
 
         # wait for the active defrag to stop working
-        wait_for_defrag_stop 500 100
+        wait_for_defrag_stop 500 100 1.1
 
         # test the fragmentation is lower
         after 120 ;# serverCron only updates the info once in 100ms
@@ -896,7 +910,6 @@ run_solo {defrag} {
            puts "frag [s allocator_frag_ratio]"
            puts "frag_bytes [s allocator_frag_bytes]"
            puts "hits: $hits"
            puts "misses: $misses"
        }
-       assert {$frag < 1.1}
        assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses
    }
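
For module authors, the RM_DefragShouldStop() change above only swaps the clock source; the recommended usage pattern of doing small batches of work between calls is unchanged. Below is a minimal sketch, not part of this patch: the myobj type and my_defrag function are invented for illustration, while the RedisModule_Defrag* calls are the documented module API. Note that the cursor get/set calls are only honored for keys large enough to be handled by late (cursor-based) defrag.

#include "redismodule.h"

/* Hypothetical module value: a flat array of separately allocated items. */
typedef struct myobj {
    size_t nitems;
    void **items;
} myobj;

/* Defrag callback (the .defrag field of RedisModuleTypeMethods). Returns 0
 * when the value is fully defragged, non-zero if interrupted with work left. */
static int my_defrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) {
    REDISMODULE_NOT_USED(key);
    myobj *o = *value;
    unsigned long i = 0;
    RedisModule_DefragCursorGet(ctx, &i); /* resume from a previously saved cursor */

    for (; i < o->nitems; i++) {
        /* DefragAlloc returns NULL when the allocation isn't worth moving. */
        void *moved = RedisModule_DefragAlloc(ctx, o->items[i]);
        if (moved) o->items[i] = moved;

        /* Check the deadline between small batches of work. */
        if ((i & 127) == 0 && RedisModule_DefragShouldStop(ctx)) {
            RedisModule_DefragCursorSet(ctx, i); /* save progress for the next call */
            return 1; /* interrupted; more work remains */
        }
    }
    return 0; /* done */
}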
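
The moduleLateDefrag() signature change (long long endtime becoming monotime endtime) follows the same convention: defrag deadlines are monotonic microseconds from getMonotonicUs() in monotonic.h, not wall-clock ustime(), so NTP or manual clock adjustments cannot stretch or shrink a defrag slice. A minimal sketch of that convention, assuming hypothetical haveMoreWork()/doSmallBatch() helpers; the real slicing logic lives in defrag.c:

#include "monotonic.h"

/* Placeholder work source, invented for illustration. */
static int work_left = 1000;
static int haveMoreWork(void) { return work_left > 0; }
static void doSmallBatch(void) { work_left -= 10; }

/* Run work until a monotonic deadline passes. The now >= endtime comparison
 * mirrors the updated RM_DefragShouldStop() check. */
static void runBoundedSlice(long long slice_us) {
    monotime endtime = getMonotonicUs() + slice_us;
    while (haveMoreWork()) {
        doSmallBatch();
        if (getMonotonicUs() >= endtime) break; /* deadline reached; yield */
    }
}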