Skip to content

Commit

Permalink
feat: POC code for multi-tenant autoscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
bsod90 committed Dec 29, 2022
1 parent 66a6257 commit 2e5e138
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 17 deletions.
6 changes: 6 additions & 0 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from './interfaces';
import { getRequestIdFromRequest, requestParser } from './requestParser';
import { UserError } from './UserError';
Expand Down Expand Up @@ -126,6 +127,8 @@ class ApiGateway {
protected readonly requestLoggerMiddleware: RequestLoggerMiddlewareFn;

protected readonly securityContextExtractor: SecurityContextExtractorFn;

protected readonly contextRejectionMiddleware: ContextRejectionMiddlewareFn;

protected readonly releaseListeners: (() => any)[] = [];

Expand Down Expand Up @@ -158,12 +161,14 @@ class ApiGateway {
: this.checkAuth;
this.securityContextExtractor = this.createSecurityContextExtractor(options.jwt);
this.requestLoggerMiddleware = options.requestLoggerMiddleware || this.requestLogger;
this.contextRejectionMiddleware = options.contextRejectionMiddleware || (async (req, res, next) => { next(); });
}

public initApp(app: ExpressApplication) {
const userMiddlewares: RequestHandler[] = [
this.checkAuthMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.logNetworkUsage,
this.requestLoggerMiddleware
];
Expand Down Expand Up @@ -279,6 +284,7 @@ class ApiGateway {
const systemMiddlewares: RequestHandler[] = [
this.checkAuthSystemMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.requestLoggerMiddleware
];

Expand Down
10 changes: 10 additions & 0 deletions packages/cubejs-api-gateway/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ export type CheckAuthMiddlewareFn =
next: ExpressNextFunction,
) => void;

/**
* Context rejection middleware.
*/
export type ContextRejectionMiddlewareFn =
(
req: Request,
res: ExpressResponse,
next: ExpressNextFunction,
) => void;

/**
* Logger middleware.
* @deprecated
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-api-gateway/src/types/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from '../interfaces';

type UserBackgroundContext = {
Expand Down Expand Up @@ -59,6 +60,7 @@ interface ApiGatewayOptions {
enforceSecurityChecks?: boolean;
playgroundAuthSecret?: string;
serverCoreVersion?: string;
contextRejectionMiddleware?: ContextRejectionMiddlewareFn;
checkAuth?: CheckAuthFn;
/**
* @deprecated Use checkAuth property instead.
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"lru-cache": "^5.1.1",
"moment": "^2.29.1",
"node-fetch": "^2.6.0",
"p-limit": "^4.0.0",
"promise-timeout": "^1.3.0",
"ramda": "^0.27.0",
"semver": "^6.3.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-server-core/src/core/OptsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ export class OptsHandler {
dashboardAppPort: 3000,
scheduledRefreshConcurrency:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10),
scheduledRefreshBatchSize:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_BATCH_SIZE, 10) || 1,
preAggregationsSchema:
getEnv('preAggregationsSchema') ||
(this.isDevMode()
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/optionsValidate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const schemaOptions = Joi.object().keys({
scheduledRefreshTimeZones: Joi.array().items(Joi.string()),
scheduledRefreshContexts: Joi.func(),
scheduledRefreshConcurrency: Joi.number().min(1).integer(),
scheduledRefreshBatchSize: Joi.number().min(1).integer(),
// Compiler cache
compilerCacheSize: Joi.number().min(0).integer(),
updateCompilerCacheKeepAlive: Joi.boolean(),
Expand Down
82 changes: 67 additions & 15 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import crypto from 'crypto';
import fs from 'fs-extra';
import LRUCache from 'lru-cache';
import isDocker from 'is-docker';
import pLimit from 'p-limit';

import { ApiGateway, UserBackgroundContext } from '@cubejs-backend/api-gateway';
import {
Expand All @@ -11,7 +12,7 @@ import {
getEnv, assertDataSource, getRealType, internalExceptions, track,
} from '@cubejs-backend/shared';

import type { Application as ExpressApplication } from 'express';
import type { Application as ExpressApplication, NextFunction } from 'express';

import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator';
import { FileRepository, SchemaFileRepository } from './FileRepository';
Expand Down Expand Up @@ -59,6 +60,22 @@ function wrapToFnIfNeeded<T, R>(possibleFn: T | ((a: R) => T)): (a: R) => T {
return () => possibleFn;
}

export type ContextAcceptanceResult = {
accepted: boolean;
rejectHeaders?: { [key: string]: string };
rejectStatusCode?: number;
};

export interface ContextAcceptor {
shouldAccept(context: RequestContext): ContextAcceptanceResult;
}

class AcceptAllAcceptor {
public shouldAccept(_context: RequestContext): ContextAcceptanceResult {
return { accepted: true };
}
}

export class CubejsServerCore {
/**
* Returns core version based on package.json.
Expand Down Expand Up @@ -134,14 +151,16 @@ export class CubejsServerCore {

protected apiGatewayInstance: ApiGateway | null = null;

protected contextAcceptor: ContextAcceptor;

public readonly event: (name: string, props?: object) => Promise<void>;

public projectFingerprint: string | null = null;

public anonymousId: string | null = null;

public coreServerVersion: string | null = null;

/**
* Class constructor.
*/
Expand Down Expand Up @@ -179,6 +198,8 @@ export class CubejsServerCore {
this.standalone = false;
}

this.contextAcceptor = new AcceptAllAcceptor();

if (this.options.contextToDataSourceId) {
throw new Error('contextToDataSourceId has been deprecated and removed. Use contextToOrchestratorId instead.');
}
Expand Down Expand Up @@ -419,7 +440,7 @@ export class CubejsServerCore {
return this.apiGatewayInstance;
}

return this.apiGatewayInstance = new ApiGateway(
return (this.apiGatewayInstance = new ApiGateway(
this.options.apiSecret,
this.getCompilerApi.bind(this),
this.getOrchestratorApi.bind(this),
Expand All @@ -429,17 +450,33 @@ export class CubejsServerCore {
dataSourceStorage: this.orchestratorStorage,
basePath: this.options.basePath,
checkAuthMiddleware: this.options.checkAuthMiddleware,
contextRejectionMiddleware: this.contextRejectionMiddleware.bind(this),
checkAuth: this.options.checkAuth,
queryRewrite: this.options.queryRewrite || this.options.queryTransformer,
queryRewrite:
this.options.queryRewrite || this.options.queryTransformer,
extendContext: this.options.extendContext,
playgroundAuthSecret: getEnv('playgroundAuthSecret'),
playgroundAuthSecret: getEnv("playgroundAuthSecret"),
jwt: this.options.jwt,
refreshScheduler: () => new RefreshScheduler(this),
scheduledRefreshContexts: this.options.scheduledRefreshContexts,
scheduledRefreshTimeZones: this.options.scheduledRefreshTimeZones,
serverCoreVersion: this.coreServerVersion
serverCoreVersion: this.coreServerVersion,
}
);
));
}

protected async contextRejectionMiddleware(req, res, next) {
if (!this.standalone) {
const result = this.contextAcceptor.shouldAccept(req.context);
if (!result.accepted) {
res.writeHead(result.rejectStatusCode!, result.rejectHeaders!);
res.send();
return;
}
}
if (next) {
next();
}
}

public getCompilerApi(context: RequestContext) {
Expand Down Expand Up @@ -648,22 +685,37 @@ export class CubejsServerCore {
* @internal Please dont use this method directly, use refreshTimer
*/
public handleScheduledRefreshInterval = async (options) => {
const contexts = await this.options.scheduledRefreshContexts();
const contexts = (await this.options.scheduledRefreshContexts()).filter(
(context) => context === null || this.contextAcceptor.shouldAccept(
this.migrateBackgroundContext(context)!
).accepted
);
console.log(`Selected ${contexts.length} contexts for refresh`);
if (contexts.length < 1) {
this.logger('Refresh Scheduler Error', {
error: 'At least one context should be returned by scheduledRefreshContexts'
});
}

return Promise.all(contexts.map(async context => {
const queryingOptions: any = { ...options, concurrency: this.options.scheduledRefreshConcurrency };
const batchLimit = pLimit(this.options.scheduledRefreshBatchSize);
console.log(`New batch with a size of ${this.options.scheduledRefreshBatchSize}`);
return Promise.all(
contexts
.map((context) => async () => {
const queryingOptions: any = {
...options,
concurrency: this.options.scheduledRefreshConcurrency,
};

if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}
if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}

return this.runScheduledRefresh(context, queryingOptions);
}));
return this.runScheduledRefresh(context, queryingOptions);
})
// Limit the number of refresh contexts we process per iteration
.map(batchLimit)
);
};

protected getRefreshScheduler() {
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ export interface CreateOptions {
scheduledRefreshTimeZones?: string[];
scheduledRefreshContexts?: () => Promise<UserBackgroundContext[]>;
scheduledRefreshConcurrency?: number;
scheduledRefreshBatchSize?: number;
compilerCacheSize?: number;
maxCompilerCacheKeepAlive?: number;
updateCompilerCacheKeepAlive?: boolean;
Expand Down
4 changes: 3 additions & 1 deletion packages/cubejs-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type RequireOne<T, K extends keyof T> = {
};

export class CubejsServer {
protected ServerCoreClass: typeof CubeCore = CubeCore;

protected readonly core: CubejsServerCore;

protected readonly config: RequireOne<CreateOptions, 'webSockets' | 'http' | 'sqlPort' | 'pgSqlPort'>;
Expand Down Expand Up @@ -77,7 +79,7 @@ export class CubejsServer {
},
};

this.core = CubeCore.create(config, systemOptions);
this.core = this.ServerCoreClass.create(config, systemOptions);
this.server = null;
}

Expand Down
4 changes: 3 additions & 1 deletion packages/cubejs-server/src/server/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ function safetyParseSemver(version: string | null) {
export class ServerContainer {
protected isCubeConfigEmpty: boolean = true;

protected ServerClass: typeof CubejsServer = CubejsServer;

public constructor(
protected readonly configuration: { debug: boolean }
) {
Expand Down Expand Up @@ -231,7 +233,7 @@ export class ServerContainer {
configuration.scheduledRefreshTimer = false;
}

const server = new CubejsServer(configuration, {
const server = new this.ServerClass(configuration, {
isCubeConfigEmpty
});

Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -20758,6 +20758,13 @@ p-limit@^3.0.1, p-limit@^3.0.2, p-limit@^3.1.0:
dependencies:
yocto-queue "^0.1.0"

p-limit@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/p-limit/-/p-limit-4.0.0.tgz#914af6544ed32bfa54670b061cafcbd04984b644"
integrity sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==
dependencies:
yocto-queue "^1.0.0"

p-locate@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/p-locate/-/p-locate-2.0.0.tgz#20a0103b222a70c8fd39cc2e580680f3dde5ec43"
Expand Down Expand Up @@ -28810,6 +28817,11 @@ yocto-queue@^0.1.0:
resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b"
integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==

yocto-queue@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-1.0.0.tgz#7f816433fb2cbc511ec8bf7d263c3b58a1a3c251"
integrity sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g==

yorkie@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/yorkie/-/yorkie-2.0.0.tgz#92411912d435214e12c51c2ae1093e54b6bb83d9"
Expand Down

0 comments on commit 2e5e138

Please sign in to comment.