diff --git a/config.m4 b/config.m4 index 356a1df..91d5f90 100644 --- a/config.m4 +++ b/config.m4 @@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[ AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ]) + SOURCES="$SOURCES oauthbearer.c" ],[ AC_MSG_WARN([oauthbearer support is not available]) ]) diff --git a/kafka_consumer.c b/kafka_consumer.c index 2c0eddb..0d20159 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -31,6 +31,9 @@ #include "topic.h" #include "message.h" #include "metadata.h" +#ifdef HAS_RD_KAFKA_OAUTHBEARER +#include "oauthbearer.h" +#endif #if PHP_VERSION_ID < 80000 #include "kafka_consumer_legacy_arginfo.h" #else @@ -868,6 +871,90 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions) } /* }}} */ +/* {{{ proto int RdKafka::poll(int $timeout_ms) + Polls the provided kafka handle for events */ +PHP_METHOD(RdKafka_KafkaConsumer, poll) +{ + object_intern *intern; + zend_long timeout; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_poll(intern->rk, timeout)); +} +/* }}} */ + +#ifdef HAS_RD_KAFKA_OAUTHBEARER +/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []) + * Set SASL/OAUTHBEARER token and metadata + * + * The SASL/OAUTHBEARER token refresh callback or event handler should cause + * this method to be invoked upon success, via + * $consumer->oauthbearerSetToken(). The extension keys must not include the + * reserved key "`auth`", and all extension keys and values must conform to the + * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: + * + * key = 1*(ALPHA) + * value = *(VCHAR / SP / HTAB / CR / LF ) +*/ +PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken) +{ + object_intern *intern; + char *token_value; + size_t token_value_len; + zval *zlifetime_ms; + int64_t lifetime_ms; + char *principal_name; + size_t principal_len; + HashTable *extensions_hash = NULL; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { + return; + } + + lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer"); + + intern = get_object(getThis()); + if (!intern) { + return; + } + + oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash); +} +/* }}} */ + +/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error) + The SASL/OAUTHBEARER token refresh callback or event handler should cause + this method to be invoked upon failure, via + rd_kafka_oauthbearer_set_token_failure(). +*/ +PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure) +{ + object_intern *intern; + const char *errstr; + size_t errstr_len; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + oauthbearer_set_token_failure(intern->rk, errstr); +} +/* }}} */ +#endif + void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */ { ce = register_class_RdKafka_KafkaConsumer(); diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 8d21bbd..28d760e 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -81,4 +81,15 @@ public function pausePartitions(array $topic_partitions): array {} /** @tentative-return-type */ public function resumePartitions(array $topic_partitions): array {} + + /** @tentative-return-type */ + public function poll(int $timeout_ms): int {} + + #ifdef HAS_RD_KAFKA_OAUTHBEARER + /** @tentative-return-type */ + public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {} + + /** @tentative-return-type */ + public function oauthbearerSetTokenFailure(string $error): void {} + #endif } diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index bff6420..3f4a6b2 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ + * Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0) @@ -135,6 +135,38 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +ZEND_END_ARG_INFO() + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3) +#endif + ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]") +ZEND_END_ARG_INFO() +#endif + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0) +ZEND_END_ARG_INFO() +#endif + + ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); #if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) @@ -162,6 +194,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); +ZEND_METHOD(RdKafka_KafkaConsumer, poll); +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken); +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure); +#endif + static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) @@ -191,6 +231,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) +#endif ZEND_FE_END }; @@ -199,11 +246,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); -#if (PHP_VERSION_ID >= 80400) - class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); -#else class_entry = zend_register_internal_class_ex(&ce, NULL); -#endif zval property_error_cb_default_value; ZVAL_UNDEF(&property_error_cb_default_value); diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index 559fbfe..3c4b285 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ + * Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_INFO(0, conf) @@ -82,6 +82,24 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions +#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_consume + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3) + ZEND_ARG_INFO(0, token_value) + ZEND_ARG_INFO(0, lifetime_ms) + ZEND_ARG_INFO(0, principal_name) + ZEND_ARG_INFO(0, extensions) +ZEND_END_ARG_INFO() +#endif + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1) + ZEND_ARG_INFO(0, error) +ZEND_END_ARG_INFO() +#endif + + ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); #if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) @@ -109,6 +127,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); +ZEND_METHOD(RdKafka_KafkaConsumer, poll); +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken); +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure); +#endif + static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) @@ -138,6 +164,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) +#endif ZEND_FE_END }; @@ -146,11 +179,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); -#if (PHP_VERSION_ID >= 80400) - class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); -#else class_entry = zend_register_internal_class_ex(&ce, NULL); -#endif zval property_error_cb_default_value; ZVAL_NULL(&property_error_cb_default_value); diff --git a/oauthbearer.c b/oauthbearer.c new file mode 100644 index 0000000..e692ef0 --- /dev/null +++ b/oauthbearer.c @@ -0,0 +1,143 @@ +/* ++----------------------------------------------------------------------+ + | php-rdkafka | + +----------------------------------------------------------------------+ + | Copyright (c) 2025 Arnaud Le Blanc | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Martin Fris | + +----------------------------------------------------------------------+ +*/ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "php.h" +#include "php_rdkafka.h" +#include "php_rdkafka_priv.h" +#include "Zend/zend_exceptions.h" +#include "ext/spl/spl_exceptions.h" + +void oauthbearer_set_token( + rd_kafka_t *rk, + const char *token_value, + int64_t lifetime_ms, + const char *principal_name, + const HashTable *extensions_hash +) { + char errstr[512]; + rd_kafka_resp_err_t ret = 0; + + errstr[0] = '\0'; + + int extensions_size = 0; + char **extensions = NULL; + + if (extensions_hash != NULL) { + extensions_size = zend_hash_num_elements(extensions_hash) * 2; + extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); + + int pos = 0; + zend_ulong num_key; + zend_string *extension_key_str; + zval *extension_zval; + ZEND_HASH_FOREACH_KEY_VAL((HashTable*)extensions_hash, num_key, extension_key_str, extension_zval) { + if (!extension_key_str) { + extension_key_str = zend_long_to_str(num_key); + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + zend_string_release(extension_key_str); + } else { + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + } + + zend_string *tmp_extension_val_str; + zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); + extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); + if (tmp_extension_val_str) { + zend_string_release(tmp_extension_val_str); + } + } ZEND_HASH_FOREACH_END(); + } + + ret = rd_kafka_oauthbearer_set_token( + rk, + token_value, + lifetime_ms, + principal_name, + (const char **)extensions, + extensions_size, + errstr, + sizeof(errstr)); + + if (extensions != NULL) { + for (int i = 0; i < extensions_size; i++) { + efree(extensions[i]); + } + efree(extensions); + } + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} + +void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) { + rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(rk, errstr); + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} + +int64_t zval_to_int64(zval *zval, const char *errstr) { + int64_t converted; + + switch (Z_TYPE_P(zval)) { + case IS_LONG: + return (int64_t) Z_LVAL_P(zval); + break; + case IS_DOUBLE: + return (int64_t) Z_DVAL_P(zval); + break; + case IS_STRING:; + char *str = Z_STRVAL_P(zval); + char *end; + converted = (int64_t) strtoll(str, &end, 10); + if (end != str + Z_STRLEN_P(zval)) { + zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0); + return 0; + } + return converted; + break; + EMPTY_SWITCH_DEFAULT_CASE(); + } +} diff --git a/oauthbearer.h b/oauthbearer.h new file mode 100644 index 0000000..9c7b466 --- /dev/null +++ b/oauthbearer.h @@ -0,0 +1,30 @@ +/* ++----------------------------------------------------------------------+ + | php-rdkafka | + +----------------------------------------------------------------------+ + | Copyright (c) 2025 Arnaud Le Blanc | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Martin Fris | + +----------------------------------------------------------------------+ +*/ + +#include "librdkafka/rdkafka.h" + +void oauthbearer_set_token(rd_kafka_t *rk, + const char *token_value, + int64_t lifetime_ms, + const char *principal_name, + const HashTable *extensions_hash +); + +void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr); + +int64_t zval_to_int64(zval *zval, const char *errstr); diff --git a/package.xml b/package.xml index cf03069..fc8f2ce 100644 --- a/package.xml +++ b/package.xml @@ -85,6 +85,8 @@ + + @@ -136,6 +138,7 @@ + diff --git a/rdkafka.c b/rdkafka.c index ef5656a..94ce917 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -40,6 +40,9 @@ #include "queue.h" #include "message.h" #include "kafka_consumer.h" +#ifdef HAS_RD_KAFKA_OAUTHBEARER +#include "oauthbearer.h" +#endif #include "topic_partition.h" #if PHP_VERSION_ID < 80000 #include "rdkafka_legacy_arginfo.h" @@ -455,100 +458,20 @@ PHP_METHOD(RdKafka, oauthbearerSetToken) size_t principal_len; HashTable *extensions_hash = NULL; - char errstr[512]; - rd_kafka_resp_err_t ret = 0; - if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { return; } /* On 32-bits, it might be required to pass $lifetime_ms as a float or a * string */ - switch (Z_TYPE_P(zlifetime_ms)) { - case IS_LONG: - lifetime_ms = (int64_t) Z_LVAL_P(zlifetime_ms); - break; - case IS_DOUBLE: - lifetime_ms = (int64_t) Z_DVAL_P(zlifetime_ms); - break; - case IS_STRING:; - char *str = Z_STRVAL_P(zlifetime_ms); - char *end; - lifetime_ms = (int64_t) strtoll(str, &end, 10); - if (end != str + Z_STRLEN_P(zlifetime_ms)) { - zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, "Argument #2 ($lifetime_ms) must be a valid integer"); - return; - } - break; - EMPTY_SWITCH_DEFAULT_CASE(); - } + lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer"); intern = get_kafka_object(getThis()); if (!intern) { return; } - errstr[0] = '\0'; - - int extensions_size = 0; - char **extensions = NULL; - - if (extensions_hash != NULL) { - extensions_size = zend_hash_num_elements(extensions_hash) * 2; - extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); - - int pos = 0; - zend_ulong num_key; - zend_string *extension_key_str; - zval *extension_zval; - ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { - if (!extension_key_str) { - extension_key_str = zend_long_to_str(num_key); - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - zend_string_release(extension_key_str); - } else { - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - } - - zend_string *tmp_extension_val_str; - zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); - extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); - zend_tmp_string_release(tmp_extension_val_str); - } ZEND_HASH_FOREACH_END(); - } - - ret = rd_kafka_oauthbearer_set_token( - intern->rk, - token_value, - lifetime_ms, - principal_name, - (const char **)extensions, - extensions_size, - errstr, - sizeof(errstr)); - - if (extensions != NULL) { - for (int i = 0; i < extensions_size; i++) { - efree(extensions[i]); - } - efree(extensions); - } - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash); } /* }}} */ @@ -572,20 +495,7 @@ PHP_METHOD(RdKafka, oauthbearerSetTokenFailure) return; } - rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr); - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token_failure(intern->rk, errstr); } /* }}} */ #endif diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt new file mode 100644 index 0000000..fbe2cba --- /dev/null +++ b/tests/oauthbearer_integration_hl.phpt @@ -0,0 +1,146 @@ +--TEST-- +Oauthbearer +--SKIPIF-- += 0x01010000 || die("skip librdkafka too old does not support oauthbearer"); +--FILE-- + $jws, + 'principal' => $principal, + 'expiryMs' => $expiryMs, + ]; +} + +// Set up tests +$conf = new RdKafka\Conf(); +if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); +$conf->set('sasl.oauthbearer.config', 'principal=admin'); +$conf->setLogCb(function ($kafka, $level, $facility, $message) {}); +$conf->setErrorCb(function ($producer, $err, $errstr) { + printf("%s: %s\n", rd_kafka_err2str($err), $errstr); +}); + +// Test that refresh token with setting token accurately will succeed when getting metadata +$conf->setOauthbearerTokenRefreshCb(function ($producer) { + echo "Refreshing token and succeeding\n"; + $token = generateJws(); + $producer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']); +}); +$producer = new \RdKafka\Producer($conf); +$producer->poll(0); +$topicName = sprintf("test_rdkafka_%s", uniqid()); +$topic = $producer->newTopic($topicName); + +echo "Writing test data\n"; +$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Test"); +$producer->poll(0); +$result = $producer->flush(10000); +echo RD_KAFKA_RESP_ERR_NO_ERROR === $result ? "Write successful\n" : "Write error\n"; + +$confConsumer = new RdKafka\Conf(); +if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $confConsumer->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$confConsumer->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS')); +$confConsumer->set('security.protocol', 'SASL_PLAINTEXT'); +$confConsumer->set('sasl.mechanisms', 'OAUTHBEARER'); +$confConsumer->set('sasl.oauthbearer.config', 'principal=admin'); +$confConsumer->set('group.id', 'test_group'); +$confConsumer->set('auto.offset.reset', 'earliest'); +$confConsumer->setLogCb(function ($kafka, $level, $facility, $message) {}); +$confConsumer->setErrorCb(function ($producer, $err, $errstr) { + printf("%s: %s\n", rd_kafka_err2str($err), $errstr); +}); + +// Test that refresh token with setting token accurately will succeed when consuming data +$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { + echo "Refreshing token and succeeding\n"; + $token = generateJws(); + $consumer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']); +}); + +$consumer = new \RdKafka\KafkaConsumer($confConsumer); +$consumer->subscribe([$topicName]); +echo "Reading data\n"; +$message = $consumer->consume(10*1000); +echo ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) ? "Read successful\n" : "Read Error: " . $message->errstr() . "\n"; +echo $message->payload . "\n"; + +// Test that refresh token with setting token failure will fail when trying to read data +$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { + echo "Setting token failure in refresh cb\n"; + $consumer->oauthbearerSetTokenFailure('Token failure before data consumption'); +}); +$confConsumer->set('group.id', 'test_group_fail'); + +$consumer = new \RdKafka\KafkaConsumer($confConsumer); +$consumer->subscribe([$topicName]); +echo "Reading data\n"; + +$message = $consumer->consume(10*1000); +echo $message->err === -185 ? "Received empty message when reading data after not setting or refreshing any token\n" : + "FAIL: Did receive a message after not setting or refreshing any token\n"; + +// Test that metadata will be loaded before data consumption, under the condition that poll is called +$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { + echo "Refreshing token on poll and succeeding\n"; + $token = generateJws(); + $consumer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']); +}); +$consumer = new \RdKafka\KafkaConsumer($confConsumer); +$consumerTopic = $consumer->newTopic($topicName); +$consumer->poll(0); + +try { + echo "Reading metadata\n"; + $consumer->getMetadata(false, $consumerTopic, 10*1000); + echo "Metadata was fetched successfully after calling poll\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Caught exception when getting metadata after calling poll:\n"; + echo $e, "\n"; +} + +--EXPECT-- +Refreshing token and succeeding +Writing test data +Write successful +Reading data +Refreshing token and succeeding +Read successful +Test +Reading data +Setting token failure in refresh cb +Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before data consumption +Received empty message when reading data after not setting or refreshing any token +Refreshing token on poll and succeeding +Reading metadata +Metadata was fetched successfully after calling poll