Skip to content

Commit

Permalink
feat: introducing the fleet package (#3105)
Browse files Browse the repository at this point in the history
The fleet package will be used to managed the lifecycle of runners 

This commit includes:
- migration for deployments and nodes tables
- deployments and nodes models (with tests) 

This commit DOESN'T include (I though that would be enough for a first
PR):
- the control loop to handle nodes state transition
- the routing logic

<!-- Testing instructions (skip if just adding/editing providers) -->
# How to test
The code is not used so it is not possible to run it manually. There is
tests though :)
  • Loading branch information
TBonnin authored Dec 5, 2024
1 parent 2379484 commit 17f68e6
Show file tree
Hide file tree
Showing 21 changed files with 841 additions and 0 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ COPY packages/connect-ui/package.json ./packages/connect-ui/package.json
COPY packages/utils/package.json ./packages/utils/package.json
COPY packages/webapp/package.json ./packages/webapp/package.json
COPY packages/webhooks/package.json ./packages/webhooks/package.json
COPY packages/fleet/package.json ./packages/fleet/package.json
COPY package*.json ./

# Install every dependencies
Expand Down
15 changes: 15 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions packages/fleet/lib/db/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import path from 'node:path';
import knex from 'knex';
import { fileURLToPath } from 'node:url';
import { logger } from '../utils/logger.js';
import { isTest } from '@nangohq/utils';

const runningMigrationOnly = process.argv.some((v) => v === 'migrate:latest');
const isJS = !runningMigrationOnly;

export class DatabaseClient {
public db: knex.Knex;
public schema: string;
public url: string;
private config: knex.Knex.Config;

constructor({ url, schema, poolMax = 50 }: { url: string; schema: string; poolMax?: number }) {
this.url = url;
this.schema = schema;
this.config = {
client: 'postgres',
connection: {
connectionString: url,
statement_timeout: 60000
},
searchPath: schema,
pool: { min: 2, max: poolMax },
migrations: {
extension: isJS ? 'js' : 'ts',
directory: 'migrations',
tableName: 'migrations',
loadExtensions: [isJS ? '.js' : '.ts'],
schemaName: schema
}
};
this.db = knex(this.config);
}

async migrate(): Promise<void> {
logger.info(`[fleet - ${this.schema}] migration`);

const filename = fileURLToPath(import.meta.url);
const dirname = path.dirname(path.join(filename, '../../'));
const dir = path.join(dirname, 'dist/db/migrations');
await this.db.raw(`CREATE SCHEMA IF NOT EXISTS ${this.schema}`);

const [, pendingMigrations] = (await this.db.migrate.list({ ...this.config.migrations, directory: dir })) as [unknown, string[]];

if (pendingMigrations.length === 0) {
logger.info(`[fleet - ${this.schema}] nothing to do`);
return;
}

await this.db.migrate.latest({ ...this.config.migrations, directory: dir });
logger.info(`[fleet - ${this.schema}] migrations completed.`);
}

/*********************************/
/* WARNING: to use only in tests */
/*********************************/
async clearDatabase(): Promise<void> {
if (isTest) {
await this.db.raw(`DROP SCHEMA IF EXISTS ${this.schema} CASCADE`);
}
}
}
9 changes: 9 additions & 0 deletions packages/fleet/lib/db/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { DatabaseClient } from './client.js';

export const testDbUrl = `postgres://${process.env['NANGO_DB_USER']}:${process.env['NANGO_DB_PASSWORD']}@${process.env['NANGO_DB_HOST']}:${process.env['NANGO_DB_PORT']}`;

export const getTestDbClient = (schema: string) =>
new DatabaseClient({
url: testDbUrl,
schema
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import type { Knex } from 'knex';
import { NODES_TABLE } from '../../models/nodes.js';
import { DEPLOYMENTS_TABLE } from '../../models/deployments.js';

export async function up(knex: Knex): Promise<void> {
await knex.transaction(async (trx) => {
await trx.raw(`
CREATE TABLE ${DEPLOYMENTS_TABLE} (
id SERIAL PRIMARY KEY,
commit_id char(40) NOT NULL,
created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
superseded_at timestamp with time zone
);
`);
await trx.raw(`
CREATE INDEX idx_${DEPLOYMENTS_TABLE}_active ON ${DEPLOYMENTS_TABLE}(superseded_at) WHERE superseded_at IS NULL;
`);
await trx.raw(`
CREATE TYPE node_states AS ENUM (
'PENDING',
'STARTING',
'RUNNING',
'OUTDATED',
'FINISHING',
'IDLE',
'TERMINATED',
'ERROR'
);
`);
await trx.raw(`
CREATE TABLE ${NODES_TABLE} (
id SERIAL PRIMARY KEY,
routing_id varchar(255) NOT NULL,
deployment_id int NOT NULL REFERENCES deployments(id) ON DELETE CASCADE,
url varchar(1024),
state node_states NOT NULL,
image varchar(255) NOT NULL,
cpu_milli int NOT NULL,
memory_mb int NOT NULL,
storage_mb int NOT NULL,
error text,
created_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_state_transition_at timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP
);
`);
await trx.raw(`
CREATE INDEX idx_${NODES_TABLE}_routingId_state
ON ${NODES_TABLE}(routing_id, state)
WHERE state IN ('PENDING', 'STARTING', 'RUNNING', 'OUTDATED');
`);
});
}

export async function down(knex: Knex): Promise<void> {
await knex.raw(`DROP TABLE IF EXISTS ${DEPLOYMENTS_TABLE}`);
await knex.raw(`DROP TABLE IF EXISTS ${NODES_TABLE}`);
await knex.raw(`DROP TYPE IF EXISTS node_states`);
}
29 changes: 29 additions & 0 deletions packages/fleet/lib/fleet.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { expect, describe, it, beforeAll, afterAll, afterEach } from 'vitest';
import { Fleet } from './fleet.js';
import { getTestDbClient, testDbUrl } from './db/helpers.test.js';
import { generateCommitHash } from './models/helpers.test.js';

describe('fleet', () => {
const fleetId = 'my_fleet';
const fleet = new Fleet({ fleetId, dbUrl: testDbUrl });

beforeAll(async () => {
await fleet.migrate();
});

afterEach(() => {});

afterAll(async () => {
await getTestDbClient(fleetId).clearDatabase();
});

describe('deploy', () => {
it('should create a new deployment', async () => {
const commitId = generateCommitHash();
const deployment = (await fleet.deploy(commitId)).unwrap();
expect(deployment.commitId).toBe(commitId);
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});
});
});
19 changes: 19 additions & 0 deletions packages/fleet/lib/fleet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Result } from '@nangohq/utils';
import { DatabaseClient } from './db/client.js';
import * as deployments from './models/deployments.js';
import type { CommitHash, Deployment } from './types.js';

export class Fleet {
private dbClient: DatabaseClient;
constructor({ fleetId, dbUrl }: { fleetId: string; dbUrl: string }) {
this.dbClient = new DatabaseClient({ url: dbUrl, schema: fleetId });
}

public async migrate(): Promise<void> {
await this.dbClient.migrate();
}

public async deploy(commitId: CommitHash): Promise<Result<Deployment>> {
return deployments.create(this.dbClient.db, commitId);
}
}
2 changes: 2 additions & 0 deletions packages/fleet/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './fleet.js';
export * from './types.js';
43 changes: 43 additions & 0 deletions packages/fleet/lib/models/deployments.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { expect, describe, it, beforeEach, afterEach } from 'vitest';
import * as deployments from './deployments.js';
import { getTestDbClient } from '../db/helpers.test.js';
import { generateCommitHash } from './helpers.test.js';

describe('Deployments', () => {
const dbClient = getTestDbClient('deployments');
const db = dbClient.db;
beforeEach(async () => {
await dbClient.migrate();
});

afterEach(async () => {
await dbClient.clearDatabase();
});

describe('create', () => {
it('should create a deployment', async () => {
const commitId = generateCommitHash();
const deployment = (await deployments.create(db, commitId)).unwrap();
expect(deployment.commitId).toBe(commitId);
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});
it('should supersede any active deployments', async () => {
const commitId1 = generateCommitHash();
const commitId2 = generateCommitHash();

const deployment1 = (await deployments.create(db, commitId1)).unwrap();
const deployment2 = (await deployments.create(db, commitId2)).unwrap();

expect((await deployments.get(db, deployment1.id)).unwrap().supersededAt).not.toBe(null);
expect((await deployments.get(db, deployment2.id)).unwrap().supersededAt).toBe(null);
});
});

describe('getActive', () => {
it('should return undefined if no deployments yet', async () => {
const active = (await deployments.getActive(db)).unwrap();
expect(active).toBe(undefined);
});
});
});
82 changes: 82 additions & 0 deletions packages/fleet/lib/models/deployments.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type knex from 'knex';
import type { Result } from '@nangohq/utils';
import { Err, Ok } from '@nangohq/utils';
import type { CommitHash, Deployment } from '../types.js';
import { FleetError } from '../utils/errors.js';

export const DEPLOYMENTS_TABLE = 'deployments';

export interface DBDeployment {
readonly id: number;
readonly commit_id: CommitHash;
readonly created_at: Date;
readonly superseded_at: Date | null;
}

const DBDeployment = {
to(dbDeployment: DBDeployment): Deployment {
return {
id: dbDeployment.id,
commitId: dbDeployment.commit_id,
createdAt: dbDeployment.created_at,
supersededAt: dbDeployment.superseded_at
};
},
from(deployment: Deployment): DBDeployment {
return {
id: deployment.id,
commit_id: deployment.commitId,
created_at: deployment.createdAt,
superseded_at: deployment.supersededAt
};
}
};

export async function create(db: knex.Knex, commitId: CommitHash): Promise<Result<Deployment>> {
try {
return db.transaction(async (trx) => {
const now = new Date();
// supersede any active deployments
await trx
.from<DBDeployment>(DEPLOYMENTS_TABLE)
.where({
superseded_at: null
})
.update({ superseded_at: now });
// insert new deployment
const dbDeployment: Omit<DBDeployment, 'id'> = {
commit_id: commitId,
created_at: now,
superseded_at: null
};
const [inserted] = await trx.into<DBDeployment>(DEPLOYMENTS_TABLE).insert(dbDeployment).returning('*');
if (!inserted) {
return Err(new Error(`Error: no deployment '${commitId}' created`));
}
return Ok(DBDeployment.to(inserted));
});
} catch (err) {
return Err(new FleetError(`deployment_creation_error`, { cause: err, context: { commitId } }));
}
}

export async function getActive(db: knex.Knex): Promise<Result<Deployment | undefined>> {
try {
const active = await db.select<DBDeployment>('*').from(DEPLOYMENTS_TABLE).where({ superseded_at: null }).first();
return Ok(active ? DBDeployment.to(active) : undefined);
} catch (err: unknown) {
return Err(new FleetError(`deployment_get_active_error`, { cause: err }));
}
}

export async function get(db: knex.Knex, id: number): Promise<Result<Deployment>> {
try {
const deployment = await db.select<DBDeployment>('*').from(DEPLOYMENTS_TABLE).where({ id }).first();
if (!deployment) {
return Err(new FleetError(`deployment_not_found`, { context: { id } }));
}
return Ok(DBDeployment.to(deployment));
} catch (err: unknown) {
return Err(new FleetError(`deployment_not_found`, { cause: err, context: { id } }));
}
}
17 changes: 17 additions & 0 deletions packages/fleet/lib/models/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type { CommitHash } from '../types';
import crypto from 'crypto';

export function generateCommitHash(): CommitHash {
const charset = '0123456789abcdef';
const length = 40;
const randomBytes = new Uint8Array(length);
crypto.getRandomValues(randomBytes);

const value = Array.from(randomBytes)
.map((byte) => charset[byte % charset.length])
.join('');
if (value.length !== 40) {
throw new Error('CommitHash must be exactly 40 characters');
}
return value as CommitHash;
}
Loading

0 comments on commit 17f68e6

Please sign in to comment.