Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ locale-collate ""
# Snapshotting can be completely disabled with a single empty string argument
# as in following example:
#
# save ""
save ""
#
# Unless specified otherwise, by default Redis will save the DB:
# * After 3600 seconds (an hour) if at least 1 change was performed
Expand Down Expand Up @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb

# Defines how many commands in each client pipeline to decode and prefetch
# lookahead 16

# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_INVALID_SLOT (-1) /* Invalid slot number. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3217,6 +3217,7 @@ standardConfig static_configs[] = {
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lookahead", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.lookahead, REDIS_DEFAULT_LOOKAHEAD, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
1 change: 0 additions & 1 deletion src/iothread.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,6 @@ void initThreadedIO(void) {
exit(1);
}

prefetchCommandsBatchInit();

/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
Expand Down
15 changes: 7 additions & 8 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,17 +382,16 @@ int addCommandToBatch(client *c) {

batch->clients[batch->client_count++] = c;

if (likely(c->iolookedcmd)) {
/* Get command's keys positions */
getKeysResult result = GETKEYS_RESULT_INIT;
int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
pendingCommand *pcmd = c->pending_cmds.head;
while (pcmd != NULL) {
if (!pcmd) break;
for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0);
kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
batch->key_count++;
}
getKeysFreeResult(&result);
pcmd = pcmd->next;
}

return C_OK;
Expand Down
6 changes: 1 addition & 5 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ void moduleReleaseTempClient(client *c) {
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL;
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
Expand Down Expand Up @@ -11034,10 +11034,6 @@ void moduleCallCommandFilters(client *c) {
f->callback(&filter);
}

/* If the filter sets a new command, including command or subcommand,
* the command looked up in IO threads will be invalid. */
c->iolookedcmd = NULL;

c->argv = filter.argv;
c->argv_len = filter.argv_len;
c->argc = filter.argc;
Expand Down
1 change: 1 addition & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
* reference them from c anymore. */
c->argv = NULL;
c->argc = 0;
c->all_argv_len_sum -= c->argv_len_sum;
c->argv_len_sum = 0;
c->argv_len = 0;
}
Expand Down
Loading
Loading