From ccf1e594941b23061056104ae72e3cb9f229dbc5 Mon Sep 17 00:00:00 2001 From: Luke Tchang Date: Mon, 27 Nov 2023 20:34:15 -0500 Subject: [PATCH] batch preference in bundler (#654) * refactor batcher, server not compiling * batcher tests passing * add processor cli cmd * all builds * batcher tests passing again * clear timeout in balance monitor * renames and comments * changeset add * lint, prettier * fix dev package.json script * update window start, use passed in batch size no hardcode, unit tests check window start * fix bundler not setting window start properly * add gas price multiplier flag for test actor to test diff batch pref * changeset add * rename window start + private prefix method --- .changeset/famous-eggs-fold.md | 5 + .changeset/green-drinks-repair.md | 5 + .changeset/tasty-suits-bathe.md | 5 + .changeset/weak-moles-sit.md | 5 + actors/balance-monitor/src/monitor.ts | 4 +- actors/bundler/.env.example | 3 - actors/bundler/package.json | 5 +- actors/bundler/src/batcher.ts | 306 ++++++++++-------- .../bundler/src/cli/commands/run/batcher.ts | 48 --- actors/bundler/src/cli/commands/run/index.ts | 6 +- .../bundler/src/cli/commands/run/processor.ts | 99 ++++++ .../bundler/src/cli/commands/run/submitter.ts | 59 ---- actors/bundler/src/db/batcherdb.ts | 49 --- actors/bundler/src/db/bufferdb.ts | 90 ++++++ actors/bundler/src/db/index.ts | 2 +- actors/bundler/src/opValidation.ts | 2 +- actors/bundler/src/routes.ts | 57 ++-- actors/bundler/src/server.ts | 28 +- actors/bundler/src/types.ts | 1 - actors/bundler/src/utils.ts | 4 + actors/bundler/test/Batcher.test.ts | 120 +++---- actors/bundler/test/BatcherDB.test.ts | 72 ----- actors/bundler/test/BufferDB.test.ts | 75 +++++ actors/test-actor/src/actor.ts | 33 +- actors/test-actor/src/cli/run.ts | 7 + packages/e2e-tests/src/bundler.ts | 8 +- 26 files changed, 602 insertions(+), 496 deletions(-) create mode 100644 .changeset/famous-eggs-fold.md create mode 100644 .changeset/green-drinks-repair.md create mode 100644 .changeset/tasty-suits-bathe.md create mode 100644 .changeset/weak-moles-sit.md delete mode 100644 actors/bundler/src/cli/commands/run/batcher.ts create mode 100644 actors/bundler/src/cli/commands/run/processor.ts delete mode 100644 actors/bundler/src/cli/commands/run/submitter.ts delete mode 100644 actors/bundler/src/db/batcherdb.ts create mode 100644 actors/bundler/src/db/bufferdb.ts delete mode 100644 actors/bundler/test/BatcherDB.test.ts create mode 100644 actors/bundler/test/BufferDB.test.ts diff --git a/.changeset/famous-eggs-fold.md b/.changeset/famous-eggs-fold.md new file mode 100644 index 000000000..08d564a99 --- /dev/null +++ b/.changeset/famous-eggs-fold.md @@ -0,0 +1,5 @@ +--- +"@nocturne-xyz/test-actor": minor +--- + +add --op-gas-price-multiplier CLI arg to allow test actor to test batch preference diff --git a/.changeset/green-drinks-repair.md b/.changeset/green-drinks-repair.md new file mode 100644 index 000000000..28f115254 --- /dev/null +++ b/.changeset/green-drinks-repair.md @@ -0,0 +1,5 @@ +--- +"@nocturne-xyz/bundler": major +--- + +consolidate batcher and submitter into the processor command diff --git a/.changeset/tasty-suits-bathe.md b/.changeset/tasty-suits-bathe.md new file mode 100644 index 000000000..bff762533 --- /dev/null +++ b/.changeset/tasty-suits-bathe.md @@ -0,0 +1,5 @@ +--- +"@nocturne-xyz/balance-monitor": patch +--- + +fix start() function to clear interval on teardown diff --git a/.changeset/weak-moles-sit.md b/.changeset/weak-moles-sit.md new file mode 100644 index 000000000..a5ac0fd8b --- /dev/null +++ b/.changeset/weak-moles-sit.md @@ -0,0 +1,5 @@ +--- +"@nocturne-xyz/bundler": minor +--- + +add batching preference functionality by refactoring batcher and server to support fast/medium/slow buffers diff --git a/actors/balance-monitor/src/monitor.ts b/actors/balance-monitor/src/monitor.ts index ab85bb191..937ad9909 100644 --- a/actors/balance-monitor/src/monitor.ts +++ b/actors/balance-monitor/src/monitor.ts @@ -196,12 +196,14 @@ export class BalanceMonitor { ); this.registerMetrics(); + let timeoutId: NodeJS.Timeout; const promise = new Promise((resolve) => { const poll = async () => { this.logger.info("polling..."); if (this.closed) { this.logger.info("Balance Monitor stopping..."); + clearTimeout(timeoutId); resolve(); return; } @@ -211,7 +213,7 @@ export class BalanceMonitor { // balance monitor metrics piping is implicit, automatically executed via register metrics // callbacks - setTimeout(poll, 60_000); + timeoutId = setTimeout(poll, 60_000); }; void poll(); diff --git a/actors/bundler/.env.example b/actors/bundler/.env.example index 31cebe2a3..3c8bec114 100644 --- a/actors/bundler/.env.example +++ b/actors/bundler/.env.example @@ -1,9 +1,6 @@ REDIS_URL= REDIS_PASSWORD= -CONFIG_NAME_OR_PATH= -MAX_LATENCY= - OZ_RELAYER_API_KEY= OZ_RELAYER_API_SECRET= OZ_RELAYER_SPEED= diff --git a/actors/bundler/package.json b/actors/bundler/package.json index 669f592c0..06004007f 100644 --- a/actors/bundler/package.json +++ b/actors/bundler/package.json @@ -17,11 +17,10 @@ "bundler-cli": "npx ts-node src/cli/index.ts", "check": "tsc --noEmit", "clean": "rm -rf .turbo dist", - "dev": "yarn dev:env && concurrently --kill-others \"yarn dev:server\" \"yarn dev:batcher\" \"yarn dev:submitter\"", - "dev:batcher": "nodemon src/cli/index.ts run batcher --config-name-or-path localhost --max-latency 60 --log-dir ../../logs/bundler-batcher", + "dev": "yarn dev:env && concurrently --kill-others \"yarn dev:server\" \"yarn dev:processor\"", "dev:env": "cp .env.dev .env", "dev:server": "nodemon src/cli/index.ts run server --bundler-address 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 --config-name-or-path localhost --port 3000 --log-dir ../../logs/bundler-server --log-level info", - "dev:submitter": "nodemon src/cli/index.ts run submitter --config-name-or-path localhost --log-dir ../../logs/bundler-submitter", + "dev:processor": "nodemon src/cli/index.ts run processor --config-name-or-path localhost --batch-poll-interval 10 --log-dir ../../logs/bundler-submitter", "lint": "eslint --fix src --ext .ts", "prettier:check": "prettier --check ./src ./test", "prettier:write": "prettier --write ./src ./test", diff --git a/actors/bundler/src/batcher.ts b/actors/bundler/src/batcher.ts index 240ec654e..55fc2faf1 100644 --- a/actors/bundler/src/batcher.ts +++ b/actors/bundler/src/batcher.ts @@ -1,6 +1,6 @@ import IORedis from "ioredis"; -import { Job, Queue, Worker } from "bullmq"; -import { BatcherDB, StatusDB } from "./db"; +import { Queue } from "bullmq"; +import { BufferDB, RedisTransaction, StatusDB } from "./db"; import { OperationStatus, OperationTrait, @@ -10,12 +10,10 @@ import { OperationBatchJobData, OPERATION_BATCH_QUEUE, OPERATION_BATCH_JOB_TAG, - OperationJobData, - SUBMITTABLE_OPERATION_QUEUE, ACTOR_NAME, } from "./types"; import * as JSON from "bigint-json-serialization"; -import { actorChain } from "./utils"; +import { unixTimestampSeconds } from "./utils"; import { Logger } from "winston"; import { ActorHandle, @@ -27,41 +25,63 @@ import * as ot from "@opentelemetry/api"; const COMPONENT_NAME = "batcher"; export interface BundlerBatcherMetrics { - relayRequestsEnqueuedInBatcherDBCounter: ot.Counter; + relayRequestsEnqueuedInBufferDBCounter: ot.Counter; relayRequestsBatchedCounter: ot.Counter; batchesCreatedCounter: ot.Counter; batchLatencyHistogram: ot.Histogram; batchSizeHistogram: ot.Histogram; } +export interface BatcherOpts { + pollIntervalSeconds?: number; + mediumBatchLatencySeconds?: number; + slowBatchLatencySeconds?: number; + mediumBatchSize?: number; + slowBatchSize?: number; +} + export class BundlerBatcher { redis: IORedis; statusDB: StatusDB; - batcherDB: BatcherDB; + fastBuffer: BufferDB; + mediumBuffer: BufferDB; + slowBuffer: BufferDB; outboundQueue: Queue; logger: Logger; metrics: BundlerBatcherMetrics; - readonly MAX_BATCH_LATENCY_SECS: number = 60; - readonly BATCH_SIZE: number = 8; - - constructor( - redis: IORedis, - logger: Logger, - maxLatencySeconds?: number, - batchSize?: number - ) { - if (batchSize) { - this.BATCH_SIZE = batchSize; + stopped = false; + + readonly pollIntervalSeconds: number = 30 * 60; // default 30 minutes + readonly mediumBatchLatencySeconds: number = 3 * 60 * 60; // default 3 hours + readonly slowBatchLatencySeconds: number = 6 * 60 * 60; // default 6 hours + + readonly mediumBatchSize: number = 3; // default 3 existing ops, next op will be 4th + readonly slowBatchSize: number = 7; // default 7 existing ops, next op will be 8th + + constructor(redis: IORedis, logger: Logger, opts?: BatcherOpts) { + if (opts?.pollIntervalSeconds) { + this.pollIntervalSeconds = opts.pollIntervalSeconds; + } + if (opts?.mediumBatchLatencySeconds) { + this.mediumBatchLatencySeconds = opts.mediumBatchLatencySeconds; + } + if (opts?.slowBatchLatencySeconds) { + this.slowBatchLatencySeconds = opts.slowBatchLatencySeconds; } - if (maxLatencySeconds) { - this.MAX_BATCH_LATENCY_SECS = maxLatencySeconds; + if (opts?.mediumBatchSize) { + this.mediumBatchSize = opts.mediumBatchSize; + } + if (opts?.slowBatchSize) { + this.slowBatchSize = opts.slowBatchSize; } this.redis = redis; this.logger = logger; this.statusDB = new StatusDB(redis); - this.batcherDB = new BatcherDB(redis); + this.fastBuffer = new BufferDB("FAST", redis); + this.mediumBuffer = new BufferDB("MEDIUM", redis); + this.slowBuffer = new BufferDB("SLOW", redis); this.outboundQueue = new Queue(OPERATION_BATCH_QUEUE, { connection: redis, }); @@ -79,7 +99,7 @@ export class BundlerBatcher { ); this.metrics = { - relayRequestsEnqueuedInBatcherDBCounter: createCounter( + relayRequestsEnqueuedInBufferDBCounter: createCounter( "relay_requests_enqueued_in_batcher_db.counter", "Number of relay requests enqueued in batcher DB" ), @@ -102,143 +122,145 @@ export class BundlerBatcher { }; } - start(): ActorHandle { - const batcher = this.startBatcher(); - const queuer = this.startQueuer(); - return actorChain(batcher, queuer); + async tryCreateBatch(): Promise { + const batch = []; + const [fastBatch, mediumBatch, slowBatch] = [ + await this.fastBuffer.getBatch(), + await this.mediumBuffer.getBatch(), + await this.slowBuffer.getBatch(), + ]; + const [fastSize, mediumSize, slowSize] = [ + fastBatch?.length ?? 0, + mediumBatch?.length ?? 0, + slowBatch?.length ?? 0, + ]; + + const currentTime = unixTimestampSeconds(); + const [_fastTimestamp, mediumTimestamp, slowTimestamp] = [ + (await this.fastBuffer.windowStartTime()) ?? currentTime, + (await this.mediumBuffer.windowStartTime()) ?? currentTime, + (await this.slowBuffer.windowStartTime()) ?? currentTime, + ]; + + const bufferUpdateTransactions: RedisTransaction[] = []; + + if ( + slowBatch && + (fastSize + mediumSize + slowSize >= this.slowBatchSize || + currentTime - slowTimestamp >= this.slowBatchLatencySeconds) + ) { + this.logger.info("creating slow batch", { + slowBatch, + fastSize, + mediumSize, + slowSize, + timeDiff: currentTime - slowTimestamp, + }); + batch.push(...slowBatch); + bufferUpdateTransactions.push( + this.slowBuffer.getPopTransaction(slowSize), + this.slowBuffer.getClearWindowStartTimeTransaction() + ); + } + + if ( + mediumBatch && + (batch.length + mediumSize + fastSize >= this.mediumBatchSize || + currentTime - mediumTimestamp >= this.mediumBatchLatencySeconds) + ) { + this.logger.info("creating medium batch", { + mediumBatch, + fastSize, + mediumSize, + slowSize, + timeDiff: currentTime - mediumTimestamp, + }); + batch.push(...mediumBatch); + bufferUpdateTransactions.push( + this.mediumBuffer.getPopTransaction(mediumSize), + this.mediumBuffer.getClearWindowStartTimeTransaction() + ); + } + + if (fastBatch) { + this.logger.info("creating fast batch", { + fastBatch, + fastSize, + mediumSize, + slowSize, + }); + batch.push(...fastBatch); + bufferUpdateTransactions.push( + this.fastBuffer.getPopTransaction(fastSize), + this.fastBuffer.getClearWindowStartTimeTransaction() + ); + } + + // add batch to outbound queue if non empty + if (batch.length > 0) { + this.logger.info("adding batch to outbound queue", { batch }); + const operationBatchJson = JSON.stringify(batch); + const operationBatchData: OperationBatchJobData = { + operationBatchJson, + }; + await this.outboundQueue.add(OPERATION_BATCH_JOB_TAG, operationBatchData); + + // TODO: if crash happens between queue.add and status setting, state will be out of sync + // create set status redis txs + const setJobStatusTransactions = batch.map((op) => { + const jobId = OperationTrait.computeDigest(op).toString(); + return this.statusDB.getSetJobStatusTransaction( + jobId, + OperationStatus.IN_BATCH + ); + }); + + // execute set status + clear buffer txs + this.logger.debug("clearing buffers and setting job statuses"); + const allTransactions = setJobStatusTransactions.concat( + bufferUpdateTransactions + ); + await this.redis.multi(allTransactions).exec((maybeErr) => { + if (maybeErr) { + const msg = `failed to set operation job and/or remove batch from DB: ${maybeErr}`; + this.logger.error(msg); + throw Error(msg); + } + }); + + this.metrics.relayRequestsBatchedCounter.add(batch.length); + this.metrics.batchesCreatedCounter.add(1); + this.metrics.batchSizeHistogram.record(batch.length); + } } - startBatcher(): ActorHandle { + start(): ActorHandle { const logger = this.logger.child({ function: "batcher" }); - logger.info("starting batcher..."); + let timeoutId: NodeJS.Timeout; - let stopped = false; const promise = new Promise((resolve) => { - let counterSeconds = 0; const poll = async () => { - const batch = await this.batcherDB.getBatch(this.BATCH_SIZE); - if (batch) { - if ( - (batch && batch.length >= this.BATCH_SIZE) || - (counterSeconds >= this.MAX_BATCH_LATENCY_SECS && batch.length > 0) - ) { - const operationBatchJson = JSON.stringify(batch); - const operationBatchData: OperationBatchJobData = { - operationBatchJson, - }; - - // TODO: race condition where crash occurs between queue.add and - // batcherDB.pop - await this.outboundQueue.add( - OPERATION_BATCH_JOB_TAG, - operationBatchData - ); - - const popTransaction = this.batcherDB.getPopTransaction( - batch.length - ); - const setJobStatusTransactions = batch.map((op) => { - const jobId = OperationTrait.computeDigest(op).toString(); - return this.statusDB.getSetJobStatusTransaction( - jobId, - OperationStatus.IN_BATCH - ); - }); - const allTransactions = setJobStatusTransactions.concat([ - popTransaction, - ]); - - logger.info(`Creating batch. batch size: ${batch.length}`); - await this.redis.multi(allTransactions).exec((maybeErr) => { - if (maybeErr) { - const msg = `failed to set operation job and/or remove batch from DB: ${maybeErr}`; - logger.error(msg); - throw Error(msg); - } - }); - - // Update metrics - this.metrics.relayRequestsBatchedCounter.add(batch.length); - this.metrics.batchesCreatedCounter.add(1); - this.metrics.batchLatencyHistogram.record(counterSeconds); - this.metrics.batchSizeHistogram.record(batch.length); - - counterSeconds = 0; - } - - counterSeconds += 1; - } + this.logger.info("polling..."); - if (stopped) { - logger.info("stopping..."); + if (this.stopped) { + this.logger.info("batcher stopping..."); + clearTimeout(timeoutId); resolve(); - } else { - setTimeout(poll, 1000); + return; } - }; - - void poll(); - }); - return { - promise, - teardown: async () => { - stopped = true; - await promise; - logger.info("teardown complete"); - }, - }; - } + await this.tryCreateBatch(); - startQueuer(): ActorHandle { - const logger = this.logger.child({ function: "queuer" }); - logger.info("starting queuer..."); - const queuer = new Worker( - SUBMITTABLE_OPERATION_QUEUE, - async (job: Job) => { - const operation = JSON.parse( - job.data.operationJson - ) as SubmittableOperationWithNetworkInfo; - - const batcherAddTransaction = - this.batcherDB.getAddTransaction(operation); - const setJobStatusTransaction = - this.statusDB.getSetJobStatusTransaction( - job.id!, - OperationStatus.PRE_BATCH - ); - const allTransactions = [batcherAddTransaction].concat([ - setJobStatusTransaction, - ]); - - logger.info(`Adding operation to batcher DB. op digest: ${job.id}`); - await this.redis.multi(allTransactions).exec((maybeErr) => { - if (maybeErr) { - const msg = `failed to execute batcher add and set job status transaction: ${maybeErr}`; - logger.error(msg); - throw new Error(msg); - } - }); - - this.metrics.relayRequestsEnqueuedInBatcherDBCounter.add(1); - }, - { - connection: this.redis, - autorun: true, - } - ); + timeoutId = setTimeout(poll, this.pollIntervalSeconds * 1000); + }; - const promise = new Promise((resolve) => { - queuer.on("closed", () => { - logger.info("stopping..."); - resolve(); - }); + void poll(); }); return { promise, teardown: async () => { - await queuer.close(); + this.stopped = true; await promise; logger.info("teardown complete"); }, diff --git a/actors/bundler/src/cli/commands/run/batcher.ts b/actors/bundler/src/cli/commands/run/batcher.ts deleted file mode 100644 index 0be85f6ec..000000000 --- a/actors/bundler/src/cli/commands/run/batcher.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Command } from "commander"; -import { BundlerBatcher } from "../../../batcher"; -import { makeLogger } from "@nocturne-xyz/offchain-utils"; -import { getRedis } from "./utils"; -import { extractConfigName } from "@nocturne-xyz/config"; - -const runBatcher = new Command("batcher") - .summary("run bundler batcher") - .description("must supply .env file with REDIS_URL and REDIS_PASSWORD.") - .requiredOption( - "--config-name-or-path ", - "config name or path to Nocturne contract JSON config file" - ) - .option("--batch-size ", "batch size") - .option( - "--max-latency ", - "max latency bundler will wait until creating a bundle in seconds" - ) - .option( - "--log-dir ", - "directory to write logs to. if not given, logs will only be emitted to stdout." - ) - .option("--log-level ", "min log importance to log to stdout") - .action(async (options) => { - const { configNameOrPath, maxLatency, batchSize, logDir, logLevel } = - options; - - // TODO: consolidate batcher and submitter into one component (batcher doesn't need config name) - const configName = extractConfigName(configNameOrPath); - const logger = makeLogger( - configName, - "bundler", - "batcher", - logLevel, - logDir - ); - const batcher = new BundlerBatcher( - getRedis(), - logger, - maxLatency, - batchSize - ); - - const { promise } = batcher.start(); - await promise; - }); - -export default runBatcher; diff --git a/actors/bundler/src/cli/commands/run/index.ts b/actors/bundler/src/cli/commands/run/index.ts index 57697b081..aef612694 100644 --- a/actors/bundler/src/cli/commands/run/index.ts +++ b/actors/bundler/src/cli/commands/run/index.ts @@ -1,13 +1,11 @@ import { Command } from "commander"; -import runBatcher from "./batcher"; import runServer from "./server"; -import runSubmitter from "./submitter"; +import runProcessor from "./processor"; const run = new Command("run").description( "run a bundler component (server, batcher, or submitter)" ); run.addCommand(runServer); -run.addCommand(runBatcher); -run.addCommand(runSubmitter); +run.addCommand(runProcessor); export default run; diff --git a/actors/bundler/src/cli/commands/run/processor.ts b/actors/bundler/src/cli/commands/run/processor.ts new file mode 100644 index 000000000..e49d1b010 --- /dev/null +++ b/actors/bundler/src/cli/commands/run/processor.ts @@ -0,0 +1,99 @@ +import { Command } from "commander"; +import { BundlerBatcher } from "../../../batcher"; +import { + getEthersProviderFromEnv, + getTxSubmitterFromEnv, + makeLogger, +} from "@nocturne-xyz/offchain-utils"; +import { getRedis } from "./utils"; +import { extractConfigName, loadNocturneConfig } from "@nocturne-xyz/config"; +import { BundlerSubmitter } from "../../../submitter"; + +const runProcessor = new Command("processor") + .summary("run bundler processor which batches and submits operations") + .description( + "must supply .env file with REDIS_URL,REDIS_PASSWORD, RPC_URL, and TX_SIGNER_KEY." + ) + .requiredOption( + "--config-name-or-path ", + "config name or path to Nocturne contract JSON config file" + ) + .option("--batch-poll-interval ", "batch poll interval in seconds") + .option("--medium-batch-size ", "batch size") + .option("--slow-batch-size ", "batch size") + .option( + "--medium-batch-latency ", + "max latency batcher will wait before force flushing medium ops into batch" + ) + .option( + "--slow-batch-latency ", + "max latency batcher will wait before force flushing slow ops into batch" + ) + .option( + "--finality-blocks ", + "number of confirmations to wait for before considering a submitted op finalized", + parseInt + ) + .option( + "--log-dir ", + "directory to write logs to. if not given, logs will only be emitted to stdout." + ) + .option("--log-level ", "min log importance to log to stdout") + .action(async (options) => { + const { + configNameOrPath, + batchPollInterval, + mediumBatchSize, + slowBatchSize, + mediumBatchLatency, + slowBatchLatency, + finalityBlocks, + logDir, + logLevel, + } = options; + + const config = loadNocturneConfig(configNameOrPath); + const configName = extractConfigName(configNameOrPath); + + const batcherLogger = makeLogger( + configName, + "bundler", + "batcher", + logLevel, + logDir + ); + const batcher = new BundlerBatcher(getRedis(), batcherLogger, { + pollIntervalSeconds: batchPollInterval, + mediumBatchSize, + slowBatchSize, + mediumBatchLatencySeconds: mediumBatchLatency, + slowBatchLatencySeconds: slowBatchLatency, + }); + + const provider = getEthersProviderFromEnv(); + const txSubmitter = getTxSubmitterFromEnv(); + + const submitterLogger = makeLogger( + configName, + "bundler", + "submitter", + logLevel, + logDir + ); + const submitter = new BundlerSubmitter( + config.tellerAddress, + config.handlerAddress, + provider, + txSubmitter, + getRedis(), + submitterLogger, + finalityBlocks ?? config.offchain.finalityBlocks + ); + + const batcherHandle = batcher.start(); + const submitterHandle = submitter.start(); + + await Promise.all([batcherHandle.promise, submitterHandle.promise]); + }); + +export default runProcessor; diff --git a/actors/bundler/src/cli/commands/run/submitter.ts b/actors/bundler/src/cli/commands/run/submitter.ts deleted file mode 100644 index 6ca202d0c..000000000 --- a/actors/bundler/src/cli/commands/run/submitter.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { Command } from "commander"; -import { BundlerSubmitter } from "../../../submitter"; -import { - makeLogger, - getEthersProviderFromEnv, - getTxSubmitterFromEnv, -} from "@nocturne-xyz/offchain-utils"; -import { getRedis } from "./utils"; -import { extractConfigName, loadNocturneConfig } from "@nocturne-xyz/config"; - -const runSubmitter = new Command("submitter") - .summary("run bundler submitter") - .description( - "must supply .env file with REDIS_URL,REDIS_PASSWORD, RPC_URL, and TX_SIGNER_KEY. must also supply configPathOrName as an option." - ) - .requiredOption( - "--config-name-or-path ", - "config name or path to Nocturne contract JSON config file" - ) - .option( - "--log-dir ", - "directory to write logs to. if not given, logs will only be emitted to stdout." - ) - .option("--log-level ", "min log importance to log to stdout.") - .option( - "--finality-blocks ", - "number of confirmations to wait for before considering a submitted op finalized", - parseInt - ) - .action(async (options) => { - const { configNameOrPath, logDir, logLevel, finalityBlocks } = options; - const config = loadNocturneConfig(configNameOrPath); - - const provider = getEthersProviderFromEnv(); - const txSubmitter = getTxSubmitterFromEnv(); - - const configName = extractConfigName(configNameOrPath); - const logger = makeLogger( - configName, - "bundler", - "submitter", - logLevel, - logDir - ); - const submitter = new BundlerSubmitter( - config.tellerAddress, - config.handlerAddress, - provider, - txSubmitter, - getRedis(), - logger, - finalityBlocks ?? config.offchain.finalityBlocks - ); - - const { promise } = submitter.start(); - await promise; - }); - -export default runSubmitter; diff --git a/actors/bundler/src/db/batcherdb.ts b/actors/bundler/src/db/batcherdb.ts deleted file mode 100644 index 05231b7ce..000000000 --- a/actors/bundler/src/db/batcherdb.ts +++ /dev/null @@ -1,49 +0,0 @@ -import IORedis from "ioredis"; -import * as JSON from "bigint-json-serialization"; -import { RedisTransaction } from "."; - -const BATCH_DB_NAME = "BATCH_DB"; - -export class BatcherDB { - redis: IORedis; - - constructor(redis: IORedis) { - this.redis = redis; - } - - async add(elem: T): Promise { - await this.redis.rpush(BATCH_DB_NAME, JSON.stringify(elem)); - return true; - } - - async getBatch(count: number, exact = false): Promise { - const stringifiedItems = await this.redis.lrange(BATCH_DB_NAME, 0, count); - - if (stringifiedItems.length == 0) { - return undefined; - } - if (exact && stringifiedItems.length != count) { - return undefined; - } - - return stringifiedItems.map(JSON.parse) as T[]; - } - - async pop(count: number): Promise { - const stringifiedItems = await this.redis.lpop(BATCH_DB_NAME, count); - - if (!stringifiedItems) { - return undefined; - } - - return stringifiedItems.map(JSON.parse) as T[]; - } - - getAddTransaction(elem: T): RedisTransaction { - return ["rpush", BATCH_DB_NAME, JSON.stringify(elem)]; - } - - getPopTransaction(count: number): RedisTransaction { - return ["lpop", BATCH_DB_NAME, count.toString()]; - } -} diff --git a/actors/bundler/src/db/bufferdb.ts b/actors/bundler/src/db/bufferdb.ts new file mode 100644 index 000000000..573328fef --- /dev/null +++ b/actors/bundler/src/db/bufferdb.ts @@ -0,0 +1,90 @@ +import IORedis from "ioredis"; +import * as JSON from "bigint-json-serialization"; +import { RedisTransaction } from "."; +import { unixTimestampSeconds } from "../utils"; + +export type BufferSpeed = "FAST" | "MEDIUM" | "SLOW"; +const WINDOW_START_KEY = "_WINDOW_START"; + +export class BufferDB { + prefix: BufferSpeed; + redis: IORedis; + + constructor(prefix: BufferSpeed, redis: IORedis) { + this.prefix = prefix; + this.redis = redis; + } + + async add(elem: T): Promise { + await this.redis.rpush(this.prefix, JSON.stringify(elem)); + + // Set window start if not set + const windowStartTime = await this.windowStartTime(); + if (!windowStartTime) { + await this.setWindowStartTime(unixTimestampSeconds()); + } + + return true; + } + + async size(): Promise { + return this.redis.llen(this.prefix); + } + + private windowStartTimePrefix(): string { + return this.prefix + WINDOW_START_KEY; + } + + async setWindowStartTime(windowStartTime: number): Promise { + await this.redis.set( + this.windowStartTimePrefix(), + windowStartTime.toString() + ); + } + + async windowStartTime(): Promise { + const windowStartTime = await this.redis.get(this.windowStartTimePrefix()); + if (!windowStartTime) { + return undefined; + } + return Number(windowStartTime); + } + + async getBatch(count?: number, exact = false): Promise { + if (!count) { + count = await this.size(); + } + const stringifiedItems = await this.redis.lrange(this.prefix, 0, count); + + if (stringifiedItems.length == 0) { + return undefined; + } + if (exact && stringifiedItems.length != count) { + return undefined; + } + + return stringifiedItems.map(JSON.parse) as T[]; + } + + async pop(count: number): Promise { + const stringifiedItems = await this.redis.lpop(this.prefix, count); + + if (!stringifiedItems) { + return undefined; + } + + return stringifiedItems.map(JSON.parse) as T[]; + } + + getAddTransaction(elem: T): RedisTransaction { + return ["rpush", this.prefix, JSON.stringify(elem)]; + } + + getPopTransaction(count: number): RedisTransaction { + return ["lpop", this.prefix, count.toString()]; + } + + getClearWindowStartTimeTransaction(): RedisTransaction { + return ["set", this.windowStartTimePrefix(), ""]; + } +} diff --git a/actors/bundler/src/db/index.ts b/actors/bundler/src/db/index.ts index 59db3309f..b703d459a 100644 --- a/actors/bundler/src/db/index.ts +++ b/actors/bundler/src/db/index.ts @@ -1,4 +1,4 @@ -export * from "./batcherdb"; +export * from "./bufferdb"; export * from "./statusdb"; export * from "./nullifierdb"; diff --git a/actors/bundler/src/opValidation.ts b/actors/bundler/src/opValidation.ts index 4018b132b..ec7e82fed 100644 --- a/actors/bundler/src/opValidation.ts +++ b/actors/bundler/src/opValidation.ts @@ -99,7 +99,7 @@ export async function checkNotEnoughGasError( logger.debug( "checking that operation's gas price >= current chain's gas price" ); - const gasPrice = ((await provider.getGasPrice()).toBigInt() * 5n) / 10n; // -50% buffer + const gasPrice = ((await provider.getGasPrice()).toBigInt() * 1n) / 10n; // 10%, basically check for nonzero if (operation.gasPrice < gasPrice) { const id = OperationTrait.computeDigest(operation).toString(); return `operation ${id} gas price too low: ${operation.gasPrice} < current chain's gas price ${gasPrice}`; diff --git a/actors/bundler/src/routes.ts b/actors/bundler/src/routes.ts index eb54df225..467d340a9 100644 --- a/actors/bundler/src/routes.ts +++ b/actors/bundler/src/routes.ts @@ -1,10 +1,5 @@ -import { Queue } from "bullmq"; import IORedis from "ioredis"; -import { - OperationJobData, - PROVEN_OPERATION_JOB_TAG, - OpValidationFailure, -} from "./types"; +import { OpValidationFailure } from "./types"; import { Request, RequestHandler, Response } from "express"; import { OperationStatus, @@ -21,15 +16,18 @@ import { checkNullifierConflictError, checkRevertError, } from "./opValidation"; -import * as JSON from "bigint-json-serialization"; -import { NullifierDB, StatusDB } from "./db"; +import { BufferDB, NullifierDB, StatusDB } from "./db"; import { ethers } from "ethers"; import { tryParseRelayRequest } from "./request"; import { Logger } from "winston"; import { BundlerServerMetrics } from "./server"; export interface HandleRelayDeps { - queue: Queue; + buffers: { + fastBuffer: BufferDB; + mediumBuffer: BufferDB; + slowBuffer: BufferDB; + }; statusDB: StatusDB; nullifierDB: NullifierDB; redis: IORedis; @@ -43,7 +41,7 @@ export interface HandleRelayDeps { } export function makeRelayHandler({ - queue, + buffers, statusDB, nullifierDB, redis, @@ -141,10 +139,11 @@ export function makeRelayHandler({ let jobId; try { jobId = await postJob( - queue, + buffers, statusDB, nullifierDB, redis, + provider, childLogger, operation ); @@ -224,32 +223,40 @@ export function makeCheckNFHandler({ } async function postJob( - queue: Queue, + buffers: { + fastBuffer: BufferDB; + mediumBuffer: BufferDB; + slowBuffer: BufferDB; + }, statusDB: StatusDB, nullifierDB: NullifierDB, redis: IORedis, + provider: ethers.providers.Provider, logger: Logger, - operation: SubmittableOperationWithNetworkInfo + op: SubmittableOperationWithNetworkInfo ): Promise { - const jobId = OperationTrait.computeDigest(operation).toString(); - const operationJson = JSON.stringify(operation); - const jobData: OperationJobData = { - operationJson, - }; + const jobId = OperationTrait.computeDigest(op).toString(); - logger.info("posting op to queue"); - logger.debug("posting op to queue:", { jobData }); + const gasPrice = (await provider.getGasPrice()).toBigInt(); - // TODO: race condition between queue.add and redis transaction - await queue.add(PROVEN_OPERATION_JOB_TAG, jobData, { - jobId, - }); + if (op.gasPrice >= (gasPrice * 85n) / 100n) { + // client will pick 100%, 15% buffer for fluctuations + logger.info("posting op to fast queue", { op }); + await buffers.fastBuffer.add(op); + } else if (op.gasPrice >= (gasPrice * 60n) / 100n) { + // client will pick 70%, 10% buffer for fluctuations + logger.info("posting op to medium queue", { op }); + await buffers.mediumBuffer.add(op); + } else { + logger.info("posting op to slow queue", { op }); + await buffers.slowBuffer.add(op); + } const setJobStatusTransaction = statusDB.getSetJobStatusTransaction( jobId, OperationStatus.QUEUED ); - const addNfsTransaction = nullifierDB.getAddNullifierTransactions(operation); + const addNfsTransaction = nullifierDB.getAddNullifierTransactions(op); const allTransactions = addNfsTransaction.concat([setJobStatusTransaction]); await redis.multi(allTransactions).exec((maybeErr) => { if (maybeErr) { diff --git a/actors/bundler/src/server.ts b/actors/bundler/src/server.ts index 59c328ee5..9e467a22d 100644 --- a/actors/bundler/src/server.ts +++ b/actors/bundler/src/server.ts @@ -5,13 +5,8 @@ import { ethers } from "ethers"; import cors from "cors"; import { Logger } from "winston"; import morgan from "morgan"; -import { Queue } from "bullmq"; -import { - OperationJobData, - SUBMITTABLE_OPERATION_QUEUE, - ACTOR_NAME, -} from "./types"; -import { NullifierDB, StatusDB } from "./db"; +import { ACTOR_NAME } from "./types"; +import { BufferDB, NullifierDB, StatusDB } from "./db"; import { Handler, Handler__factory, @@ -31,7 +26,10 @@ import { makeCreateHistogramFn, } from "@nocturne-xyz/offchain-utils"; import * as ot from "@opentelemetry/api"; -import { Address } from "@nocturne-xyz/core"; +import { + Address, + SubmittableOperationWithNetworkInfo, +} from "@nocturne-xyz/core"; import { Knex } from "knex"; const COMPONENT_NAME = "server"; @@ -45,7 +43,9 @@ export interface BundlerServerMetrics { export class BundlerServer { redis: IORedis; pool: Knex; - queue: Queue; + fastBuffer: BufferDB; + mediumBuffer: BufferDB; + slowBuffer: BufferDB; statusDB: StatusDB; nullifierDB: NullifierDB; logger: Logger; @@ -68,7 +68,9 @@ export class BundlerServer { ) { this.redis = redis; this.pool = pool; - this.queue = new Queue(SUBMITTABLE_OPERATION_QUEUE, { connection: redis }); + this.fastBuffer = new BufferDB("FAST", redis); + this.mediumBuffer = new BufferDB("MEDIUM", redis); + this.slowBuffer = new BufferDB("SLOW", redis); this.statusDB = new StatusDB(redis); this.nullifierDB = new NullifierDB(redis); this.logger = logger; @@ -112,7 +114,11 @@ export class BundlerServer { router.post( "/relay", makeRelayHandler({ - queue: this.queue, + buffers: { + fastBuffer: this.fastBuffer, + mediumBuffer: this.mediumBuffer, + slowBuffer: this.slowBuffer, + }, statusDB: this.statusDB, nullifierDB: this.nullifierDB, redis: this.redis, diff --git a/actors/bundler/src/types.ts b/actors/bundler/src/types.ts index 40edfc6aa..4cc6437e6 100644 --- a/actors/bundler/src/types.ts +++ b/actors/bundler/src/types.ts @@ -1,6 +1,5 @@ export const ACTOR_NAME = "bundler"; -export const SUBMITTABLE_OPERATION_QUEUE = "SubmittableOperationQueue"; export const OPERATION_BATCH_QUEUE = "OperationBatchQueue"; export const PROVEN_OPERATION_JOB_TAG = "PROVEN_OPERATION"; export const OPERATION_BATCH_JOB_TAG = "OPERATION_BATCH"; diff --git a/actors/bundler/src/utils.ts b/actors/bundler/src/utils.ts index c51ec0372..d20f2bff9 100644 --- a/actors/bundler/src/utils.ts +++ b/actors/bundler/src/utils.ts @@ -20,3 +20,7 @@ export function actorChain(...actors: ActorHandle[]): ActorHandle { export function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } + +export function unixTimestampSeconds(): number { + return Math.floor(Date.now() / 1000); +} diff --git a/actors/bundler/test/Batcher.test.ts b/actors/bundler/test/Batcher.test.ts index c8faf8851..f67fa124c 100644 --- a/actors/bundler/test/Batcher.test.ts +++ b/actors/bundler/test/Batcher.test.ts @@ -2,29 +2,30 @@ import "mocha"; import { expect } from "chai"; import IORedis from "ioredis"; import { RedisMemoryServer } from "redis-memory-server"; -import { Queue } from "bullmq"; import { BundlerBatcher } from "../src/batcher"; -import { - SUBMITTABLE_OPERATION_QUEUE, - OperationJobData, - PROVEN_OPERATION_JOB_TAG, -} from "../src/types"; import { VALID_RELAY_REQUEST } from "./utils"; import { makeTestLogger } from "@nocturne-xyz/offchain-utils"; import { sleep } from "../src/utils"; -import { BatcherDB, StatusDB } from "../src/db"; +import { BufferDB, BufferSpeed, StatusDB } from "../src/db"; import * as JSON from "bigint-json-serialization"; -import { OperationStatus, OperationTrait } from "@nocturne-xyz/core"; +import { + OperationStatus, + OperationTrait, + SubmittableOperationWithNetworkInfo, +} from "@nocturne-xyz/core"; const BATCH_SIZE = 8; -const MAX_BATCH_LATENCY_SECS = 5; describe("BundlerBatcher", async () => { let server: RedisMemoryServer; let redis: IORedis; let statusDB: StatusDB; - let batcherDB: BatcherDB; + let fastBuffer: BufferDB; + let mediumBuffer: BufferDB; + let slowBuffer: BufferDB; let batcher: BundlerBatcher; + let promise: Promise; + let teardown: () => Promise; const logger = makeTestLogger("bundler", "batcher"); before(async () => { @@ -35,102 +36,107 @@ describe("BundlerBatcher", async () => { redis = new IORedis(port, host); statusDB = new StatusDB(redis); - batcherDB = new BatcherDB(redis); - batcher = new BundlerBatcher( - redis, - logger, - MAX_BATCH_LATENCY_SECS, - BATCH_SIZE - ); // 6 second wait time + fastBuffer = new BufferDB("FAST", redis); + mediumBuffer = new BufferDB("MEDIUM", redis); + slowBuffer = new BufferDB("SLOW", redis); }); beforeEach(async () => { + batcher = new BundlerBatcher(redis, logger, { + pollIntervalSeconds: 3, + mediumBatchLatencySeconds: 5, + slowBatchLatencySeconds: 10, + }); + ({ promise, teardown } = batcher.start()); + }); + + afterEach(async () => { + await teardown(); await redis.flushall(); }); - async function enqueueOperation( - queue: Queue - ): Promise { + async function enqueueOperation(speed: BufferSpeed): Promise { let operationObj = VALID_RELAY_REQUEST.operation; operationObj.executionGasLimit = Math.floor(Math.random() * 100000).toString() + "n"; - const operationJson = JSON.stringify(operationObj); - const operation = JSON.parse(operationJson); - - const jobData: OperationJobData = { - operationJson, - }; - const jobId = OperationTrait.computeDigest(operation).toString(); - await queue.add(PROVEN_OPERATION_JOB_TAG, jobData, { - jobId, - }); + const operation = JSON.parse(JSON.stringify(operationObj)); + if (speed === "FAST") { + await fastBuffer.add(operation); + } else if (speed === "MEDIUM") { + await mediumBuffer.add(operation); + } else { + await slowBuffer.add(operation); + } - return jobId; + const digest = OperationTrait.computeDigest(operation).toString(); + await statusDB.setJobStatus(digest, OperationStatus.PRE_BATCH); + return digest; } - it("batches 8 inbound jobs as full batch", async () => { - const inboundQueue = new Queue( - SUBMITTABLE_OPERATION_QUEUE, - { - connection: redis, - } - ); - + it("batches 8 ops as full batch", async () => { expect(await batcher.outboundQueue.count()).to.equal(0); - const { promise } = batcher.start(); + expect(await slowBuffer.windowStartTime()).to.be.undefined; let jobIds: string[] = []; for (let i = 0; i < 6; i++) { - const jobId = await enqueueOperation(inboundQueue); + const jobId = await enqueueOperation("SLOW"); jobIds.push(jobId); } + // start window should be set now that elements have been added + expect(await slowBuffer.windowStartTime()).to.not.be.undefined; await Promise.race([sleep(1500), promise]); for (const id of jobIds) { const status = await statusDB.getJobStatus(id); expect(status).to.equal(OperationStatus.PRE_BATCH); } - expect((await batcherDB.getBatch(BATCH_SIZE))!.length).to.equal(6); + expect((await slowBuffer.getBatch(BATCH_SIZE))!.length).to.equal(6); for (let i = 6; i < 8; i++) { - const jobId = await enqueueOperation(inboundQueue); + const jobId = await enqueueOperation("MEDIUM"); jobIds.push(jobId); } await Promise.race([sleep(1500), promise]); expect(await batcher.outboundQueue.count()).to.equal(1); - expect(await batcherDB.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await slowBuffer.windowStartTime()).to.be.undefined; + expect(await slowBuffer.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await mediumBuffer.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await slowBuffer.size()).to.equal(0); + expect(await mediumBuffer.size()).to.equal(0); for (const id of jobIds) { const status = await statusDB.getJobStatus(id); expect(status).to.equal(OperationStatus.IN_BATCH); } }); - it("batches 6 inbound jobs after passing wait time", async () => { - const inboundQueue = new Queue( - SUBMITTABLE_OPERATION_QUEUE, - { - connection: redis, - } - ); - + it("batches 6 slow ops after passing wait time", async () => { expect(await batcher.outboundQueue.count()).to.equal(0); - const { promise } = batcher.start(); + expect(await slowBuffer.windowStartTime()).to.be.undefined; let jobIds: string[] = []; for (let i = 0; i < 6; i++) { - const jobId = await enqueueOperation(inboundQueue); + const jobId = await enqueueOperation("SLOW"); jobIds.push(jobId); } - // Sleep 6 seconds, one more than wait time - await Promise.race([sleep(6000), promise]); + // Sleep 5 seconds, still more time to wait for slow + await Promise.race([sleep(5_000), promise]); + + expect(await slowBuffer.size()).to.equal(6); + expect(await slowBuffer.windowStartTime()).to.not.be.undefined; + expect(await batcher.outboundQueue.count()).to.equal(0); + + // Sleep 6 more seconds, now slow window should have passed + await Promise.race([sleep(8_000), promise]); expect(await batcher.outboundQueue.count()).to.equal(1); - expect(await batcherDB.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await slowBuffer.windowStartTime()).to.be.undefined; + expect(await slowBuffer.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await slowBuffer.size()).to.equal(0); for (const id of jobIds) { const status = await statusDB.getJobStatus(id); expect(status).to.equal(OperationStatus.IN_BATCH); diff --git a/actors/bundler/test/BatcherDB.test.ts b/actors/bundler/test/BatcherDB.test.ts deleted file mode 100644 index 2bb38ba03..000000000 --- a/actors/bundler/test/BatcherDB.test.ts +++ /dev/null @@ -1,72 +0,0 @@ -import "mocha"; -import { expect } from "chai"; -import IORedis from "ioredis"; -import { RedisMemoryServer } from "redis-memory-server"; -import { BatcherDB } from "../src/db"; - -const BATCH_SIZE = 8; - -describe("BatcherDB", async () => { - let server: RedisMemoryServer; - let redis: IORedis; - let batcherDB: BatcherDB; - - before(async () => { - server = await RedisMemoryServer.create(); - - const host = await server.getHost(); - const port = await server.getPort(); - redis = new IORedis(port, host); - - batcherDB = new BatcherDB(redis); - }); - - beforeEach(async () => { - await redis.flushall(); - }); - - async function fillBatch(): Promise { - for (let i = 0; i < BATCH_SIZE; i++) { - await batcherDB.add("ITEM_" + i.toString()); - } - } - - it("fills, detects, and pops batch", async () => { - expect(await batcherDB.getBatch(BATCH_SIZE)).to.be.undefined; - expect(await batcherDB.pop(BATCH_SIZE)).to.be.undefined; - - await fillBatch(); - expect((await batcherDB.getBatch(BATCH_SIZE))!.length).to.equal(BATCH_SIZE); - - const batch = await batcherDB.pop(BATCH_SIZE); - expect(batch!.length).to.equal(BATCH_SIZE); - expect(await batcherDB.getBatch(BATCH_SIZE)).to.be.undefined; - expect(await batcherDB.pop(BATCH_SIZE)).to.be.undefined; - }); - - it("responds to `exact` flag", async () => { - await fillBatch(); - - expect((await batcherDB.getBatch(BATCH_SIZE + 2))!.length).to.equal( - BATCH_SIZE - ); - expect(await batcherDB.getBatch(BATCH_SIZE + 2, true)).to.be.undefined; - }); - - it("produces add and pop transactions", async () => { - await fillBatch(); - expect((await batcherDB.getBatch(BATCH_SIZE))!.length).to.equal(BATCH_SIZE); - - const addTransaction = batcherDB.getAddTransaction( - `ITEM_${BATCH_SIZE + 1}` - ); - const popTransaction = batcherDB.getPopTransaction(BATCH_SIZE + 1); - - const res = await redis - .multi([addTransaction].concat([popTransaction])) - .exec(); - - expect((res![1][1] as Array).length).to.equal(9); - expect(await batcherDB.getBatch(BATCH_SIZE)).to.be.undefined; - }); -}); diff --git a/actors/bundler/test/BufferDB.test.ts b/actors/bundler/test/BufferDB.test.ts new file mode 100644 index 000000000..b4f8d1ccd --- /dev/null +++ b/actors/bundler/test/BufferDB.test.ts @@ -0,0 +1,75 @@ +import "mocha"; +import { expect } from "chai"; +import IORedis from "ioredis"; +import { RedisMemoryServer } from "redis-memory-server"; +import { BufferDB } from "../src/db"; +import { unixTimestampSeconds } from "../src/utils"; + +const BATCH_SIZE = 8; + +describe("BufferDB", async () => { + let server: RedisMemoryServer; + let redis: IORedis; + let bufferDB: BufferDB; + + before(async () => { + server = await RedisMemoryServer.create(); + + const host = await server.getHost(); + const port = await server.getPort(); + redis = new IORedis(port, host); + + bufferDB = new BufferDB("FAST", redis); + }); + + beforeEach(async () => { + await redis.flushall(); + }); + + async function fillBatch(): Promise { + for (let i = 0; i < BATCH_SIZE; i++) { + await bufferDB.add("ITEM_" + i.toString()); + } + } + + it("fills, detects, and pops batch", async () => { + expect(await bufferDB.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await bufferDB.pop(BATCH_SIZE)).to.be.undefined; + expect(await bufferDB.windowStartTime()).to.be.undefined; + + await fillBatch(); + expect((await bufferDB.getBatch(BATCH_SIZE))!.length).to.equal(BATCH_SIZE); + expect(await bufferDB.windowStartTime()).to.be.lessThanOrEqual( + unixTimestampSeconds() + ); + + const batch = await bufferDB.pop(BATCH_SIZE); + expect(batch!.length).to.equal(BATCH_SIZE); + expect(await bufferDB.getBatch(BATCH_SIZE)).to.be.undefined; + expect(await bufferDB.pop(BATCH_SIZE)).to.be.undefined; + }); + + it("responds to `exact` flag", async () => { + await fillBatch(); + + expect((await bufferDB.getBatch(BATCH_SIZE + 2))!.length).to.equal( + BATCH_SIZE + ); + expect(await bufferDB.getBatch(BATCH_SIZE + 2, true)).to.be.undefined; + }); + + it("produces add and pop transactions", async () => { + await fillBatch(); + expect((await bufferDB.getBatch(BATCH_SIZE))!.length).to.equal(BATCH_SIZE); + + const addTransaction = bufferDB.getAddTransaction(`ITEM_${BATCH_SIZE + 1}`); + const popTransaction = bufferDB.getPopTransaction(BATCH_SIZE + 1); + + const res = await redis + .multi([addTransaction].concat([popTransaction])) + .exec(); + + expect((res![1][1] as Array).length).to.equal(9); + expect(await bufferDB.getBatch(BATCH_SIZE)).to.be.undefined; + }); +}); diff --git a/actors/test-actor/src/actor.ts b/actors/test-actor/src/actor.ts index 9711b21b2..e42ac6235 100644 --- a/actors/test-actor/src/actor.ts +++ b/actors/test-actor/src/actor.ts @@ -47,6 +47,7 @@ export class TestActorOpts { depositIntervalSeconds?: number; opIntervalSeconds?: number; syncIntervalSeconds?: number; + opGasPriceMultiplier?: number; fullBundleEvery?: number; onlyDeposits?: boolean; onlyOperations?: boolean; @@ -149,28 +150,26 @@ export class TestActor { } } - async runOps( - interval: number, - batchEvery?: number, - finalityBlocks?: number - ): Promise { + async runOps(interval: number, opts?: TestActorOpts): Promise { + const { fullBundleEvery, finalityBlocks } = opts ?? {}; + let i = 0; while (true) { await this.client.sync({ finalityBlocks }); const balances = await this.client.getAllAssetBalances(); this.logger.info("balances: ", balances); - if (batchEvery && i !== 0 && i % batchEvery === 0) { + if (fullBundleEvery && i !== 0 && i % fullBundleEvery === 0) { this.logger.info("performing 8 operations to fill a bundle"); for (let j = 0; j < 8; j++) { - await this.randomOperation(); + await this.randomOperation(opts); } } else { this.logger.info("performing operation"); - await this.randomOperation(); + await this.randomOperation(opts); } - this.logger.info(`sleeping for ${interval} seconds`); + this.logger.info(`sleeping for ${interval} ms`); await sleep(interval); i++; } @@ -189,17 +188,13 @@ export class TestActor { if (opts?.onlyDeposits) { await this.runDeposits(depositIntervalSeconds * 1000); } else if (opts?.onlyOperations) { - await this.runOps(opIntervalSeconds * 1000, 1); + await this.runOps(opIntervalSeconds * 1000, opts); } else if (opts?.onlySync) { await this.runSyncOnly(syncIntervalSeconds * 1000); } else { await Promise.all([ this.runDeposits(depositIntervalSeconds * 1000), - this.runOps( - opIntervalSeconds * 1000, - opts?.fullBundleEvery, - opts?.finalityBlocks - ), + this.runOps(opIntervalSeconds, opts), ]); } } @@ -392,7 +387,7 @@ export class TestActor { } } - private async randomOperation(): Promise { + private async randomOperation(opts?: TestActorOpts): Promise { // choose a random joinsplit asset for oprequest const maybeErc20AndValue = await this.getRandomErc20AndValue(); if (!maybeErc20AndValue) { @@ -417,7 +412,10 @@ export class TestActor { // prepare, sign, and prove try { - const preSign = await this.client.prepareOperation(opRequest, 1); + const preSign = await this.client.prepareOperation( + opRequest, + opts?.opGasPriceMultiplier ?? 1 + ); const signed = signOperation(this.nocturneSigner, preSign); await this.client.addOpToHistory(signed, { items: [] }); @@ -495,7 +493,6 @@ export class TestActor { BigInt((await this.provider.getBlock("latest")).timestamp) + ONE_DAY_SECONDS ) - .gasPrice(((await this.provider.getGasPrice()).toBigInt() * 14n) / 10n) .build(); } } diff --git a/actors/test-actor/src/cli/run.ts b/actors/test-actor/src/cli/run.ts index 7529f62c9..6ed19457e 100644 --- a/actors/test-actor/src/cli/run.ts +++ b/actors/test-actor/src/cli/run.ts @@ -53,6 +53,11 @@ export const run = new Command("run") "interval in seconds between ops in seconds. defaults to 60 (1 minute)", "60" ) + .option( + "--op-gas-price-multiplier ", + "multiplier for op gas price", + "1" + ) .option( "--full-bundle-every ", "perform 8 ops in rapid succession to fill a bundle every N iterations of the op loop" @@ -82,6 +87,7 @@ export const run = new Command("run") dbPath, depositInterval, opInterval, + opGasPriceMultiplier, fullBundleEvery, onlyDeposits, onlyOperations, @@ -195,6 +201,7 @@ export const run = new Command("run") syncIntervalSeconds: onlySyncInterval ? parseInt(onlySyncInterval) : undefined, + opGasPriceMultiplier, fullBundleEvery: fullBundleEvery ? parseInt(fullBundleEvery) : undefined, onlyDeposits, onlyOperations, diff --git a/packages/e2e-tests/src/bundler.ts b/packages/e2e-tests/src/bundler.ts index 04b937dbb..4e0cc4379 100644 --- a/packages/e2e-tests/src/bundler.ts +++ b/packages/e2e-tests/src/bundler.ts @@ -70,7 +70,13 @@ function startBundlerBatcher( redis: IORedis ): TeardownFn { const logger = makeTestLogger("bundler", "batcher"); - const batcher = new BundlerBatcher(redis, logger, config.maxLatency); + const batcher = new BundlerBatcher(redis, logger, { + pollIntervalSeconds: 1, + mediumBatchSize: 1, + slowBatchSize: 1, + mediumBatchLatencySeconds: config.maxLatency, + slowBatchLatencySeconds: config.maxLatency, + }); const { promise, teardown } = batcher.start(); promise.catch((err) => { console.error("bundler batcher error", err);