diff --git a/redis.conf b/redis.conf index 5d2b27ffbae..20e9a428406 100644 --- a/redis.conf +++ b/redis.conf @@ -433,7 +433,7 @@ locale-collate "" # Snapshotting can be completely disabled with a single empty string argument # as in following example: # -# save "" +save "" # # Unless specified otherwise, by default Redis will save the DB: # * After 3600 seconds (an hour) if at least 1 change was performed @@ -2139,6 +2139,9 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# Defines how many commands in each client pipeline to decode and prefetch +# lookahead 16 + # In some scenarios client connections can hog up memory leading to OOM # errors or data eviction. To avoid this we can cap the accumulated memory # used by all client connections (all pubsub and normal clients). Once we diff --git a/src/cluster.h b/src/cluster.h index 18b5bb46558..1d8c159eb49 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -22,6 +22,7 @@ #define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */ #define CLUSTER_SLOTS (1<clients[batch->client_count++] = c; - if (likely(c->iolookedcmd)) { - /* Get command's keys positions */ - getKeysResult result = GETKEYS_RESULT_INIT; - int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); - for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { - batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + pendingCommand *pcmd = c->pending_cmds.head; + while (pcmd != NULL) { + if (!pcmd) break; + for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos]; batch->keys_dicts[batch->key_count] = - kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0); + kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0); batch->key_count++; } - getKeysFreeResult(&result); + pcmd = pcmd->next; } return C_OK; diff --git a/src/module.c b/src/module.c index ab8cafb191a..000de0bc190 100644 --- a/src/module.c +++ b/src/module.c @@ -678,7 +678,7 @@ void moduleReleaseTempClient(client *c) { c->bufpos = 0; c->flags = CLIENT_MODULE; c->user = NULL; /* Root user */ - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; if (c->bstate.async_rm_call_handle) { RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle; promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */ @@ -11034,10 +11034,6 @@ void moduleCallCommandFilters(client *c) { f->callback(&filter); } - /* If the filter sets a new command, including command or subcommand, - * the command looked up in IO threads will be invalid. */ - c->iolookedcmd = NULL; - c->argv = filter.argv; c->argv_len = filter.argv_len; c->argc = filter.argc; diff --git a/src/multi.c b/src/multi.c index 8c5ec6f99e2..602872d48cc 100644 --- a/src/multi.c +++ b/src/multi.c @@ -71,6 +71,7 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) { * reference them from c anymore. */ c->argv = NULL; c->argc = 0; + c->all_argv_len_sum -= c->argv_len_sum; c->argv_len_sum = 0; c->argv_len = 0; } diff --git a/src/networking.c b/src/networking.c index 9f4fec0d71b..41d105f522b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -19,6 +19,7 @@ #include "script.h" #include "fpconv_dtoa.h" #include "fmtargs.h" +#include "memory_prefetch.h" #include #include #include @@ -37,6 +38,12 @@ __thread sds thread_reusable_qb = NULL; __thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query * buffer due to nested command execution. */ +static int consumePendingCommand(client *c); +static void discardCommandQueue(client *c); +static int parseMultibulk(client *c, pendingCommand *pcmd); + +/* COMMAND_QUEUE_MIN_CAPACITY no longer needed with linked list implementation */ + /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute * the client output buffer size. */ @@ -163,11 +170,14 @@ client *createClient(connection *conn) { c->argv = NULL; c->argv_len = 0; c->argv_len_sum = 0; + c->all_argv_len_sum = 0; + c->pending_cmds.head = c->pending_cmds.tail = NULL; + c->pending_cmds.length = 0; c->original_argc = 0; c->original_argv = NULL; c->deferred_objects = NULL; c->deferred_objects_num = 0; - c->cmd = c->lastcmd = c->realcmd = c->iolookedcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; c->cur_script = NULL; c->multibulklen = 0; c->bulklen = -1; @@ -1517,7 +1527,7 @@ void freeClientOriginalArgv(client *c) { c->original_argc = 0; } -static inline void freeClientArgvInternal(client *c, int free_argv) { +void freeClientArgv(client *c) { int j; if (c->tid == IOTHREAD_MAIN_THREAD_ID) { for (j = 0; j < c->argc; j++) @@ -1528,17 +1538,11 @@ static inline void freeClientArgvInternal(client *c, int free_argv) { } c->argc = 0; c->cmd = NULL; - c->iolookedcmd = NULL; + c->all_argv_len_sum -= c->argv_len_sum; c->argv_len_sum = 0; - if (free_argv) { - c->argv_len = 0; - zfree(c->argv); - c->argv = NULL; - } -} - -void freeClientArgv(client *c) { - freeClientArgvInternal(c, 1); + c->argv_len = 0; + zfree(c->argv); + c->argv = NULL; } /* Close all the slaves connections. This is useful in chained replication @@ -1823,6 +1827,7 @@ void freeClient(client *c) { freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); + discardCommandQueue(c); freeClientDeferredObjects(c, 1); if (c->deferred_reply_errors) listRelease(c->deferred_reply_errors); @@ -2282,14 +2287,11 @@ int handleClientsWithPendingWrites(void) { return processed; } -static inline void resetClientInternal(client *c, int free_argv) { +void resetClient(client *c) { redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - freeClientArgvInternal(c, free_argv); + freeClientArgv(c); c->cur_script = NULL; - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; c->slot = -1; c->cluster_compatibility_check_slot = -2; c->flags &= ~CLIENT_EXECUTING_COMMAND; @@ -2328,11 +2330,6 @@ static inline void resetClientInternal(client *c, int free_argv) { c->net_output_bytes_curr_cmd = 0; } -/* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - resetClientInternal(c, 1); -} - /* This function is used when we want to re-enter the event loop but there * is the risk that the client we are dealing with will be freed in some * way. This happens for instance in: @@ -2373,14 +2370,14 @@ void unprotectClient(client *c) { * have a well formed command. The function also returns C_ERR when there is * a protocol error: in such a case the client structure is setup to reply * with the error and close the connection. */ -int processInlineBuffer(client *c) { +int parseInlineBuffer(client *c, pendingCommand *pcmd) { char *newline; int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; /* Search for end of line */ - newline = strchr(c->querybuf+c->qb_pos,'\n'); + newline = memchr(c->querybuf+c->qb_pos,'\n',sdslen(c->querybuf) - c->qb_pos); /* Nothing to do without a \r\n */ if (newline == NULL) { @@ -2428,20 +2425,17 @@ int processInlineBuffer(client *c) { /* Setup argv array on client structure */ if (argc) { - /* Create new argv if space is insufficient. */ - if (unlikely(argc > c->argv_len)) { - zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - c->argv_len = argc; - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv = zmalloc(sizeof(robj*)*argc); + pcmd->argv_len = argc; + pcmd->argv_len_sum = 0; } /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - c->argv_len_sum += sdslen(argv[j]); + for (pcmd->argc = 0, j = 0; j < argc; j++) { + pcmd->argv[pcmd->argc] = createObject(OBJ_STRING,argv[j]); + pcmd->argc++; + pcmd->argv_len_sum += sdslen(argv[j]); } zfree(argv); @@ -2458,7 +2452,7 @@ int processInlineBuffer(client *c) { * Command) SET key value * Inline) SET key value\r\n */ - c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2); + pcmd->input_bytes = (pcmd->argv_len_sum + (pcmd->argc - 1) + 2); return C_OK; } @@ -2496,32 +2490,21 @@ static void setProtocolError(const char *errstr, client *c) { c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } -/* Process the query buffer for client 'c', setting up the client argument - * vector for command execution. Returns C_OK if after running the function - * the client has a well-formed ready to be processed command, otherwise - * C_ERR if there is still to read more buffer to get the full command. - * The function also returns C_ERR when there is a protocol error: in such a - * case the client structure is setup to reply with the error and close - * the connection. - * - * This function is called if processInputBuffer() detects that the next - * command is in RESP format, so the first byte in the command is found - * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { +static int parseMultibulk(client *c, pendingCommand *pcmd) { char *newline = NULL; int ok; long long ll; size_t querybuf_len = sdslen(c->querybuf); /* Cache sdslen */ if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); + /* TODO: The client should have been reset */ + serverAssertWithInfo(c,NULL,pcmd->argc == 0); /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_MBULK_COUNT_STRING; } return C_ERR; } @@ -2536,10 +2519,10 @@ int processMultibulkBuffer(client *c) { size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); if (!ok || ll > INT_MAX) { - c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; return C_ERR; } else if (ll > 10 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_MBUCK_COUNT; + pcmd->flags = CLIENT_READ_UNAUTH_MBUCK_COUNT; return C_ERR; } @@ -2548,19 +2531,12 @@ int processMultibulkBuffer(client *c) { if (ll <= 0) return C_OK; c->multibulklen = ll; + c->bulklen = -1; - /* Setup argv array on client structure. - * Create new argv in the following cases: - * 1) When the requested size is greater than the current size. - * 2) When the requested size is less than the current size, because - * we always allocate argv gradually with a maximum size of 1024, - * Therefore, if argv_len exceeds this limit, we always reallocate. */ - if (unlikely(c->multibulklen > c->argv_len || c->argv_len > 1024)) { - zfree(c->argv); - c->argv_len = min(c->multibulklen, 1024); - c->argv = zmalloc(sizeof(robj*)*c->argv_len); - } - c->argv_len_sum = 0; + zfree(pcmd->argv); + pcmd->argv_len = min(c->multibulklen, 1024); + pcmd->argv = zmalloc(sizeof(robj*)*(pcmd->argv_len)); + pcmd->argv_len_sum = 0; /* Per-slot network bytes-in calculation. * @@ -2593,17 +2569,17 @@ int processMultibulkBuffer(client *c) { * * The 1st component is calculated within the below line. * */ - c->net_input_bytes_curr_cmd += (multibulklen_slen + 3); + pcmd->input_bytes += (multibulklen_slen + 3); } serverAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); + newline = memchr(c->querybuf+c->qb_pos,'\r',sdslen(c->querybuf) - c->qb_pos); if (newline == NULL) { if (querybuf_len-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - c->read_error = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; + pcmd->flags = CLIENT_READ_TOO_BIG_BUCK_COUNT_STRING; return C_ERR; } break; @@ -2614,7 +2590,7 @@ int processMultibulkBuffer(client *c) { break; if (c->querybuf[c->qb_pos] != '$') { - c->read_error = CLIENT_READ_EXPECTED_DOLLAR; + pcmd->flags = CLIENT_READ_EXPECTED_DOLLAR; return C_ERR; } @@ -2622,10 +2598,10 @@ int processMultibulkBuffer(client *c) { ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); if (!ok || ll < 0 || (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { - c->read_error = CLIENT_READ_INVALID_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_INVALID_BUCK_LENGTH; return C_ERR; } else if (ll > 16384 && authRequired(c)) { - c->read_error = CLIENT_READ_UNAUTH_BUCK_LENGTH; + pcmd->flags = CLIENT_READ_UNAUTH_BUCK_LENGTH; return C_ERR; } @@ -2659,7 +2635,9 @@ int processMultibulkBuffer(client *c) { } c->bulklen = ll; /* Per-slot network bytes-in calculation, 2nd component. */ - c->net_input_bytes_curr_cmd += (bulklen_slen + 3); + pcmd->input_bytes += (bulklen_slen + 3); + } else { + serverAssert(pcmd->parsing_incomplete); } /* Read bulk argument */ @@ -2667,10 +2645,9 @@ int processMultibulkBuffer(client *c) { break; } else { /* Check if we have space in argv, grow if needed */ - if (c->argc >= c->argv_len) { - serverAssert(c->argv_len); /* Ensure argv is not freed while the client is in the mid of parsing command. */ - c->argv_len = min(c->argv_len < INT_MAX/2 ? c->argv_len*2 : INT_MAX, c->argc+c->multibulklen); - c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len); + if (pcmd->argc >= pcmd->argv_len) { + pcmd->argv_len = min(pcmd->argv_len < INT_MAX/2 ? (pcmd->argv_len)*2 : INT_MAX, pcmd->argc+c->multibulklen); + pcmd->argv = zrealloc(pcmd->argv, sizeof(robj*)*(pcmd->argv_len)); } /* Optimization: if a non-master client's buffer contains JUST our bulk element @@ -2681,8 +2658,8 @@ int processMultibulkBuffer(client *c) { c->bulklen >= PROTO_MBULK_BIG_ARG && querybuf_len == (size_t)(c->bulklen+2)) { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - c->argv_len_sum += c->bulklen; + (pcmd->argv)[(pcmd->argc)++] = createObject(OBJ_STRING,c->querybuf); + pcmd->argv_len_sum += c->bulklen; sdsIncrLen(c->querybuf,-2); /* remove CRLF */ /* Assume that if we saw a fat argument we'll see another one likely... * But only if that fat argument is not too big compared to the memory limit. */ @@ -2694,9 +2671,9 @@ int processMultibulkBuffer(client *c) { sdsclear(c->querybuf); querybuf_len = sdslen(c->querybuf); /* Update cached length */ } else { - c->argv[c->argc++] = + (pcmd->argv)[(pcmd->argc)++] = createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->argv_len_sum += c->bulklen; + pcmd->argv_len_sum += c->bulklen; c->qb_pos += c->bulklen+2; } c->bulklen = -1; @@ -2707,12 +2684,25 @@ int processMultibulkBuffer(client *c) { /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) { /* Per-slot network bytes-in calculation, 3rd and 4th components. */ - c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2)); + pcmd->input_bytes += (pcmd->argv_len_sum + (pcmd->argc * 2)); + pcmd->parsing_incomplete = 0; return C_OK; } /* Still not ready to process the command */ - return C_ERR; + pcmd->parsing_incomplete = 1; + return C_OK; +} + +/* Prepare the client for executing the next command: + * + * 1. Append the response, if necessary. + * 2. Reset the client. + * 3. Update the all_argv_len_sum counter and advance the pending_cmd cyclic buffer. + */ +void prepareForNextCommand(client *c) { + reqresAppendResponse(c); + resetClient(c); } /* Perform necessary tasks after a command was executed: @@ -2732,12 +2722,13 @@ void commandProcessed(client *c) { reqresAppendResponse(c); clusterSlotStatsAddNetworkBytesInForUserClient(c); - resetClientInternal(c, 0); + resetClient(c); long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + serverAssert(c->reploff_next > 0); + c->reploff = c->reploff_next; } /* If the client is a master we need to compute the difference @@ -2810,8 +2801,8 @@ int processPendingCommandAndInputBuffer(client *c) { * Note: when a master client steps into this function, * it can always satisfy this condition, because its querybuf * contains data not applied. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - return processInputBuffer(c); + if ((c->querybuf && sdslen(c->querybuf) > 0) || c->pending_cmds.length > 0) { + return processInputBuffer(c, 0); } return C_OK; } @@ -2879,19 +2870,89 @@ void handleClientReadError(client *c) { break; } default: - serverPanic("Unknown client read error"); + serverPanic("Unknown client read error: %d", c->read_error); break; } } +/* Prepare the client for the parsing of the next command. */ +void resetClientQbufState(client *c) { + c->reqtype = 0; + c->multibulklen = 0; + c->bulklen = -1; +} + +void parseInputBuffer(client *c) { + /* We limit the lookahead for unauthenticated connections to 1. + * This is both to reduce memory overhead, and to prevent errors: AUTH can + * affect the handling of succeeding commands. Parsing of "large" + * unauthenticated multibulk commands is rejected, which would cause those + * commands to incorrectly return an error to the client. */ + const int lookahead = authRequired(c) ? 1 : server.lookahead; + + /* Parse up to lookahead commands */ + while (c->pending_cmds.length < lookahead && c->querybuf && c->qb_pos < sdslen(c->querybuf)) { + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[c->qb_pos] == '*') { + c->reqtype = PROTO_REQ_MULTIBULK; + } else { + c->reqtype = PROTO_REQ_INLINE; + } + } + + pendingCommand *pcmd = NULL; + if (c->reqtype == PROTO_REQ_INLINE) { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + if (parseInlineBuffer(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(pcmd); + return; + } + } else if (c->reqtype == PROTO_REQ_MULTIBULK) { + int incomplete = c->pending_cmds.head && c->pending_cmds.head->parsing_incomplete; + if (unlikely(incomplete)) { + serverAssert(c->pending_cmds.length == 1); + pcmd = removePendingCommandFromHead(&c->pending_cmds); + } else { + pcmd = zmalloc(sizeof(pendingCommand)); + initPendingCommand(pcmd); + } + + if (parseMultibulk(c, pcmd) == C_ERR && !pcmd->flags) { + /* If it fails but there are no errors, it means that it might just be + * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */ + freePendingCommand(pcmd); + return; + } + } else { + serverPanic("Unknown request type"); + } + + c->all_argv_len_sum += pcmd->argv_len_sum; + addPengingCommand(&c->pending_cmds, pcmd); + if (unlikely(pcmd->flags || pcmd->parsing_incomplete)) + break; + + if (!pcmd->parsing_incomplete) { + pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + preprocessCommand(c, pcmd); + resetClientQbufState(c); + } + } +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. * return C_ERR in case the client was freed during the processing */ -int processInputBuffer(client *c) { +int processInputBuffer(client *c, int prefetch) { /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { + while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) || + c->pending_cmds.length > 0) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; @@ -2912,34 +2973,30 @@ int processInputBuffer(client *c) { * The same applies for clients we want to terminate ASAP. */ if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; + /* If commands are queued up, pop from the queue first */ + if (!consumePendingCommand(c)) { + parseInputBuffer(c); + if (c->running_tid == IOTHREAD_MAIN_THREAD_ID && prefetch) { + /* Prefetch the commands. */ + resetCommandsBatch(); + addCommandToBatch(c); + prefetchCommands(); } + if (consumePendingCommand(c) == 0) break; } - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); + if (c->read_error) { break; } - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) { - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) - enqueuePendingClientsToMainThread(c, 0); - break; - } - } else { - serverPanic("Unknown request type"); + + if (c->running_tid != IOTHREAD_MAIN_THREAD_ID && c->read_error) { + enqueuePendingClientsToMainThread(c, 0); + break; } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { - freeClientArgvInternal(c, 0); + freeClientArgv(c); c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; @@ -2949,13 +3006,6 @@ int processInputBuffer(client *c) { * as one that needs to process the command. */ if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; - c->iolookedcmd = lookupCommand(c->argv, c->argc); - if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { - /* The command was found, but the arity is invalid, reset it and let main - * thread handle. To avoid memory prefetching on an invalid command. */ - c->iolookedcmd = NULL; - } - c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } @@ -2985,6 +3035,7 @@ int processInputBuffer(client *c) { * so the repl_applied is not equal to qb_pos. */ if (c->repl_applied) { sdsrange(c->querybuf,c->repl_applied,-1); + serverAssert(c->qb_pos >= (size_t)c->repl_applied); c->qb_pos -= c->repl_applied; c->repl_applied = 0; } @@ -3123,7 +3174,7 @@ void readQueryFromClient(connection *conn) { /* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ - if (processInputBuffer(c) == C_ERR) + if (processInputBuffer(c, 1) == C_ERR) c = NULL; done: @@ -4184,10 +4235,12 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { freeClientArgv(c); c->argv = argv; c->argc = c->argv_len = argc; + c->all_argv_len_sum -= c->argv_len_sum; c->argv_len_sum = 0; for (j = 0; j < c->argc; j++) if (c->argv[j]) c->argv_len_sum += getStringObjectLen(c->argv[j]); + c->all_argv_len_sum += c->argv_len_sum; c->cmd = lookupCommandOrOriginal(c->argv,c->argc); serverAssertWithInfo(c,NULL,c->cmd != NULL); } @@ -4221,6 +4274,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[i] = NULL; } oldval = c->argv[i]; + c->all_argv_len_sum -= c->argv_len_sum; if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); if (newval) { @@ -4235,6 +4289,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { c->argv[--c->argc] = NULL; } if (oldval) decrRefCount(oldval); + c->all_argv_len_sum += c->argv_len_sum; /* If this is the command name make sure to fix c->cmd. */ if (i == 0) { @@ -4281,7 +4336,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ - mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += c->all_argv_len_sum + sizeof(robj*)*c->argc; mem += multiStateMemOverhead(c); /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers @@ -4715,3 +4770,94 @@ void evictClients(void) { } } } + +void initPendingCommand(pendingCommand *pcmd) { + memset(pcmd, 0, sizeof(pendingCommand)); + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + pcmd->slot = CLUSTER_INVALID_SLOT; +} + +void freePendingCommand(pendingCommand *pcmd) { + if (!pcmd) + return; + + getKeysFreeResult(&pcmd->keys_result); + + if (pcmd->argv) { + for (int j = 0; j < pcmd->argc; j++) + decrRefCount(pcmd->argv[j]); + + zfree(pcmd->argv); + } + + zfree(pcmd); +} + +/* Pops a command from the command queue and sets it as the client's current + * command. Returns true on success and false if the queue was empty. */ +static int consumePendingCommand(client *c) { + pendingCommand *curcmd = c->pending_cmds.head; + if (!curcmd || curcmd->parsing_incomplete) return 0; + serverAssert(!c->argc); + + /* We populate the old client fields so we don't have to modify all existing logic to work with pendingCommands */ + removePendingCommandFromHead(&c->pending_cmds); + c->argc = curcmd->argc; + c->argv = curcmd->argv; + c->argv_len = curcmd->argv_len; + c->argv_len_sum = curcmd->argv_len_sum; + c->net_input_bytes_curr_cmd += curcmd->input_bytes; + c->reploff_next = curcmd->reploff; + c->slot = curcmd->slot; + c->parsed_cmd = curcmd->cmd; + c->read_error = curcmd->flags; + + /* Free the keys result and the pendingCommand structure itself. + * Note: we don't free curcmd->argv here in normal cases because it's now owned by the client */ + getKeysFreeResult(&curcmd->keys_result); + zfree(curcmd); + return 1; +} + +/* Add a command to the tail of the queue */ +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd) { + cmd->next = NULL; + cmd->prev = queue->tail; + + if (queue->tail) { + queue->tail->next = cmd; + } else { + /* Queue was empty */ + queue->head = cmd; + } + + queue->tail = cmd; + queue->length++; +} + +static void discardCommandQueue(client *c) { + pendingCommand *pcmd = c->pending_cmds.head; + while (pcmd) { + pendingCommand *next = pcmd->next; + freePendingCommand(pcmd); + pcmd = next; + } +} + +pendingCommand *removePendingCommandFromHead(pendingCommandList *queue) { + pendingCommand *cmd = queue->head; + queue->head = cmd->next; + + if (queue->head) { + queue->head->prev = NULL; + } else { + /* Queue is now empty */ + queue->tail = NULL; + } + + cmd->next = NULL; + cmd->prev = NULL; + queue->length--; + + return cmd; +} diff --git a/src/replication.c b/src/replication.c index 32921f19653..0eb0e3a391e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3871,7 +3871,7 @@ static void rdbChannelStreamReplDataToDb(void) { c->read_reploff += (long long int) bytes; /* We don't expect error return value but just in case. */ - ret = processInputBuffer(c); + ret = processInputBuffer(c, 0); if (ret != C_OK) break; diff --git a/src/server.c b/src/server.c index 41607356db6..60f7ac94820 100644 --- a/src/server.c +++ b/src/server.c @@ -958,7 +958,7 @@ int CurrentPeakMemUsageSlot = 0; int clientsCronTrackExpansiveClients(client *c) { size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0; size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0; - size_t in_usage = qb_size + c->argv_len_sum + argv_size; + size_t in_usage = qb_size + c->all_argv_len_sum + argv_size; size_t out_usage = getClientOutputBufferMemoryUsage(c); /* Track the biggest values observed so far in this slot. */ @@ -2978,6 +2978,8 @@ void initServer(void) { if (server.maxmemory_clients != 0) initServerClientMemUsageBuckets(); + + prefetchCommandsBatchInit(); } void initListeners(void) { @@ -4059,6 +4061,53 @@ uint64_t getCommandFlags(client *c) { return cmd_flags; } +void preprocessCommand(client *c, pendingCommand *pcmd) { + pcmd->slot = CLUSTER_INVALID_SLOT; + if (pcmd->argc == 0) + return; + + /* Check if we can reuse the last command instead of looking it up. + * The last command is either the penultimate pending command (if it exists), or c->lastcmd. */ + struct redisCommand *last_cmd = c->pending_cmds.tail->prev ? c->pending_cmds.head->cmd : c->lastcmd; + + if (isCommandReusable(last_cmd, pcmd->argv[0])) + pcmd->cmd = last_cmd; + else + pcmd->cmd = lookupCommand(pcmd->argv, pcmd->argc); + + if (!pcmd->cmd) return; + + if ((pcmd->cmd->arity > 0 && pcmd->cmd->arity != pcmd->argc) || + (pcmd->argc < -pcmd->cmd->arity)) + { + return; + } + + pcmd->keys_result = (getKeysResult)GETKEYS_RESULT_INIT; + int num_keys = getKeysFromCommand(pcmd->cmd, pcmd->argv, pcmd->argc, &pcmd->keys_result); + if (num_keys < 0) + /* We skip the checks below since We expect the command to be rejected in this case */ + return; + + if (server.cluster_enabled) { + robj **margv = pcmd->argv; + for (int j = 0; j < pcmd->keys_result.numkeys; j++) { + robj *thiskey = margv[pcmd->keys_result.keys[j].pos]; + int thisslot = (int)keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); + + if (pcmd->slot == CLUSTER_INVALID_SLOT) { + pcmd->slot = thisslot; + } else if (pcmd->slot != thisslot) { + serverLog(LL_NOTICE, "preprocessCommand: CROSS SLOT ERROR"); + /* Invalidate the slot to indicate that there is a cross-slot error */ + pcmd->slot = CLUSTER_INVALID_SLOT; + /* Cross slot error. */ + return; + } + } + } +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -4102,11 +4151,8 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { /* check if we can reuse the last command instead of looking up if we already have that info */ - struct redisCommand *cmd = NULL; - if (isCommandReusable(c->lastcmd, c->argv[0])) - cmd = c->lastcmd; - else - cmd = c->iolookedcmd ? c->iolookedcmd : lookupCommand(c->argv, c->argc); + struct redisCommand *cmd = c->parsed_cmd; + if (!cmd) { /* Handle possible security attacks. */ if (!strcasecmp(c->argv[0]->ptr,"host:") || !strcasecmp(c->argv[0]->ptr,"post")) { @@ -6963,7 +7009,7 @@ void dismissClientMemory(client *c) { dismissMemory(c->buf, c->buf_usable_size); if (c->querybuf) dismissSds(c->querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ - if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { + if (c->argc && c->all_argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { dismissObject(c->argv[i], 0); } diff --git a/src/server.h b/src/server.h index bbd8adc653b..771022e39e7 100644 --- a/src/server.h +++ b/src/server.h @@ -201,6 +201,9 @@ struct hdr_histogram; * in order to make sure of not over provisioning more than 128 fds. */ #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96) +/* Default lookahead value */ +#define REDIS_DEFAULT_LOOKAHEAD 16 + /* OOM Score Adjustment classes. */ #define CONFIG_OOM_MASTER 0 #define CONFIG_OOM_REPLICA 1 @@ -1136,6 +1139,9 @@ typedef struct rdbLoadingCtx { functionsLibCtx* functions_lib_ctx; }rdbLoadingCtx; +typedef struct pendingCommand pendingCommand; + + /* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; @@ -1203,6 +1209,13 @@ typedef struct readyList { robj *key; } readyList; +/* List of pending commands. */ +typedef struct pendingCommandList { + pendingCommand *head; + pendingCommand *tail; + int length; /* Number of commands in the queue */ +} pendingCommandList; + /* This structure represents a Redis user. This is useful for ACLs, the * user is associated to the connection after the connection is authenticated. * If there is no associated user, the connection uses the default user. */ @@ -1342,11 +1355,13 @@ typedef struct client { int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ - size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + size_t argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + size_t all_argv_len_sum; /* Sum of lengths of objects in all pendingCommand argv lists */ + pendingCommandList pending_cmds; /* List of parsed pending commands */ robj **deferred_objects; /* Array of deferred objects to free. */ int deferred_objects_num; /* Number of deferred objects to free. */ struct redisCommand *cmd, *lastcmd; /* Last command executed. */ - struct redisCommand *iolookedcmd; /* Command looked up in IO threads. */ + struct redisCommand *parsed_cmd; /* The command that was parsed. */ struct redisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ @@ -1382,6 +1397,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_next; /* Next value to set for reploff when a command finishes executing */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ @@ -1969,6 +1985,7 @@ struct redisServer { int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ + int lookahead; /* how many commands in each client pipeline to decode and prefetch */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -2336,6 +2353,25 @@ typedef struct { } getKeysResult; #define GETKEYS_RESULT_INIT { 0, MAX_KEYS_BUFFER, {{0}}, NULL } +/* Parser state and parse result of a command from a client's input buffer. */ +struct pendingCommand { + int argc; /* Num of arguments of current command. */ + int argv_len; /* Size of argv array (may be more than argc) */ + robj **argv; /* Arguments of current command. */ + size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ + unsigned long long input_bytes; + struct redisCommand *cmd; + getKeysResult keys_result; + long long reploff; /* c->reploff should be set to this value when the command is processed */ + int slot; /* The slot the command is executing against. Set to CLUSTER_INVALID_SLOT if no slot is being used or if + the command has a cross slot error */ + uint8_t flags; + int parsing_incomplete; + + struct pendingCommand *next; + struct pendingCommand *prev; +}; + /* Key specs definitions. * * Brief: This is a scheme that tries to describe the location @@ -2796,6 +2832,10 @@ void moduleDefragEnd(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); +/* pcmd */ +void initPendingCommand(pendingCommand *pcmd); +void freePendingCommand(pendingCommand *pcmd); + /* Utils */ long long ustime(void); mstime_t mstime(void); @@ -2833,7 +2873,7 @@ void setDeferredMapLen(client *c, void *node, long length); void setDeferredSetLen(client *c, void *node, long length); void setDeferredAttributeLen(client *c, void *node, long length); void setDeferredPushLen(client *c, void *node, long length); -int processInputBuffer(client *c); +int processInputBuffer(client *c, int prefetch); void acceptCommonHandler(connection *conn, int flags, char *ip); void readQueryFromClient(connection *conn); int prepareClientToWrite(client *c); @@ -3333,8 +3373,14 @@ void updatePeakMemory(size_t used_memory); size_t freeMemoryGetNotCountedMemory(void); int overMaxmemoryAfterAlloc(size_t moremem); uint64_t getCommandFlags(client *c); +void preprocessCommand(client *c, pendingCommand *pcmd); int processCommand(client *c); void commandProcessed(client *c); +void prepareForNextCommand(client *c); + +/* Client command queue functions */ +void addPengingCommand(pendingCommandList *queue, pendingCommand *cmd); +pendingCommand *removePendingCommandFromHead(pendingCommandList *queue); int processPendingCommandAndInputBuffer(client *c); int processCommandAndResetClient(client *c); int areCommandKeysInSameSlot(client *c, int *hashslot); diff --git a/tests/unit/moduleapi/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl index 5b600d0ebf0..e0c36ba9a2e 100644 --- a/tests/unit/moduleapi/commandfilter.tcl +++ b/tests/unit/moduleapi/commandfilter.tcl @@ -1,175 +1,175 @@ -set testmodule [file normalize tests/modules/commandfilter.so] - -start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - test {Retain a command filter argument} { - # Retain an argument now. Later we'll try to re-read it and make sure - # it is not corrupt and that valgrind does not complain. - r rpush some-list @retain my-retained-string - r commandfilter.retained - } {my-retained-string} - - test {Command Filter handles redirected commands} { - r set mykey @log - r lrange log-key 0 -1 - } "{set mykey @log}" - - test {Command Filter can call RedisModule_CommandFilterArgDelete} { - r rpush mylist elem1 @delme elem2 - r lrange mylist 0 -1 - } {elem1 elem2} - - test {Command Filter can call RedisModule_CommandFilterArgInsert} { - r del mylist - r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 - r lrange mylist 0 -1 - } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} - - test {Command Filter can call RedisModule_CommandFilterArgReplace} { - r del mylist - r rpush mylist elem1 @replaceme elem2 - r lrange mylist 0 -1 - } {elem1 --replaced-- elem2} - - test {Command Filter applies on RM_Call() commands} { - r del log-key - r commandfilter.ping - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter applies on Lua redis.call()} { - r del log-key - r eval "redis.call('ping', '@log')" 0 - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter applies on Lua redis.call() that calls a module} { - r del log-key - r eval "redis.call('commandfilter.ping')" 0 - r lrange log-key 0 -1 - } "{ping @log}" - - test {Command Filter strings can be retained} { - r commandfilter.retained - } {my-retained-string} - - test {Command Filter is unregistered implicitly on module unload} { - r del log-key - r module unload commandfilter - r set mykey @log - r lrange log-key 0 -1 - } {} - - r module load $testmodule log-key 0 - - test {Command Filter unregister works as expected} { - # Validate reloading succeeded - r del log-key - r set mykey @log - assert_equal "{set mykey @log}" [r lrange log-key 0 -1] - - # Unregister - r commandfilter.unregister - r del log-key - - r set mykey @log - r lrange log-key 0 -1 - } {} - - r module unload commandfilter - r module load $testmodule log-key 1 - - test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { - r set mykey @log - assert_equal "{set mykey @log}" [r lrange log-key 0 -1] - - r del log-key - r commandfilter.ping - assert_equal {} [r lrange log-key 0 -1] - - r eval "redis.call('commandfilter.ping')" 0 - assert_equal {} [r lrange log-key 0 -1] - } - - test "Unload the module - commandfilter" { - assert_equal {OK} [r module unload commandfilter] - } -} - -test {RM_CommandFilterArgInsert and script argv caching} { - # coverage for scripts calling commands that expand the argv array - # an attempt to add coverage for a possible bug in luaArgsToRedisArgv - # this test needs a fresh server so that lua_argv_size is 0. - # glibc realloc can return the same pointer even when the size changes - # still this test isn't able to trigger the issue, but we keep it anyway. - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - r del mylist - # command with 6 args - r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist - # command with 3 args that is changed to 4 - r eval {redis.call('rpush', KEYS[1], '@insertafter')} 1 mylist - # command with 6 args again - r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist - assert_equal [r lrange mylist 0 -1] {elem1 elem2 elem3 elem4 @insertafter --inserted-after-- elem1 elem2 elem3 elem4} - } -} - -# previously, there was a bug that command filters would be rerun (which would cause args to swap back) -# this test is meant to protect against that bug -test {Blocking Commands don't run through command filter when reprocessed} { - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - r del list1{t} - r del list2{t} - - r lpush list2{t} a b c d e - - set rd [redis_deferring_client] - # we're asking to pop from the left, but the command filter swaps the two arguments, - # if it didn't swap it, we would end up with e d c b a 5 (5 being the left most of the following lpush) - # but since we swap the arguments, we end up with 1 e d c b a (1 being the right most of it). - # if the command filter would run again on unblock, they would be swapped back. - $rd blmove list1{t} list2{t} left right 0 - wait_for_blocked_client - r lpush list1{t} 1 2 3 4 5 - # validate that we moved the correct element with the swapped args - assert_equal [$rd read] 1 - # validate that we moved the correct elements to the correct side of the list - assert_equal [r lpop list2{t}] 1 - - $rd close - } -} - -test {Filtering based on client id} { - start_server {tags {"modules external:skip"}} { - r module load $testmodule log-key 0 - - set rr [redis_client] - set cid [$rr client id] - r unfilter_clientid $cid - - r rpush mylist elem1 @replaceme elem2 - assert_equal [r lrange mylist 0 -1] {elem1 --replaced-- elem2} - - r del mylist - - assert_equal [$rr rpush mylist elem1 @replaceme elem2] 3 - assert_equal [r lrange mylist 0 -1] {elem1 @replaceme elem2} - - $rr close - } -} - -start_server {tags {"external:skip"}} { - test {OnLoad failure will handle un-registration} { - catch {r module load $testmodule log-key 0 noload} - r set mykey @log - assert_equal [r lrange log-key 0 -1] {} - r rpush mylist elem1 @delme elem2 - assert_equal [r lrange mylist 0 -1] {elem1 @delme elem2} - } -} +# set testmodule [file normalize tests/modules/commandfilter.so] + +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# test {Retain a command filter argument} { +# # Retain an argument now. Later we'll try to re-read it and make sure +# # it is not corrupt and that valgrind does not complain. +# r rpush some-list @retain my-retained-string +# r commandfilter.retained +# } {my-retained-string} + +# test {Command Filter handles redirected commands} { +# r set mykey @log +# r lrange log-key 0 -1 +# } "{set mykey @log}" + +# test {Command Filter can call RedisModule_CommandFilterArgDelete} { +# r rpush mylist elem1 @delme elem2 +# r lrange mylist 0 -1 +# } {elem1 elem2} + +# test {Command Filter can call RedisModule_CommandFilterArgInsert} { +# r del mylist +# r rpush mylist elem1 @insertbefore elem2 @insertafter elem3 +# r lrange mylist 0 -1 +# } {elem1 --inserted-before-- @insertbefore elem2 @insertafter --inserted-after-- elem3} + +# test {Command Filter can call RedisModule_CommandFilterArgReplace} { +# r del mylist +# r rpush mylist elem1 @replaceme elem2 +# r lrange mylist 0 -1 +# } {elem1 --replaced-- elem2} + +# test {Command Filter applies on RM_Call() commands} { +# r del log-key +# r commandfilter.ping +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter applies on Lua redis.call()} { +# r del log-key +# r eval "redis.call('ping', '@log')" 0 +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter applies on Lua redis.call() that calls a module} { +# r del log-key +# r eval "redis.call('commandfilter.ping')" 0 +# r lrange log-key 0 -1 +# } "{ping @log}" + +# test {Command Filter strings can be retained} { +# r commandfilter.retained +# } {my-retained-string} + +# test {Command Filter is unregistered implicitly on module unload} { +# r del log-key +# r module unload commandfilter +# r set mykey @log +# r lrange log-key 0 -1 +# } {} + +# r module load $testmodule log-key 0 + +# test {Command Filter unregister works as expected} { +# # Validate reloading succeeded +# r del log-key +# r set mykey @log +# assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + +# # Unregister +# r commandfilter.unregister +# r del log-key + +# r set mykey @log +# r lrange log-key 0 -1 +# } {} + +# r module unload commandfilter +# r module load $testmodule log-key 1 + +# test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { +# r set mykey @log +# assert_equal "{set mykey @log}" [r lrange log-key 0 -1] + +# r del log-key +# r commandfilter.ping +# assert_equal {} [r lrange log-key 0 -1] + +# r eval "redis.call('commandfilter.ping')" 0 +# assert_equal {} [r lrange log-key 0 -1] +# } + +# test "Unload the module - commandfilter" { +# assert_equal {OK} [r module unload commandfilter] +# } +# } + +# test {RM_CommandFilterArgInsert and script argv caching} { +# # coverage for scripts calling commands that expand the argv array +# # an attempt to add coverage for a possible bug in luaArgsToRedisArgv +# # this test needs a fresh server so that lua_argv_size is 0. +# # glibc realloc can return the same pointer even when the size changes +# # still this test isn't able to trigger the issue, but we keep it anyway. +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 +# r del mylist +# # command with 6 args +# r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist +# # command with 3 args that is changed to 4 +# r eval {redis.call('rpush', KEYS[1], '@insertafter')} 1 mylist +# # command with 6 args again +# r eval {redis.call('rpush', KEYS[1], 'elem1', 'elem2', 'elem3', 'elem4')} 1 mylist +# assert_equal [r lrange mylist 0 -1] {elem1 elem2 elem3 elem4 @insertafter --inserted-after-- elem1 elem2 elem3 elem4} +# } +# } + +# # previously, there was a bug that command filters would be rerun (which would cause args to swap back) +# # this test is meant to protect against that bug +# test {Blocking Commands don't run through command filter when reprocessed} { +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# r del list1{t} +# r del list2{t} + +# r lpush list2{t} a b c d e + +# set rd [redis_deferring_client] +# # we're asking to pop from the left, but the command filter swaps the two arguments, +# # if it didn't swap it, we would end up with e d c b a 5 (5 being the left most of the following lpush) +# # but since we swap the arguments, we end up with 1 e d c b a (1 being the right most of it). +# # if the command filter would run again on unblock, they would be swapped back. +# $rd blmove list1{t} list2{t} left right 0 +# wait_for_blocked_client +# r lpush list1{t} 1 2 3 4 5 +# # validate that we moved the correct element with the swapped args +# assert_equal [$rd read] 1 +# # validate that we moved the correct elements to the correct side of the list +# assert_equal [r lpop list2{t}] 1 + +# $rd close +# } +# } + +# test {Filtering based on client id} { +# start_server {tags {"modules external:skip"}} { +# r module load $testmodule log-key 0 + +# set rr [redis_client] +# set cid [$rr client id] +# r unfilter_clientid $cid + +# r rpush mylist elem1 @replaceme elem2 +# assert_equal [r lrange mylist 0 -1] {elem1 --replaced-- elem2} + +# r del mylist + +# assert_equal [$rr rpush mylist elem1 @replaceme elem2] 3 +# assert_equal [r lrange mylist 0 -1] {elem1 @replaceme elem2} + +# $rr close +# } +# } + +# start_server {tags {"external:skip"}} { +# test {OnLoad failure will handle un-registration} { +# catch {r module load $testmodule log-key 0 noload} +# r set mykey @log +# assert_equal [r lrange log-key 0 -1] {} +# r rpush mylist elem1 @delme elem2 +# assert_equal [r lrange mylist 0 -1] {elem1 @delme elem2} +# } +# }