Skip to content

Commit

Permalink
feat: More hooks for overriding CubejsServerCore behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
bsod90 committed Jan 3, 2023
1 parent 66a6257 commit 1566b13
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 86 deletions.
6 changes: 6 additions & 0 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from './interfaces';
import { getRequestIdFromRequest, requestParser } from './requestParser';
import { UserError } from './UserError';
Expand Down Expand Up @@ -126,6 +127,8 @@ class ApiGateway {
protected readonly requestLoggerMiddleware: RequestLoggerMiddlewareFn;

protected readonly securityContextExtractor: SecurityContextExtractorFn;

protected readonly contextRejectionMiddleware: ContextRejectionMiddlewareFn;

protected readonly releaseListeners: (() => any)[] = [];

Expand Down Expand Up @@ -158,12 +161,14 @@ class ApiGateway {
: this.checkAuth;
this.securityContextExtractor = this.createSecurityContextExtractor(options.jwt);
this.requestLoggerMiddleware = options.requestLoggerMiddleware || this.requestLogger;
this.contextRejectionMiddleware = options.contextRejectionMiddleware || (async (req, res, next) => { next(); });
}

public initApp(app: ExpressApplication) {
const userMiddlewares: RequestHandler[] = [
this.checkAuthMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.logNetworkUsage,
this.requestLoggerMiddleware
];
Expand Down Expand Up @@ -279,6 +284,7 @@ class ApiGateway {
const systemMiddlewares: RequestHandler[] = [
this.checkAuthSystemMiddleware,
this.requestContextMiddleware,
this.contextRejectionMiddleware,
this.requestLoggerMiddleware
];

Expand Down
10 changes: 10 additions & 0 deletions packages/cubejs-api-gateway/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ export type CheckAuthMiddlewareFn =
next: ExpressNextFunction,
) => void;

/**
* Context rejection middleware.
*/
export type ContextRejectionMiddlewareFn =
(
req: Request,
res: ExpressResponse,
next: ExpressNextFunction,
) => void;

/**
* Logger middleware.
* @deprecated
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-api-gateway/src/types/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
import {
CheckAuthMiddlewareFn,
RequestLoggerMiddlewareFn,
ContextRejectionMiddlewareFn,
} from '../interfaces';

type UserBackgroundContext = {
Expand Down Expand Up @@ -59,6 +60,7 @@ interface ApiGatewayOptions {
enforceSecurityChecks?: boolean;
playgroundAuthSecret?: string;
serverCoreVersion?: string;
contextRejectionMiddleware?: ContextRejectionMiddlewareFn;
checkAuth?: CheckAuthFn;
/**
* @deprecated Use checkAuth property instead.
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"lru-cache": "^5.1.1",
"moment": "^2.29.1",
"node-fetch": "^2.6.0",
"p-limit": "^4.0.0",
"promise-timeout": "^1.3.0",
"ramda": "^0.27.0",
"semver": "^6.3.0",
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-server-core/src/core/OptsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ export class OptsHandler {
dashboardAppPort: 3000,
scheduledRefreshConcurrency:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10),
scheduledRefreshBatchSize:
parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_BATCH_SIZE, 10) || 1,
preAggregationsSchema:
getEnv('preAggregationsSchema') ||
(this.isDevMode()
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/optionsValidate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const schemaOptions = Joi.object().keys({
scheduledRefreshTimeZones: Joi.array().items(Joi.string()),
scheduledRefreshContexts: Joi.func(),
scheduledRefreshConcurrency: Joi.number().min(1).integer(),
scheduledRefreshBatchSize: Joi.number().min(1).integer(),
// Compiler cache
compilerCacheSize: Joi.number().min(0).integer(),
updateCompilerCacheKeepAlive: Joi.boolean(),
Expand Down
84 changes: 69 additions & 15 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import crypto from 'crypto';
import fs from 'fs-extra';
import LRUCache from 'lru-cache';
import isDocker from 'is-docker';
import pLimit from 'p-limit';

import { ApiGateway, UserBackgroundContext } from '@cubejs-backend/api-gateway';
import {
Expand All @@ -11,7 +12,7 @@ import {
getEnv, assertDataSource, getRealType, internalExceptions, track,
} from '@cubejs-backend/shared';

import type { Application as ExpressApplication } from 'express';
import type { Application as ExpressApplication, NextFunction } from 'express';

import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator';
import { FileRepository, SchemaFileRepository } from './FileRepository';
Expand Down Expand Up @@ -59,6 +60,22 @@ function wrapToFnIfNeeded<T, R>(possibleFn: T | ((a: R) => T)): (a: R) => T {
return () => possibleFn;
}

export type ContextAcceptanceResult = {
accepted: boolean;
rejectHeaders?: { [key: string]: string };
rejectStatusCode?: number;
};

export interface ContextAcceptor {
shouldAccept(context: RequestContext): ContextAcceptanceResult;
}

class AcceptAllAcceptor {
public shouldAccept(_context: RequestContext): ContextAcceptanceResult {
return { accepted: true };
}
}

export class CubejsServerCore {
/**
* Returns core version based on package.json.
Expand Down Expand Up @@ -134,14 +151,16 @@ export class CubejsServerCore {

protected apiGatewayInstance: ApiGateway | null = null;

protected contextAcceptor: ContextAcceptor;

public readonly event: (name: string, props?: object) => Promise<void>;

public projectFingerprint: string | null = null;

public anonymousId: string | null = null;

public coreServerVersion: string | null = null;

/**
* Class constructor.
*/
Expand Down Expand Up @@ -179,6 +198,8 @@ export class CubejsServerCore {
this.standalone = false;
}

this.contextAcceptor = this.createContextAcceptor();

if (this.options.contextToDataSourceId) {
throw new Error('contextToDataSourceId has been deprecated and removed. Use contextToOrchestratorId instead.');
}
Expand Down Expand Up @@ -309,6 +330,10 @@ export class CubejsServerCore {
}
}

protected createContextAcceptor(): ContextAcceptor {
return new AcceptAllAcceptor();
}

/**
* Determines whether current instance is ready to process queries.
*/
Expand Down Expand Up @@ -419,7 +444,7 @@ export class CubejsServerCore {
return this.apiGatewayInstance;
}

return this.apiGatewayInstance = new ApiGateway(
return (this.apiGatewayInstance = new ApiGateway(
this.options.apiSecret,
this.getCompilerApi.bind(this),
this.getOrchestratorApi.bind(this),
Expand All @@ -429,17 +454,33 @@ export class CubejsServerCore {
dataSourceStorage: this.orchestratorStorage,
basePath: this.options.basePath,
checkAuthMiddleware: this.options.checkAuthMiddleware,
contextRejectionMiddleware: this.contextRejectionMiddleware.bind(this),
checkAuth: this.options.checkAuth,
queryRewrite: this.options.queryRewrite || this.options.queryTransformer,
queryRewrite:
this.options.queryRewrite || this.options.queryTransformer,
extendContext: this.options.extendContext,
playgroundAuthSecret: getEnv('playgroundAuthSecret'),
playgroundAuthSecret: getEnv("playgroundAuthSecret"),
jwt: this.options.jwt,
refreshScheduler: () => new RefreshScheduler(this),
scheduledRefreshContexts: this.options.scheduledRefreshContexts,
scheduledRefreshTimeZones: this.options.scheduledRefreshTimeZones,
serverCoreVersion: this.coreServerVersion
serverCoreVersion: this.coreServerVersion,
}
);
));
}

protected async contextRejectionMiddleware(req, res, next) {
if (!this.standalone) {
const result = this.contextAcceptor.shouldAccept(req.context);
if (!result.accepted) {
res.writeHead(result.rejectStatusCode!, result.rejectHeaders!);
res.send();
return;
}
}
if (next) {
next();
}
}

public getCompilerApi(context: RequestContext) {
Expand Down Expand Up @@ -648,22 +689,35 @@ export class CubejsServerCore {
* @internal Please dont use this method directly, use refreshTimer
*/
public handleScheduledRefreshInterval = async (options) => {
const contexts = await this.options.scheduledRefreshContexts();
const contexts = (await this.options.scheduledRefreshContexts()).filter(
(context) => context === null || this.contextAcceptor.shouldAccept(
this.migrateBackgroundContext(context)!
).accepted
);
if (contexts.length < 1) {
this.logger('Refresh Scheduler Error', {
error: 'At least one context should be returned by scheduledRefreshContexts'
});
}

return Promise.all(contexts.map(async context => {
const queryingOptions: any = { ...options, concurrency: this.options.scheduledRefreshConcurrency };
const batchLimit = pLimit(this.options.scheduledRefreshBatchSize);
return Promise.all(
contexts
.map((context) => async () => {
const queryingOptions: any = {
...options,
concurrency: this.options.scheduledRefreshConcurrency,
};

if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}
if (this.options.scheduledRefreshTimeZones) {
queryingOptions.timezones = this.options.scheduledRefreshTimeZones;
}

return this.runScheduledRefresh(context, queryingOptions);
}));
return this.runScheduledRefresh(context, queryingOptions);
})
// Limit the number of refresh contexts we process per iteration
.map(batchLimit)
);
};

protected getRefreshScheduler() {
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-server-core/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ export interface CreateOptions {
scheduledRefreshTimeZones?: string[];
scheduledRefreshContexts?: () => Promise<UserBackgroundContext[]>;
scheduledRefreshConcurrency?: number;
scheduledRefreshBatchSize?: number;
compilerCacheSize?: number;
maxCompilerCacheKeepAlive?: number;
updateCompilerCacheKeepAlive?: boolean;
Expand Down
Loading

0 comments on commit 1566b13

Please sign in to comment.