Skip to content
Merged
1 change: 1 addition & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then

AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
SOURCES="$SOURCES oauthbearer.c"
],[
AC_MSG_WARN([oauthbearer support is not available])
])
Expand Down
87 changes: 87 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "topic.h"
#include "message.h"
#include "metadata.h"
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#include "oauthbearer.h"
#endif
#if PHP_VERSION_ID < 80000
#include "kafka_consumer_legacy_arginfo.h"
#else
Expand Down Expand Up @@ -868,6 +871,90 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
}
/* }}} */

/* {{{ proto int RdKafka::poll(int $timeout_ms)
Polls the provided kafka handle for events */
PHP_METHOD(RdKafka_KafkaConsumer, poll)
{
object_intern *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
* Set SASL/OAUTHBEARER token and metadata
*
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
* this method to be invoked upon success, via
* $consumer->oauthbearerSetToken(). The extension keys must not include the
* reserved key "`auth`", and all extension keys and values must conform to the
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
*
* key = 1*(ALPHA)
* value = *(VCHAR / SP / HTAB / CR / LF )
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
{
object_intern *intern;
char *token_value;
size_t token_value_len;
zval *zlifetime_ms;
int64_t lifetime_ms;
char *principal_name;
size_t principal_len;
HashTable *extensions_hash = NULL;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
return;
}

lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
}
/* }}} */

/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
The SASL/OAUTHBEARER token refresh callback or event handler should cause
this method to be invoked upon failure, via
rd_kafka_oauthbearer_set_token_failure().
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
{
object_intern *intern;
const char *errstr;
size_t errstr_len;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token_failure(intern->rk, errstr);
}
/* }}} */
#endif

void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
{
ce = register_class_RdKafka_KafkaConsumer();
Expand Down
11 changes: 11 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ public function pausePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function poll(int $timeout_ms): int {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}

/** @tentative-return-type */
public function oauthbearerSetTokenFailure(string $error): void {}
#endif
}
53 changes: 48 additions & 5 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0)
Expand Down Expand Up @@ -135,6 +135,38 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
#endif
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
ZEND_END_ARG_INFO()
#endif

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
ZEND_END_ARG_INFO()
#endif


ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
Expand Down Expand Up @@ -162,6 +194,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
#endif


static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -191,6 +231,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand All @@ -199,11 +246,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

zval property_error_cb_default_value;
ZVAL_UNDEF(&property_error_cb_default_value);
Expand Down
39 changes: 34 additions & 5 deletions kafka_consumer_legacy_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_INFO(0, conf)
Expand Down Expand Up @@ -82,6 +82,24 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions

#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_consume

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
ZEND_ARG_INFO(0, token_value)
ZEND_ARG_INFO(0, lifetime_ms)
ZEND_ARG_INFO(0, principal_name)
ZEND_ARG_INFO(0, extensions)
ZEND_END_ARG_INFO()
#endif

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
ZEND_ARG_INFO(0, error)
ZEND_END_ARG_INFO()
#endif


ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
Expand Down Expand Up @@ -109,6 +127,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
#endif


static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -138,6 +164,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand All @@ -146,11 +179,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

zval property_error_cb_default_value;
ZVAL_NULL(&property_error_cb_default_value);
Expand Down
Loading
Loading