From 17f68e6f95571c76307343d367da0ce6e88657d9 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Thu, 5 Dec 2024 09:36:43 -0500 Subject: [PATCH] feat: introducing the fleet package (#3105) The fleet package will be used to manage the lifecycle of runners This commit includes: - migration for deployments and nodes tables - deployments and nodes models (with tests) This commit DOESN'T include (I thought that would be enough for a first PR): - the control loop to handle nodes state transition - the routing logic # How to test The code is not used so it is not possible to run it manually. There are tests though :) --- Dockerfile | 1 + package-lock.json | 15 ++ packages/fleet/lib/db/client.ts | 65 +++++ packages/fleet/lib/db/helpers.test.ts | 9 + .../20241102115903_initial_fleet_models.ts | 58 +++++ packages/fleet/lib/fleet.integration.test.ts | 29 +++ packages/fleet/lib/fleet.ts | 19 ++ packages/fleet/lib/index.ts | 2 + .../models/deployments.integration.test.ts | 43 ++++ packages/fleet/lib/models/deployments.ts | 82 ++++++ packages/fleet/lib/models/helpers.test.ts | 17 ++ .../lib/models/nodes.integration.test.ts | 161 ++++++++++++ packages/fleet/lib/models/nodes.ts | 234 ++++++++++++++++++ packages/fleet/lib/tracer.ts | 11 + packages/fleet/lib/types.ts | 28 +++ packages/fleet/lib/utils/errors.ts | 23 ++ packages/fleet/lib/utils/logger.ts | 3 + packages/fleet/package.json | 27 ++ packages/fleet/tsconfig.json | 9 + packages/types/lib/utils.ts | 2 + tsconfig.build.json | 3 + 21 files changed, 841 insertions(+) create mode 100644 packages/fleet/lib/db/client.ts create mode 100644 packages/fleet/lib/db/helpers.test.ts create mode 100644 packages/fleet/lib/db/migrations/20241102115903_initial_fleet_models.ts create mode 100644 packages/fleet/lib/fleet.integration.test.ts create mode 100644 packages/fleet/lib/fleet.ts create mode 100644 packages/fleet/lib/index.ts create mode 100644 
packages/fleet/lib/models/deployments.integration.test.ts create mode 100644 packages/fleet/lib/models/deployments.ts create mode 100644 packages/fleet/lib/models/helpers.test.ts create mode 100644 packages/fleet/lib/models/nodes.integration.test.ts create mode 100644 packages/fleet/lib/models/nodes.ts create mode 100644 packages/fleet/lib/tracer.ts create mode 100644 packages/fleet/lib/types.ts create mode 100644 packages/fleet/lib/utils/errors.ts create mode 100644 packages/fleet/lib/utils/logger.ts create mode 100644 packages/fleet/package.json create mode 100644 packages/fleet/tsconfig.json diff --git a/Dockerfile b/Dockerfile index 4c9ba4edc1..5ecf456f9b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,6 +36,7 @@ COPY packages/connect-ui/package.json ./packages/connect-ui/package.json COPY packages/utils/package.json ./packages/utils/package.json COPY packages/webapp/package.json ./packages/webapp/package.json COPY packages/webhooks/package.json ./packages/webhooks/package.json +COPY packages/fleet/package.json ./packages/fleet/package.json COPY package*.json ./ # Install every dependencies diff --git a/package-lock.json b/package-lock.json index 1731fa9978..36b79bed02 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5789,6 +5789,10 @@ "resolved": "packages/database", "link": true }, + "node_modules/@nangohq/fleet": { + "resolved": "packages/fleet", + "link": true + }, "node_modules/@nangohq/frontend": { "resolved": "packages/frontend", "link": true @@ -35216,6 +35220,17 @@ "vitest": "1.6.0" } }, + "packages/fleet": { + "version": "1.0.0", + "dependencies": { + "@nangohq/utils": "file:../utils", + "dd-trace": "5.21.0", + "knex": "3.1.0" + }, + "devDependencies": { + "vitest": "1.6.0" + } + }, "packages/frontend": { "name": "@nangohq/frontend", "version": "0.46.0", diff --git a/packages/fleet/lib/db/client.ts b/packages/fleet/lib/db/client.ts new file mode 100644 index 0000000000..550a1b89e5 --- /dev/null +++ b/packages/fleet/lib/db/client.ts @@ -0,0 
+1,65 @@ +import path from 'node:path'; +import knex from 'knex'; +import { fileURLToPath } from 'node:url'; +import { logger } from '../utils/logger.js'; +import { isTest } from '@nangohq/utils'; + +const runningMigrationOnly = process.argv.some((v) => v === 'migrate:latest'); +const isJS = !runningMigrationOnly; + +export class DatabaseClient { + public db: knex.Knex; + public schema: string; + public url: string; + private config: knex.Knex.Config; + + constructor({ url, schema, poolMax = 50 }: { url: string; schema: string; poolMax?: number }) { + this.url = url; + this.schema = schema; + this.config = { + client: 'postgres', + connection: { + connectionString: url, + statement_timeout: 60000 + }, + searchPath: schema, + pool: { min: 2, max: poolMax }, + migrations: { + extension: isJS ? 'js' : 'ts', + directory: 'migrations', + tableName: 'migrations', + loadExtensions: [isJS ? '.js' : '.ts'], + schemaName: schema + } + }; + this.db = knex(this.config); + } + + async migrate(): Promise { + logger.info(`[fleet - ${this.schema}] migration`); + + const filename = fileURLToPath(import.meta.url); + const dirname = path.dirname(path.join(filename, '../../')); + const dir = path.join(dirname, 'dist/db/migrations'); + await this.db.raw(`CREATE SCHEMA IF NOT EXISTS ${this.schema}`); + + const [, pendingMigrations] = (await this.db.migrate.list({ ...this.config.migrations, directory: dir })) as [unknown, string[]]; + + if (pendingMigrations.length === 0) { + logger.info(`[fleet - ${this.schema}] nothing to do`); + return; + } + + await this.db.migrate.latest({ ...this.config.migrations, directory: dir }); + logger.info(`[fleet - ${this.schema}] migrations completed.`); + } + + /*********************************/ + /* WARNING: to use only in tests */ + /*********************************/ + async clearDatabase(): Promise { + if (isTest) { + await this.db.raw(`DROP SCHEMA IF EXISTS ${this.schema} CASCADE`); + } + } +} diff --git a/packages/fleet/lib/db/helpers.test.ts 
b/packages/fleet/lib/db/helpers.test.ts new file mode 100644 index 0000000000..1918c61d2c --- /dev/null +++ b/packages/fleet/lib/db/helpers.test.ts @@ -0,0 +1,9 @@ +import { DatabaseClient } from './client.js'; + +export const testDbUrl = `postgres://${process.env['NANGO_DB_USER']}:${process.env['NANGO_DB_PASSWORD']}@${process.env['NANGO_DB_HOST']}:${process.env['NANGO_DB_PORT']}`; + +export const getTestDbClient = (schema: string) => + new DatabaseClient({ + url: testDbUrl, + schema + }); diff --git a/packages/fleet/lib/db/migrations/20241102115903_initial_fleet_models.ts b/packages/fleet/lib/db/migrations/20241102115903_initial_fleet_models.ts new file mode 100644 index 0000000000..cba40b9d31 --- /dev/null +++ b/packages/fleet/lib/db/migrations/20241102115903_initial_fleet_models.ts @@ -0,0 +1,58 @@ +import type { Knex } from 'knex'; +import { NODES_TABLE } from '../../models/nodes.js'; +import { DEPLOYMENTS_TABLE } from '../../models/deployments.js'; + +export async function up(knex: Knex): Promise { + await knex.transaction(async (trx) => { + await trx.raw(` + CREATE TABLE ${DEPLOYMENTS_TABLE} ( + id SERIAL PRIMARY KEY, + commit_id char(40) NOT NULL, + created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + superseded_at timestamp with time zone + ); + `); + await trx.raw(` + CREATE INDEX idx_${DEPLOYMENTS_TABLE}_active ON ${DEPLOYMENTS_TABLE}(superseded_at) WHERE superseded_at IS NULL; + `); + await trx.raw(` + CREATE TYPE node_states AS ENUM ( + 'PENDING', + 'STARTING', + 'RUNNING', + 'OUTDATED', + 'FINISHING', + 'IDLE', + 'TERMINATED', + 'ERROR' + ); + `); + await trx.raw(` + CREATE TABLE ${NODES_TABLE} ( + id SERIAL PRIMARY KEY, + routing_id varchar(255) NOT NULL, + deployment_id int NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, + url varchar(1024), + state node_states NOT NULL, + image varchar(255) NOT NULL, + cpu_milli int NOT NULL, + memory_mb int NOT NULL, + storage_mb int NOT NULL, + error text, + created_at timestamp with 
time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_state_transition_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + `); + await trx.raw(` + CREATE INDEX idx_${NODES_TABLE}_routingId_state + ON ${NODES_TABLE}(routing_id, state) + WHERE state IN ('PENDING', 'STARTING', 'RUNNING', 'OUTDATED'); + `); + }); +} + +export async function down(knex: Knex): Promise { + await knex.raw(`DROP TABLE IF EXISTS ${DEPLOYMENTS_TABLE}`); + await knex.raw(`DROP TABLE IF EXISTS ${NODES_TABLE}`); + await knex.raw(`DROP TYPE IF EXISTS node_states`); +} diff --git a/packages/fleet/lib/fleet.integration.test.ts b/packages/fleet/lib/fleet.integration.test.ts new file mode 100644 index 0000000000..fcdac4ef8e --- /dev/null +++ b/packages/fleet/lib/fleet.integration.test.ts @@ -0,0 +1,29 @@ +import { expect, describe, it, beforeAll, afterAll, afterEach } from 'vitest'; +import { Fleet } from './fleet.js'; +import { getTestDbClient, testDbUrl } from './db/helpers.test.js'; +import { generateCommitHash } from './models/helpers.test.js'; + +describe('fleet', () => { + const fleetId = 'my_fleet'; + const fleet = new Fleet({ fleetId, dbUrl: testDbUrl }); + + beforeAll(async () => { + await fleet.migrate(); + }); + + afterEach(() => {}); + + afterAll(async () => { + await getTestDbClient(fleetId).clearDatabase(); + }); + + describe('deploy', () => { + it('should create a new deployment', async () => { + const commitId = generateCommitHash(); + const deployment = (await fleet.deploy(commitId)).unwrap(); + expect(deployment.commitId).toBe(commitId); + expect(deployment.createdAt).toBeInstanceOf(Date); + expect(deployment.supersededAt).toBe(null); + }); + }); +}); diff --git a/packages/fleet/lib/fleet.ts b/packages/fleet/lib/fleet.ts new file mode 100644 index 0000000000..9f377dc757 --- /dev/null +++ b/packages/fleet/lib/fleet.ts @@ -0,0 +1,19 @@ +import type { Result } from '@nangohq/utils'; +import { DatabaseClient } from './db/client.js'; +import * as deployments from 
'./models/deployments.js'; +import type { CommitHash, Deployment } from './types.js'; + +export class Fleet { + private dbClient: DatabaseClient; + constructor({ fleetId, dbUrl }: { fleetId: string; dbUrl: string }) { + this.dbClient = new DatabaseClient({ url: dbUrl, schema: fleetId }); + } + + public async migrate(): Promise { + await this.dbClient.migrate(); + } + + public async deploy(commitId: CommitHash): Promise> { + return deployments.create(this.dbClient.db, commitId); + } +} diff --git a/packages/fleet/lib/index.ts b/packages/fleet/lib/index.ts new file mode 100644 index 0000000000..1680bf7845 --- /dev/null +++ b/packages/fleet/lib/index.ts @@ -0,0 +1,2 @@ +export * from './fleet.js'; +export * from './types.js'; diff --git a/packages/fleet/lib/models/deployments.integration.test.ts b/packages/fleet/lib/models/deployments.integration.test.ts new file mode 100644 index 0000000000..e5228c5483 --- /dev/null +++ b/packages/fleet/lib/models/deployments.integration.test.ts @@ -0,0 +1,43 @@ +import { expect, describe, it, beforeEach, afterEach } from 'vitest'; +import * as deployments from './deployments.js'; +import { getTestDbClient } from '../db/helpers.test.js'; +import { generateCommitHash } from './helpers.test.js'; + +describe('Deployments', () => { + const dbClient = getTestDbClient('deployments'); + const db = dbClient.db; + beforeEach(async () => { + await dbClient.migrate(); + }); + + afterEach(async () => { + await dbClient.clearDatabase(); + }); + + describe('create', () => { + it('should create a deployment', async () => { + const commitId = generateCommitHash(); + const deployment = (await deployments.create(db, commitId)).unwrap(); + expect(deployment.commitId).toBe(commitId); + expect(deployment.createdAt).toBeInstanceOf(Date); + expect(deployment.supersededAt).toBe(null); + }); + it('should supersede any active deployments', async () => { + const commitId1 = generateCommitHash(); + const commitId2 = generateCommitHash(); + + const deployment1 = 
(await deployments.create(db, commitId1)).unwrap(); + const deployment2 = (await deployments.create(db, commitId2)).unwrap(); + + expect((await deployments.get(db, deployment1.id)).unwrap().supersededAt).not.toBe(null); + expect((await deployments.get(db, deployment2.id)).unwrap().supersededAt).toBe(null); + }); + }); + + describe('getActive', () => { + it('should return undefined if no deployments yet', async () => { + const active = (await deployments.getActive(db)).unwrap(); + expect(active).toBe(undefined); + }); + }); +}); diff --git a/packages/fleet/lib/models/deployments.ts b/packages/fleet/lib/models/deployments.ts new file mode 100644 index 0000000000..872b2bab17 --- /dev/null +++ b/packages/fleet/lib/models/deployments.ts @@ -0,0 +1,82 @@ +import type knex from 'knex'; +import type { Result } from '@nangohq/utils'; +import { Err, Ok } from '@nangohq/utils'; +import type { CommitHash, Deployment } from '../types.js'; +import { FleetError } from '../utils/errors.js'; + +export const DEPLOYMENTS_TABLE = 'deployments'; + +export interface DBDeployment { + readonly id: number; + readonly commit_id: CommitHash; + readonly created_at: Date; + readonly superseded_at: Date | null; +} + +const DBDeployment = { + to(dbDeployment: DBDeployment): Deployment { + return { + id: dbDeployment.id, + commitId: dbDeployment.commit_id, + createdAt: dbDeployment.created_at, + supersededAt: dbDeployment.superseded_at + }; + }, + from(deployment: Deployment): DBDeployment { + return { + id: deployment.id, + commit_id: deployment.commitId, + created_at: deployment.createdAt, + superseded_at: deployment.supersededAt + }; + } +}; + +export async function create(db: knex.Knex, commitId: CommitHash): Promise> { + try { + return db.transaction(async (trx) => { + const now = new Date(); + // supersede any active deployments + await trx + .from(DEPLOYMENTS_TABLE) + .where({ + superseded_at: null + }) + .update({ superseded_at: now }); + // insert new deployment + const dbDeployment: 
Omit = { + commit_id: commitId, + created_at: now, + superseded_at: null + }; + const [inserted] = await trx.into(DEPLOYMENTS_TABLE).insert(dbDeployment).returning('*'); + if (!inserted) { + return Err(new Error(`Error: no deployment '${commitId}' created`)); + } + return Ok(DBDeployment.to(inserted)); + }); + } catch (err) { + return Err(new FleetError(`deployment_creation_error`, { cause: err, context: { commitId } })); + } +} + +export async function getActive(db: knex.Knex): Promise> { + try { + const active = await db.select('*').from(DEPLOYMENTS_TABLE).where({ superseded_at: null }).first(); + return Ok(active ? DBDeployment.to(active) : undefined); + } catch (err: unknown) { + return Err(new FleetError(`deployment_get_active_error`, { cause: err })); + } +} + +export async function get(db: knex.Knex, id: number): Promise> { + try { + const deployment = await db.select('*').from(DEPLOYMENTS_TABLE).where({ id }).first(); + if (!deployment) { + return Err(new FleetError(`deployment_not_found`, { context: { id } })); + } + return Ok(DBDeployment.to(deployment)); + } catch (err: unknown) { + return Err(new FleetError(`deployment_not_found`, { cause: err, context: { id } })); + } +} diff --git a/packages/fleet/lib/models/helpers.test.ts b/packages/fleet/lib/models/helpers.test.ts new file mode 100644 index 0000000000..38705aa4c2 --- /dev/null +++ b/packages/fleet/lib/models/helpers.test.ts @@ -0,0 +1,17 @@ +import type { CommitHash } from '../types'; +import crypto from 'crypto'; + +export function generateCommitHash(): CommitHash { + const charset = '0123456789abcdef'; + const length = 40; + const randomBytes = new Uint8Array(length); + crypto.getRandomValues(randomBytes); + + const value = Array.from(randomBytes) + .map((byte) => charset[byte % charset.length]) + .join(''); + if (value.length !== 40) { + throw new Error('CommitHash must be exactly 40 characters'); + } + return value as CommitHash; +} diff --git 
a/packages/fleet/lib/models/nodes.integration.test.ts b/packages/fleet/lib/models/nodes.integration.test.ts new file mode 100644 index 0000000000..5e8afebe2c --- /dev/null +++ b/packages/fleet/lib/models/nodes.integration.test.ts @@ -0,0 +1,161 @@ +import { expect, describe, it, beforeEach, afterEach } from 'vitest'; +import * as nodes from './nodes.js'; +import * as deployments from './deployments.js'; +import { nodeStates } from '../types.js'; +import type { NodeState, Node, RoutingId, Deployment } from '../types.js'; +import { getTestDbClient } from '../db/helpers.test.js'; +import type { knex } from 'knex'; +import { nanoid } from '@nangohq/utils'; +import { generateCommitHash } from './helpers.test.js'; + +describe('Nodes', () => { + const dbClient = getTestDbClient('nodes'); + const db = dbClient.db; + let previousDeployment: Deployment; + let activeDeployment: Deployment; + beforeEach(async () => { + await dbClient.migrate(); + previousDeployment = (await deployments.create(db, generateCommitHash())).unwrap(); + activeDeployment = (await deployments.create(db, generateCommitHash())).unwrap(); + }); + + afterEach(async () => { + await dbClient.clearDatabase(); + }); + + it('should be successfully created', async () => { + const node = ( + await nodes.create(db, { + routingId: 'my-routing-id', + deploymentId: activeDeployment.id, + url: 'http://localhost:3000', + image: 'nangohq/my-image:latest', + cpuMilli: 500, + memoryMb: 1024, + storageMb: 512 + }) + ).unwrap(); + expect(node).toStrictEqual({ + id: expect.any(Number), + routingId: 'my-routing-id', + deploymentId: activeDeployment.id, + url: 'http://localhost:3000', + state: 'PENDING', + image: 'nangohq/my-image:latest', + cpuMilli: 500, + memoryMb: 1024, + storageMb: 512, + error: null, + createdAt: expect.any(Date), + lastStateTransitionAt: expect.any(Date) + }); + }); + it('should transition between valid states and error when transitioning between invalid states', async () => { + for (const from of 
nodeStates) { + for (const to of nodeStates) { + const t = await createNodeWithState(db, { state: from, deploymentId: activeDeployment.id }); + if (nodes.validNodeStateTransitions.find((v) => v.from === from && v.to === to)) { + // sleep to ensure lastStateTransitionAt is different from the previous state + await new Promise((resolve) => void setTimeout(resolve, 2)); + const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to }); + expect(updated.unwrap().state).toBe(to); + expect(updated.unwrap().lastStateTransitionAt.getTime()).toBeGreaterThan(t.lastStateTransitionAt.getTime()); + } else { + const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to }); + expect(updated.isErr(), `transition from ${from} to ${to} failed`).toBe(true); + } + } + } + }); + it('should be searchable', async () => { + const route1PendingNode = await createNodeWithState(db, { state: 'PENDING', routingId: '1', deploymentId: activeDeployment.id }); + const route1RunningNode = await createNodeWithState(db, { + state: 'RUNNING', + routingId: route1PendingNode.routingId, + deploymentId: previousDeployment.id + }); + const startingNode = await createNodeWithState(db, { state: 'STARTING', deploymentId: activeDeployment.id }); + const runningNode = await createNodeWithState(db, { state: 'RUNNING', deploymentId: activeDeployment.id }); + const outdatedNode = await createNodeWithState(db, { state: 'OUTDATED', deploymentId: activeDeployment.id }); + const finishingNode = await createNodeWithState(db, { state: 'FINISHING', deploymentId: activeDeployment.id }); + const idleNode = await createNodeWithState(db, { state: 'IDLE', deploymentId: activeDeployment.id }); + const terminatedNode = await createNodeWithState(db, { state: 'TERMINATED', deploymentId: activeDeployment.id }); + const errorNode = await createNodeWithState(db, { state: 'ERROR', deploymentId: activeDeployment.id }); + + const searchAllStates = await nodes.search(db, { + states: ['PENDING', 'STARTING', 
'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR'] + }); + expect(searchAllStates.unwrap().nodes).toEqual( + new Map([ + [route1PendingNode.routingId, [route1PendingNode, route1RunningNode]], + [startingNode.routingId, [startingNode]], + [runningNode.routingId, [runningNode]], + [outdatedNode.routingId, [outdatedNode]], + [finishingNode.routingId, [finishingNode]], + [idleNode.routingId, [idleNode]], + [terminatedNode.routingId, [terminatedNode]], + [errorNode.routingId, [errorNode]] + ]) + ); + + const searchRunning = await nodes.search(db, { states: ['RUNNING'] }); + expect(searchRunning.unwrap().nodes).toEqual( + new Map([ + [route1RunningNode.routingId, [route1RunningNode]], + [runningNode.routingId, [runningNode]] + ]) + ); + + const searchWithWrongRoute = await nodes.search(db, { states: ['PENDING'], routingId: terminatedNode.routingId }); + expect(searchWithWrongRoute.unwrap().nodes).toEqual(new Map()); + }); + it('should be searchable (with pagination support)', async () => { + for (let i = 0; i < 12; i++) { + await createNodeWithState(db, { state: 'PENDING', routingId: i.toString(), deploymentId: activeDeployment.id }); + } + const searchFirstPage = (await nodes.search(db, { states: ['PENDING'], limit: 5 })).unwrap(); + expect(searchFirstPage.nodes.size).toBe(5); + expect(searchFirstPage.nextCursor).toBe(6); + + const searchSecondPage = (await nodes.search(db, { states: ['PENDING'], limit: 5, cursor: searchFirstPage.nextCursor! })).unwrap(); + expect(searchSecondPage.nodes.size).toBe(5); + expect(searchSecondPage.nextCursor).toBe(11); + + const searchThirdPage = (await nodes.search(db, { states: ['PENDING'], limit: 5, cursor: searchSecondPage.nextCursor! 
})).unwrap(); + expect(searchThirdPage.nodes.size).toBe(2); + expect(searchThirdPage.nextCursor).toBe(undefined); + }); +}); + +async function createNodeWithState( + db: knex.Knex, + { state, deploymentId, routingId = nanoid() }: { state: NodeState; deploymentId: number; routingId?: RoutingId } +): Promise { + let node = await createNode(db, { routingId, deploymentId }); + if (state == 'ERROR') { + return (await nodes.fail(db, { nodeId: node.id, error: 'my error' })).unwrap(); + } + // transition to the desired state + while (node.state !== state) { + const nextState = nodes.validNodeStateTransitions.find((v) => v.from === node.state)?.to; + if (nextState) { + node = (await nodes.transitionTo(db, { nodeId: node.id, newState: nextState })).unwrap(); + } else { + throw new Error(`Cannot transition node to state '${state}'`); + } + } + return node; +} + +async function createNode(db: knex.Knex, { routingId, deploymentId }: { routingId: RoutingId; deploymentId: number }): Promise { + const node = await nodes.create(db, { + routingId, + deploymentId, + url: 'http://localhost:1234', + image: 'nangohq/my-image:latest', + cpuMilli: 500, + memoryMb: 1024, + storageMb: 512 + }); + return node.unwrap(); +} diff --git a/packages/fleet/lib/models/nodes.ts b/packages/fleet/lib/models/nodes.ts new file mode 100644 index 0000000000..9495fa0e9f --- /dev/null +++ b/packages/fleet/lib/models/nodes.ts @@ -0,0 +1,234 @@ +import type knex from 'knex'; +import type { Result } from '@nangohq/utils'; +import { Ok, Err } from '@nangohq/utils'; +import type { NodeState, Node, RoutingId } from '../types.js'; +import { FleetError } from '../utils/errors.js'; + +export const NODES_TABLE = 'nodes'; + +interface NodeStateTransition { + from: NodeState; + to: NodeState; +} + +export const validNodeStateTransitions = [ + { from: 'PENDING', to: 'STARTING' }, + { from: 'STARTING', to: 'RUNNING' }, + { from: 'RUNNING', to: 'OUTDATED' }, + { from: 'OUTDATED', to: 'FINISHING' }, + { from: 'FINISHING', 
to: 'IDLE' }, + { from: 'IDLE', to: 'TERMINATED' } +] as const; +export type ValidNodeStateTransitions = (typeof validNodeStateTransitions)[number]; +const NodeStateTransition = { + validate({ from, to }: { from: NodeState; to: Omit }): Result { + const transition = validNodeStateTransitions.find((t) => t.from === from && t.to === to); + if (transition) { + return Ok(transition); + } else { + return Err(new FleetError(`node_invalid_state_transition`, { context: { from: from.toString(), to: to.toString() } })); + } + } +}; + +export interface DBNode { + readonly id: number; + readonly routing_id: RoutingId; + readonly deployment_id: number; + readonly url: string | null; + readonly state: NodeState; + readonly image: string; + readonly cpu_milli: number; + readonly memory_mb: number; + readonly storage_mb: number; + readonly error: string | null; + readonly created_at: Date; + readonly last_state_transition_at: Date; +} + +export const DBNode = { + to: (node: Node): DBNode => { + return { + id: node.id, + routing_id: node.routingId, + deployment_id: node.deploymentId, + url: node.url, + state: node.state, + image: node.image, + cpu_milli: node.cpuMilli, + memory_mb: node.memoryMb, + storage_mb: node.storageMb, + error: node.error, + created_at: node.createdAt, + last_state_transition_at: node.lastStateTransitionAt + }; + }, + from: (dbNode: DBNode): Node => { + return { + id: dbNode.id, + routingId: dbNode.routing_id, + deploymentId: dbNode.deployment_id, + url: dbNode.url, + state: dbNode.state, + image: dbNode.image, + cpuMilli: dbNode.cpu_milli, + memoryMb: dbNode.memory_mb, + storageMb: dbNode.storage_mb, + error: dbNode.error, + createdAt: dbNode.created_at, + lastStateTransitionAt: dbNode.last_state_transition_at + }; + } +}; + +export async function create(db: knex.Knex, nodeProps: Omit): Promise> { + const now = new Date(); + const newNode: Omit = { + routing_id: nodeProps.routingId, + deployment_id: nodeProps.deploymentId, + url: nodeProps.url, + state: 
'PENDING', + image: nodeProps.image, + cpu_milli: nodeProps.cpuMilli, + memory_mb: nodeProps.memoryMb, + storage_mb: nodeProps.storageMb, + error: null, + created_at: now, + last_state_transition_at: now + }; + try { + const inserted = await db.from(NODES_TABLE).insert(newNode).returning('*'); + if (!inserted?.[0]) { + return Err(new FleetError(`node_creation_error`, { context: nodeProps })); + } + return Ok(DBNode.from(inserted[0])); + } catch (err) { + return Err(new FleetError(`node_creation_error`, { cause: err, context: nodeProps })); + } +} +export async function get(db: knex.Knex, nodeId: number, options: { forUpdate: boolean } = { forUpdate: false }): Promise> { + try { + const query = db.select('*').from(NODES_TABLE).where({ id: nodeId }).first(); + if (options.forUpdate) { + query.forUpdate(); + } + const node = await query; + if (!node) { + return Err(new FleetError(`node_not_found`, { context: { nodeId } })); + } + return Ok(DBNode.from(node)); + } catch (err) { + return Err(new FleetError(`node_not_found`, { cause: err, context: { nodeId } })); + } +} + +export async function search( + db: knex.Knex, + params: { + states: [NodeState, ...NodeState[]]; // non-empty array + routingId?: RoutingId; + cursor?: number; + limit?: number; + } +): Promise< + Result<{ + nodes: Map; + nextCursor?: number; + }> +> { + try { + const limit = params.limit || 1000; + const query = db + .select('*') + .from(NODES_TABLE) + .whereIn('state', params.states) + .orderBy('id') + .limit(limit + 1); // fetch one more than limit to determine if there are more results + + if (params.routingId) { + query.where({ routing_id: params.routingId }); + } + if (params.cursor) { + query.where('id', '>=', params.cursor); + } + + const nodes = await query; + + const nextCursor = nodes.length > limit ? 
nodes.pop()?.id : undefined; + + const nodesMap = new Map(); + + for (const node of nodes) { + const routingId = node.routing_id; + const existingNodes = nodesMap.get(routingId) || []; + existingNodes.push(DBNode.from(node)); + nodesMap.set(routingId, existingNodes); + } + + return Ok({ + nodes: nodesMap, + ...(nextCursor ? { nextCursor } : {}) + }); + } catch (err) { + return Err(new FleetError(`node_search_error`, { cause: err, context: params })); + } +} + +export async function transitionTo( + db: knex.Knex, + props: { + nodeId: number; + newState: Omit; + } +): Promise> { + try { + return db.transaction(async (trx) => { + const getNode = await get(trx, props.nodeId, { forUpdate: true }); + if (getNode.isErr()) { + return getNode; + } + + const transition = NodeStateTransition.validate({ from: getNode.value.state, to: props.newState }); + if (transition.isErr()) { + return Err(transition.error); + } + + const updated = await trx + .from(NODES_TABLE) + .where('id', props.nodeId) + .update({ + state: transition.value.to, + last_state_transition_at: new Date() + }) + .returning('*'); + if (!updated?.[0]) { + return Err(new FleetError(`node_transition_error`, { context: { nodeId: props.nodeId, newState: props.newState.toString() } })); + } + return Ok(DBNode.from(updated[0])); + }); + } catch (err) { + return Err(new FleetError(`node_transition_error`, { cause: err, context: { nodeId: props.nodeId, newState: props.newState.toString() } })); + } +} + +export async function fail( + db: knex.Knex, + props: { + nodeId: number; + error: string; + } +): Promise> { + try { + const updated = await db + .from(NODES_TABLE) + .where({ id: props.nodeId }) + .update({ state: 'ERROR', error: props.error, last_state_transition_at: new Date() }) + .returning('*'); + if (!updated?.[0]) { + return Err(new FleetError(`node_fail_error`, { context: props })); + } + return Ok(DBNode.from(updated[0])); + } catch (err) { + return Err(new FleetError(`node_fail_error`, { cause: err, 
context: props })); + } +} diff --git a/packages/fleet/lib/tracer.ts b/packages/fleet/lib/tracer.ts new file mode 100644 index 0000000000..6f0a5a9271 --- /dev/null +++ b/packages/fleet/lib/tracer.ts @@ -0,0 +1,11 @@ +import tracer from 'dd-trace'; + +tracer.init({ + service: 'nango-fleet' +}); +tracer.use('pg', { + service: (params: { database: string }) => `postgres-${params.database}` +}); +tracer.use('dns', { + enabled: false +}); diff --git a/packages/fleet/lib/types.ts b/packages/fleet/lib/types.ts new file mode 100644 index 0000000000..cf8b4aabbd --- /dev/null +++ b/packages/fleet/lib/types.ts @@ -0,0 +1,28 @@ +export type CommitHash = string & { readonly length: 40 }; + +export interface Deployment { + readonly id: number; + readonly commitId: CommitHash; + readonly createdAt: Date; + readonly supersededAt: Date | null; +} + +export type RoutingId = string; + +export const nodeStates = ['PENDING', 'STARTING', 'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR'] as const; +export type NodeState = (typeof nodeStates)[number]; + +export interface Node { + readonly id: number; + readonly routingId: RoutingId; + readonly deploymentId: number; + readonly url: string | null; + readonly state: NodeState; + readonly image: string; + readonly cpuMilli: number; + readonly memoryMb: number; + readonly storageMb: number; + readonly error: string | null; + readonly createdAt: Date; + readonly lastStateTransitionAt: Date; +} diff --git a/packages/fleet/lib/utils/errors.ts b/packages/fleet/lib/utils/errors.ts new file mode 100644 index 0000000000..b6aea14f72 --- /dev/null +++ b/packages/fleet/lib/utils/errors.ts @@ -0,0 +1,23 @@ +import type { Jsonable } from '@nangohq/types'; + +type FleetErrorCode = + | 'deployment_creation_error' + | 'deployment_get_active_error' + | 'deployment_not_found' + | 'node_invalid_state_transition' + | 'node_not_found' + | 'node_creation_error' + | 'node_search_error' + | 'node_transition_error' + | 'node_fail_error'; + +export 
class FleetError extends Error { + public readonly context?: Jsonable; + + constructor(code: FleetErrorCode, options: { cause?: unknown; context?: Jsonable } = {}) { + const { cause, context } = options; + super(code, { cause }); + this.name = this.constructor.name; + this.context = context; + } +} diff --git a/packages/fleet/lib/utils/logger.ts b/packages/fleet/lib/utils/logger.ts new file mode 100644 index 0000000000..86a0f133eb --- /dev/null +++ b/packages/fleet/lib/utils/logger.ts @@ -0,0 +1,3 @@ +import { getLogger } from '@nangohq/utils'; + +export const logger = getLogger('fleet'); diff --git a/packages/fleet/package.json b/packages/fleet/package.json new file mode 100644 index 0000000000..5c2e074dbe --- /dev/null +++ b/packages/fleet/package.json @@ -0,0 +1,27 @@ +{ + "name": "@nangohq/fleet", + "version": "1.0.0", + "type": "module", + "main": "dist/index.js", + "private": true, + "scripts": { + "dev:migration:create": "npm run knex -- migrate:make", + "dev:migration:run": "npm run knex -- migrate:latest", + "knex": "tsx ../../node_modules/knex/bin/cli.js --knexfile lib/db/knexfile.ts", + "prod:migration:run": "knex --knexfile dist/db/knexfile.js migrate:latest" + }, + "keywords": [], + "repository": { + "type": "git", + "url": "git+https://github.com/NangoHQ/nango.git", + "directory": "packages/fleet" + }, + "dependencies": { + "@nangohq/utils": "file:../utils", + "dd-trace": "5.21.0", + "knex": "3.1.0" + }, + "devDependencies": { + "vitest": "1.6.0" + } +} diff --git a/packages/fleet/tsconfig.json b/packages/fleet/tsconfig.json new file mode 100644 index 0000000000..7d2bb9e2a7 --- /dev/null +++ b/packages/fleet/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "lib", + "outDir": "dist" + }, + "references": [{ "path": "../utils" }], + "include": ["lib/**/*", "../utils/lib/vitest.d.ts"] +} diff --git a/packages/types/lib/utils.ts b/packages/types/lib/utils.ts index f5ecc9279c..9a3c2840a7 100644 --- 
a/packages/types/lib/utils.ts +++ b/packages/types/lib/utils.ts @@ -17,3 +17,5 @@ export type NullablePartial > & Pick>; + +export type Jsonable = string | number | boolean | null | undefined | readonly Jsonable[] | { readonly [key: string]: Jsonable } | { toJSON(): Jsonable }; diff --git a/tsconfig.build.json b/tsconfig.build.json index 127f36c6e4..349773ae41 100644 --- a/tsconfig.build.json +++ b/tsconfig.build.json @@ -63,6 +63,9 @@ }, { "path": "packages/keystore" + }, + { + "path": "packages/fleet" } ] }