Skip to content

Commit

Permalink
add mongo sink and some fixes for drizzle sink (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracek authored Dec 20, 2024
2 parents c294f4d + 3b81140 commit 5d3b8d0
Show file tree
Hide file tree
Showing 25 changed files with 967 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ jobs:
uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Start MongoDB
uses: supercharge/[email protected]
with:
mongodb-version: '7.0'
mongodb-replica-set: rs0
mongodb-port: 27017
- name: Install dependencies
run: pnpm install --strict-peer-dependencies=false
- name: Run lint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: rename drizzle sink factory and fix update method",
"packageName": "@apibara/indexer",
"email": "[email protected]",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "sink-mongo: add mongodb sink",
"packageName": "@apibara/sink-mongo",
"email": "[email protected]",
"dependentChangeType": "patch"
}
2 changes: 1 addition & 1 deletion examples/cli/indexers/1-evm.indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EvmStream } from "@apibara/evm";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";

import type { ApibaraRuntimeConfig } from "apibara/types";
import type {
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/indexers/2-starknet.indexer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";

import type { ApibaraRuntimeConfig } from "apibara/types";
Expand Down
2 changes: 1 addition & 1 deletion examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EvmStream } from "@apibara/evm";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
import consola from "consola";
import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core";
import { drizzle } from "drizzle-orm/postgres-js";
Expand Down
5 changes: 1 addition & 4 deletions examples/starknet-indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import {
drizzle as drizzleSink,
pgIndexerTable,
} from "@apibara/indexer/sinks/drizzle";
import { drizzleSink, pgIndexerTable } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";
import consola from "consola";
import { bigint } from "drizzle-orm/pg-core";
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default defineBuildConfig({
"./src/plugins/logger.ts",
"./src/plugins/persistence.ts",
"./src/plugins/drizzle-persistence.ts",
"./src/internal/testing.ts",
],
clean: true,
outDir: "./dist",
Expand Down
10 changes: 10 additions & 0 deletions packages/indexer/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "3.8"

services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
6 changes: 6 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@
"import": "./dist/plugins/drizzle-persistence.mjs",
"require": "./dist/plugins/drizzle-persistence.cjs",
"default": "./dist/plugins/drizzle-persistence.mjs"
},
"./internal": {
"types": "./dist/internal/testing.d.ts",
"import": "./dist/internal/testing.mjs",
"require": "./dist/internal/testing.cjs",
"default": "./dist/internal/testing.mjs"
}
},
"scripts": {
Expand Down
13 changes: 8 additions & 5 deletions packages/indexer/src/sinks/drizzle/drizzle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { run } from "../../indexer";
import { generateMockMessages, getMockIndexer } from "../../internal/testing";
import { useSink } from "../../sink";
import type { Int8Range } from "./Int8Range";
import { drizzle as drizzleSink } from "./drizzle";
import { drizzleSink } from "./drizzle";
import { getDrizzleCursor, pgIndexerTable } from "./utils";

const testTable = pgIndexerTable("test_table", {
Expand All @@ -35,7 +35,7 @@ describe("Drizzle Test", () => {
await db.execute(sql`DROP TABLE IF EXISTS test_table`);
// create test_table with db
await db.execute(
sql`CREATE TABLE test_table (id SERIAL PRIMARY KEY, data TEXT, _cursor INT8RANGE)`,
sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`,
);
});

Expand Down Expand Up @@ -108,11 +108,14 @@ describe("Drizzle Test", () => {

const result = await db.select().from(testTable).orderBy(asc(testTable.id));

expect(result).toHaveLength(5);
expect(result[2].data).toBe("0000000");
expect(result).toHaveLength(6);
expect(
result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null)
?.data,
).toBe("0000000");
});

it("should delete data", async () => {
it("should soft delete data", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
return generateMockMessages(5);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/sinks/drizzle/drizzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class DrizzleSink<
}
}

export const drizzle = <
export const drizzleSink = <
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
Expand Down
35 changes: 29 additions & 6 deletions packages/indexer/src/sinks/drizzle/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import type {
PgUpdateBase,
PgUpdateSetSource,
} from "drizzle-orm/pg-core";
import type { Int8Range } from "./Int8Range";
import { getDrizzleCursor } from "./utils";

export class DrizzleSinkUpdate<
TTable extends PgTable,
Expand All @@ -32,15 +34,36 @@ export class DrizzleSinkUpdate<
return {
...originalSet,
where: async (where: SQL | undefined) => {
await this.db
.update(this.table)
.set({
_cursor: sql`int8range(lower(_cursor), ${Number(this.endCursor?.orderKey!)}, '[)')`,
} as PgUpdateSetSource<TTable>)
// 1. Find and store old versions of matching records
const oldRecords = await this.db
.select()
.from(this.table)
.where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`)
.execute();

return originalSet.where(where);
// 2. Insert old versions with updated upperbound cursor
if (oldRecords.length > 0) {
const oldRecordsWithNewCursor = oldRecords.map((record) => ({
...record,
_cursor: getDrizzleCursor([
BigInt((record._cursor as Int8Range).range.lower!),
this.endCursor?.orderKey,
]),
}));

await this.db
.insert(this.table)
.values(oldRecordsWithNewCursor)
.execute();
}

// 3. Update matching records with new values and new 'lowerbound' cursor
return originalUpdate
.set({
...values,
_cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`,
} as PgUpdateSetSource<TTable>)
.where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`);
},
} as PgUpdateBase<TTable, TQueryResult>;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/sink-mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `@apibara/sink-mongo`

TODO

## Installation

TODO
11 changes: 11 additions & 0 deletions packages/sink-mongo/build.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defineBuildConfig } from "unbuild";

export default defineBuildConfig({
entries: ["./src/index.ts"],
clean: true,
outDir: "./dist",
declaration: true,
rollup: {
emitCJS: true,
},
});
26 changes: 26 additions & 0 deletions packages/sink-mongo/docker-compose.orbstack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384

version: "3.8"

services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
extra_hosts:
- "localhost:host-gateway"
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
volumes:
- "mongo1_data:/data/db"
- "mongo1_config:/data/configdb"

volumes:
mongo1_data:
mongo1_config:
26 changes: 26 additions & 0 deletions packages/sink-mongo/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384

version: "3.8"

services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
extra_hosts:
- "host.docker.internal:host-gateway"
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'host.docker.internal:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
volumes:
- "mongo1_data:/data/db"
- "mongo1_config:/data/configdb"

volumes:
mongo1_data:
mongo1_config:
41 changes: 41 additions & 0 deletions packages/sink-mongo/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "@apibara/sink-mongo",
"version": "2.0.0-beta.26",
"type": "module",
"files": [
"dist",
"src",
"README.md"
],
"main": "./dist/index.mjs",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.mjs",
"require": "./dist/index.cjs",
"default": "./dist/index.mjs"
}
},
"scripts": {
"build": "unbuild",
"typecheck": "tsc --noEmit",
"lint": "biome check .",
"lint:fix": "pnpm lint --write",
"test": "vitest",
"test:ci": "vitest run"
},
"devDependencies": {
"@types/node": "^20.14.0",
"mongodb": "^6.12.0",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
"peerDependencies": {
"mongodb": "^6.12.0"
},
"dependencies": {
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*"
}
}
Loading

0 comments on commit 5d3b8d0

Please sign in to comment.