From 356157cda0ab85a665c1238fa8351bf7d83b33be Mon Sep 17 00:00:00 2001 From: Berzan Date: Mon, 29 Apr 2024 23:42:43 +0000 Subject: [PATCH] impl: fetcher process --- event-fetcher-process/src/fetch.ts | 24 ++-- event-fetcher-process/src/index.ts | 8 +- event-fetcher-process/src/output.ts | 39 ++++--- event-fetcher-process/src/parse.ts | 6 +- processes/src/fetcher/mod.rs | 5 + processes/src/fetcher/process.rs | 164 ++++++++++++++++++++++++++++ processes/src/fetcher/processor.rs | 2 + 7 files changed, 218 insertions(+), 30 deletions(-) create mode 100644 processes/src/fetcher/mod.rs create mode 100644 processes/src/fetcher/process.rs create mode 100644 processes/src/fetcher/processor.rs diff --git a/event-fetcher-process/src/fetch.ts b/event-fetcher-process/src/fetch.ts index c78ad70..5461c07 100644 --- a/event-fetcher-process/src/fetch.ts +++ b/event-fetcher-process/src/fetch.ts @@ -5,19 +5,29 @@ import { UInt32 } from "o1js" export const fetchDepositedEvents = async (fromBlock: number, bridgeContract: BridgeContract) => { const allEvents = await bridgeContract.fetchEvents(UInt32.from(fromBlock)) - const depositedEvents = allEvents - .filter(({ event, type }) => type === "deposited") + const last_fetched_block = allEvents + .filter(({ type }) => type === "deposited") + .map((e) => Number(e.blockHeight.toBigint())) + .reduce((a, b) => (a > b ? a : b), 0) + + const events = allEvents + .filter(({ type }) => type === "deposited") .map(({ event }) => event.data as unknown as Deposit) - return depositedEvents + return { last_fetched_block, events } } export const fetchWithdrawnEvents = async (fromBlock: number, bridgeContract: BridgeContract) => { - const events = await bridgeContract.fetchEvents(UInt32.from(fromBlock)) + const allEvents = await bridgeContract.fetchEvents(UInt32.from(fromBlock)) + + const last_fetched_block = allEvents + .filter(({ type }) => type === "withdrawn") + .map((e) => Number(e.blockHeight.toBigint())) + .reduce((a, b) => (a > b ? a : b), 0) - const withdrawnEvents = events - .filter(({ event, type }) => type === "withdrawn") + const events = allEvents + .filter(({ type }) => type === "withdrawn") .map(({ event }) => event.data as unknown as Withdrawal) - return withdrawnEvents + return { last_fetched_block, events } } diff --git a/event-fetcher-process/src/index.ts b/event-fetcher-process/src/index.ts index 170e5f8..fbefce5 100644 --- a/event-fetcher-process/src/index.ts +++ b/event-fetcher-process/src/index.ts @@ -25,15 +25,15 @@ stdin.on("data", async (chunk) => { try { const input = parseInput(chunk) - const events = + const { events, last_fetched_block } = input.kind === "FetchDepositedEvents" ? await fetchDepositedEvents(input.fromBlock, bridgeContract) : input.kind === "FetchWithdrawnEvents" ? await fetchWithdrawnEvents(input.fromBlock, bridgeContract) - : null + : { events: null, last_fetched_block: null } - if (events) { - const buffer = unparseOutput(events) + if (events && last_fetched_block) { + const buffer = unparseOutput(events, last_fetched_block) stdout.write(buffer) } else { diff --git a/event-fetcher-process/src/output.ts b/event-fetcher-process/src/output.ts index 6f32d7f..8ecd373 100644 --- a/event-fetcher-process/src/output.ts +++ b/event-fetcher-process/src/output.ts @@ -1,14 +1,20 @@ import { Deposit, Withdrawal } from "nacho-common-o1js" import { Field, PublicKey, UInt64 } from "o1js" -export const unparseOutput = (output: Array | Array) => { - const arrayBuffer = new ArrayBuffer(output.length * 96) +export const unparseOutput = ( + events: Array | Array, + last_fetched_block: number, +) => { + const arrayBuffer = new ArrayBuffer(8 + events.length * 95) const buffer = new Uint8Array(arrayBuffer) - for (let i = 0; i < output.length; i++) { - const event = output[i] + uint32EncodeInto(last_fetched_block, buffer.subarray(0, 4)) + uint32EncodeInto(events.length, buffer.subarray(4, 8)) - eventEncodeInto(event, buffer.subarray(i * 96, (i + 1) * 96)) + for (let i = 0; i < events.length; i++) { + const event = events[i] + + eventEncodeInto(event, buffer.subarray(8 + i * 95, 8 + (i + 1) * 95)) } return buffer @@ -22,29 +28,21 @@ export const unparseError = () => { } const eventEncodeInto = (event: Deposit | Withdrawal, buffer: Uint8Array) => { - const tag = event instanceof Deposit ? 0 : 1 - const address = event instanceof Deposit ? event.depositor : event.withdrawer const tokenId = event.tokenId const tokenAmount = event.tokenAmount - tagEncodeInto(tag, buffer.subarray(0, 1)) - - publicKeyEncodeInto(address, buffer.subarray(1, 56)) + publicKeyEncodeInto(address, buffer.subarray(0, 55)) - fieldEncodeInto(tokenId, buffer.subarray(56, 88)) + fieldEncodeInto(tokenId, buffer.subarray(55, 87)) - uint64EncodeInto(tokenAmount, buffer.subarray(88, 96)) + uint64EncodeInto(tokenAmount, buffer.subarray(87, 95)) return buffer } -const tagEncodeInto = (tag: 0 | 1, buffer: Uint8Array) => { - buffer[0] = tag -} - const publicKeyEncodeInto = (publicKey: PublicKey, buffer: Uint8Array) => { new TextEncoder().encodeInto(publicKey.toBase58(), buffer) } @@ -66,3 +64,12 @@ const uint64EncodeInto = (uint64: UInt64, uint8Array: Uint8Array) => { number >>= 8n } } + +const uint32EncodeInto = (uint32: number, uint8Array: Uint8Array) => { + let number = BigInt(uint32) + + for (let i = 0; i < 4; i++) { + uint8Array[i] = Number(number & 0xffn) + number >>= 8n + } +} diff --git a/event-fetcher-process/src/parse.ts b/event-fetcher-process/src/parse.ts index e7dce36..89da6eb 100644 --- a/event-fetcher-process/src/parse.ts +++ b/event-fetcher-process/src/parse.ts @@ -3,7 +3,7 @@ import { Input } from "./input" export const parseInput = (buffer: Buffer): Input => { const array = new Uint8Array(buffer) - if (array.length !== 33) { + if (array.length !== 5) { return { kind: "MistakenInput", } @@ -13,13 +13,13 @@ export const parseInput = (buffer: Buffer): Input => { case 0: { return { kind: "FetchDepositedEvents", - fromBlock: parseUint32(buffer.subarray(1, 33)), + fromBlock: parseUint32(buffer.subarray(1, 5)), } } case 1: { return { kind: "FetchWithdrawnEvents", - fromBlock: parseUint32(buffer.subarray(1, 33)), + fromBlock: parseUint32(buffer.subarray(1, 5)), } } default: { diff --git a/processes/src/fetcher/mod.rs b/processes/src/fetcher/mod.rs new file mode 100644 index 0000000..6071557 --- /dev/null +++ b/processes/src/fetcher/mod.rs @@ -0,0 +1,5 @@ +mod process; +mod processor; + +pub use process::process; +pub use processor::Processor; diff --git a/processes/src/fetcher/process.rs b/processes/src/fetcher/process.rs new file mode 100644 index 0000000..8bafb8b --- /dev/null +++ b/processes/src/fetcher/process.rs @@ -0,0 +1,164 @@ +use std::time::Duration; + +use super::Processor; +use crate::{ + balances, burns, executor, liquidities, mempool, pools, proofpool, transactions, verifier, + withdrawals, +}; +use nacho_data_structures::{ + ByteConversion, Deposit, DepositTokensTransaction, Transaction, Withdrawal, +}; +use nacho_events_db::EventsDb; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + process::{ChildStdin, ChildStdout}, +}; + +pub fn process( + events_db_path: &str, + event_fetcher_process_path: &str, + verifier: verifier::Processor, + executor: executor::Processor, + mempool: mempool::Processor, + transactions: transactions::Processor, + proofpool: proofpool::Processor, + balances: balances::Processor, + pools: pools::Processor, + liquidities: liquidities::Processor, + burns: burns::Processor, + withdrawals: withdrawals::Processor, +) -> Processor { + let (stdin, stdout) = nacho_js_process::spawn(&[event_fetcher_process_path]).unwrap(); + + let events_db_path = events_db_path.to_string(); + + tokio::spawn(async move { + let mut events_db = EventsDb::new(events_db_path).await.unwrap(); + + loop { + tokio::time::sleep(Duration::from_secs(60 * 1)).await; + + let (mut from_block_deposited, mut from_block_withdrawn) = + match events_db.get_last_fetched_blocks().await { + Ok(value) => value, + Err(_) => continue, + }; + + match fetch_deposited_events(stdin, stdout, from_block_deposited).await { + Some((last_fetched_block, deposited_events)) => { + from_block_deposited = last_fetched_block; + + let deposit_transactions = deposited_events.into_iter().map(|event| { + Transaction::DepositTokens(DepositTokensTransaction { + user_address: event.depositor, + token_id: event.token_id, + token_amount: event.token_amount, + }) + }); + + for transaction in deposit_transactions { + if let Some(_) = transactions.add_new_tx().await { + mempool.push(transaction).await; + } + } + + executor.keep_executing(); + } + None => (), + }; + + match fetch_withdrawn_events(stdin, stdout, from_block_withdrawn).await { + Some((last_fetched_block, withdrawn_events)) => { + from_block_withdrawn = last_fetched_block; + + for event in withdrawn_events { + if let Some(index) = burns + .get_index(event.withdrawer.clone(), event.token_id.clone()) + .await + { + withdrawals.set(index, event).await; + } + } + } + None => (), + }; + + events_db + .set_last_fetched_blocks(from_block_deposited, from_block_withdrawn) + .await + .ok(); + } + }); + + Processor {} +} + +pub async fn fetch_deposited_events( + stdin: &mut ChildStdin, + stdout: &mut ChildStdout, + from_block: u32, +) -> Option<(u32, Vec)> { + let mut input = [0u8; 5]; + + input[0] = 0; + input[1..5].copy_from_slice(&from_block.to_bytes()); + + let mut output = [0u8; 4]; + + stdin.write_all(&input).await.ok()?; + + stdout.read_exact(&mut output).await.ok()?; + let last_block_fetched = match u32::from_bytes(&output) { + 0 => from_block, + x => x, + }; + + stdout.read_exact(&mut output).await.ok()?; + let events_count = u32::from_bytes(&output); + + let mut events = Vec::with_capacity(events_count as usize); + + for _ in 0..events_count { + let mut output = [0u8; 95]; + stdout.read_exact(&mut output).await.ok()?; + let event = Deposit::from_bytes(&output); + events.push(event); + } + + Some((last_block_fetched, events)) +} + +pub async fn fetch_withdrawn_events( + stdin: &mut ChildStdin, + stdout: &mut ChildStdout, + from_block: u32, +) -> Option<(u32, Vec)> { + let mut input = [0u8; 5]; + + input[0] = 1; + input[1..5].copy_from_slice(&from_block.to_bytes()); + + let mut output = [0u8; 4]; + + stdin.write_all(&input).await.ok()?; + + stdout.read_exact(&mut output).await.ok()?; + let last_block_fetched = match u32::from_bytes(&output) { + 0 => from_block, + x => x, + }; + + stdout.read_exact(&mut output).await.ok()?; + let events_count = u32::from_bytes(&output); + + let mut events = Vec::with_capacity(events_count as usize); + + for _ in 0..events_count { + let mut output = [0u8; 95]; + stdout.read_exact(&mut output).await.ok()?; + let event = Withdrawal::from_bytes(&output); + events.push(event); + } + + Some((last_block_fetched, events)) +} diff --git a/processes/src/fetcher/processor.rs b/processes/src/fetcher/processor.rs new file mode 100644 index 0000000..fb3755f --- /dev/null +++ b/processes/src/fetcher/processor.rs @@ -0,0 +1,2 @@ +#[derive(Clone, Copy, Debug)] +pub struct Processor {}