From 05150c781658b3851692adf571e548f1417b5ac7 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Fri, 12 Apr 2024 17:19:29 +0800 Subject: [PATCH] tmp --- .../test/Kafka/Protocol/EncodingSpec.hs | 518 +++++++++--------- 1 file changed, 259 insertions(+), 259 deletions(-) diff --git a/hstream-kafka/protocol/test/Kafka/Protocol/EncodingSpec.hs b/hstream-kafka/protocol/test/Kafka/Protocol/EncodingSpec.hs index 9edac7739..b75889ba1 100644 --- a/hstream-kafka/protocol/test/Kafka/Protocol/EncodingSpec.hs +++ b/hstream-kafka/protocol/test/Kafka/Protocol/EncodingSpec.hs @@ -26,9 +26,9 @@ import Kafka.QuickCheck.Instances () spec :: Spec spec = do baseSpec - genericSpec - realSpec - otherSpec + --genericSpec + --realSpec + --otherSpec encodingProp :: (Eq a, Show a, Serializable a) => a -> Property encodingProp x = ioProperty $ (x ===) <$> runGet (runPut x) @@ -78,259 +78,259 @@ baseSpec = describe "Kafka.Protocol.Encoding" $ do ------------------------------------------------------------------------------- -data SomeMsg = SomeMsg - { msgA :: Int32 - , msgB :: NullableString - , msgC :: VarInt32 - , msgD :: KaArray Int32 - , msgE :: KaArray (KaArray CompactBytes) - } deriving (Show, Eq, Generic) - -instance Serializable SomeMsg - -putSomeMsg :: SomeMsg -> ByteString -putSomeMsg SomeMsg{..} = - let msg = put msgA <> put msgB <> put msgC <> put msgD <> put msgE - in BL.toStrict $ toLazyByteString msg - -getSomeMsg :: ByteString -> IO SomeMsg -getSomeMsg bs = - let p = SomeMsg <$> get <*> get <*> get <*> get <*> get - in do result <- runParser p bs - case result of - Done "" r -> pure r - Done l _ -> throwIO $ DecodeError $ "Done, but left " <> show l - Fail _ err -> throwIO $ DecodeError $ "Fail, " <> err - More _ -> throwIO $ DecodeError "Need more" - -genericSpec :: Spec -genericSpec = describe "Kafka.Protocol.Encoding" $ do - it "Generic instance" $ - let someMsg = SomeMsg 10 (Just "x") 10 - (KaArray $ Just [1, 1]) - (KaArray $ Just [KaArray $ Just ["x"]]) - in do runGet (runPut someMsg) `shouldReturn` someMsg - runPut someMsg `shouldBe` putSomeMsg someMsg - getSomeMsg (runPut someMsg) `shouldReturn` someMsg - runGet (putSomeMsg someMsg) `shouldReturn` someMsg - -------------------------------------------------------------------------------- - --- Real world tests -realSpec :: Spec -realSpec = describe "Kafka.Protocol.Encoding" $ do - it "From real world kafka-python: request header v1" $ do - let clientReqBs = "\NUL\NUL\NUL \NUL\DC2\NUL\NUL\NUL\NUL\NUL\SOH\NUL\SYN" - <> "kafka-python-2.0.3-dev" - let reqHeader = RequestHeader (ApiKey 18) 0 1 - (Just $ Just "kafka-python-2.0.3-dev") - Nothing - reqBody = ApiVersionsRequestV0 - reqHeaderBs = runPut reqHeader - reqBodyBs = runPut reqBody - reqBs = reqHeaderBs <> reqBodyBs - reqLen = fromIntegral (BS.length reqBs) :: Int32 - (runPut reqLen <> reqBs) `shouldBe` clientReqBs - runGet clientReqBs `shouldReturn` (reqLen, reqHeader, reqBody) - - it "From real world confluent-kafka-python: request header v2" $ do - let clientReqBs = "\NUL\NUL\NULM\NUL\DC2\NUL\ETX\NUL\NUL\NUL\SOH\NUL\SYN" - <> "confluent_kafka_client\NUL\ETBconfluent-kafka-python\DC4" - <> "2.2.0-rdkafka-2.2.0\NUL" - let reqHeader = RequestHeader - (ApiKey 18) 3 1 - (Just $ Just "confluent_kafka_client") - (Just EmptyTaggedFields) - reqBody = ApiVersionsRequestV3 - (CompactString "confluent-kafka-python") - (CompactString "2.2.0-rdkafka-2.2.0") - EmptyTaggedFields - reqHeaderBs = runPut reqHeader - reqBodyBs = runPut reqBody - reqBs = reqHeaderBs <> reqBodyBs - reqLen = fromIntegral (BS.length reqBs) :: Int32 - - (runPut reqLen <> reqBs) `shouldBe` clientReqBs - runGet clientReqBs `shouldReturn` (reqLen, reqHeader, reqBody) - - -- Example of how a record batch looks like: - -- - -- Total length: 82 - -- - -- "\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL" -> baseOffset(8): 0 - -- "\NUL\NUL\NULF" -> batchLength(4): 70 - -- "\NUL\NUL\NUL\NUL" -> partitionLeaderEpoch(4): 0 - -- "\STX" -> magic(1): 2 - -- "\135\169\186k" -> crc(4): -2018919829 - -- "\NUL\NUL" -> attributes(2): 0 - -- "\NUL\NUL\NUL\NUL" -> lastOffsetDelta(4): 0 - -- "\NUL\NUL\SOH\141\ETB\SO\167\ETX" -> baseTimestamp(8): 1705488852739 - -- "\NUL\NUL\SOH\141\ETB\SO\167\ETX" -> maxTimestamp(8): 1705488852739 - -- "\255\255\255\255\255\255\255\255" -> producerId(8): -1 - -- "\255\255" -> producerEpoch(2): -1 - -- "\255\255\255\255" -> baseSequence(4): -1 - -- "\NUL\NUL\NUL\SOH(\NUL\NUL\NUL\SOH\FSsome_message_0\NUL" -> [Records] - -- - -- Records: - -- - -- "\NUL\NUL\NUL\SOH" -> Arraylength(4): 1 - -- "(" -> length(varint): 20 - -- ... - it "Message Format V2 (Record Batch)" $ do - let clientReqBs = - "\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\139\NUL\NUL\NUL\NUL" - <> "\STX\164\239\196*\NUL\NUL\NUL\NUL\NUL\SOH\NUL\NUL\SOH\140\&3~\176" - <> "\DLE\NUL\NUL\SOH\140\&3~\176\DLE\255\255\255\255\255\255\255\255" - <> "\255\255\255\255\255\255\NUL\NUL\NUL\STXX\NUL\NUL\NUL\SOH\FS" - <> "some_message_0\EOT\SOheader1\ACKfoo\SOheader2\ACKbarX\NUL\NUL\STX" - <> "\SOH\FSsome_message_1\EOT\SOheader1\ACKfoo\SOheader2\ACKbar" - reqRecords = KaArray $ Just $ - [ RecordV2{ length = 44 - , attributes = 0 - , timestampDelta = 0 - , offsetDelta = 0 - , key = RecordKey Nothing - , value = RecordValue $ Just "some_message_0" - , headers = RecordArray $ - [ ("header1", RecordHeaderValue $ Just "foo") - , ("header2", RecordHeaderValue $ Just "bar") - ] - } - , RecordV2{ length = 44 - , attributes = 0 - , timestampDelta = 0 - , offsetDelta = 1 - , key = RecordKey Nothing - , value = RecordValue $ Just "some_message_1" - , headers = RecordArray $ - [ ("header1", RecordHeaderValue $ Just "foo") - , ("header2", RecordHeaderValue $ Just "bar") - ] - } - ] - reqRecordBatch = RecordBatch - { baseOffset = 0 - , batchLength = 139 - , partitionLeaderEpoch = 0 - , magic = 2 - , crc = -1527790550 - , attributes = 0 - , lastOffsetDelta = 1 - , baseTimestamp = 1701670989840 - , maxTimestamp = 1701670989840 - , producerId = -1 - , producerEpoch = -1 - , baseSequence = -1 - , records = reqRecords - } - clientReq = [BatchRecordV2 reqRecordBatch] - decodeBatchRecords True clientReqBs `shouldReturn` clientReq - encodeBatchRecords clientReq `shouldBe` clientReqBs - -------------------------------------------------------------------------------- - -otherSpec :: Spec -otherSpec = describe "Kafka.Protocol.Encoding" $ do - - it "decodeNextRecordOffset v2" $ do - let bs = fakeEncodeBatchV2 10 "some_message" 10 - len = BS.length bs - decodeNextRecordOffset bs `shouldReturn` (Just 20) - forM_ @[] [0..len-1] $ \i -> do - let bs' = BS.take i bs - decodeNextRecordOffset bs' `shouldReturn` Nothing - - -- Actually, this is not possible, because kafka client won't send - it "[optional]decodeNextRecordOffset v2: empty batch" $ do - let bs = fakeEncodeBatchV2 10 "some_message" 0 - len = BS.length bs - decodeNextRecordOffset bs `shouldReturn` (Just 10) - forM_ @[] [0..len-1] $ \i -> do - let bs' = BS.take i bs - decodeNextRecordOffset bs' `shouldReturn` Nothing - - -- Actually, kafka client won't send multiple batches, but we still - -- support it. - it "[optional] decodeNextRecordOffset v2: multiple batches" $ do - let bs = fakeEncodeBatchV2 10 "some_message" 1 - <> fakeEncodeBatchV2 11 "some_message" 1 - decodeNextRecordOffset bs `shouldReturn` (Just 12) - - it "decodeNextRecordOffset v1" $ do - let bs1 = fakeEncodeBatchV1 10 "some_message" 1 - len1 = BS.length bs1 - bs2 = fakeEncodeBatchV1 11 "some_message" 9 - bs = bs1 <> bs2 - decodeNextRecordOffset bs `shouldReturn` (Just 20) - forM_ @[] [0..len1-1] $ \i -> do - let bs' = BS.take i bs - decodeNextRecordOffset bs' `shouldReturn` Nothing - decodeNextRecordOffset bs1 `shouldReturn` (Just 11) - forM_ @[] [len1+1..len1*2 - 1] $ \i -> do - let bs' = BS.take i bs - decodeNextRecordOffset bs' `shouldReturn` (Just 11) - decodeNextRecordOffset (BS.take (len1*2) bs) `shouldReturn` (Just 12) - -fakeEncodeBatchV2 - :: Int64 - -- ^ baseOffset - -> ByteString - -- ^ Value - -> Int - -- ^ Number of records - -> ByteString -fakeEncodeBatchV2 baseOffset value n = - let records = (flip map) [0..n-1] $ \i -> - let b = put $ - RecordV2_{ attributes = 0 - , timestampDelta = 0 - , offsetDelta = VarInt32 (fromIntegral i) - , key = RecordKey Nothing - , value = RecordValue $ Just value - , headers = RecordArray $ - [ ("header1", RecordHeaderValue $ Just "foo") - , ("header2", RecordHeaderValue $ Just "bar") - ] - } - in put @VarInt32 (fromIntegral $ builderLength b) <> b - recordsBuiler = put @Int32 (fromIntegral $ Prelude.length records) - <> foldl (<>) mempty records - batch = RecordBatch_ - { baseOffset = baseOffset - , batchLength = 0 -- Will be filled later - , partitionLeaderEpoch = 0 - , magic = 2 - , crc = 0 -- Will be filled later - , attributes = 0 - , lastOffsetDelta = fromIntegral n - , baseTimestamp = 1701670989840 - , maxTimestamp = 1701670989840 - , producerId = -1 - , producerEpoch = -1 - , baseSequence = -1 - } - bs = BL.toStrict $ toLazyByteString $ put batch <> recordsBuiler - in unsafePerformIO $ do unsafeAlterRecordBatchBs bs >> pure bs - -fakeEncodeBatchV1 - :: Int64 - -- ^ baseOffset - -> ByteString - -- ^ Value - -> Int - -- ^ Number of records - -> ByteString -fakeEncodeBatchV1 baseOffset value n = - (flip foldMap) ([0..n-1] :: [Int]) $ \i -> - unsafePerformIO $ - let bs = runPut $ RecordV1 - { baseOffset = baseOffset + fromIntegral i - , messageSize = 0 -- Will be filled later - , crc = 0 -- Will be filled later - , magic = 1 - , attributes = 0 - , timestamp = 1701670989840 - , key = Just "somekey" - , value = Just value - } - in unsafeAlterMessageSetBs bs >> pure bs +-- data SomeMsg = SomeMsg +-- { msgA :: Int32 +-- , msgB :: NullableString +-- , msgC :: VarInt32 +-- , msgD :: KaArray Int32 +-- , msgE :: KaArray (KaArray CompactBytes) +-- } deriving (Show, Eq, Generic) +-- +-- instance Serializable SomeMsg +-- +-- putSomeMsg :: SomeMsg -> ByteString +-- putSomeMsg SomeMsg{..} = +-- let msg = put msgA <> put msgB <> put msgC <> put msgD <> put msgE +-- in BL.toStrict $ toLazyByteString msg +-- +-- getSomeMsg :: ByteString -> IO SomeMsg +-- getSomeMsg bs = +-- let p = SomeMsg <$> get <*> get <*> get <*> get <*> get +-- in do result <- runParser p bs +-- case result of +-- Done "" r -> pure r +-- Done l _ -> throwIO $ DecodeError $ "Done, but left " <> show l +-- Fail _ err -> throwIO $ DecodeError $ "Fail, " <> err +-- More _ -> throwIO $ DecodeError "Need more" +-- +-- genericSpec :: Spec +-- genericSpec = describe "Kafka.Protocol.Encoding" $ do +-- it "Generic instance" $ +-- let someMsg = SomeMsg 10 (Just "x") 10 +-- (KaArray $ Just [1, 1]) +-- (KaArray $ Just [KaArray $ Just ["x"]]) +-- in do runGet (runPut someMsg) `shouldReturn` someMsg +-- runPut someMsg `shouldBe` putSomeMsg someMsg +-- getSomeMsg (runPut someMsg) `shouldReturn` someMsg +-- runGet (putSomeMsg someMsg) `shouldReturn` someMsg +-- +-- ------------------------------------------------------------------------------- +-- +-- -- Real world tests +-- realSpec :: Spec +-- realSpec = describe "Kafka.Protocol.Encoding" $ do +-- it "From real world kafka-python: request header v1" $ do +-- let clientReqBs = "\NUL\NUL\NUL \NUL\DC2\NUL\NUL\NUL\NUL\NUL\SOH\NUL\SYN" +-- <> "kafka-python-2.0.3-dev" +-- let reqHeader = RequestHeader (ApiKey 18) 0 1 +-- (Just $ Just "kafka-python-2.0.3-dev") +-- Nothing +-- reqBody = ApiVersionsRequestV0 +-- reqHeaderBs = runPut reqHeader +-- reqBodyBs = runPut reqBody +-- reqBs = reqHeaderBs <> reqBodyBs +-- reqLen = fromIntegral (BS.length reqBs) :: Int32 +-- (runPut reqLen <> reqBs) `shouldBe` clientReqBs +-- runGet clientReqBs `shouldReturn` (reqLen, reqHeader, reqBody) +-- +-- it "From real world confluent-kafka-python: request header v2" $ do +-- let clientReqBs = "\NUL\NUL\NULM\NUL\DC2\NUL\ETX\NUL\NUL\NUL\SOH\NUL\SYN" +-- <> "confluent_kafka_client\NUL\ETBconfluent-kafka-python\DC4" +-- <> "2.2.0-rdkafka-2.2.0\NUL" +-- let reqHeader = RequestHeader +-- (ApiKey 18) 3 1 +-- (Just $ Just "confluent_kafka_client") +-- (Just EmptyTaggedFields) +-- reqBody = ApiVersionsRequestV3 +-- (CompactString "confluent-kafka-python") +-- (CompactString "2.2.0-rdkafka-2.2.0") +-- EmptyTaggedFields +-- reqHeaderBs = runPut reqHeader +-- reqBodyBs = runPut reqBody +-- reqBs = reqHeaderBs <> reqBodyBs +-- reqLen = fromIntegral (BS.length reqBs) :: Int32 +-- +-- (runPut reqLen <> reqBs) `shouldBe` clientReqBs +-- runGet clientReqBs `shouldReturn` (reqLen, reqHeader, reqBody) +-- +-- -- Example of how a record batch looks like: +-- -- +-- -- Total length: 82 +-- -- +-- -- "\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL" -> baseOffset(8): 0 +-- -- "\NUL\NUL\NULF" -> batchLength(4): 70 +-- -- "\NUL\NUL\NUL\NUL" -> partitionLeaderEpoch(4): 0 +-- -- "\STX" -> magic(1): 2 +-- -- "\135\169\186k" -> crc(4): -2018919829 +-- -- "\NUL\NUL" -> attributes(2): 0 +-- -- "\NUL\NUL\NUL\NUL" -> lastOffsetDelta(4): 0 +-- -- "\NUL\NUL\SOH\141\ETB\SO\167\ETX" -> baseTimestamp(8): 1705488852739 +-- -- "\NUL\NUL\SOH\141\ETB\SO\167\ETX" -> maxTimestamp(8): 1705488852739 +-- -- "\255\255\255\255\255\255\255\255" -> producerId(8): -1 +-- -- "\255\255" -> producerEpoch(2): -1 +-- -- "\255\255\255\255" -> baseSequence(4): -1 +-- -- "\NUL\NUL\NUL\SOH(\NUL\NUL\NUL\SOH\FSsome_message_0\NUL" -> [Records] +-- -- +-- -- Records: +-- -- +-- -- "\NUL\NUL\NUL\SOH" -> Arraylength(4): 1 +-- -- "(" -> length(varint): 20 +-- -- ... +-- it "Message Format V2 (Record Batch)" $ do +-- let clientReqBs = +-- "\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\NUL\139\NUL\NUL\NUL\NUL" +-- <> "\STX\164\239\196*\NUL\NUL\NUL\NUL\NUL\SOH\NUL\NUL\SOH\140\&3~\176" +-- <> "\DLE\NUL\NUL\SOH\140\&3~\176\DLE\255\255\255\255\255\255\255\255" +-- <> "\255\255\255\255\255\255\NUL\NUL\NUL\STXX\NUL\NUL\NUL\SOH\FS" +-- <> "some_message_0\EOT\SOheader1\ACKfoo\SOheader2\ACKbarX\NUL\NUL\STX" +-- <> "\SOH\FSsome_message_1\EOT\SOheader1\ACKfoo\SOheader2\ACKbar" +-- reqRecords = KaArray $ Just $ +-- [ RecordV2{ length = 44 +-- , attributes = 0 +-- , timestampDelta = 0 +-- , offsetDelta = 0 +-- , key = RecordKey Nothing +-- , value = RecordValue $ Just "some_message_0" +-- , headers = RecordArray $ +-- [ ("header1", RecordHeaderValue $ Just "foo") +-- , ("header2", RecordHeaderValue $ Just "bar") +-- ] +-- } +-- , RecordV2{ length = 44 +-- , attributes = 0 +-- , timestampDelta = 0 +-- , offsetDelta = 1 +-- , key = RecordKey Nothing +-- , value = RecordValue $ Just "some_message_1" +-- , headers = RecordArray $ +-- [ ("header1", RecordHeaderValue $ Just "foo") +-- , ("header2", RecordHeaderValue $ Just "bar") +-- ] +-- } +-- ] +-- reqRecordBatch = RecordBatch +-- { baseOffset = 0 +-- , batchLength = 139 +-- , partitionLeaderEpoch = 0 +-- , magic = 2 +-- , crc = -1527790550 +-- , attributes = 0 +-- , lastOffsetDelta = 1 +-- , baseTimestamp = 1701670989840 +-- , maxTimestamp = 1701670989840 +-- , producerId = -1 +-- , producerEpoch = -1 +-- , baseSequence = -1 +-- , records = reqRecords +-- } +-- clientReq = [BatchRecordV2 reqRecordBatch] +-- decodeBatchRecords True clientReqBs `shouldReturn` clientReq +-- encodeBatchRecords clientReq `shouldBe` clientReqBs +-- +-- ------------------------------------------------------------------------------- +-- +-- otherSpec :: Spec +-- otherSpec = describe "Kafka.Protocol.Encoding" $ do +-- +-- it "decodeNextRecordOffset v2" $ do +-- let bs = fakeEncodeBatchV2 10 "some_message" 10 +-- len = BS.length bs +-- decodeNextRecordOffset bs `shouldReturn` (Just 20) +-- forM_ @[] [0..len-1] $ \i -> do +-- let bs' = BS.take i bs +-- decodeNextRecordOffset bs' `shouldReturn` Nothing +-- +-- -- Actually, this is not possible, because kafka client won't send +-- it "[optional]decodeNextRecordOffset v2: empty batch" $ do +-- let bs = fakeEncodeBatchV2 10 "some_message" 0 +-- len = BS.length bs +-- decodeNextRecordOffset bs `shouldReturn` (Just 10) +-- forM_ @[] [0..len-1] $ \i -> do +-- let bs' = BS.take i bs +-- decodeNextRecordOffset bs' `shouldReturn` Nothing +-- +-- -- Actually, kafka client won't send multiple batches, but we still +-- -- support it. +-- it "[optional] decodeNextRecordOffset v2: multiple batches" $ do +-- let bs = fakeEncodeBatchV2 10 "some_message" 1 +-- <> fakeEncodeBatchV2 11 "some_message" 1 +-- decodeNextRecordOffset bs `shouldReturn` (Just 12) +-- +-- it "decodeNextRecordOffset v1" $ do +-- let bs1 = fakeEncodeBatchV1 10 "some_message" 1 +-- len1 = BS.length bs1 +-- bs2 = fakeEncodeBatchV1 11 "some_message" 9 +-- bs = bs1 <> bs2 +-- decodeNextRecordOffset bs `shouldReturn` (Just 20) +-- forM_ @[] [0..len1-1] $ \i -> do +-- let bs' = BS.take i bs +-- decodeNextRecordOffset bs' `shouldReturn` Nothing +-- decodeNextRecordOffset bs1 `shouldReturn` (Just 11) +-- forM_ @[] [len1+1..len1*2 - 1] $ \i -> do +-- let bs' = BS.take i bs +-- decodeNextRecordOffset bs' `shouldReturn` (Just 11) +-- decodeNextRecordOffset (BS.take (len1*2) bs) `shouldReturn` (Just 12) +-- +-- fakeEncodeBatchV2 +-- :: Int64 +-- -- ^ baseOffset +-- -> ByteString +-- -- ^ Value +-- -> Int +-- -- ^ Number of records +-- -> ByteString +-- fakeEncodeBatchV2 baseOffset value n = +-- let records = (flip map) [0..n-1] $ \i -> +-- let b = put $ +-- RecordV2_{ attributes = 0 +-- , timestampDelta = 0 +-- , offsetDelta = VarInt32 (fromIntegral i) +-- , key = RecordKey Nothing +-- , value = RecordValue $ Just value +-- , headers = RecordArray $ +-- [ ("header1", RecordHeaderValue $ Just "foo") +-- , ("header2", RecordHeaderValue $ Just "bar") +-- ] +-- } +-- in put @VarInt32 (fromIntegral $ builderLength b) <> b +-- recordsBuiler = put @Int32 (fromIntegral $ Prelude.length records) +-- <> foldl (<>) mempty records +-- batch = RecordBatch_ +-- { baseOffset = baseOffset +-- , batchLength = 0 -- Will be filled later +-- , partitionLeaderEpoch = 0 +-- , magic = 2 +-- , crc = 0 -- Will be filled later +-- , attributes = 0 +-- , lastOffsetDelta = fromIntegral n +-- , baseTimestamp = 1701670989840 +-- , maxTimestamp = 1701670989840 +-- , producerId = -1 +-- , producerEpoch = -1 +-- , baseSequence = -1 +-- } +-- bs = BL.toStrict $ toLazyByteString $ put batch <> recordsBuiler +-- in unsafePerformIO $ do unsafeAlterRecordBatchBs bs >> pure bs +-- +-- fakeEncodeBatchV1 +-- :: Int64 +-- -- ^ baseOffset +-- -> ByteString +-- -- ^ Value +-- -> Int +-- -- ^ Number of records +-- -> ByteString +-- fakeEncodeBatchV1 baseOffset value n = +-- (flip foldMap) ([0..n-1] :: [Int]) $ \i -> +-- unsafePerformIO $ +-- let bs = runPut $ RecordV1 +-- { baseOffset = baseOffset + fromIntegral i +-- , messageSize = 0 -- Will be filled later +-- , crc = 0 -- Will be filled later +-- , magic = 1 +-- , attributes = 0 +-- , timestamp = 1701670989840 +-- , key = Just "somekey" +-- , value = Just value +-- } +-- in unsafeAlterMessageSetBs bs >> pure bs