Skip to content

Commit

Permalink
update parse partitionId
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 25, 2024
1 parent 98e07a9 commit 4a3a11b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
19 changes: 13 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Core/Store.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
module HStream.Kafka.Server.Core.Store where

import Control.Exception (Exception, throw)
import Control.Exception (Exception, throwIO)
import Control.Monad (foldM, forM)
import Data.Int (Int32)
import qualified Data.Map.Strict as M
import GHC.Stack (HasCallStack)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import Text.Read (readMaybe)
import qualified Z.Data.Builder as CB
import qualified Z.Data.CBytes as CB
import qualified Z.Data.Parser as CB

createTopicPartitions :: S.LDClient -> S.StreamId -> Int32 -> IO [S.C_LogID]
createTopicPartitions client streamId partitions = do
Expand All @@ -31,7 +33,12 @@ listTopicPartitions client streamId = do
partitions <- S.listStreamPartitions client streamId
foldM mapKey M.empty (M.toList partitions)
where
mapKey acc (k, v) = return $ M.insert (parsePartitionId k) v acc
parsePartitionId key = case readMaybe (CB.unpack key) of
Just i -> i
Nothing -> throw $ ParsePartitionIdError $ "Invalid partition id: " <> show key
mapKey acc (k, v) = do
partitionId <- parsePartitionId k
return $ M.insert partitionId v acc
parsePartitionId key = case CB.parse' CB.uint $ CB.build $ CB.toBuilder key of
Right i -> return i
Left e -> do
Log.fatal $ "parse partitionId error: " <> Log.build (show e)
throwIO $ ParsePartitionIdError $ "Invalid partition id: " <> show key

1 change: 0 additions & 1 deletion hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ import Data.Hashable (Hashable)
import Data.Int (Int64)
import Data.IORef (IORef, atomicModifyIORef',
newIORef, readIORef)
import Data.List (sort)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Text (Text)
Expand Down

0 comments on commit 4a3a11b

Please sign in to comment.