Commit 39d9334

kafka fix: resolve concurrent issue in withOffsetN (#1784)
4eUeP authored Mar 26, 2024
1 parent 74ed0b9 commit 39d9334
Showing 4 changed files with 101 additions and 15 deletions.
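
This commit fixes what the diff below suggests is a lost-update race in withOffsetN: the per-log offset cache is consulted without holding offsetsLock, and the old miss path computed a fresh start offset and installed a new MVar without re-checking the cache, so two threads that both missed could each install their own MVar for the same logid and hand out overlapping offsets. The fix funnels the miss path through a second lookup under offsetsLock, the classic double-checked creation pattern. A minimal, self-contained Haskell sketch of that pattern (hypothetical names, not HStream code):

import           Control.Concurrent.MVar
import qualified Data.HashTable.IO       as H
import           Data.Word               (Word64)

-- One counter MVar per key; the lock only guards creation, not updates.
type Cache = H.BasicHashTable Word64 (MVar Int)

getOrCreate :: MVar () -> Cache -> Word64 -> IO Int -> IO (MVar Int)
getOrCreate lock cache k mkInitial = do
  m <- H.lookup cache k                 -- fast path: no lock taken
  case m of
    Just v  -> pure v
    Nothing -> withMVar lock $ \_ -> do
      m' <- H.lookup cache k            -- re-check: another thread may have
      case m' of                        -- created the entry since our lookup
        Just v  -> pure v
        Nothing -> do
          v <- newMVar =<< mkInitial
          H.insert cache k v
          pure v

main :: IO ()
main = do
  lock  <- newMVar ()
  cache <- H.new
  v  <- getOrCreate lock cache 42 (pure 0)
  v' <- getOrCreate lock cache 42 (pure 99)  -- same MVar; initializer not re-run
  modifyMVar_ v (pure . (+ 1))
  readMVar v' >>= print                      -- prints 1
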
39 changes: 24 additions & 15 deletions hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
@@ -18,6 +18,7 @@ import Control.Concurrent
 import Control.Exception
 import qualified Data.HashTable.IO as H
 import Data.Int
+import Data.Maybe
 import Data.Word
 import Foreign.ForeignPtr (newForeignPtr_)
 import Foreign.Ptr (nullPtr)
@@ -68,23 +69,31 @@ withOffset m logid = withOffsetN m logid 0
 withOffsetN :: OffsetManager -> Word64 -> Int64 -> (Int64 -> IO a) -> IO a
 withOffsetN m@OffsetManager{..} logid n f = do
   m_offset <- H.lookup offsets logid
-  -- FIXME: currently, any exception happen in f will cause the offset not
-  -- updated. This may cause inconsistent between the offset and the actual
-  -- stored data.
   case m_offset of
-    Just offset -> modifyMVar offset $ \o -> do
+    Just offset -> doUpdate offset
+    Nothing -> do
+      offset <- withMVar offsetsLock $ \_ -> do
+        -- There may be other threads that have already created the offset,
+        -- because we take the 'offsetsLock' only after the 'H.lookup' above,
+        -- so we need to check again.
+        m_offset' <- H.lookup offsets logid
+        maybe (do o <- catch (do mo <- getLatestOffset m logid
+                                 pure $ fromMaybe (-1) mo)
+                             (\(_ :: S.NOTFOUND) -> pure (-1))
+                  ov <- newMVar o
+                  H.insert offsets logid ov
+                  pure ov)
+              pure
+              m_offset'
+      doUpdate offset
+  where
+    -- FIXME: currently, any exception raised in f will cause the offset not
+    -- to be updated, which may leave the offset inconsistent with the actual
+    -- stored data.
+    doUpdate offset = modifyMVar offset $ \o -> do
       let !o' = o + n
       !a <- f o'
       pure (o', a)
-    Nothing -> withMVar offsetsLock $ \_ -> do
-      o' <- catch (do mo <- getLatestOffset m logid
-                      pure $ maybe (n - 1) (+ n) mo)
-                  (\(_ :: S.NOTFOUND) -> pure $ n - 1)
-      mask $ \restore -> do
-        ov <- newMVar o'
-        a <- restore (f o') `onException` pure ()
-        H.insert offsets logid ov
-        pure a
 
 cleanOffsetCache :: OffsetManager -> Word64 -> IO ()
 cleanOffsetCache OffsetManager{..} = H.delete offsets
@@ -93,7 +102,7 @@ cleanOffsetCache OffsetManager{..} = H.delete offsets
 getOldestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
 getOldestOffset OffsetManager{..} logid =
   -- Actually, we only need the first lsn but there is no easy way to get
-  (fmap $ calOffset . third) <$> readOneRecordBypassGap store reader logid (pure (S.LSN_MIN, S.LSN_MAX))
+  fmap (calOffset . third) <$> readOneRecordBypassGap store reader logid (pure (S.LSN_MIN, S.LSN_MAX))
 
 getLatestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
 getLatestOffset o logid = (fmap fst) <$> getLatestOffsetWithLsn o logid
@@ -117,7 +126,7 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do
           -- here we donot use (lsn, lsn) because this may result in
           -- a gap or empty record.
           else pure (lsn, tailLsn)
-   in (fmap $ calOffset . third) <$> readOneRecordBypassGap store reader logid getLsn
+   in fmap (calOffset . third) <$> readOneRecordBypassGap store reader logid getLsn
 
 -------------------------------------------------------------------------------
 
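The FIXME that moves into the where clause still applies after this change: f runs inside modifyMVar, so when f throws, modifyMVar restores the previous offset even though f may already have written data to the store. A small standalone demonstration of that rollback behaviour (plain base-library Haskell, not HStream code):

import Control.Concurrent.MVar
import Control.Exception

main :: IO ()
main = do
  mv <- newMVar (0 :: Int)
  r <- try $ modifyMVar mv $ \o -> do
    _ <- throwIO (userError "append failed") :: IO ()
    pure (o + 10, ())
  print (r :: Either IOException ())  -- Left user error (append failed)
  readMVar mv >>= print               -- still 0: the bump was rolled back
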
3 changes: 3 additions & 0 deletions hstream-kafka/hstream-kafka.cabal
@@ -261,6 +261,7 @@ test-suite hstream-kafka-test
       HStream.Kafka.Common.AclEntrySpec
       HStream.Kafka.Common.AclSpec
       HStream.Kafka.Common.AuthorizerSpec
+      HStream.Kafka.Common.OffsetManagerSpec
       HStream.Kafka.Common.TestUtils
 
   hs-source-dirs: tests
@@ -272,8 +273,10 @@ test-suite hstream-kafka-test
     , hspec
     , hspec-expectations
     , hstream-common
+    , hstream-common-base
     , hstream-common-server
     , hstream-kafka:{hstream-kafka, kafka-protocol}
+    , hstream-store
     , http-client
     , text
     , uuid
58 changes: 58 additions & 0 deletions hstream-kafka/tests/HStream/Kafka/Common/OffsetManagerSpec.hs
@@ -0,0 +1,58 @@
+module HStream.Kafka.Common.OffsetManagerSpec where
+
+import           Control.Monad
+import           Data.IORef
+import           Data.Word
+import           Test.Hspec
+
+import           HStream.Base.Concurrent            (runConc)
+import           HStream.Kafka.Common.OffsetManager
+import qualified HStream.Kafka.Common.RecordFormat  as K
+import           HStream.Kafka.Common.TestUtils     (ldclient)
+import qualified HStream.Store                      as S
+import qualified HStream.Utils                      as U
+import qualified Kafka.Protocol.Encoding            as K
+
+initOm :: Bool -> IO (OffsetManager, Word64)
+initOm shouldTrim = do
+  om <- initOffsetReader =<< newOffsetManager ldclient
+  let logid = 50  -- we already have logids from 1 to 100
+  when shouldTrim $
+    S.trimLast ldclient logid  -- clear the log
+  pure (om, logid)
+
+spec :: Spec
+spec = describe "OffsetManagerSpec" $ do
+  it "withOffsetN" $ do
+    (om, logid) <- initOm True
+    r <- newIORef 0
+    withOffsetN om logid 10 $ writeIORef r
+    readIORef r `shouldReturn` 9
+    withOffsetN om logid 10 $ writeIORef r
+    readIORef r `shouldReturn` 19
+
+  it "withOffsetN multi-thread" $ do
+    (om, logid) <- initOm True
+    r <- newIORef []
+    runConc 100 $
+      withOffsetN om logid 10 $ \o -> modifyIORef' r (++ [o])
+    offsets <- readIORef r
+    offsets `shouldBe` [9, 19..999]
+
+  it "withOffsetN multi-thread with existent data" $ do
+    (_, logid) <- initOm True
+    let existOffset  = 11
+        appendKey    = U.intToCBytesWithPadding existOffset
+        appendAttrs  = Just [(S.KeyTypeFindKey, appendKey)]
+        storedRecord = K.runPut $ K.RecordFormat 0 {- version -}
+                                                 existOffset
+                                                 10 {- batchLength -}
+                                                 (K.CompactBytes "xxx")
+    void $ S.appendCompressedBS ldclient logid storedRecord S.CompressionNone appendAttrs
+
+    (om, _) <- initOm False
+    r <- newIORef []
+    runConc 100 $ do
+      withOffsetN om logid 10 $ \o -> modifyIORef' r (++ [o])
+    offsets <- readIORef r
+    offsets `shouldBe` [21, 31..1011]
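
The expected lists follow from the new code above: on an empty log the miss path starts from fromMaybe (-1), so the first call returns -1 + 10 = 9 and the hundredth 999; with a pre-existing record whose stored offset is 11, the first call returns 11 + 10 = 21 and the hundredth 1011. And because f runs while the per-log offset MVar is held (see doUpdate), the appends to the IORef are serialized, which is why the spec can assert an exact ordered list rather than just a set of offsets.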
16 changes: 16 additions & 0 deletions hstream-kafka/tests/HStream/Kafka/Common/TestUtils.hs
@@ -12,17 +12,22 @@ module HStream.Kafka.Common.TestUtils
   , withZkBasedAclAuthorizer
   , withRqliteBasedAclAuthorizer
   , withFileBasedAclAuthorizer
 
+  , ldclient
   ) where
 
 import           Data.Maybe
 import           Data.Text              (Text)
 import qualified Data.Text              as T
 import qualified Data.UUID              as UUID
 import qualified Data.UUID.V4           as UUID
+import qualified HStream.Store.Logger   as S
 import qualified Network.HTTP.Client    as HTTP
+import           System.Environment     (lookupEnv)
+import           System.IO.Unsafe       (unsafePerformIO)
 import           Test.Hspec
 import qualified Z.Data.CBytes          as CB
+import qualified Z.Data.CBytes          as CBytes
 import qualified ZooKeeper              as ZK
 
 import           HStream.Kafka.Common.Acl
@@ -32,6 +37,7 @@ import HStream.Kafka.Common.Resource hiding (match)
 import           HStream.Kafka.Common.Security
 import qualified HStream.Kafka.Server.MetaData as Meta
 import qualified HStream.MetaStore.Types       as Meta
+import qualified HStream.Store                 as S
 
 ------------------------------------------------------------
 -- Construct 'ResourcePattern':
@@ -132,3 +138,13 @@ withFileBasedAclAuthorizer action = do
   authorizer <- newAclAuthorizer (pure filePath)
   initAclAuthorizer authorizer
   action (AuthorizerObject $ Just authorizer)
+
+--------------------------------------------------------------------------------
+
+-- Copied from: hstream-store/test/HStream/Store/SpecUtils.hs
+ldclient :: S.LDClient
+ldclient = unsafePerformIO $ do
+  config <- fromMaybe "/data/store/logdevice.conf" <$> lookupEnv "TEST_LD_CONFIG"
+  _ <- S.setLogDeviceDbgLevel S.C_DBG_ERROR
+  S.newLDClient $ CBytes.pack config
+{-# NOINLINE ldclient #-}
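
A note on the idiom: ldclient is a top-level unsafePerformIO value, and the NOINLINE pragma is what makes it safe to share; without it, GHC could inline the right-hand side at each use site and create a separate LDClient per call instead of one client shared by all specs.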
