diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index e0cc572c4..184680096 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -19,6 +19,7 @@ import Data.Functor ((<&>)) import Data.Int (Int32) import qualified Data.List as L import qualified Data.Map as Map +import Data.Maybe (fromJust) import qualified Data.Set as S import Data.Text (Text) import qualified Data.Text as Text @@ -222,11 +223,14 @@ handleMetadata ctx reqCtx req = do <> " is too large, it should be less than " <> Log.build (maxBound :: Int32) let (theNodeId :: Int32) = fromIntegral (A.serverNodeId theNode) + streamId = S.transToTopicStreamName topicName + -- The logReplicationFactor should not be Nothing, so we use fromJust here. + repfac <- fromJust . S.attrValue . S.logReplicationFactor <$> S.getStreamLogAttrs ctx.scLDClient streamId pure $ K.MetadataResponsePartition { errorCode = K.NONE , partitionIndex = (fromIntegral idx) , leaderId = theNodeId - , replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? + , replicaNodes = K.NonNullKaArray $ (V.replicate repfac theNodeId) -- FIXME: what should it be? , isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? , offlineReplicas = K.KaArray $ Just V.empty -- TODO }