Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ce6d3e9
feat: add support for oauthbearer_set_token/set_token_failure
cb-freddysart Jan 10, 2024
c0a98e7
fix: back out unrelated arginfo changes
cb-freddysart Jan 10, 2024
ee02108
fix: further arginfo fixes
cb-freddysart Jan 10, 2024
ad773d0
fix: handle strict errors
cb-freddysart Jan 10, 2024
12cb781
fix: remove comment
cb-freddysart Jan 15, 2024
410bccb
fix: mem leak around extensions
cb-freddysart Jan 18, 2024
8441345
fix: add length param for errstr
cb-freddysart Jan 18, 2024
363a9e9
fix: remove comment
cb-freddysart Jan 18, 2024
9d9e823
fix: swap to h, rename extension_size to extensions_size for consistency
cb-freddysart Jan 18, 2024
41c5d1f
fix: const cast
cb-freddysart Jan 18, 2024
4d3f493
Implement integration tests for oauth token methods
scorgn Oct 13, 2024
edead1c
Fix typo in kafka_ready
scorgn Oct 13, 2024
b92336b
Skip oauthbearer tests and setup steps if librdkafka version does not…
scorgn Oct 13, 2024
b86825b
Ensure tests compatible with all php versions
scorgn Oct 13, 2024
42d3110
Fix RD_KAFKA_VERSION comparison
scorgn Oct 14, 2024
9079b9a
Remove usage of json_encode as we don't have access to json extension
scorgn Oct 14, 2024
28cc1cc
Add newline to end of test file
scorgn Oct 14, 2024
2b859d6
Change test name to match contents
scorgn Oct 14, 2024
2a78a6c
Change test name to match contents
scorgn Oct 14, 2024
73acf80
Merge remote-tracking branch 'origin/6.x' into feat/oauthbearer-set-t…
scorgn Oct 14, 2024
32e3dfb
Change controller_id test to use 1 instead of 1001
scorgn Oct 14, 2024
be78aa0
Merge pull request #2 from scorgn/feat/oauthbearer-set-token-solve-co…
cb-freddysart Oct 15, 2024
82e320c
Merge branch 'feat/oauthbearer-set-token' of github.com:cb-freddysart…
cb-freddysart Oct 15, 2024
da58f00
feat: bump upload-artifact GH action
cb-freddysart Oct 15, 2024
976fc01
feat: add oauthbearer integration test to package.xml
cb-freddysart Oct 15, 2024
f497c3e
feat: bump metadata timeout
cb-freddysart Oct 16, 2024
3231fe2
feat: debug broker startup
cb-freddysart Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
Expand Down
2 changes: 1 addition & 1 deletion conf.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function setOffsetCommitCb(callable $callback): void {}
/** @tentative-return-type */
public function setLogCb(callable $callback): void {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
#endif
Expand Down
6 changes: 3 additions & 3 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

Expand All @@ -59,7 +59,7 @@ ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

Expand All @@ -79,7 +79,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
Expand Down
6 changes: 3 additions & 3 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

Expand All @@ -58,7 +58,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
Expand All @@ -77,7 +77,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#ifdef HAS_RD_KAFKA_OAUTHBEARER
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
Expand Down
6 changes: 3 additions & 3 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
],[
AC_MSG_WARN([oauthbearer token refresh cb is not available])
AC_MSG_WARN([oauthbearer support is not available])
])

LDFLAGS="$ORIG_LDFLAGS"
Expand Down
132 changes: 132 additions & 0 deletions rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,138 @@ PHP_METHOD(RdKafka, setLogLevel)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
* Set SASL/OAUTHBEARER token and metadata
*
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
* this method to be invoked upon success, via
* $kafka->oauthbearerSetToken(). The extension keys must not include the
* reserved key "`auth`", and all extension keys and values must conform to the
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
*
* key = 1*(ALPHA)
* value = *(VCHAR / SP / HTAB / CR / LF )
*/
PHP_METHOD(RdKafka, oauthbearerSetToken)
{
kafka_object *intern;
char *token_value;
size_t token_value_len;
zend_long lifetime_ms;
char *principal_name;
size_t principal_len;
zval *extensions_arg = NULL;

char errstr[512];
rd_kafka_resp_err_t ret = 0;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "sls|a", &token_value, &token_value_len, &lifetime_ms, &principal_name, &principal_len, &extensions_arg) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

errstr[0] = '\0';

int extension_size;
char **extensions = NULL;

if (extensions_arg != NULL) {
extension_size = zend_hash_num_elements(Z_ARRVAL_P(extensions_arg)) * 2;
extensions = safe_emalloc((extension_size * 2), sizeof(char *), 0);

int pos = 0;
zend_ulong num_key;
zend_string *extension_key_str;
zval *extension_zval;
ZEND_HASH_FOREACH_KEY_VAL(Z_ARRVAL_P(extensions_arg), num_key, extension_key_str, extension_zval) {
zend_string *tmp_extension_val_str;
zend_string *extension_val_str = zval_get_tmp_string(extension_zval, &tmp_extension_val_str);

extensions[pos++] = estrdup(ZSTR_VAL(extension_key_str));
extensions[pos++] = estrdup(ZSTR_VAL(extension_val_str));

zend_tmp_string_release(tmp_extension_val_str);
} ZEND_HASH_FOREACH_END();
}

/* rd_kafka_oauthbearer_set_token(rd_kafka_t *rk,
const char *token_value,
int64_t md_lifetime_ms,
const char *md_principal_name,
const char **extensions,
size_t extension_size,
char *errstr,
size_t errstr_size */
ret = rd_kafka_oauthbearer_set_token(
intern->rk,
token_value,
lifetime_ms,
principal_name,
extensions,
extension_size,
errstr,
sizeof(errstr));

if (extensions != NULL) {
efree(extensions);
}

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, errstr, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
}
}
/* }}} */

/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
The SASL/OAUTHBEARER token refresh callback or event handler should cause
this method to be invoked upon failure, via
rd_kafka_oauthbearer_set_token_failure().
*/
PHP_METHOD(RdKafka, oauthbearerSetTokenFailure)
{
kafka_object *intern;
const char *errstr;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr) == FAILURE) {
return;
}

intern = get_kafka_object(getThis());
if (!intern) {
return;
}

rd_kafka_resp_err_t ret = rd_kafka_oauthbearer_set_token_failure(intern->rk, errstr);

switch (ret) {
case RD_KAFKA_RESP_ERR__INVALID_ARG:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__INVALID_ARG);
return;
case RD_KAFKA_RESP_ERR__STATE:
zend_throw_exception(ce_kafka_exception, NULL, RD_KAFKA_RESP_ERR__STATE);
return;
case RD_KAFKA_RESP_ERR_NO_ERROR:
break;
}
}
/* }}} */
#endif

/* {{{ proto RdKafka\Topic RdKafka::newTopic(string $topic)
Returns an RdKafka\Topic object */
PHP_METHOD(RdKafka, newTopic)
Expand Down
8 changes: 8 additions & 0 deletions rdkafka.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public function pausePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void;

/** @tentative-return-type */
public function oauthbearerSetTokenFailure(string $error): void;
#endif
}
}

Expand Down
Loading