From ea471a2f6d7bd6ff2d0f2ad278b3e2f6eda48a2b Mon Sep 17 00:00:00 2001 From: Adithya Obilisetty Date: Tue, 6 Feb 2024 00:21:17 +0530 Subject: [PATCH] Update the stat-collector to work properly with the library --- haskell-perf.cabal | 1 + lib/Stat.hs | 27 ++++--- stat-collector-src/Main.hs | 148 +++++++++---------------------------- 3 files changed, 53 insertions(+), 123 deletions(-) diff --git a/haskell-perf.cabal b/haskell-perf.cabal index fda16eb..3e3908a 100644 --- a/haskell-perf.cabal +++ b/haskell-perf.cabal @@ -141,3 +141,4 @@ executable stat-collector , containers , format-numbers , text + , haskell-perf diff --git a/lib/Stat.hs b/lib/Stat.hs index b1f203d..e61b84f 100644 --- a/lib/Stat.hs +++ b/lib/Stat.hs @@ -41,6 +41,9 @@ import qualified Streamly.Internal.Data.MutByteArray as MBA import Prelude import Language.Haskell.TH +#define PRIM_OP_AVAILABLE +-- #undef PRIM_OP_AVAILABLE + #ifdef PRIM_OP_AVAILABLE import GHC.Exts (threadCPUTime#) #else @@ -77,12 +80,14 @@ $(MBA.deriveSerialize [d|instance MBA.Serialize Counter|]) data Metric = Metric { m_tid :: Int32 + , m_namespace :: String , m_modName :: String , m_lineNum :: Int32 , m_counter :: Counter , m_location :: EvLoc , m_value :: Int64 } + deriving (Show) $(MBA.deriveSerialize [d|instance MBA.Serialize Metric|]) {-# INLINE tenPow9 #-} @@ -178,8 +183,8 @@ printMetricList mList = do putChunk stdout arr -eventGeneric :: (forall b. IO b -> m b) -> EvLoc -> SrcLoc -> m () -eventGeneric liftio evLoc srcLoc = liftio $ do +eventGeneric :: (forall b. IO b -> m b) -> String -> EvLoc -> SrcLoc -> m () +eventGeneric liftio namespace evLoc srcLoc = liftio $ do (a, b, c, d) <- getThreadStat let modName = loc_module srcLoc let lnNum = (fromIntegral :: Int -> Int32) $ fst (loc_start srcLoc) @@ -187,11 +192,11 @@ eventGeneric liftio evLoc srcLoc = liftio $ do wTimeU <- getCurrentTime let wTime = round $ diffUTCTime wTimeU epochTime * 1e9 let mList = - [ Metric a modName lnNum ThreadCpuTime evLoc b - , Metric a modName lnNum Allocated evLoc c - , Metric a modName lnNum SchedOut evLoc d - , Metric a modName lnNum ProcessCpuTime evLoc pCpuTime - , Metric a modName lnNum WallClockTime evLoc wTime + [ Metric a namespace modName lnNum ThreadCpuTime evLoc b + , Metric a namespace modName lnNum Allocated evLoc c + , Metric a namespace modName lnNum SchedOut evLoc d + , Metric a namespace modName lnNum ProcessCpuTime evLoc pCpuTime + , Metric a namespace modName lnNum WallClockTime evLoc wTime ] {- shouldStat <- testBitOn (fromEnum win) statEnv @@ -209,19 +214,19 @@ withEvLoc f = do start :: Q Exp start = do Loc a b c d e <- location - [|\liftio -> eventGeneric liftio Start (Loc a b c d e)|] + [|eventGeneric id "g" Start (Loc a b c d e)|] end :: Q Exp end = do Loc a b c d e <- location - [|\liftio -> eventGeneric liftio End (Loc a b c d e)|] + [|eventGeneric id "g" End (Loc a b c d e)|] record :: Q Exp record = do Loc a b c d e <- location - [|\liftio -> eventGeneric liftio Record (Loc a b c d e)|] + [|eventGeneric id "g" Record (Loc a b c d e)|] restart :: Q Exp restart = do Loc a b c d e <- location - [|\liftio -> eventGeneric liftio Restart (Loc a b c d e)|] + [|eventGeneric id "g" Restart (Loc a b c d e)|] diff --git a/stat-collector-src/Main.hs b/stat-collector-src/Main.hs index d16fdd2..6d38e42 100644 --- a/stat-collector-src/Main.hs +++ b/stat-collector-src/Main.hs @@ -20,18 +20,27 @@ import Streamly.Data.Array (Array) import Streamly.Data.Fold (Fold) import Streamly.Data.Stream (Stream) import Streamly.Internal.Data.Fold (Fold(..), Step(..)) +import Streamly.Data.ParserK (ParserK) import Streamly.Unicode.String (str) import System.IO (stdin) import Data.Text.Format.Numbers (prettyI) import Control.Exception (catch, SomeException, displayException, throwIO, ErrorCall(..)) +import qualified Streamly.Internal.Data.StreamK as StreamK (unfoldrM) import qualified Data.Text as Text import qualified Data.Map as Map import qualified Streamly.Data.Fold as Fold -import qualified Streamly.Data.Stream.Prelude as Stream +import qualified Streamly.Data.Parser as Parser +import qualified Streamly.Data.ParserK as ParserK +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Data.StreamK as StreamK import qualified Streamly.FileSystem.Handle as Handle import qualified Streamly.Unicode.Stream as Unicode +import qualified Streamly.Internal.Data.Array as Array +import qualified Streamly.Internal.Data.Binary.Parser as Parser + +import Stat -------------------------------------------------------------------------------- -- Utils @@ -44,30 +53,9 @@ double = fromIntegral -- Types -------------------------------------------------------------------------------- -data Boundary a b - = Start a b - | Record a b - | Restart a b - | End a b - deriving (Read, Show, Ord, Eq) - -getNameSpace :: Boundary NameSpace PointId -> NameSpace -getNameSpace (Start a _) = a -getNameSpace (Record a _) = a -getNameSpace (Restart a _) = a -getNameSpace (End a _) = a - -data Counter - = ThreadCpuTime - | ProcessCpuTime - | WallClockTime - | Allocated - | SchedOut - deriving (Read, Show, Ord, Eq) - type NameSpace = String type ModuleName = String -type LineNum = Int +type LineNum = Int32 type PointId = (ModuleName, LineNum) type ThreadId = Int32 type Tag = String @@ -81,10 +69,6 @@ data EventId = } deriving (Eq, Ord, Show) -data UnboundedEvent - = UEvent (Boundary NameSpace PointId) ThreadId Counter Value - deriving (Show) - data Event = Event EventId Value deriving (Show) @@ -121,101 +105,41 @@ stats = -- Parsing Input -------------------------------------------------------------------------------- --- Event format: --- Start,,,, --- Record,,,, --- Restart,,,, --- End,,, - --- Tag format: --- NameSpace[ModuleName:LineNumber] - -errorString :: String -> String -> String -errorString line reason = [str|Error: -Line: #{line} -Reason: #{reason} -|] - -fIntegral :: Integral a => Fold IO Char a -fIntegral = - Fold.foldlM' combine (pure 0) - where - combine b a = - case ord a - 48 of - x -> - if x >= 0 || x <= 9 - then pure $ fromIntegral x + b * 10 - else throwIO $ ErrorCall "fIntegral: NaN" - -fEventBoundary :: - Fold IO Char (NameSpace -> PointId -> Boundary NameSpace PointId) -fEventBoundary = - Fold.rmapM f Fold.toList - where - f "Start" = pure Start - f "Record" = pure Record - f "Restart" = pure Restart - f "End" = pure End - f _ = throwIO $ ErrorCall "fEventBoundary: undefined" - -fCounterName :: Monad m => Fold m Char Counter -fCounterName = read <$> Fold.toList - -fTag :: Fold IO Char (NameSpace, PointId) -fTag = - (\a b c -> (a, (b, c))) - <$> Fold.takeEndBy_ (== '[') Fold.toList - <*> Fold.takeEndBy_ (== ':') Fold.toList - <*> Fold.takeEndBy_ (== ']') fIntegral - -fUnboundedEvent :: Fold IO Char UnboundedEvent -fUnboundedEvent = - f - <$> (Fold.takeEndBy_ (== ',') fEventBoundary) - <*> (fTag <* Fold.one) -- fTag is a terminating fold - <*> (Fold.takeEndBy_ (== ',') fIntegral) - <*> (Fold.takeEndBy_ (== ',') fCounterName) - <*> fIntegral - - where - - f a b c d e = UEvent (a (fst b) (snd b)) c d e - -parseLineToEvent :: String -> IO (Either String UnboundedEvent) -parseLineToEvent line = - catch - (Right <$> Stream.fold fUnboundedEvent (Stream.fromList line)) - (\(e :: SomeException) -> - pure (Left (errorString line (displayException e)))) +-- Use ParserK here? +-- There are only 1 inner bind, ParserD should work fine. +metricParser :: Parser.Parser Word8 IO Metric +metricParser = do + size64 <- Parser.int64le + let size = fromIntegral size64 - 8 + fmap Array.deserialize $ Parser.takeEQ size (Array.unsafeCreateOf size) +-- parseMany can be implemented in a recursive manner using parserK parseInputToEventStream - :: Stream IO (Array Word8) -> Stream IO UnboundedEvent + :: Stream IO (Array Word8) -> Stream IO Metric parseInputToEventStream inp = - Unicode.decodeUtf8Chunks inp - & Stream.foldMany - (Fold.takeEndBy_ - (== '\n') - (Fold.rmapM parseLineToEvent Fold.toList)) - & Stream.catRights + fmap f $ Stream.parseMany metricParser $ Array.concat inp + where + f (Left err) = error $ show err + f (Right v) = v -------------------------------------------------------------------------------- -- Processing stats -------------------------------------------------------------------------------- -boundEvents :: Monad m => Fold m UnboundedEvent (Maybe Event) +boundEvents :: Monad m => Fold m Metric (Maybe Event) boundEvents = Fold step initial extract extract where initial = pure $ Partial (Nothing, Map.empty) alterFunc - :: UnboundedEvent + :: Metric -> Maybe [(PointId, Value)] -> (Maybe Event, Maybe [(PointId, Value)]) - alterFunc (UEvent (Start _ point) _ _ val) Nothing = - (Nothing, Just [(point, val)]) - alterFunc (UEvent (Start _ point) _ _ val) (Just xs) = - (Nothing, Just ((point, val):xs)) - alterFunc (UEvent (End ns (md, ln)) tid counter val) (Just stk) = + alterFunc (Metric _ _ m l _ Start val) Nothing = + (Nothing, Just [((m, l), val)]) + alterFunc (Metric _ _ m l _ Start val) (Just xs) = + (Nothing, Just (((m, l), val):xs)) + alterFunc (Metric tid ns md ln counter End val) (Just stk) = case uncons stk of Just (((md1, ln1), prevVal), stk1) -> let lnStr = show ln @@ -225,17 +149,17 @@ boundEvents = Fold step initial extract extract , Just stk1 ) Nothing -> error "boundEvents: Empty stack" - alterFunc (UEvent (Restart ns point@(md, ln)) tid counter val) (Just stk) = + alterFunc (Metric tid ns md ln counter Restart val) (Just stk) = case uncons stk of Just (((md1, ln1), prevVal), stk1) -> let lnStr = show ln ln1Str = show ln1 win = [str|#{ns}[#{md1}:#{ln1Str}-#{md}:#{lnStr}]|] in ( Just (Event (EventId tid counter win) (val - prevVal)) - , Just ((point ,val):stk1) + , Just (((md, ln) ,val):stk1) ) Nothing -> error "boundEvents: Empty stack" - alterFunc (UEvent (Record ns (md, ln)) tid counter val) (Just stk) = + alterFunc (Metric tid ns md ln counter Record val) (Just stk) = case uncons stk of Just (((md1, ln1), prevVal), _) -> let lnStr = show ln @@ -247,9 +171,9 @@ boundEvents = Fold step initial extract extract Nothing -> error "boundEvents: Empty stack" alterFunc _ Nothing = (Nothing, Nothing) - step (_, mp) uev@(UEvent b tid counter _) = + step (_, mp) uev@(Metric tid ns _ _ counter _ _) = pure $ Partial - $ Map.alterF (alterFunc uev) (getNameSpace b, tid, counter) mp + $ Map.alterF (alterFunc uev) (ns, tid, counter) mp extract (ev, _) = pure ev