diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 7cbaa25efb94e..1d1517f19ab4d 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -54,6 +54,7 @@ import { import { CheckAuthMiddlewareFn, RequestLoggerMiddlewareFn, + ContextRejectionMiddlewareFn, } from './interfaces'; import { getRequestIdFromRequest, requestParser } from './requestParser'; import { UserError } from './UserError'; @@ -126,6 +127,8 @@ class ApiGateway { protected readonly requestLoggerMiddleware: RequestLoggerMiddlewareFn; protected readonly securityContextExtractor: SecurityContextExtractorFn; + + protected readonly contextRejectionMiddleware: ContextRejectionMiddlewareFn; protected readonly releaseListeners: (() => any)[] = []; @@ -158,12 +161,14 @@ class ApiGateway { : this.checkAuth; this.securityContextExtractor = this.createSecurityContextExtractor(options.jwt); this.requestLoggerMiddleware = options.requestLoggerMiddleware || this.requestLogger; + this.contextRejectionMiddleware = options.contextRejectionMiddleware || (async (req, res, next) => { next(); }); } public initApp(app: ExpressApplication) { const userMiddlewares: RequestHandler[] = [ this.checkAuthMiddleware, this.requestContextMiddleware, + this.contextRejectionMiddleware, this.logNetworkUsage, this.requestLoggerMiddleware ]; @@ -279,6 +284,7 @@ class ApiGateway { const systemMiddlewares: RequestHandler[] = [ this.checkAuthSystemMiddleware, this.requestContextMiddleware, + this.contextRejectionMiddleware, this.requestLoggerMiddleware ]; diff --git a/packages/cubejs-api-gateway/src/interfaces.ts b/packages/cubejs-api-gateway/src/interfaces.ts index 8214963d49c07..ca31e71df5a52 100644 --- a/packages/cubejs-api-gateway/src/interfaces.ts +++ b/packages/cubejs-api-gateway/src/interfaces.ts @@ -75,6 +75,16 @@ export type CheckAuthMiddlewareFn = next: ExpressNextFunction, ) => void; +/** + * Context rejection middleware. + */ +export type ContextRejectionMiddlewareFn = + ( + req: Request, + res: ExpressResponse, + next: ExpressNextFunction, + ) => void; + /** * Logger middleware. * @deprecated diff --git a/packages/cubejs-api-gateway/src/types/gateway.ts b/packages/cubejs-api-gateway/src/types/gateway.ts index bc399cd05f683..256c68c93ac88 100644 --- a/packages/cubejs-api-gateway/src/types/gateway.ts +++ b/packages/cubejs-api-gateway/src/types/gateway.ts @@ -16,6 +16,7 @@ import { import { CheckAuthMiddlewareFn, RequestLoggerMiddlewareFn, + ContextRejectionMiddlewareFn, } from '../interfaces'; type UserBackgroundContext = { @@ -59,6 +60,7 @@ interface ApiGatewayOptions { enforceSecurityChecks?: boolean; playgroundAuthSecret?: string; serverCoreVersion?: string; + contextRejectionMiddleware?: ContextRejectionMiddlewareFn; checkAuth?: CheckAuthFn; /** * @deprecated Use checkAuth property instead. diff --git a/packages/cubejs-server-core/package.json b/packages/cubejs-server-core/package.json index 15182b5fd5647..c96e04c329841 100644 --- a/packages/cubejs-server-core/package.json +++ b/packages/cubejs-server-core/package.json @@ -48,6 +48,7 @@ "lru-cache": "^5.1.1", "moment": "^2.29.1", "node-fetch": "^2.6.0", + "p-limit": "^4.0.0", "promise-timeout": "^1.3.0", "ramda": "^0.27.0", "semver": "^6.3.0", diff --git a/packages/cubejs-server-core/src/core/OptsHandler.ts b/packages/cubejs-server-core/src/core/OptsHandler.ts index 9bdb8f4e661eb..bc8e39db49069 100644 --- a/packages/cubejs-server-core/src/core/OptsHandler.ts +++ b/packages/cubejs-server-core/src/core/OptsHandler.ts @@ -466,6 +466,8 @@ export class OptsHandler { dashboardAppPort: 3000, scheduledRefreshConcurrency: parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_CONCURRENCY, 10), + scheduledRefreshBatchSize: + parseInt(process.env.CUBEJS_SCHEDULED_REFRESH_BATCH_SIZE, 10) || 1, preAggregationsSchema: getEnv('preAggregationsSchema') || (this.isDevMode() diff --git a/packages/cubejs-server-core/src/core/optionsValidate.ts b/packages/cubejs-server-core/src/core/optionsValidate.ts index 57f9fa88031ca..cd1561f4f6944 100644 --- a/packages/cubejs-server-core/src/core/optionsValidate.ts +++ b/packages/cubejs-server-core/src/core/optionsValidate.ts @@ -84,6 +84,7 @@ const schemaOptions = Joi.object().keys({ scheduledRefreshTimeZones: Joi.array().items(Joi.string()), scheduledRefreshContexts: Joi.func(), scheduledRefreshConcurrency: Joi.number().min(1).integer(), + scheduledRefreshBatchSize: Joi.number().min(1).integer(), // Compiler cache compilerCacheSize: Joi.number().min(0).integer(), updateCompilerCacheKeepAlive: Joi.boolean(), diff --git a/packages/cubejs-server-core/src/core/server.ts b/packages/cubejs-server-core/src/core/server.ts index e8d199e7bd3ea..2ed0caff46303 100644 --- a/packages/cubejs-server-core/src/core/server.ts +++ b/packages/cubejs-server-core/src/core/server.ts @@ -3,6 +3,7 @@ import crypto from 'crypto'; import fs from 'fs-extra'; import LRUCache from 'lru-cache'; import isDocker from 'is-docker'; +import pLimit from 'p-limit'; import { ApiGateway, UserBackgroundContext } from '@cubejs-backend/api-gateway'; import { @@ -11,7 +12,7 @@ import { getEnv, assertDataSource, getRealType, internalExceptions, track, } from '@cubejs-backend/shared'; -import type { Application as ExpressApplication } from 'express'; +import type { Application as ExpressApplication, NextFunction } from 'express'; import { BaseDriver, DriverFactoryByDataSource } from '@cubejs-backend/query-orchestrator'; import { FileRepository, SchemaFileRepository } from './FileRepository'; @@ -59,6 +60,22 @@ function wrapToFnIfNeeded(possibleFn: T | ((a: R) => T)): (a: R) => T { return () => possibleFn; } +export type ContextAcceptanceResult = { + accepted: boolean; + rejectHeaders?: { [key: string]: string }; + rejectStatusCode?: number; +}; + +export interface ContextAcceptor { + shouldAccept(context: RequestContext): ContextAcceptanceResult; +} + +class AcceptAllAcceptor { + public shouldAccept(_context: RequestContext): ContextAcceptanceResult { + return { accepted: true }; + } +} + export class CubejsServerCore { /** * Returns core version based on package.json. @@ -134,6 +151,8 @@ export class CubejsServerCore { protected apiGatewayInstance: ApiGateway | null = null; + protected contextAcceptor: ContextAcceptor; + public readonly event: (name: string, props?: object) => Promise; public projectFingerprint: string | null = null; @@ -141,7 +160,7 @@ export class CubejsServerCore { public anonymousId: string | null = null; public coreServerVersion: string | null = null; - + /** * Class constructor. */ @@ -179,6 +198,8 @@ export class CubejsServerCore { this.standalone = false; } + this.contextAcceptor = new AcceptAllAcceptor(); + if (this.options.contextToDataSourceId) { throw new Error('contextToDataSourceId has been deprecated and removed. Use contextToOrchestratorId instead.'); } @@ -419,7 +440,7 @@ export class CubejsServerCore { return this.apiGatewayInstance; } - return this.apiGatewayInstance = new ApiGateway( + return (this.apiGatewayInstance = new ApiGateway( this.options.apiSecret, this.getCompilerApi.bind(this), this.getOrchestratorApi.bind(this), @@ -429,17 +450,33 @@ export class CubejsServerCore { dataSourceStorage: this.orchestratorStorage, basePath: this.options.basePath, checkAuthMiddleware: this.options.checkAuthMiddleware, + contextRejectionMiddleware: this.contextRejectionMiddleware.bind(this), checkAuth: this.options.checkAuth, - queryRewrite: this.options.queryRewrite || this.options.queryTransformer, + queryRewrite: + this.options.queryRewrite || this.options.queryTransformer, extendContext: this.options.extendContext, - playgroundAuthSecret: getEnv('playgroundAuthSecret'), + playgroundAuthSecret: getEnv("playgroundAuthSecret"), jwt: this.options.jwt, refreshScheduler: () => new RefreshScheduler(this), scheduledRefreshContexts: this.options.scheduledRefreshContexts, scheduledRefreshTimeZones: this.options.scheduledRefreshTimeZones, - serverCoreVersion: this.coreServerVersion + serverCoreVersion: this.coreServerVersion, } - ); + )); + } + + protected async contextRejectionMiddleware(req, res, next) { + if (!this.standalone) { + const result = this.contextAcceptor.shouldAccept(req.context); + if (!result.accepted) { + res.writeHead(result.rejectStatusCode!, result.rejectHeaders!); + res.send(); + return; + } + } + if (next) { + next(); + } } public getCompilerApi(context: RequestContext) { @@ -648,22 +685,37 @@ export class CubejsServerCore { * @internal Please dont use this method directly, use refreshTimer */ public handleScheduledRefreshInterval = async (options) => { - const contexts = await this.options.scheduledRefreshContexts(); + const contexts = (await this.options.scheduledRefreshContexts()).filter( + (context) => context === null || this.contextAcceptor.shouldAccept( + this.migrateBackgroundContext(context)! + ).accepted + ); + console.log(`Selected ${contexts.length} contexts for refresh`); if (contexts.length < 1) { this.logger('Refresh Scheduler Error', { error: 'At least one context should be returned by scheduledRefreshContexts' }); } - return Promise.all(contexts.map(async context => { - const queryingOptions: any = { ...options, concurrency: this.options.scheduledRefreshConcurrency }; + const batchLimit = pLimit(this.options.scheduledRefreshBatchSize); + console.log(`New batch with a size of ${this.options.scheduledRefreshBatchSize}`); + return Promise.all( + contexts + .map((context) => async () => { + const queryingOptions: any = { + ...options, + concurrency: this.options.scheduledRefreshConcurrency, + }; - if (this.options.scheduledRefreshTimeZones) { - queryingOptions.timezones = this.options.scheduledRefreshTimeZones; - } + if (this.options.scheduledRefreshTimeZones) { + queryingOptions.timezones = this.options.scheduledRefreshTimeZones; + } - return this.runScheduledRefresh(context, queryingOptions); - })); + return this.runScheduledRefresh(context, queryingOptions); + }) + // Limit the number of refresh contexts we process per iteration + .map(batchLimit) + ); }; protected getRefreshScheduler() { diff --git a/packages/cubejs-server-core/src/core/types.ts b/packages/cubejs-server-core/src/core/types.ts index e7cfb0394bec0..5af0f27f2dd82 100644 --- a/packages/cubejs-server-core/src/core/types.ts +++ b/packages/cubejs-server-core/src/core/types.ts @@ -187,6 +187,7 @@ export interface CreateOptions { scheduledRefreshTimeZones?: string[]; scheduledRefreshContexts?: () => Promise; scheduledRefreshConcurrency?: number; + scheduledRefreshBatchSize?: number; compilerCacheSize?: number; maxCompilerCacheKeepAlive?: number; updateCompilerCacheKeepAlive?: boolean; diff --git a/packages/cubejs-server/src/server.ts b/packages/cubejs-server/src/server.ts index cb40d3b6fb5a7..0193ada59cf2d 100644 --- a/packages/cubejs-server/src/server.ts +++ b/packages/cubejs-server/src/server.ts @@ -49,6 +49,8 @@ type RequireOne = { }; export class CubejsServer { + protected ServerCoreClass: typeof CubeCore = CubeCore; + protected readonly core: CubejsServerCore; protected readonly config: RequireOne; @@ -77,7 +79,7 @@ export class CubejsServer { }, }; - this.core = CubeCore.create(config, systemOptions); + this.core = this.ServerCoreClass.create(config, systemOptions); this.server = null; } diff --git a/packages/cubejs-server/src/server/container.ts b/packages/cubejs-server/src/server/container.ts index 9f39905018cdb..8503e47c79269 100644 --- a/packages/cubejs-server/src/server/container.ts +++ b/packages/cubejs-server/src/server/container.ts @@ -34,6 +34,8 @@ function safetyParseSemver(version: string | null) { export class ServerContainer { protected isCubeConfigEmpty: boolean = true; + protected ServerClass: typeof CubejsServer = CubejsServer; + public constructor( protected readonly configuration: { debug: boolean } ) { @@ -231,7 +233,7 @@ export class ServerContainer { configuration.scheduledRefreshTimer = false; } - const server = new CubejsServer(configuration, { + const server = new this.ServerClass(configuration, { isCubeConfigEmpty }); diff --git a/yarn.lock b/yarn.lock index 1ac3534aae6e1..5dc73f55f69d5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -20758,6 +20758,13 @@ p-limit@^3.0.1, p-limit@^3.0.2, p-limit@^3.1.0: dependencies: yocto-queue "^0.1.0" +p-limit@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/p-limit/-/p-limit-4.0.0.tgz#914af6544ed32bfa54670b061cafcbd04984b644" + integrity sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ== + dependencies: + yocto-queue "^1.0.0" + p-locate@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/p-locate/-/p-locate-2.0.0.tgz#20a0103b222a70c8fd39cc2e580680f3dde5ec43" @@ -28810,6 +28817,11 @@ yocto-queue@^0.1.0: resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== +yocto-queue@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-1.0.0.tgz#7f816433fb2cbc511ec8bf7d263c3b58a1a3c251" + integrity sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g== + yorkie@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/yorkie/-/yorkie-2.0.0.tgz#92411912d435214e12c51c2ae1093e54b6bb83d9"