Skip to content

Commit

Permalink
replace remaining uses of listStreamPartitionsOrdered with listTopicPartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 25, 2024
1 parent 4a3a11b commit 0e7bf76
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
10 changes: 5 additions & 5 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.Either (isRight)
import Data.Int
import qualified Data.Map.Strict as Map
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Vector as V
Expand All @@ -25,6 +26,7 @@ import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Config (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Core.Store (listTopicPartitions)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand Down Expand Up @@ -68,19 +70,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do
-- Partition should be non-empty
let K.NonNullKaArray partitionReqs = t.partitions
orderedParts <- S.listStreamPartitionsOrdered scLDClient
(S.transToTopicStreamName t.topic)
partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName t.topic)
ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
\counter -> void $ M.addCounter counter 1
let m_logid = orderedParts V.!? fromIntegral p.partition
case m_logid of
case Map.lookup p.partition partitions of
Nothing -> do
let elsn = errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION
              -- Actually, the logid should be Nothing rather than 0; however,
              -- since we never use it in this branch, just set it to 0
pure $ Partition 0 (Left elsn) p
Just (_, logid) -> do
Just logid -> do
elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition
p.fetchOffset
when (isRight elsn) $ void $ atomicFetchAddFastMut mutNumOfReads 1
Expand Down
8 changes: 5 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module HStream.Kafka.Server.Handler.Offset
where

import Data.Int (Int64)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Vector (Vector)
Expand All @@ -19,6 +20,7 @@ import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOldestOffset)
import HStream.Kafka.Common.Utils (mapKaArray)
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Core.Store (listTopicPartitions)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand Down Expand Up @@ -77,11 +79,11 @@ listOffsetTopicPartitions
listOffsetTopicPartitions _ topicName Nothing = do
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName topicName)
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do
-- TODO: handle Nothing
let partition = orderedParts V.! (fromIntegral partitionIndex)
offset <- getOffset (snd partition) timestamp
let logId = partitions Map.! partitionIndex
offset <- getOffset logId timestamp
return $ K.ListOffsetsPartitionResponse
{ offset = offset
, timestamp = timestamp
Expand Down
7 changes: 4 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
Expand All @@ -17,6 +18,7 @@ import Data.Word
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Core.Store (listTopicPartitions)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand Down Expand Up @@ -52,15 +54,14 @@ handleProduce ServerContext{..} reqCtx req = do
responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
    -- A topic is a stream. Here we do not need to check the topic existence,
-- because the metadata api already does(?)
partitions <- S.listStreamPartitionsOrdered
scLDClient (S.transToTopicStreamName topic.name)
partitions <- listTopicPartitions scLDClient (S.transToTopicStreamName topic.name)
let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
-- TODO: limit total concurrencies ?
let loopPart = if V.length partitionData > 1
then Async.forConcurrently
else V.forM
partitionResponses <- loopPart partitionData $ \partition -> do
let Just (_, logid) = partitions V.!? (fromIntegral partition.index) -- TODO: handle Nothing
let Just logid = Map.lookup partition.index partitions -- TODO: handle Nothing
M.withLabel
M.totalProduceRequest
(topic.name, T.pack . show $ partition.index) $ \counter ->
Expand Down
5 changes: 3 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import qualified Data.Text as T
import qualified Data.Vector as V
import HStream.Kafka.Common.OffsetManager (cleanOffsetCache)
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Server.Core.Store (listTopicPartitions)
import HStream.Kafka.Server.Core.Topic (createPartitions)
import qualified HStream.Kafka.Server.Core.Topic as Core
import HStream.Kafka.Server.Types (ServerContext (..))
Expand Down Expand Up @@ -152,8 +153,8 @@ handleDeleteTopics ServerContext{..} _ K.DeleteTopicsRequest{..} =
--
-- XXX: Normally we do not need to delete this because the logid is a
-- random number and will unlikely be reused.
partitions <- S.listStreamPartitionsOrdered scLDClient streamId
V.forM_ partitions $ \(_, logid) ->
logIds <- Map.elems <$> listTopicPartitions scLDClient streamId
forM_ logIds $ \logid ->
cleanOffsetCache scOffsetManager logid
S.removeStream scLDClient streamId
return $ K.DeletableTopicResult topicName K.NONE
Expand Down

0 comments on commit 0e7bf76

Please sign in to comment.