diff --git a/configure.ac b/configure.ac index cf3903ab16..fd1b9385ac 100644 --- a/configure.ac +++ b/configure.ac @@ -2638,6 +2638,16 @@ AC_ARG_ENABLE(redis_tests, ) AM_CONDITIONAL(ENABLE_REDIS_TESTS, test x$enable_redis_tests = xyes) +AC_ARG_ENABLE(redis_ssl, + [AS_HELP_STRING([--enable-redis-ssl],[Enable redis ssl support @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_redis_ssl="yes" ;; + no) enable_redis_ssl="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-redis-ssl) ;; + esac], + [enable_redis_ssl=no] +) + if test "x$enable_omhiredis" = "xyes" -o "x$enable_imhiredis" = "xyes" ; then PKG_CHECK_MODULES(HIREDIS, hiredis >= 0.10.1, [], [AC_SEARCH_LIBS(redisConnectWithTimeout, hiredis, @@ -2662,13 +2672,44 @@ if test "x$enable_omhiredis" = "xyes" -o "x$enable_imhiredis" = "xyes" ; then [AC_MSG_ERROR([hiredis not found])] )] ) + if test "x$enable_redis_ssl" = "xyes"; then + PKG_CHECK_MODULES(HIREDIS_SSL, hiredis_ssl >= 0.10.1, + # hiredis_ssl found + [ + AC_DEFINE(HIREDIS_SSL, 1, [TLS support enabled in hiredis]) + ], + [AC_SEARCH_LIBS(redisCreateSSLContext, hiredis_ssl, + [AC_COMPILE_IFELSE( + [AC_LANG_PROGRAM( + [[ #include + #include + ]], + [[ #define major 0 + #define minor 10 + #define patch 1 + #if (( HIREDIS_MAJOR > major ) || \ + (( HIREDIS_MAJOR == major ) && ( HIREDIS_MINOR > minor )) || \ + (( HIREDIS_MAJOR == major ) && ( HIREDIS_MINOR == minor ) && ( HIREDIS_PATCH >= patch ))) \ + /* OK */ + #else + # error Hiredis_ssl version must be >= major.minor.path + #endif + ]] + )], + [], + [AC_MSG_ERROR([hiredis_ssl version must be >= 0.10.1])] + )], + [AC_MSG_WARN([hiredis_ssl not found, no TLS support in hiredis])] + )] + ) + fi fi if test "x$enable_imhiredis" = "xyes" ; then PKG_CHECK_MODULES(LIBEVENT, [libevent >= 2.0, libevent_pthreads], # libevent found [ - HIREDIS_LIBS="$HIREDIS_LIBS -levent -levent_pthreads" + HIREDIS_LIBS="$HIREDIS_LIBS $HIREDIS_SSL_LIBS -levent -levent_pthreads" ], # libevent not found [AC_MSG_ERROR([no libevent >= 2.0 found with pthreads support, imhiredis cannot use pub/sub])]) @@ -2982,6 +3023,7 @@ echo " kafka static linking enabled: $enable_kafka_static" echo " qpid proton static linking enabled: $enable_qpidproton_static" echo " atomic operations enabled: $enable_atomic_operations" echo " libcap-ng support enabled: $enable_libcapng" +echo " redis ssl support enabled: $enable_redis_ssl" echo echo "---{ input plugins }---" diff --git a/contrib/imhiredis/Makefile.am b/contrib/imhiredis/Makefile.am index 6528a19889..c8c7586cf1 100644 --- a/contrib/imhiredis/Makefile.am +++ b/contrib/imhiredis/Makefile.am @@ -1,7 +1,7 @@ pkglib_LTLIBRARIES = imhiredis.la imhiredis_la_SOURCES = imhiredis.c -imhiredis_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(HIREDIS_CFLAGS) +imhiredis_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(HIREDIS_CFLAGS) $(HIREDIS_SSL_CFLAGS) imhiredis_la_LDFLAGS = -module -avoid-version -imhiredis_la_LIBADD = $(HIREDIS_LIBS) +imhiredis_la_LIBADD = $(HIREDIS_LIBS) $(HIREDIS_SSL_LIBS) EXTRA_DIST = diff --git a/contrib/imhiredis/imhiredis.c b/contrib/imhiredis/imhiredis.c index e1a8e1c321..b153cdca93 100644 --- a/contrib/imhiredis/imhiredis.c +++ b/contrib/imhiredis/imhiredis.c @@ -35,6 +35,9 @@ #include #include #include +#ifdef HIREDIS_SSL +#include +#endif #include #include #include @@ -119,6 +122,16 @@ struct instanceConf_s { redisNode *redisNodesList; struct instanceConf_s *next; +#ifdef HIREDIS_SSL + sbool use_tls; /* Should we use TLS to connect to redis ? */ + char *ca_cert_bundle; /* CA bundle file */ + char *ca_cert_dir; /* Path of trusted certificates */ + char *client_cert; /* Client certificate */ + char *client_key; /* Client private key */ + char *sni; /* TLS Server Name Indication */ + redisSSLContext *ssl_conn; /* redis ssl connection */ + redisSSLContextError ssl_error; /* ssl error handler */ +#endif }; @@ -193,6 +206,14 @@ static struct cnfparamdescr inppdescr[] = { { "stream.consumerACK", eCmdHdlrBinary, 0 }, { "stream.autoclaimIdleTime", eCmdHdlrNonNegInt, 0 }, { "fields", eCmdHdlrArray, 0 }, +#ifdef HIREDIS_SSL + { "use_tls", eCmdHdlrBinary, 0 }, + { "ca_cert_bundle", eCmdHdlrGetWord, 0 }, + { "ca_cert_dir", eCmdHdlrGetWord, 0 }, + { "client_cert", eCmdHdlrGetWord, 0 }, + { "client_key", eCmdHdlrGetWord, 0 }, + { "sni", eCmdHdlrGetWord, 0 }, +#endif }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -221,9 +242,9 @@ static rsRetVal enqMsgJson(instanceConf_t *const inst, struct json_object *json, rsRetVal redisAuthentSynchronous(redisContext *conn, uchar *password); rsRetVal redisAuthentAsynchronous(redisAsyncContext *aconn, uchar *password); rsRetVal redisActualizeCurrentNode(instanceConf_t *inst); -rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **result); +rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **result, instanceConf_t *const inst); rsRetVal redisAuthenticate(instanceConf_t *inst); -rsRetVal redisConnectSync(redisContext **conn, redisNode *node); +rsRetVal redisConnectSync(redisContext **conn, redisNode *node, instanceConf_t *const inst); rsRetVal connectMasterSync(instanceConf_t *inst); static sbool isConnectedSync(instanceConf_t *inst); rsRetVal redisConnectAsync(redisAsyncContext **aconn, redisNode *node); @@ -275,6 +296,16 @@ createInstance(instanceConf_t **pinst) /* Redis objects */ inst->conn = NULL; inst->aconn = NULL; +#ifdef HIREDIS_SSL + inst->ssl_conn = NULL; /* Connect later */ + inst->ssl_error = REDIS_SSL_CTX_NONE; + inst->use_tls = 0; + inst->ca_cert_bundle = NULL; + inst->ca_cert_dir = NULL; + inst->client_cert = NULL; + inst->client_key = NULL; + inst->sni = NULL; +#endif /* redis nodes list */ CHKiRet(createRedisNode(&(inst->redisNodesList))); @@ -508,6 +539,20 @@ CODESTARTnewInpInst inst->batchsize = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "key")) { inst->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); +#ifdef HIREDIS_SSL + } else if(!strcmp(inppblk.descr[i].name, "use_tls")) { + inst->use_tls = pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ca_cert_bundle")) { + inst->ca_cert_bundle = (char*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ca_cert_dir")) { + inst->ca_cert_dir = (char*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "client_cert")) { + inst->client_cert = (char*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "client_key")) { + inst->client_key = (char*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "sni")) { + inst->sni = (char*)es_str2cstr(pvals[i].val.d.estr, NULL); +#endif } else { dbgprintf("imhiredis: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); @@ -534,6 +579,12 @@ CODESTARTnewInpInst if (inst->password == NULL) { LogMsg(0, RS_RET_OK, LOG_INFO, "imhiredis: no password specified"); } +#ifdef HIREDIS_SSL + if((inst->client_cert == NULL) ^ (inst->client_key == NULL)){ + LogMsg(0, RS_RET_PARAM_ERROR, LOG_ERR, "imhiredis: \"client_cert\" and \"client_key\" must be specified together!"); + ABORT_FINALIZE(RS_RET_PARAM_ERROR); + } +#endif DBGPRINTF("imhiredis: newInpInst key=%s, mode=%s, uselpop=%d\n", inst->key, inst->modeDescription, inst->useLPop); @@ -641,6 +692,22 @@ CODESTARTfreeCnf redisAsyncFree(inst->aconn); inst->aconn = NULL; } + #ifdef HIREDIS_SSL + if(inst->ssl_conn != NULL) { + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } + if(inst->ca_cert_bundle != NULL) + free(inst->ca_cert_bundle); + if(inst->ca_cert_dir != NULL) + free(inst->ca_cert_dir); + if(inst->client_cert != NULL) + free(inst->client_cert); + if(inst->client_key != NULL) + free(inst->client_key); + if(inst->sni) + free(inst->sni); + #endif for (node = inst->redisNodesList; node != NULL; node = freeNode(node)) {;} @@ -802,6 +869,11 @@ CODEmodInit_QueryRegCFSLineHdlr /* activate libevent for (p)threads support */ evthread_use_pthreads(); +#ifdef HIREDIS_SSL + // initialize OpenSSL + redisInitOpenSSL(); +#endif + ENDmodInit @@ -840,18 +912,59 @@ static void redisAsyncRecvCallback (redisAsyncContext *aconn, void *reply, void * Asynchronous connection callback handler */ static void redisAsyncConnectCallback (const redisAsyncContext *c, int status) { + instanceConf_t *inst = (instanceConf_t *) c->data; + redisContext* rconn = ((redisContext*)(&c->c)); if (status != REDIS_OK) { LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "imhiredis (async): could not connect to redis: " "%s", c->errstr); // remove async context from instance config object, still contained in context's 'data' field - instanceConf_t *inst = (instanceConf_t *) c->data; assert(inst != NULL); inst->aconn = NULL; return; } DBGPRINTF("imhiredis (async): successfully connected!\n"); +#ifdef HIREDIS_SSL + if (inst->use_tls) { + inst->ssl_conn = redisCreateSSLContext(inst->ca_cert_bundle, inst->ca_cert_dir, inst->client_cert, inst->client_key, inst->sni, &inst->ssl_error); + + if (!inst->ssl_conn || inst->ssl_error != REDIS_SSL_CTX_NONE) { + LogError(0, NO_ERRCODE, "imhiredis: SSL Context error: %s", redisSSLContextGetError(inst->ssl_error)); + if (inst->currentNode->usesSocket) { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s' " + "-> could not allocate context!\n", inst->currentNode->socketPath); + } else { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s', " + "port %d -> could not allocate context!\n", inst->currentNode->server, inst->currentNode->port); + } + if (inst->ssl_conn != NULL) { + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } + return; + } + if (redisInitiateSSLWithContext(rconn, inst->ssl_conn) != REDIS_OK) { + LogError(0, NO_ERRCODE, "imhiredis: %s", rconn->errstr); + LogError(0, NO_ERRCODE, "imhiredis: SSL Context error: %s", redisSSLContextGetError(inst->ssl_error)); + if (inst->currentNode->usesSocket) { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s' " + "-> could not allocate context!\n", inst->currentNode->socketPath); + } else { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s', " + "port %d -> could not allocate context!\n", inst->currentNode->server, inst->currentNode->port); + } + + if (inst->ssl_conn != NULL) { + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } + return; + } + } +#endif + return; + } @@ -1132,7 +1245,7 @@ rsRetVal redisAuthentAsynchronous(redisAsyncContext *aconn, uchar *password) { * - a single (master) node if the provided node was a replica * - a list of (replica) nodes if the provided node was a master */ -rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **result) { +rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **result, instanceConf_t *const inst) { DEFiRet; redisContext *context; redisReply *reply = NULL, *replica; @@ -1140,7 +1253,7 @@ rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **resul assert(node != NULL); - CHKiRet(redisConnectSync(&context, node)); + CHKiRet(redisConnectSync(&context, node, inst)); if(password != NULL && password[0] != '\0') { CHKiRet(redisAuthentSynchronous(context, password)); @@ -1194,6 +1307,12 @@ rsRetVal redisGetServersList(redisNode *node, uchar *password, redisNode **resul freeReplyObject(reply); if (context != NULL) redisFree(context); +#ifdef HIREDIS_SSL + if (inst->ssl_conn != NULL) { + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } +#endif RETiRet; } @@ -1226,7 +1345,7 @@ rsRetVal redisActualizeCurrentNode(instanceConf_t *inst) { DBGPRINTF("imhiredis: trying to connect to node to get info...\n"); dbgPrintNode(node); - if (RS_RET_OK == redisGetServersList(node, inst->password, &tmp)) { + if (RS_RET_OK == redisGetServersList(node, inst->password, &tmp, inst)) { // server replied if (tmp && tmp->isMaster) { @@ -1236,7 +1355,7 @@ rsRetVal redisActualizeCurrentNode(instanceConf_t *inst) { tmp = NULL; // try to connect to the master and get replicas - if(RS_RET_OK != redisGetServersList(inst->currentNode, inst->password, &tmp)) { + if(RS_RET_OK != redisGetServersList(inst->currentNode, inst->password, &tmp, inst)) { /* had a master, but cannot connect * save suspected master in new list but keep searching with other nodes @@ -1315,7 +1434,7 @@ rsRetVal redisAuthenticate(instanceConf_t *inst) { // Create a temporary context for synchronous connection, used to validate AUTH command in asynchronous contexts if (inst->mode == IMHIREDIS_MODE_SUBSCRIBE) { - if (RS_RET_OK != redisConnectSync(&usedContext, inst->currentNode)) { + if (RS_RET_OK != redisConnectSync(&usedContext, inst->currentNode, inst)) { LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "imhiredis: could not connect to current " "active node synchronously to validate authentication"); ABORT_FINALIZE(RS_RET_REDIS_ERROR); @@ -1341,6 +1460,12 @@ rsRetVal redisAuthenticate(instanceConf_t *inst) { redisFree(usedContext); if(reply) freeReplyObject(reply); +#ifdef HIREDIS_SSL + if (inst->mode == IMHIREDIS_MODE_SUBSCRIBE && inst-> ssl_conn != NULL) { + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } +#endif RETiRet; } @@ -1349,7 +1474,7 @@ rsRetVal redisAuthenticate(instanceConf_t *inst) { * connection function for synchronous (queue) mode * node should not be NULL */ -rsRetVal redisConnectSync(redisContext **conn, redisNode *node) { +rsRetVal redisConnectSync(redisContext **conn, redisNode *node, instanceConf_t *const inst) { DEFiRet; assert(node != NULL); @@ -1379,13 +1504,49 @@ rsRetVal redisConnectSync(redisContext **conn, redisNode *node) { } ABORT_FINALIZE(RS_RET_REDIS_ERROR); } - +#ifdef HIREDIS_SSL + if (inst->use_tls) { + inst->ssl_conn = redisCreateSSLContext(inst->ca_cert_bundle, inst->ca_cert_dir, inst->client_cert, inst->client_key, inst->sni, &inst->ssl_error); + + if (!inst->ssl_conn || inst->ssl_error != REDIS_SSL_CTX_NONE) { + LogError(0, NO_ERRCODE, "imhiredis: SSL Context error: %s", redisSSLContextGetError(inst->ssl_error)); + if (node->usesSocket) { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s' " + "-> could not allocate context!\n", node->socketPath); + } else { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s', " + "port %d -> could not allocate context!\n", node->server, node->port); + } + ABORT_FINALIZE(RS_RET_REDIS_ERROR); + } + if (redisInitiateSSLWithContext(*conn, inst->ssl_conn) != REDIS_OK) { + LogError(0, NO_ERRCODE, "imhiredis: %s", (*conn)->errstr); + LogError(0, NO_ERRCODE, "imhiredis: SSL Context error: %s", redisSSLContextGetError(inst->ssl_error)); + if (node->usesSocket) { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s' " + "-> could not allocate context!\n", node->socketPath); + } else { + LogError(0, RS_RET_REDIS_ERROR, "imhiredis: [TLS] can not connect to redis server '%s', " + "port %d -> could not allocate context!\n", node->server, node->port); + } + ABORT_FINALIZE(RS_RET_REDIS_ERROR); + } + } +#endif finalize_it: if (iRet != RS_RET_OK) { if (*conn) redisFree(*conn); *conn = NULL; } +#ifdef HIREDIS_SSL + if (iRet != RS_RET_OK) { + if(inst->ssl_conn != NULL){ + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } + } +#endif RETiRet; } @@ -1438,6 +1599,12 @@ rsRetVal connectMasterAsync(instanceConf_t *inst) { inst->currentNode = NULL; ABORT_FINALIZE(RS_RET_REDIS_ERROR); } + + inst->aconn->data = (void *)inst; + redisAsyncSetConnectCallback(inst->aconn, redisAsyncConnectCallback); + redisAsyncSetDisconnectCallback(inst->aconn, redisAsyncDisconnectCallback); + redisLibeventAttach(inst->aconn, inst->evtBase); + if( inst->password != NULL && inst->password[0] != '\0' && RS_RET_OK != redisAuthenticate(inst)) { @@ -1445,15 +1612,16 @@ rsRetVal connectMasterAsync(instanceConf_t *inst) { redisAsyncFree(inst->aconn); inst->aconn = NULL; inst->currentNode = NULL; + #ifdef HIREDIS_SSL + if(inst->ssl_conn != NULL){ + redisFreeSSLContext(inst->ssl_conn); + inst->ssl_conn = NULL; + } + + #endif ABORT_FINALIZE(RS_RET_REDIS_AUTH_FAILED); } - // finalize context creation - inst->aconn->data = (void *)inst; - redisAsyncSetConnectCallback(inst->aconn, redisAsyncConnectCallback); - redisAsyncSetDisconnectCallback(inst->aconn, redisAsyncDisconnectCallback); - redisLibeventAttach(inst->aconn, inst->evtBase); - finalize_it: RETiRet; } @@ -1473,8 +1641,7 @@ static sbool isConnectedAsync(instanceConf_t *inst) { */ rsRetVal connectMasterSync(instanceConf_t *inst) { DEFiRet; - - if(RS_RET_OK != redisConnectSync(&(inst->conn), inst->currentNode)) { + if(RS_RET_OK != redisConnectSync(&(inst->conn), inst->currentNode, inst)) { inst->currentNode = NULL; ABORT_FINALIZE(RS_RET_REDIS_ERROR); }