diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml
index 38655673..955b3e4c 100644
--- a/.github/workflows/package.yml
+++ b/.github/workflows/package.yml
@@ -20,6 +20,6 @@ jobs:
run: './php-rdkafka/.github/workflows/package/package.sh'
- name: 'Archive package'
- uses: 'actions/upload-artifact@v2'
+ uses: 'actions/upload-artifact@v4'
with:
path: 'php-rdkafka/rdkafka.tgz'
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7ae9ecb2..d986c660 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -109,28 +109,39 @@ jobs:
# librdkafka 1.0.1
- php: '8.1.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v1.0.1'
+ skipoauth: '1'
# librdkafka 0.11.6
- php: '8.1.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '8.0.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.4.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.3.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.2.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.1.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
- php: '7.0.0'
librdkafka: 'v0.11.6'
+ skipoauth: '1'
# librdkafka master (experimental, does not block PRs)
- php: '8.3.0'
@@ -158,6 +169,7 @@ jobs:
PHP_VERSION: ${{ matrix.php }}
LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }}
MEMORY_CHECK: ${{ matrix.memcheck }}
+ SKIP_OAUTH: ${{ matrix.skipoauth }}
TEST_KAFKA_BROKERS: kafka:9092
TEST_KAFKA_BROKER_VERSION: 2.6
steps:
diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh
index 8892f6f5..1d53b60d 100755
--- a/.github/workflows/test/start-kafka.sh
+++ b/.github/workflows/test/start-kafka.sh
@@ -4,17 +4,63 @@ docker network create kafka_network
docker pull wurstmeister/zookeeper:latest
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
-docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest
-printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
+docker run -d -p 9092:9092 --network kafka_network \
+ -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+ -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+ -e "KAFKA_BROKER_ID=1" \
+ -e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
+ -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
+ -e "KAFKA_ADVERTISED_PORT=9092" \
+ --name kafka wurstmeister/kafka:latest
-echo "Waiting for Kafka to be ready"
+if [ ${SKIP_OAUTH:-0} -ne 1 ]; then
+ docker run -d -p 29092:29092 --network kafka_network \
+ -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
+ -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
+ -e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
+ -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
+ -e "KAFKA_ADVERTISED_PORT=29092" \
+ -e "KAFKA_BROKER_ID=2" \
+ -e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+ -e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
+ -e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
+ -e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
+ -e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
+ -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
+ -e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
+ --name kafka_oauth2 wurstmeister/kafka:latest
+fi
+
+printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee -a /etc/hosts >/dev/null
+
+echo "Waiting for Kafka services to be ready"
+
+kafka_ready=0
+kafka_oauth2_ready=0
for i in $(seq 1 20); do
- if kafkacat -b 127.0.0.1 -L; then
- echo "Kafka is ready"
- exit 0
+ if [ $kafka_ready -eq 0 ]; then
+ if kafkacat -b kafka:9092 -L -m 30 -d broker; then
+ kafka_ready=1
+ echo "Kafka is ready"
+ fi
+ fi
+ if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then
+ if kafkacat -b kafka_oauth2:29092 \
+ -X security.protocol=SASL_PLAINTEXT \
+ -X sasl.mechanisms=OAUTHBEARER \
+ -X enable.sasl.oauthbearer.unsecure.jwt="true" \
+ -X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
+ then
+ kafka_oauth2_ready=1
+ echo "Kafka OAuth2 is ready"
+ fi
+ fi
+
+ if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then
+ exit 0
fi
done
-echo "Timedout waiting for Kafka to be ready"
+echo "Timed out waiting for Kafka services to be ready"
exit 1
diff --git a/conf.c b/conf.c
index df3472b5..9aa8132f 100644
--- a/conf.c
+++ b/conf.c
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
{
+ kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
kafka_conf_callback_copy(&to->error, from->error);
kafka_conf_callback_copy(&to->rebalance, from->rebalance);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
@@ -734,7 +735,7 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */
-#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
+#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
diff --git a/conf.stub.php b/conf.stub.php
index 7aa6a62d..d1ce97e9 100644
--- a/conf.stub.php
+++ b/conf.stub.php
@@ -45,7 +45,7 @@ public function setOffsetCommitCb(callable $callback): void {}
/** @tentative-return-type */
public function setLogCb(callable $callback): void {}
- #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
+ #ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
#endif
diff --git a/conf_arginfo.h b/conf_arginfo.h
index 75d7a2da..d179aee1 100644
--- a/conf_arginfo.h
+++ b/conf_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
+ * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -48,7 +48,7 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
#else
@@ -83,7 +83,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
@@ -101,7 +101,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -128,7 +128,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
@@ -138,7 +142,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h
index 5601e9d6..72305d3b 100644
--- a/conf_legacy_arginfo.h
+++ b/conf_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
+ * Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()
@@ -59,7 +59,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
@@ -77,7 +77,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
-#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -104,7 +104,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
@@ -114,7 +118,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
+#endif
return class_entry;
}
diff --git a/config.m4 b/config.m4
index 17c9c32e..356a1df3 100644
--- a/config.m4
+++ b/config.m4
@@ -94,10 +94,10 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])
- AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
- AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
+ AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
+ AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
],[
- AC_MSG_WARN([oauthbearer token refresh cb is not available])
+ AC_MSG_WARN([oauthbearer support is not available])
])
AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[
diff --git a/package.xml b/package.xml
index 16957120..7ac09576 100644
--- a/package.xml
+++ b/package.xml
@@ -132,6 +132,7 @@
+
diff --git a/rdkafka.c b/rdkafka.c
index 5d9636e1..4029409a 100644
--- a/rdkafka.c
+++ b/rdkafka.c
@@ -431,6 +431,145 @@ PHP_METHOD(RdKafka, setLogLevel)
}
/* }}} */
+#ifdef HAS_RD_KAFKA_OAUTHBEARER
+/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
+ * Set SASL/OAUTHBEARER token and metadata
+ *
+ * The SASL/OAUTHBEARER token refresh callback or event handler should cause
+ * this method to be invoked upon success, via
+ * $kafka->oauthbearerSetToken(). The extension keys must not include the
+ * reserved key "`auth`", and all extension keys and values must conform to the
+ * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
+ *
+ * key = 1*(ALPHA)
+ * value = *(VCHAR / SP / HTAB / CR / LF )
+*/
+PHP_METHOD(RdKafka, oauthbearerSetToken)
+{
+ kafka_object *intern;
+ char *token_value;
+ size_t token_value_len;
+ zend_long lifetime_ms;
+ char *principal_name;
+ size_t principal_len;
+ HashTable *extensions_hash = NULL;
+
+ char errstr[512];
+ rd_kafka_resp_err_t ret = 0;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|h", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
+ return;
+ }
+
+ intern = get_kafka_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ errstr[0] = '\0';
+
+ int extensions_size = 0;
+ char **extensions = NULL;
+
+ if (extensions_hash != NULL) {
+ extensions_size = zend_hash_num_elements(extensions_hash) * 2;
+        extensions = safe_emalloc(extensions_size, sizeof(char *), 0);
+
+ int pos = 0;
+ zend_ulong num_key;
+ zend_string *extension_key_str;
+ zval *extension_zval;
+ ZEND_HASH_FOREACH_KEY_VAL(extensions_hash, num_key, extension_key_str, extension_zval) {
+ if (!extension_key_str) {
+ extension_key_str = zend_long_to_str(num_key);
+ extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
+ zend_string_release(extension_key_str);
+ } else {
+ extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
+ }
+
+ zend_string *tmp_extension_val_str;
+ zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);
+ extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));
+ if (tmp_extension_val_str) {
+ zend_string_release(tmp_extension_val_str);
+ }
+ } ZEND_HASH_FOREACH_END();
+ }
+
+ ret = rd_kafka_oauthbearer_set_token(
+ intern->rk,
+ token_value,
+ lifetime_ms,
+ principal_name,
+ (const char **)extensions,
+ extensions_size,
+ errstr,
+ sizeof(errstr));
+
+ if (extensions != NULL) {
+ for (int i = 0; i < extensions_size; i++) {
+ efree(extensions[i]);
+ }
+ efree(extensions);
+ }
+
+ switch (ret) {
+ case RD_KAFKA_RESP_ERR__INVALID_ARG:
+ zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
+ return;
+ case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
+ zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
+ return;
+ case RD_KAFKA_RESP_ERR__STATE:
+ zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
+ return;
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
+ break;
+ default:
+ return;
+ }
+}
+/* }}} */
+
+/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
+ The SASL/OAUTHBEARER token refresh callback or event handler should cause
+ this method to be invoked upon failure, via
+ rd_kafka_oauthbearer_set_token_failure().
+*/
+PHP_METHOD(RdKafka, oauthbearerSetTokenFailure)
+{
+ kafka_object *intern;
+ const char *errstr;
+ size_t errstr_len;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
+ return;
+ }
+
+ intern = get_kafka_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);
+
+ switch (ret) {
+ case RD_KAFKA_RESP_ERR__INVALID_ARG:
+ zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
+ return;
+ case RD_KAFKA_RESP_ERR__STATE:
+ zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
+ return;
+ case RD_KAFKA_RESP_ERR_NO_ERROR:
+ break;
+ default:
+ return;
+ }
+}
+/* }}} */
+#endif
+
/* {{{ proto RdKafka\Topic RdKafka::newTopic(string $topic)
Returns an RdKafka\Topic object */
PHP_METHOD(RdKafka, newTopic)
diff --git a/rdkafka.stub.php b/rdkafka.stub.php
index 26601ac5..3d99ef22 100644
--- a/rdkafka.stub.php
+++ b/rdkafka.stub.php
@@ -79,6 +79,14 @@ public function pausePartitions(array $topic_partitions): array {}
/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}
+
+ #ifdef HAS_RD_KAFKA_OAUTHBEARER
+ /** @tentative-return-type */
+ public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}
+
+ /** @tentative-return-type */
+ public function oauthbearerSetTokenFailure(string $error): void {}
+ #endif
}
}
diff --git a/rdkafka_arginfo.h b/rdkafka_arginfo.h
index 220d6da3..0e4fe52e 100644
--- a/rdkafka_arginfo.h
+++ b/rdkafka_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */
+ * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -119,6 +119,27 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_resumePartitions arginfo_class_RdKafka_pausePartitions
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 3, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3)
+#endif
+ ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
+ ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
+ ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
+ ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
+ZEND_END_ARG_INFO()
+
+#if (PHP_VERSION_ID >= 80100)
+ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
+#else
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1)
+#endif
+ ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
+ZEND_END_ARG_INFO()
+#endif
+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0)
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, conf, RdKafka\\Conf, 1, "null")
ZEND_END_ARG_INFO()
@@ -140,22 +161,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0)
#endif
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions
#endif
@@ -178,19 +193,17 @@ ZEND_METHOD(RdKafka, queryWatermarkOffsets);
ZEND_METHOD(RdKafka, offsetsForTimes);
ZEND_METHOD(RdKafka, pausePartitions);
ZEND_METHOD(RdKafka, resumePartitions);
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ZEND_METHOD(RdKafka, oauthbearerSetToken);
+ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure);
+#endif
ZEND_METHOD(RdKafka_Consumer, __construct);
ZEND_METHOD(RdKafka_Consumer, newQueue);
ZEND_METHOD(RdKafka_Producer, __construct);
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, initTransactions);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, beginTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, commitTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, abortTransaction);
#endif
@@ -224,10 +237,10 @@ static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, offsetsForTimes, arginfo_class_RdKafka_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, pausePartitions, arginfo_class_RdKafka_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC)
- ZEND_FE_END
-};
-
-static const zend_function_entry class_RdKafka_Exception_methods[] = {
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC)
+ ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
+#endif
ZEND_FE_END
};
@@ -241,14 +254,8 @@ static const zend_function_entry class_RdKafka_Producer_methods[] = {
ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -259,8 +266,12 @@ static zend_class_entry *register_class_RdKafka(void)
zend_class_entry ce, *class_entry;
INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
class_entry->ce_flags |= ZEND_ACC_ABSTRACT;
+#endif
zval property_error_cb_default_value;
ZVAL_UNDEF(&property_error_cb_default_value);
@@ -281,8 +292,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas
{
zend_class_entry ce, *class_entry;
- INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods);
+ INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception);
+#endif
return class_entry;
}
@@ -292,7 +307,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
@@ -302,7 +321,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
diff --git a/rdkafka_legacy_arginfo.h b/rdkafka_legacy_arginfo.h
index 5fcb7263..49f594b4 100644
--- a/rdkafka_legacy_arginfo.h
+++ b/rdkafka_legacy_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 2d70b7756ae39db557148a4c65fc6bc5c164b102 */
+ * Stub hash: ea957a110b42c19bcb4a244655c1eaf99a1e3961 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka___construct, 0, 0, 0)
ZEND_END_ARG_INFO()
@@ -71,6 +71,19 @@ ZEND_END_ARG_INFO()
#define arginfo_class_RdKafka_resumePartitions arginfo_class_RdKafka_pausePartitions
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetToken, 0, 0, 3)
+ ZEND_ARG_INFO(0, token_value)
+ ZEND_ARG_INFO(0, lifetime_ms)
+ ZEND_ARG_INFO(0, principal_name)
+ ZEND_ARG_INFO(0, extensions)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_oauthbearerSetTokenFailure, 0, 0, 1)
+ ZEND_ARG_INFO(0, error)
+ZEND_END_ARG_INFO()
+#endif
+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Consumer___construct, 0, 0, 0)
ZEND_ARG_INFO(0, conf)
ZEND_END_ARG_INFO()
@@ -83,18 +96,12 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_initTransactions, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Producer_beginTransaction, 0, 0, 0)
ZEND_END_ARG_INFO()
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_commitTransaction arginfo_class_RdKafka_Producer_initTransactions
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
#define arginfo_class_RdKafka_Producer_abortTransaction arginfo_class_RdKafka_Producer_initTransactions
#endif
@@ -117,19 +124,17 @@ ZEND_METHOD(RdKafka, queryWatermarkOffsets);
ZEND_METHOD(RdKafka, offsetsForTimes);
ZEND_METHOD(RdKafka, pausePartitions);
ZEND_METHOD(RdKafka, resumePartitions);
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ZEND_METHOD(RdKafka, oauthbearerSetToken);
+ZEND_METHOD(RdKafka, oauthbearerSetTokenFailure);
+#endif
ZEND_METHOD(RdKafka_Consumer, __construct);
ZEND_METHOD(RdKafka_Consumer, newQueue);
ZEND_METHOD(RdKafka_Producer, __construct);
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, initTransactions);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, beginTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, commitTransaction);
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_METHOD(RdKafka_Producer, abortTransaction);
#endif
@@ -163,10 +168,10 @@ static const zend_function_entry class_RdKafka_methods[] = {
ZEND_ME(RdKafka, offsetsForTimes, arginfo_class_RdKafka_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, pausePartitions, arginfo_class_RdKafka_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka, resumePartitions, arginfo_class_RdKafka_resumePartitions, ZEND_ACC_PUBLIC)
- ZEND_FE_END
-};
-
-static const zend_function_entry class_RdKafka_Exception_methods[] = {
+#if defined(HAS_RD_KAFKA_OAUTHBEARER)
+ ZEND_ME(RdKafka, oauthbearerSetToken, arginfo_class_RdKafka_oauthbearerSetToken, ZEND_ACC_PUBLIC)
+ ZEND_ME(RdKafka, oauthbearerSetTokenFailure, arginfo_class_RdKafka_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
+#endif
ZEND_FE_END
};
@@ -180,14 +185,8 @@ static const zend_function_entry class_RdKafka_Producer_methods[] = {
ZEND_ME(RdKafka_Producer, __construct, arginfo_class_RdKafka_Producer___construct, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, initTransactions, arginfo_class_RdKafka_Producer_initTransactions, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, beginTransaction, arginfo_class_RdKafka_Producer_beginTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, commitTransaction, arginfo_class_RdKafka_Producer_commitTransaction, ZEND_ACC_PUBLIC)
-#endif
-#if defined(HAS_RD_KAFKA_TRANSACTIONS)
ZEND_ME(RdKafka_Producer, abortTransaction, arginfo_class_RdKafka_Producer_abortTransaction, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
@@ -198,8 +197,12 @@ static zend_class_entry *register_class_RdKafka(void)
zend_class_entry ce, *class_entry;
INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_ABSTRACT);
+#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
class_entry->ce_flags |= ZEND_ACC_ABSTRACT;
+#endif
zval property_error_cb_default_value;
ZVAL_NULL(&property_error_cb_default_value);
@@ -220,8 +223,12 @@ static zend_class_entry *register_class_RdKafka_Exception(zend_class_entry *clas
{
zend_class_entry ce, *class_entry;
- INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", class_RdKafka_Exception_methods);
+ INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_Exception, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_Exception);
+#endif
return class_entry;
}
@@ -231,7 +238,11 @@ static zend_class_entry *register_class_RdKafka_Consumer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
@@ -241,7 +252,11 @@ static zend_class_entry *register_class_RdKafka_Producer(zend_class_entry *class
zend_class_entry ce, *class_entry;
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods);
+#if (PHP_VERSION_ID >= 80400)
+ class_entry = zend_register_internal_class_with_flags(&ce, class_entry_RdKafka, 0);
+#else
class_entry = zend_register_internal_class_ex(&ce, class_entry_RdKafka);
+#endif
return class_entry;
}
diff --git a/tests/controller_id.phpt b/tests/controller_id.phpt
index af7e95f0..b9724804 100644
--- a/tests/controller_id.phpt
+++ b/tests/controller_id.phpt
@@ -20,6 +20,6 @@ $conf->set('group.id', 'test');
echo (new RdKafka\KafkaConsumer($conf))->getControllerId(10*1000) . \PHP_EOL;
--EXPECT--
-1001
-1001
-1001
\ No newline at end of file
+1
+1
+1
\ No newline at end of file
diff --git a/tests/oauthbearer_integration.phpt b/tests/oauthbearer_integration.phpt
new file mode 100644
index 00000000..b95de0f6
--- /dev/null
+++ b/tests/oauthbearer_integration.phpt
@@ -0,0 +1,151 @@
+--TEST--
+Oauthbearer
+--SKIPIF--
+<?php
+RD_KAFKA_VERSION >= 0x01010000 || die("skip librdkafka too old does not support oauthbearer");
+--FILE--
+ $jws,
+ 'principal' => $principal,
+ 'expiryMs' => $expiryMs,
+ ];
+}
+
+// Set up tests
+$conf = new RdKafka\Conf();
+if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
+ $conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
+}
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_OAUTH_BROKERS'));
+$conf->set('security.protocol', 'SASL_PLAINTEXT');
+$conf->set('sasl.mechanisms', 'OAUTHBEARER');
+$conf->set('sasl.oauthbearer.config', 'principal=admin');
+$conf->setLogCb(function ($kafka, $level, $facility, $message) {});
+$conf->setErrorCb(function ($producer, $err, $errstr) {
+ printf("%s: %s\n", rd_kafka_err2str($err), $errstr);
+});
+
+// Test that refresh token with setting token accurately will succeed when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Refreshing token and succeeding\n";
+ $token = generateJws();
+ $producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "Metadata retrieved successfully when refresh callback set token\n";
+} catch (\RdKafka\Exception $e) {
+ echo "FAIL: Caught exception when getting metadata after successfully refreshing any token\n";
+}
+
+// Test that refresh token with setting token failure will fail when getting metadata
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Setting token failure in refresh cb\n";
+ $producer->oauthbearerSetTokenFailure('Token failure before getting metadata');
+ $producer->poll(0);
+});
+$producer = new \RdKafka\Producer($conf);
+$producer->poll(0);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "FAIL: Did not catch exception after not setting or refreshing any token\n";
+} catch (\RdKafka\Exception $e) {
+ echo "Caught exception when getting metadata after not setting or refreshing any token\n";
+}
+
+// Test that setting token without refreshing will get metadata successfully
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws();
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "Got metadata successfully\n";
+} catch (\RdKafka\Exception $e) {
+ echo "FAIL: Set token but still got exception \n";
+ exit;
+}
+
+// Test that token refresh is called after token expires
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {
+ echo "Refreshing token\n";
+});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws('required-scope', 5);
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$producer->poll(0);
+echo "Polled with refresh\n";
+sleep(1);
+$producer->poll(0);
+echo "Polled without refresh\n";
+sleep(4);
+$producer->poll(0);
+echo "Polled with refresh\n";
+
+// Test that tokens without required scope fail
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws('not-required-scope');
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal']);
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+try {
+ $producer->getMetadata(false, $topic, 10*1000);
+ echo "FAIL: Exception not thrown as expected when using insufficient scope\n";
+ exit;
+} catch (\RdKafka\Exception $e) {
+ echo "Caught expected exception with insufficient_scope\n";
+}
+
+// Test that setting token with extensions succeeds
+$conf->setOauthbearerTokenRefreshCb(function ($producer) {});
+$producer = new \RdKafka\Producer($conf);
+$token = generateJws();
+$producer->oauthbearerSetToken($token['value'], $token['expiryMs'], $token['principal'], ['testExtensionKey' => 'Test extension value']);
+$producer->poll(0);
+
+--EXPECT--
+Refreshing token and succeeding
+Metadata retrieved successfully when refresh callback set token
+Setting token failure in refresh cb
+Local: Authentication failure: Failed to acquire SASL OAUTHBEARER token: Token failure before getting metadata
+Caught exception when getting metadata after not setting or refreshing any token
+Got metadata successfully
+Refreshing token
+Polled with refresh
+Polled without refresh
+Refreshing token
+Polled with refresh
+Caught expected exception with insufficient_scope
diff --git a/tests/test_env.php.sample b/tests/test_env.php.sample
index e43476aa..0d6be87a 100644
--- a/tests/test_env.php.sample
+++ b/tests/test_env.php.sample
@@ -4,6 +4,10 @@ if (false === getenv('TEST_KAFKA_BROKERS')) {
putenv('TEST_KAFKA_BROKERS=localhost:9092');
}
+if (false === getenv('TEST_KAFKA_OAUTH_BROKERS')) {
+ putenv('TEST_KAFKA_OAUTH_BROKERS=kafka_oauth2:29092');
+}
+
if (false === getenv('TEST_KAFKA_BROKER_VERSION')) {
putenv('TEST_KAFKA_BROKER_VERSION=2.3');
}