Skip to content

Commit

Permalink
kafka: fix merge configs (#1803)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored May 7, 2024
1 parent ca965b3 commit 1060655
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 65 deletions.
21 changes: 13 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ module HStream.Kafka.Server.Config
, advertisedListenersToPB
, StorageOptions (..)
, ExperimentalFeature (..)

, KafkaBrokerConfigs
, updateConfigs
, mkKafkaBrokerConfigs
) where

import Control.Exception (throwIO)
import qualified Data.Text as Text
import Data.Yaml (ParseException (..),
decodeFileThrow,
parseEither)
import System.Directory (makeAbsolute)
import Control.Exception (throwIO)
import qualified Data.Text as Text
import Data.Yaml (ParseException (..),
decodeFileThrow,
parseEither)
import System.Directory (makeAbsolute)

import HStream.Common.Types (getHStreamVersion)
import HStream.Common.Types (getHStreamVersion)
import HStream.Kafka.Server.Config.FromCli
import HStream.Kafka.Server.Config.FromJson
import HStream.Kafka.Server.Config.KafkaConfig
import HStream.Kafka.Server.Config.Types
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Server.HStreamApi as A


runServerConfig :: [String] -> (ServerOpts -> IO ()) -> IO ()
Expand Down
72 changes: 31 additions & 41 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,29 @@ module HStream.Kafka.Server.Config.FromCli
, parseMetaStoreAddr
) where

import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help,
long,
maybeReader,
metavar, option,
optional, short,
strOption, value,
(<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help, long,
maybeReader, metavar,
option, optional,
short, strOption,
value, (<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import HStream.Kafka.Server.Config.Types
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -106,7 +103,7 @@ cliOptionsParser = do
cliEnableSaslAuth <- enableSaslAuthParser
cliEnableAcl <- enableAclParser

cliBrokerConfigs <- brokerConfigsParser
cliBrokerProps <- brokerConfigsParser

cliExperimentalFeatures <- O.many experimentalFeatureParser

Expand Down Expand Up @@ -300,27 +297,20 @@ experimentalFeatureParser :: O.Parser ExperimentalFeature
experimentalFeatureParser = option parseExperimentalFeature $
long "experimental" <> metavar "ExperimentalFeature"

brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs
brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
<$> O.many
( O.option propertyReader
( O.long "prop"
<> metavar "KEY=VALUE"
<> help "Broker property"
)
)
brokerConfigsParser :: O.Parser (Map Text Text)
brokerConfigsParser = Map.fromList <$> O.many
( O.option propertyReader
( O.long "prop"
<> metavar "KEY=VALUE"
<> help "Broker property"
)
)
where
propertyReader :: O.ReadM (Text, Text)
propertyReader = O.eitherReader $ \kv ->
let (k, v) = second tail $ span (/= '=') kv
in Right (T.pack k, T.pack v)

toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs
toKafkaBrokerConfigs mp =
case KC.mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------

parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a
Expand Down
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ parseJSONToOptions CliOptions{..} obj = do
nodeLogWithColor <- nodeCfgObj .:? "log-with-color" .!= True

-- Kafka config
!_kafkaBrokerConfigs <- KC.mergeBrokerConfigs cliBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj
brokerConfigs <- KC.parseBrokerConfigs nodeCfgObj
let !_kafkaBrokerConfigs = either (errorWithoutStackTrace . show) id $ KC.updateConfigs brokerConfigs cliBrokerProps

metricsPort <- nodeCfgObj .:? "metrics-port" .!= 9700
let !_metricsPort = fromMaybe metricsPort cliMetricsPort
Expand Down
39 changes: 25 additions & 14 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import qualified Data.Aeson.Key as Y
import qualified Data.Aeson.Text as Y
import Data.Int (Int32)
import Data.List (intercalate)
import qualified Data.Map as Map
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Read as T
Expand Down Expand Up @@ -183,8 +184,14 @@ parseBrokerConfigs obj =
allBrokerConfigs :: KafkaBrokerConfigs -> V.Vector KafkaConfigInstance
allBrokerConfigs = V.fromList . Map.elems . dumpConfigs

mergeBrokerConfigs :: KafkaBrokerConfigs -> KafkaBrokerConfigs -> KafkaBrokerConfigs
mergeBrokerConfigs = mergeConfigs
updateBrokerConfigs :: KafkaBrokerConfigs -> Map T.Text T.Text -> Either T.Text KafkaBrokerConfigs
updateBrokerConfigs = updateConfigs

mkKafkaBrokerConfigs :: Map T.Text T.Text -> KafkaBrokerConfigs
mkKafkaBrokerConfigs mp =
case mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

---------------------------------------------------------------------------
-- Config Helpers
Expand All @@ -193,10 +200,12 @@ type Lookup = T.Text -> Maybe T.Text
type ConfigMap = Map.Map T.Text KafkaConfigInstance

class KafkaConfigs a where
mkConfigs :: Lookup -> Either T.Text a
dumpConfigs :: a -> ConfigMap
mkConfigs :: Lookup -> Either T.Text a
dumpConfigs :: a -> ConfigMap
defaultConfigs :: a
mergeConfigs :: a -> a -> a
-- Update current configs. New properties will be added and existing properties will be overwritten.
-- Unknown properties will be ignored.
updateConfigs :: a -> Map T.Text T.Text -> Either T.Text a

default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a
mkConfigs lk = G.to <$> gmkConfigs lk
Expand All @@ -207,34 +216,36 @@ class KafkaConfigs a where
default defaultConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a
defaultConfigs = G.to gdefaultConfigs

default mergeConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> a -> a
mergeConfigs x y = G.to (gmergeConfigs (G.from x) (G.from y))
default updateConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> Map T.Text T.Text -> Either T.Text a
updateConfigs x mp = G.to <$> gupdateConfigs (G.from x) mp

class GKafkaConfigs f where
gmkConfigs :: Lookup -> Either T.Text (f p)
gdumpConfigs :: (f p) -> ConfigMap
gmkConfigs :: Lookup -> Either T.Text (f p)
gdumpConfigs :: (f p) -> ConfigMap
gdefaultConfigs :: f p
gmergeConfigs :: f p -> f p -> f p
gupdateConfigs :: f p -> Map T.Text T.Text -> Either T.Text (f p)

instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where
gmkConfigs lk = G.K1 <$> case lk (name @c defaultConfig) of
Nothing -> Right (defaultConfig @c)
Just textValue -> fromText @c textValue
gdumpConfigs (G.K1 x) = (Map.singleton (name x) (KafkaConfigInstance x))
gdefaultConfigs = G.K1 (defaultConfig @c)
gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue x then y else x)
gupdateConfigs (G.K1 x) mp = G.K1 <$> case Map.lookup (name x) mp of
Nothing -> Right x
Just textValue -> fromText @c textValue

instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where
gmkConfigs lk = G.M1 <$> (gmkConfigs lk)
gdumpConfigs (G.M1 x) = gdumpConfigs x
gdefaultConfigs = G.M1 gdefaultConfigs
gmergeConfigs (G.M1 x) (G.M1 y) = G.M1 (gmergeConfigs x y)
gupdateConfigs (G.M1 x) mp = G.M1 <$> gupdateConfigs x mp

instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where
gmkConfigs lk = (G.:*:) <$> (gmkConfigs lk) <*> (gmkConfigs lk)
gdumpConfigs (x G.:*: y) = Map.union (gdumpConfigs x) (gdumpConfigs y)
gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs
gmergeConfigs (x1 G.:*: y1) (x2 G.:*: y2) = (gmergeConfigs x1 x2) G.:*: (gmergeConfigs y1 y2)
gupdateConfigs (x G.:*: y) mp = (G.:*:) <$> gupdateConfigs x mp <*> gupdateConfigs y mp

#define MK_CONFIG_PAIR(configType) \
let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType))
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ data CliOptions = CliOptions
, cliEnableAcl :: !Bool

-- Kafka broker config
, cliBrokerConfigs :: !KC.KafkaBrokerConfigs
, cliBrokerProps :: Map Text Text

-- HStream Experimental Features
, cliExperimentalFeatures :: ![ExperimentalFeature]
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ test-suite hstream-kafka-test
HStream.Kafka.Common.AuthorizerSpec
HStream.Kafka.Common.OffsetManagerSpec
HStream.Kafka.Common.TestUtils
HStream.Kafka.Common.ConfigSpec

hs-source-dirs: tests
build-depends:
Expand Down
39 changes: 39 additions & 0 deletions hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{-# OPTIONS_GHC -Wno-orphans #-}

-- | Tests for merging CLI-supplied broker properties into parsed broker configs.
module HStream.Kafka.Common.ConfigSpec where

import qualified Data.Map.Strict             as M
import           HStream.Kafka.Server.Config
import           Test.Hspec

spec :: Spec
spec = describe "KafkaConfigTest" $ do

  it "updateConfigs" $ do
    -- Initial broker configs built from explicit key/value properties.
    let initial = mkKafkaBrokerConfigs $ M.fromList
          [ ("auto.create.topics.enable", "false")
          , ("num.partitions", "2")
          , ("offsets.topic.replication.factor", "1")
          ]
        -- Properties applied on top of the initial configs.
        overrides = M.fromList
          [ ("num.partitions", "3")
          , ("default.replication.factor", "2")
          , ("offsets.topic.replication.factor", "2")
            -- unknown properties should be ignored
          , ("unknown", "c")
          ]
        -- Properties not included in the update map remain unchanged;
        -- properties included in the update map are overridden.
        expected = mkKafkaBrokerConfigs $ M.fromList
          [ ("auto.create.topics.enable", "false")
          , ("num.partitions", "3")
          , ("default.replication.factor", "2")
          , ("offsets.topic.replication.factor", "2")
          ]
    case updateConfigs initial overrides of
      Left err  -> fail $ "updateConfigs failed: " <> show err
      Right cfg -> cfg `shouldBe` expected

0 comments on commit 1060655

Please sign in to comment.