Skip to content

Commit

Permalink
feat(core): add pieceConfig mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamSelene committed Oct 15, 2024
1 parent 278c6d0 commit dd6107d
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 76 deletions.
152 changes: 118 additions & 34 deletions packages/engine/src/lib/handler/piece-executor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
import { URL } from 'url'
import { ActionContext, ConnectionsManager, PauseHook, PauseHookParams, PiecePropertyMap, StaticPropsValue, StopHook, StopHookParams, TagsManager } from '@activepieces/pieces-framework'
import { ActionType, assertNotNullOrUndefined, AUTHENTICATION_PROPERTY_NAME, ExecutionType, FlowRunStatus, GenericStepOutput, isNil, PauseType, PieceAction, StepOutputStatus } from '@activepieces/shared'
import {
ActionContext,
ConnectionsManager,
PauseHook,
PauseHookParams,
PiecePropertyMap,
StaticPropsValue,
StopHook,
StopHookParams,
TagsManager,
} from '@activepieces/pieces-framework'
import {
ActionType,
assertNotNullOrUndefined,
AUTHENTICATION_PROPERTY_NAME,
ExecutionType,
FlowRunStatus,
GenericStepOutput,
isNil,
PauseType,
PieceAction,
StepOutputStatus,
} from '@activepieces/shared'
import dayjs from 'dayjs'
import { continueIfFailureHandler, handleExecutionError, runWithExponentialBackoff } from '../helper/error-handling'
import {
continueIfFailureHandler,
handleExecutionError,
runWithExponentialBackoff,
} from '../helper/error-handling'
import { PausedFlowTimeoutError } from '../helper/execution-errors'
import { pieceLoader } from '../helper/piece-loader'
import { createConnectionService } from '../services/connections.service'
Expand All @@ -12,25 +37,39 @@ import { createContextStore } from '../services/storage.service'
import { ActionHandler, BaseExecutor } from './base-executor'
import { ExecutionVerdict } from './context/flow-execution-context'

type HookResponse = { stopResponse: StopHookParams | undefined, pauseResponse: PauseHookParams | undefined, tags: string[], stopped: boolean, paused: boolean }
type HookResponse = {
stopResponse: StopHookParams | undefined
pauseResponse: PauseHookParams | undefined
tags: string[]
stopped: boolean
paused: boolean
}

const AP_PAUSED_FLOW_TIMEOUT_DAYS = Number(process.env.AP_PAUSED_FLOW_TIMEOUT_DAYS)
const AP_PAUSED_FLOW_TIMEOUT_DAYS = Number(
process.env.AP_PAUSED_FLOW_TIMEOUT_DAYS,
)
const AP_PIECES_CONFIG = JSON.parse(process.env.AP_PIECES_CONFIG || '{}')

export const pieceExecutor: BaseExecutor<PieceAction> = {
async handle({
action,
executionState,
constants,
}) {
async handle({ action, executionState, constants }) {
if (executionState.isCompleted({ stepName: action.name })) {
return executionState
}
const resultExecution = await runWithExponentialBackoff(executionState, action, constants, executeAction)
const resultExecution = await runWithExponentialBackoff(
executionState,
action,
constants,
executeAction,
)
return continueIfFailureHandler(resultExecution, action, constants)
},
}

const executeAction: ActionHandler<PieceAction> = async ({ action, executionState, constants }) => {
const executeAction: ActionHandler<PieceAction> = async ({
action,
executionState,
constants,
}) => {
const stepOutput = GenericStepOutput.create({
input: {},
type: ActionType.PIECE,
Expand All @@ -46,14 +85,23 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
piecesSource: constants.piecesSource,
})

const { resolvedInput, censoredInput } = await constants.variableService.resolve<StaticPropsValue<PiecePropertyMap>>({
unresolvedInput: action.settings.input,
executionState,
})
const { resolvedInput, censoredInput } =
await constants.variableService.resolve<
StaticPropsValue<PiecePropertyMap>
>({
unresolvedInput: action.settings.input,
executionState,
})

stepOutput.input = censoredInput

const { processedInput, errors } = await constants.variableService.applyProcessorsAndValidators(resolvedInput, pieceAction.props, piece.auth, pieceAction.requireAuth)
const { processedInput, errors } =
await constants.variableService.applyProcessorsAndValidators(
resolvedInput,
pieceAction.props,
piece.auth,
pieceAction.requireAuth,
)
if (Object.keys(errors).length > 0) {
throw new Error(JSON.stringify(errors))
}
Expand Down Expand Up @@ -112,32 +160,48 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
externalId: constants.externalProjectId,
},
generateResumeUrl: (params) => {
const url = new URL(`${constants.publicUrl}v1/flow-runs/${constants.flowRunId}/requests/${executionState.pauseRequestId}`)
const url = new URL(
`${constants.publicUrl}v1/flow-runs/${constants.flowRunId}/requests/${executionState.pauseRequestId}`,
)
url.search = new URLSearchParams(params.queryParams).toString()
return url.toString()
},
pieceConfig: AP_PIECES_CONFIG?.[action.settings.pieceName] || {},
}
const runMethodToExecute = (constants.testSingleStepMode && !isNil(pieceAction.test)) ? pieceAction.test : pieceAction.run
const runMethodToExecute =
constants.testSingleStepMode && !isNil(pieceAction.test)
? pieceAction.test
: pieceAction.run
const output = await runMethodToExecute(context)
const newExecutionContext = executionState.addTags(hookResponse.tags)

if (hookResponse.stopped) {
assertNotNullOrUndefined(hookResponse.stopResponse, 'stopResponse')
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)).setVerdict(ExecutionVerdict.SUCCEEDED, {
reason: FlowRunStatus.STOPPED,
stopResponse: hookResponse.stopResponse.response,
}).increaseTask()
return newExecutionContext
.upsertStep(action.name, stepOutput.setOutput(output))
.setVerdict(ExecutionVerdict.SUCCEEDED, {
reason: FlowRunStatus.STOPPED,
stopResponse: hookResponse.stopResponse.response,
})
.increaseTask()
}
if (hookResponse.paused) {
assertNotNullOrUndefined(hookResponse.pauseResponse, 'pauseResponse')
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.PAUSED))
return newExecutionContext
.upsertStep(
action.name,
stepOutput.setOutput(output).setStatus(StepOutputStatus.PAUSED),
)
.setVerdict(ExecutionVerdict.PAUSED, {
reason: FlowRunStatus.PAUSED,
pauseMetadata: hookResponse.pauseResponse.pauseMetadata,
})
}

return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)).increaseTask().setVerdict(ExecutionVerdict.RUNNING, undefined)
return newExecutionContext
.upsertStep(action.name, stepOutput.setOutput(output))
.increaseTask()
.setVerdict(ExecutionVerdict.RUNNING, undefined)
}
catch (e) {
const handledError = handleExecutionError(e)
Expand All @@ -155,20 +219,31 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat

const createTagsManager = (hookResponse: HookResponse): TagsManager => {
return {
add: async (params: {
name: string
}): Promise<void> => {
add: async (params: { name: string }): Promise<void> => {
hookResponse.tags.push(params.name)
},

}
}

const createConnectionManager = ({ engineToken, projectId, hookResponse, apiUrl }: { projectId: string, engineToken: string, hookResponse: HookResponse, apiUrl: string }): ConnectionsManager => {
const createConnectionManager = ({
engineToken,
projectId,
hookResponse,
apiUrl,
}: {
projectId: string
engineToken: string
hookResponse: HookResponse
apiUrl: string
}): ConnectionsManager => {
return {
get: async (key: string) => {
try {
const connection = await createConnectionService({ projectId, engineToken, apiUrl }).obtain(key)
const connection = await createConnectionService({
projectId,
engineToken,
apiUrl,
}).obtain(key)
hookResponse.tags.push(`connection:${key}`)
return connection
}
Expand All @@ -186,14 +261,23 @@ function createStopHook(hookResponse: HookResponse): StopHook {
}
}

function createPauseHook(hookResponse: HookResponse, pauseId: string): PauseHook {
function createPauseHook(
hookResponse: HookResponse,
pauseId: string,
): PauseHook {
return (req) => {
hookResponse.paused = true
switch (req.pauseMetadata.type) {
case PauseType.DELAY: {
const diffInDays = dayjs(req.pauseMetadata.resumeDateTime).diff(dayjs(), 'days')
const diffInDays = dayjs(req.pauseMetadata.resumeDateTime).diff(
dayjs(),
'days',
)
if (diffInDays > AP_PAUSED_FLOW_TIMEOUT_DAYS) {
throw new PausedFlowTimeoutError(undefined, AP_PAUSED_FLOW_TIMEOUT_DAYS)
throw new PausedFlowTimeoutError(
undefined,
AP_PAUSED_FLOW_TIMEOUT_DAYS,
)
}
hookResponse.pauseResponse = {
pauseMetadata: req.pauseMetadata,
Expand Down
3 changes: 2 additions & 1 deletion packages/pieces/community/framework/src/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export type BaseActionContext<
generateResumeUrl: (params: {
queryParams: Record<string, string>
}) => string;
pieceConfig: object;
};

type BeginExecutionActionContext<
Expand Down Expand Up @@ -194,4 +195,4 @@ export enum StoreScope {
// Collection were deprecated in favor of project
PROJECT = 'COLLECTION',
FLOW = 'FLOW',
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export const httpSendRequestAction = createAction({
} else {
proxyUrl = `http://${proxySettings.proxy_host}:${proxySettings.proxy_port}`;
}

const httpsAgent = new HttpsProxyAgent(proxyUrl)
const axiosClient = axios.create({
httpsAgent,
Expand Down
8 changes: 5 additions & 3 deletions packages/server/shared/src/lib/system/system-prop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export enum AppSystemProp {
ENCRYPTION_KEY = 'ENCRYPTION_KEY',
EXECUTION_DATA_RETENTION_DAYS = 'EXECUTION_DATA_RETENTION_DAYS',
JWT_SECRET = 'JWT_SECRET',

/**
* @deprecated this now can be done from the platform admin page.
*/
Expand Down Expand Up @@ -95,14 +95,15 @@ export enum SharedSystemProp {
MAX_FILE_SIZE_MB = 'MAX_FILE_SIZE_MB',

FRONTEND_URL = 'FRONTEND_URL',

// These are shared as the app is using the engine as a dependency for now.
CACHE_PATH = 'CACHE_PATH',
EXECUTION_MODE = 'EXECUTION_MODE',
PACKAGE_ARCHIVE_PATH = 'PACKAGE_ARCHIVE_PATH',
SANDBOX_MEMORY_LIMIT = 'SANDBOX_MEMORY_LIMIT',
SANDBOX_PROPAGATED_ENV_VARS = 'SANDBOX_PROPAGATED_ENV_VARS',
PIECES_SOURCE = 'PIECES_SOURCE',
PIECES_CONFIG = 'PIECES_CONFIG',
ENGINE_EXECUTABLE_PATH = 'ENGINE_EXECUTABLE_PATH',
ENRICH_ERROR_CONTEXT = 'ENRICH_ERROR_CONTEXT',

Expand All @@ -112,13 +113,14 @@ export enum SharedSystemProp {
LOKI_URL = 'LOKI_URL',
LOKI_USERNAME = 'LOKI_USERNAME',


}

export enum WorkerSystemProps {
FLOW_WORKER_CONCURRENCY = 'FLOW_WORKER_CONCURRENCY',
SCHEDULED_WORKER_CONCURRENCY = 'SCHEDULED_WORKER_CONCURRENCY',
SCHEDULED_POLLING_COUNT = 'SCHEDULED_POLLING_COUNT',

// TODO: This is currently undocumented and used for testing purposes. Please document or remove as necessary.
POLLING_POOL_SIZE = 'POLLING_POOL_SIZE',
WORKER_TOKEN = 'WORKER_TOKEN',
Expand Down
2 changes: 1 addition & 1 deletion packages/server/shared/src/lib/system/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const systemPropDefaultValues: Partial<Record<SystemProp, string>> = {
[AppSystemProp.MAX_CONCURRENT_JOBS_PER_PROJECT]: '100',
[AppSystemProp.PROJECT_RATE_LIMITER_ENABLED]: 'false',
[AppSystemProp.DEV_PIECES]: '',

[SharedSystemProp.PIECES_CONFIG]: '{}',
}

export const system = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ export class IsolateSandbox {
AP_PAUSED_FLOW_TIMEOUT_DAYS: system.getOrThrow(SharedSystemProp.PAUSED_FLOW_TIMEOUT_DAYS),
AP_BASE_CODE_DIRECTORY: IsolateSandbox.sandboxCodesCachePath,
AP_MAX_FILE_SIZE_MB: system.getOrThrow(SharedSystemProp.MAX_FILE_SIZE_MB),
AP_PIECES_CONFIG: system.getOrThrow(SharedSystemProp.PIECES_CONFIG),
}
}

Expand Down
Loading

0 comments on commit dd6107d

Please sign in to comment.