From 123ebbf2ae6f946cd7687d79b6c585fb517806e1 Mon Sep 17 00:00:00 2001 From: Rastusik <1735097+Rastusik@users.noreply.github.com> Date: Fri, 11 Apr 2025 09:03:41 +0100 Subject: [PATCH 01/15] OAUTHBEARER support for high level consumer --- kafka_consumer.c | 139 +++++++++++++++++++ kafka_consumer.stub.php | 8 ++ kafka_consumer_arginfo.h | 29 ++++ tests/oauthbearer_integration_hl.phpt | 192 ++++++++++++++++++++++++++ 4 files changed, 368 insertions(+) create mode 100644 tests/oauthbearer_integration_hl.phpt diff --git a/kafka_consumer.c b/kafka_consumer.c index 2c0eddb9..c218d879 100644 --- a/kafka_consumer.c +++ b/kafka_consumer.c @@ -868,6 +868,145 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions) } /* }}} */ +#ifdef HAS_RD_KAFKA_OAUTHBEARER +/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []) + * Set SASL/OAUTHBEARER token and metadata + * + * The SASL/OAUTHBEARER token refresh callback or event handler should cause + * this method to be invoked upon success, via + * $kafka->oauthbearerSetToken(). The extension keys must not include the + * reserved key "`auth`", and all extension keys and values must conform to the + * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: + * + * key = 1*(ALPHA) + * value = *(VCHAR / SP / HTAB / CR / LF ) +*/ +PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken) +{ + object_intern *intern; + char *token_value; + size_t token_value_len; + zend_long lifetime_ms; + char *principal_name; + size_t principal_len; + HashTable *extensions_hash = NULL; + + char errstr[512]; + rd_kafka_resp_err_t ret = 0; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { + return; + } + + intern = get_object(getThis()); + if (!intern) { + return; + } + + errstr[0] = '\0'; + + int extensions_size = 0; + char **extensions = NULL; + + if (extensions_hash != NULL) { + extensions_size = zend_hash_num_elements(extensions_hash) * 2; + extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); + + int pos = 0; + zend_ulong num_key; + zend_string *extension_key_str; + zval *extension_zval; + ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { + if (!extension_key_str) { + extension_key_str = zend_long_to_str(num_key); + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + zend_string_release(extension_key_str); + } else { + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + } + + zend_string *tmp_extension_val_str; + zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); + extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); + if (tmp_extension_val_str) { + zend_string_release(tmp_extension_val_str); + } + } ZEND_HASH_FOREACH_END(); + } + + ret = rd_kafka_oauthbearer_set_token( + intern->rk, + token_value, + lifetime_ms, + principal_name, + (const char **)extensions, + extensions_size, + errstr, + sizeof(errstr)); + + if (extensions != NULL) { + for (int i = 0; i < extensions_size; i++) { + efree(extensions[i]); + } + efree(extensions); + } + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); + return; + case RD_KAFKA_RESP_ERR__STATE: + 
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
+        return;
+    case RD_KAFKA_RESP_ERR_NO_ERROR:
+        break;
+    default:
+        return;
+    }
+}
+/* }}} */
+
+/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetTokenFailure(string $error)
+ The SASL/OAUTHBEARER token refresh callback or event handler should cause
+ this method to be invoked upon failure, via
+ $consumer->oauthbearerSetTokenFailure().
+*/
+PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
+{
+    object_intern *intern;
+    const char *errstr;
+    size_t errstr_len;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
+        return;
+    }
+
+    intern = get_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);
+
+    switch (ret) {
+    case RD_KAFKA_RESP_ERR__INVALID_ARG:
+        zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
+        return;
+    case RD_KAFKA_RESP_ERR__STATE:
+        zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
+        return;
+    case RD_KAFKA_RESP_ERR_NO_ERROR:
+        break;
+    default:
+        return;
+    }
+}
+/* }}} */
+#endif
+
 void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
 {
     ce = register_class_RdKafka_KafkaConsumer();
diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php
index 8d21bbd6..977904bd 100644
--- a/kafka_consumer.stub.php
+++ b/kafka_consumer.stub.php
@@ -81,4 +81,12 @@ public function pausePartitions(array $topic_partitions): array {}

     /** @tentative-return-type */
     public function resumePartitions(array $topic_partitions): array {}
+
+    #ifdef HAS_RD_KAFKA_OAUTHBEARER
+    /** @tentative-return-type */
+    public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
+
+    /** @tentative-return-type */
+    public function oauthbearerSetTokenFailure(string $error): void {}
+    #endif
 }
diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h
index bff6420c..f0b22645 100644
--- a/kafka_consumer_arginfo.h
+++ b/kafka_consumer_arginfo.h
@@ -129,6 +129,27 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets
     ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
 ZEND_END_ARG_INFO()

+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
+#endif
+    ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
+    ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
+    ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
+    ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
+ZEND_END_ARG_INFO()
+
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
+#endif
+    ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
+ZEND_END_ARG_INFO()
+#endif
+
 #define arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets

 #define arginfo_class_RdKafka_KafkaConsumer_pausePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions

@@ -162,6 +183,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
 ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
 ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
+ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
+#endif

 static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
     ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
@@ -191,6 +216,10 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
     ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
     ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
     ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+    ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
+    ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
+#endif
 ZEND_FE_END
 };

diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt
new file mode 100644
index 00000000..08fc2898
--- /dev/null
+++ b/tests/oauthbearer_integration_hl.phpt
@@ -0,0 +1,192 @@
+--TEST--
+Oauthbearer
+--SKIPIF--
+<?php RD_KAFKA_VERSION >= 0x01010000 || die("skip librdkafka too old does not support oauthbearer");
+--FILE--
+<?php
+function generateJws($scope = 'required-scope', $expiresInSeconds = 60)
+{
+    // ... builds the JWS token value, principal and expiry for the given scope ...
+    return [
+        'value' => $jws,
+        'principal' => $principal,
+        'expiryMs' => $expiryMs,
+    ];
+}
+
+// Set up tests
+$conf = new RdKafka\Conf();
+if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
+    $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
+}
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS'));
+$conf->set('security.protocol', 'SASL_PLAINTEXT');
+$conf->set('sasl.mechanisms', 'OAUTHBEARER');
+$conf->set('sasl.oauthbearer.config', 'principal=admin');
+$conf->setLogCb(function ($kafka, $level, $facility, $message) {});
+$conf->setErrorCb(function ($producer, $err, $errstr) {
+    printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
+});
+
+// Test that refresh token with setting token accurately will succeed when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+    echo "Refreshing token and succeeding\n";
+    $token = generateJws();
+    $producer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+
+try {
+    $producer->getMetadata(false, $topic, 10*1000);
+    echo "Metadata retrieved successfully when refresh callback set token\n";
+} catch (\RdKafka\Exception $e) {
+    echo "FAIL: Caught exception when getting metadata after successfully refreshing any token:\n";
+    printf("%s: %s\n", get_class($e), $e->getMessage());
+}
+
+echo "Writing test data\n";
+$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Test");
+$producer->poll(0);
+$result = $producer->flush(10000);
+echo RD_KAFKA_RESP_ERR_NO_ERROR === $result ?
"Write successful\n" : "Write error\n"; + +$confConsumer = new RdKafka\Conf(); +if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) { + $confConsumer->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION')); +} +$confConsumer->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS')); +$confConsumer->set('security.protocol', 'SASL_PLAINTEXT'); +$confConsumer->set('sasl.mechanisms', 'OAUTHBEARER'); +$confConsumer->set('sasl.oauthbearer.config', 'principal=admin'); +$confConsumer->set('group.id', 'test_group'); +$confConsumer->set('auto.offset.reset', 'earliest'); +$confConsumer->setLogCb(function ($kafka, $level, $facility, $message) {}); +$confConsumer->setErrorCb(function ($producer, $err, $errstr) { + printf("%s: %s\n", rd_kafka_err2str($err), $errstr); +}); + +// Test that refresh token with setting token accurately will succeed when getting metadata +$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { + echo "Refreshing token and succeeding\n"; + $token = generateJws(); + $consumer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']); +}); + +$consumer = new \RdKafka\KafkaConsumer($confConsumer); +$consumer->subscribe([$topicName]); +echo "Reading data\n"; +$message = $consumer->consume(5500); +echo ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) ? "Read successful\n" : "Read Error\n"; +echo $message->payload . "\n"; + +//// Test that refresh token with setting token failure will fail when getting metadata +//$conf->setOauthbearerTokenRefreshCb(function ($producer) { +// echo "Setting token failure in refresh cb\n"; +// $producer->oauthbearerSetTokenFailure('Token failure before getting metadata'); +// $producer->poll(0); +//}); +//$producer = new \RdKafka\Producer($conf); +//$producer->poll(0); +//$topicName = sprintf("test_rdkafka_%s", uniqid()); +//$topic = $producer->newTopic($topicName); +//try { +// $producer->getMetadata(false, $topic, 10*1000); +// echo "FAIL: Did not catch exception after not setting or refreshing any token\n"; +//} catch (\RdKafka\Exception $e) { +// echo "Caught exception when getting metadata after not setting or refreshing any token\n"; +//} +// +//// Test that setting token without refreshing will get metadata successfully +//$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); +//$producer = new \RdKafka\Producer($conf); +//$token = generateJws(); +//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +//$topicName = sprintf("test_rdkafka_%s", uniqid()); +//$topic = $producer->newTopic($topicName); +//try { +// $producer->getMetadata(false, $topic, 10*1000); +// echo "Got metadata successfully\n"; +//} catch (\RdKafka\Exception $e) { +// echo "FAIL: Set token but still got exception \n"; +// exit; +//} +// +//// Test that token refresh is called after token expires +//$conf->setOauthbearerTokenRefreshCb(function ($producer) { +// echo "Refreshing token\n"; +//}); +//$producer = new \RdKafka\Producer($conf); +//$token = generateJws('required-scope', 5); +//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); +//$producer->poll(0); +//echo "Polled with refresh\n"; +//sleep(1); +//$producer->poll(0); +//echo "Polled without refresh\n"; +//sleep(4); +//$producer->poll(0); +//echo "Polled with refresh\n"; +// +//// Test that tokens without required scope fail +//$producer = new \RdKafka\Producer($conf); +//$token = generateJws('not-required-scope'); 
+//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+//$topicName = sprintf("test_rdkafka_%s", uniqid());
+//$topic = $producer->newTopic($topicName);
+//try {
+//    $producer->getMetadata(false, $topic, 10*1000);
+//    echo "FAIL: Exception not thrown as expected when using insufficient scope\n";
+//    exit;
+//} catch (\RdKafka\Exception $e) {
+//    echo "Caught expected exception with insufficient_scope\n";
+//}
+//
+//// Test that setting token with extensions succeeds
+//$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
+//$producer = new \RdKafka\Producer($conf);
+//$token = generateJws();
+//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']);
+//$producer->poll(0);
+
+//Setting token failure in refresh cb
+//Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata
+//Caught exception when getting metadata after not setting or refreshing any token
+//Got metadata successfully
+//Refreshing token
+//Polled with refresh
+//Polled without refresh
+//Refreshing token
+
+--EXPECT--
+Refreshing token and succeeding
+Metadata retrieved successfully when refresh callback set token
+Writing test data
+Write successful
+Reading data
+Refreshing token and succeeding
+Read successful
+Test

From 34acde8243d5229fbfc79594551a87b3651cc133 Mon Sep 17 00:00:00 2001
From: Rastusik <1735097+Rastusik@users.noreply.github.com>
Date: Tue, 15 Apr 2025 13:31:40 +0200
Subject: [PATCH 02/15] cleanup

---
 config.m4 | 1 +
 kafka_consumer.c | 96 ++--------------
 oauthbearer.c | 155 ++++++++++++++++++++++++++
 oauthbearer.h | 30 +++++
 rdkafka.c | 101 +----------------
 tests/oauthbearer_integration_hl.phpt | 95 +++------------
 6 files changed, 222 insertions(+), 256 deletions(-)
 create mode 100644 oauthbearer.c
 create mode 100644 oauthbearer.h

diff --git a/config.m4 b/config.m4
index 356a1df3..91d5f90d 100644
--- a/config.m4
+++ b/config.m4
@@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then
 AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
 AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
+ SOURCES="$SOURCES oauthbearer.c"
 ],[
 AC_MSG_WARN([oauthbearer support is not available])
 ])
diff --git a/kafka_consumer.c b/kafka_consumer.c
index c218d879..4d91b792 100644
--- a/kafka_consumer.c
+++ b/kafka_consumer.c
@@ -31,6 +31,9 @@
 #include "topic.h"
 #include "message.h"
 #include "metadata.h"
+#ifdef HAS_RD_KAFKA_OAUTHBEARER
+#include "oauthbearer.h"
+#endif
 #if PHP_VERSION_ID < 80000
 #include "kafka_consumer_legacy_arginfo.h"
 #else
@@ -869,12 +872,12 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
 /* }}} */

 #ifdef HAS_RD_KAFKA_OAUTHBEARER
-/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
+/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
 * Set SASL/OAUTHBEARER token and metadata
 *
 * The SASL/OAUTHBEARER token refresh callback or event handler should cause
 * this method to be invoked upon success, via
- * $kafka->oauthbearerSetToken(). The extension keys must not include the
+ * $consumer->oauthbearerSetToken().
The extension keys must not include the * reserved key "`auth`", and all extension keys and values must conform to the * required format as per https://tools.ietf.org/html/rfc7628#section-3.1: * @@ -886,86 +889,24 @@ PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken) object_intern *intern; char *token_value; size_t token_value_len; - zend_long lifetime_ms; + zval *zlifetime_ms; + int64_t lifetime_ms; char *principal_name; size_t principal_len; HashTable *extensions_hash = NULL; - char errstr[512]; - rd_kafka_resp_err_t ret = 0; - - if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { + if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) { return; } + lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer"); + intern = get_object(getThis()); if (!intern) { return; } - errstr[0] = '\0'; - - int extensions_size = 0; - char **extensions = NULL; - - if (extensions_hash != NULL) { - extensions_size = zend_hash_num_elements(extensions_hash) * 2; - extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); - - int pos = 0; - zend_ulong num_key; - zend_string *extension_key_str; - zval *extension_zval; - ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { - if (!extension_key_str) { - extension_key_str = zend_long_to_str(num_key); - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - zend_string_release(extension_key_str); - } else { - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - } - - zend_string *tmp_extension_val_str; - zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); - extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); - if (tmp_extension_val_str) { - zend_string_release(tmp_extension_val_str); - } - } ZEND_HASH_FOREACH_END(); - } - - ret = rd_kafka_oauthbearer_set_token( - intern->rk, - token_value, - lifetime_ms, - principal_name, - (const char **)extensions, - extensions_size, - errstr, - sizeof(errstr)); - - if (extensions != NULL) { - for (int i = 0; i < extensions_size; i++) { - efree(extensions[i]); - } - efree(extensions); - } - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash); } /* }}} */ @@ -989,20 +930,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure) return; } - rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr); - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token_failure(intern->rk, errstr); } /* }}} */ #endif diff --git 
a/oauthbearer.c b/oauthbearer.c new file mode 100644 index 00000000..cdeedfc7 --- /dev/null +++ b/oauthbearer.c @@ -0,0 +1,155 @@ +/* ++----------------------------------------------------------------------+ + | php-rdkafka | + +----------------------------------------------------------------------+ + | Copyright (c) 2025 Arnaud Le Blanc | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Martin Fris | + +----------------------------------------------------------------------+ +*/ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "php.h" +#include "php_rdkafka.h" +#include "php_rdkafka_priv.h" +#include "librdkafka/rdkafka.h" +#include "Zend/zend_exceptions.h" +#include "ext/spl/spl_exceptions.h" +#include "conf.h" +#include "topic_partition.h" +#include "topic.h" +#include "message.h" +#include "metadata.h" +#if PHP_VERSION_ID < 80000 +#include "kafka_consumer_legacy_arginfo.h" +#else +#include "kafka_consumer_arginfo.h" +#endif + +void oauthbearer_set_token( + rd_kafka_t *rk, + const char *token_value, + zend_long lifetime_ms, + const char *principal_name, + const HashTable *extensions_hash +) { + char errstr[512]; + rd_kafka_resp_err_t ret = 0; + + errstr[0] = '\0'; + + int extensions_size = 0; + char **extensions = NULL; + + if (extensions_hash != NULL) { + extensions_size = zend_hash_num_elements(extensions_hash) * 2; + extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); + + int pos = 0; + zend_ulong num_key; + zend_string *extension_key_str; + zval *extension_zval; + ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { + if (!extension_key_str) { + extension_key_str = zend_long_to_str(num_key); + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + zend_string_release(extension_key_str); + } else { + extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); + } + + zend_string *tmp_extension_val_str; + zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); + extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); + if (tmp_extension_val_str) { + zend_string_release(tmp_extension_val_str); + } + } ZEND_HASH_FOREACH_END(); + } + + ret = rd_kafka_oauthbearer_set_token( + rk, + token_value, + lifetime_ms, + principal_name, + (const char **)extensions, + extensions_size, + errstr, + sizeof(errstr)); + + if (extensions != NULL) { + for (int i = 0; i < extensions_size; i++) { + efree(extensions[i]); + } + efree(extensions); + } + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} + +void 
oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) { + rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(rk, errstr); + + switch (ret) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); + return; + case RD_KAFKA_RESP_ERR__STATE: + zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); + return; + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + default: + return; + } +} + +int64_t zval_to_int64(zval *zval, const char *errstr) { + int64_t converted; + + /* On 32-bits, it might be required to pass $lifetime_ms as a float or a + * string */ + switch (Z_TYPE_P(zval)) { + case IS_LONG: + return (int64_t) Z_LVAL_P(zval); + break; + case IS_DOUBLE: + return (int64_t) Z_DVAL_P(zval); + break; + case IS_STRING:; + char *str = Z_STRVAL_P(zval); + char *end; + converted = (int64_t) strtoll(str, &end, 10); + if (end != str + Z_STRLEN_P(zval)) { + zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, errstr); + return 0; + } + break; + EMPTY_SWITCH_DEFAULT_CASE(); + } +} \ No newline at end of file diff --git a/oauthbearer.h b/oauthbearer.h new file mode 100644 index 00000000..d5dd42f4 --- /dev/null +++ b/oauthbearer.h @@ -0,0 +1,30 @@ +/* ++----------------------------------------------------------------------+ + | php-rdkafka | + +----------------------------------------------------------------------+ + | Copyright (c) 2025 Arnaud Le Blanc | + +----------------------------------------------------------------------+ + | This source file is subject to version 3.01 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available through the world-wide-web at the following url: | + | http://www.php.net/license/3_01.txt | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. 
| + +----------------------------------------------------------------------+ + | Author: Martin Fris | + +----------------------------------------------------------------------+ +*/ + +#include "librdkafka/rdkafka.h" + +void oauthbearer_set_token(rd_kafka_t *rk, + const char *token_value, + zend_long lifetime_ms, + const char *principal_name, + const HashTable *extensions_hash +); + +void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr); + +int64_t zval_to_int64(zval *zval, const char *errstr); \ No newline at end of file diff --git a/rdkafka.c b/rdkafka.c index ef5656aa..c34ec9de 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -40,6 +40,9 @@ #include "queue.h" #include "message.h" #include "kafka_consumer.h" +#ifdef HAS_RD_KAFKA_OAUTHBEARER +#include "oauthbearer.h" +#endif #include "topic_partition.h" #if PHP_VERSION_ID < 80000 #include "rdkafka_legacy_arginfo.h" @@ -462,93 +465,14 @@ PHP_METHOD(RdKafka, oauthbearerSetToken) return; } - /* On 32-bits, it might be required to pass $lifetime_ms as a float or a - * string */ - switch (Z_TYPE_P(zlifetime_ms)) { - case IS_LONG: - lifetime_ms = (int64_t) Z_LVAL_P(zlifetime_ms); - break; - case IS_DOUBLE: - lifetime_ms = (int64_t) Z_DVAL_P(zlifetime_ms); - break; - case IS_STRING:; - char *str = Z_STRVAL_P(zlifetime_ms); - char *end; - lifetime_ms = (int64_t) strtoll(str, &end, 10); - if (end != str + Z_STRLEN_P(zlifetime_ms)) { - zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, "Argument #2 ($lifetime_ms) must be a valid integer"); - return; - } - break; - EMPTY_SWITCH_DEFAULT_CASE(); - } + lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer"); intern = get_kafka_object(getThis()); if (!intern) { return; } - errstr[0] = '\0'; - - int extensions_size = 0; - char **extensions = NULL; - - if (extensions_hash != NULL) { - extensions_size = zend_hash_num_elements(extensions_hash) * 2; - extensions = safe_emalloc((extensions_size * 2), sizeof(char *), 0); - - int pos = 0; - zend_ulong num_key; - zend_string *extension_key_str; - zval *extension_zval; - ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { - if (!extension_key_str) { - extension_key_str = zend_long_to_str(num_key); - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - zend_string_release(extension_key_str); - } else { - extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); - } - - zend_string *tmp_extension_val_str; - zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str); - extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str)); - zend_tmp_string_release(tmp_extension_val_str); - } ZEND_HASH_FOREACH_END(); - } - - ret = rd_kafka_oauthbearer_set_token( - intern->rk, - token_value, - lifetime_ms, - principal_name, - (const char **)extensions, - extensions_size, - errstr, - sizeof(errstr)); - - if (extensions != NULL) { - for (int i = 0; i < extensions_size; i++) { - efree(extensions[i]); - } - efree(extensions); - } - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token(intern->rk, 
token_value, lifetime_ms, principal_name, extensions_hash); } /* }}} */ @@ -572,20 +496,7 @@ PHP_METHOD(RdKafka, oauthbearerSetTokenFailure) return; } - rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr); - - switch (ret) { - case RD_KAFKA_RESP_ERR__INVALID_ARG: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG); - return; - case RD_KAFKA_RESP_ERR__STATE: - zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE); - return; - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - default: - return; - } + oauthbearer_set_token_failure(intern->rk, errstr); } /* }}} */ #endif diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt index 08fc2898..e78b664d 100644 --- a/tests/oauthbearer_integration_hl.phpt +++ b/tests/oauthbearer_integration_hl.phpt @@ -99,87 +99,24 @@ $confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { $consumer = new \RdKafka\KafkaConsumer($confConsumer); $consumer->subscribe([$topicName]); echo "Reading data\n"; -$message = $consumer->consume(5500); +$message = $consumer->consume(500); echo ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) ? "Read successful\n" : "Read Error\n"; echo $message->payload . "\n"; -//// Test that refresh token with setting token failure will fail when getting metadata -//$conf->setOauthbearerTokenRefreshCb(function ($producer) { -// echo "Setting token failure in refresh cb\n"; -// $producer->oauthbearerSetTokenFailure('Token failure before getting metadata'); -// $producer->poll(0); -//}); -//$producer = new \RdKafka\Producer($conf); -//$producer->poll(0); -//$topicName = sprintf("test_rdkafka_%s", uniqid()); -//$topic = $producer->newTopic($topicName); -//try { -// $producer->getMetadata(false, $topic, 10*1000); -// echo "FAIL: Did not catch exception after not setting or refreshing any token\n"; -//} catch (\RdKafka\Exception $e) { -// echo "Caught exception when getting metadata after not setting or refreshing any token\n"; -//} -// -//// Test that setting token without refreshing will get metadata successfully -//$conf->setOauthbearerTokenRefreshCb(function ($producer) {}); -//$producer = new \RdKafka\Producer($conf); -//$token = generateJws(); -//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); -//$topicName = sprintf("test_rdkafka_%s", uniqid()); -//$topic = $producer->newTopic($topicName); -//try { -// $producer->getMetadata(false, $topic, 10*1000); -// echo "Got metadata successfully\n"; -//} catch (\RdKafka\Exception $e) { -// echo "FAIL: Set token but still got exception \n"; -// exit; -//} -// -//// Test that token refresh is called after token expires -//$conf->setOauthbearerTokenRefreshCb(function ($producer) { -// echo "Refreshing token\n"; -//}); -//$producer = new \RdKafka\Producer($conf); -//$token = generateJws('required-scope', 5); -//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); -//$producer->poll(0); -//echo "Polled with refresh\n"; -//sleep(1); -//$producer->poll(0); -//echo "Polled without refresh\n"; -//sleep(4); -//$producer->poll(0); -//echo "Polled with refresh\n"; -// -//// Test that tokens without required scope fail -//$producer = new \RdKafka\Producer($conf); -//$token = generateJws('not-required-scope'); -//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']); -//$topicName = sprintf("test_rdkafka_%s", uniqid()); -//$topic = $producer->newTopic($topicName); -//try { -// 
$producer->getMetadata(false, $topic, 10*1000);
-//    echo "FAIL: Exception not thrown as expected when using insufficient scope\n";
-//    exit;
-//} catch (\RdKafka\Exception $e) {
-//    echo "Caught expected exception with insufficient_scope\n";
-//}
-//
-//// Test that setting token with extensions succeeds
-//$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
-//$producer = new \RdKafka\Producer($conf);
-//$token = generateJws();
-//$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']);
-//$producer->poll(0);
+// Test that refresh token with setting token failure will fail when trying to read data
+$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) {
+    echo "Setting token failure in refresh cb\n";
+    $consumer->oauthbearerSetTokenFailure('Token failure before data consumption');
+});
+$confConsumer->set('group.id', 'test_group_fail');

-//Setting token failure in refresh cb
-//Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata
-//Caught exception when getting metadata after not setting or refreshing any token
-//Got metadata successfully
-//Refreshing token
-//Polled with refresh
-//Polled without refresh
-//Refreshing token
+$consumer = new \RdKafka\KafkaConsumer($confConsumer);
+$consumer->subscribe([$topicName]);
+echo "Reading data\n";
+
+$message = $consumer->consume(500);
+echo $message->err === -185 ? "Received empty message when reading data after not setting or refreshing any token\n" :
+    "FAIL: Did receive a message after not setting or refreshing any token\n";

 --EXPECT--
 Refreshing token and succeeding
@@ -190,3 +127,7 @@ Reading data
 Refreshing token and succeeding
 Read successful
 Test
+Reading data
+Setting token failure in refresh cb
+Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before data consumption
+Received empty message when reading data after not setting or refreshing any token

From 805aac083c597768b11bff36c509d7d8017d67e3 Mon Sep 17 00:00:00 2001
From: Rastusik <1735097+Rastusik@users.noreply.github.com>
Date: Tue, 15 Apr 2025 13:42:24 +0200
Subject: [PATCH 03/15] package fix

---
 package.xml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/package.xml b/package.xml
index cf03069e..fc8f2cef 100644
--- a/package.xml
+++ b/package.xml
@@ -85,6 +85,8 @@
+
+
@@ -136,6 +138,7 @@
+

From ad53bce857b5051851cdc945a9d741517b7f21d5 Mon Sep 17 00:00:00 2001
From: Rastusik <1735097+Rastusik@users.noreply.github.com>
Date: Wed, 16 Apr 2025 13:15:38 +0200
Subject: [PATCH 04/15] adding poll method to KafkaConsumer

---
 kafka_consumer.c | 20 +++++++++++++++++
 kafka_consumer.stub.php | 3 +++
 kafka_consumer_arginfo.h | 10 +++++++++
 tests/oauthbearer_integration_hl.phpt | 32 ++++++++++++++++++---------
 4 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/kafka_consumer.c b/kafka_consumer.c
index 4d91b792..0d201596 100644
--- a/kafka_consumer.c
+++ b/kafka_consumer.c
@@ -871,6 +871,26 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
 }
 /* }}} */

+/* {{{ proto int RdKafka\KafkaConsumer::poll(int $timeout_ms)
+ Polls the consumer's kafka handle for events */
+PHP_METHOD(RdKafka_KafkaConsumer, poll)
+{
+    object_intern *intern;
+    zend_long timeout;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
+        return;
+    }
+
+    intern =
get_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_poll(intern->rk, timeout)); +} +/* }}} */ + #ifdef HAS_RD_KAFKA_OAUTHBEARER /* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []) * Set SASL/OAUTHBEARER token and metadata diff --git a/kafka_consumer.stub.php b/kafka_consumer.stub.php index 977904bd..28d760eb 100644 --- a/kafka_consumer.stub.php +++ b/kafka_consumer.stub.php @@ -82,6 +82,9 @@ public function pausePartitions(array $topic_partitions): array {} /** @tentative-return-type */ public function resumePartitions(array $topic_partitions): array {} + /** @tentative-return-type */ + public function poll(int $timeout_ms): int {} + #ifdef HAS_RD_KAFKA_OAUTHBEARER /** @tentative-return-type */ public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {} diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index f0b22645..b57f127a 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -129,6 +129,14 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#if (PHP_VERSION_ID >= 80100) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0) +#else +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1) +#endif + ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) +ZEND_END_ARG_INFO() + #if defined(HAS_RD_KAFKA_OAUTHBEARER) #if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0) @@ -183,6 +191,7 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); +ZEND_METHOD(RdKafka_KafkaConsumer, poll); #if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken); ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure); @@ -216,6 +225,7 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt index e78b664d..3877364f 100644 --- a/tests/oauthbearer_integration_hl.phpt +++ b/tests/oauthbearer_integration_hl.phpt @@ -60,14 +60,6 @@ $producer->poll(0); $topicName = sprintf("test_rdkafka_%s", uniqid()); $topic = $producer->newTopic($topicName); -try { - $producer->getMetadata(false, $topic, 10*1000); - echo "Metadata retrieved successfully when refresh callback set token\n"; -} 
catch (\RdKafka\Exception $e) { - echo "FAIL: Caught exception when getting metadata after successfully refreshing any token:\n"; - printf("%s: %s\n", get_class($e), $e->getMessage()); -} - echo "Writing test data\n"; $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Test"); $producer->poll(0); @@ -89,7 +81,7 @@ $confConsumer->setErrorCb(function ($producer, $err, $errstr) { printf("%s: %s\n", rd_kafka_err2str($err), $errstr); }); -// Test that refresh token with setting token accurately will succeed when getting metadata +// Test that refresh token with setting token accurately will succeed when consuming data $confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { echo "Refreshing token and succeeding\n"; $token = generateJws(); @@ -118,9 +110,26 @@ $message = $consumer->consume(500); echo $message->err === -185 ? "Received empty message when reading data after not setting or refreshing any token\n" : "FAIL: Did receive a message after not setting or refreshing any token\n"; +// Test that metadata will be loaded before data consumption, under the condition that poll is called +$confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { + echo "Refreshing token on poll and succeeding\n"; + $token = generateJws(); + $consumer->oauthbearerSetToken($token['value'], (string) $token['expiryMs'], $token['principal']); +}); +$consumer = new \RdKafka\KafkaConsumer($confConsumer); +$consumerTopic = $consumer->newTopic($topicName); +$consumer->poll(0); + +try { + echo "Reading metadata\n"; + $consumer->getMetadata(false, $consumerTopic, 1000); + echo "Metadata was fetched successfully after calling poll\n"; +} catch (\RdKafka\Exception $e) { + echo "FAIL: Caught exception when getting metadata after calling poll\n"; +} + --EXPECT-- Refreshing token and succeeding -Metadata retrieved successfully when refresh callback set token Writing test data Write successful Reading data @@ -131,3 +140,6 @@ Reading data Setting token failure in refresh cb Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before data consumption Received empty message when reading data after not setting or refreshing any token +Refreshing token on poll and succeeding +Reading metadata +Metadata was fetched successfully after calling poll From 6db8d21799f63fb53bc66dfe3843a175638444df Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:09:28 +0200 Subject: [PATCH 05/15] Fix const qualifier error --- oauthbearer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oauthbearer.c b/oauthbearer.c index cdeedfc7..293c4ab5 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -60,7 +60,7 @@ void oauthbearer_set_token( zend_ulong num_key; zend_string *extension_key_str; zval *extension_zval; - ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) { + ZEND_HASH_FOREACH_KEY_VAL((HashTable*)extensions_hash, num_key, extension_key_str, extension_zval) { if (!extension_key_str) { extension_key_str = zend_long_to_str(num_key); extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str)); From 3018ba4be16781215988daa51e56dae85a025652 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:09:44 +0200 Subject: [PATCH 06/15] Fix passing non-literal format string --- oauthbearer.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oauthbearer.c b/oauthbearer.c index 293c4ab5..4beaed36 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -146,10 +146,10 @@ int64_t zval_to_int64(zval *zval, const char *errstr) 
{ char *end; converted = (int64_t) strtoll(str, &end, 10); if (end != str + Z_STRLEN_P(zval)) { - zend_throw_exception_ex(spl_ce_InvalidArgumentException, 0, errstr); + zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0); return 0; } break; EMPTY_SWITCH_DEFAULT_CASE(); } -} \ No newline at end of file +} From e661a9b196260c84f90569486868ef827b3f6211 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:10:42 +0200 Subject: [PATCH 07/15] WS --- oauthbearer.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/oauthbearer.c b/oauthbearer.c index 4beaed36..274f197d 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -142,13 +142,13 @@ int64_t zval_to_int64(zval *zval, const char *errstr) { return (int64_t) Z_DVAL_P(zval); break; case IS_STRING:; - char *str = Z_STRVAL_P(zval); - char *end; - converted = (int64_t) strtoll(str, &end, 10); - if (end != str + Z_STRLEN_P(zval)) { - zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0); - return 0; - } + char *str = Z_STRVAL_P(zval); + char *end; + converted = (int64_t) strtoll(str, &end, 10); + if (end != str + Z_STRLEN_P(zval)) { + zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0); + return 0; + } break; EMPTY_SWITCH_DEFAULT_CASE(); } From 9a786e3c303f583435d2fd0ab2d22c3743b89590 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:10:57 +0200 Subject: [PATCH 08/15] Return converted value --- oauthbearer.c | 1 + 1 file changed, 1 insertion(+) diff --git a/oauthbearer.c b/oauthbearer.c index 274f197d..e7f1efce 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -149,6 +149,7 @@ int64_t zval_to_int64(zval *zval, const char *errstr) { zend_throw_exception(spl_ce_InvalidArgumentException, errstr, 0); return 0; } + return converted; break; EMPTY_SWITCH_DEFAULT_CASE(); } From 4e908907e9cd3a3bce16215606d18a216bbd6e13 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:11:53 +0200 Subject: [PATCH 09/15] Move comment --- oauthbearer.c | 2 -- rdkafka.c | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/oauthbearer.c b/oauthbearer.c index e7f1efce..7b64da5e 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -132,8 +132,6 @@ void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr) { int64_t zval_to_int64(zval *zval, const char *errstr) { int64_t converted; - /* On 32-bits, it might be required to pass $lifetime_ms as a float or a - * string */ switch (Z_TYPE_P(zval)) { case IS_LONG: return (int64_t) Z_LVAL_P(zval); diff --git a/rdkafka.c b/rdkafka.c index c34ec9de..d21bd1ab 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -465,6 +465,8 @@ PHP_METHOD(RdKafka, oauthbearerSetToken) return; } + /* On 32-bits, it might be required to pass $lifetime_ms as a float or a + * string */ lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer"); intern = get_kafka_object(getThis()); From 030370fd9f5d122bd1130d5c1fbe54960bbbce11 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:37:03 +0200 Subject: [PATCH 10/15] Fix warning --- rdkafka.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/rdkafka.c b/rdkafka.c index d21bd1ab..94ce9172 100644 --- a/rdkafka.c +++ b/rdkafka.c @@ -458,9 +458,6 @@ PHP_METHOD(RdKafka, oauthbearerSetToken) size_t principal_len; HashTable *extensions_hash = NULL; - char errstr[512]; - rd_kafka_resp_err_t ret = 0; - if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, 
&principal_len, &extensions_hash) == FAILURE) { return; } From bb5ec1f778b886c95e51d08a31271b6eb5e2bf2d Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Wed, 21 May 2025 17:46:40 +0200 Subject: [PATCH 11/15] Fix warning: register_class_RdKafka_KafkaConsumer defined but not used --- oauthbearer.c | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/oauthbearer.c b/oauthbearer.c index 7b64da5e..4ae089cf 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -23,19 +23,8 @@ #include "php.h" #include "php_rdkafka.h" #include "php_rdkafka_priv.h" -#include "librdkafka/rdkafka.h" #include "Zend/zend_exceptions.h" #include "ext/spl/spl_exceptions.h" -#include "conf.h" -#include "topic_partition.h" -#include "topic.h" -#include "message.h" -#include "metadata.h" -#if PHP_VERSION_ID < 80000 -#include "kafka_consumer_legacy_arginfo.h" -#else -#include "kafka_consumer_arginfo.h" -#endif void oauthbearer_set_token( rd_kafka_t *rk, From e8a565182595e94a635d29033e1491cac8673df3 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Mon, 26 May 2025 15:25:55 +0200 Subject: [PATCH 12/15] Increase timeout --- tests/oauthbearer_integration_hl.phpt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt index 3877364f..57db6b67 100644 --- a/tests/oauthbearer_integration_hl.phpt +++ b/tests/oauthbearer_integration_hl.phpt @@ -91,8 +91,8 @@ $confConsumer->setOauthbearerTokenRefreshCb(function ($consumer) { $consumer = new \RdKafka\KafkaConsumer($confConsumer); $consumer->subscribe([$topicName]); echo "Reading data\n"; -$message = $consumer->consume(500); -echo ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) ? "Read successful\n" : "Read Error\n"; +$message = $consumer->consume(10*1000); +echo ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) ? "Read successful\n" : "Read Error: " . $message->errstr() . "\n"; echo $message->payload . "\n"; // Test that refresh token with setting token failure will fail when trying to read data @@ -106,7 +106,7 @@ $consumer = new \RdKafka\KafkaConsumer($confConsumer); $consumer->subscribe([$topicName]); echo "Reading data\n"; -$message = $consumer->consume(500); +$message = $consumer->consume(10*1000); echo $message->err === -185 ? "Received empty message when reading data after not setting or refreshing any token\n" : "FAIL: Did receive a message after not setting or refreshing any token\n"; From a7f25efc8d2a592d220fccaf478864b0a6f9e8ab Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Mon, 26 May 2025 15:50:58 +0200 Subject: [PATCH 13/15] Update arginfos --- kafka_consumer_arginfo.h | 24 +++++++++++--------- kafka_consumer_legacy_arginfo.h | 39 ++++++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/kafka_consumer_arginfo.h b/kafka_consumer_arginfo.h index b57f127a..3f4a6b20 100644 --- a/kafka_consumer_arginfo.h +++ b/kafka_consumer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. 
- * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ + * Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0) @@ -129,6 +129,12 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_queryWatermarkOffsets ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0) ZEND_END_ARG_INFO() +#define arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets + +#define arginfo_class_RdKafka_KafkaConsumer_pausePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions + +#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions + #if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0) #else @@ -148,7 +154,9 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]") ZEND_END_ARG_INFO() +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) #if (PHP_VERSION_ID >= 80100) ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0) #else @@ -158,11 +166,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFa ZEND_END_ARG_INFO() #endif -#define arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes arginfo_class_RdKafka_KafkaConsumer_getCommittedOffsets - -#define arginfo_class_RdKafka_KafkaConsumer_pausePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions - -#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); @@ -194,9 +197,12 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, poll); #if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken); +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure); #endif + static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC) @@ -228,6 +234,8 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC) #if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) #endif ZEND_FE_END @@ -238,11 +246,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); -#if (PHP_VERSION_ID >= 80400) - class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); -#else class_entry = zend_register_internal_class_ex(&ce, NULL); -#endif zval property_error_cb_default_value; 
ZVAL_UNDEF(&property_error_cb_default_value); diff --git a/kafka_consumer_legacy_arginfo.h b/kafka_consumer_legacy_arginfo.h index 559fbfef..3c4b285d 100644 --- a/kafka_consumer_legacy_arginfo.h +++ b/kafka_consumer_legacy_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */ + * Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1) ZEND_ARG_INFO(0, conf) @@ -82,6 +82,24 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions +#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_consume + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3) + ZEND_ARG_INFO(0, token_value) + ZEND_ARG_INFO(0, lifetime_ms) + ZEND_ARG_INFO(0, principal_name) + ZEND_ARG_INFO(0, extensions) +ZEND_END_ARG_INFO() +#endif + +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1) + ZEND_ARG_INFO(0, error) +ZEND_END_ARG_INFO() +#endif + + ZEND_METHOD(RdKafka_KafkaConsumer, __construct); ZEND_METHOD(RdKafka_KafkaConsumer, assign); #if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN) @@ -109,6 +127,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets); ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes); ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions); ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions); +ZEND_METHOD(RdKafka_KafkaConsumer, poll); +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken); +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) +ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure); +#endif + static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC) @@ -138,6 +164,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = { ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC) +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC) +#endif +#if defined(HAS_RD_KAFKA_OAUTHBEARER) + ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC) +#endif ZEND_FE_END }; @@ -146,11 +179,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void) zend_class_entry ce, *class_entry; INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods); -#if (PHP_VERSION_ID >= 80400) - class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0); -#else class_entry = zend_register_internal_class_ex(&ce, NULL); -#endif zval property_error_cb_default_value; ZVAL_NULL(&property_error_cb_default_value); From b59da8529d419d3a6a1cb9c23788f7a08b7f1573 Mon Sep 17 
00:00:00 2001 From: Arnaud Le Blanc Date: Mon, 26 May 2025 15:53:02 +0200 Subject: [PATCH 14/15] Increase timeout --- tests/oauthbearer_integration_hl.phpt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/oauthbearer_integration_hl.phpt b/tests/oauthbearer_integration_hl.phpt index 57db6b67..fbe2cba5 100644 --- a/tests/oauthbearer_integration_hl.phpt +++ b/tests/oauthbearer_integration_hl.phpt @@ -122,10 +122,11 @@ $consumer->poll(0); try { echo "Reading metadata\n"; - $consumer->getMetadata(false, $consumerTopic, 1000); + $consumer->getMetadata(false, $consumerTopic, 10*1000); echo "Metadata was fetched successfully after calling poll\n"; } catch (\RdKafka\Exception $e) { - echo "FAIL: Caught exception when getting metadata after calling poll\n"; + echo "FAIL: Caught exception when getting metadata after calling poll:\n"; + echo $e, "\n"; } --EXPECT-- From f7bcd7b90e828244bb928126a82ff454202241d9 Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Mon, 26 May 2025 16:12:10 +0200 Subject: [PATCH 15/15] Fix regression --- oauthbearer.c | 2 +- oauthbearer.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/oauthbearer.c b/oauthbearer.c index 4ae089cf..e692ef0c 100644 --- a/oauthbearer.c +++ b/oauthbearer.c @@ -29,7 +29,7 @@ void oauthbearer_set_token( rd_kafka_t *rk, const char *token_value, - zend_long lifetime_ms, + int64_t lifetime_ms, const char *principal_name, const HashTable *extensions_hash ) { diff --git a/oauthbearer.h b/oauthbearer.h index d5dd42f4..9c7b466e 100644 --- a/oauthbearer.h +++ b/oauthbearer.h @@ -20,11 +20,11 @@ void oauthbearer_set_token(rd_kafka_t *rk, const char *token_value, - zend_long lifetime_ms, + int64_t lifetime_ms, const char *principal_name, const HashTable *extensions_hash ); void oauthbearer_set_token_failure(rd_kafka_t *rk, const char *errstr); -int64_t zval_to_int64(zval *zval, const char *errstr); \ No newline at end of file +int64_t zval_to_int64(zval *zval, const char *errstr);
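
Taken together, the series exposes on RdKafka\KafkaConsumer the same OAUTHBEARER surface the RdKafka base class already had: oauthbearerSetToken(), oauthbearerSetTokenFailure(), and a poll() that serves the token refresh callback before the first fetch. The sketch below shows the intended call pattern under the final patch; the broker address, group id, topic name, extension key, and the fetchOAuthToken() helper are illustrative assumptions, not part of the patches.

<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'broker:9092'); // assumed broker address
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
$conf->set('group.id', 'example_group');

// librdkafka invokes this callback (from poll/consume) whenever a token
// is needed or the current one is about to expire.
$conf->setOauthbearerTokenRefreshCb(function ($consumer) {
    try {
        // fetchOAuthToken() is a hypothetical IdP client returning
        // ['value' => ..., 'principal' => ..., 'expiryMs' => ...].
        $token = fetchOAuthToken();
        // $lifetime_ms is the expiry in ms since the epoch; on 32-bit
        // builds it may be passed as a float or string (see zval_to_int64
        // in oauthbearer.c). Extension keys/values must follow RFC 7628
        // section 3.1 and must not use the reserved "auth" key.
        $consumer->oauthbearerSetToken(
            $token['value'],
            (string) $token['expiryMs'],
            $token['principal'],
            ['exampleKey' => 'example value']
        );
    } catch (Exception $e) {
        // Report the failure so librdkafka fails the SASL handshake cleanly
        // instead of waiting for a token that will never arrive.
        $consumer->oauthbearerSetTokenFailure($e->getMessage());
    }
});

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->poll(0); // serve the refresh callback before any fetch or metadata call
$consumer->subscribe(['example_topic']);
$message = $consumer->consume(10 * 1000);

As the tests above exercise, a token set in the refresh callback lets consume() and getMetadata() succeed, while oauthbearerSetTokenFailure() surfaces as a "Local: Authentication failure" error event and an empty (timed out) message.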