Skip to content

Commit efff6b5

Browse files
authored
RESP2REDIS: Should not fail trying to load empty module (#53)
If RDB2RESP was configured to "supportRestoreModuleAux" and generates RESTOREMODAUX commands, currently relevant only to Redis enterprise, then if RDB was generated by a server with some module, but user didn't make any use of that module, attempting to play it to another server that wasn't loaded with that module, the RDB parser will get fail. This is because the module always store something in the AUX field, and the RDB parser will try to load it. In order to overcome this issue, A module that its AUX payload is less than 15 Bytes (including RDB version and checksum) counted as AUX field of an empty Module (not in use), then the parser, when restoring the empty module, it should ignore returned error: "-ERR Module X not found..." Few changes were made to support it: - Propagate restoreSize to RESP2REDIS handlers. - ReaderResp gets now error-callback which can indicate to mask given error. - RESP2REDIS indicates to ignore RESTOREMODAUX error message of type "-ERR Module .. not found ..." - Added tests accordingly. Additionally added timeout to recv() from redis to avoid blocking forever. Irrelevant to this fix.
1 parent f953088 commit efff6b5

File tree

10 files changed

+155
-77
lines changed

10 files changed

+155
-77
lines changed

api/librdb-ext-api.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ typedef enum {
5050
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
5151
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
5252
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
53+
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
5354
} RdbxRes;
5455

5556
/****************************************************************
@@ -198,14 +199,21 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *)
198199
* <user-defined-writer>
199200
****************************************************************/
200201

201-
/* On start command pass command info. NULL otherwise. */
202+
/* As streaming RESP protocol, when starting a new command, provide details
203+
* about the command. Otherwise, pass NULL. This information will be used to log
204+
* and report the command in case of a failure from Redis server. */
202205
typedef struct RdbxRespWriterStartCmd {
203206
/* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is
204207
* constant static string and Valid for ref behind the duration of the call. */
205208
const char *cmd;
209+
206210
/* If key available as part of command. Else empty string.
207211
* Owned by the caller. */
208212
const char *key;
213+
214+
/* On restore command, size of serialized data. Otherwise, set to 0. */
215+
size_t restoreSize;
216+
209217
} RdbxRespWriterStartCmd;
210218

211219
typedef struct RdbxRespWriter {

src/ext/handlersToResp.c

Lines changed: 19 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) {
225225
if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) {
226226
char keyLenStr[32], cmdIdLenStr[32], cmdIdStr[32];
227227

228-
RdbxRespWriterStartCmd startCmd;
229-
startCmd.cmd = "SET";
230-
startCmd.key = KEY_CMD_ID_DBG;
228+
RdbxRespWriterStartCmd startCmd = {"SET", KEY_CMD_ID_DBG, 0};
231229

232230
struct iovec iov[7];
233231
/* write SET */
@@ -296,9 +294,7 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t
296294
if (ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE)
297295
extra_args++;
298296

299-
RdbxRespWriterStartCmd startCmd;
300-
startCmd.cmd = "RESTORE";
301-
startCmd.key = ctx->keyCtx.key;
297+
RdbxRespWriterStartCmd startCmd = {"RESTORE", ctx->keyCtx.key, ctx->restoreCtx.restoreSize};
302298

303299
/* writev RESTORE */
304300
char cmd[64];
@@ -326,9 +322,7 @@ static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag
326322
struct iovec iov[3];
327323
char lenStr[32];
328324

329-
RdbxRespWriterStartCmd startCmd;
330-
startCmd.cmd = "RESTOREMODAUX";
331-
startCmd.key = "";
325+
RdbxRespWriterStartCmd startCmd = {"RESTOREMODAUX", "", ctx->restoreCtx.restoreSize};
332326

333327
/* writev RESTOREMODAUX */
334328
iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix;
@@ -354,9 +348,7 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) {
354348

355349
int cnt = ll2string(dbidStr, sizeof(dbidStr), dbid);
356350

357-
RdbxRespWriterStartCmd startCmd;
358-
startCmd.cmd = "SELECT";
359-
startCmd.key = "";
351+
RdbxRespWriterStartCmd startCmd = {"SELECT", "", 0};
360352

361353
IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT");
362354
IOV_LENGTH(&iov[1], cnt, cntStr);
@@ -394,9 +386,7 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo
394386
struct iovec iov[4];
395387
char keyLenStr[32];
396388

397-
RdbxRespWriterStartCmd startCmd;
398-
startCmd.cmd = "DEL";
399-
startCmd.key = ctx->keyCtx.key;
389+
RdbxRespWriterStartCmd startCmd = {"DEL", ctx->keyCtx.key, 0};
400390

401391
IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL");
402392
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
@@ -415,9 +405,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) {
415405
/* key is in db. Set its expiration time */
416406
if (ctx->keyCtx.info.expiretime != -1) {
417407
struct iovec iov[6];
418-
RdbxRespWriterStartCmd startCmd;
419-
startCmd.cmd = "PEXPIREAT";
420-
startCmd.key = ctx->keyCtx.key;
408+
RdbxRespWriterStartCmd startCmd = {"PEXPIREAT", ctx->keyCtx.key, 0};
421409

422410
char keyLenStr[32], expireLenStr[32], expireStr[32];
423411
/* PEXPIREAT */
@@ -448,9 +436,7 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) {
448436

449437
struct iovec iov[7];
450438

451-
RdbxRespWriterStartCmd startCmd;
452-
startCmd.cmd = "SET";
453-
startCmd.key = ctx->keyCtx.key;
439+
RdbxRespWriterStartCmd startCmd = {"SET", ctx->keyCtx.key, 0};
454440

455441
/* write SET */
456442
IOV_CONST(&iov[0], "*3\r\n$3\r\nSET");
@@ -473,9 +459,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) {
473459
char keyLenStr[32], valLenStr[32];
474460
int valLen = RDB_bulkLen(p, item);
475461

476-
RdbxRespWriterStartCmd startCmd;
477-
startCmd.cmd = "RPUSH";
478-
startCmd.key = ctx->keyCtx.key;
462+
RdbxRespWriterStartCmd startCmd = {"RPUSH", ctx->keyCtx.key, 0};
479463

480464
/* write RPUSH */
481465
IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH");
@@ -500,9 +484,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
500484
int fieldLen = RDB_bulkLen(p, field);
501485
int valueLen = RDB_bulkLen(p, value);
502486

503-
RdbxRespWriterStartCmd hsetCmd;
504-
hsetCmd.cmd = "HSET";
505-
hsetCmd.key = ctx->keyCtx.key;
487+
RdbxRespWriterStartCmd hsetCmd = {"HSET", ctx->keyCtx.key, 0};
506488

507489
/* write RPUSH */
508490
IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET");
@@ -520,9 +502,8 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va
520502

521503
if (expireAt == -1) return RDB_OK;
522504

523-
RdbxRespWriterStartCmd hpexpireatCmd;
524-
hpexpireatCmd.cmd = "HPEXPIREAT";
525-
hpexpireatCmd.key = ctx->keyCtx.key;
505+
RdbxRespWriterStartCmd hpexpireatCmd = {"HPEXPIREAT", ctx->keyCtx.key, 0};
506+
526507
/* write HPEXPIREAT */
527508
IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT");
528509
/* write key */
@@ -545,9 +526,7 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) {
545526

546527
int valLen = RDB_bulkLen(p, member);
547528

548-
RdbxRespWriterStartCmd startCmd;
549-
startCmd.cmd = "SADD";
550-
startCmd.key = ctx->keyCtx.key;
529+
RdbxRespWriterStartCmd startCmd = {"SADD", ctx->keyCtx.key, 0};
551530

552531
/* write RPUSH */
553532
IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD");
@@ -568,9 +547,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc
568547

569548
int valLen = RDB_bulkLen(p, member);
570549

571-
RdbxRespWriterStartCmd startCmd;
572-
startCmd.cmd = "ZADD";
573-
startCmd.key = ctx->keyCtx.key;
550+
RdbxRespWriterStartCmd startCmd = {"ZADD", ctx->keyCtx.key, 0};
574551

575552
/* write ZADD */
576553
IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD");
@@ -615,9 +592,7 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) {
615592

616593
int funcLen = RDB_bulkLen(p, func);
617594

618-
RdbxRespWriterStartCmd startCmd;
619-
startCmd.cmd = "FUNCTION";
620-
startCmd.key = "";
595+
RdbxRespWriterStartCmd startCmd = {"FUNCTION", "", 0};
621596

622597
if (ctx->conf.funcLibReplaceIfExist)
623598
IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE");
@@ -644,9 +619,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
644619
* for the Stream type. (We don't use the MAXLEN 0 trick from aof.c
645620
* because of Redis Enterprise CRDT compatibility issues - Can't XSETID "back") */
646621

647-
RdbxRespWriterStartCmd startCmd;
648-
startCmd.cmd = "XGROUP CREATE";
649-
startCmd.key = ctx->keyCtx.key;
622+
RdbxRespWriterStartCmd startCmd = {"XGROUP CREATE", ctx->keyCtx.key, 0};
650623

651624
IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE");
652625
IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr);
@@ -671,9 +644,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta *
671644
int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastID.ms,meta->lastID.seq);
672645
int maxDelEntryIdLen = snprintf(maxDelEntryId, sizeof(maxDelEntryId), "%lu-%lu", meta->maxDelEntryID.ms, meta->maxDelEntryID.seq);
673646

674-
RdbxRespWriterStartCmd startCmd;
675-
startCmd.cmd = "XSETID";
676-
startCmd.key = ctx->keyCtx.key;
647+
RdbxRespWriterStartCmd startCmd = {"XSETID", ctx->keyCtx.key, 0};
677648

678649
if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
679650
IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID");
@@ -711,8 +682,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd
711682

712683
/* Start of (another) stream item? */
713684
if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) {
714-
startCmd.cmd = "XADD";
715-
startCmd.key = ctx->keyCtx.key;
685+
startCmd = (RdbxRespWriterStartCmd) {"XADD", ctx->keyCtx.key, 0};
716686
startCmdRef = &startCmd;
717687

718688
/* writev XADD */
@@ -763,9 +733,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam
763733

764734
int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastId.ms,meta->lastId.seq);
765735

766-
RdbxRespWriterStartCmd startCmd;
767-
startCmd.cmd = "XGROUP";
768-
startCmd.key = ctx->keyCtx.key;
736+
RdbxRespWriterStartCmd startCmd = { "XGROUP", ctx->keyCtx.key, 0};
769737

770738
/* writev XGROUP */
771739
if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) {
@@ -845,9 +813,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb
845813
return (RdbRes) RDBX_ERR_STREAM_INTEG_CHECK;
846814
}
847815

848-
RdbxRespWriterStartCmd startCmd;
849-
startCmd.cmd = "XCLAIM";
850-
startCmd.key = ctx->keyCtx.key;
816+
RdbxRespWriterStartCmd startCmd = {"XCLAIM", ctx->keyCtx.key, 0};
851817

852818
/* writev XCLAIM */
853819
IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM");

src/ext/readerResp.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ static RespRes readRespReplyError(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
132132
else
133133
ctx->errorMsg[ctx->errorMsgLen - 1] = '\0';
134134

135-
res = RESP_REPLY_ERR;
135+
/* Report the error. cb return 1 to propagate. 0 to mask */
136+
if ((ctx->errCb) && (ctx->errCb(ctx->errCbCtx, ctx->errorMsg) == 0))
137+
return RESP_REPLY_OK;
138+
139+
return RESP_REPLY_ERR;
136140
}
137141

138142
return res;
@@ -450,9 +454,12 @@ static RespRes readRespReply(RespReaderCtx *ctx, RespReplyBuff *buffInfo) {
450454
/*** non-static functions (public) ***/
451455

452456
void readRespInit(RespReaderCtx *ctx) {
453-
ctx->type = 0;
454-
ctx->errorMsgLen = 0;
455-
ctx->countReplies = 0;
457+
memset(ctx, 0, sizeof(RespReaderCtx));
458+
}
459+
460+
void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb) {
461+
respReaderCtx->errCbCtx = errorCbCtx;
462+
respReaderCtx->errCb = cb;
456463
}
457464

458465
RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen) {

src/ext/readerResp.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ typedef struct RespReplyBuff {
1414
int at;
1515
} RespReplyBuff;
1616

17+
/* cb to report on RESP error. Returns 1 to propagate. 0 to mask. */
18+
typedef int (*OnRespErrorCb) (void *callerCtx, char *msg);
19+
1720
typedef struct {
1821

1922
/* PUBLIC: read-only */
@@ -33,8 +36,15 @@ typedef struct {
3336
/* private bulk-array response state */
3437
long long numBulksArray;
3538

39+
/* On RESP error callback */
40+
void *errCbCtx;
41+
OnRespErrorCb errCb;
42+
3643
} RespReaderCtx;
3744

3845
void readRespInit(RespReaderCtx *ctx);
3946

47+
/* Can register cb to decide whether to ignore given error or propagate it */
48+
void setErrorCb(RespReaderCtx *respReaderCtx, void *errorCbCtx, OnRespErrorCb cb);
49+
4050
RespRes readRespReplies(RespReaderCtx *ctx, const char *buff, int buffLen);

0 commit comments

Comments
 (0)