From a5e0a92e7d252eb269172ceff41a991c37f43b02 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Tue, 13 Aug 2024 12:21:31 +0530 Subject: [PATCH] indexer: add drizzle sink --- examples/indexer/package.json | 2 + examples/indexer/src/main.ts | 23 +++++- packages/indexer/build.config.ts | 1 + packages/indexer/package.json | 9 +++ packages/indexer/src/sinks/drizzle.ts | 81 +++++++++++++++++++ pnpm-lock.yaml | 109 +++++++++++++++++++++++++- 6 files changed, 222 insertions(+), 3 deletions(-) create mode 100644 packages/indexer/src/sinks/drizzle.ts diff --git a/examples/indexer/package.json b/examples/indexer/package.json index 6a8e39e..3aa2940 100644 --- a/examples/indexer/package.json +++ b/examples/indexer/package.json @@ -23,6 +23,8 @@ "citty": "^0.1.6", "consola": "^3.2.3", "csv-stringify": "^6.5.0", + "drizzle-orm": "^0.33.0", + "postgres": "^3.4.4", "viem": "^2.12.4" }, "devDependencies": { diff --git a/examples/indexer/src/main.ts b/examples/indexer/src/main.ts index e931224..0f10d1a 100644 --- a/examples/indexer/src/main.ts +++ b/examples/indexer/src/main.ts @@ -6,6 +6,10 @@ import consola from "consola"; import { createIndexerConfig } from "./indexer"; import "./instrumentation"; +import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle"; +import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core"; +import { drizzle } from "drizzle-orm/postgres-js"; +import postgres from "postgres"; const command = defineCommand({ meta: { @@ -33,7 +37,24 @@ const command = defineCommand({ indexer.options.streamUrl, ); - await run(client, indexer); + /** TEMPORARY EXAMPLE OF DRIZZLE SINK - WILL BE REMOVED */ + const users = pgTable("users", { + id: serial("id").primaryKey(), + fullName: text("full_name"), + phone: varchar("phone", { length: 256 }), + }); + + const pgClient = postgres("your_connection_string"); + const db = drizzle(pgClient); + + const sink = drizzleSink({ database: db, table: users }); + + // Demo of how we can infer type for the transform function + async function _transform(): typeof sink.$inferTransform { + return [{ id: 1, fullName: "John Doe", phone: "1234567890" }]; + } + + await run(client, indexer, sink); }, }); diff --git a/packages/indexer/build.config.ts b/packages/indexer/build.config.ts index 4159bf0..365b34d 100644 --- a/packages/indexer/build.config.ts +++ b/packages/indexer/build.config.ts @@ -5,6 +5,7 @@ export default defineBuildConfig({ "./src/index.ts", "./src/sinks/sqlite.ts", "./src/sinks/csv.ts", + "./src/sinks/drizzle.ts", "./src/testing/index.ts", ], clean: true, diff --git a/packages/indexer/package.json b/packages/indexer/package.json index 685d806..2afb174 100644 --- a/packages/indexer/package.json +++ b/packages/indexer/package.json @@ -9,6 +9,7 @@ "./plugins": "./src/plugins/index.ts", "./sinks/sqlite": "./src/sinks/sqlite.ts", "./sinks/csv": "./src/sinks/csv.ts", + "./sinks/drizzle": "./src/sinks/drizzle.ts", "./testing": "./src/testing/index.ts" }, "publishConfig": { @@ -34,6 +35,12 @@ "require": "./dist/sinks/csv.cjs", "default": "./dist/sinks/csv.mjs" }, + "./sinks/drizzle": { + "types": "./dist/sinks/drizzle.d.ts", + "import": "./dist/sinks/drizzle.mjs", + "require": "./dist/sinks/drizzle.cjs", + "default": "./dist/sinks/drizzle.mjs" + }, "./testing": { "types": "./dist/testing/index.d.ts", "import": "./dist/testing/index.mjs", @@ -56,6 +63,7 @@ "@types/node": "^20.14.0", "better-sqlite3": "^11.1.2", "csv-stringify": "^6.5.0", + "drizzle-orm": "^0.33.0", "unbuild": "^2.0.0", "vitest": "^1.6.0" }, @@ -71,6 +79,7 @@ "peerDependencies": { "better-sqlite3": "^11.1.2", "csv-stringify": "^6.5.0", + "drizzle-orm": "^0.33.0", "vitest": "^1.6.0" } } diff --git a/packages/indexer/src/sinks/drizzle.ts b/packages/indexer/src/sinks/drizzle.ts new file mode 100644 index 0000000..e56531f --- /dev/null +++ b/packages/indexer/src/sinks/drizzle.ts @@ -0,0 +1,81 @@ +import type { + ExtractTablesWithRelations, + TablesRelationalConfig, +} from "drizzle-orm"; +import type { + PgDatabase, + PgInsertValue, + PgQueryResultHKT, + PgTable, + PgTableWithColumns, + TableConfig, +} from "drizzle-orm/pg-core"; +import { Sink, type SinkWriteArgs } from "../sink"; + +export type DrizzleSinkOptions< + TTableConfig extends TableConfig, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +> = { + /** + * Database instance of drizzle-orm + */ + database: PgDatabase; + /** + * The table where data will be inserted. + */ + table: PgTableWithColumns; +}; + +export class DrizzleSink< + TTableConfig extends TableConfig, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +> extends Sink { + private _table: PgTableWithColumns; + private _db: PgDatabase; + + public $inferTransform: Promise>[]> = + Promise.resolve([]); + + constructor( + options: DrizzleSinkOptions< + TTableConfig, + TQueryResult, + TFullSchema, + TSchema + >, + ) { + super(); + const { database, table } = options; + this._table = table; + this._db = database; + } + + async write({ data, endCursor, finality }: SinkWriteArgs) { + await this.callHook("write", { data }); + + await this._db + .insert(this._table) + // biome-ignore lint/suspicious/noExplicitAny: + .values(data as any[]); + + await this.callHook("flush", { endCursor, finality }); + } +} + +export const drizzle = < + TTableConfig extends TableConfig, + TQueryResult extends PgQueryResultHKT, + TFullSchema extends Record = Record, + TSchema extends + TablesRelationalConfig = ExtractTablesWithRelations, +>( + args: DrizzleSinkOptions, +) => { + return new DrizzleSink(args); +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 95a607a..7becc61 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -118,6 +118,12 @@ importers: csv-stringify: specifier: ^6.5.0 version: 6.5.0 + drizzle-orm: + specifier: ^0.33.0 + version: 0.33.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(better-sqlite3@11.1.2)(postgres@3.4.4) + postgres: + specifier: ^3.4.4 + version: 3.4.4 viem: specifier: ^2.12.4 version: 2.13.8(typescript@5.4.5) @@ -422,6 +428,9 @@ importers: csv-stringify: specifier: ^6.5.0 version: 6.5.0 + drizzle-orm: + specifier: ^0.33.0 + version: 0.33.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(better-sqlite3@11.1.2)(postgres@3.4.4) unbuild: specifier: ^2.0.0 version: 2.0.0(typescript@5.4.5) @@ -1937,7 +1946,6 @@ packages: /@opentelemetry/api@1.9.0: resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==} engines: {node: '>=8.0.0'} - dev: false /@opentelemetry/context-async-hooks@1.25.0(@opentelemetry/api@1.9.0): resolution: {integrity: sha512-sBW313mnMyFg0cp/40BRzrZBWG+581s2j5gIsa5fgGadswyILk4mNFATsqrCOpAx945RDuZ2B7ThQLgor9OpfA==} @@ -2564,7 +2572,6 @@ packages: resolution: {integrity: sha512-i8KcD3PgGtGBLl3+mMYA8PdKkButvPyARxA7IQAd6qeslht13qxb1zzO8dRCtE7U3IoJS782zDBAeoKiM695kg==} dependencies: '@types/node': 20.14.0 - dev: true /@types/estree@1.0.5: resolution: {integrity: sha512-/kYRxGDLWzHOB7q+wtSUQlFrtcdUccpfy+X+9iMBpHK8QLLhx2wIPYuS5DYtR9Wa/YlZAbIovy7qVdB1Aq6Lyw==} @@ -3449,6 +3456,100 @@ packages: engines: {node: '>=10'} dev: true + /drizzle-orm@0.33.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(better-sqlite3@11.1.2)(postgres@3.4.4): + resolution: {integrity: sha512-SHy72R2Rdkz0LEq0PSG/IdvnT3nGiWuRk+2tXZQ90GVq/XQhpCzu/EFT3V2rox+w8MlkBQxifF8pCStNYnERfA==} + peerDependencies: + '@aws-sdk/client-rds-data': '>=3' + '@cloudflare/workers-types': '>=3' + '@electric-sql/pglite': '>=0.1.1' + '@libsql/client': '*' + '@neondatabase/serverless': '>=0.1' + '@op-engineering/op-sqlite': '>=2' + '@opentelemetry/api': ^1.4.1 + '@planetscale/database': '>=1' + '@prisma/client': '*' + '@tidbcloud/serverless': '*' + '@types/better-sqlite3': '*' + '@types/pg': '*' + '@types/react': '>=18' + '@types/sql.js': '*' + '@vercel/postgres': '>=0.8.0' + '@xata.io/client': '*' + better-sqlite3: '>=7' + bun-types: '*' + expo-sqlite: '>=13.2.0' + knex: '*' + kysely: '*' + mysql2: '>=2' + pg: '>=8' + postgres: '>=3' + prisma: '*' + react: '>=18' + sql.js: '>=1' + sqlite3: '>=5' + peerDependenciesMeta: + '@aws-sdk/client-rds-data': + optional: true + '@cloudflare/workers-types': + optional: true + '@electric-sql/pglite': + optional: true + '@libsql/client': + optional: true + '@neondatabase/serverless': + optional: true + '@op-engineering/op-sqlite': + optional: true + '@opentelemetry/api': + optional: true + '@planetscale/database': + optional: true + '@prisma/client': + optional: true + '@tidbcloud/serverless': + optional: true + '@types/better-sqlite3': + optional: true + '@types/pg': + optional: true + '@types/react': + optional: true + '@types/sql.js': + optional: true + '@vercel/postgres': + optional: true + '@xata.io/client': + optional: true + better-sqlite3: + optional: true + bun-types: + optional: true + expo-sqlite: + optional: true + knex: + optional: true + kysely: + optional: true + mysql2: + optional: true + pg: + optional: true + postgres: + optional: true + prisma: + optional: true + react: + optional: true + sql.js: + optional: true + sqlite3: + optional: true + dependencies: + '@opentelemetry/api': 1.9.0 + '@types/better-sqlite3': 7.6.11 + better-sqlite3: 11.1.2 + postgres: 3.4.4 + /eastasianwidth@0.2.0: resolution: {integrity: sha512-I88TYZWc9XiYHRQ4/3c5rjjfgkjhLyW2luGIheGERbNQ6OY7yTybanSpDXZa8y7VUP9YmDcYa+eyq4ca7iLqWA==} dev: false @@ -5306,6 +5407,10 @@ packages: source-map-js: 1.2.0 dev: true + /postgres@3.4.4: + resolution: {integrity: sha512-IbyN+9KslkqcXa8AO9fxpk97PA4pzewvpi2B3Dwy9u4zpV32QicaEdgmF3eSQUzdRk7ttDHQejNgAEr4XoeH4A==} + engines: {node: '>=12'} + /prebuild-install@7.1.2: resolution: {integrity: sha512-UnNke3IQb6sgarcZIDU3gbMeTp/9SSU1DAIkil7PrqG1vZlBtY5msYccSKSHDqa3hNg436IXK+SNImReuA1wEQ==} engines: {node: '>=10'}