From d26ded3464343e4177f47afd29a53c3cea3c334d Mon Sep 17 00:00:00 2001 From: Fred Dysart Date: Wed, 3 Jan 2024 13:35:10 -0500 Subject: [PATCH 1/5] feat: implement oauthbearer token refresh cb setter --- conf.c | 73 +++++++++++++++++++++++++++++ conf.h | 1 + conf.stub.php | 3 ++ conf_arginfo.h | 4 ++ conf_legacy_arginfo.h | 4 ++ config.m4 | 6 +++ package.xml | 1 + tests/conf_callbacks.phpt | 10 +++- tests/conf_callbacks_rdkafka11.phpt | 39 +++++++++++++++ 9 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 tests/conf_callbacks_rdkafka11.phpt diff --git a/conf.c b/conf.c index 1701a022..7a1cf4c3 100644 --- a/conf.c +++ b/conf.c @@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */ cbs->offset_commit = NULL; kafka_conf_callback_dtor(cbs->log); cbs->log = NULL; + kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh); + cbs->oauthbearer_token_refresh = NULL; } /* }}} */ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */ @@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil zval_ptr_dtor(&args[3]); } +/* +void rd_kafka_conf_set_oauthbearer_token_refresh_cb( + rd_kafka_conf_t *conf, + void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk, + const char *oauthbearer_config, + void *opaque)) { +}*/ +static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zval args[2]; + + if (!opaque) { + return; + } + + if (!cbs->oauthbearer_token_refresh) { + return; + } + + ZVAL_NULL(&args[0]); + ZVAL_NULL(&args[1]); + + ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); + ZVAL_STRING(&args[1], oauthbearer_config); + + rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); +} + + + /* {{{ proto RdKafka\Conf::__construct() */ PHP_METHOD(RdKafka_Conf, __construct) { @@ -698,6 +734,43 @@ PHP_METHOD(RdKafka_Conf, setLogCb) } /* }}} */ +/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback) + Set token refresh callback for OAUTHBEARER sasl */ +PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) +{ +#ifndef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB + zend_throw_exception_ex(NULL, 0, "This version of rdkafka does not support the OAUTHBEARER sasl mechanism"); + return; +#endif + + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *conf; + + if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) { + return; + } + + conf = get_kafka_conf_object(getThis()); + if (!conf) { + return; + } + + Z_ADDREF_P(&fci.function_name); + + if (conf->cbs.log) { + zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name); + } else { + conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh)); + } + + conf->cbs.oauthbearer_token_refresh->fci = fci; + conf->cbs.oauthbearer_token_refresh->fcc = fcc; + + rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb); +} +/* }}} */ + /* {{{ proto RdKafka\TopicConf::__construct() */ PHP_METHOD(RdKafka_TopicConf, __construct) { diff --git a/conf.h b/conf.h index 783cdd52..21df4470 100644 --- a/conf.h +++ b/conf.h @@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks { kafka_conf_callback *consume; kafka_conf_callback *offset_commit; kafka_conf_callback *log; + kafka_conf_callback *oauthbearer_token_refresh; } kafka_conf_callbacks; typedef struct _kafka_conf_object { diff --git a/conf.stub.php b/conf.stub.php index 4d1ad3b6..ccd84ab1 100644 --- a/conf.stub.php +++ b/conf.stub.php @@ -44,6 +44,9 @@ public function setOffsetCommitCb(callable $callback): void {} /** @tentative-return-type */ public function setLogCb(callable $callback): void {} + + /** @tentative-return-type */ + public function setOauthbearerTokenRefreshCb(callable $callback): void {} } class TopicConf diff --git a/conf_arginfo.h b/conf_arginfo.h index f823b3e8..7befd4de 100644 --- a/conf_arginfo.h +++ b/conf_arginfo.h @@ -32,6 +32,8 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb + #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct #define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump @@ -54,6 +56,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); +ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -70,6 +73,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h index d05e42f2..b86438b3 100644 --- a/conf_legacy_arginfo.h +++ b/conf_legacy_arginfo.h @@ -32,6 +32,8 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb + #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct #define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump @@ -54,6 +56,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); +ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -70,6 +73,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/config.m4 b/config.m4 index 9df6b31b..9a10e91f 100644 --- a/config.m4 +++ b/config.m4 @@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then AC_MSG_WARN([murmur2 partitioner is not available]) ]) + AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[ + AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ]) + ],[ + AC_MSG_WARN([oauthbearer token refresh cb is not available]) + ]) + LDFLAGS="$ORIG_LDFLAGS" CPPFLAGS="$ORIG_CPPFLAGS" diff --git a/package.xml b/package.xml index 368fcb8a..926b8167 100644 --- a/package.xml +++ b/package.xml @@ -114,6 +114,7 @@ + diff --git a/tests/conf_callbacks.phpt b/tests/conf_callbacks.phpt index 1ac26408..6c78358d 100644 --- a/tests/conf_callbacks.phpt +++ b/tests/conf_callbacks.phpt @@ -2,7 +2,7 @@ RdKafka\Conf --SKIPIF-- = 0x090000 || die("skip librdkafka too old"); +(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old"); --FILE-- setRebalanceCb(function () { }); $dump = $conf->dump(); var_dump(isset($dump["rebalance_cb"])); +echo "Setting oauth token bearer callback\n"; +try { + $conf->setOauthbearerTokenRefreshCb(function () {}); +} catch (\Exception $e) { + echo $e->getMessage()."\n"; +} --EXPECT-- Setting consume callback @@ -31,3 +37,5 @@ Setting offset_commit callback bool(true) Setting rebalance callback bool(true) +Setting oauth token bearer callback +This version of rdkafka does not support the OAUTHBEARER sasl mechanism diff --git a/tests/conf_callbacks_rdkafka11.phpt b/tests/conf_callbacks_rdkafka11.phpt new file mode 100644 index 00000000..1e7b72fa --- /dev/null +++ b/tests/conf_callbacks_rdkafka11.phpt @@ -0,0 +1,39 @@ +--TEST-- +RdKafka\Conf +--SKIPIF-- += 0x010100ff || die("skip librdkafka too old"); +--FILE-- +setConsumeCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["consume_cb"])); + +echo "Setting offset_commit callback\n"; +$conf->setOffsetCommitCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["offset_commit_cb"])); + +echo "Setting rebalance callback\n"; +$conf->setRebalanceCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["rebalance_cb"])); + +echo "Setting oauth token bearer callback\n"; +$conf->setOauthbearerTokenRefreshCb(function () {}); +$dump = $conf->dump(); +var_dump(isset($dump["oauthbearer_token_refresh_cb"])); + +--EXPECT-- +Setting consume callback +bool(true) +Setting offset_commit callback +bool(true) +Setting rebalance callback +bool(true) +Setting oauth token bearer callback +bool(true) From d2ebbd5e6120b831fcae78a7fec7747ac2b262d5 Mon Sep 17 00:00:00 2001 From: Fred Dysart Date: Thu, 4 Jan 2024 05:42:09 -0500 Subject: [PATCH 2/5] feat: fix default build of rdkafka --- .github/workflows/test/build-librdkafka.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test/build-librdkafka.sh b/.github/workflows/test/build-librdkafka.sh index 99a363b1..7674b7d6 100755 --- a/.github/workflows/test/build-librdkafka.sh +++ b/.github/workflows/test/build-librdkafka.sh @@ -5,7 +5,7 @@ set -ex if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then echo "librdkafka build is not cached" - git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" + git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v2.3.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" cd librdkafka ./configure From 628ba23f913fad74474df99e38aa4ca9a867958c Mon Sep 17 00:00:00 2001 From: Fred Dysart Date: Thu, 4 Jan 2024 09:20:28 -0500 Subject: [PATCH 3/5] feat: exclude setOauthbearerTokenRefreshCb method pre librdkafka 1.1 --- conf.c | 7 ++----- conf.stub.php | 2 ++ tests/conf_callbacks.phpt | 12 ++++-------- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/conf.c b/conf.c index 7a1cf4c3..26fd088b 100644 --- a/conf.c +++ b/conf.c @@ -734,15 +734,11 @@ PHP_METHOD(RdKafka_Conf, setLogCb) } /* }}} */ +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB /* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback) Set token refresh callback for OAUTHBEARER sasl */ PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) { -#ifndef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB - zend_throw_exception_ex(NULL, 0, "This version of rdkafka does not support the OAUTHBEARER sasl mechanism"); - return; -#endif - zend_fcall_info fci; zend_fcall_info_cache fcc; kafka_conf_object *conf; @@ -770,6 +766,7 @@ PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb); } /* }}} */ +#endif /* {{{ proto RdKafka\TopicConf::__construct() */ PHP_METHOD(RdKafka_TopicConf, __construct) diff --git a/conf.stub.php b/conf.stub.php index ccd84ab1..7aa6a62d 100644 --- a/conf.stub.php +++ b/conf.stub.php @@ -45,8 +45,10 @@ public function setOffsetCommitCb(callable $callback): void {} /** @tentative-return-type */ public function setLogCb(callable $callback): void {} + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB /** @tentative-return-type */ public function setOauthbearerTokenRefreshCb(callable $callback): void {} + #endif } class TopicConf diff --git a/tests/conf_callbacks.phpt b/tests/conf_callbacks.phpt index 6c78358d..fd5e881e 100644 --- a/tests/conf_callbacks.phpt +++ b/tests/conf_callbacks.phpt @@ -23,12 +23,8 @@ $conf->setRebalanceCb(function () { }); $dump = $conf->dump(); var_dump(isset($dump["rebalance_cb"])); -echo "Setting oauth token bearer callback\n"; -try { - $conf->setOauthbearerTokenRefreshCb(function () {}); -} catch (\Exception $e) { - echo $e->getMessage()."\n"; -} +echo "Checking if oauthbearer cb exists\n"; +var_dump(method_exists($conf, 'setOauthbearerTokenRefreshCb')); --EXPECT-- Setting consume callback @@ -37,5 +33,5 @@ Setting offset_commit callback bool(true) Setting rebalance callback bool(true) -Setting oauth token bearer callback -This version of rdkafka does not support the OAUTHBEARER sasl mechanism +Checking if oauthbearer cb exists +bool(false) From 083405c42cfbe3c0ae0ba203cec76f2e33b73c1a Mon Sep 17 00:00:00 2001 From: Fred Dysart Date: Thu, 4 Jan 2024 09:21:05 -0500 Subject: [PATCH 4/5] fix: typo --- conf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf.c b/conf.c index 26fd088b..df3472b5 100644 --- a/conf.c +++ b/conf.c @@ -754,7 +754,7 @@ PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb) Z_ADDREF_P(&fci.function_name); - if (conf->cbs.log) { + if (conf->cbs.oauthbearer_token_refresh) { zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name); } else { conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh)); From 60bfaa5d9f796a56d8af25d339b7630ba12bc346 Mon Sep 17 00:00:00 2001 From: Fred Dysart Date: Thu, 4 Jan 2024 09:25:33 -0500 Subject: [PATCH 5/5] fix: also exclude oauth from arginfo entries --- conf_arginfo.h | 8 ++++++++ conf_legacy_arginfo.h | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/conf_arginfo.h b/conf_arginfo.h index 7befd4de..983e0e64 100644 --- a/conf_arginfo.h +++ b/conf_arginfo.h @@ -32,7 +32,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB #define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#endif #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct @@ -56,7 +58,11 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); + +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); +#endif + ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -73,7 +79,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) + #endif ZEND_FE_END }; diff --git a/conf_legacy_arginfo.h b/conf_legacy_arginfo.h index b86438b3..6120e1cf 100644 --- a/conf_legacy_arginfo.h +++ b/conf_legacy_arginfo.h @@ -32,7 +32,9 @@ ZEND_END_ARG_INFO() #define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB #define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb +#endif #define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct @@ -56,7 +58,9 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb); ZEND_METHOD(RdKafka_Conf, setConsumeCb); ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb); ZEND_METHOD(RdKafka_Conf, setLogCb); +#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb); +#endif ZEND_METHOD(RdKafka_TopicConf, __construct); ZEND_METHOD(RdKafka_TopicConf, setPartitioner); @@ -73,7 +77,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = { ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC) + #ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC) + #endif ZEND_FE_END };