diff --git a/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.test.ts b/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.test.ts index f6e13193fd0..3c62348fcd2 100644 --- a/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.test.ts +++ b/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.test.ts @@ -2,15 +2,19 @@ import { renderHook } from "@testing-library/react"; import dayjs from "dayjs"; import { mockConnection } from "test-utils"; +import { mockAttempt } from "test-utils/mock-data/mockAttempt"; +import { mockJob } from "test-utils/mock-data/mockJob"; -import { useListConnectionsStatuses, useGetConnection } from "core/api"; +import { useListJobsForConnectionStatus, useGetConnection } from "core/api"; import { ConnectionScheduleDataBasicSchedule, ConnectionScheduleDataCron, ConnectionStatus, - ConnectionStatusRead, + FailureOrigin, FailureType, + JobConfigType, JobStatus, + JobWithAttemptsRead, SchemaChange, WebBackendConnectionRead, } from "core/api/types/AirbyteClient"; @@ -18,7 +22,6 @@ import { useSchemaChanges } from "hooks/connection/useSchemaChanges"; import { isConnectionLate, isHandleableScheduledConnection, useConnectionStatus } from "./useConnectionStatus"; import { ConnectionStatusIndicatorStatus } from "../ConnectionStatusIndicator"; -import { jobStatusesIndicatingFinishedExecution } from "../ConnectionSync/ConnectionSyncContext"; const MULTIPLIER_EXPERIMENT_VALUE = 2; @@ -39,8 +42,8 @@ jest.mock("core/api"); const mockUseGetConnection = useGetConnection as unknown as jest.Mock; jest.mock("core/api"); -const mockUseListConnectionsStatuses = useListConnectionsStatuses as unknown as jest.Mock< - Array> +const mockUseListJobsForConnectionStatus = useListJobsForConnectionStatus as unknown as jest.Mock< + ReturnType >; interface MockSetup { @@ -50,16 +53,10 @@ interface MockSetup { scheduleType?: WebBackendConnectionRead["scheduleType"]; scheduleData?: WebBackendConnectionRead["scheduleData"]; - // status(es) - connectionStatuses: Array>; + // job history + jobList?: JobWithAttemptsRead[]; } -const resetAndSetupMocks = ({ - connectionStatus, - schemaChange, - scheduleType, - scheduleData, - connectionStatuses, -}: MockSetup) => { +const resetAndSetupMocks = ({ connectionStatus, schemaChange, scheduleType, scheduleData, jobList }: MockSetup) => { if (schemaChange === SchemaChange.breaking && connectionStatus !== ConnectionStatus.inactive) { // platform disables a connection when there is a breaking schema change throw new Error("A breaking schema change should always result in an inactive connection"); @@ -75,7 +72,11 @@ const resetAndSetupMocks = ({ scheduleData, })); - mockUseListConnectionsStatuses.mockReturnValue(connectionStatuses); + mockUseListJobsForConnectionStatus.mockImplementation(() => ({ + data: { jobs: jobList || [], totalJobCount: jobList?.length ?? 0 }, + isPreviousData: false, + setData: () => undefined, + })); mockUseSchemaChanges.mockImplementation(() => ({ hasBreakingSchemaChange, @@ -244,29 +245,29 @@ describe("useConnectionStatus", () => { const outside2xFrequency = dayjs().subtract(50, "hours").unix(); it.each` - title | expectedConnectionStatus | connectionStatus | schemaChange | connectionStatuses | scheduleType | scheduleData - ${"most recent sync was successful, no schema changes"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.succeeded, within1xFrequency)} | ${undefined} | ${undefined} - ${"most recent sync was successful, breaking schema changes"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildConnectionStatusesRead(JobStatus.succeeded, within1xFrequency)} | ${undefined} | ${undefined} - ${"breaking schema changes, sync is within 1x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildConnectionStatusesRead(JobStatus.failed, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"breaking schema changes, sync is within 2x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildConnectionStatusesRead(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"breaking schema changes, sync is outside of 2x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildConnectionStatusesRead(JobStatus.failed, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"new connection, not scheduled"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead()} | ${undefined} | ${undefined} - ${"new connection, scheduled"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead()} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"connection status is failed, no previous success"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, undefined)} | ${undefined} | ${undefined} - ${"connection status is failed, last previous success was within 1x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"connection status is failed, last previous success was within 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"connection status is failed, last previous success was within 2x schedule frequency (cron)"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, within2xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} - ${"connection status is failed, last previous success was outside 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"connection status is failed, last previous success was within 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"last sync was successful, but the next sync hasn't started (outside 2x frequency)"} | ${ConnectionStatusIndicatorStatus.Late} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.succeeded, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"last sync was successful, but the next sync hasn't started (outside 2x frequency, cron)"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.succeeded, outside2xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} - ${"last sync was cancelled, but the next cron-scheduled sync hasn't started"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.cancelled, within1xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} - ${"last sync was cancelled, but last successful sync is within 1x frequency"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.cancelled, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"last sync was cancelled, but last successful sync is within 2x frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.cancelled, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"last sync was cancelled, but last successful sync is outside 2x frequency"} | ${ConnectionStatusIndicatorStatus.Late} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildConnectionStatusesRead(JobStatus.cancelled, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"last sync has a config_error"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConfigErrorConnectionStatusesRead(within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"most recent completed job was a successful reset"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.succeeded, undefined, true)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} - ${"most recent completed job was a failed reset"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildConnectionStatusesRead(JobStatus.failed, undefined, true)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + title | expectedConnectionStatus | connectionStatus | schemaChange | jobList | scheduleType | scheduleData + ${"most recent sync was successful, no schema changes"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.succeeded, within1xFrequency)} | ${undefined} | ${undefined} + ${"most recent sync was successful, breaking schema changes"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildJobs(JobStatus.succeeded, within1xFrequency)} | ${undefined} | ${undefined} + ${"breaking schema changes, sync is within 1x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildJobs(JobStatus.failed, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"breaking schema changes, sync is within 2x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildJobs(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"breaking schema changes, sync is outside of 2x frequency"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.inactive} | ${SchemaChange.breaking} | ${buildJobs(JobStatus.failed, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"new connection, not scheduled"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.incomplete, undefined)} | ${undefined} | ${undefined} + ${"new connection, scheduled"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.incomplete, undefined)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"connection status is failed, no previous success"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, undefined)} | ${undefined} | ${undefined} + ${"connection status is failed, last previous success was within 1x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"connection status is failed, last previous success was within 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"connection status is failed, last previous success was within 2x schedule frequency (cron)"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, within2xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} + ${"connection status is failed, last previous success was outside 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.Error} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"connection status is failed, last previous success was within 2x schedule frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.failed, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"last sync was successful, but the next sync hasn't started (outside 2x frequency)"} | ${ConnectionStatusIndicatorStatus.Late} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.succeeded, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"last sync was successful, but the next sync hasn't started (outside 2x frequency, cron)"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.succeeded, outside2xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} + ${"last sync was cancelled, but the next cron-scheduled sync hasn't started"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.cancelled, within1xFrequency)} | ${"cron"} | ${{ cronExpression: "* * * * *", cronTimeZone: "UTC" }} + ${"last sync was cancelled, but last successful sync is within 1x frequency"} | ${ConnectionStatusIndicatorStatus.OnTime} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.cancelled, within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"last sync was cancelled, but last successful sync is within 2x frequency"} | ${ConnectionStatusIndicatorStatus.OnTrack} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.cancelled, within2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"last sync was cancelled, but last successful sync is outside 2x frequency"} | ${ConnectionStatusIndicatorStatus.Late} | ${ConnectionStatus.active} | ${SchemaChange.non_breaking} | ${buildJobs(JobStatus.cancelled, outside2xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"last sync has a config_error"} | ${ConnectionStatusIndicatorStatus.ActionRequired} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobsWithConfigError(within1xFrequency)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"most recent completed job was a successful reset"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.succeeded, undefined, true)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} + ${"most recent completed job was a failed reset"} | ${ConnectionStatusIndicatorStatus.Pending} | ${ConnectionStatus.active} | ${SchemaChange.no_change} | ${buildJobs(JobStatus.failed, undefined, true)} | ${"basic"} | ${{ units: 24, timeUnit: "hours" }} `( "$title:" + "\n\treturns $expectedConnectionStatus when" + @@ -276,7 +277,7 @@ describe("useConnectionStatus", () => { "\n\tand has $schemaChange schema changes", ({ expectedConnectionStatus, scheduleType, scheduleData, ...mockConfig }) => { resetAndSetupMocks({ ...mockConfig, ...buildScheduleData(scheduleType, scheduleData) }); - const { result } = renderHook(() => useConnectionStatus(mockConnection.connectionId)); + const { result } = renderHook(() => useConnectionStatus("test-connection-id")); expect(result.current.status).toBe(expectedConnectionStatus); } ); @@ -285,9 +286,9 @@ describe("useConnectionStatus", () => { resetAndSetupMocks({ connectionStatus: ConnectionStatus.active, schemaChange: SchemaChange.no_change, - connectionStatuses: buildConnectionStatusesRead(JobStatus.incomplete, undefined), + jobList: buildJobs(JobStatus.incomplete, undefined), }); - const { result } = renderHook(() => useConnectionStatus(mockConnection.connectionId)); + const { result } = renderHook(() => useConnectionStatus("test-connection-id")); expect(result.current.isRunning).toBe(true); }); }); @@ -308,24 +309,62 @@ function buildScheduleData( return { scheduleType, scheduleData: { cron: schedule as ConnectionScheduleDataCron } }; } -function buildConnectionStatusesRead( - latestSyncStatus?: JobStatus, - lastSuccessfulSync?: number, - lastJobWasReset?: boolean -) { - return [ +function buildJobs(latestSyncStatus: JobStatus, lastSuccessfulSync: number | undefined, lastJobWasReset?: boolean) { + const jobs: JobWithAttemptsRead[] = []; + + // jobslist endpoint returns the most recent job first + + if (latestSyncStatus && !lastJobWasReset) { + jobs.push({ + job: { + ...mockJob, + status: latestSyncStatus, + }, + attempts: [], // attempts are not used + }); + } + + if (lastSuccessfulSync) { + jobs.push({ + job: { + ...mockJob, + createdAt: lastSuccessfulSync, + }, + attempts: [], // attempts are not used + }); + } + + if (lastJobWasReset) { + jobs.push({ + job: { + ...mockJob, + status: latestSyncStatus, + configType: JobConfigType.reset_connection, + resetConfig: { streamsToReset: [] }, + }, + attempts: [], // attempts are not used + }); + } + return jobs; +} + +function buildJobsWithConfigError(lastSuccessfulSync: number | undefined) { + const jobs = buildJobs(JobStatus.failed, lastSuccessfulSync); + + jobs[0].attempts = [ { - connectionId: mockConnection.connectionId, - lastSyncJobStatus: latestSyncStatus, - [lastSuccessfulSync ? "lastSuccessfulSync" : ""]: lastSuccessfulSync, - isRunning: latestSyncStatus ? !jobStatusesIndicatingFinishedExecution.includes(latestSyncStatus) : false, - isLastCompletedJobReset: !!lastJobWasReset, + ...mockAttempt, + failureSummary: { + failures: [ + { + failureOrigin: FailureOrigin.source, + failureType: FailureType.config_error, + timestamp: 0, + }, + ], + }, }, - ] as Array>; -} + ]; -function buildConfigErrorConnectionStatusesRead(lastSuccessfulSync: number | undefined) { - const jobs = buildConnectionStatusesRead(JobStatus.failed, lastSuccessfulSync); - jobs[0].failureType = FailureType.config_error; return jobs; } diff --git a/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.ts b/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.ts index c2dc36ed113..5445f65fc20 100644 --- a/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.ts +++ b/airbyte-webapp/src/components/connection/ConnectionStatus/useConnectionStatus.ts @@ -5,18 +5,21 @@ import { useLateMultiplierExperiment, } from "components/connection/StreamStatus/streamStatusUtils"; -import { useGetConnection, useListConnectionsStatuses } from "core/api"; +import { useListJobsForConnectionStatus, useGetConnection } from "core/api"; import { ConnectionScheduleType, ConnectionStatus, FailureType, + JobConfigType, JobStatus, + JobWithAttemptsRead, WebBackendConnectionRead, } from "core/api/types/AirbyteClient"; import { moveTimeToFutureByPeriod } from "core/utils/time"; import { useSchemaChanges } from "hooks/connection/useSchemaChanges"; import { ConnectionStatusIndicatorStatus } from "../ConnectionStatusIndicator"; +import { jobStatusesIndicatingFinishedExecution } from "../ConnectionSync/ConnectionSyncContext"; export const isHandleableScheduledConnection = (scheduleType: ConnectionScheduleType | undefined) => scheduleType === "basic"; @@ -58,15 +61,32 @@ export interface UIConnectionStatus { isRunning: boolean; } +const getConfigErrorFromJobs = (jobs: JobWithAttemptsRead[]) => { + const sortedAttempts = [...(jobs?.[0]?.attempts ?? [])].sort((a, b) => { + if (a.createdAt < b.createdAt) { + return 1; + } else if (a.createdAt > b.createdAt) { + return -1; + } + return 0; + }); + const latestAttempt = sortedAttempts[0]; + const configErrorFailure = latestAttempt?.failureSummary?.failures.find( + (failure) => failure.failureType === FailureType.config_error + ); + return configErrorFailure; +}; + export const useConnectionStatus = (connectionId: string): UIConnectionStatus => { const connection = useGetConnection(connectionId); - const connectionStatuses = useListConnectionsStatuses([connectionId]); - const connectionStatus = connectionStatuses[0]; + // get the last N (10) jobs for this connection + // to determine the connection's status + const { + data: { jobs }, + } = useListJobsForConnectionStatus(connectionId); - const { isRunning, isLastCompletedJobReset, lastSyncJobStatus, lastSuccessfulSync, failureType } = connectionStatus; - - const hasConfigError = failureType === FailureType.config_error; + const configError = getConfigErrorFromJobs(jobs); const { hasBreakingSchemaChange } = useSchemaChanges(connection.schemaChange); @@ -75,8 +95,29 @@ export const useConnectionStatus = (connectionId: string): UIConnectionStatus => const lateMultiplier = useLateMultiplierExperiment(); const errorMultiplier = useErrorMultiplierExperiment(); + const isRunning = jobs[0]?.job?.status === JobStatus.running || jobs[0]?.job?.status === JobStatus.incomplete; + + const isLastCompletedJobReset = + jobs[0]?.job?.resetConfig && + jobs[0]?.job?.configType === JobConfigType.reset_connection && + (jobs[0]?.job?.status === JobStatus.succeeded || jobs[0]?.job?.status === JobStatus.failed); + + // compute the connection sync status from the job history + const lastCompletedSyncJob = jobs.find( + ({ job }) => + job && job.configType === JobConfigType.sync && jobStatusesIndicatingFinishedExecution.includes(job.status) + ); + const lastSyncJobStatus = lastCompletedSyncJob?.job?.status; + + // find the last successful sync job & its timestamp + const lastSuccessfulSyncJob = jobs.find( + ({ job }) => job?.configType === JobConfigType.sync && job?.status === JobStatus.succeeded + ); + const lastSuccessfulSync = lastSuccessfulSyncJob?.job?.createdAt; + // calculate the time we expect the next sync to start (basic schedule only) - const latestSyncJobCreatedAt = connection.latestSyncJobCreatedAt; + const lastSyncJob = jobs.find(({ job }) => job?.configType === JobConfigType.sync); + const latestSyncJobCreatedAt = lastSyncJob?.job?.createdAt; let nextSync; if (latestSyncJobCreatedAt && connection.scheduleData?.basicSchedule) { const latestSync = dayjs(latestSyncJobCreatedAt * 1000); @@ -87,7 +128,7 @@ export const useConnectionStatus = (connectionId: string): UIConnectionStatus => ); } - if (hasBreakingSchemaChange || hasConfigError) { + if (hasBreakingSchemaChange || configError) { return { status: ConnectionStatusIndicatorStatus.ActionRequired, lastSyncJobStatus, diff --git a/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx b/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx index 46adc4ab77a..81e0e46b6e4 100644 --- a/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx +++ b/airbyte-webapp/src/components/connection/ConnectionSync/ConnectionSyncContext.tsx @@ -2,21 +2,23 @@ import { createContext, useCallback, useContext, useMemo } from "react"; import { useResetConnection, + useResetConnectionStream, useSyncConnection, useCancelJob, - useListJobsForConnection, + useListJobsForConnectionStatus, useSetConnectionJobsData, - useSetConnectionRunState, } from "core/api"; import { ConnectionStatus, ConnectionStream, JobWithAttemptsRead, + JobConfigType, JobStatus, JobInfoRead, WebBackendConnectionRead, } from "core/api/types/AirbyteClient"; import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService"; +import { useExperiment } from "hooks/services/Experiment"; interface ConnectionSyncContext { syncConnection: () => Promise; @@ -28,27 +30,20 @@ interface ConnectionSyncContext { resetStreams: (streams?: ConnectionStream[]) => Promise; resetStarting: boolean; jobResetRunning: boolean; + lastCompletedSyncJob?: JobWithAttemptsRead; } -export const jobStatusesIndicatingFinishedExecution: string[] = [ - JobStatus.succeeded, - JobStatus.failed, - JobStatus.cancelled, -]; +export const jobStatusesIndicatingFinishedExecution: string[] = [JobStatus.succeeded, JobStatus.failed]; const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): ConnectionSyncContext => { - const { connectionId } = connection; - const jobsPageSize = 1; + const jobsPageSize = useExperiment("connection.streamCentricUI.numberOfLogsToLoad", 10); const { data: { jobs }, - } = useListJobsForConnection(connectionId); + } = useListJobsForConnectionStatus(connection.connectionId); const connectionEnabled = connection.status === ConnectionStatus.active; - const setConnectionJobsData = useSetConnectionJobsData(connectionId); - const setConnectionStatusRunState = useSetConnectionRunState(connectionId); + const setConnectionJobsData = useSetConnectionJobsData(connection.connectionId); const prependJob = useCallback( (newJob: JobInfoRead) => { - const isNewJobRunning = newJob.job.status === JobStatus.pending || newJob.job.status === JobStatus.running; - setConnectionStatusRunState(isNewJobRunning); setConnectionJobsData((prev) => { // if the new job id is already in the list, don't add it again if (prev?.jobs?.[0]?.job?.id === newJob.job.id) { @@ -59,7 +54,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con job: { ...newJob.job, // if the new job's status is pending, set to running so the UI updates immediately - status: isNewJobRunning ? JobStatus.running : newJob.job.status, + status: newJob.job.status === JobStatus.pending ? JobStatus.running : newJob.job.status, }, attempts: [], }; @@ -71,7 +66,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con }; }); }, - [setConnectionJobsData, jobsPageSize, setConnectionStatusRunState] + [setConnectionJobsData, jobsPageSize] ); const { mutateAsync: doSyncConnection, isLoading: syncStarting } = useSyncConnection(); @@ -84,7 +79,6 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con const jobId = jobs?.[0]?.job?.id; if (jobId) { await doCancelJob(jobId); - setConnectionStatusRunState(false); setConnectionJobsData((prev) => { // deep copy from previous data because // 1. we don't want to mutate the in-state objects @@ -99,13 +93,22 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con }; }); } - }, [jobs, doCancelJob, setConnectionJobsData, setConnectionStatusRunState]); + }, [jobs, doCancelJob, setConnectionJobsData]); const { mutateAsync: doResetConnection, isLoading: resetStarting } = useResetConnection(); - const resetStreams = useCallback(async () => { - // Reset all streams - prependJob(await doResetConnection(connectionId)); - }, [connectionId, doResetConnection, prependJob]); + const { mutateAsync: resetStream } = useResetConnectionStream(connection.connectionId); + const resetStreams = useCallback( + async (streams?: ConnectionStream[]) => { + if (streams) { + // Reset a set of streams. + prependJob(await resetStream(streams)); + } else { + // Reset all selected streams + prependJob(await doResetConnection(connection.connectionId)); + } + }, + [connection.connectionId, doResetConnection, resetStream, prependJob] + ); const activeJob = jobs[0]?.job; const jobSyncRunning = useMemo( @@ -119,6 +122,11 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con [activeJob?.configType, activeJob?.status] ); + const lastCompletedSyncJob = jobs.find( + ({ job }) => + job && job.configType === JobConfigType.sync && jobStatusesIndicatingFinishedExecution.includes(job.status) + ); + return { syncConnection, connectionEnabled, @@ -129,6 +137,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con resetStreams, resetStarting, jobResetRunning, + lastCompletedSyncJob, }; }; diff --git a/airbyte-webapp/src/core/api/hooks/connections.ts b/airbyte-webapp/src/core/api/hooks/connections.ts index 9565c0b1230..b073fe70abc 100644 --- a/airbyte-webapp/src/core/api/hooks/connections.ts +++ b/airbyte-webapp/src/core/api/hooks/connections.ts @@ -1,4 +1,4 @@ -import { Updater, useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { useCallback } from "react"; import { useIntl } from "react-intl"; @@ -12,7 +12,6 @@ import { SCOPE_WORKSPACE } from "services/Scope"; import { useCurrentWorkspace, useInvalidateWorkspaceStateQuery } from "./workspaces"; import { useAppMonitoringService } from "../../../hooks/services/AppMonitoringService"; import { useNotificationService } from "../../../hooks/services/Notification"; -import { getConnectionStatuses } from "../generated/AirbyteClient"; import { createOrUpdateStateSafe, deleteConnection, @@ -31,7 +30,6 @@ import { ConnectionScheduleType, ConnectionStateCreateOrUpdate, ConnectionStatus, - ConnectionStatusesRead, ConnectionStream, DestinationRead, NamespaceDefinitionType, @@ -411,36 +409,3 @@ export const useCreateOrUpdateState = () => { } ); }; - -export const useListConnectionsStatuses = (connectionIds: string[]) => { - const requestOptions = useRequestOptions(); - const queryKey = [SCOPE_WORKSPACE, "connections", "status", connectionIds]; - - return useSuspenseQuery(queryKey, () => getConnectionStatuses({ connectionIds }, requestOptions), { - refetchInterval: (data) => { - // when any of the polled connections is running, refresh 2.5s instead of 10s - return data?.some(({ isRunning }) => isRunning) ? 2500 : 10000; - }, - }); -}; - -export const useSetConnectionRunState = (connectionId: string) => { - const queryClient = useQueryClient(); - - return (isRunning: boolean) => { - queryClient.setQueriesData([SCOPE_WORKSPACE, "connections", "status"], ((data) => { - if (data) { - data = data.map((connectionStatus) => { - if (connectionStatus.connectionId === connectionId) { - const nextConnectionStatus = structuredClone(connectionStatus); // don't mutate existing object - nextConnectionStatus.isRunning = isRunning; // set run state - delete nextConnectionStatus.failureType; // new runs reset failure state - return nextConnectionStatus; - } - return connectionStatus; - }); - } - return data; - }) as Updater); - }; -}; diff --git a/airbyte-webapp/src/core/api/hooks/jobs.ts b/airbyte-webapp/src/core/api/hooks/jobs.ts index 53d4b9b0373..ce34a0173d4 100644 --- a/airbyte-webapp/src/core/api/hooks/jobs.ts +++ b/airbyte-webapp/src/core/api/hooks/jobs.ts @@ -1,7 +1,6 @@ import { Updater, useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; -import { jobStatusesIndicatingFinishedExecution } from "components/connection/ConnectionSync/ConnectionSyncContext"; - +import { useExperiment } from "hooks/services/Experiment"; import { SCOPE_WORKSPACE } from "services/Scope"; import { @@ -11,7 +10,7 @@ import { getJobInfoWithoutLogs, listJobsFor, } from "../generated/AirbyteClient"; -import { JobListRequestBody, JobReadList } from "../types/AirbyteClient"; +import { JobListRequestBody, JobReadList, JobStatus } from "../types/AirbyteClient"; import { useRequestOptions } from "../useRequestOptions"; import { useSuspenseQuery } from "../useSuspenseQuery"; @@ -28,8 +27,7 @@ export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = t const result = useQuery(queryKey, () => listJobsFor(listParams, requestOptions), { refetchInterval: (data) => { - const jobStatus = data?.jobs?.[0]?.job?.status; - return jobStatus && jobStatusesIndicatingFinishedExecution.includes(jobStatus) ? 10000 : 2500; + return data?.jobs?.[0]?.job?.status === JobStatus.running ? 2500 : 10000; }, keepPreviousData, suspense: true, @@ -41,12 +39,12 @@ export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = t }; }; -export const useListJobsForConnection = (connectionId: string, pageSize = 1) => { +export const useListJobsForConnectionStatus = (connectionId: string) => { return useListJobs({ configId: connectionId, configTypes: ["sync", "reset_connection"], pagination: { - pageSize, + pageSize: useExperiment("connection.streamCentricUI.numberOfLogsToLoad", 10), }, }); }; diff --git a/airbyte-webapp/src/hooks/services/Experiment/experiments.ts b/airbyte-webapp/src/hooks/services/Experiment/experiments.ts index 526a063d3de..7ba4d304be6 100644 --- a/airbyte-webapp/src/hooks/services/Experiment/experiments.ts +++ b/airbyte-webapp/src/hooks/services/Experiment/experiments.ts @@ -19,6 +19,7 @@ export interface Experiments { "connection.searchableJobLogs": boolean; "connection.streamCentricUI.errorMultiplier": number; "connection.streamCentricUI.lateMultiplier": number; + "connection.streamCentricUI.numberOfLogsToLoad": number; "connection.streamCentricUI.v2": boolean; "connection.streamCentricUI.historicalOverview": boolean; "connection.syncCatalog.simplifiedCatalogRow": boolean; diff --git a/airbyte-webapp/src/pages/connections/StreamStatusPage/ErrorMessage.tsx b/airbyte-webapp/src/pages/connections/StreamStatusPage/ErrorMessage.tsx index 31a28e8d3c0..594dd156f69 100644 --- a/airbyte-webapp/src/pages/connections/StreamStatusPage/ErrorMessage.tsx +++ b/airbyte-webapp/src/pages/connections/StreamStatusPage/ErrorMessage.tsx @@ -2,18 +2,16 @@ import { useMemo } from "react"; import { useIntl } from "react-intl"; import { useNavigate } from "react-router-dom"; -import { jobStatusesIndicatingFinishedExecution } from "components/connection/ConnectionSync/ConnectionSyncContext"; +import { useConnectionSyncContext } from "components/connection/ConnectionSync/ConnectionSyncContext"; import { Box } from "components/ui/Box"; import { FlexContainer } from "components/ui/Flex"; import { Message, MessageProps, MessageType, isHigherSeverity } from "components/ui/Message"; import { useCurrentWorkspaceId } from "area/workspace/utils"; -import { useListJobsForConnection } from "core/api"; import { useDestinationDefinitionVersion, useSourceDefinitionVersion } from "core/api"; import { shouldDisplayBreakingChangeBanner, getHumanReadableUpgradeDeadline } from "core/domain/connector"; import { ActorDefinitionVersionRead, - AttemptRead, FailureOrigin, FailureType, JobWithAttemptsRead, @@ -26,45 +24,13 @@ import { ConnectionRoutePaths, RoutePaths } from "pages/routePaths"; import styles from "./ErrorMessage.module.scss"; const getErrorMessageFromJob = (job: JobWithAttemptsRead | undefined) => { - if (!job || !job.job) { - return null; - } - - const attempts = job.attempts ? [...job.attempts] : []; - - // sort most recent attempt to top - attempts.sort((a, b) => { - if (a.createdAt < b.createdAt) { - return 1; - } else if (a.createdAt > b.createdAt) { - return -1; - } - return 0; - }); - - const isJobComplete = jobStatusesIndicatingFinishedExecution.includes(job.job.status); - - let attemptToReadErrorFrom: undefined | AttemptRead = undefined; - if (isJobComplete) { - // use the most recent attempt - if (attempts[0]?.failureSummary?.failures?.[0]?.failureType !== FailureType.manual_cancellation) { - [attemptToReadErrorFrom] = attempts; - } - } else { - // use the most recent errored attempt, so the error message persists - // if the job is still running, bridging any gap as a new attempt spins up - attemptToReadErrorFrom = attempts.find((attempt) => { - const failureType = attempt.failureSummary?.failures?.[0]?.failureType; - return failureType && failureType !== FailureType.manual_cancellation; - }); - } - - if (attemptToReadErrorFrom) { + const latestAttempt = job?.attempts?.slice(-1)[0]; + if (latestAttempt?.failureSummary?.failures?.[0]?.failureType !== "manual_cancellation") { return { - errorMessage: attemptToReadErrorFrom?.failureSummary?.failures?.[0]?.externalMessage, - failureOrigin: attemptToReadErrorFrom?.failureSummary?.failures?.[0]?.failureOrigin, - failureType: attemptToReadErrorFrom?.failureSummary?.failures?.[0]?.failureType, - attemptId: attemptToReadErrorFrom?.id, + errorMessage: latestAttempt?.failureSummary?.failures?.[0]?.externalMessage, + failureOrigin: latestAttempt?.failureSummary?.failures?.[0]?.failureOrigin, + failureType: latestAttempt?.failureSummary?.failures?.[0]?.failureType, + attemptId: latestAttempt?.id, jobId: job?.job?.id, }; } @@ -115,9 +81,7 @@ export const ErrorMessage: React.FC = () => { const workspaceId = useCurrentWorkspaceId(); const { connection } = useConnectionEditService(); - const { - data: { jobs }, - } = useListJobsForConnection(connection.connectionId); + const { lastCompletedSyncJob } = useConnectionSyncContext(); const { hasSchemaChanges, hasBreakingSchemaChange } = useSchemaChanges(connection.schemaChange); const sourceActorDefinitionVersion = useSourceDefinitionVersion(connection.sourceId); const destinationActorDefinitionVersion = useDestinationDefinitionVersion(connection.destinationId); @@ -126,7 +90,8 @@ export const ErrorMessage: React.FC = () => { const errorMessagesToDisplay = useMemo(() => { const errorMessages: MessageProps[] = []; - const { jobId, attemptId, errorMessage, failureType, failureOrigin } = getErrorMessageFromJob(jobs?.[0]) ?? {}; + const { jobId, attemptId, errorMessage, failureType, failureOrigin } = + getErrorMessageFromJob(lastCompletedSyncJob) ?? {}; // If we have an error message and no breaking schema changes, show the error message if (errorMessage && !hasBreakingSchemaChange) { const isConfigError = failureType === FailureType.config_error; @@ -247,7 +212,7 @@ export const ErrorMessage: React.FC = () => { formatMessage, hasBreakingSchemaChange, hasSchemaChanges, - jobs, + lastCompletedSyncJob, navigate, connection.sourceId, connection.destinationId, diff --git a/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamsListContext.tsx b/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamsListContext.tsx index 81f25570133..da80f1c2e29 100644 --- a/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamsListContext.tsx +++ b/airbyte-webapp/src/pages/connections/StreamStatusPage/StreamsListContext.tsx @@ -4,9 +4,13 @@ import { ConnectionStatusIndicatorStatus } from "components/connection/Connectio import { sortStreams } from "components/connection/StreamStatus/streamStatusUtils"; import { useStreamsStatuses } from "area/connection/utils"; +import { useListJobsForConnectionStatus } from "core/api"; import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService"; const useStreamsContextInit = (connectionId: string) => { + const { + data: { jobs }, + } = useListJobsForConnectionStatus(connectionId); const [searchTerm, setSearchTerm] = useState(""); const { enabledStreams, streamStatuses } = useStreamsStatuses(connectionId); @@ -24,6 +28,7 @@ const useStreamsContextInit = (connectionId: string) => { return { setSearchTerm, filteredStreams, + jobs, }; };