Skip to content

Commit

Permalink
feat: implement oauthbearer token refresh cb setter
Browse files Browse the repository at this point in the history
  • Loading branch information
cb-freddysart committed Jan 3, 2024
1 parent b21a905 commit d26ded3
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 1 deletion.
73 changes: 73 additions & 0 deletions conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
cbs->offset_commit = NULL;
kafka_conf_callback_dtor(cbs->log);
cbs->log = NULL;
kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh);
cbs->oauthbearer_token_refresh = NULL;
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */
Expand Down Expand Up @@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
zval_ptr_dtor(&args[3]);
}

/*
void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
rd_kafka_conf_t *conf,
void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
const char *oauthbearer_config,
void *opaque)) {
}*/
static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zval args[2];

if (!opaque) {
return;
}

if (!cbs->oauthbearer_token_refresh) {
return;
}

ZVAL_NULL(&args[0]);
ZVAL_NULL(&args[1]);

ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
ZVAL_STRING(&args[1], oauthbearer_config);

rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}



/* {{{ proto RdKafka\Conf::__construct() */
PHP_METHOD(RdKafka_Conf, __construct)
{
Expand Down Expand Up @@ -698,6 +734,43 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
}
/* }}} */

/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
Set token refresh callback for OAUTHBEARER sasl */
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
{
#ifndef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
zend_throw_exception_ex(NULL, 0, "This version of rdkafka does not support the OAUTHBEARER sasl mechanism");
return;
#endif

zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *conf;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) {
return;
}

conf = get_kafka_conf_object(getThis());
if (!conf) {
return;
}

Z_ADDREF_P(&fci.function_name);

if (conf->cbs.log) {
zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name);
} else {
conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh));
}

conf->cbs.oauthbearer_token_refresh->fci = fci;
conf->cbs.oauthbearer_token_refresh->fcc = fcc;

rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb);
}
/* }}} */

/* {{{ proto RdKafka\TopicConf::__construct() */
PHP_METHOD(RdKafka_TopicConf, __construct)
{
Expand Down
1 change: 1 addition & 0 deletions conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks {
kafka_conf_callback *consume;
kafka_conf_callback *offset_commit;
kafka_conf_callback *log;
kafka_conf_callback *oauthbearer_token_refresh;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
Expand Down
3 changes: 3 additions & 0 deletions conf.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public function setOffsetCommitCb(callable $callback): void {}

/** @tentative-return-type */
public function setLogCb(callable $callback): void {}

/** @tentative-return-type */
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
}

class TopicConf
Expand Down
4 changes: 4 additions & 0 deletions conf_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +56,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +73,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
ZEND_FE_END
};

Expand Down
4 changes: 4 additions & 0 deletions conf_legacy_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb

#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb

#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct

#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
Expand All @@ -54,6 +56,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
ZEND_METHOD(RdKafka_Conf, setLogCb);
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
ZEND_METHOD(RdKafka_TopicConf, __construct);
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);

Expand All @@ -70,6 +73,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
ZEND_FE_END
};

Expand Down
6 changes: 6 additions & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then
AC_MSG_WARN([murmur2 partitioner is not available])
])

AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
],[
AC_MSG_WARN([oauthbearer token refresh cb is not available])
])

LDFLAGS="$ORIG_LDFLAGS"
CPPFLAGS="$ORIG_CPPFLAGS"

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
<file role="test" name="bug88.phpt"/>
<file role="test" name="bugConfSetArgument.phpt"/>
<file role="test" name="conf_callbacks_integration.phpt"/>
<file role="test" name="conf_callbacks_rdkafka11.phpt"/>
<file role="test" name="conf_callbacks.phpt"/>
<file role="test" name="conf.phpt"/>
<file role="test" name="conf_setDefaultTopicConf8.phpt"/>
Expand Down
10 changes: 9 additions & 1 deletion tests/conf_callbacks.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old");
--FILE--
<?php

Expand All @@ -23,6 +23,12 @@ $conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Setting oauth token bearer callback\n";
try {
$conf->setOauthbearerTokenRefreshCb(function () {});
} catch (\Exception $e) {
echo $e->getMessage()."\n";
}

--EXPECT--
Setting consume callback
Expand All @@ -31,3 +37,5 @@ Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Setting oauth token bearer callback
This version of rdkafka does not support the OAUTHBEARER sasl mechanism
39 changes: 39 additions & 0 deletions tests/conf_callbacks_rdkafka11.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
--TEST--
RdKafka\Conf
--SKIPIF--
<?php
RD_KAFKA_VERSION >= 0x010100ff || die("skip librdkafka too old");
--FILE--
<?php

$conf = new RdKafka\Conf();

echo "Setting consume callback\n";
$conf->setConsumeCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["consume_cb"]));

echo "Setting offset_commit callback\n";
$conf->setOffsetCommitCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["offset_commit_cb"]));

echo "Setting rebalance callback\n";
$conf->setRebalanceCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["rebalance_cb"]));

echo "Setting oauth token bearer callback\n";
$conf->setOauthbearerTokenRefreshCb(function () {});
$dump = $conf->dump();
var_dump(isset($dump["oauthbearer_token_refresh_cb"]));

--EXPECT--
Setting consume callback
bool(true)
Setting offset_commit callback
bool(true)
Setting rebalance callback
bool(true)
Setting oauth token bearer callback
bool(true)

0 comments on commit d26ded3

Please sign in to comment.