diff --git a/hstream-kafka/HStream/Kafka/Server/Config.hs b/hstream-kafka/HStream/Kafka/Server/Config.hs index 85dbedd64..737319e22 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config.hs @@ -15,20 +15,25 @@ module HStream.Kafka.Server.Config , advertisedListenersToPB , StorageOptions (..) , ExperimentalFeature (..) + + , KafkaBrokerConfigs + , updateConfigs + , mkKafkaBrokerConfigs ) where -import Control.Exception (throwIO) -import qualified Data.Text as Text -import Data.Yaml (ParseException (..), - decodeFileThrow, - parseEither) -import System.Directory (makeAbsolute) +import Control.Exception (throwIO) +import qualified Data.Text as Text +import Data.Yaml (ParseException (..), + decodeFileThrow, + parseEither) +import System.Directory (makeAbsolute) -import HStream.Common.Types (getHStreamVersion) +import HStream.Common.Types (getHStreamVersion) import HStream.Kafka.Server.Config.FromCli import HStream.Kafka.Server.Config.FromJson +import HStream.Kafka.Server.Config.KafkaConfig import HStream.Kafka.Server.Config.Types -import qualified HStream.Server.HStreamApi as A +import qualified HStream.Server.HStreamApi as A runServerConfig :: [String] -> (ServerOpts -> IO ()) -> IO () diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs index 7bd4f05c6..22ad45770 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs @@ -14,32 +14,29 @@ module HStream.Kafka.Server.Config.FromCli , parseMetaStoreAddr ) where -import qualified Data.Attoparsec.Text as AP -import Data.Bifunctor (second) -import Data.ByteString (ByteString) -import Data.Map.Strict (Map) -import qualified Data.Map.Strict as Map -import qualified Data.Set as Set -import Data.Text (Text) -import qualified Data.Text as T -import Data.Word (Word16, Word32) -import Options.Applicative as O (auto, flag, help, - long, - 
maybeReader, - metavar, option, - optional, short, - strOption, value, - (<**>), (<|>)) -import qualified Options.Applicative as O -import System.Environment (getProgName) -import System.Exit (exitSuccess) -import Z.Data.CBytes (CBytes) - -import qualified HStream.Kafka.Server.Config.KafkaConfig as KC +import qualified Data.Attoparsec.Text as AP +import Data.Bifunctor (second) +import Data.ByteString (ByteString) +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Text (Text) +import qualified Data.Text as T +import Data.Word (Word16, Word32) +import Options.Applicative as O (auto, flag, help, long, + maybeReader, metavar, + option, optional, + short, strOption, + value, (<**>), (<|>)) +import qualified Options.Applicative as O +import System.Environment (getProgName) +import System.Exit (exitSuccess) +import Z.Data.CBytes (CBytes) + import HStream.Kafka.Server.Config.Types -import qualified HStream.Logger as Log -import HStream.Store (Compression (..)) -import HStream.Store.Logger (LDLogLevel (..)) +import qualified HStream.Logger as Log +import HStream.Store (Compression (..)) +import HStream.Store.Logger (LDLogLevel (..)) ------------------------------------------------------------------------------- @@ -106,7 +103,7 @@ cliOptionsParser = do cliEnableSaslAuth <- enableSaslAuthParser cliEnableAcl <- enableAclParser - cliBrokerConfigs <- brokerConfigsParser + cliBrokerProps <- brokerConfigsParser cliExperimentalFeatures <- O.many experimentalFeatureParser @@ -300,27 +297,20 @@ experimentalFeatureParser :: O.Parser ExperimentalFeature experimentalFeatureParser = option parseExperimentalFeature $ long "experimental" <> metavar "ExperimentalFeature" -brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs -brokerConfigsParser = toKafkaBrokerConfigs . 
Map.fromList - <$> O.many - ( O.option propertyReader - ( O.long "prop" - <> metavar "KEY=VALUE" - <> help "Broker property" - ) - ) +brokerConfigsParser :: O.Parser (Map Text Text) +brokerConfigsParser = Map.fromList <$> O.many + ( O.option propertyReader + ( O.long "prop" + <> metavar "KEY=VALUE" + <> help "Broker property" + ) + ) where propertyReader :: O.ReadM (Text, Text) propertyReader = O.eitherReader $ \kv -> let (k, v) = second tail $ span (/= '=') kv in Right (T.pack k, T.pack v) - toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs - toKafkaBrokerConfigs mp = - case KC.mkConfigs (mp Map.!?) of - Left msg -> errorWithoutStackTrace (T.unpack msg) - Right v -> v - ------------------------------------------------------------------------------- parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs index 7b5d0bf9c..b3fad012a 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs @@ -40,7 +40,8 @@ parseJSONToOptions CliOptions{..} obj = do nodeLogWithColor <- nodeCfgObj .:? "log-with-color" .!= True -- Kafka config - !_kafkaBrokerConfigs <- KC.mergeBrokerConfigs cliBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj + brokerConfigs <- KC.parseBrokerConfigs nodeCfgObj + let !_kafkaBrokerConfigs = either (errorWithoutStackTrace . show) id $ KC.updateConfigs brokerConfigs cliBrokerProps metricsPort <- nodeCfgObj .:? 
"metrics-port" .!= 9700 let !_metricsPort = fromMaybe metricsPort cliMetricsPort diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs index ce88622d7..dd8365565 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs @@ -9,7 +9,8 @@ import qualified Data.Aeson.Key as Y import qualified Data.Aeson.Text as Y import Data.Int (Int32) import Data.List (intercalate) -import qualified Data.Map as Map +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map import qualified Data.Text as T import qualified Data.Text.Lazy as TL import qualified Data.Text.Read as T @@ -183,8 +184,14 @@ parseBrokerConfigs obj = allBrokerConfigs :: KafkaBrokerConfigs -> V.Vector KafkaConfigInstance allBrokerConfigs = V.fromList . Map.elems . dumpConfigs -mergeBrokerConfigs :: KafkaBrokerConfigs -> KafkaBrokerConfigs -> KafkaBrokerConfigs -mergeBrokerConfigs = mergeConfigs +updateBrokerConfigs :: KafkaBrokerConfigs -> Map T.Text T.Text -> Either T.Text KafkaBrokerConfigs +updateBrokerConfigs = updateConfigs + +mkKafkaBrokerConfigs :: Map T.Text T.Text -> KafkaBrokerConfigs +mkKafkaBrokerConfigs mp = + case mkConfigs (mp Map.!?) of + Left msg -> errorWithoutStackTrace (T.unpack msg) + Right v -> v --------------------------------------------------------------------------- -- Config Helpers @@ -193,10 +200,12 @@ type Lookup = T.Text -> Maybe T.Text type ConfigMap = Map.Map T.Text KafkaConfigInstance class KafkaConfigs a where - mkConfigs :: Lookup -> Either T.Text a - dumpConfigs :: a -> ConfigMap + mkConfigs :: Lookup -> Either T.Text a + dumpConfigs :: a -> ConfigMap defaultConfigs :: a - mergeConfigs :: a -> a -> a + -- Update current configs. New properties will be added and existing properties will be overwrite. + -- Unknow properties will be ignored. 
+ updateConfigs :: a -> Map T.Text T.Text -> Either T.Text a default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a mkConfigs lk = G.to <$> gmkConfigs lk @@ -207,14 +216,14 @@ class KafkaConfigs a where default defaultConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a defaultConfigs = G.to gdefaultConfigs - default mergeConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> a -> a - mergeConfigs x y = G.to (gmergeConfigs (G.from x) (G.from y)) + default updateConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> Map T.Text T.Text -> Either T.Text a + updateConfigs x mp = G.to <$> gupdateConfigs (G.from x) mp class GKafkaConfigs f where - gmkConfigs :: Lookup -> Either T.Text (f p) - gdumpConfigs :: (f p) -> ConfigMap + gmkConfigs :: Lookup -> Either T.Text (f p) + gdumpConfigs :: (f p) -> ConfigMap gdefaultConfigs :: f p - gmergeConfigs :: f p -> f p -> f p + gupdateConfigs :: f p -> Map T.Text T.Text -> Either T.Text (f p) instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where gmkConfigs lk = G.K1 <$> case lk (name @c defaultConfig) of @@ -222,19 +231,21 @@ instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where Just textValue -> fromText @c textValue gdumpConfigs (G.K1 x) = (Map.singleton (name x) (KafkaConfigInstance x)) gdefaultConfigs = G.K1 (defaultConfig @c) - gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue x then y else x) + gupdateConfigs (G.K1 x) mp = G.K1 <$> case Map.lookup (name x) mp of + Nothing -> Right x + Just textValue -> fromText @c textValue instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where gmkConfigs lk = G.M1 <$> (gmkConfigs lk) gdumpConfigs (G.M1 x) = gdumpConfigs x gdefaultConfigs = G.M1 gdefaultConfigs - gmergeConfigs (G.M1 x) (G.M1 y) = G.M1 (gmergeConfigs x y) + gupdateConfigs (G.M1 x) mp = G.M1 <$> gupdateConfigs x mp instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where gmkConfigs lk = (G.:*:) <$> (gmkConfigs lk) <*> (gmkConfigs lk) 
gdumpConfigs (x G.:*: y) = Map.union (gdumpConfigs x) (gdumpConfigs y) gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs - gmergeConfigs (x1 G.:*: y1) (x2 G.:*: y2) = (gmergeConfigs x1 x2) G.:*: (gmergeConfigs y1 y2) + gupdateConfigs (x G.:*: y) mp = (G.:*:) <$> gupdateConfigs x mp <*> gupdateConfigs y mp #define MK_CONFIG_PAIR(configType) \ let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType)) diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs index a3c0c642f..9bb0bff52 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs @@ -142,7 +142,7 @@ data CliOptions = CliOptions , cliEnableAcl :: !Bool -- Kafka broker config - , cliBrokerConfigs :: !KC.KafkaBrokerConfigs + , cliBrokerProps :: Map Text Text -- HStream Experimental Features , cliExperimentalFeatures :: ![ExperimentalFeature] diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 21596a8b0..31eef9df9 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -264,6 +264,7 @@ test-suite hstream-kafka-test HStream.Kafka.Common.AuthorizerSpec HStream.Kafka.Common.OffsetManagerSpec HStream.Kafka.Common.TestUtils + HStream.Kafka.Common.ConfigSpec hs-source-dirs: tests build-depends: diff --git a/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs b/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs new file mode 100644 index 000000000..d1f5e180f --- /dev/null +++ b/hstream-kafka/tests/HStream/Kafka/Common/ConfigSpec.hs @@ -0,0 +1,39 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +module HStream.Kafka.Common.ConfigSpec where + +import qualified Data.Map.Strict as M +import HStream.Kafka.Server.Config +import Test.Hspec + +spec :: Spec +spec = describe "KafkaConfigTest" $ do + + it "updateConfigs" $ do + let kc1 = mkKafkaBrokerConfigs $ M.fromList 
+ [ + ("auto.create.topics.enable", "false"), + ("num.partitions", "2"), + ("offsets.topic.replication.factor", "1") + ] + let updates = M.fromList + [ + ("num.partitions", "3"), + ("default.replication.factor", "2"), + ("offsets.topic.replication.factor", "2"), + -- unknown properties should be ignored + ("unknown", "c") + ] + let expected = mkKafkaBrokerConfigs $ M.fromList + [ + -- properties not included in updates will remain unchanged + ("auto.create.topics.enable", "false"), + -- properties included in updates will be overridden + ("num.partitions", "3"), + ("default.replication.factor", "2"), + ("offsets.topic.replication.factor", "2") + ] + let kc = updateConfigs kc1 updates + case kc of + Left e -> fail $ "updateConfigs failed: " <> show e + Right cfg -> cfg `shouldBe` expected