diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/DiagnosticToolHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/DiagnosticToolHandler.java index 6c687a8c366..643d87fc5d0 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/DiagnosticToolHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/DiagnosticToolHandler.java @@ -4,18 +4,14 @@ package io.airbyte.commons.server.handlers; -import io.airbyte.api.model.generated.ActorDefinitionVersionRead; -import io.airbyte.api.model.generated.ActorStatus; import io.airbyte.api.model.generated.ActorType; -import io.airbyte.api.model.generated.ConnectionReadList; -import io.airbyte.api.model.generated.DestinationIdRequestBody; -import io.airbyte.api.model.generated.DestinationReadList; import io.airbyte.api.model.generated.LicenseInfoResponse; -import io.airbyte.api.model.generated.SourceIdRequestBody; -import io.airbyte.api.model.generated.SourceReadList; -import io.airbyte.api.model.generated.WorkspaceIdRequestBody; -import io.airbyte.api.model.generated.WorkspaceReadList; +import io.airbyte.config.persistence.ActorDefinitionVersionHelper; import io.airbyte.data.exceptions.ConfigNotFoundException; +import io.airbyte.data.services.ConnectionService; +import io.airbyte.data.services.DestinationService; +import io.airbyte.data.services.SourceService; +import io.airbyte.data.services.WorkspaceService; import io.airbyte.validation.json.JsonValidationException; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.Node; @@ -30,11 +26,14 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.stream.Stream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.slf4j.Logger; @@ -55,27 +54,27 @@ public class DiagnosticToolHandler { public static final String DIAGNOSTIC_REPORT_FILE_NAME = "diagnostic_report"; public static final String DIAGNOSTIC_REPORT_FILE_FORMAT = ".zip"; - private final WorkspacesHandler workspacesHandler; - private final ConnectionsHandler connectionsHandler; - private final SourceHandler sourceHandler; - private final DestinationHandler destinationHandler; - private final ActorDefinitionVersionHandler actorDefinitionVersionHandler; + private final WorkspaceService workspaceService; + private final ConnectionService connectionService; + private final SourceService sourceService; + private final DestinationService destinationService; + private final ActorDefinitionVersionHelper actorDefinitionVersionHelper; private final InstanceConfigurationHandler instanceConfigurationHandler; private final Optional kubernetesClient; private final DumperOptions yamlDumperOptions; - public DiagnosticToolHandler(final WorkspacesHandler workspacesHandler, - final ConnectionsHandler connectionsHandler, - final SourceHandler sourceHandler, - final DestinationHandler destinationHandler, - final ActorDefinitionVersionHandler actorDefinitionVersionHandler, + public DiagnosticToolHandler(final WorkspaceService workspaceService, + final ConnectionService connectionService, + final SourceService sourceService, + final DestinationService destinationService, + final ActorDefinitionVersionHelper actorDefinitionVersionHelper, final InstanceConfigurationHandler instanceConfigurationHandler, final Optional kubernetesClient) { - this.workspacesHandler = workspacesHandler; - this.connectionsHandler = connectionsHandler; - this.sourceHandler = sourceHandler; - this.destinationHandler = destinationHandler; - this.actorDefinitionVersionHandler = actorDefinitionVersionHandler; + this.workspaceService = workspaceService; + this.connectionService = connectionService; + this.sourceService = sourceService; + this.destinationService = destinationService; + this.actorDefinitionVersionHelper = actorDefinitionVersionHelper; this.instanceConfigurationHandler = instanceConfigurationHandler; this.kubernetesClient = kubernetesClient; this.yamlDumperOptions = new DumperOptions(); @@ -150,97 +149,120 @@ private List> collectWorkspaceInfo() { try { // get all workspaces LOGGER.info("Collecting workspaces data..."); - final WorkspaceReadList workspaces = workspacesHandler.listWorkspaces(); - final List> workspaceList = workspaces.getWorkspaces().stream().map(workspace -> { - final Map workspaceInfo = new HashMap<>(); - workspaceInfo.put("name", workspace.getName()); - workspaceInfo.put("id", workspace.getWorkspaceId().toString()); - workspaceInfo.put("connections", collectConnectionInfo(workspace.getWorkspaceId())); - workspaceInfo.put("connectors", collectConnectorInfo(workspace.getWorkspaceId())); - return workspaceInfo; - }).toList(); - return workspaceList; - } catch (final JsonValidationException | IOException e) { + return workspaceService + .listStandardWorkspaces(false) + .stream() + .map(workspace -> Map.of( + "name", workspace.getName(), + "id", workspace.getWorkspaceId(), + "connections", Objects.requireNonNull(collectConnectionInfo(workspace.getWorkspaceId())), + "connectors", Objects.requireNonNull(collectConnectorInfo(workspace.getWorkspaceId())))) + .toList(); + } catch (final IOException e) { LOGGER.error("Error collecting workspace information", e); return null; } } - private List> collectConnectionInfo(final UUID workspaceId) { + private List> collectConnectionInfo(final UUID workspaceId) { try { LOGGER.info("Collecting connections data..."); // get all connections by workspaceId - final ConnectionReadList connections = connectionsHandler.listConnectionsForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)); - final List> connectionList = connections.getConnections().stream().map(connection -> { - final Map connectionInfo = new HashMap<>(); - connectionInfo.put("name", connection.getName()); - connectionInfo.put("id", connection.getConnectionId().toString()); - connectionInfo.put("status", connection.getStatus().toString()); - connectionInfo.put("sourceId", connection.getSourceId().toString()); - connectionInfo.put("destinationId", connection.getDestinationId().toString()); - return connectionInfo; - }).toList(); - return connectionList; - } catch (final JsonValidationException | ConfigNotFoundException | IOException e) { + return connectionService + .listWorkspaceStandardSyncs(workspaceId, false) + .stream() + .map(connection -> Map.of( + "name", connection.getName(), + "id", connection.getConnectionId().toString(), + "status", connection.getStatus().toString(), + "sourceId", connection.getSourceId().toString(), + "destinationId", connection.getDestinationId().toString())) + .toList(); + } catch (final IOException e) { LOGGER.error("Error collecting connection information", e); - return null; + return Collections.emptyList(); } } - private List> collectConnectorInfo(final UUID workspaceId) { + private List> collectConnectorInfo(final UUID workspaceId) { try { LOGGER.info("Collecting connectors data..."); // get all sources by workspaceId (only include active ones in the report) - final SourceReadList sources = sourceHandler.listSourcesForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)); - final List> sourceList = sources.getSources().stream().filter(source -> ActorStatus.ACTIVE.equals(source.getStatus())) + final List> sources = sourceService + .listWorkspaceSourceConnection(workspaceId) + .stream() + .filter(source -> { + // TODO: isSourceActive feels like it could not throw and just return false if the config is not + // found. + try { + return sourceService.isSourceActive(source.getSourceId()); + } catch (IOException e) { + return false; + } + }) .map(source -> { - final Map connectionInfo = new HashMap<>(); - connectionInfo.put("name", source.getName()); - connectionInfo.put("id", source.getSourceId().toString()); - connectionInfo.put("type", ActorType.SOURCE.toString()); - connectionInfo.put("connectorDefinitionId", source.getSourceDefinitionId().toString()); + ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus sourceDefinitionVersion = null; try { - final ActorDefinitionVersionRead sourceDefinitionVersion = - actorDefinitionVersionHandler.getActorDefinitionVersionForSourceId(new SourceIdRequestBody().sourceId(source.getSourceId())); - connectionInfo.put("connectorDockerImageTag", sourceDefinitionVersion.getDockerImageTag()); - connectionInfo.put("connectorVersionOverrideApplied", sourceDefinitionVersion.getIsVersionOverrideApplied()); - connectionInfo.put("connectorSupportState", sourceDefinitionVersion.getSupportState().toString()); - } catch (final JsonValidationException | IOException | ConfigNotFoundException - | io.airbyte.config.persistence.ConfigNotFoundException e) { + sourceDefinitionVersion = actorDefinitionVersionHelper.getSourceVersionWithOverrideStatus( + sourceService.getSourceDefinitionFromSource(source.getSourceDefinitionId()), + workspaceId, + source.getSourceId()); + } catch (IOException | JsonValidationException | ConfigNotFoundException e) { LOGGER.error("Error collecting source version information", e); } - return connectionInfo; - }).toList(); + + return Map.of( + "name", source.getName(), + "id", source.getSourceId().toString(), + "type", ActorType.SOURCE.toString(), + "connectorDefinitionId", source.getSourceDefinitionId().toString(), + "connectorDockerImageTag", + sourceDefinitionVersion != null ? sourceDefinitionVersion.actorDefinitionVersion().getDockerImageTag() : "", + "connectorVersionOverrideApplied", Boolean.toString(Objects.requireNonNull(sourceDefinitionVersion).isOverrideApplied()), + "connectorSupportState", sourceDefinitionVersion.actorDefinitionVersion().getSupportState().toString()); + + }) + .toList(); // get all destinations by workspaceId (only include active ones in the report) - final DestinationReadList destinations = destinationHandler.listDestinationsForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId)); - final List> destinationList = - destinations.getDestinations().stream().filter(destination -> ActorStatus.ACTIVE.equals(destination.getStatus())) - .map(destination -> { - final Map connectionInfo = new HashMap<>(); - connectionInfo.put("name", destination.getName()); - connectionInfo.put("id", destination.getDestinationId().toString()); - connectionInfo.put("type", ActorType.DESTINATION.toString()); - connectionInfo.put("connectorDefinitionId", destination.getDestinationId().toString()); - try { - final ActorDefinitionVersionRead destinationDefinitionVersion = actorDefinitionVersionHandler - .getActorDefinitionVersionForDestinationId(new DestinationIdRequestBody().destinationId(destination.getDestinationId())); - connectionInfo.put("connectorDockerImageTag", destinationDefinitionVersion.getDockerImageTag()); - connectionInfo.put("connectorVersionOverrideApplied", destinationDefinitionVersion.getIsVersionOverrideApplied()); - connectionInfo.put("connectorSupportState", destinationDefinitionVersion.getSupportState().toString()); - } catch (final JsonValidationException | IOException | ConfigNotFoundException - | io.airbyte.config.persistence.ConfigNotFoundException e) { - LOGGER.error("Error collecting destination version information", e); - } - return connectionInfo; - }).toList(); + final List> destinations = destinationService + .listWorkspaceDestinationConnection(workspaceId) + .stream() + .filter(destination -> { + // TODO: isDestinationActive feels like it could not throw and just return false if the config is + // not found. + try { + return destinationService.isDestinationActive(destination.getDestinationId()); + } catch (IOException e) { + return false; + } + }) + .map(destination -> { + ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus destinationDefinitionVersion = null; + try { + destinationDefinitionVersion = actorDefinitionVersionHelper.getDestinationVersionWithOverrideStatus( + destinationService.getStandardDestinationDefinition(destination.getDestinationId()), + workspaceId, + destination.getDestinationId()); + } catch (IOException | JsonValidationException | ConfigNotFoundException e) { + LOGGER.error("Error collecting destination version information", e); + } + + return Map.of( + "name", destination.getName(), + "id", destination.getDestinationId().toString(), + "type", ActorType.DESTINATION.toString(), + "connectorDefinitionId", destination.getDestinationId().toString(), + "connectorDockerImageTag", + destinationDefinitionVersion != null ? destinationDefinitionVersion.actorDefinitionVersion().getDockerImageTag() : "", + "connectorVersionOverrideApplied", Boolean.toString(Objects.requireNonNull(destinationDefinitionVersion).isOverrideApplied()), + "connectorSupportState", destinationDefinitionVersion.actorDefinitionVersion().getSupportState().toString()); + }).toList(); // merge the two lists - final List> allConnectors = new ArrayList<>(sourceList); - allConnectors.addAll(destinationList); - return allConnectors; - } catch (final JsonValidationException | IOException | io.airbyte.data.exceptions.ConfigNotFoundException e) { + return Stream.concat(sources.stream(), destinations.stream()).toList(); + } catch (final IOException e) { LOGGER.error("Error collecting connectors information", e); - return null; + return Collections.emptyList(); } } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/DiagnosticToolHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/DiagnosticToolHandlerTest.java index a7dbe3ef1b8..1b588782bd7 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/DiagnosticToolHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/DiagnosticToolHandlerTest.java @@ -8,21 +8,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.api.model.generated.ActorDefinitionVersionRead; -import io.airbyte.api.model.generated.ActorStatus; -import io.airbyte.api.model.generated.ConnectionRead; -import io.airbyte.api.model.generated.ConnectionReadList; -import io.airbyte.api.model.generated.ConnectionStatus; -import io.airbyte.api.model.generated.DestinationRead; -import io.airbyte.api.model.generated.DestinationReadList; import io.airbyte.api.model.generated.LicenseInfoResponse; import io.airbyte.api.model.generated.LicenseStatus; -import io.airbyte.api.model.generated.SourceRead; -import io.airbyte.api.model.generated.SourceReadList; -import io.airbyte.api.model.generated.SupportState; -import io.airbyte.api.model.generated.WorkspaceRead; -import io.airbyte.api.model.generated.WorkspaceReadList; +import io.airbyte.config.ActorDefinitionVersion; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ActorDefinitionVersionHelper; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.data.services.ConnectionService; +import io.airbyte.data.services.DestinationService; +import io.airbyte.data.services.SourceService; +import io.airbyte.data.services.WorkspaceService; import io.airbyte.validation.json.JsonValidationException; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.Node; @@ -51,11 +49,11 @@ import java.util.UUID; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; /** @@ -66,61 +64,42 @@ class DiagnosticToolHandlerTest { private DiagnosticToolHandler diagnosticToolHandler; - private WorkspacesHandler workspacesHandler; - private ConnectionsHandler connectionsHandler; - private SourceHandler sourceHandler; - private DestinationHandler destinationHandler; + private WorkspaceService workspaceService; + private ConnectionService connectionService; + private SourceService sourceService; + private DestinationService destinationService; private InstanceConfigurationHandler instanceConfigurationHandler; - private ActorDefinitionVersionHandler actorDefinitionVersionHandler; - - @Mock - private KubernetesClient mKubernetesClient; + private ActorDefinitionVersionHelper actorDefinitionVersionHelper; + private KubernetesClient kubernetesClient; @BeforeEach void beforeEach() throws JsonValidationException, IOException, ConfigNotFoundException, io.airbyte.data.exceptions.ConfigNotFoundException { - workspacesHandler = mock(WorkspacesHandler.class); - connectionsHandler = mock(ConnectionsHandler.class); - sourceHandler = mock(SourceHandler.class); - destinationHandler = mock(DestinationHandler.class); - actorDefinitionVersionHandler = mock(ActorDefinitionVersionHandler.class); + workspaceService = mock(WorkspaceService.class); + connectionService = mock(ConnectionService.class); + sourceService = mock(SourceService.class); + destinationService = mock(DestinationService.class); + actorDefinitionVersionHelper = mock(ActorDefinitionVersionHelper.class); instanceConfigurationHandler = mock(InstanceConfigurationHandler.class); - diagnosticToolHandler = - new DiagnosticToolHandler(workspacesHandler, connectionsHandler, sourceHandler, destinationHandler, actorDefinitionVersionHandler, - instanceConfigurationHandler, - Optional.of(mKubernetesClient)); + kubernetesClient = mock(KubernetesClient.class); + diagnosticToolHandler = new DiagnosticToolHandler( + workspaceService, + connectionService, + sourceService, + destinationService, + actorDefinitionVersionHelper, + instanceConfigurationHandler, + Optional.of(kubernetesClient)); // Mock workspace API responses - when(workspacesHandler.listWorkspaces()).thenReturn(new WorkspaceReadList().addWorkspacesItem( - new WorkspaceRead() - .name("workspace1") - .workspaceId(UUID.randomUUID()))); - when(connectionsHandler.listConnectionsForWorkspace(any())).thenReturn(new ConnectionReadList().addConnectionsItem( - new ConnectionRead() - .connectionId(UUID.randomUUID()) - .name("connection1") - .status(ConnectionStatus.ACTIVE) - .sourceId(UUID.randomUUID()) - .destinationId(UUID.randomUUID()))); - when(sourceHandler.listSourcesForWorkspace(any())).thenReturn(new SourceReadList().addSourcesItem( - new SourceRead() - .sourceId(UUID.randomUUID()) - .name("source1") - .sourceDefinitionId(UUID.randomUUID()) - .status(ActorStatus.ACTIVE))); - when(destinationHandler.listDestinationsForWorkspace(any())).thenReturn(new DestinationReadList().addDestinationsItem( - new DestinationRead() - .destinationId(UUID.randomUUID()) - .name("destination1") - .destinationDefinitionId(UUID.randomUUID()) - .status(ActorStatus.ACTIVE))); - - final ActorDefinitionVersionRead actorDefinitionVersion = new ActorDefinitionVersionRead() - .dockerImageTag("tag") - .isVersionOverrideApplied(true) - .supportState(SupportState.SUPPORTED); - - when(actorDefinitionVersionHandler.getActorDefinitionVersionForSourceId(any())).thenReturn(actorDefinitionVersion); - when(actorDefinitionVersionHandler.getActorDefinitionVersionForDestinationId(any())).thenReturn(actorDefinitionVersion); + final var workspace = getStandardWorkspace(); + when(workspaceService.listStandardWorkspaces(false)).thenReturn(List.of(workspace)); + when(connectionService.listWorkspaceStandardSyncs(workspace.getWorkspaceId(), false)).thenReturn(List.of(getStandardSync())); + when(sourceService.listWorkspaceSourceConnection(any())).thenReturn(List.of(getSource())); + when(sourceService.isSourceActive(any())).thenReturn(true); + when(destinationService.listWorkspaceDestinationConnection(any())).thenReturn(List.of(getDestination())); + when(destinationService.isDestinationActive(any())).thenReturn(true); + when(actorDefinitionVersionHelper.getSourceVersionWithOverrideStatus(any(), any(), any())).thenReturn(getActorDefinitionVersion()); + when(actorDefinitionVersionHelper.getDestinationVersionWithOverrideStatus(any(), any(), any())).thenReturn(getActorDefinitionVersion()); // Mock license API responses when(instanceConfigurationHandler.licenseInfo()).thenReturn(new LicenseInfoResponse() @@ -146,7 +125,7 @@ void beforeEach() throws JsonValidationException, IOException, ConfigNotFoundExc final NodeList nodeList = new NodeList(); nodeList.setItems(List.of(node1)); final NonNamespaceOperation op = mock(NonNamespaceOperation.class); - when(mKubernetesClient.nodes()).thenReturn(op); + when(kubernetesClient.nodes()).thenReturn(op); when(op.list()).thenReturn(nodeList); final Pod pod1 = new Pod(); @@ -175,7 +154,7 @@ void beforeEach() throws JsonValidationException, IOException, ConfigNotFoundExc final MixedOperation mop = mock(MixedOperation.class); final NonNamespaceOperation podNamespaceOperation = mock(NonNamespaceOperation.class); - when(mKubernetesClient.pods()).thenReturn(mop); + when(kubernetesClient.pods()).thenReturn(mop); when(mop.inNamespace("ab")).thenReturn(podNamespaceOperation); when(podNamespaceOperation.list()).thenReturn(podList); @@ -236,4 +215,35 @@ void testGenerateDiagnosticReport() throws IOException { } } + private static @NotNull StandardWorkspace getStandardWorkspace() { + return new StandardWorkspace() + .withName("workspace1") + .withWorkspaceId(UUID.randomUUID()); + } + + private static StandardSync getStandardSync() { + return new StandardSync() + .withName("connection1") + .withStatus(StandardSync.Status.ACTIVE) + .withConnectionId(UUID.randomUUID()) + .withSourceId(UUID.randomUUID()) + .withDestinationId(UUID.randomUUID()); + } + + private static SourceConnection getSource() { + return new SourceConnection() + .withSourceId(UUID.randomUUID()) + .withName("source") + .withSourceDefinitionId(UUID.randomUUID()); + } + + private static ActorDefinitionVersionHelper.@NotNull ActorDefinitionVersionWithOverrideStatus getActorDefinitionVersion() { + return new ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus( + new ActorDefinitionVersion().withDockerImageTag("tag").withSupportState(ActorDefinitionVersion.SupportState.SUPPORTED), true); + } + + private static DestinationConnection getDestination() { + return new DestinationConnection().withDestinationId(UUID.randomUUID()).withName("destination1").withDestinationDefinitionId(UUID.randomUUID()); + } + }