Skip to content

Commit

Permalink
feat: implement oauthbearer token refresh cb setter (#546)
Browse files Browse the repository at this point in the history
  • Loading branch information
cb-freddysart authored Jan 5, 2024
1 parent b21a905 commit bcd5004
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test/build-librdkafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -ex
if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then
echo "librdkafka build is not cached"

git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"
git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v2.3.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"

cd librdkafka
./configure
Expand Down
70 changes: 70 additions & 0 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
cbs->offset_commit = NULL;
kafka_conf_callback_dtor(cbs->log);
cbs->log = NULL;
kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh);
cbs->oauthbearer_token_refresh = NULL;
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */
Expand Down Expand Up @@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
zval_ptr_dtor(&args[3]);
}

/*
void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
rd_kafka_conf_t *conf,
void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
const char *oauthbearer_config,
void *opaque)) {
}*/
static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval args[2];

if (!opaque) {
return;
}

if (!cbs->oauthbearer_token_refresh) {
return;
}

ZVAL_NULL(&args[0]);
ZVAL_NULL(&args[1]);

ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
ZVAL_STRING(&args[1], oauthbearer_config);

rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}



/* {{{ proto RdKafka\Conf::__construct() */
PHP_METHOD(RdKafka_Conf, __construct)
{
Expand Down Expand Up @@ -698,6 +734,40 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *conf;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) {
return;
}

conf = get_kafka_conf_object(getThis());
if (!conf) {
return;
}

Z_ADDREF_P(&fci.function_name);

if (conf->cbs.oauthbearer_token_refresh) {
zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name);
} else {
conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh));
}

conf->cbs.oauthbearer_token_refresh->fci = fci;
conf->cbs.oauthbearer_token_refresh->fcc = fcc;

rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb);
}
/* }}} */
#endif

/* {{{ proto RdKafka\TopicConf::__construct() */
PHP_METHOD(RdKafka_TopicConf, __construct)
{
Expand Down
1 change: 1 addition & 0 deletions conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks {
kafka_conf_callback *consume;
kafka_conf_callback *offset_commit;
kafka_conf_callback *log;
kafka_conf_callback *oauthbearer_token_refresh;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
Expand Down
5 changes: 5 additions & 0 deletions conf.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public function setOffsetCommitCb(callable $callback): void {}

/** @tentative-return-type */
public function setLogCb(callable $callback): void {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
#endif
}

class TopicConf
Expand Down
12 changes: 12 additions & 0 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +58,11 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif

ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +79,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand Down
10 changes: 10 additions & 0 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
#endif

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +58,9 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
#endif
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +77,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand Down
6 changes: 6 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
],[
AC_MSG_WARN([oauthbearer token refresh cb is not available])
])

LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
<file role="test" name="bug88.phpt"/>
<file role="test" name="bugConfSetArgument.phpt"/>
<file role="test" name="conf_callbacks_integration.phpt"/>
<file role="test" name="conf_callbacks_rdkafka11.phpt"/>
<file role="test" name="conf_callbacks.phpt"/>
<file role="test" name="conf.phpt"/>
<file role="test" name="conf_setDefaultTopicConf8.phpt"/>
Expand Down
6 changes: 5 additions & 1 deletion tests/conf_callbacks.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old");
--FILE--
<?php

Expand All @@ -23,6 +23,8 @@ $conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Checking if oauthbearer cb exists\n";
var_dump(method_exists($conf, 'setOauthbearerTokenRefreshCb'));

--EXPECT--
Setting consume callback
Expand All @@ -31,3 +33,5 @@ Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Checking if oauthbearer cb exists
bool(false)
39 changes: 39 additions & 0 deletions tests/conf_callbacks_rdkafka11.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
--TEST--
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x010100ff || die("skip librdkafka too old");
--FILE--
<?php

$conf = new RdKafka\Conf();

echo "Setting consume callback\n";
$conf->setConsumeCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["consume_cb"]));

echo "Setting offset_commit callback\n";
$conf->setOffsetCommitCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["offset_commit_cb"]));

echo "Setting rebalance callback\n";
$conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Setting oauth token bearer callback\n";
$conf->setOauthbearerTokenRefreshCb(function () {});
$dump = $conf->dump();
var_dump(isset($dump["oauthbearer_token_refresh_cb"]));

--EXPECT--
Setting consume callback
bool(true)
Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Setting oauth token bearer callback
bool(true)

0 comments on commit bcd5004

Please sign in to comment.