From 95c266fbe12fd71d89aa4296c7ec781695163696 Mon Sep 17 00:00:00 2001 From: Commelina Date: Fri, 19 Jan 2024 17:40:43 +0800 Subject: [PATCH] hstream: trim log which stores changelog of query state after snapshotting (#1745) --- .../src/HStream/Processing/Processor/ChangeLog.hs | 1 + hstream/src/HStream/Server/Handler/Common.hs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/hstream-processing/src/HStream/Processing/Processor/ChangeLog.hs b/hstream-processing/src/HStream/Processing/Processor/ChangeLog.hs index bc4fe066a..76677ccc6 100644 --- a/hstream-processing/src/HStream/Processing/Processor/ChangeLog.hs +++ b/hstream-processing/src/HStream/Processing/Processor/ChangeLog.hs @@ -18,6 +18,7 @@ import qualified RIO.ByteString.Lazy as BL class ChangeLogger h where logChangelog :: h -> BL.ByteString -> IO () getChangelogProgress :: h -> IO Word64 -- FIXME: use type variable i + trimChangelog :: h -> Word64 -> IO () -- FIXME: use type variable i data StateStoreChangelog k v ser = CLKSPut Text k v -- HS.table: K/V; HG.aggregate: K/V; HTW.aggregate: K/V diff --git a/hstream/src/HStream/Server/Handler/Common.hs b/hstream/src/HStream/Server/Handler/Common.hs index f1f2ceb17..8ff364775 100644 --- a/hstream/src/HStream/Server/Handler/Common.hs +++ b/hstream/src/HStream/Server/Handler/Common.hs @@ -309,12 +309,14 @@ data QueryRunner = QueryRunner { instance ChangeLogger () where logChangelog () _ = return () getChangelogProgress () = return minBound + trimChangelog () _ = return () -- use logdevice stream instance ChangeLogger (S.LDClient, S.C_LogID) where logChangelog (ldClient, logId) bs = void $ S.append ldClient logId (lazyByteStringToBytes bs) Nothing getChangelogProgress (ldClient, logId) = S.getTailLSN ldClient logId + trimChangelog (ldClient, logId) lsn = S.trim ldClient logId lsn ---- store processing node states (snapshot) -- do nothing @@ -325,6 +327,8 @@ instance Snapshotter () where instance Snapshotter RocksDB.DB where snapshot db = RocksDB.put db def +-- | Do snapshot for a task, then trim old changelogs for this task. +-- May throw exceptions. doSnapshot :: (ChangeLogger h1, Snapshotter h2) => h1 -> h2 -> Task -> IO () doSnapshot h1 h2 Task{..} = do changelogTail <- getChangelogProgress h1 @@ -350,6 +354,8 @@ doSnapshot h1 h2 Task{..} = do valueSer = BL.toStrict $ Aeson.encode value snapshot h2 keySer valueSer Log.debug $ "Query " <> Log.build taskName <> ": I have successfully done a snapshot!" + trimChangelog h1 changelogTail + Log.debug $ "Query " <> Log.build taskName <> ": I have successfully trimmed the old changelog!" --------------------------------------------------------------------------------