From 8685edf684f06415e515ae7c19a08d47232cc7d4 Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Tue, 9 Jan 2024 17:02:16 +0800 Subject: [PATCH] kafka(cli): set isolation level to read_uncommited --- hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs | 5 ++++- hstream-kafka/cbits/hs_kafka_client.cpp | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index 097f5daa8..dce183211 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -20,6 +20,7 @@ import HStream.Kafka.Common.OffsetManager (getLatestOffset, import HStream.Kafka.Common.Utils (mapKaArray) import qualified HStream.Kafka.Group.GroupCoordinator as GC import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log import qualified HStream.Store as S import qualified Kafka.Protocol as K import qualified Kafka.Protocol.Error as K @@ -37,7 +38,9 @@ pattern EarliestTimestamp = (-2) handleListOffsets :: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse handleListOffsets sc reqCtx req - | reqCtx.apiVersion >= 2 && req.isolationLevel /= 0 = return $ mkErrResponse req + | reqCtx.apiVersion >= 2 && req.isolationLevel /= 0 = do + Log.warning $ "currently only support READ_UNCOMMITED(isolationLevel = 0) request." + return $ mkErrResponse req | otherwise = listOffsets sc reqCtx req where mkErrResponse K.ListOffsetsRequest{..} = diff --git a/hstream-kafka/cbits/hs_kafka_client.cpp b/hstream-kafka/cbits/hs_kafka_client.cpp index 91d7383f9..c9a389e80 100644 --- a/hstream-kafka/cbits/hs_kafka_client.cpp +++ b/hstream-kafka/cbits/hs_kafka_client.cpp @@ -334,6 +334,7 @@ HsConsumer* hs_new_consumer(const char* brokers_, HsInt brokers_size_, CONF_SET("group.id", std::string(group_id_, group_id_size_)); CONF_SET("auto.offset.reset", std::string(offset_reset_, offset_reset_size_)); CONF_SET("enable.auto.commit", auto_commit ? "true" : "false"); + CONF_SET("isolation.level", "read_uncommitted"); std::string brokers(brokers_, brokers_size_); CONF_SET("metadata.broker.list", brokers);