Skip to content

Commit

Permalink
impl: fetcher process
Browse files Browse the repository at this point in the history
  • Loading branch information
berzanorg committed Apr 29, 2024
1 parent 799f5bb commit 356157c
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 30 deletions.
24 changes: 17 additions & 7 deletions event-fetcher-process/src/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,29 @@ import { UInt32 } from "o1js"
export const fetchDepositedEvents = async (fromBlock: number, bridgeContract: BridgeContract) => {
const allEvents = await bridgeContract.fetchEvents(UInt32.from(fromBlock))

const depositedEvents = allEvents
.filter(({ event, type }) => type === "deposited")
const last_fetched_block = allEvents
.filter(({ type }) => type === "deposited")
.map((e) => Number(e.blockHeight.toBigint()))
.reduce((a, b) => (a > b ? a : b), 0)

const events = allEvents
.filter(({ type }) => type === "deposited")
.map(({ event }) => event.data as unknown as Deposit)

return depositedEvents
return { last_fetched_block, events }
}

export const fetchWithdrawnEvents = async (fromBlock: number, bridgeContract: BridgeContract) => {
const events = await bridgeContract.fetchEvents(UInt32.from(fromBlock))
const allEvents = await bridgeContract.fetchEvents(UInt32.from(fromBlock))

const last_fetched_block = allEvents
.filter(({ type }) => type === "withdrawn")
.map((e) => Number(e.blockHeight.toBigint()))
.reduce((a, b) => (a > b ? a : b), 0)

const withdrawnEvents = events
.filter(({ event, type }) => type === "withdrawn")
const events = allEvents
.filter(({ type }) => type === "withdrawn")
.map(({ event }) => event.data as unknown as Withdrawal)

return withdrawnEvents
return { last_fetched_block, events }
}
8 changes: 4 additions & 4 deletions event-fetcher-process/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ stdin.on("data", async (chunk) => {
try {
const input = parseInput(chunk)

const events =
const { events, last_fetched_block } =
input.kind === "FetchDepositedEvents"
? await fetchDepositedEvents(input.fromBlock, bridgeContract)
: input.kind === "FetchWithdrawnEvents"
? await fetchWithdrawnEvents(input.fromBlock, bridgeContract)
: null
: { events: null, last_fetched_block: null }

if (events) {
const buffer = unparseOutput(events)
if (events && last_fetched_block) {
const buffer = unparseOutput(events, last_fetched_block)

stdout.write(buffer)
} else {
Expand Down
39 changes: 23 additions & 16 deletions event-fetcher-process/src/output.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import { Deposit, Withdrawal } from "nacho-common-o1js"
import { Field, PublicKey, UInt64 } from "o1js"

export const unparseOutput = (output: Array<Deposit> | Array<Withdrawal>) => {
const arrayBuffer = new ArrayBuffer(output.length * 96)
export const unparseOutput = (
events: Array<Deposit> | Array<Withdrawal>,
last_fetched_block: number,
) => {
const arrayBuffer = new ArrayBuffer(8 + events.length * 95)
const buffer = new Uint8Array(arrayBuffer)

for (let i = 0; i < output.length; i++) {
const event = output[i]
uint32EncodeInto(last_fetched_block, buffer.subarray(0, 4))
uint32EncodeInto(events.length, buffer.subarray(4, 8))

eventEncodeInto(event, buffer.subarray(i * 96, (i + 1) * 96))
for (let i = 0; i < events.length; i++) {
const event = events[i]

eventEncodeInto(event, buffer.subarray(8 + i * 95, 8 + (i + 1) * 95))
}

return buffer
Expand All @@ -22,29 +28,21 @@ export const unparseError = () => {
}

const eventEncodeInto = (event: Deposit | Withdrawal, buffer: Uint8Array) => {
const tag = event instanceof Deposit ? 0 : 1

const address = event instanceof Deposit ? event.depositor : event.withdrawer

const tokenId = event.tokenId

const tokenAmount = event.tokenAmount

tagEncodeInto(tag, buffer.subarray(0, 1))

publicKeyEncodeInto(address, buffer.subarray(1, 56))
publicKeyEncodeInto(address, buffer.subarray(0, 55))

fieldEncodeInto(tokenId, buffer.subarray(56, 88))
fieldEncodeInto(tokenId, buffer.subarray(55, 87))

uint64EncodeInto(tokenAmount, buffer.subarray(88, 96))
uint64EncodeInto(tokenAmount, buffer.subarray(87, 95))

return buffer
}

const tagEncodeInto = (tag: 0 | 1, buffer: Uint8Array) => {
buffer[0] = tag
}

const publicKeyEncodeInto = (publicKey: PublicKey, buffer: Uint8Array) => {
new TextEncoder().encodeInto(publicKey.toBase58(), buffer)
}
Expand All @@ -66,3 +64,12 @@ const uint64EncodeInto = (uint64: UInt64, uint8Array: Uint8Array) => {
number >>= 8n
}
}

const uint32EncodeInto = (uint32: number, uint8Array: Uint8Array) => {
let number = BigInt(uint32)

for (let i = 0; i < 4; i++) {
uint8Array[i] = Number(number & 0xffn)
number >>= 8n
}
}
6 changes: 3 additions & 3 deletions event-fetcher-process/src/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Input } from "./input"
export const parseInput = (buffer: Buffer): Input => {
const array = new Uint8Array(buffer)

if (array.length !== 33) {
if (array.length !== 5) {
return {
kind: "MistakenInput",
}
Expand All @@ -13,13 +13,13 @@ export const parseInput = (buffer: Buffer): Input => {
case 0: {
return {
kind: "FetchDepositedEvents",
fromBlock: parseUint32(buffer.subarray(1, 33)),
fromBlock: parseUint32(buffer.subarray(1, 5)),
}
}
case 1: {
return {
kind: "FetchWithdrawnEvents",
fromBlock: parseUint32(buffer.subarray(1, 33)),
fromBlock: parseUint32(buffer.subarray(1, 5)),
}
}
default: {
Expand Down
5 changes: 5 additions & 0 deletions processes/src/fetcher/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod process;
mod processor;

pub use process::process;
pub use processor::Processor;
164 changes: 164 additions & 0 deletions processes/src/fetcher/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use std::time::Duration;

use super::Processor;
use crate::{
balances, burns, executor, liquidities, mempool, pools, proofpool, transactions, verifier,
withdrawals,
};
use nacho_data_structures::{
ByteConversion, Deposit, DepositTokensTransaction, Transaction, Withdrawal,
};
use nacho_events_db::EventsDb;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
process::{ChildStdin, ChildStdout},
};

pub fn process(
events_db_path: &str,
event_fetcher_process_path: &str,
verifier: verifier::Processor,
executor: executor::Processor,
mempool: mempool::Processor,
transactions: transactions::Processor,
proofpool: proofpool::Processor,
balances: balances::Processor,
pools: pools::Processor,
liquidities: liquidities::Processor,
burns: burns::Processor,
withdrawals: withdrawals::Processor,
) -> Processor {
let (stdin, stdout) = nacho_js_process::spawn(&[event_fetcher_process_path]).unwrap();

let events_db_path = events_db_path.to_string();

tokio::spawn(async move {
let mut events_db = EventsDb::new(events_db_path).await.unwrap();

loop {
tokio::time::sleep(Duration::from_secs(60 * 1)).await;

let (mut from_block_deposited, mut from_block_withdrawn) =
match events_db.get_last_fetched_blocks().await {
Ok(value) => value,
Err(_) => continue,
};

match fetch_deposited_events(stdin, stdout, from_block_deposited).await {
Some((last_fetched_block, deposited_events)) => {
from_block_deposited = last_fetched_block;

let deposit_transactions = deposited_events.into_iter().map(|event| {
Transaction::DepositTokens(DepositTokensTransaction {
user_address: event.depositor,
token_id: event.token_id,
token_amount: event.token_amount,
})
});

for transaction in deposit_transactions {
if let Some(_) = transactions.add_new_tx().await {
mempool.push(transaction).await;
}
}

executor.keep_executing();
}
None => (),
};

match fetch_withdrawn_events(stdin, stdout, from_block_withdrawn).await {
Some((last_fetched_block, withdrawn_events)) => {
from_block_withdrawn = last_fetched_block;

for event in withdrawn_events {
if let Some(index) = burns
.get_index(event.withdrawer.clone(), event.token_id.clone())
.await
{
withdrawals.set(index, event).await;
}
}
}
None => (),
};

events_db
.set_last_fetched_blocks(from_block_deposited, from_block_withdrawn)
.await
.ok();
}
});

Processor {}
}

pub async fn fetch_deposited_events(
stdin: &mut ChildStdin,
stdout: &mut ChildStdout,
from_block: u32,
) -> Option<(u32, Vec<Deposit>)> {
let mut input = [0u8; 5];

input[0] = 0;
input[1..5].copy_from_slice(&from_block.to_bytes());

let mut output = [0u8; 4];

stdin.write_all(&input).await.ok()?;

stdout.read_exact(&mut output).await.ok()?;
let last_block_fetched = match u32::from_bytes(&output) {
0 => from_block,
x => x,
};

stdout.read_exact(&mut output).await.ok()?;
let events_count = u32::from_bytes(&output);

let mut events = Vec::with_capacity(events_count as usize);

for _ in 0..events_count {
let mut output = [0u8; 95];
stdout.read_exact(&mut output).await.ok()?;
let event = Deposit::from_bytes(&output);
events.push(event);
}

Some((last_block_fetched, events))
}

pub async fn fetch_withdrawn_events(
stdin: &mut ChildStdin,
stdout: &mut ChildStdout,
from_block: u32,
) -> Option<(u32, Vec<Withdrawal>)> {
let mut input = [0u8; 5];

input[0] = 1;
input[1..5].copy_from_slice(&from_block.to_bytes());

let mut output = [0u8; 4];

stdin.write_all(&input).await.ok()?;

stdout.read_exact(&mut output).await.ok()?;
let last_block_fetched = match u32::from_bytes(&output) {
0 => from_block,
x => x,
};

stdout.read_exact(&mut output).await.ok()?;
let events_count = u32::from_bytes(&output);

let mut events = Vec::with_capacity(events_count as usize);

for _ in 0..events_count {
let mut output = [0u8; 95];
stdout.read_exact(&mut output).await.ok()?;
let event = Withdrawal::from_bytes(&output);
events.push(event);
}

Some((last_block_fetched, events))
}
2 changes: 2 additions & 0 deletions processes/src/fetcher/processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[derive(Clone, Copy, Debug)]
pub struct Processor {}

0 comments on commit 356157c

Please sign in to comment.