Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: POC code for multi-tenant autoscaling #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from './interfaces';
import { getRequestIdFromRequest, requestParser } from './requestParser';
import { UserError } from './UserError';
Expand Down Expand Up @@ -126,6 +127,8 @@ class ApiGateway {
protected readonly requestLoggerMiddleware: RequestLoggerMiddlewareFn;

protected readonly securityContextExtractor: SecurityContextExtractorFn;

protected readonly contextRejectionMiddleware: ContextRejectionMiddlewareFn;

protected readonly releaseListeners: (() => any)[] = [];

Expand Down Expand Up @@ -158,12 +161,14 @@ class ApiGateway {
: this.checkAuth;
this.securityContextExtractor = this.createSecurityContextExtractor(options.jwt);
this.requestLoggerMiddleware = options.requestLoggerMiddleware || this.requestLogger;
this.contextRejectionMiddleware = options.contextRejectionMiddleware || (async (req, res, next) => { next(); });
}

public initApp(app: ExpressApplication) {
const userMiddlewares: RequestHandler[] = [
this.checkAuthMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.logNetworkUsage,
this.requestLoggerMiddleware
];
Expand Down Expand Up @@ -279,6 +284,7 @@ class ApiGateway {
const systemMiddlewares: RequestHandler[] = [
this.checkAuthSystemMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.requestLoggerMiddleware
];

Expand Down
10 changes: 10 additions & 0 deletions packages/cubejs-api-gateway/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ export type CheckAuthMiddlewareFn =
next: ExpressNextFunction,
) => void;

/**
* Context rejection middleware.
*/
export type ContextRejectionMiddlewareFn =
(
req: Request,
res: ExpressResponse,
next: ExpressNextFunction,
) => void;

/**
* Logger middleware.
* @deprecated
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-api-gateway/src/types/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from '../interfaces';

type UserBackgroundContext = {
Expand Down Expand Up @@ -59,6 +60,7 @@ interface ApiGatewayOptions {
enforceSecurityChecks?: boolean;
playgroundAuthSecret?: string;
serverCoreVersion?: string;
contextRejectionMiddleware?: ContextRejectionMiddlewareFn;
checkAuth?: CheckAuthFn;
/**
* @deprecated Use checkAuth property instead.
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"lru-cache": "^5.1.1",
"moment": "^2.29.1",
"node-fetch": "^2.6.0",
"p-limit": "^4.0.0",
"promise-timeout": "^1.3.0",
"ramda": "^0.27.0",
"semver": "^6.3.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-server-core/src/core/OptsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ export class OptsHandler {
dashboardAppPort: 3000,
scheduledRefreshConcurrency:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10),
scheduledRefreshBatchSize:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_BATCH_SIZE, 10) || 1,
preAggregationsSchema:
getEnv('preAggregationsSchema') ||
(this.isDevMode()
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/optionsValidate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const schemaOptions = Joi.object().keys({
scheduledRefreshTimeZones: Joi.array().items(Joi.string()),
scheduledRefreshContexts: Joi.func(),
scheduledRefreshConcurrency: Joi.number().min(1).integer(),
scheduledRefreshBatchSize: Joi.number().min(1).integer(),
// Compiler cache
compilerCacheSize: Joi.number().min(0).integer(),
updateCompilerCacheKeepAlive: Joi.boolean(),
Expand Down
84 changes: 69 additions & 15 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import crypto from 'crypto';
import fs from 'fs-extra';
import LRUCache from 'lru-cache';
import isDocker from 'is-docker';
import pLimit from 'p-limit';

import { ApiGateway, UserBackgroundContext } from '@cubejs-backend/api-gateway';
import {
Expand All @@ -11,7 +12,7 @@ import {
getEnv, assertDataSource, getRealType, internalExceptions, track,
} from '@cubejs-backend/shared';

import type { Application as ExpressApplication } from 'express';
import type { Application as ExpressApplication, NextFunction } from 'express';

import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator';
import { FileRepository, SchemaFileRepository } from './FileRepository';
Expand Down Expand Up @@ -59,6 +60,22 @@ function wrapToFnIfNeeded<T, R>(possibleFn: T | ((a: R) => T)): (a: R) => T {
return () => possibleFn;
}

export type ContextAcceptanceResult = {
accepted: boolean;
rejectHeaders?: { [key: string]: string };
rejectStatusCode?: number;
};

export interface ContextAcceptor {
shouldAccept(context: RequestContext): ContextAcceptanceResult;
}

class AcceptAllAcceptor {
public shouldAccept(_context: RequestContext): ContextAcceptanceResult {
return { accepted: true };
}
}

export class CubejsServerCore {
/**
* Returns core version based on package.json.
Expand Down Expand Up @@ -134,14 +151,16 @@ export class CubejsServerCore {

protected apiGatewayInstance: ApiGateway | null = null;

protected contextAcceptor: ContextAcceptor;

public readonly event: (name: string, props?: object) => Promise<void>;

public projectFingerprint: string | null = null;

public anonymousId: string | null = null;

public coreServerVersion: string | null = null;

/**
* Class constructor.
*/
Expand Down Expand Up @@ -179,6 +198,8 @@ export class CubejsServerCore {
this.standalone = false;
}

this.contextAcceptor = this.createContextAcceptor();

if (this.options.contextToDataSourceId) {
throw new Error('contextToDataSourceId has been deprecated and removed. Use contextToOrchestratorId instead.');
}
Expand Down Expand Up @@ -309,6 +330,10 @@ export class CubejsServerCore {
}
}

protected createContextAcceptor(): ContextAcceptor {
return new AcceptAllAcceptor();
}

/**
* Determines whether current instance is ready to process queries.
*/
Expand Down Expand Up @@ -419,7 +444,7 @@ export class CubejsServerCore {
return this.apiGatewayInstance;
}

return this.apiGatewayInstance = new ApiGateway(
return (this.apiGatewayInstance = new ApiGateway(
this.options.apiSecret,
this.getCompilerApi.bind(this),
this.getOrchestratorApi.bind(this),
Expand All @@ -429,17 +454,33 @@ export class CubejsServerCore {
dataSourceStorage: this.orchestratorStorage,
basePath: this.options.basePath,
checkAuthMiddleware: this.options.checkAuthMiddleware,
contextRejectionMiddleware: this.contextRejectionMiddleware.bind(this),
checkAuth: this.options.checkAuth,
queryRewrite: this.options.queryRewrite || this.options.queryTransformer,
queryRewrite:
this.options.queryRewrite || this.options.queryTransformer,
extendContext: this.options.extendContext,
playgroundAuthSecret: getEnv('playgroundAuthSecret'),
playgroundAuthSecret: getEnv("playgroundAuthSecret"),
jwt: this.options.jwt,
refreshScheduler: () => new RefreshScheduler(this),
scheduledRefreshContexts: this.options.scheduledRefreshContexts,
scheduledRefreshTimeZones: this.options.scheduledRefreshTimeZones,
serverCoreVersion: this.coreServerVersion
serverCoreVersion: this.coreServerVersion,
}
);
));
}

protected async contextRejectionMiddleware(req, res, next) {
if (!this.standalone) {
const result = this.contextAcceptor.shouldAccept(req.context);
if (!result.accepted) {
res.writeHead(result.rejectStatusCode!, result.rejectHeaders!);
res.send();
return;
}
}
if (next) {
next();
}
}

public getCompilerApi(context: RequestContext) {
Expand Down Expand Up @@ -648,22 +689,35 @@ export class CubejsServerCore {
* @internal Please dont use this method directly, use refreshTimer
*/
public handleScheduledRefreshInterval = async (options) => {
const contexts = await this.options.scheduledRefreshContexts();
const contexts = (await this.options.scheduledRefreshContexts()).filter(
(context) => context === null || this.contextAcceptor.shouldAccept(
this.migrateBackgroundContext(context)!
).accepted
);
if (contexts.length < 1) {
this.logger('Refresh Scheduler Error', {
error: 'At least one context should be returned by scheduledRefreshContexts'
});
}

return Promise.all(contexts.map(async context => {
const queryingOptions: any = { ...options, concurrency: this.options.scheduledRefreshConcurrency };
const batchLimit = pLimit(this.options.scheduledRefreshBatchSize);
return Promise.all(
contexts
.map((context) => async () => {
const queryingOptions: any = {
...options,
concurrency: this.options.scheduledRefreshConcurrency,
};

if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}
if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}

return this.runScheduledRefresh(context, queryingOptions);
}));
return this.runScheduledRefresh(context, queryingOptions);
})
// Limit the number of refresh contexts we process per iteration
.map(batchLimit)
);
};

protected getRefreshScheduler() {
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ export interface CreateOptions {
scheduledRefreshTimeZones?: string[];
scheduledRefreshContexts?: () => Promise<UserBackgroundContext[]>;
scheduledRefreshConcurrency?: number;
scheduledRefreshBatchSize?: number;
compilerCacheSize?: number;
maxCompilerCacheKeepAlive?: number;
updateCompilerCacheKeepAlive?: boolean;
Expand Down
Loading