Skip to content

Commit

Permalink
feat(fleet): add control loop and state management (#3116)
Browse files Browse the repository at this point in the history
- Implement Supervisor class to manage node states and transitions
- Add methods to start, fail, outdate, terminate, and remove nodes
- Introduce NodeProvider interface for node operations
- Update node state transitions and error handling
- Add integration tests for Supervisor functionality

What's not yet implemented:
- routing logic
- Render NodeProvider
- wiring fleet in jobs

There are still a few TODOs that I will address in following PRs

# How to test
Still not wired. You can run the tests ;)
  • Loading branch information
TBonnin authored Dec 5, 2024
1 parent 17f68e6 commit 9eeb271
Show file tree
Hide file tree
Showing 8 changed files with 669 additions and 93 deletions.
1 change: 1 addition & 0 deletions packages/fleet/lib/models/deployments.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Deployments', () => {
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});

it('should supersede any active deployments', async () => {
const commitId1 = generateCommitHash();
const commitId2 = generateCommitHash();
Expand Down
56 changes: 56 additions & 0 deletions packages/fleet/lib/models/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { CommitHash } from '../types';
import crypto from 'crypto';
import type { NodeState, Node, RoutingId } from '../types.js';
import type { knex } from 'knex';
import { nanoid } from '@nangohq/utils';
import * as nodes from './nodes.js';

export function generateCommitHash(): CommitHash {
const charset = '0123456789abcdef';
Expand All @@ -15,3 +19,55 @@ export function generateCommitHash(): CommitHash {
}
return value as CommitHash;
}

export async function createNodeWithAttributes(
db: knex.Knex,
{
state,
deploymentId,
routingId = nanoid(),
lastStateTransitionAt
}: { state: NodeState; deploymentId: number; routingId?: RoutingId; lastStateTransitionAt?: Date }
): Promise<Node> {
return db.transaction(async (trx) => {
let node = await createNode(trx, { routingId, deploymentId });
if (state == 'ERROR') {
node = (await nodes.fail(trx, { nodeId: node.id, reason: 'my error' })).unwrap();
}
// transition to the desired state
while (node.state !== state) {
const nextState = nodes.validNodeStateTransitions.find((v) => v.from === node.state && v.to !== 'ERROR')?.to;
if (nextState === 'RUNNING') {
node = (await nodes.register(trx, { nodeId: node.id, url: 'http://my-url' })).unwrap();
} else if (nextState && nextState !== 'ERROR') {
node = (await nodes.transitionTo(trx, { nodeId: node.id, newState: nextState })).unwrap();
} else {
throw new Error(`Cannot transition node to state '${state}'`);
}
}
if (lastStateTransitionAt) {
await trx
.from(nodes.NODES_TABLE)
.update({ created_at: lastStateTransitionAt, last_state_transition_at: lastStateTransitionAt })
.where('id', node.id);
node = {
...node,
createdAt: lastStateTransitionAt,
lastStateTransitionAt
};
}
return node;
});
}

async function createNode(db: knex.Knex, { routingId, deploymentId }: { routingId: RoutingId; deploymentId: number }): Promise<Node> {
const node = await nodes.create(db, {
routingId,
deploymentId,
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
storageMb: 512
});
return node.unwrap();
}
118 changes: 58 additions & 60 deletions packages/fleet/lib/models/nodes.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { expect, describe, it, beforeEach, afterEach } from 'vitest';
import * as nodes from './nodes.js';
import * as deployments from './deployments.js';
import { nodeStates } from '../types.js';
import type { NodeState, Node, RoutingId, Deployment } from '../types.js';
import type { NodeState, Deployment } from '../types.js';
import { getTestDbClient } from '../db/helpers.test.js';
import type { knex } from 'knex';
import { nanoid } from '@nangohq/utils';
import { generateCommitHash } from './helpers.test.js';
import { generateCommitHash, createNodeWithAttributes } from './helpers.test.js';

describe('Nodes', () => {
const dbClient = getTestDbClient('nodes');
const db = dbClient.db;

let previousDeployment: Deployment;
let activeDeployment: Deployment;
beforeEach(async () => {
Expand All @@ -28,7 +27,6 @@ describe('Nodes', () => {
await nodes.create(db, {
routingId: 'my-routing-id',
deploymentId: activeDeployment.id,
url: 'http://localhost:3000',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
Expand All @@ -39,7 +37,7 @@ describe('Nodes', () => {
id: expect.any(Number),
routingId: 'my-routing-id',
deploymentId: activeDeployment.id,
url: 'http://localhost:3000',
url: null,
state: 'PENDING',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
Expand All @@ -50,68 +48,80 @@ describe('Nodes', () => {
lastStateTransitionAt: expect.any(Date)
});
});

it('should transition between valid states and error when transitioning between invalid states', async () => {
const doTransition = async ({ nodeId, newState }: { nodeId: number; newState: NodeState }) => {
if (newState === 'RUNNING') {
return await nodes.register(db, { nodeId, url: 'http://my-url' });
} else if (newState === 'ERROR') {
return await nodes.fail(db, { nodeId, reason: 'my error' });
} else {
return await nodes.transitionTo(db, { nodeId, newState });
}
};
for (const from of nodeStates) {
for (const to of nodeStates) {
const t = await createNodeWithState(db, { state: from, deploymentId: activeDeployment.id });
const t = await createNodeWithAttributes(db, { state: from, deploymentId: activeDeployment.id });
if (nodes.validNodeStateTransitions.find((v) => v.from === from && v.to === to)) {
// sleep to ensure lastStateTransitionAt is different from the previous state
await new Promise((resolve) => void setTimeout(resolve, 2));
const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to });
const updated = await doTransition({ nodeId: t.id, newState: to });
expect(updated.unwrap().state).toBe(to);
expect(updated.unwrap().lastStateTransitionAt.getTime()).toBeGreaterThan(t.lastStateTransitionAt.getTime());
} else {
const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to });
const updated = await doTransition({ nodeId: t.id, newState: to });
expect(updated.isErr(), `transition from ${from} to ${to} failed`).toBe(true);
}
}
}
});

it('should be searchable', async () => {
const route1PendingNode = await createNodeWithState(db, { state: 'PENDING', routingId: '1', deploymentId: activeDeployment.id });
const route1RunningNode = await createNodeWithState(db, {
const route1PendingNode = await createNodeWithAttributes(db, { state: 'PENDING', routingId: '1', deploymentId: activeDeployment.id });
const route1RunningNode = await createNodeWithAttributes(db, {
state: 'RUNNING',
routingId: route1PendingNode.routingId,
deploymentId: previousDeployment.id
});
const startingNode = await createNodeWithState(db, { state: 'STARTING', deploymentId: activeDeployment.id });
const runningNode = await createNodeWithState(db, { state: 'RUNNING', deploymentId: activeDeployment.id });
const outdatedNode = await createNodeWithState(db, { state: 'OUTDATED', deploymentId: activeDeployment.id });
const finishingNode = await createNodeWithState(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idleNode = await createNodeWithState(db, { state: 'IDLE', deploymentId: activeDeployment.id });
const terminatedNode = await createNodeWithState(db, { state: 'TERMINATED', deploymentId: activeDeployment.id });
const errorNode = await createNodeWithState(db, { state: 'ERROR', deploymentId: activeDeployment.id });
const startingNode = await createNodeWithAttributes(db, { state: 'STARTING', deploymentId: activeDeployment.id });
const runningNode = await createNodeWithAttributes(db, { state: 'RUNNING', deploymentId: activeDeployment.id });
const outdatedNode = await createNodeWithAttributes(db, { state: 'OUTDATED', deploymentId: activeDeployment.id });
const finishingNode = await createNodeWithAttributes(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idleNode = await createNodeWithAttributes(db, { state: 'IDLE', deploymentId: activeDeployment.id });
const terminatedNode = await createNodeWithAttributes(db, { state: 'TERMINATED', deploymentId: activeDeployment.id });
const errorNode = await createNodeWithAttributes(db, { state: 'ERROR', deploymentId: activeDeployment.id });

const searchAllStates = await nodes.search(db, {
states: ['PENDING', 'STARTING', 'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR']
});
expect(searchAllStates.unwrap().nodes).toEqual(
new Map([
[route1PendingNode.routingId, [route1PendingNode, route1RunningNode]],
[startingNode.routingId, [startingNode]],
[runningNode.routingId, [runningNode]],
[outdatedNode.routingId, [outdatedNode]],
[finishingNode.routingId, [finishingNode]],
[idleNode.routingId, [idleNode]],
[terminatedNode.routingId, [terminatedNode]],
[errorNode.routingId, [errorNode]]
[route1PendingNode.routingId, { PENDING: [route1PendingNode], RUNNING: [route1RunningNode] }],
[startingNode.routingId, { STARTING: [startingNode] }],
[runningNode.routingId, { RUNNING: [runningNode] }],
[outdatedNode.routingId, { OUTDATED: [outdatedNode] }],
[finishingNode.routingId, { FINISHING: [finishingNode] }],
[idleNode.routingId, { IDLE: [idleNode] }],
[terminatedNode.routingId, { TERMINATED: [terminatedNode] }],
[errorNode.routingId, { ERROR: [errorNode] }]
])
);

const searchRunning = await nodes.search(db, { states: ['RUNNING'] });
expect(searchRunning.unwrap().nodes).toEqual(
new Map([
[route1RunningNode.routingId, [route1RunningNode]],
[runningNode.routingId, [runningNode]]
[route1RunningNode.routingId, { RUNNING: [route1RunningNode] }],
[runningNode.routingId, { RUNNING: [runningNode] }]
])
);

const searchWithWrongRoute = await nodes.search(db, { states: ['PENDING'], routingId: terminatedNode.routingId });
expect(searchWithWrongRoute.unwrap().nodes).toEqual(new Map());
});

it('should be searchable (with pagination support)', async () => {
for (let i = 0; i < 12; i++) {
await createNodeWithState(db, { state: 'PENDING', routingId: i.toString(), deploymentId: activeDeployment.id });
await createNodeWithAttributes(db, { state: 'PENDING', routingId: i.toString(), deploymentId: activeDeployment.id });
}
const searchFirstPage = (await nodes.search(db, { states: ['PENDING'], limit: 5 })).unwrap();
expect(searchFirstPage.nodes.size).toBe(5);
Expand All @@ -125,37 +135,25 @@ describe('Nodes', () => {
expect(searchThirdPage.nodes.size).toBe(2);
expect(searchThirdPage.nextCursor).toBe(undefined);
});
});

async function createNodeWithState(
db: knex.Knex,
{ state, deploymentId, routingId = nanoid() }: { state: NodeState; deploymentId: number; routingId?: RoutingId }
): Promise<Node> {
let node = await createNode(db, { routingId, deploymentId });
if (state == 'ERROR') {
return (await nodes.fail(db, { nodeId: node.id, error: 'my error' })).unwrap();
}
// transition to the desired state
while (node.state !== state) {
const nextState = nodes.validNodeStateTransitions.find((v) => v.from === node.state)?.to;
if (nextState) {
node = (await nodes.transitionTo(db, { nodeId: node.id, newState: nextState })).unwrap();
} else {
throw new Error(`Cannot transition node to state '${state}'`);
}
}
return node;
}
it('should be able to fail a node', async () => {
const node = await createNodeWithAttributes(db, { state: 'PENDING', deploymentId: activeDeployment.id });
const failedNode = (await nodes.fail(db, { nodeId: node.id, reason: 'my error' })).unwrap();
expect(failedNode.state).toBe('ERROR');
expect(failedNode.error).toBe('my error');
});

async function createNode(db: knex.Knex, { routingId, deploymentId }: { routingId: RoutingId; deploymentId: number }): Promise<Node> {
const node = await nodes.create(db, {
routingId,
deploymentId,
url: 'http://localhost:1234',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
storageMb: 512
it('should be able to register a node', async () => {
const node = await createNodeWithAttributes(db, { state: 'STARTING', deploymentId: activeDeployment.id });
expect(node.url).toBe(null);
const registeredNode = (await nodes.register(db, { nodeId: node.id, url: 'http://my-url' })).unwrap();
expect(registeredNode.state).toBe('RUNNING');
expect(registeredNode.url).toBe('http://my-url');
});
return node.unwrap();
}

it('should be able to idle a node', async () => {
const node = await createNodeWithAttributes(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idledNode = (await nodes.idle(db, { nodeId: node.id })).unwrap();
expect(idledNode.state).toBe('IDLE');
});
});
Loading

0 comments on commit 9eeb271

Please sign in to comment.