Skip to content

Commit

Permalink
Update the stat-collector to work properly with the library
Browse files Browse the repository at this point in the history
  • Loading branch information
Adithya Obilisetty committed Feb 5, 2024
1 parent 01a4ca7 commit ea471a2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 123 deletions.
1 change: 1 addition & 0 deletions haskell-perf.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,4 @@ executable stat-collector
, containers
, format-numbers
, text
, haskell-perf
27 changes: 16 additions & 11 deletions lib/Stat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import qualified Streamly.Internal.Data.MutByteArray as MBA
import Prelude
import Language.Haskell.TH

#define PRIM_OP_AVAILABLE
-- #undef PRIM_OP_AVAILABLE

#ifdef PRIM_OP_AVAILABLE
import GHC.Exts (threadCPUTime#)
#else
Expand Down Expand Up @@ -77,12 +80,14 @@ $(MBA.deriveSerialize [d|instance MBA.Serialize Counter|])
-- | One counter sample taken at an instrumentation point.
-- 'eventGeneric' builds one 'Metric' per 'Counter' for every event it records.
data Metric =
Metric
-- First component of the tuple returned by @getThreadStat@
-- (presumably the thread id, going by the field name — confirm).
{ m_tid :: Int32
-- Namespace string passed to 'eventGeneric'.
, m_namespace :: String
-- Module name taken from the source location (@loc_module@).
, m_modName :: String
-- Starting line of the source location (@fst . loc_start@), narrowed to Int32.
, m_lineNum :: Int32
-- Which counter this sample belongs to.
, m_counter :: Counter
-- Event location tag (Start / End / Record / Restart) supplied by the caller.
, m_location :: EvLoc
-- Counter reading. Wall-clock and process CPU time are stored in
-- nanoseconds; units of the other counters depend on @getThreadStat@.
, m_value :: Int64
}
deriving (Show)
-- Derive the binary 'MBA.Serialize' instance via Template Haskell.
$(MBA.deriveSerialize [d|instance MBA.Serialize Metric|])

{-# INLINE tenPow9 #-}
Expand Down Expand Up @@ -178,20 +183,20 @@ printMetricList mList = do
putChunk stdout arr


eventGeneric :: (forall b. IO b -> m b) -> EvLoc -> SrcLoc -> m ()
eventGeneric liftio evLoc srcLoc = liftio $ do
eventGeneric :: (forall b. IO b -> m b) -> String -> EvLoc -> SrcLoc -> m ()
eventGeneric liftio namespace evLoc srcLoc = liftio $ do
(a, b, c, d) <- getThreadStat
let modName = loc_module srcLoc
let lnNum = (fromIntegral :: Int -> Int32) $ fst (loc_start srcLoc)
pCpuTime <- picoToNanoSeconds <$> getCPUTime
wTimeU <- getCurrentTime
let wTime = round $ diffUTCTime wTimeU epochTime * 1e9
let mList =
[ Metric a modName lnNum ThreadCpuTime evLoc b
, Metric a modName lnNum Allocated evLoc c
, Metric a modName lnNum SchedOut evLoc d
, Metric a modName lnNum ProcessCpuTime evLoc pCpuTime
, Metric a modName lnNum WallClockTime evLoc wTime
[ Metric a namespace modName lnNum ThreadCpuTime evLoc b
, Metric a namespace modName lnNum Allocated evLoc c
, Metric a namespace modName lnNum SchedOut evLoc d
, Metric a namespace modName lnNum ProcessCpuTime evLoc pCpuTime
, Metric a namespace modName lnNum WallClockTime evLoc wTime
]
{-
shouldStat <- testBitOn (fromEnum win) statEnv
Expand All @@ -209,19 +214,19 @@ withEvLoc f = do
-- | Template Haskell expression for a 'Start' event: captures the source
-- location of the splice via 'location' and quotes a call to 'eventGeneric'
-- carrying that location.
--
-- NOTE(review): two quoted forms follow (one abstracts over the lifting
-- function, the other fixes it to 'id' with namespace "g"). They look like the
-- before/after lines of a diff — confirm which one is intended.
start :: Q Exp
start = do
Loc a b c d e <- location
[|\liftio -> eventGeneric liftio Start (Loc a b c d e)|]
[|eventGeneric id "g" Start (Loc a b c d e)|]

-- | Template Haskell expression for an 'End' event: captures the source
-- location of the splice via 'location' and quotes a call to 'eventGeneric'
-- carrying that location.
--
-- NOTE(review): two quoted forms follow (one abstracts over the lifting
-- function, the other fixes it to 'id' with namespace "g"). They look like the
-- before/after lines of a diff — confirm which one is intended.
end :: Q Exp
end = do
Loc a b c d e <- location
[|\liftio -> eventGeneric liftio End (Loc a b c d e)|]
[|eventGeneric id "g" End (Loc a b c d e)|]

-- | Template Haskell expression for a 'Record' event: captures the source
-- location of the splice via 'location' and quotes a call to 'eventGeneric'
-- carrying that location.
--
-- NOTE(review): two quoted forms follow (one abstracts over the lifting
-- function, the other fixes it to 'id' with namespace "g"). They look like the
-- before/after lines of a diff — confirm which one is intended.
record :: Q Exp
record = do
Loc a b c d e <- location
[|\liftio -> eventGeneric liftio Record (Loc a b c d e)|]
[|eventGeneric id "g" Record (Loc a b c d e)|]

-- | Template Haskell expression for a 'Restart' event: captures the source
-- location of the splice via 'location' and quotes a call to 'eventGeneric'
-- carrying that location.
--
-- NOTE(review): two quoted forms follow (one abstracts over the lifting
-- function, the other fixes it to 'id' with namespace "g"). They look like the
-- before/after lines of a diff — confirm which one is intended.
restart :: Q Exp
restart = do
Loc a b c d e <- location
[|\liftio -> eventGeneric liftio Restart (Loc a b c d e)|]
[|eventGeneric id "g" Restart (Loc a b c d e)|]
148 changes: 36 additions & 112 deletions stat-collector-src/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,27 @@ import Streamly.Data.Array (Array)
import Streamly.Data.Fold (Fold)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Fold (Fold(..), Step(..))
import Streamly.Data.ParserK (ParserK)
import Streamly.Unicode.String (str)
import System.IO (stdin)
import Data.Text.Format.Numbers (prettyI)
import Control.Exception
(catch, SomeException, displayException, throwIO, ErrorCall(..))
import qualified Streamly.Internal.Data.StreamK as StreamK (unfoldrM)

import qualified Data.Text as Text
import qualified Data.Map as Map
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Data.Parser as Parser
import qualified Streamly.Data.ParserK as ParserK
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.StreamK as StreamK
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Unicode.Stream as Unicode
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Binary.Parser as Parser

import Stat

--------------------------------------------------------------------------------
-- Utils
Expand All @@ -44,30 +53,9 @@ double = fromIntegral
-- Types
--------------------------------------------------------------------------------

-- | Kind of instrumentation point, carrying two payload fields.
-- In this file it is used as @Boundary NameSpace PointId@: the first field is
-- the namespace and the second identifies the point (module and line).
-- The constructor names double as the textual keywords matched by
-- 'fEventBoundary'.
data Boundary a b
= Start a b
| Record a b
| Restart a b
| End a b
deriving (Read, Show, Ord, Eq)

-- | Project the namespace (first payload field) out of a boundary,
-- whichever constructor it was built with.
getNameSpace :: Boundary NameSpace PointId -> NameSpace
getNameSpace boundary =
    case boundary of
        Start ns _ -> ns
        Record ns _ -> ns
        Restart ns _ -> ns
        End ns _ -> ns

-- | The counters an event line can report.
-- The derived 'Read' instance is what 'fCounterName' uses to turn the textual
-- counter name into a value, so constructor names must match the input text
-- exactly.
data Counter
= ThreadCpuTime
| ProcessCpuTime
| WallClockTime
| Allocated
| SchedOut
deriving (Read, Show, Ord, Eq)

-- Namespace under which events are grouped.
type NameSpace = String
-- Name of the module containing an instrumentation point.
type ModuleName = String
-- Line number of an instrumentation point.
-- NOTE(review): 'LineNum' is declared twice below (Int and Int32); this looks
-- like the before/after lines of a diff — keep only the intended one.
type LineNum = Int
type LineNum = Int32
-- Module name and line number identifying an instrumentation point.
type PointId = (ModuleName, LineNum)
type ThreadId = Int32
type Tag = String
Expand All @@ -81,10 +69,6 @@ data EventId =
}
deriving (Eq, Ord, Show)

-- | A single parsed sample as produced by 'fUnboundedEvent': the boundary
-- (with its namespace and point), the thread id, the counter and the raw
-- counter value. It has not yet been paired with an earlier sample.
data UnboundedEvent
= UEvent (Boundary NameSpace PointId) ThreadId Counter Value
deriving (Show)

-- | A measured window: the identifying 'EventId' together with a value.
-- 'boundEvents' fills the value with the difference between the current
-- sample and the previous sample on the stack.
data Event
= Event EventId Value
deriving (Show)
Expand Down Expand Up @@ -121,101 +105,41 @@ stats =
-- Parsing Input
--------------------------------------------------------------------------------

-- Event format:
-- Start,<tag>,<tid>,<counterName>,<value>
-- Record,<tag>,<tid>,<counterName>,<value>
-- Restart,<tag>,<tid>,<counterName>,<value>
-- End,<tag>,<tid>,<counterName>,<value>

-- Tag format:
-- NameSpace[ModuleName:LineNumber]

-- | Build a multi-line, human-readable error report from the offending input
-- line and the reason it was rejected. Both arguments are interpolated
-- verbatim by the 'str' quasi-quoter.
errorString :: String -> String -> String
errorString line reason = [str|Error:
Line: #{line}
Reason: #{reason}
|]

-- | Fold a sequence of ASCII decimal digits into an integral value,
-- most significant digit first. An empty input yields 0.
--
-- Throws @ErrorCall "fIntegral: NaN"@ (in 'IO') as soon as a character
-- outside @'0'..'9'@ is seen.
--
-- The range check previously used @||@, which is true for every character,
-- so non-digits were silently folded into the result and the error branch
-- was unreachable. It now uses @&&@.
fIntegral :: Integral a => Fold IO Char a
fIntegral =
    Fold.foldlM' combine (pure 0)
  where
    combine acc c =
        -- 48 is the code point of '0'.
        let digit = ord c - 48
         in if digit >= 0 && digit <= 9
                then pure $ fromIntegral digit + acc * 10
                else throwIO $ ErrorCall "fIntegral: NaN"

-- | Collect the whole input as a string and map it to the matching
-- 'Boundary' constructor (still awaiting its namespace and point).
-- Any keyword other than the four known ones raises an 'ErrorCall' in 'IO'.
fEventBoundary ::
    Fold IO Char (NameSpace -> PointId -> Boundary NameSpace PointId)
fEventBoundary =
    Fold.rmapM toBoundary Fold.toList
  where
    toBoundary keyword =
        case keyword of
            "Start" -> pure Start
            "Record" -> pure Record
            "Restart" -> pure Restart
            "End" -> pure End
            _ -> throwIO $ ErrorCall "fEventBoundary: undefined"

-- | Collect the whole input as a string and convert it to a 'Counter' using
-- the derived 'Read' instance.
--
-- NOTE(review): 'read' is partial; an unrecognised counter name produces a
-- value that errors when forced rather than an exception raised inside the
-- fold — confirm callers are fine with that.
fCounterName :: Monad m => Fold m Char Counter
fCounterName = fmap read Fold.toList

-- | Parse a tag of the form @NameSpace[ModuleName:LineNumber]@.
-- Each component is read up to (and excluding) its delimiter: @[@ ends the
-- namespace, @:@ ends the module name and @]@ ends the line number.
fTag :: Fold IO Char (NameSpace, PointId)
fTag =
    mkTag
        <$> Fold.takeEndBy_ (== '[') Fold.toList
        <*> Fold.takeEndBy_ (== ':') Fold.toList
        <*> Fold.takeEndBy_ (== ']') fIntegral
  where
    mkTag nameSpace modName lineNo = (nameSpace, (modName, lineNo))

-- | Parse one event line of the form
-- @<Boundary>,<tag>,<tid>,<counterName>,<value>@ into an 'UnboundedEvent'.
-- Comma-terminated fields are read with 'Fold.takeEndBy_'; the final value
-- consumes the rest of the input.
fUnboundedEvent :: Fold IO Char UnboundedEvent
fUnboundedEvent =
    toEvent
        <$> Fold.takeEndBy_ (== ',') fEventBoundary
        -- fTag is a terminating fold, so one more element is consumed after
        -- it (presumably the ',' that follows the closing ']').
        <*> (fTag <* Fold.one)
        <*> Fold.takeEndBy_ (== ',') fIntegral
        <*> Fold.takeEndBy_ (== ',') fCounterName
        <*> fIntegral
  where
    -- Apply the boundary constructor to the (namespace, point) pair, then
    -- let 'UEvent' take the remaining thread id, counter and value.
    toEvent mkBoundary tag = UEvent (uncurry mkBoundary tag)

-- | Run 'fUnboundedEvent' over a single input line.
-- Returns 'Right' with the parsed event, or 'Left' with a report built by
-- 'errorString' when any exception is raised while folding the line.
parseLineToEvent :: String -> IO (Either String UnboundedEvent)
parseLineToEvent line =
    parsed `catch` report
  where
    parsed = Right <$> Stream.fold fUnboundedEvent (Stream.fromList line)

    report :: SomeException -> IO (Either String UnboundedEvent)
    report = pure . Left . errorString line . displayException
-- | Parse one length-prefixed, serialized 'Metric' from a stream of bytes.
--
-- Use ParserK here?
-- There is only one inner bind, so ParserD should work fine.
metricParser :: Parser.Parser Word8 IO Metric
metricParser = do
-- Read the 64-bit length prefix (little-endian, going by the parser's name).
size64 <- Parser.int64le
-- NOTE(review): subtracting 8 presumably means the prefix counts its own
-- 8 bytes — confirm against the writer. There is no check that the result is
-- non-negative, so a prefix below 8 passes a negative count on — confirm.
let size = fromIntegral size64 - 8
-- Take exactly @size@ elements into an array and deserialize it.
fmap Array.deserialize $ Parser.takeEQ size (Array.unsafeCreateOf size)

-- parseMany can be implemented in a recursive manner using parserK
parseInputToEventStream
:: Stream IO (Array Word8) -> Stream IO UnboundedEvent
:: Stream IO (Array Word8) -> Stream IO Metric
parseInputToEventStream inp =
Unicode.decodeUtf8Chunks inp
& Stream.foldMany
(Fold.takeEndBy_
(== '\n')
(Fold.rmapM parseLineToEvent Fold.toList))
& Stream.catRights
fmap f $ Stream.parseMany metricParser $ Array.concat inp
where
f (Left err) = error $ show err
f (Right v) = v

--------------------------------------------------------------------------------
-- Processing stats
--------------------------------------------------------------------------------

boundEvents :: Monad m => Fold m UnboundedEvent (Maybe Event)
boundEvents :: Monad m => Fold m Metric (Maybe Event)
boundEvents = Fold step initial extract extract
where
initial = pure $ Partial (Nothing, Map.empty)

alterFunc
:: UnboundedEvent
:: Metric
-> Maybe [(PointId, Value)]
-> (Maybe Event, Maybe [(PointId, Value)])
alterFunc (UEvent (Start _ point) _ _ val) Nothing =
(Nothing, Just [(point, val)])
alterFunc (UEvent (Start _ point) _ _ val) (Just xs) =
(Nothing, Just ((point, val):xs))
alterFunc (UEvent (End ns (md, ln)) tid counter val) (Just stk) =
alterFunc (Metric _ _ m l _ Start val) Nothing =
(Nothing, Just [((m, l), val)])
alterFunc (Metric _ _ m l _ Start val) (Just xs) =
(Nothing, Just (((m, l), val):xs))
alterFunc (Metric tid ns md ln counter End val) (Just stk) =
case uncons stk of
Just (((md1, ln1), prevVal), stk1) ->
let lnStr = show ln
Expand All @@ -225,17 +149,17 @@ boundEvents = Fold step initial extract extract
, Just stk1
)
Nothing -> error "boundEvents: Empty stack"
alterFunc (UEvent (Restart ns point@(md, ln)) tid counter val) (Just stk) =
alterFunc (Metric tid ns md ln counter Restart val) (Just stk) =
case uncons stk of
Just (((md1, ln1), prevVal), stk1) ->
let lnStr = show ln
ln1Str = show ln1
win = [str|#{ns}[#{md1}:#{ln1Str}-#{md}:#{lnStr}]|]
in ( Just (Event (EventId tid counter win) (val - prevVal))
, Just ((point ,val):stk1)
, Just (((md, ln) ,val):stk1)
)
Nothing -> error "boundEvents: Empty stack"
alterFunc (UEvent (Record ns (md, ln)) tid counter val) (Just stk) =
alterFunc (Metric tid ns md ln counter Record val) (Just stk) =
case uncons stk of
Just (((md1, ln1), prevVal), _) ->
let lnStr = show ln
Expand All @@ -247,9 +171,9 @@ boundEvents = Fold step initial extract extract
Nothing -> error "boundEvents: Empty stack"
alterFunc _ Nothing = (Nothing, Nothing)

step (_, mp) uev@(UEvent b tid counter _) =
step (_, mp) uev@(Metric tid ns _ _ counter _ _) =
pure $ Partial
$ Map.alterF (alterFunc uev) (getNameSpace b, tid, counter) mp
$ Map.alterF (alterFunc uev) (ns, tid, counter) mp

extract (ev, _) = pure ev

Expand Down

0 comments on commit ea471a2

Please sign in to comment.