diff --git a/cabal.project.user b/cabal.project.user index a178b69..23b949c 100644 --- a/cabal.project.user +++ b/cabal.project.user @@ -5,3 +5,8 @@ source-repository-package location: https://github.com/composewell/streamly.git tag: master subdir: core + +source-repository-package + type: git + location: https://github.com/composewell/streamly.git + tag: master diff --git a/dev/LiveStatWindows.md b/dev/LiveStatWindows.md new file mode 100644 index 0000000..29d9e00 --- /dev/null +++ b/dev/LiveStatWindows.md @@ -0,0 +1,51 @@ +# Windows + +In a linear flow, we have the ability to start a window anywhere and +stop it anywhere. A winow is basically the realestate between any 2 +points in the linear code. + +The goal is to make it easier and convinent to create windows. A +point in the linear flow should have some identification. We can use +the `ModuleName:LineNumber` to identify this. Using this has two +advantages. One being, removal of developer overhead of thinking about +a name and the other being extremely fast identification of whether +the point is enabled or not based on the line number. + +A window can be indentified by a tag. The tag, can be derived from +point identification. We can use +`ModuleName:LineNumber-ModuleName:LineNumber` respectively. + +This nomenclature lets us create non overlapping windows and +overlapping windows windows that are completely encapsulated. + +Non overlapping: +``` +............[...........]..................................... +......................................[.............]......... +``` + +Overlapping: +``` +............[.......................................]......... +.....................[................]....................... +``` + + +Window like this aren't possible: +``` +............[.........................]....................... +.....................[...........................]............ +``` + +From my experience, These kind of windows are rarely needed. But for +the 1% use-cases we can tackle this problem. + +To make the windows like the above possible, we can use namespaces. +Windows can belong to certain namespaces. + +We can use the `NameSpace[ModuleName:LineNumber]` to identify a point, +and `NameSpace[ModuleName:LineNumber-ModuleName:LineNumber]` to +identify a window. + +We can consider the point to be in a global namespace if the namespace +isn't mentioned. diff --git a/haskell-perf.cabal b/haskell-perf.cabal index eb93ddf..0d9e3f7 100644 --- a/haskell-perf.cabal +++ b/haskell-perf.cabal @@ -34,6 +34,11 @@ flag dev manual: True default: False +flag prim-op + description: Prim-op is available + manual: True + default: False + flag fusion-plugin description: Use fusion plugin for benchmarks and executables manual: True @@ -104,6 +109,19 @@ common compile-options ghc-options: -Wmissed-specialisations -Wall-missed-specialisations -fno-ignore-asserts + if flag(prim-op) + cpp-options: -DPRIM_OP_AVAILABLE + +library + import: compile-options + hs-source-dirs: lib + exposed-modules: Stat + ghc-options: -O2 -fmax-worker-args=16 -fspec-constr-recursive=16 + build-depends: + base >= 4.9 && < 5 + , streamly-core == 0.2.2 + , time + , template-haskell executable hperf import: compile-options @@ -114,6 +132,33 @@ executable hperf build-depends: base >= 4.9 && < 5 , containers - , streamly-core == 0.2.0 + , streamly-core == 0.2.2 + , format-numbers + , text + +executable stat-collector + import: compile-options + hs-source-dirs: stat-collector-src + main-is: Main.hs + ghc-options: -O2 -fmax-worker-args=16 -fspec-constr-recursive=16 + build-depends: + base + , streamly-core + , containers + , format-numbers + , text + , haskell-perf + +test-suite stat-test + import: compile-options + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: Main.hs + ghc-options: -O2 -fmax-worker-args=16 -fspec-constr-recursive=16 + build-depends: + base + , streamly-core + , containers , format-numbers , text + , haskell-perf diff --git a/lib/Stat.hs b/lib/Stat.hs new file mode 100644 index 0000000..3807313 --- /dev/null +++ b/lib/Stat.hs @@ -0,0 +1,248 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE MagicHash #-} +{-# LANGUAGE UnboxedTuples #-} +{-# LANGUAGE UnliftedFFITypes #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE QuasiQuotes #-} +{-# OPTIONS_GHC -Wno-missing-export-lists #-} +{-# OPTIONS_GHC -Wno-implicit-lift #-} + +module Stat where + +-------------------------------------------------------------------------------- +-- Imports +-------------------------------------------------------------------------------- + +import Control.Monad (forM_) +import Data.Bits (shiftR, (.&.), setBit, testBit, complement) +-- import Data.IORef (newIORef, IORef, atomicModifyIORef) +import Data.Word (Word8) +import Foreign.C.Types (CInt(..)) +import GHC.Conc.Sync (myThreadId, ThreadId(..)) +import GHC.Exts (ThreadId#) +import GHC.IO(IO(..)) +import GHC.IO.Unsafe (unsafePerformIO) +import GHC.Int(Int64(..), Int32(..)) +import System.Environment (lookupEnv) +import System.IO + (openFile, IOMode(..), Handle, BufferMode(..), hSetBuffering, hClose) +-- import Streamly.Unicode.String (str) +import System.CPUTime (getCPUTime) +import Data.Time.Clock (getCurrentTime, UTCTime(..), diffUTCTime) +import Streamly.Internal.Data.Array (Array(..)) +import Streamly.FileSystem.Handle as Handle + +import qualified Streamly.Data.StreamK as StreamK +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Internal.Data.Array as Array +import qualified Streamly.Internal.Data.MutByteArray as MBA + +import Prelude +import Language.Haskell.TH + +#ifdef PRIM_OP_AVAILABLE +import GHC.Exts (threadCPUTime#) +#else +import GHC.Exts (RealWorld, Int32#, Int64#, State#) + +threadCPUTime# :: + State# RealWorld -> (# State# RealWorld, Int64#, Int64#, Int32#, Int32# #) +threadCPUTime# = undefined +#endif + +-------------------------------------------------------------------------------- +-- Thread stat +-------------------------------------------------------------------------------- + +type SrcLoc = Loc + +data EvLoc + = Start + | Record + | Restart + | End + deriving (Read, Show, Ord, Eq) +$(MBA.deriveSerialize [d|instance MBA.Serialize EvLoc|]) + +data Counter + = ThreadCpuTime + | ProcessCpuTime + | WallClockTime + | Allocated + | SchedOut + deriving (Read, Show, Ord, Eq) +$(MBA.deriveSerialize [d|instance MBA.Serialize Counter|]) + +data Metric = + Metric + { m_tid :: Int32 + , m_namespace :: Int32 + , m_modName :: String + , m_lineNum :: Int32 + , m_counter :: Counter + , m_location :: EvLoc + , m_value :: Int64 + } + deriving (Show) +$(MBA.deriveSerialize [d|instance MBA.Serialize Metric|]) + +{-# INLINE tenPow9 #-} +tenPow9 :: Int64 +tenPow9 = 1000000000 + +-------------------------------------------------------------------------------- +-- Perf handle +-------------------------------------------------------------------------------- + +-- Perf handle +-- Can we rely on the RTS to close the handle? +perfHandle :: Handle +perfHandle = + unsafePerformIO $ do + h <- openFile "perf.bin" WriteMode + hSetBuffering h NoBuffering + pure h + +closePerfHandle :: IO () +closePerfHandle = hClose perfHandle + + +-------------------------------------------------------------------------------- +-- Window Type +-------------------------------------------------------------------------------- + +createBitMap :: Maybe [Int] -> IO MBA.MutByteArray +createBitMap mLineList = do + let maxLines = 8000 + arr <- MBA.new maxLines + forM_ [0..(maxLines - 1)] + $ \i -> MBA.pokeAt i arr (0 :: Word8) + case mLineList of + Nothing -> pure arr + Just [] -> do + forM_ [0..(maxLines - 1)] + $ \i -> MBA.pokeAt i arr (complement (0 :: Word8)) + pure arr + Just lineList -> do + forM_ lineList $ \ln -> pokeBitOn ln arr + pure arr + +-------------------------------------------------------------------------------- +-- Helpers +-------------------------------------------------------------------------------- + +pokeBitOn :: Int -> MBA.MutByteArray -> IO () +pokeBitOn i arr = do + val <- MBA.peekAt (shiftR i 3) arr + MBA.pokeAt (shiftR i 3) arr (setBit (val :: Word8) (i .&. 7)) + +testBitOn :: Int -> MBA.MutByteArray -> IO Bool +testBitOn i arr = do + val <- MBA.peekAt (shiftR i 3) arr + pure $ testBit (val :: Word8) (i .&. 7) + +sizedSerialize :: MBA.Serialize a => a -> IO (Array Word8) +sizedSerialize a = do + let len = MBA.addSizeTo 0 a + 8 + mbarr <- MBA.new len + off0 <- MBA.serializeAt 0 mbarr (fromIntegral len :: Int64) + off1 <- MBA.serializeAt off0 mbarr a + pure $ Array mbarr 0 off1 + +-------------------------------------------------------------------------------- +-- Setup Env +-------------------------------------------------------------------------------- + +envMeasurementWindows :: Maybe String +envMeasurementWindows = unsafePerformIO $ lookupEnv "MEASUREMENT_WINDOWS" + +-------------------------------------------------------------------------------- +-- Measurement +-------------------------------------------------------------------------------- + +foreign import ccall unsafe "rts_getThreadId" getThreadId :: ThreadId# -> CInt + +{-# INLINE getThreadStatLowLevel #-} +getThreadStatLowLevel :: IO (Int64, Int64, Int32, Int32) +getThreadStatLowLevel = IO $ \s -> + case threadCPUTime# s of + (# s', sec, nsec, alloc, count_sched #) -> + (# s', (I64# sec, I64# nsec, I32# alloc, I32# count_sched) #) + +picoToNanoSeconds :: Integer -> Int64 +picoToNanoSeconds x = fromIntegral (x `div` 1000) + +epochTime :: UTCTime +epochTime = UTCTime (toEnum 0) 0 + +getThreadStat :: IO (Int32, Int64, Int64, Int64) +getThreadStat = do + ThreadId tid <- myThreadId + (sec, nsec, alloc, switches) <- getThreadStatLowLevel + pure + ( fromIntegral (getThreadId tid) + , sec * tenPow9 + nsec + , fromIntegral (alloc * 8) + , fromIntegral switches + ) + +printMetricList :: Handle -> [Metric] -> IO () +printMetricList handle mList = do + arr <- + Array.fromChunksK + $ StreamK.mapM sizedSerialize + $ StreamK.fromStream + $ Stream.fromList mList + putChunk handle arr + + +eventGeneric :: + (forall b. IO b -> m b) -> Int32 -> EvLoc -> SrcLoc -> Handle -> m () +eventGeneric liftio namespace evLoc srcLoc handle = liftio $ do + (a, b, c, d) <- getThreadStat + let modName = loc_module srcLoc + let lnNum = (fromIntegral :: Int -> Int32) $ fst (loc_start srcLoc) + pCpuTime <- picoToNanoSeconds <$> getCPUTime + wTimeU <- getCurrentTime + let wTime = round $ diffUTCTime wTimeU epochTime * 1e9 + let mList = + [ Metric a namespace modName lnNum ThreadCpuTime evLoc b + , Metric a namespace modName lnNum Allocated evLoc c + , Metric a namespace modName lnNum SchedOut evLoc d + , Metric a namespace modName lnNum ProcessCpuTime evLoc pCpuTime + , Metric a namespace modName lnNum WallClockTime evLoc wTime + ] +{- + shouldStat <- testBitOn (fromEnum win) statEnv + if True + then printMetricList mList + else pure () +-} + printMetricList handle mList + +withEvLoc :: Q Exp -> Q Exp +withEvLoc f = do + Loc a b c d e <- location + appE f [| Loc a b c d e |] + +start :: Q Exp +start = do + Loc a b c d e <- location + [|eventGeneric id 0 Start (Loc a b c d e) perfHandle|] + +end :: Q Exp +end = do + Loc a b c d e <- location + [|eventGeneric id 0 End (Loc a b c d e) perfHandle|] + +record :: Q Exp +record = do + Loc a b c d e <- location + [|eventGeneric id 0 Record (Loc a b c d e) perfHandle|] + +restart :: Q Exp +restart = do + Loc a b c d e <- location + [|eventGeneric id 0 Restart (Loc a b c d e) perfHandle|] diff --git a/stat-collector-src/Main.hs b/stat-collector-src/Main.hs new file mode 100644 index 0000000..b8a8672 --- /dev/null +++ b/stat-collector-src/Main.hs @@ -0,0 +1,277 @@ +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Main (main) where + +-------------------------------------------------------------------------------- +-- Imports +-------------------------------------------------------------------------------- + +-- import Control.Concurrent (threadDelay) +import Data.Int (Int32, Int64) +import System.Environment (getArgs) +import Data.Function ((&)) +import Data.List (foldl', uncons) +import Data.Map (Map) +import Data.Maybe (fromJust) +import Data.Word (Word8) +import Streamly.Data.Array (Array) +import Streamly.Data.Fold (Fold) +import Streamly.Data.Stream (Stream) +import Streamly.Internal.Data.Fold (Fold(..), Step(..)) +import Streamly.Unicode.String (str) +import System.IO (stdin) +import Data.Text.Format.Numbers (prettyI) + +import qualified Data.Text as Text +import qualified Data.Map as Map +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Parser as Parser +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.FileSystem.Handle as Handle +import qualified Streamly.Internal.Data.Array as Array +import qualified Streamly.Internal.Data.Binary.Parser as Parser + +import Stat + +-------------------------------------------------------------------------------- +-- Utils +-------------------------------------------------------------------------------- + +double :: Int -> Double +double = fromIntegral + +-------------------------------------------------------------------------------- +-- Types +-------------------------------------------------------------------------------- + +type ModuleName = String +type LineNum = Int32 +type PointId = (ModuleName, LineNum) +type ThreadId = Int32 +type Tag = String +type Value = Int64 + +data EventId = + EventId + { evTid :: ThreadId + , evCounter :: Counter + , evTag :: Tag + } + deriving (Eq, Ord, Show) + +data Event + = Event EventId Value + deriving (Show) + +getEventId :: Event -> EventId +getEventId (Event evId _) = evId + +getEventVal :: Event -> Value +getEventVal (Event _ evVal) = evVal + +-------------------------------------------------------------------------------- +-- Folds +-------------------------------------------------------------------------------- + +statsLayout :: [String] +statsLayout = + [ "latest", "total", "count", "avg", "minimum", "maximum", "stddev"] + +{-# INLINE stats #-} +stats :: Fold IO Int64 [(String, Int)] +stats = + Fold.lmap (fromIntegral :: Int64 -> Int) + $ Fold.distribute + [ fmap (\x -> ("latest", fromJust x)) Fold.latest + , fmap (\x -> ("total", x)) Fold.sum + , fmap (\x -> ("count", x)) Fold.length + , fmap (\x -> ("avg", round x)) (Fold.lmap double Fold.mean) + , fmap (\x -> ("minimum", fromJust x)) Fold.minimum + , fmap (\x -> ("maximum", fromJust x)) Fold.maximum + , fmap (\x -> ("stddev", round x)) (Fold.lmap double Fold.stdDev) + ] + +-------------------------------------------------------------------------------- +-- Parsing Input +-------------------------------------------------------------------------------- + +-- Use ParserK here? +-- There are only 1 inner bind, ParserD should work fine. +metricParser :: Parser.Parser Word8 IO Metric +metricParser = do + size64 <- Parser.int64le + let size = fromIntegral size64 - 8 + fmap Array.deserialize $ Parser.takeEQ size (Array.unsafeCreateOf size) + +-- parseMany can be implemented in a recursive manner using parserK +parseInputToEventStream + :: Stream IO (Array Word8) -> Stream IO Metric +parseInputToEventStream inp = + fmap f $ Stream.parseMany metricParser $ Array.concat inp + where + f (Left err) = error $ show err + f (Right v) = v + +-------------------------------------------------------------------------------- +-- Processing stats +-------------------------------------------------------------------------------- + +boundEvents :: Monad m => Fold m Metric (Maybe Event) +boundEvents = Fold step initial extract extract + where + initial = pure $ Partial (Nothing, Map.empty) + + alterFunc + :: Metric + -> Maybe [(PointId, Value)] + -> (Maybe Event, Maybe [(PointId, Value)]) + alterFunc (Metric _ _ m l _ Start val) Nothing = + (Nothing, Just [((m, l), val)]) + alterFunc (Metric _ _ m l _ Start val) (Just xs) = + (Nothing, Just (((m, l), val):xs)) + alterFunc (Metric tid ns md ln counter End val) (Just stk) = + case uncons stk of + Just (((md1, ln1), prevVal), stk1) -> + let lnStr = show ln + ln1Str = show ln1 + nsStr = show ns + win = [str|#{nsStr}[#{md1}:#{ln1Str}-#{md}:#{lnStr}]|] + in ( Just (Event (EventId tid counter win) (val - prevVal)) + , Just stk1 + ) + Nothing -> error "boundEvents: Empty stack" + alterFunc (Metric tid ns md ln counter Restart val) (Just stk) = + case uncons stk of + Just (((md1, ln1), prevVal), stk1) -> + let lnStr = show ln + ln1Str = show ln1 + nsStr = show ns + win = [str|#{nsStr}[#{md1}:#{ln1Str}-#{md}:#{lnStr}]|] + in ( Just (Event (EventId tid counter win) (val - prevVal)) + , Just (((md, ln) ,val):stk1) + ) + Nothing -> error "boundEvents: Empty stack" + alterFunc (Metric tid ns md ln counter Record val) (Just stk) = + case uncons stk of + Just (((md1, ln1), prevVal), _) -> + let lnStr = show ln + ln1Str = show ln1 + nsStr = show ns + win = [str|#{nsStr}[#{md1}:#{ln1Str}-#{md}:#{lnStr}]|] + in ( Just (Event (EventId tid counter win) (val - prevVal)) + , Just stk + ) + Nothing -> error "boundEvents: Empty stack" + alterFunc _ Nothing = (Nothing, Nothing) + + step (_, mp) uev@(Metric tid ns _ _ counter _ _) = + pure $ Partial + $ Map.alterF (alterFunc uev) (ns, tid, counter) mp + + extract (ev, _) = pure ev + +statCollector :: Fold IO Event (Map EventId [(String, Int)]) +statCollector = + Fold.demuxToMap getEventId deriveFold + + where + + deriveFold _ = pure (Fold.lmap getEventVal stats) + +-------------------------------------------------------------------------------- +-- Printing stats +-------------------------------------------------------------------------------- + +fill :: Int -> String -> String +fill i x = + let len = length x + in replicate (i - len) ' ' ++ x + +printTable :: [[String]] -> IO () +printTable rows = do + case map (unwords . fillRow) rows of + [] -> putStrLn "printTable: empty rows" + (header:rest) -> putStrLn $ unlines $ header:unwords separatorRow:rest + + where + + rowLengths = map (map length) rows -- [[Int]] + maxLengths = foldl' (zipWith max) (head rowLengths) rowLengths + separatorRow = map (\n -> replicate n '-') maxLengths + fillRow r = zipWith (\n x -> fill n x) maxLengths r + +printStatsMap + :: (Show a, Show b, Show c, Ord a, Ord b) + => (EventId -> a) + -> (EventId -> b) + -> (EventId -> c) + -> Map EventId [(String, Int)] + -> IO () +printStatsMap index1 index2 index3 mp0 = + mapM_ printOneTable $ Map.toList $ anchorOnTidAndCounter mp0 + + where + + alterFunction v Nothing = Just [v] + alterFunction v (Just v0) = Just (v:v0) + + foldingFunction mp ev v = + Map.alter (alterFunction (index3 ev, v)) (index1 ev, index2 ev) mp + + anchorOnTidAndCounter mp = + Map.foldlWithKey' foldingFunction Map.empty mp + + printOneTable ((i1, i2), rows) = do + let i1Str = show i1 + i2Str = show i2 + headingL1 = [str|Index1: #{i1Str}|] + headingL2 = [str|Index2: #{i2Str}|] + divider = replicate (max (length headingL2) (length headingL1)) '-' + putStrLn divider + putStrLn headingL1 + putStrLn headingL2 + putStrLn divider + putStrLn "" + printTable + $ (:) tableHeader + $ map (\(i3, v) -> show i3 : map (pShowInt . snd) v) rows + putStrLn "" + + pShowInt = Text.unpack . prettyI (Just ',') + + tableHeader = "Index3":statsLayout + +-------------------------------------------------------------------------------- +-- Main +-------------------------------------------------------------------------------- + +main :: IO () +main = do + statsMap <- + Stream.unfold Handle.chunkReader stdin + & parseInputToEventStream + & Stream.scan boundEvents + & Stream.catMaybes + & Stream.fold statCollector + (arg:[]) <- getArgs + case arg of + "Tag" -> + printStatsMap + (evTid) + (evCounter) + (evTag) + statsMap + "Counter" -> + printStatsMap + (evTid) + (evTag) + (evCounter) + statsMap + "ThreadId" -> + printStatsMap + (evCounter) + (evTag) + (evTid) + statsMap + _ -> error "Undefined arg."