Skip to content

Commit

Permalink
refactor: decouple Prometheus server in hstream-kafka (#1717)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Dec 22, 2023
1 parent 542246a commit 24c1168
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 87 deletions.
65 changes: 65 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Metrics.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module HStream.Kafka.Common.Metrics
( startMetricsServer
, observeWithLabel
, P.withLabel
, P.setGauge
, P.addCounter
, P.incCounter
, P.observeDuration

, module HStream.Kafka.Common.Metrics.ConsumeStats
, module HStream.Kafka.Common.Metrics.ProduceStats
, module HStream.Kafka.Common.Metrics.ServerStats
) where

import Data.Ratio ((%))
import Data.String (fromString)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Middleware.Prometheus as P
import qualified Prometheus as P
import System.Clock (Clock (..),
diffTimeSpec,
getTime, toNanoSecs)

import HStream.Kafka.Common.Metrics.ConsumeStats
import HStream.Kafka.Common.Metrics.ProduceStats
import HStream.Kafka.Common.Metrics.ServerStats

-- | Run a Warp HTTP server that exposes Prometheus metrics at @/metrics@.
--
-- Blocks forever; run it in its own thread (e.g. via 'Control.Concurrent.Async.async').
--
-- The @host@ string is passed to 'Warp.setHost' and recognizes Warp's
-- special values:
--
--   [@*@]:  HostAny — any IPv4 or IPv6 hostname
--   [@*4@]: HostIPv4 — any IPv4 or IPv6 hostname, IPv4 preferred
--   [@!4@]: HostIPv4Only — any IPv4 hostname
--   [@*6@]: HostIPv6 — any IPv4 or IPv6 hostname, IPv6 preferred
--   [@!6@]: HostIPv6Only — any IPv6 hostname
--
-- Note that the permissive @*@ values allow binding to an IPv4 or an IPv6
-- hostname, so a port may bind successfully more times than expected
-- (e.g. once on the IPv4 localhost 127.0.0.1 and again on the IPv6
-- localhost 0:0:0:0:0:0:0:1).
--
-- Any other value is treated as a literal hostname; for example, use
-- \"127.0.0.1\" to bind to the IPv4 local host only.
startMetricsServer :: String -> Int -> IO ()
startMetricsServer host port =
  Warp.runSettings serverSettings metricsOnlyApp
  where
    -- Bind address/port on top of Warp's defaults.
    serverSettings =
      Warp.setHost (fromString host) (Warp.setPort port Warp.defaultSettings)
    -- Serve only the metrics endpoint; don't instrument the exporter's
    -- own scrape requests, to keep them out of the reported stats.
    metricsOnlyApp =
      P.prometheus P.def{P.prometheusInstrumentPrometheus = False} P.metricsApp

-- | Run an 'IO' action and record its wall-clock duration (in seconds)
-- into the given labelled observer (e.g. a histogram or summary vector).
--
-- Timing uses the 'Monotonic' clock, so it is unaffected by system clock
-- adjustments. If the action throws, no observation is recorded and the
-- exception propagates to the caller unchanged.
observeWithLabel
  :: (P.Observer metric, P.Label label)
  => P.Vector label metric  -- ^ labelled metric vector to record into
  -> label                  -- ^ label value(s) selecting the time series
  -> IO a                   -- ^ action to time
  -> IO a                   -- ^ the action's result, passed through
observeWithLabel metric labels action = do
  startedAt  <- getTime Monotonic
  result     <- action
  finishedAt <- getTime Monotonic
  -- Convert elapsed nanoseconds to seconds exactly via a Rational.
  let elapsedSeconds =
        fromRational (toNanoSecs (finishedAt `diffTimeSpec` startedAt) % 1000000000)
  P.withLabel metric labels (`P.observe` elapsedSeconds)
  pure result
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module HStream.Kafka.Metrics.ConsumeStats where
module HStream.Kafka.Common.Metrics.ConsumeStats where

import qualified Prometheus as P

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module HStream.Kafka.Metrics.ProduceStats where
module HStream.Kafka.Common.Metrics.ProduceStats where

import qualified Prometheus as P

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module HStream.Kafka.Metrics.ServerStats where
module HStream.Kafka.Common.Metrics.ServerStats where

import qualified Prometheus as P

Expand Down
18 changes: 0 additions & 18 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ import qualified Data.HashTable.IO as H
import qualified Data.HashTable.ST.Basic as HB
import qualified Data.IORef as IO
import Data.Maybe (fromMaybe)
import Data.Ratio ((%))
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified Kafka.Protocol.Encoding as K
import qualified Prometheus as P
import System.Clock (Clock (..), diffTimeSpec,
getTime, toNanoSecs)

type HashTable k v = H.BasicHashTable k v

Expand Down Expand Up @@ -84,17 +80,3 @@ encodeBase64 = Base64.encodeBase64

decodeBase64 :: T.Text -> BS.ByteString
decodeBase64 = Base64.decodeBase64Lenient . T.encodeUtf8

observeWithLabel
:: (P.Observer metric, P.Label label)
=> P.Vector label metric
-> label
-> IO a
-> IO a
observeWithLabel metric labels action = do
start <- getTime Monotonic
result <- action
end <- getTime Monotonic
let duration = toNanoSecs (end `diffTimeSpec` start) % 1000000000
P.withLabel metric labels $ flip P.observe (fromRational duration)
return result
7 changes: 3 additions & 4 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import GHC.Generics (Generic)
import System.Clock

import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified HStream.Kafka.Common.Metrics as M
import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..),
mkCkpOffsetStorage)
import HStream.Kafka.Metrics.ConsumeStats (consumerGroupCommittedOffsets)
import qualified HStream.Logger as Log
import qualified HStream.Store as LD
import qualified HStream.Store as S
Expand All @@ -39,7 +39,6 @@ import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message (OffsetCommitRequestPartition (..),
OffsetCommitResponsePartition (..),
OffsetFetchResponsePartition (..))
import qualified Prometheus as P

-- NOTE: All operations on the GroupMetadataManager are not concurrency-safe,
-- and the caller needs to ensure concurrency-safety on its own.
Expand Down Expand Up @@ -128,8 +127,8 @@ storeOffsets gmm@GroupOffsetManager{..} topicName arrayOffsets = do
<> "} to topic " <> Log.build topicName

V.forM_ offsetsInfo $ \(tp, _, offset) -> do
P.withLabel consumerGroupCommittedOffsets (groupName, topicName, T.pack . show $ tp.topicPartitionIdx) $
flip P.setGauge (fromIntegral offset)
M.withLabel M.consumerGroupCommittedOffsets (groupName, topicName, T.pack . show $ tp.topicPartitionIdx) $
flip M.setGauge (fromIntegral offset)

-- update cache
modifyIORef' offsetsCache $ \cache -> do
Expand Down
52 changes: 25 additions & 27 deletions hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,29 @@ module HStream.Kafka.Network
) where

import Control.Concurrent
import qualified Control.Exception as E
import qualified Control.Exception as E
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.State as State
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.State as State
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import Data.List (find, intersperse)
import Data.Maybe (fromMaybe, isJust,
isNothing)
import qualified Data.Text as T
import qualified Network.Socket as N
import qualified Network.Socket.ByteString as N
import qualified Network.Socket.ByteString.Lazy as NL
import Numeric (showHex, showInt)
import qualified Prometheus as P
import Data.List (find, intersperse)
import Data.Maybe (fromMaybe, isJust,
isNothing)
import qualified Data.Text as T
import qualified Network.Socket as N
import qualified Network.Socket.ByteString as N
import qualified Network.Socket.ByteString.Lazy as NL
import Numeric (showHex, showInt)

import HStream.Kafka.Common.KafkaException (ErrorCodeException (..))
import HStream.Kafka.Common.OffsetManager (initOffsetReader)
import HStream.Kafka.Common.Utils (observeWithLabel)
import HStream.Kafka.Metrics.ServerStats (handlerLatencies,
totalRequests)
import qualified HStream.Kafka.Network.IO as KIO
import qualified HStream.Kafka.Network.Security as Security
import HStream.Kafka.Server.Config.Types (SaslOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..),
initConnectionContext)
import qualified HStream.Logger as Log
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Network.IO as KIO
import qualified HStream.Kafka.Network.Security as Security
import HStream.Kafka.Server.Config.Types (SaslOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..),
initConnectionContext)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding
import Kafka.Protocol.Message
import Kafka.Protocol.Service
Expand Down Expand Up @@ -117,11 +112,14 @@ runServer opts sc_ mkPreAuthedHandlers mkAuthedHandlers =

runHandler peer handlers reqHeader@RequestHeader{..} reqBs = do
Log.debug $ "Received request header: " <> Log.buildString' reqHeader
P.incCounter totalRequests
M.incCounter M.totalRequests
let ServiceHandler{..} = findHandler handlers requestApiKey requestApiVersion
case rpcHandler of
UnaryHandler rpcHandler' -> do
observeWithLabel handlerLatencies (T.pack $ show requestApiKey) $ doUnaryHandler reqBs reqHeader rpcHandler' peer
M.observeWithLabel
M.handlerLatencies
(T.pack $ show requestApiKey) $
doUnaryHandler reqBs reqHeader rpcHandler' peer

doUnaryHandler l reqHeader@RequestHeader{..} rpcHandler' peer = do
(req, left) <- runGet' l
Expand Down
23 changes: 10 additions & 13 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ import qualified Data.Vector.Hashtables as HT
import qualified Data.Vector.Storable as VS
import GHC.Data.FastMutInt
import GHC.Stack (HasCallStack)
import qualified Prometheus as P

import qualified HStream.Base.Growing as GV
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Metrics.ConsumeStats (readLatencySnd,
topicTotalSendBytes,
topicTotalSendMessages,
totalConsumeRequest)
import HStream.Kafka.Server.Config (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
Expand Down Expand Up @@ -75,8 +71,8 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
orderedParts <- S.listStreamPartitionsOrdered scLDClient
(S.transToTopicStreamName t.topic)
ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
P.withLabel totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
\counter -> void $ P.addCounter counter 1
M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
\counter -> void $ M.addCounter counter 1
let m_logid = orderedParts V.!? fromIntegral p.partition
case m_logid of
Nothing -> do
Expand Down Expand Up @@ -167,11 +163,11 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
bs <- encodePartition mutMaxBytes mutIsFirstPartition request v
-- Stats
let partLabel = (topic, T.pack . show $ request.partition)
P.withLabel topicTotalSendBytes partLabel $ \counter -> void $
P.addCounter counter (fromIntegral $ BS.length bs)
P.withLabel topicTotalSendMessages partLabel $ \counter -> void $ do
M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $
M.addCounter counter (fromIntegral $ BS.length bs)
M.withLabel M.topicTotalSendMessages partLabel $ \counter -> void $ do
let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v
P.addCounter counter (fromIntegral totalRecords)
M.addCounter counter (fromIntegral totalRecords)
-- PartitionData
pure $ K.PartitionData request.partition K.NONE hioffset (Just bs)
(-1){- TODO: lastStableOffset -}
Expand All @@ -188,7 +184,7 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
else S.readerSetTimeout reader r.maxWaitMs
S.readerSetWaitOnlyWhenNoData reader
(_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
rs <- P.observeDuration readLatencySnd $ S.readerRead reader 100
rs <- M.observeDuration M.readLatencySnd $ S.readerRead reader 100
if null rs
then pure ((size, acc), False)
else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
Expand All @@ -210,7 +206,8 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
if r.maxWaitMs > defTimeout
then do
S.readerSetTimeout reader defTimeout
rs1 <- P.observeDuration readLatencySnd $ S.readerRead reader storageOpts.fetchMaxLen
rs1 <- M.observeDuration M.readLatencySnd $
S.readerRead reader storageOpts.fetchMaxLen
let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
if size >= fromIntegral r.minBytes
then pure rs1
Expand Down
22 changes: 10 additions & 12 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@ import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word

import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Common.Utils (observeWithLabel)
import HStream.Kafka.Metrics.ProduceStats (appendLatencySnd,
topicTotalAppendBytes,
topicTotalAppendMessages,
totalProduceRequest)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
Expand All @@ -28,7 +24,6 @@ import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Prometheus as P

-- acks: (FIXME: Currently we only support -1)
-- 0: The server will not send any response(this is the only case where the
Expand Down Expand Up @@ -65,7 +60,10 @@ handleProduce ServerContext{..} _ req = do
else V.forM
partitionResponses <- loopPart partitionData $ \partition -> do
let Just (_, logid) = partitions V.!? (fromIntegral partition.index) -- TODO: handle Nothing
P.withLabel totalProduceRequest (topic.name, T.pack . show $ partition.index) $ \counter -> void $ P.addCounter counter 1
M.withLabel
M.totalProduceRequest
(topic.name, T.pack . show $ partition.index) $ \counter ->
void $ M.addCounter counter 1
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"
Expand Down Expand Up @@ -129,12 +127,12 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
o (fromIntegral batchLength)
(K.CompactBytes storedBs)
Log.debug1 $ "Append key " <> Log.buildString' appendKey
r <- observeWithLabel appendLatencySnd streamName $
r <- M.observeWithLabel M.appendLatencySnd streamName $
S.appendCompressedBS ldclient logid storedRecord S.CompressionNone
appendAttrs
let !partLabel = (streamName, T.pack . show $ partition)
P.withLabel topicTotalAppendBytes partLabel $ \counter ->
void $ P.addCounter counter (fromIntegral $ BS.length storedRecord)
P.withLabel topicTotalAppendMessages partLabel $ \counter ->
void $ P.addCounter counter (fromIntegral batchLength)
M.withLabel M.topicTotalAppendBytes partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral $ BS.length storedRecord)
M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral batchLength)
pure (r, startOffset)
8 changes: 5 additions & 3 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ library
HStream.Kafka.Client.Api
HStream.Kafka.Client.Cli
HStream.Kafka.Common.KafkaException
HStream.Kafka.Common.Metrics
HStream.Kafka.Common.OffsetManager
HStream.Kafka.Common.Read
HStream.Kafka.Common.RecordFormat
Expand All @@ -126,9 +127,9 @@ library

other-modules:
HStream.Kafka.Common.AdminCli
HStream.Kafka.Metrics.ConsumeStats
HStream.Kafka.Metrics.ProduceStats
HStream.Kafka.Metrics.ServerStats
HStream.Kafka.Common.Metrics.ConsumeStats
HStream.Kafka.Common.Metrics.ProduceStats
HStream.Kafka.Common.Metrics.ServerStats
HStream.Kafka.Server.Config.FromCli
HStream.Kafka.Server.Config.FromJson
HStream.Kafka.Server.Config.KafkaConfig
Expand Down Expand Up @@ -186,6 +187,7 @@ library
, uuid
, vector
, vector-hashtables
, wai-middleware-prometheus
, warp
, yaml
, Z-Data
Expand Down
7 changes: 3 additions & 4 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import HStream.Gossip (GossipContext (..),
initGossipContext,
startGossip, waitGossipBoot)
import qualified HStream.Gossip.Types as Gossip
import HStream.Kafka.Common.Metrics (startMetricsServer)
import qualified HStream.Kafka.Network as K
import HStream.Kafka.Server.Config (AdvertisedListeners,
FileLoggerSettings (..),
Expand All @@ -66,8 +67,6 @@ import qualified HStream.Server.HStreamInternal as I
import qualified HStream.Store.Logger as S
import qualified HStream.ThirdParty.Protobuf as Proto
import HStream.Utils (getProtoTimestamp)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Middleware.Prometheus as P

#ifndef HSTREAM_ENABLE_ASAN
import Text.RawString.QQ (r)
Expand Down Expand Up @@ -143,8 +142,8 @@ app config@ServerOpts{..} = do
forM_ as (Async.link2Only (const True) a)
-- wait the default server
waitGossipBoot gossipContext
-- start prometheus app to export metrics
a2 <- Async.async $ Warp.run (fromIntegral _metricsPort) $ P.prometheus P.def {P.prometheusInstrumentPrometheus = False} P.metricsApp
-- start prometheus server to export metrics
a2 <- Async.async $ startMetricsServer "*4" (fromIntegral _metricsPort)
Async.link2Only (const True) a a2
Async.wait a

Expand Down
Loading

0 comments on commit 24c1168

Please sign in to comment.