From 0e7bf765185e3bd512c8ab2ff691d1a74ab5a71c Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Thu, 25 Jan 2024 15:51:09 +0800 Subject: [PATCH] replace remained listStreamPartitionsOrdered to listTopicPartitions --- hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs | 10 +++++----- hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs | 8 +++++--- hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs | 7 ++++--- hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs | 5 +++-- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 2d1670b31..fcb26d07d 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -11,6 +11,7 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Builder as BB import Data.Either (isRight) import Data.Int +import qualified Data.Map.Strict as Map import Data.Maybe import qualified Data.Text as T import qualified Data.Vector as V @@ -25,6 +26,7 @@ import qualified HStream.Kafka.Common.OffsetManager as K import qualified HStream.Kafka.Common.RecordFormat as K import HStream.Kafka.Server.Config (ServerOpts (..), StorageOptions (..)) +import HStream.Kafka.Server.Core.Store (listTopicPartitions) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Store as S @@ -68,19 +70,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do -- Partition should be non-empty let K.NonNullKaArray partitionReqs = t.partitions - orderedParts <- S.listStreamPartitionsOrdered scLDClient - (S.transToTopicStreamName t.topic) + partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName t.topic) ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $ \counter -> void $ M.addCounter counter 1 - let m_logid = orderedParts V.!? fromIntegral p.partition - case m_logid of + case Map.lookup p.partition partitions of Nothing -> do let elsn = errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION -- Actually, the logid should be Nothing but 0, however, we won't -- use it, so just set it to 0 pure $ Partition 0 (Left elsn) p - Just (_, logid) -> do + Just logid -> do elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition p.fetchOffset when (isRight elsn) $ void $ atomicFetchAddFastMut mutNumOfReads 1 diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index dce183211..d71824c7b 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -9,6 +9,7 @@ module HStream.Kafka.Server.Handler.Offset where import Data.Int (Int64) +import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) import Data.Text (Text) import Data.Vector (Vector) @@ -19,6 +20,7 @@ import HStream.Kafka.Common.OffsetManager (getLatestOffset, getOldestOffset) import HStream.Kafka.Common.Utils (mapKaArray) import qualified HStream.Kafka.Group.GroupCoordinator as GC +import HStream.Kafka.Server.Core.Store (listTopicPartitions) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Store as S @@ -77,11 +79,11 @@ listOffsetTopicPartitions listOffsetTopicPartitions _ topicName Nothing = do return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName} listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do - orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName) + partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName topicName) res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do -- TODO: handle Nothing - let partition = orderedParts V.! (fromIntegral partitionIndex) - offset <- getOffset (snd partition) timestamp + let logId = partitions Map.! partitionIndex + offset <- getOffset logId timestamp return $ K.ListOffsetsPartitionResponse { offset = offset , timestamp = timestamp diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index 23d6834d3..25771731b 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -8,6 +8,7 @@ import Control.Monad import Data.ByteString (ByteString) import qualified Data.ByteString as BS import Data.Int +import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) import Data.Text (Text) import qualified Data.Text as T @@ -17,6 +18,7 @@ import Data.Word import qualified HStream.Kafka.Common.Metrics as M import qualified HStream.Kafka.Common.OffsetManager as K import qualified HStream.Kafka.Common.RecordFormat as K +import HStream.Kafka.Server.Core.Store (listTopicPartitions) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log import qualified HStream.Store as S @@ -52,15 +54,14 @@ handleProduce ServerContext{..} reqCtx req = do responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do -- A topic is a stream. Here we donot need to check the topic existence, -- because the metadata api already does(?) - partitions <- S.listStreamPartitionsOrdered - scLDClient (S.transToTopicStreamName topic.name) + partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName topic.name) let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData) -- TODO: limit total concurrencies ? let loopPart = if V.length partitionData > 1 then Async.forConcurrently else V.forM partitionResponses <- loopPart partitionData $ \partition -> do - let Just (_, logid) = partitions V.!? (fromIntegral partition.index) -- TODO: handle Nothing + let Just logid = Map.lookup partition.index partitions -- TODO: handle Nothing M.withLabel M.totalProduceRequest (topic.name, T.pack . show $ partition.index) $ \counter -> diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs index f9099324c..4f910cc97 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs @@ -22,6 +22,7 @@ import qualified Data.Text as T import qualified Data.Vector as V import HStream.Kafka.Common.OffsetManager (cleanOffsetCache) import qualified HStream.Kafka.Common.Utils as Utils +import HStream.Kafka.Server.Core.Store (listTopicPartitions) import HStream.Kafka.Server.Core.Topic (createPartitions) import qualified HStream.Kafka.Server.Core.Topic as Core import HStream.Kafka.Server.Types (ServerContext (..)) @@ -152,8 +153,8 @@ handleDeleteTopics ServerContext{..} _ K.DeleteTopicsRequest{..} = -- -- XXX: Normally we do not need to delete this because the logid is a -- random number and will unlikely be reused. - partitions <- S.listStreamPartitionsOrdered scLDClient streamId - V.forM_ partitions $ \(_, logid) -> + logIds <- Map.elems <$> listTopicPartitions scLDClient streamId + forM_ logIds $ \logid -> cleanOffsetCache scOffsetManager logid S.removeStream scLDClient streamId return $ K.DeletableTopicResult topicName K.NONE