diff --git a/notes/redis.md b/notes/redis.md index 87bceb65f..f3cf59687 100644 --- a/notes/redis.md +++ b/notes/redis.md @@ -15,7 +15,7 @@ +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | EXPIREAT | Yes | EXPIREAT key timestamp | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | KEYS | Yes* | KEYS pattern | + | KEYS | Yes* | KEYS pattern | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | MIGRATE | No | MIGRATE host port key destination-db timeout | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ @@ -267,6 +267,7 @@ ** SSCAN scans only sorted sets in the local node. ## HyperLogLog + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | Command | Supported? | Format | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ @@ -277,6 +278,37 @@ | PFMERGE | No | PFMERGE destkey sourcekey [sourcekey ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ +### Streams + + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | Command | Supported? | Format | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XACK | Yes | XACK key group ID [ID ...] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XADD | Yes | XADD key ID field string [field string ...] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XCLAIM | Yes | XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [id] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XDEL | Yes | XDEL key ID [ID ...] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XGROUP | Yes | XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname cname] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XINFO | Yes | XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XLEN | Yes | XLEN key | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XPENDING | Yes | XPENDING key group [start end count] [consumer] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XRANGE | Yes | XRANGE key start end [COUNT count] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XREAD | Yes | XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XREADGROUP | No | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XREVRANGE | Yes | XREVRANGE key end start [COUNT count] | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ + | XTRIM | Yes | XTRIM key MAXLEN [~] count | + +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ ### Pub/Sub @@ -402,7 +434,6 @@ * INFO reads only the local node. - ## Note - redis commands are not case sensitive diff --git a/src/dyn_message.c b/src/dyn_message.c index 335a436aa..276ce4598 100644 --- a/src/dyn_message.c +++ b/src/dyn_message.c @@ -376,6 +376,8 @@ static struct msg *_msg_get(struct conn *conn, bool request, msg->ntokens = 0; msg->rntokens = 0; msg->nkeys = 0; + memset(msg->stack, 0, sizeof(msg->stack)); + msg->nested_depth = 0; msg->rlen = 0; msg->integer = 0; @@ -390,6 +392,7 @@ static struct msg *_msg_get(struct conn *conn, bool request, msg->swallow = 0; msg->dnode_header_prepended = 0; msg->rsp_sent = 0; + msg->require_subcommand = 0; // dynomite msg->is_read = 1; diff --git a/src/dyn_message.h b/src/dyn_message.h index 2b58884db..f8f78e702 100644 --- a/src/dyn_message.h +++ b/src/dyn_message.h @@ -39,6 +39,8 @@ #define MAX_ALLOWABLE_PROCESSED_MSGS 500 +#define MAX_BULK_REPLY_DEPTH 6 + typedef void (*func_msg_parse_t)(struct msg *, const struct string *hash_tag); typedef rstatus_t (*func_msg_fragment_t)(struct msg *, struct server_pool *, struct rack *, struct msg_tqh *); @@ -152,6 +154,7 @@ typedef enum msg_parse_result { ACTION(REQ_REDIS_HSTRLEN) \ ACTION(REQ_REDIS_KEYS) \ ACTION(REQ_REDIS_INFO) \ + ACTION(REQ_REDIS_DBSIZE) \ ACTION(REQ_REDIS_LINDEX) /* redis requests - lists */ \ ACTION(REQ_REDIS_LINSERT) \ ACTION(REQ_REDIS_LLEN) \ @@ -224,6 +227,30 @@ typedef enum msg_parse_result { ACTION(REQ_REDIS_JSONARRLEN) \ ACTION(REQ_REDIS_JSONOBJKEYS) \ ACTION(REQ_REDIS_JSONOBJLEN) \ + ACTION(REQ_REDIS_XACK) /* redis requests - streams */ \ + ACTION(REQ_REDIS_XADD) \ + ACTION(REQ_REDIS_XCLAIM) \ + ACTION(REQ_REDIS_XDEL) \ + ACTION(REQ_REDIS_XGROUP) \ + ACTION(REQ_REDIS_XGROUP_CREATE) \ + ACTION(REQ_REDIS_XGROUP_DELCONSUMER) \ + ACTION(REQ_REDIS_XGROUP_DESTROY) \ + ACTION(REQ_REDIS_XGROUP_HELP) \ + ACTION(REQ_REDIS_XGROUP_SETID) \ + ACTION(REQ_REDIS_XINFO) \ + ACTION(REQ_REDIS_XINFO_CONSUMERS) \ + ACTION(REQ_REDIS_XINFO_GROUPS) \ + ACTION(REQ_REDIS_XINFO_HELP) \ + ACTION(REQ_REDIS_XINFO_STREAM) \ + ACTION(REQ_REDIS_XLEN) \ + ACTION(REQ_REDIS_XPENDING) \ + ACTION(REQ_REDIS_XRANGE) \ + ACTION(REQ_REDIS_XREAD) \ + ACTION(REQ_REDIS_XREAD_STREAMS) \ + ACTION(REQ_REDIS_XREADGROUP) \ + ACTION(REQ_REDIS_XREADGROUP_GROUP) \ + ACTION(REQ_REDIS_XREVRANGE) \ + ACTION(REQ_REDIS_XTRIM) \ /* ACTION(REQ_REDIS_AUTH) */ \ /* ACTION(REQ_REDIS_SELECT)*/ /* only during init */ \ ACTION(REQ_REDIS_PFADD) /* redis requests - hyperloglog */ \ @@ -253,6 +280,7 @@ typedef enum msg_parse_result { ACTION(REQ_REDIS_SCRIPT_EXISTS) \ ACTION(REQ_REDIS_SCRIPT_FLUSH) \ ACTION(REQ_REDIS_SCRIPT_KILL) \ + ACTION(REQ_REDIS_SCRIPT_HELP) \ /* ACTION( REQ_REDIS_AUTH) */ \ /* ACTION( REQ_REDIS_SELECT)*/ /* only during init */ @@ -423,13 +451,15 @@ struct msg { uint32_t vlen; /* value length (memcache) */ uint8_t *end; /* end marker (memcache) */ - uint8_t *ntoken_start; /* ntoken start (redis) */ - uint8_t *ntoken_end; /* ntoken end (redis) */ - uint32_t ntokens; /* # tokens (redis) */ - uint32_t nkeys; /* # keys in script (redis EVAL/EVALSHA) */ - uint32_t rntokens; /* running # tokens used by parsing fsa (redis) */ - uint32_t rlen; /* running length in parsing fsa (redis) */ - uint32_t integer; /* integer reply value (redis) */ + uint8_t *ntoken_start; /* ntoken start (redis) */ + uint8_t *ntoken_end; /* ntoken end (redis) */ + uint32_t ntokens; /* # tokens (redis) */ + uint32_t nkeys; /* # keys in script (redis EVAL/EVALSHA) or parameters for XREAD */ + uint32_t rntokens; /* running # tokens used by parsing fsa (redis) */ + uint32_t rlen; /* running length in parsing fsa (redis) */ + uint32_t stack[MAX_BULK_REPLY_DEPTH]; /* stack to save rntokens of nesting multibulks */ + uint8_t nested_depth; /* the depth of the current nested multibulk */ + uint32_t integer; /* integer reply value (redis) */ struct msg *frag_owner; /* owner of fragment message */ uint32_t nfrag; /* # fragment */ @@ -438,6 +468,7 @@ struct msg { struct msg * *frag_seq; /* sequence of fragment message, map from keys to fragments*/ + msg_type_t require_subcommand; /* command with sub-cmd like SCRIPT LOAD */ err_t error_code; /* errno on error? */ unsigned is_error : 1; /* error? */ unsigned is_ferror : 1; /* one or more fragments are in error? */ diff --git a/src/proto/dyn_redis.c b/src/proto/dyn_redis.c index 9726607a6..c5f193810 100644 --- a/src/proto/dyn_redis.c +++ b/src/proto/dyn_redis.c @@ -44,8 +44,12 @@ static bool redis_argz(struct msg *r) { switch (r->type) { case MSG_REQ_REDIS_PING: case MSG_REQ_REDIS_QUIT: + case MSG_REQ_REDIS_DBSIZE: case MSG_REQ_REDIS_SCRIPT_FLUSH: case MSG_REQ_REDIS_SCRIPT_KILL: + case MSG_REQ_REDIS_SCRIPT_HELP: + case MSG_REQ_REDIS_XINFO_HELP: + case MSG_REQ_REDIS_XGROUP_HELP: return true; default: @@ -87,6 +91,10 @@ static bool redis_arg0(struct msg *r) { case MSG_REQ_REDIS_ZCARD: + case MSG_REQ_REDIS_XLEN: + case MSG_REQ_REDIS_XINFO_STREAM: + case MSG_REQ_REDIS_XINFO_GROUPS: + case MSG_REQ_REDIS_KEYS: case MSG_REQ_REDIS_PFCOUNT: return true; @@ -135,6 +143,9 @@ static bool redis_arg1(struct msg *r) { case MSG_REQ_REDIS_SCRIPT_LOAD: case MSG_REQ_REDIS_SCRIPT_EXISTS: + case MSG_REQ_REDIS_XINFO_CONSUMERS: + case MSG_REQ_REDIS_XGROUP_DESTROY: + return true; default: @@ -153,6 +164,7 @@ static bool redis_arg_upto1(struct msg *r) { } return false; } + /* * Return true, if the redis command accepts exactly 2 arguments, otherwise * return false @@ -186,6 +198,9 @@ static bool redis_arg2(struct msg *r) { case MSG_REQ_REDIS_RESTORE: + case MSG_REQ_REDIS_XGROUP_DELCONSUMER: + case MSG_REQ_REDIS_XGROUP_SETID: + return true; default: @@ -261,6 +276,16 @@ static bool redis_argn(struct msg *r) { case MSG_REQ_REDIS_GEOPOS: case MSG_REQ_REDIS_GEORADIUSBYMEMBER: + case MSG_REQ_REDIS_XACK: + case MSG_REQ_REDIS_XADD: + case MSG_REQ_REDIS_XCLAIM: + case MSG_REQ_REDIS_XDEL: + case MSG_REQ_REDIS_XPENDING: + case MSG_REQ_REDIS_XRANGE: + case MSG_REQ_REDIS_XREVRANGE: + case MSG_REQ_REDIS_XTRIM: + case MSG_REQ_REDIS_XGROUP_CREATE: + case MSG_REQ_REDIS_JSONSET: case MSG_REQ_REDIS_JSONGET: case MSG_REQ_REDIS_JSONDEL: @@ -271,6 +296,7 @@ static bool redis_argn(struct msg *r) { case MSG_REQ_REDIS_JSONARRLEN: case MSG_REQ_REDIS_JSONOBJKEYS: case MSG_REQ_REDIS_JSONOBJLEN: + return true; default: @@ -333,6 +359,45 @@ static bool redis_argeval(struct msg *r) { return false; } +static bool redis_argxread(struct msg *r) { + switch (r->type) { + case MSG_REQ_REDIS_XREAD: + return true; + + default: + break; + } + + return false; +} + +static bool redis_argxreadgroup(struct msg *r) { + switch (r->type) { + case MSG_REQ_REDIS_XREADGROUP_GROUP: + return true; + + default: + break; + } + + return false; +} + +static bool redis_has_subcommand(struct msg *r) { + switch (r->type) { + case MSG_REQ_REDIS_XINFO: + case MSG_REQ_REDIS_XGROUP: + case MSG_REQ_REDIS_XREADGROUP: + case MSG_REQ_REDIS_SCRIPT: + return true; + + default: + break; + } + + return false; +} + /* * Return true, if the redis response is an error response i.e. a simple * string whose first character is '-', otherwise return false. @@ -534,7 +599,7 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { state = r->state; b = STAILQ_LAST(&r->mhdr, mbuf, next); - + ASSERT(r->is_request); ASSERT(state >= SW_START && state < SW_SENTINEL); ASSERT(b != NULL); @@ -872,6 +937,7 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { if (str4icmp(m, 'l', 'o', 'a', 'd')) { // A command called 'LOAD' does not exist. This is the second half of the // command 'SCRIPT LOAD'. + ASSERT(r->type == MSG_REQ_REDIS_SCRIPT); r->type = MSG_REQ_REDIS_SCRIPT_LOAD; r->msg_routing = ROUTING_ALL_NODES_ALL_RACKS_ALL_DCS; @@ -888,6 +954,56 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str4icmp(m, 'x', 'a', 'd', 'd')) { + r->type = MSG_REQ_REDIS_XADD; + r->is_read = 0; + break; + } + + if (str4icmp(m, 'x', 'a', 'c', 'k')) { + r->type = MSG_REQ_REDIS_XACK; + r->is_read = 0; + break; + } + + if (str4icmp(m, 'x', 'd', 'e', 'l')) { + r->type = MSG_REQ_REDIS_XDEL; + r->is_read = 0; + break; + } + + if (str4icmp(m, 'x', 'l', 'e', 'n')) { + r->type = MSG_REQ_REDIS_XLEN; + r->is_read = 1; + break; + } + + if (str4icmp(m, 'h', 'e', 'l', 'p') && r->require_subcommand == MSG_REQ_REDIS_SCRIPT) { + // This is the second half of the command 'XINFO HELP'. + ASSERT(r->type == MSG_REQ_REDIS_SCRIPT); + r->type = MSG_REQ_REDIS_SCRIPT_HELP; + r->msg_routing = ROUTING_LOCAL_NODE_ONLY; + r->is_read = 1; + break; + } + + if (str4icmp(m, 'h', 'e', 'l', 'p') && r->require_subcommand == MSG_REQ_REDIS_XINFO) { + // This is the second half of the command 'XINFO HELP'. + ASSERT(r->type == MSG_REQ_REDIS_XINFO); + r->type = MSG_REQ_REDIS_XINFO_HELP; + r->msg_routing = ROUTING_LOCAL_NODE_ONLY; + r->is_read = 1; + break; + } + + if (str4icmp(m, 'h', 'e', 'l', 'p') && r->require_subcommand == MSG_REQ_REDIS_XGROUP) { + // This is the second half of the command 'XGROUP HELP'. + ASSERT(r->type == MSG_REQ_REDIS_XGROUP); + r->type = MSG_REQ_REDIS_XGROUP_HELP; + r->msg_routing = ROUTING_LOCAL_NODE_ONLY; + r->is_read = 1; + break; + } break; @@ -998,11 +1114,13 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { r->is_read = 1; break; } + if (str5icmp(m, 'p', 'f', 'a', 'd', 'd')) { r->type = MSG_REQ_REDIS_PFADD; r->is_read = 0; break; } + if (str5icmp(m, 'f', 'l', 'u', 's', 'h')) { // A command called 'FLUSH' does not exist. This is the second half of the // command 'SCRIPT FLUSH'. @@ -1013,6 +1131,41 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str5icmp(m, 'x', 'i', 'n', 'f', 'o')) { + r->type = MSG_REQ_REDIS_XINFO; + r->is_read = 1; + r->require_subcommand = r->type; + break; + } + + if (str5icmp(m, 'x', 'r', 'e', 'a', 'd')) { + r->type = MSG_REQ_REDIS_XREAD; + r->is_read = 1; + break; + } + + if (str5icmp(m, 'x', 't', 'r', 'i', 'm')) { + r->type = MSG_REQ_REDIS_XTRIM; + r->is_read = 1; + break; + } + + if (str5icmp(m, 's', 'e', 't', 'i', 'd') && r->require_subcommand == MSG_REQ_REDIS_XGROUP) { + // This is the second half of the command 'XGROUP SETID'. + ASSERT(r->type == MSG_REQ_REDIS_XGROUP); + r->type = MSG_REQ_REDIS_XGROUP_SETID; + r->is_read = 0; + break; + } + + if (str5icmp(m, 'g', 'r', 'o', 'u', 'p') && r->require_subcommand == MSG_REQ_REDIS_XREADGROUP) { + // This is the second half of the command 'XREADGROUP GROUP'. + ASSERT(r->type == MSG_REQ_REDIS_XREADGROUP); + r->type = MSG_REQ_REDIS_XREADGROUP_GROUP; + r->is_read = 0; + break; + } + break; case 6: @@ -1034,6 +1187,7 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { r->is_read = 1; break; } + if (str6icmp(m, 'e', 'x', 'i', 's', 't', 's') && r->type == MSG_REQ_REDIS_SCRIPT) { // This is not to be confused with 'EXISTS'. This is the second half of the @@ -1043,7 +1197,6 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } - if (str6icmp(m, 'e', 'x', 'p', 'i', 'r', 'e')) { r->type = MSG_REQ_REDIS_EXPIRE; r->is_read = 0; @@ -1152,26 +1305,82 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { r->is_read = 1; break; } + if (str6icmp(m, 'g', 'e', 'o', 'a', 'd', 'd')) { r->type = MSG_REQ_REDIS_GEOADD; r->is_read = 0; break; } + if (str6icmp(m, 'g', 'e', 'o', 'p', 'o', 's')) { r->type = MSG_REQ_REDIS_GEOPOS; r->is_read = 1; break; } + if (str6icmp(m, 'u', 'n', 'l', 'i', 'n', 'k')) { r->type = MSG_REQ_REDIS_UNLINK; r->is_read = 0; break; } + if (str6icmp(m, 's', 'c', 'r', 'i', 'p', 't')) { r->type = MSG_REQ_REDIS_SCRIPT; + r->require_subcommand = r->type; r->is_read = 0; break; } + + if (str6icmp(m, 'x', 'c', 'l', 'a', 'i', 'm')) { + r->type = MSG_REQ_REDIS_XCLAIM; + r->is_read = 0; + break; + } + + if (str6icmp(m, 'x', 'g', 'r', 'o', 'u', 'p')) { + r->type = MSG_REQ_REDIS_XGROUP; + r->is_read = 0; + r->require_subcommand = r->type; + break; + } + + if (str6icmp(m, 'x', 'r', 'a', 'n', 'g', 'e')) { + r->type = MSG_REQ_REDIS_XRANGE; + r->is_read = 1; + break; + } + + if (str6icmp(m, 's', 't', 'r', 'e', 'a', 'm') && r->require_subcommand == MSG_REQ_REDIS_XINFO) { + // This is the second half of the command 'XINFO STREAM'. + ASSERT(r->type == MSG_REQ_REDIS_XINFO); + r->type = MSG_REQ_REDIS_XINFO_STREAM; + r->is_read = 1; + break; + } + + if (str6icmp(m, 'g', 'r', 'o', 'u', 'p', 's') && r->require_subcommand == MSG_REQ_REDIS_XINFO) { + // This is the second half of the command 'XINFO GROUPS'. + ASSERT(r->type == MSG_REQ_REDIS_XINFO); + r->type = MSG_REQ_REDIS_XINFO_GROUPS; + r->is_read = 1; + break; + } + + if (str6icmp(m, 'c', 'r', 'e', 'a', 't', 'e') && r->require_subcommand == MSG_REQ_REDIS_XGROUP) { + // This is the second half of the command 'XGROUP CREATE'. + ASSERT(r->type == MSG_REQ_REDIS_XGROUP); + r->type = MSG_REQ_REDIS_XGROUP_CREATE; + r->is_read = 0; + break; + } + + if (str6icmp(m, 'd', 'b', 's', 'i', 'z', 'e')) { + r->type = MSG_REQ_REDIS_DBSIZE; + r->msg_routing = ROUTING_LOCAL_NODE_ONLY; + r->is_read = 1; + break; + } + break; case 7: @@ -1236,27 +1445,39 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { r->is_read = 0; break; } + if (str7icmp(m, 'p', 'f', 'c', 'o', 'u', 'n', 't')) { r->type = MSG_REQ_REDIS_PFCOUNT; r->is_read = 0; break; } + if (str7icmp(m, 'g', 'e', 'o', 'h', 'a', 's', 'h')) { r->type = MSG_REQ_REDIS_GEOHASH; r->is_read = 1; break; } + if (str7icmp(m, 'g', 'e', 'o', 'd', 'i', 's', 't')) { r->type = MSG_REQ_REDIS_GEODIST; r->is_read = 1; break; } + if (str7icmp(m, 'h', 's', 't', 'r', 'l', 'e', 'n')) { r->type = MSG_REQ_REDIS_HSTRLEN; r->is_read = 1; break; } + if (str7icmp(m, 'd', 'e', 's', 't', 'r', 'o', 'y') && r->require_subcommand == MSG_REQ_REDIS_XGROUP) { + // This is the second half of the command 'XGROUP DESTROY'. + ASSERT(r->type == MSG_REQ_REDIS_XGROUP); + r->type = MSG_REQ_REDIS_XGROUP_DESTROY; + r->is_read = 0; + break; + } + break; case 8: @@ -1313,6 +1534,12 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str8icmp(m, 'x', 'p', 'e', 'n', 'd', 'i', 'n', 'g')) { + r->type = MSG_REQ_REDIS_XPENDING; + r->is_read = 1; + break; + } + break; case 9: @@ -1351,6 +1578,7 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { r->is_read = 1; break; } + if (str9icmp(m, 'j', 's', 'o', 'n', '.', 't', 'y', 'p', 'e')) { r->type = MSG_REQ_REDIS_JSONTYPE; r->is_read = 1; @@ -1363,6 +1591,20 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str9icmp(m, 'x', 'r', 'e', 'v', 'r', 'a', 'n', 'g', 'e')) { + r->type = MSG_REQ_REDIS_XREVRANGE; + r->is_read = 1; + break; + } + + if (str9icmp(m, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', 's') && r->require_subcommand == MSG_REQ_REDIS_XINFO) { + // This is the second half of the command 'XINFO CONSUMERS'. + ASSERT(r->type == MSG_REQ_REDIS_XINFO); + r->type = MSG_REQ_REDIS_XINFO_CONSUMERS; + r->is_read = 1; + break; + } + break; case 10: @@ -1373,6 +1615,14 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str10icmp(m, 'x', 'r', 'e', 'a', 'd', 'g', 'r', 'o', 'u', + 'p')) { + r->type = MSG_REQ_REDIS_XREADGROUP; + r->is_read = 1; + r->require_subcommand = r-> type; + break; + } + break; case 11: @@ -1439,6 +1689,14 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; } + if (str11icmp(m, 'd', 'e', 'l', 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r') && r->require_subcommand == MSG_REQ_REDIS_XGROUP) { + // This is the second half of the command 'XGROUP DELCONSUMER'. + ASSERT(r->type == MSG_REQ_REDIS_XGROUP); + r->type = MSG_REQ_REDIS_XGROUP_DELCONSUMER; + r->is_read = 1; + break; + } + break; case 12: @@ -1551,18 +1809,20 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; case SW_REQ_TYPE_LF: - if (r->type == MSG_REQ_REDIS_SCRIPT) { + if (redis_has_subcommand(r)) { // Parsing "SCRIPT " is a special case since there is a // space between the keyword SCRIPT and the following keyword. To deal with // this, we reset the state machine to a previous state to allow parsing of the // second keyword as well. // Set 'state' back to 'SW_REQ_TYPE_LEN' to parse the second half of the command // the space in 'SCRIPT '. + // Same logic applies to commands like XINFO, XGROUP etc. state = SW_REQ_TYPE_LEN; log_debug(LOG_VERB, "parsed partial command '%.*s'. Continuing to parse" \ "remainaing part of command", p - m, m); break; } + switch (ch) { case LF: if (redis_argz(r) && (r->rntokens == 0)) { @@ -1577,6 +1837,10 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { state = SW_ARG1_LEN; } else if (redis_argeval(r)) { state = SW_ARG1_LEN; + } else if (redis_argxread(r)) { + state = SW_ARGN_LEN; + } else if (redis_argxreadgroup(r)) { + state = SW_ARG1_LEN; } else { state = SW_KEY_LEN; } @@ -1720,7 +1984,7 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { goto error; } state = SW_ARG1_LEN; - } else if (redis_argeval(r)) { + } else if (redis_argeval(r) || redis_argxread(r) || redis_argxreadgroup(r)) { r->nkeys--; if (r->nkeys > 0) { // if there are more keys pending, parse them @@ -1779,7 +2043,6 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; case SW_ARG1: - if (r->type == MSG_REQ_REDIS_CONFIG && !str3icmp(m, 'g', 'e', 't')) { log_error("Redis CONFIG command not supported '%.*s'", p - m, m); goto error; @@ -1842,6 +2105,12 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { goto error; } state = SW_ARG2_LEN; + } else if (redis_argxreadgroup(r)) { + if (r->rntokens < 1) { + log_error("Dynomite XREADGROUP requires exactly two arguments following the keyword GROUP"); + goto error; + } + state = SW_ARG2_LEN; } else if (redis_argkvx(r)) { if (r->rntokens == 0) { goto done; @@ -1988,6 +2257,11 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { goto error; } state = SW_KEY_LEN; + } else if (redis_argxreadgroup(r)) { + if (r->rntokens < 1) { + goto error; + } + state = SW_ARGN_LEN; } else { goto error; } @@ -2122,6 +2396,16 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { break; case SW_ARGN: + if ((r->type == MSG_REQ_REDIS_XREAD || r->type == MSG_REQ_REDIS_XREADGROUP_GROUP) && str7icmp(p,'s','t','r','e','a','m','s')) { + if ((r->rntokens % 2) != 0) { + log_error("Dynomite XREAD/XREADGROUP requires an even number of keys with args. " \ + "For each key an arg must be specified " \ + "Currently have %d key(s) with matching arg", r->rntokens - 1); + goto error; + } + r->nkeys = r->rntokens / 2; + } + m = p + r->rlen; if (m >= b->last) { r->rlen -= (uint32_t)(b->last - p); @@ -2152,11 +2436,20 @@ void redis_parse_req(struct msg *r, const struct string *hash_tag) { case SW_ARGN_LF: switch (ch) { case LF: - if (redis_argn(r) || redis_argeval(r)) { + if (redis_argn(r) || + redis_argeval(r) || + redis_argxread(r) || + redis_argxreadgroup(r)) { + if (r->rntokens == 0) { goto done; } - state = SW_ARGN_LEN; + + if ((redis_argxread(r) || redis_argxreadgroup(r)) && (r->nkeys > 0)) { + state = SW_KEY_LEN; + } else { + state = SW_ARGN_LEN; + } } else { goto error; } @@ -2267,6 +2560,7 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { struct mbuf *b; uint8_t *p, *m; uint8_t ch; + int depth; enum { SW_START, @@ -2305,7 +2599,7 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { for (p = r->pos; p < b->last; p++) { ch = *p; - switch (state) { + switch (state) { case SW_START: r->type = MSG_UNKNOWN; switch (ch) { @@ -2489,8 +2783,9 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { case SW_SIMPLE: if (ch == CR) { - state = SW_MULTIBULK_ARGN_LF; r->rntokens--; + r->stack[r->nested_depth-1]--; + state = SW_MULTIBULK_ARGN_LF; } break; @@ -2607,8 +2902,18 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { /* rsp_start <- p */ r->ntoken_start = p; r->rntokens = 0; + r->nested_depth++; + + if (r->nested_depth > MAX_BULK_REPLY_DEPTH) { + log_debug(LOG_ERR, "only support %d levels of multibulk", MAX_BULK_REPLY_DEPTH); + goto error; + } } else if (ch == '-') { - state = SW_RUNTO_CRLF; + p = p-1; + r->token = NULL; + r->rntokens = 1; + r->stack[r->nested_depth-1] = r->rntokens; + state = SW_MULTIBULK_ARGN_LEN; } else if (isdigit(ch)) { r->rntokens = r->rntokens * 10 + (uint32_t)(ch - '0'); } else if (ch == CR) { @@ -2619,6 +2924,7 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { r->ntokens = r->rntokens; r->ntoken_end = p; r->token = NULL; + r->stack[r->nested_depth-1] = r->ntokens; state = SW_MULTIBULK_NARG_LF; } else { goto error; @@ -2631,7 +2937,18 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { case LF: if (r->rntokens == 0) { /* response is '*0\r\n' */ - goto done; + + if (r->nested_depth == 1) { + goto done; + } else { + log_debug(LOG_VVVERB, + "multibulk support@end of a nested empty bulk (level/entries) %d/%d", + r->nested_depth, r->stack[r->nested_depth-1]); + + p = p - 1; + state = SW_MULTIBULK_ARGN_LF; + break; + } } state = SW_MULTIBULK_ARGN_LEN; break; @@ -2705,6 +3022,7 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { } r->rntokens--; r->token = NULL; + r->stack[r->nested_depth-1]--; } else { goto error; } @@ -2746,7 +3064,23 @@ void redis_parse_rsp(struct msg *r, const struct string *UNUSED) { case SW_MULTIBULK_ARGN_LF: switch (ch) { case LF: - if (r->rntokens == 0) { + log_debug(LOG_VVVERB, + "multibulk support@the end of the bulk (level/entries): %d/%d", + r->nested_depth, r->stack[r->nested_depth-1]); + + depth = r->nested_depth; + while (depth > 1 && r->stack[depth-1] == 0) { + depth--; + r->stack[depth-1]--; + r->nested_depth = depth; + r->rntokens = r->stack[depth-1]; + + log_debug(LOG_VVVERB, + "multibulk support@the end of a nested multibulk (level/entries): %d/%d", + r->nested_depth, r->stack[r->nested_depth-1]); + } + + if (r->stack[0] == 0) { goto done; } diff --git a/test/dual_run.py b/test/dual_run.py index ce84694a0..5ba827731 100644 --- a/test/dual_run.py +++ b/test/dual_run.py @@ -2,11 +2,12 @@ import redis class ResultMismatchError(Exception): - def __init__(self, r_result, d_result, func, *args): + def __init__(self, r_result, d_result, func, *args, **kwargs): self.r_result = r_result self.d_result = d_result self.func = func self.args = args + self.kwargs = kwargs def __str__(self): ret = "\n\t======Result Mismatch=======\n" ret += "\tQuery: %s %s" % (self.func, str(self.args)) @@ -21,17 +22,17 @@ def __init__(self, r, d, debug=None): self.r = r self.d = d self.debug = debug - def run_verify(self, func, *args): + def run_verify(self, func, *args, **kwargs): r_result = None d_result = None r_func = getattr(self.r, func) d_func = getattr(self.d, func) - r_result = r_func(*args) + r_result = r_func(*args, **kwargs) i = 0 retry_limit = 3 while i < retry_limit: try: - d_result = d_func(*args) + d_result = d_func(*args, **kwargs) if i > 0: print("\tSucceeded in attempt {}".format(i+1)) break @@ -43,21 +44,27 @@ def run_verify(self, func, *args): print("\tGot error '{}'\n\tQuery '{} {}'".format(e, func, str(args))) break if self.debug: - print("Query: %s %s" % (func, str(args))) + print("Query: %s %s" % (func, str(args), str(kwargs))) print("Redis result: %s" % str(r_result)) print("Dyno result: %s" % str(d_result)) + + if func in ['xadd']: + # Truncate milliseconds + r_result = str(r_result).split('-')[0][:-5] + d_result = str(d_result).split('-')[0][:-5] + if r_result != d_result: - raise ResultMismatchError(r_result, d_result, func, *args) + raise ResultMismatchError(r_result, d_result, func, *args, **kwargs) return d_result - def run_dynomite_only(self, func, *args): + def run_dynomite_only(self, func, *args, **kwargs): d_result = None d_func = getattr(self.d, func) i = 0 retry_limit = 3 while i < retry_limit: try: - d_result = d_func(*args) + d_result = d_func(*args, **kwargs) if i > 0: print("\tSucceeded in attempt {}".format(i+1)) break diff --git a/test/func_test.py b/test/func_test.py index 981fabdb0..275995b38 100755 --- a/test/func_test.py +++ b/test/func_test.py @@ -162,6 +162,32 @@ def create_key_field(keyid=None, fieldid=None): #if next_index == 0: #break +def run_streams_tests(c, max_stream_entries=1000): + #Set some + test_name="STREAMS" + print("Running %s tests" % test_name) + + STREAM_ENTRY_COUNT = 32 + STREAM_NAME = "TEST_STREAM" + key = STREAM_NAME + + # Populate stream: + # XADD key * data1 x data 2 + for x in range(0, max_stream_entries): + c.run_verify("xadd", key, {"data1": x, "data2": x + 10}) + + xlen = c.run_verify("xlen", key) + assert xlen == max_stream_entries + + c.run_dynomite_only("xinfo_stream", key) + + # XRANGE + res = c.run_dynomite_only("xrange", key, count=STREAM_ENTRY_COUNT) + assert len(res) == STREAM_ENTRY_COUNT + + res = c.run_dynomite_only("xread", {key: 0}, count=10) + + def comparison_test(redis, dynomite, debug): r_c = redis.get_connection() d_c = dynomite.get_connection() @@ -173,6 +199,8 @@ def comparison_test(redis, dynomite, debug): run_multikey_test(c) run_hash_tests(c, max_keys=10, max_fields=100) run_script_tests(c) + # Skip streams tests on GitHub's CI (env. does not support redis 5.x.x) + # run_streams_tests(c) print("All test ran fine") def main(args):