Skip to content

Commit

Permalink
indexer: sink rfc (#98)
Browse files Browse the repository at this point in the history
- upgrade sink interface
- add sink and its transaction to indexer context
- add drizzle sink for postgres
- update csv, sqlite sink
- add `useSink` hook
- update indexer examples
- update persistence and kv plugin
- update vcr and tests
  • Loading branch information
fracek authored Aug 30, 2024
2 parents 2bb0ed4 + 7ee0b35 commit a6a6e92
Show file tree
Hide file tree
Showing 29 changed files with 701 additions and 327 deletions.
2 changes: 2 additions & 0 deletions examples/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"citty": "^0.1.6",
"consola": "^3.2.3",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.33.0",
"postgres": "^3.4.4",
"viem": "^2.12.4"
},
"devDependencies": {
Expand Down
84 changes: 28 additions & 56 deletions examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import assert from "node:assert";
import { EvmStream } from "@apibara/evm";
import { defineIndexer, useIndexerContext } from "@apibara/indexer";
import { trace } from "@opentelemetry/api";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import consola from "consola";
import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
import { encodeEventTopics, parseAbi } from "viem";

const abi = parseAbi([
"event Transfer(address indexed from, address indexed to, uint256 value)",
]);

const tracer = trace.getTracer("evm-indexer-demo");
const users = pgTable("users", {
id: serial("id").primaryKey(),
firstName: text("full_name"),
phone: varchar("phone", { length: 256 }),
});

export function createIndexerConfig(streamUrl: string) {
const pgClient = postgres("your_connection_string");
const db = drizzle(pgClient);

const sink = drizzleSink({ database: db });

return defineIndexer(EvmStream)({
streamUrl,
finality: "accepted",
Expand All @@ -30,62 +41,23 @@ export function createIndexerConfig(streamUrl: string) {
},
],
},
async transform({ block: { header, logs, transactions } }) {
const ctx = useIndexerContext();
ctx.counter += 1;

if (!transactions || !header || !header.number) return [];

return tracer.startActiveSpan("parseLogs", (span) => {
const rows = logs.map((log) => {
assert(log.topics.length === 3, "Transfer event has 3 topics");

const { args } = tracer.startActiveSpan("decodeEventLog", (span) => {
// const decoded = decodeEventLog({
// abi,
// topics: log.topics as [`0x${string}`, ...`0x${string}`[]],
// data: log.data,
// eventName: "Transfer",
// });
const decoded = { args: { from: "0x0", to: "0x0", value: "0" } };
sink,
async transform({ block: { header }, context }) {
const { db } = useSink({ context });

span.end();
return decoded;
});

return {
blockNumber: Number(header.number),
blockHash: header.hash,
logIndex: Number(log.logIndex),
fromAddress: args.from,
toAddress: args.to,
value: Number(args.value),
};
});
await db.insert(users).values([
{
id: Number(header?.number),
firstName: `John Doe ${Number(header?.number)}`,
phone: "+91 1234567890",
},
]);

span.end();
return rows;
});
consola.info("Transforming block", header?.number);
},
hooks: {
async "run:before"() {},
"handler:after"({ output }) {
for (const transfer of output) {
consola.debug(
"Transfer",
transfer.blockNumber,
transfer.logIndex,
transfer.fromAddress,
transfer.toAddress,
transfer.value.toString(),
);
}
},
"sink:write"({ data }) {
consola.info("Wrote", data.length, "transfers");
},
"sink:flush"() {
consola.debug("Flushing");
"handler:after": ({ endCursor }) => {
consola.info("Transformed ", endCursor?.orderKey);
},
},
plugins: [],
Expand Down
2 changes: 2 additions & 0 deletions examples/starknet-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"citty": "^0.1.6",
"consola": "^3.2.3",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.33.0",
"postgres": "^3.4.4",
"sqlite": "^5.1.1",
"viem": "^2.12.4"
},
Expand Down
52 changes: 35 additions & 17 deletions examples/starknet-indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
import { defineIndexer } from "@apibara/indexer";
import { StarknetStream, getReceipt, getTransaction } from "@apibara/starknet";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";
import consola from "consola";
import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";

const users = pgTable("users", {
id: serial("id").primaryKey(),
firstName: text("full_name"),
phone: varchar("phone", { length: 256 }),
});

export function createIndexerConfig(streamUrl: string) {
const pgClient = postgres(
"postgresql://postgres.......supabase.com:6543/postgres",
{ prepare: false },
);
const db = drizzle(pgClient);

const sink = drizzleSink({ database: db });

return defineIndexer(StarknetStream)({
streamUrl,
finality: "accepted",
startingCursor: {
orderKey: 300_000n,
orderKey: 80_000n,
},
filter: {
events: [
Expand All @@ -19,21 +37,21 @@ export function createIndexerConfig(streamUrl: string) {
},
],
},
async transform({ block: { header, events, transactions, receipts } }) {
const ts = header?.timestamp!;
for (const event of events) {
// Use helpers to access transaction and receipt
const tx = getTransaction(event.transactionIndex!, transactions ?? []);
const receipt = getReceipt(event.transactionIndex!, receipts ?? []);
sink,
async transform({ block: { header }, context }) {
consola.info("Transforming block ", header?.blockNumber);

const { db } = useSink({ context });

await db.insert(users).values([
{
id: Number(header?.blockNumber),
firstName: `John Doe ${Number(header?.blockNumber)}`,
phone: "+91 1234567890",
},
]);

consola.info({
ts,
eventIndex: event.eventIndex,
actualFee: receipt?.meta?.actualFee,
txType: tx?.transaction?._tag,
});
}
return [];
consola.info("Inserted Data ", header?.blockNumber);
},
});
}
2 changes: 1 addition & 1 deletion examples/starknet-indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const command = defineCommand({
args: {
stream: {
type: "string",
default: "http://127.0.0.1:7007",
default: "http://mainnet-v2.starknet.a5a.ch:7007",
description: "Starknet stream URL",
},
authToken: {
Expand Down
14 changes: 3 additions & 11 deletions packages/cli/playground/indexers/starknet.indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,11 @@ export default function indexer(runtimeConfig: ApibaraRuntimeConfig) {
],
},
async transform({ block: { header, events } }) {
return [
{
blockNumber: header.blockNumber,
events,
},
];
consola.info("Transforming block ", header?.blockNumber);
},
hooks: {
"sink:write"({ data }) {
consola.log("Wrote:", data[0].events.length, "Tranfer events");
},
"sink:flush"({ endCursor }) {
consola.log("Flushing", endCursor.orderKey);
"handler:after": ({ endCursor }) => {
consola.info("Handler After ", endCursor?.orderKey);
},
},
});
Expand Down
9 changes: 9 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"./plugins": "./src/plugins/index.ts",
"./sinks/sqlite": "./src/sinks/sqlite.ts",
"./sinks/csv": "./src/sinks/csv.ts",
"./sinks/drizzle": "./src/sinks/drizzle.ts",
"./testing": "./src/testing/index.ts"
},
"publishConfig": {
Expand All @@ -34,6 +35,12 @@
"require": "./dist/sinks/csv.cjs",
"default": "./dist/sinks/csv.mjs"
},
"./sinks/drizzle": {
"types": "./dist/sinks/drizzle.d.ts",
"import": "./dist/sinks/drizzle.mjs",
"require": "./dist/sinks/drizzle.cjs",
"default": "./dist/sinks/drizzle.mjs"
},
"./testing": {
"types": "./dist/testing/index.d.ts",
"import": "./dist/testing/index.mjs",
Expand All @@ -56,6 +63,7 @@
"@types/node": "^20.14.0",
"better-sqlite3": "^11.1.2",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.33.0",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
Expand All @@ -71,6 +79,7 @@
"peerDependencies": {
"better-sqlite3": "^11.1.2",
"csv-stringify": "^6.5.0",
"drizzle-orm": "^0.33.0",
"vitest": "^1.6.0"
}
}
11 changes: 8 additions & 3 deletions packages/indexer/src/context.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { getContext } from "unctx";
import type { Sink } from "./sink";

// biome-ignore lint/suspicious/noExplicitAny: context type
export interface IndexerContext extends Record<string, any> {}
export interface IndexerContext<TTxnParams = any> extends Record<string, any> {
sink?: Sink<TTxnParams>;
sinkTransaction?: TTxnParams;
}

export const indexerAsyncContext = getContext<IndexerContext>("indexer", {
asyncContext: true,
AsyncLocalStorage,
});

export function useIndexerContext() {
return indexerAsyncContext.use();
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
export function useIndexerContext<TTxnParams = any>() {
return indexerAsyncContext.use() as IndexerContext<TTxnParams>;
}
1 change: 1 addition & 0 deletions packages/indexer/src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./useKVStore";
export * from "./useSink";
13 changes: 13 additions & 0 deletions packages/indexer/src/hooks/useSink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { IndexerContext } from "../context";

export function useSink<TTxnParams>({
context,
}: {
context: IndexerContext<TTxnParams>;
}) {
if (!context.sinkTransaction) {
throw new Error("Transaction context doesn't exist!");
}

return context.sinkTransaction;
}
1 change: 1 addition & 0 deletions packages/indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export { useIndexerContext } from "./context";

export * from "./plugins";
export * from "./vcr";
export * from "./hooks";
Loading

0 comments on commit a6a6e92

Please sign in to comment.