diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml index 38655673..955b3e4c 100644 --- a/.github/workflows/package.yml +++ b/.github/workflows/package.yml @@ -20,6 +20,6 @@ jobs: run: './php-rdkafka/.github/workflows/package/package.sh' - name: 'Archive package' - uses: 'actions/upload-artifact@v2' + uses: 'actions/upload-artifact@v4' with: path: 'php-rdkafka/rdkafka.tgz' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7ae9ecb2..d986c660 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -109,28 +109,39 @@ jobs: # librdkafka 1.0.1 - php: '8.1.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v1.0.1' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v1.0.1' + skipoauth: '1' # librdkafka 0.11.6 - php: '8.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '8.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.4.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.3.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.2.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.1.0' librdkafka: 'v0.11.6' + skipoauth: '1' - php: '7.0.0' librdkafka: 'v0.11.6' + skipoauth: '1' # librdkafka master (experimental, does not block PRs) - php: '8.3.0' @@ -158,6 +169,7 @@ jobs: PHP_VERSION: ${{ matrix.php }} LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }} MEMORY_CHECK: ${{ matrix.memcheck }} + SKIP_OAUTH: ${{ matrix.skipoauth }} TEST_KAFKA_BROKERS: kafka:9092 TEST_KAFKA_BROKER_VERSION: 2.6 steps: diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index 8892f6f5..1d53b60d 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -4,17 +4,63 @@ docker network create kafka_network docker pull wurstmeister/zookeeper:latest docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest docker pull wurstmeister/kafka:latest -docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest -printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null +docker run -d -p 9092:9092 --network kafka_network \ + -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ + -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ + -e "KAFKA_BROKER_ID=1" \ + -e "KAFKA_ADVERTISED_HOST_NAME=kafka" \ + -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \ + -e "KAFKA_ADVERTISED_PORT=9092" \ + --name kafka wurstmeister/kafka:latest -echo "Waiting for Kafka to be ready" +if [ ${SKIP_OAUTH:-0} -ne 1 ]; then + docker run -d -p 29092:29092 --network kafka_network \ + -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \ + -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \ + -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \ + -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \ + -e "KAFKA_ADVERTISED_PORT=29092" \ + -e "KAFKA_BROKER_ID=2" \ + -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \ + -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \ + -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \ + -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \ + -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \ + -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \ + --name kafka_oauth2 wurstmeister/kafka:latest +fi + +printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null + +echo "Waiting for Kafka services to be ready" + +kafka_ready=0 +kafka_oauth2_ready=0 for i in $(seq 1 20); do - if kafkacat -b 127.0.0.1 -L; then - echo "Kafka is ready" - exit 0 + if [ $kafka_ready -eq 0 ]; then + if kafkacat -b kafka:9092 -L -m 30 -d broker; then + kafka_ready=1 + echo "Kafka is ready" + fi + fi + if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then + if kafkacat -b kafka_oauth2:29092 \ + -X security.protocol=SASL_PLAINTEXT \ + -X sasl.mechanisms=OAUTHBEARER \ + -X enable.sasl.oauthbearer.unsecure.jwt="true" \ + -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L + then + kafka_oauth2_ready=1 + echo "Kafka OAuth2 is ready" + fi + fi + + if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then + exit 0 fi done -echo "Timedout waiting for Kafka to be ready" +echo "Timedout waiting for Kafka services to be ready" exit 1 diff --git a/conf.c b/conf.c index df3472b5..9aa8132f 100644 --- a/conf.c +++ b/conf.c @@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */ { + kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh); kafka_conf_callback_copy(&to->error, from->error); kafka_conf_callback_copy(&to->rebalance, from->rebalance); kafka_conf_callback_copy(&to->dr_msg, from->dr_msg); @@ -734,7 +735,7 @@ PHP_METHOD(RdKafka_Conf, setLogCb) } /* }}} */ -#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB +#ifdef HAS_RD_KAFKA_OAUTHBEARER /* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback) Set token refresh callback for OAUTHBEARER sasl */ PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) diff --git a/conf.stub.php b/conf.stub.php index 7aa6a62d..d1ce97e9 100644 --- a/conf.stub.php +++ b/conf.stub.php @@ -45,7 +45,7 @@ public function setOffsetCommitCb(callable $callback): void {} /** @tentative-return-type */ public function setLogCb(callable $callback): void {} - #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB + #ifdef HAS_RD_KAFKA_OAUTHBEARER /** @tentative-return-type */ public function setOauthbearerTokenRefreshCb(callable $callback): void {} #endif diff --git a/conf_arginfo.h b/conf_arginfo.h index 75d7a2da..d179aee1 100644 --- a/conf_arginfo.h +++ b/conf_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */ + * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -48,7 +48,7 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) #if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0) #else @@ -83,7 +83,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); #endif ZEND_METHOD(RdKafka_TopicConf, __construct); @@ -101,7 +101,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) #endif ZEND_FE_END @@ -128,7 +128,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif return class_entry; } @@ -138,7 +142,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif return class_entry; } diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h index 5601e9d6..72305d3b 100644 --- a/conf_legacy_arginfo.h +++ b/conf_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */ + * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -32,7 +32,7 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1) ZEND_ARG_INFO(0, callback) ZEND_END_ARG_INFO() @@ -59,7 +59,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); #endif ZEND_METHOD(RdKafka_TopicConf, __construct); @@ -77,7 +77,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) -#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) #endif ZEND_FE_END @@ -104,7 +104,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif return class_entry; } @@ -114,7 +118,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); +#endif return class_entry; } diff --git a/config.m4 b/config.m4 index 17c9c32e..356a1df3 100644 --- a/config.m4 +++ b/config.m4 @@ -94,10 +94,10 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([murmur2 partitioner is not available]) ]) - AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[ - AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[ + AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ]) ],[ - AC_MSG_WARN([oauthbearer token refresh cb is not available]) + AC_MSG_WARN([oauthbearer support is not available]) ]) AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[ diff --git a/package.xml b/package.xml index 16957120..7ac09576 100644 --- a/package.xml +++ b/package.xml @@ -132,6 +132,7 @@ + diff --git a/rdkafka.c b/rdkafka.c index 5d9636e1..4029409a 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -431,6 +431,145 @@ PHP_METHOD(RdKafka, setLogLevel) } /* }}} */ +#ifdef HAS_RD_KAFKA_OAUTHBEARER +/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []) + * Set SASL/OAUTHBEARER token and metadata + * + * The SASL/OAUTHBEARER token refresh callback or event handler should cause + * this method to be invoked upon success, via + * $kafka->oauthbearerSetToken(). The extension keys must not include the + * reserved key "`auth`", and all extension keys and values must conform to the + * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: + * + * key = 1*(ALPHA) + * value = *(VCHAR / SP / HTAB / CR / LF ) +*/ +PHP_METHOD(RdKafka, oauthbearerSetToken) +{ + kafka_object *intern; + char *token_value; + size_t token_value_len; + zend_long lifetime_ms; + char *principal_name; + size_t principal_len; + HashTable *extensions_hash = NULL; + + char errstr[512]; + rd_kafka_resp_err_t ret = 0; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + errstr[0] = '\0'; + + int extensions_size = 0; + char **extensions = NULL; + + if (extensions_hash != NULL) { + extensions_size = zend_hash_num_elements(extensions_hash) * 2; + extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); + + int pos = 0; + zend_ulong num_key; + zend_string *extension_key_str; + zval *extension_zval; + ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { + if (!extension_key_str) { + extension_key_str = zend_long_to_str(num_key); + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + zend_string_release(extension_key_str); + } else { + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + } + + zend_string *tmp_extension_val_str; + zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); + extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); + if (tmp_extension_val_str) { + zend_string_release(tmp_extension_val_str); + } + } ZEND_HASH_FOREACH_END(); + } + + ret = rd_kafka_oauthbearer_set_token( + intern->rk, + token_value, + lifetime_ms, + principal_name, + (const char **)extensions, + extensions_size, + errstr, + sizeof(errstr)); + + if (extensions != NULL) { + for (int i = 0; i < extensions_size; i++) { + efree(extensions[i]); + } + efree(extensions); + } + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} +/* }}} */ + +/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error) + The SASL/OAUTHBEARER token refresh callback or event handler should cause + this method to be invoked upon failure, via + rd_kafka_oauthbearer_set_token_failure(). +*/ +PHP_METHOD(RdKafka, oauthbearerSetTokenFailure) +{ + kafka_object *intern; + const char *errstr; + size_t errstr_len; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) { + return; + } + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr); + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} +/* }}} */ +#endif + /* {{{ proto RdKafka\Topic RdKafka::newTopic(string $topic) Returns an RdKafka\Topic object */ PHP_METHOD(RdKafka, newTopic) diff --git a/rdkafka.stub.php b/rdkafka.stub.php index 26601ac5..3d99ef22 100644 --- a/rdkafka.stub.php +++ b/rdkafka.stub.php @@ -79,6 +79,14 @@ public function pausePartitions(array $topic_partitions): array {} /** @tentative-return-type */ public function resumePartitions(array $topic_partitions): array {} + + #ifdef HAS_RD_KAFKA_OAUTHBEARER + /** @tentative-return-type */ + public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {} + + /** @tentative-return-type */ + public function oauthbearerSetTokenFailure(string $error): void {} + #endif } } diff --git a/rdkafka_arginfo.h b/rdkafka_arginfo.h index 220d6da3..0e4fe52e 100644 --- a/rdkafka_arginfo.h +++ b/rdkafka_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */ + * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -119,6 +119,27 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_resumePartitions arginfo_class_RdKafka_pausePartitions +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 3, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3) +#endif + ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]") +ZEND_END_ARG_INFO() + +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0) ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, conf, RdKafka\\Conf, 1, "null") ZEND_END_ARG_INFO() @@ -140,22 +161,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1) #endif ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) #if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, IS_VOID, 0) #else ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0) #endif ZEND_END_ARG_INFO() -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) #define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) #define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions #endif @@ -178,19 +193,17 @@ ZEND_METHOD(RdKafka, queryWatermarkOffsets); ZEND_METHOD(RdKafka, offsetsForTimes); ZEND_METHOD(RdKafka, pausePartitions); ZEND_METHOD(RdKafka, resumePartitions); +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka, oauthbearerSetToken); +ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure); +#endif ZEND_METHOD(RdKafka_Consumer, __construct); ZEND_METHOD(RdKafka_Consumer, newQueue); ZEND_METHOD(RdKafka_Producer, __construct); #if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, initTransactions); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, beginTransaction); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, commitTransaction); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, abortTransaction); #endif @@ -224,10 +237,10 @@ static const zend_function_entry class_RdKafka_methods[] = { ZEND_ME(RdKafka, offsetsForTimes, arginfo_class_RdKafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, pausePartitions, arginfo_class_RdKafka_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC) - ZEND_FE_END -}; - -static const zend_function_entry class_RdKafka_Exception_methods[] = { +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) +#endif ZEND_FE_END }; @@ -241,14 +254,8 @@ static const zend_function_entry class_RdKafka_Producer_methods[] = { ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC) #endif ZEND_FE_END @@ -259,8 +266,12 @@ static zend_class_entry *register_class_RdKafka(void) zend_class_entry ce, *class_entry; INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); class_entry->ce_flags |= ZEND_ACC_ABSTRACT; +#endif zval property_error_cb_default_value; ZVAL_UNDEF(&property_error_cb_default_value); @@ -281,8 +292,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas { zend_class_entry ce, *class_entry; - INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods); + INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception); +#endif return class_entry; } @@ -292,7 +307,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka); +#endif return class_entry; } @@ -302,7 +321,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka); +#endif return class_entry; } diff --git a/rdkafka_legacy_arginfo.h b/rdkafka_legacy_arginfo.h index 5fcb7263..49f594b4 100644 --- a/rdkafka_legacy_arginfo.h +++ b/rdkafka_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */ + * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -71,6 +71,19 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_resumePartitions arginfo_class_RdKafka_pausePartitions +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3) + ZEND_ARG_INFO(0, token_value) + ZEND_ARG_INFO(0, lifetime_ms) + ZEND_ARG_INFO(0, principal_name) + ZEND_ARG_INFO(0, extensions) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1) + ZEND_ARG_INFO(0, error) +ZEND_END_ARG_INFO() +#endif + ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0) ZEND_ARG_INFO(0, conf) ZEND_END_ARG_INFO() @@ -83,18 +96,12 @@ ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1) ZEND_ARG_INFO(0, timeout_ms) ZEND_END_ARG_INFO() -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0) ZEND_END_ARG_INFO() -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) #define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) #define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions #endif @@ -117,19 +124,17 @@ ZEND_METHOD(RdKafka, queryWatermarkOffsets); ZEND_METHOD(RdKafka, offsetsForTimes); ZEND_METHOD(RdKafka, pausePartitions); ZEND_METHOD(RdKafka, resumePartitions); +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka, oauthbearerSetToken); +ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure); +#endif ZEND_METHOD(RdKafka_Consumer, __construct); ZEND_METHOD(RdKafka_Consumer, newQueue); ZEND_METHOD(RdKafka_Producer, __construct); #if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, initTransactions); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, beginTransaction); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, commitTransaction); -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_METHOD(RdKafka_Producer, abortTransaction); #endif @@ -163,10 +168,10 @@ static const zend_function_entry class_RdKafka_methods[] = { ZEND_ME(RdKafka, offsetsForTimes, arginfo_class_RdKafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, pausePartitions, arginfo_class_RdKafka_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC) - ZEND_FE_END -}; - -static const zend_function_entry class_RdKafka_Exception_methods[] = { +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) +#endif ZEND_FE_END }; @@ -180,14 +185,8 @@ static const zend_function_entry class_RdKafka_Producer_methods[] = { ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC) -#endif -#if defined(HAS_RD_KAFKA_TRANSACTIONS) ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC) #endif ZEND_FE_END @@ -198,8 +197,12 @@ static zend_class_entry *register_class_RdKafka(void) zend_class_entry ce, *class_entry; INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT); +#else class_entry = zend_register_internal_class_ex(&ce, NULL); class_entry->ce_flags |= ZEND_ACC_ABSTRACT; +#endif zval property_error_cb_default_value; ZVAL_NULL(&property_error_cb_default_value); @@ -220,8 +223,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas { zend_class_entry ce, *class_entry; - INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods); + INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception); +#endif return class_entry; } @@ -231,7 +238,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka); +#endif return class_entry; } @@ -241,7 +252,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods); +#if (PHP_VERSION_ID >= 80400) + class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0); +#else class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka); +#endif return class_entry; } diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt index af7e95f0..b9724804 100644 --- a/tests/controller_id.phpt +++ b/tests/controller_id.phpt @@ -20,6 +20,6 @@ $conf->set('group.id', 'test'); echo (new RdKafka\KafkaConsumer($conf))->getControllerId(10*1000) . \PHP_EOL; --EXPECT-- -1001 -1001 -1001 \ No newline at end of file +1 +1 +1 \ No newline at end of file diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt new file mode 100644 index 00000000..b95de0f6 --- /dev/null +++ b/tests/oauthbearer_integration.phpt @@ -0,0 +1,151 @@ +--TEST-- +Oauthbearer +--SKIPIF-- += 0x01010000 || die("skip librdkafka too old does not support oauthbearer"); +--FILE-- + $jws, + 'principal' => $principal, + 'expiryMs' => $expiryMs, + ]; +} + +// Set up tests +$conf = new RdKafka\Conf(); +if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); +$conf->set('sasl.oauthbearer.config', 'principal=admin'); +$conf->setLogCb(function ($kafka, $level, $facility, $message) {}); +$conf->setErrorCb(function ($producer, $err, $errstr) { + printf("%s: %s\n", rd_kafka_err2str($err), $errstr); +}); + +// Test that refresh token with setting token accurately will succeed when getting metadata +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token and succeeding\n"; + $token = generateJws(); + $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); + +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "Metadata retrieved successfully when refresh callback set token\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n"; +} + +// Test that refresh token with setting token failure will fail when getting metadata +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Setting token failure in refresh cb\n"; + $producer->oauthbearerSetTokenFailure('Token failure before getting metadata'); + $producer->poll(0); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Did not catch exception after not setting or refreshing any token\n"; +} catch (\RdKafka\Exception $e) { + echo "Caught exception when getting metadata after not setting or refreshing any token\n"; +} + +// Test that setting token without refreshing will get metadata successfully +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "Got metadata successfully\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Set token but still got exception \n"; + exit; +} + +// Test that token refresh is called after token expires +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token\n"; +}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws('required-scope', 5); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$producer->poll(0); +echo "Polled with refresh\n"; +sleep(1); +$producer->poll(0); +echo "Polled without refresh\n"; +sleep(4); +$producer->poll(0); +echo "Polled with refresh\n"; + +// Test that tokens without required scope fail +$producer = new \RdKafka\Producer($conf); +$token = generateJws('not-required-scope'); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); +try { + $producer->getMetadata(false, $topic, 10*1000); + echo "FAIL: Exception not thrown as expected when using insufficient scope\n"; + exit; +} catch (\RdKafka\Exception $e) { + echo "Caught expected exception with insufficient_scope\n"; +} + +// Test that setting token with extensions succeeds +$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +$producer = new \RdKafka\Producer($conf); +$token = generateJws(); +$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']); +$producer->poll(0); + +--EXPECT-- +Refreshing token and succeeding +Metadata retrieved successfully when refresh callback set token +Setting token failure in refresh cb +Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata +Caught exception when getting metadata after not setting or refreshing any token +Got metadata successfully +Refreshing token +Polled with refresh +Polled without refresh +Refreshing token +Polled with refresh +Caught expected exception with insufficient_scope diff --git a/tests/test_env.php.sample b/tests/test_env.php.sample index e43476aa..0d6be87a 100644 --- a/tests/test_env.php.sample +++ b/tests/test_env.php.sample @@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) { putenv('TEST_KAFKA_BROKERS=localhost:9092'); } +if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) { + putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092'); +} + if (false === getenv('TEST_KAFKA_BROKER_VERSION')) { putenv('TEST_KAFKA_BROKER_VERSION=2.3'); }