From 4a3a11b9244853b659cc6a417b8938e469e62769 Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Thu, 25 Jan 2024 15:36:11 +0800 Subject: [PATCH] update parse partitionId --- .../HStream/Kafka/Server/Core/Store.hs | 19 +++++++++++++------ hstream-store/HStream/Store/Stream.hs | 1 - 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Core/Store.hs b/hstream-kafka/HStream/Kafka/Server/Core/Store.hs index 6b4fc4878..a6185cf3d 100644 --- a/hstream-kafka/HStream/Kafka/Server/Core/Store.hs +++ b/hstream-kafka/HStream/Kafka/Server/Core/Store.hs @@ -1,13 +1,15 @@ module HStream.Kafka.Server.Core.Store where -import Control.Exception (Exception, throw) +import Control.Exception (Exception, throwIO) import Control.Monad (foldM, forM) import Data.Int (Int32) import qualified Data.Map.Strict as M import GHC.Stack (HasCallStack) +import qualified HStream.Logger as Log import qualified HStream.Store as S -import Text.Read (readMaybe) +import qualified Z.Data.Builder as CB import qualified Z.Data.CBytes as CB +import qualified Z.Data.Parser as CB createTopicPartitions :: S.LDClient -> S.StreamId -> Int32 -> IO [S.C_LogID] createTopicPartitions client streamId partitions = do @@ -31,7 +33,12 @@ listTopicPartitions client streamId = do partitions <- S.listStreamPartitions client streamId foldM mapKey M.empty (M.toList partitions) where - mapKey acc (k, v) = return $ M.insert (parsePartitionId k) v acc - parsePartitionId key = case readMaybe (CB.unpack key) of - Just i -> i - Nothing -> throw $ ParsePartitionIdError $ "Invalid partition id: " <> show key + mapKey acc (k, v) = do + partitionId <- parsePartitionId k + return $ M.insert partitionId v acc + parsePartitionId key = case CB.parse' CB.uint $ CB.build $ CB.toBuilder key of + Right i -> return i + Left e -> do + Log.fatal $ "parse partitionId error: " <> Log.build (show e) + throwIO $ ParsePartitionIdError $ "Invalid partition id: " <> show key + diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs index dcb696f45..9bfe715df 100644 --- a/hstream-store/HStream/Store/Stream.hs +++ b/hstream-store/HStream/Store/Stream.hs @@ -175,7 +175,6 @@ import Data.Hashable (Hashable) import Data.Int (Int64) import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef) -import Data.List (sort) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Text (Text)