Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement oauthbearer token refresh cb setter #546

Merged
merged 5 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test/build-librdkafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -ex
if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then
echo "librdkafka build is not cached"

git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"
git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v2.3.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"

cd librdkafka
./configure
Expand Down
70 changes: 70 additions & 0 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
cbs->offset_commit = NULL;
kafka_conf_callback_dtor(cbs->log);
cbs->log = NULL;
kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh);
cbs->oauthbearer_token_refresh = NULL;
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */
Expand Down Expand Up @@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
zval_ptr_dtor(&args[3]);
}

/*
void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
rd_kafka_conf_t *conf,
void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
const char *oauthbearer_config,
void *opaque)) {
}*/
static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval args[2];

if (!opaque) {
return;
}

if (!cbs->oauthbearer_token_refresh) {
return;
}

ZVAL_NULL(&args[0]);
ZVAL_NULL(&args[1]);

ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
ZVAL_STRING(&args[1], oauthbearer_config);

rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}



/* {{{ proto RdKafka\Conf::__construct() */
PHP_METHOD(RdKafka_Conf, __construct)
{
Expand Down Expand Up @@ -698,6 +734,40 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *conf;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) {
return;
}

conf = get_kafka_conf_object(getThis());
if (!conf) {
return;
}

Z_ADDREF_P(&fci.function_name);

if (conf->cbs.oauthbearer_token_refresh) {
zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name);
} else {
conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh));
}

conf->cbs.oauthbearer_token_refresh->fci = fci;
conf->cbs.oauthbearer_token_refresh->fcc = fcc;

rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb);
}
/* }}} */
#endif

/* {{{ proto RdKafka\TopicConf::__construct() */
PHP_METHOD(RdKafka_TopicConf, __construct)
{
Expand Down
1 change: 1 addition & 0 deletions conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks {
kafka_conf_callback *consume;
kafka_conf_callback *offset_commit;
kafka_conf_callback *log;
kafka_conf_callback *oauthbearer_token_refresh;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
Expand Down
5 changes: 5 additions & 0 deletions conf.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public function setOffsetCommitCb(callable $callback): void {}

/** @tentative-return-type */
public function setLogCb(callable $callback): void {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
#endif
}

class TopicConf
Expand Down
12 changes: 12 additions & 0 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +58,11 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +79,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand Down
10 changes: 10 additions & 0 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +58,9 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +77,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand Down
6 changes: 6 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
],[
AC_MSG_WARN([oauthbearer token refresh cb is not available])
])

LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
<file role="test" name="bug88.phpt"/>
<file role="test" name="bugConfSetArgument.phpt"/>
<file role="test" name="conf_callbacks_integration.phpt"/>
<file role="test" name="conf_callbacks_rdkafka11.phpt"/>
<file role="test" name="conf_callbacks.phpt"/>
<file role="test" name="conf.phpt"/>
<file role="test" name="conf_setDefaultTopicConf8.phpt"/>
Expand Down
6 changes: 5 additions & 1 deletion tests/conf_callbacks.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old");
--FILE--
<?php

Expand All @@ -23,6 +23,8 @@ $conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Checking if oauthbearer cb exists\n";
var_dump(method_exists($conf, 'setOauthbearerTokenRefreshCb'));

--EXPECT--
Setting consume callback
Expand All @@ -31,3 +33,5 @@ Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Checking if oauthbearer cb exists
bool(false)
39 changes: 39 additions & 0 deletions tests/conf_callbacks_rdkafka11.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
--TEST--
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x010100ff || die("skip librdkafka too old");
--FILE--
<?php

$conf = new RdKafka\Conf();

echo "Setting consume callback\n";
$conf->setConsumeCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["consume_cb"]));

echo "Setting offset_commit callback\n";
$conf->setOffsetCommitCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["offset_commit_cb"]));

echo "Setting rebalance callback\n";
$conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Setting oauth token bearer callback\n";
$conf->setOauthbearerTokenRefreshCb(function () {});
$dump = $conf->dump();
var_dump(isset($dump["oauthbearer_token_refresh_cb"]));

--EXPECT--
Setting consume callback
bool(true)
Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Setting oauth token bearer callback
bool(true)
Loading