Skip to content

Commit

Permalink
Revert "🪟 🔧 Consume the connection status API endpoint (#8455)" (#9076)
Browse files Browse the repository at this point in the history
  • Loading branch information
teallarson committed Sep 28, 2023
1 parent eb0539e commit fe1cde0
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 176 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import {
useLateMultiplierExperiment,
} from "components/connection/StreamStatus/streamStatusUtils";

import { useGetConnection, useListConnectionsStatuses } from "core/api";
import { useListJobsForConnectionStatus, useGetConnection } from "core/api";
import {
ConnectionScheduleType,
ConnectionStatus,
FailureType,
JobConfigType,
JobStatus,
JobWithAttemptsRead,
WebBackendConnectionRead,
} from "core/api/types/AirbyteClient";
import { moveTimeToFutureByPeriod } from "core/utils/time";
import { useSchemaChanges } from "hooks/connection/useSchemaChanges";

import { ConnectionStatusIndicatorStatus } from "../ConnectionStatusIndicator";
import { jobStatusesIndicatingFinishedExecution } from "../ConnectionSync/ConnectionSyncContext";

export const isHandleableScheduledConnection = (scheduleType: ConnectionScheduleType | undefined) =>
scheduleType === "basic";
Expand Down Expand Up @@ -58,15 +61,32 @@ export interface UIConnectionStatus {
isRunning: boolean;
}

const getConfigErrorFromJobs = (jobs: JobWithAttemptsRead[]) => {
const sortedAttempts = [...(jobs?.[0]?.attempts ?? [])].sort((a, b) => {
if (a.createdAt < b.createdAt) {
return 1;
} else if (a.createdAt > b.createdAt) {
return -1;
}
return 0;
});
const latestAttempt = sortedAttempts[0];
const configErrorFailure = latestAttempt?.failureSummary?.failures.find(
(failure) => failure.failureType === FailureType.config_error
);
return configErrorFailure;
};

export const useConnectionStatus = (connectionId: string): UIConnectionStatus => {
const connection = useGetConnection(connectionId);

const connectionStatuses = useListConnectionsStatuses([connectionId]);
const connectionStatus = connectionStatuses[0];
// get the last N (10) jobs for this connection
// to determine the connection's status
const {
data: { jobs },
} = useListJobsForConnectionStatus(connectionId);

const { isRunning, isLastCompletedJobReset, lastSyncJobStatus, lastSuccessfulSync, failureType } = connectionStatus;

const hasConfigError = failureType === FailureType.config_error;
const configError = getConfigErrorFromJobs(jobs);

const { hasBreakingSchemaChange } = useSchemaChanges(connection.schemaChange);

Expand All @@ -75,8 +95,29 @@ export const useConnectionStatus = (connectionId: string): UIConnectionStatus =>
const lateMultiplier = useLateMultiplierExperiment();
const errorMultiplier = useErrorMultiplierExperiment();

const isRunning = jobs[0]?.job?.status === JobStatus.running || jobs[0]?.job?.status === JobStatus.incomplete;

const isLastCompletedJobReset =
jobs[0]?.job?.resetConfig &&
jobs[0]?.job?.configType === JobConfigType.reset_connection &&
(jobs[0]?.job?.status === JobStatus.succeeded || jobs[0]?.job?.status === JobStatus.failed);

// compute the connection sync status from the job history
const lastCompletedSyncJob = jobs.find(
({ job }) =>
job && job.configType === JobConfigType.sync && jobStatusesIndicatingFinishedExecution.includes(job.status)
);
const lastSyncJobStatus = lastCompletedSyncJob?.job?.status;

// find the last successful sync job & its timestamp
const lastSuccessfulSyncJob = jobs.find(
({ job }) => job?.configType === JobConfigType.sync && job?.status === JobStatus.succeeded
);
const lastSuccessfulSync = lastSuccessfulSyncJob?.job?.createdAt;

// calculate the time we expect the next sync to start (basic schedule only)
const latestSyncJobCreatedAt = connection.latestSyncJobCreatedAt;
const lastSyncJob = jobs.find(({ job }) => job?.configType === JobConfigType.sync);
const latestSyncJobCreatedAt = lastSyncJob?.job?.createdAt;
let nextSync;
if (latestSyncJobCreatedAt && connection.scheduleData?.basicSchedule) {
const latestSync = dayjs(latestSyncJobCreatedAt * 1000);
Expand All @@ -87,7 +128,7 @@ export const useConnectionStatus = (connectionId: string): UIConnectionStatus =>
);
}

if (hasBreakingSchemaChange || hasConfigError) {
if (hasBreakingSchemaChange || configError) {
return {
status: ConnectionStatusIndicatorStatus.ActionRequired,
lastSyncJobStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@ import { createContext, useCallback, useContext, useMemo } from "react";

import {
useResetConnection,
useResetConnectionStream,
useSyncConnection,
useCancelJob,
useListJobsForConnection,
useListJobsForConnectionStatus,
useSetConnectionJobsData,
useSetConnectionRunState,
} from "core/api";
import {
ConnectionStatus,
ConnectionStream,
JobWithAttemptsRead,
JobConfigType,
JobStatus,
JobInfoRead,
WebBackendConnectionRead,
} from "core/api/types/AirbyteClient";
import { useConnectionEditService } from "hooks/services/ConnectionEdit/ConnectionEditService";
import { useExperiment } from "hooks/services/Experiment";

interface ConnectionSyncContext {
syncConnection: () => Promise<void>;
Expand All @@ -28,27 +30,20 @@ interface ConnectionSyncContext {
resetStreams: (streams?: ConnectionStream[]) => Promise<void>;
resetStarting: boolean;
jobResetRunning: boolean;
lastCompletedSyncJob?: JobWithAttemptsRead;
}

export const jobStatusesIndicatingFinishedExecution: string[] = [
JobStatus.succeeded,
JobStatus.failed,
JobStatus.cancelled,
];
export const jobStatusesIndicatingFinishedExecution: string[] = [JobStatus.succeeded, JobStatus.failed];
const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): ConnectionSyncContext => {
const { connectionId } = connection;
const jobsPageSize = 1;
const jobsPageSize = useExperiment("connection.streamCentricUI.numberOfLogsToLoad", 10);
const {
data: { jobs },
} = useListJobsForConnection(connectionId);
} = useListJobsForConnectionStatus(connection.connectionId);
const connectionEnabled = connection.status === ConnectionStatus.active;
const setConnectionJobsData = useSetConnectionJobsData(connectionId);
const setConnectionStatusRunState = useSetConnectionRunState(connectionId);
const setConnectionJobsData = useSetConnectionJobsData(connection.connectionId);

const prependJob = useCallback(
(newJob: JobInfoRead) => {
const isNewJobRunning = newJob.job.status === JobStatus.pending || newJob.job.status === JobStatus.running;
setConnectionStatusRunState(isNewJobRunning);
setConnectionJobsData((prev) => {
// if the new job id is already in the list, don't add it again
if (prev?.jobs?.[0]?.job?.id === newJob.job.id) {
Expand All @@ -59,7 +54,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
job: {
...newJob.job,
// if the new job's status is pending, set to running so the UI updates immediately
status: isNewJobRunning ? JobStatus.running : newJob.job.status,
status: newJob.job.status === JobStatus.pending ? JobStatus.running : newJob.job.status,
},
attempts: [],
};
Expand All @@ -71,7 +66,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
};
});
},
[setConnectionJobsData, jobsPageSize, setConnectionStatusRunState]
[setConnectionJobsData, jobsPageSize]
);

const { mutateAsync: doSyncConnection, isLoading: syncStarting } = useSyncConnection();
Expand All @@ -84,7 +79,6 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
const jobId = jobs?.[0]?.job?.id;
if (jobId) {
await doCancelJob(jobId);
setConnectionStatusRunState(false);
setConnectionJobsData((prev) => {
// deep copy from previous data because
// 1. we don't want to mutate the in-state objects
Expand All @@ -99,13 +93,22 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
};
});
}
}, [jobs, doCancelJob, setConnectionJobsData, setConnectionStatusRunState]);
}, [jobs, doCancelJob, setConnectionJobsData]);

const { mutateAsync: doResetConnection, isLoading: resetStarting } = useResetConnection();
const resetStreams = useCallback(async () => {
// Reset all streams
prependJob(await doResetConnection(connectionId));
}, [connectionId, doResetConnection, prependJob]);
const { mutateAsync: resetStream } = useResetConnectionStream(connection.connectionId);
const resetStreams = useCallback(
async (streams?: ConnectionStream[]) => {
if (streams) {
// Reset a set of streams.
prependJob(await resetStream(streams));
} else {
// Reset all selected streams
prependJob(await doResetConnection(connection.connectionId));
}
},
[connection.connectionId, doResetConnection, resetStream, prependJob]
);

const activeJob = jobs[0]?.job;
const jobSyncRunning = useMemo(
Expand All @@ -119,6 +122,11 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
[activeJob?.configType, activeJob?.status]
);

const lastCompletedSyncJob = jobs.find(
({ job }) =>
job && job.configType === JobConfigType.sync && jobStatusesIndicatingFinishedExecution.includes(job.status)
);

return {
syncConnection,
connectionEnabled,
Expand All @@ -129,6 +137,7 @@ const useConnectionSyncContextInit = (connection: WebBackendConnectionRead): Con
resetStreams,
resetStarting,
jobResetRunning,
lastCompletedSyncJob,
};
};

Expand Down
37 changes: 1 addition & 36 deletions airbyte-webapp/src/core/api/hooks/connections.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Updater, useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { useCallback } from "react";
import { useIntl } from "react-intl";

Expand All @@ -12,7 +12,6 @@ import { SCOPE_WORKSPACE } from "services/Scope";
import { useCurrentWorkspace, useInvalidateWorkspaceStateQuery } from "./workspaces";
import { useAppMonitoringService } from "../../../hooks/services/AppMonitoringService";
import { useNotificationService } from "../../../hooks/services/Notification";
import { getConnectionStatuses } from "../generated/AirbyteClient";
import {
createOrUpdateStateSafe,
deleteConnection,
Expand All @@ -31,7 +30,6 @@ import {
ConnectionScheduleType,
ConnectionStateCreateOrUpdate,
ConnectionStatus,
ConnectionStatusesRead,
ConnectionStream,
DestinationRead,
NamespaceDefinitionType,
Expand Down Expand Up @@ -411,36 +409,3 @@ export const useCreateOrUpdateState = () => {
}
);
};

export const useListConnectionsStatuses = (connectionIds: string[]) => {
const requestOptions = useRequestOptions();
const queryKey = [SCOPE_WORKSPACE, "connections", "status", connectionIds];

return useSuspenseQuery(queryKey, () => getConnectionStatuses({ connectionIds }, requestOptions), {
refetchInterval: (data) => {
// when any of the polled connections is running, refresh 2.5s instead of 10s
return data?.some(({ isRunning }) => isRunning) ? 2500 : 10000;
},
});
};

export const useSetConnectionRunState = (connectionId: string) => {
const queryClient = useQueryClient();

return (isRunning: boolean) => {
queryClient.setQueriesData([SCOPE_WORKSPACE, "connections", "status"], ((data) => {
if (data) {
data = data.map((connectionStatus) => {
if (connectionStatus.connectionId === connectionId) {
const nextConnectionStatus = structuredClone(connectionStatus); // don't mutate existing object
nextConnectionStatus.isRunning = isRunning; // set run state
delete nextConnectionStatus.failureType; // new runs reset failure state
return nextConnectionStatus;
}
return connectionStatus;
});
}
return data;
}) as Updater<ConnectionStatusesRead | undefined, ConnectionStatusesRead>);
};
};
12 changes: 5 additions & 7 deletions airbyte-webapp/src/core/api/hooks/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Updater, useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query";

import { jobStatusesIndicatingFinishedExecution } from "components/connection/ConnectionSync/ConnectionSyncContext";

import { useExperiment } from "hooks/services/Experiment";
import { SCOPE_WORKSPACE } from "services/Scope";

import {
Expand All @@ -11,7 +10,7 @@ import {
getJobInfoWithoutLogs,
listJobsFor,
} from "../generated/AirbyteClient";
import { JobListRequestBody, JobReadList } from "../types/AirbyteClient";
import { JobListRequestBody, JobReadList, JobStatus } from "../types/AirbyteClient";
import { useRequestOptions } from "../useRequestOptions";
import { useSuspenseQuery } from "../useSuspenseQuery";

Expand All @@ -28,8 +27,7 @@ export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = t

const result = useQuery(queryKey, () => listJobsFor(listParams, requestOptions), {
refetchInterval: (data) => {
const jobStatus = data?.jobs?.[0]?.job?.status;
return jobStatus && jobStatusesIndicatingFinishedExecution.includes(jobStatus) ? 10000 : 2500;
return data?.jobs?.[0]?.job?.status === JobStatus.running ? 2500 : 10000;
},
keepPreviousData,
suspense: true,
Expand All @@ -41,12 +39,12 @@ export const useListJobs = (listParams: JobListRequestBody, keepPreviousData = t
};
};

export const useListJobsForConnection = (connectionId: string, pageSize = 1) => {
export const useListJobsForConnectionStatus = (connectionId: string) => {
return useListJobs({
configId: connectionId,
configTypes: ["sync", "reset_connection"],
pagination: {
pageSize,
pageSize: useExperiment("connection.streamCentricUI.numberOfLogsToLoad", 10),
},
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface Experiments {
"connection.searchableJobLogs": boolean;
"connection.streamCentricUI.errorMultiplier": number;
"connection.streamCentricUI.lateMultiplier": number;
"connection.streamCentricUI.numberOfLogsToLoad": number;
"connection.streamCentricUI.v2": boolean;
"connection.streamCentricUI.historicalOverview": boolean;
"connection.syncCatalog.simplifiedCatalogRow": boolean;
Expand Down
Loading

0 comments on commit fe1cde0

Please sign in to comment.