Skip to content

Commit

Permalink
kafka protocol: code refactoring
Browse files Browse the repository at this point in the history
- Support get Attributes
- Drop support for old message formats v0 and v1
  • Loading branch information
4eUeP committed Apr 8, 2024
1 parent a3ee52e commit 495e9f8
Show file tree
Hide file tree
Showing 6 changed files with 710 additions and 500 deletions.
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import HStream.Kafka.Server.Types (ServerContext (..),
initConnectionContext)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding
import Kafka.Protocol.Error
import Kafka.Protocol.Message
import Kafka.Protocol.Service

Expand Down Expand Up @@ -387,7 +388,7 @@ runParseIO more parser = more >>= go Nothing
Done l r -> pure (r, l)
More f -> do msg <- more
go (Just f) msg
Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err
Fail _ err -> E.throwIO $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)

showSockAddrHost :: N.SockAddr -> String
showSockAddrHost (N.SockAddrUnix str) = str
Expand Down
11 changes: 6 additions & 5 deletions hstream-kafka/HStream/Kafka/Network/IO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import qualified Network.Socket.ByteString as N
import qualified Network.Socket.ByteString.Lazy as NL

import Kafka.Protocol.Encoding
import Kafka.Protocol.Error
import Kafka.Protocol.Message

-- | Receive a kafka message with its request header from socket.
Expand Down Expand Up @@ -54,20 +55,20 @@ recvKafkaMsgBS peer m_more s = do
headerResult <- liftIO $ runParser @RequestHeader get reqBs
case headerResult of
Done l h -> return $ Just (h, l)
Fail _ err -> E.throw $ DecodeError $ "Fail, " <> err
More _ -> E.throw $ DecodeError $ "More"
Fail _ err -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)
More _ -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "More")
Done l reqBs -> do
State.put l
headerResult <- liftIO $ runParser @RequestHeader get reqBs
case headerResult of
Done l' h -> return $ Just (h, l')
Fail _ err -> E.throw $ DecodeError $ "Fail, " <> err
More _ -> E.throw $ DecodeError $ "More"
Fail _ err -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)
More _ -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "More")
More f -> do
i_new <- liftIO $ N.recv s 1024
State.put i_new
recvKafkaMsgBS peer (Just f) s
Fail _ err -> liftIO . E.throwIO $ DecodeError $ "Fail, " <> err
Fail _ err -> liftIO . E.throwIO $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)

-- | Send a kafka message to socket. Note the message should be packed
-- with its response header.
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ library kafka-protocol
Kafka.Protocol.Encoding.Encode
Kafka.Protocol.Encoding.Internal
Kafka.Protocol.Encoding.Parser
Kafka.Protocol.Encoding.Types
Kafka.Protocol.Message.Struct
Kafka.Protocol.Message.Total

Expand Down
Loading

0 comments on commit 495e9f8

Please sign in to comment.