Skip to content

Commit

Permalink
refactor: handlers should use the data services instead of other hand…
Browse files Browse the repository at this point in the history
…lers (#14330)

Co-authored-by: keyihuang <[email protected]>
  • Loading branch information
bgroff and keyihuang committed Oct 18, 2024
1 parent 186ba39 commit 7c6ebbb
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@

package io.airbyte.commons.server.handlers;

import io.airbyte.api.model.generated.ActorDefinitionVersionRead;
import io.airbyte.api.model.generated.ActorStatus;
import io.airbyte.api.model.generated.ActorType;
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationReadList;
import io.airbyte.api.model.generated.LicenseInfoResponse;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceReadList;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.api.model.generated.WorkspaceReadList;
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.data.exceptions.ConfigNotFoundException;
import io.airbyte.data.services.ConnectionService;
import io.airbyte.data.services.DestinationService;
import io.airbyte.data.services.SourceService;
import io.airbyte.data.services.WorkspaceService;
import io.airbyte.validation.json.JsonValidationException;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Node;
Expand All @@ -30,11 +26,14 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
Expand All @@ -55,27 +54,27 @@ public class DiagnosticToolHandler {
public static final String DIAGNOSTIC_REPORT_FILE_NAME = "diagnostic_report";
public static final String DIAGNOSTIC_REPORT_FILE_FORMAT = ".zip";

private final WorkspacesHandler workspacesHandler;
private final ConnectionsHandler connectionsHandler;
private final SourceHandler sourceHandler;
private final DestinationHandler destinationHandler;
private final ActorDefinitionVersionHandler actorDefinitionVersionHandler;
private final WorkspaceService workspaceService;
private final ConnectionService connectionService;
private final SourceService sourceService;
private final DestinationService destinationService;
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private final InstanceConfigurationHandler instanceConfigurationHandler;
private final Optional<KubernetesClient> kubernetesClient;
private final DumperOptions yamlDumperOptions;

public DiagnosticToolHandler(final WorkspacesHandler workspacesHandler,
final ConnectionsHandler connectionsHandler,
final SourceHandler sourceHandler,
final DestinationHandler destinationHandler,
final ActorDefinitionVersionHandler actorDefinitionVersionHandler,
public DiagnosticToolHandler(final WorkspaceService workspaceService,
final ConnectionService connectionService,
final SourceService sourceService,
final DestinationService destinationService,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final InstanceConfigurationHandler instanceConfigurationHandler,
final Optional<KubernetesClient> kubernetesClient) {
this.workspacesHandler = workspacesHandler;
this.connectionsHandler = connectionsHandler;
this.sourceHandler = sourceHandler;
this.destinationHandler = destinationHandler;
this.actorDefinitionVersionHandler = actorDefinitionVersionHandler;
this.workspaceService = workspaceService;
this.connectionService = connectionService;
this.sourceService = sourceService;
this.destinationService = destinationService;
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
this.instanceConfigurationHandler = instanceConfigurationHandler;
this.kubernetesClient = kubernetesClient;
this.yamlDumperOptions = new DumperOptions();
Expand Down Expand Up @@ -150,97 +149,120 @@ private List<Map<String, Object>> collectWorkspaceInfo() {
try {
// get all workspaces
LOGGER.info("Collecting workspaces data...");
final WorkspaceReadList workspaces = workspacesHandler.listWorkspaces();
final List<Map<String, Object>> workspaceList = workspaces.getWorkspaces().stream().map(workspace -> {
final Map<String, Object> workspaceInfo = new HashMap<>();
workspaceInfo.put("name", workspace.getName());
workspaceInfo.put("id", workspace.getWorkspaceId().toString());
workspaceInfo.put("connections", collectConnectionInfo(workspace.getWorkspaceId()));
workspaceInfo.put("connectors", collectConnectorInfo(workspace.getWorkspaceId()));
return workspaceInfo;
}).toList();
return workspaceList;
} catch (final JsonValidationException | IOException e) {
return workspaceService
.listStandardWorkspaces(false)
.stream()
.map(workspace -> Map.of(
"name", workspace.getName(),
"id", workspace.getWorkspaceId(),
"connections", Objects.requireNonNull(collectConnectionInfo(workspace.getWorkspaceId())),
"connectors", Objects.requireNonNull(collectConnectorInfo(workspace.getWorkspaceId()))))
.toList();
} catch (final IOException e) {
LOGGER.error("Error collecting workspace information", e);
return null;
}
}

private List<Map<String, Object>> collectConnectionInfo(final UUID workspaceId) {
private List<Map<String, String>> collectConnectionInfo(final UUID workspaceId) {
try {
LOGGER.info("Collecting connections data...");
// get all connections by workspaceId
final ConnectionReadList connections = connectionsHandler.listConnectionsForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId));
final List<Map<String, Object>> connectionList = connections.getConnections().stream().map(connection -> {
final Map<String, Object> connectionInfo = new HashMap<>();
connectionInfo.put("name", connection.getName());
connectionInfo.put("id", connection.getConnectionId().toString());
connectionInfo.put("status", connection.getStatus().toString());
connectionInfo.put("sourceId", connection.getSourceId().toString());
connectionInfo.put("destinationId", connection.getDestinationId().toString());
return connectionInfo;
}).toList();
return connectionList;
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
return connectionService
.listWorkspaceStandardSyncs(workspaceId, false)
.stream()
.map(connection -> Map.of(
"name", connection.getName(),
"id", connection.getConnectionId().toString(),
"status", connection.getStatus().toString(),
"sourceId", connection.getSourceId().toString(),
"destinationId", connection.getDestinationId().toString()))
.toList();
} catch (final IOException e) {
LOGGER.error("Error collecting connection information", e);
return null;
return Collections.emptyList();
}
}

private List<Map<String, Object>> collectConnectorInfo(final UUID workspaceId) {
private List<Map<String, String>> collectConnectorInfo(final UUID workspaceId) {
try {
LOGGER.info("Collecting connectors data...");
// get all sources by workspaceId (only include active ones in the report)
final SourceReadList sources = sourceHandler.listSourcesForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId));
final List<Map<String, Object>> sourceList = sources.getSources().stream().filter(source -> ActorStatus.ACTIVE.equals(source.getStatus()))
final List<Map<String, String>> sources = sourceService
.listWorkspaceSourceConnection(workspaceId)
.stream()
.filter(source -> {
// TODO: isSourceActive feels like it could not throw and just return false if the config is not
// found.
try {
return sourceService.isSourceActive(source.getSourceId());
} catch (IOException e) {
return false;
}
})
.map(source -> {
final Map<String, Object> connectionInfo = new HashMap<>();
connectionInfo.put("name", source.getName());
connectionInfo.put("id", source.getSourceId().toString());
connectionInfo.put("type", ActorType.SOURCE.toString());
connectionInfo.put("connectorDefinitionId", source.getSourceDefinitionId().toString());
ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus sourceDefinitionVersion = null;
try {
final ActorDefinitionVersionRead sourceDefinitionVersion =
actorDefinitionVersionHandler.getActorDefinitionVersionForSourceId(new SourceIdRequestBody().sourceId(source.getSourceId()));
connectionInfo.put("connectorDockerImageTag", sourceDefinitionVersion.getDockerImageTag());
connectionInfo.put("connectorVersionOverrideApplied", sourceDefinitionVersion.getIsVersionOverrideApplied());
connectionInfo.put("connectorSupportState", sourceDefinitionVersion.getSupportState().toString());
} catch (final JsonValidationException | IOException | ConfigNotFoundException
| io.airbyte.config.persistence.ConfigNotFoundException e) {
sourceDefinitionVersion = actorDefinitionVersionHelper.getSourceVersionWithOverrideStatus(
sourceService.getSourceDefinitionFromSource(source.getSourceDefinitionId()),
workspaceId,
source.getSourceId());
} catch (IOException | JsonValidationException | ConfigNotFoundException e) {
LOGGER.error("Error collecting source version information", e);
}
return connectionInfo;
}).toList();

return Map.of(
"name", source.getName(),
"id", source.getSourceId().toString(),
"type", ActorType.SOURCE.toString(),
"connectorDefinitionId", source.getSourceDefinitionId().toString(),
"connectorDockerImageTag",
sourceDefinitionVersion != null ? sourceDefinitionVersion.actorDefinitionVersion().getDockerImageTag() : "",
"connectorVersionOverrideApplied", Boolean.toString(Objects.requireNonNull(sourceDefinitionVersion).isOverrideApplied()),
"connectorSupportState", sourceDefinitionVersion.actorDefinitionVersion().getSupportState().toString());

})
.toList();

// get all destinations by workspaceId (only include active ones in the report)
final DestinationReadList destinations = destinationHandler.listDestinationsForWorkspace(new WorkspaceIdRequestBody().workspaceId(workspaceId));
final List<Map<String, Object>> destinationList =
destinations.getDestinations().stream().filter(destination -> ActorStatus.ACTIVE.equals(destination.getStatus()))
.map(destination -> {
final Map<String, Object> connectionInfo = new HashMap<>();
connectionInfo.put("name", destination.getName());
connectionInfo.put("id", destination.getDestinationId().toString());
connectionInfo.put("type", ActorType.DESTINATION.toString());
connectionInfo.put("connectorDefinitionId", destination.getDestinationId().toString());
try {
final ActorDefinitionVersionRead destinationDefinitionVersion = actorDefinitionVersionHandler
.getActorDefinitionVersionForDestinationId(new DestinationIdRequestBody().destinationId(destination.getDestinationId()));
connectionInfo.put("connectorDockerImageTag", destinationDefinitionVersion.getDockerImageTag());
connectionInfo.put("connectorVersionOverrideApplied", destinationDefinitionVersion.getIsVersionOverrideApplied());
connectionInfo.put("connectorSupportState", destinationDefinitionVersion.getSupportState().toString());
} catch (final JsonValidationException | IOException | ConfigNotFoundException
| io.airbyte.config.persistence.ConfigNotFoundException e) {
LOGGER.error("Error collecting destination version information", e);
}
return connectionInfo;
}).toList();
final List<Map<String, String>> destinations = destinationService
.listWorkspaceDestinationConnection(workspaceId)
.stream()
.filter(destination -> {
// TODO: isDestinationActive feels like it could not throw and just return false if the config is
// not found.
try {
return destinationService.isDestinationActive(destination.getDestinationId());
} catch (IOException e) {
return false;
}
})
.map(destination -> {
ActorDefinitionVersionHelper.ActorDefinitionVersionWithOverrideStatus destinationDefinitionVersion = null;
try {
destinationDefinitionVersion = actorDefinitionVersionHelper.getDestinationVersionWithOverrideStatus(
destinationService.getStandardDestinationDefinition(destination.getDestinationId()),
workspaceId,
destination.getDestinationId());
} catch (IOException | JsonValidationException | ConfigNotFoundException e) {
LOGGER.error("Error collecting destination version information", e);
}

return Map.of(
"name", destination.getName(),
"id", destination.getDestinationId().toString(),
"type", ActorType.DESTINATION.toString(),
"connectorDefinitionId", destination.getDestinationId().toString(),
"connectorDockerImageTag",
destinationDefinitionVersion != null ? destinationDefinitionVersion.actorDefinitionVersion().getDockerImageTag() : "",
"connectorVersionOverrideApplied", Boolean.toString(Objects.requireNonNull(destinationDefinitionVersion).isOverrideApplied()),
"connectorSupportState", destinationDefinitionVersion.actorDefinitionVersion().getSupportState().toString());
}).toList();
// merge the two lists
final List<Map<String, Object>> allConnectors = new ArrayList<>(sourceList);
allConnectors.addAll(destinationList);
return allConnectors;
} catch (final JsonValidationException | IOException | io.airbyte.data.exceptions.ConfigNotFoundException e) {
return Stream.concat(sources.stream(), destinations.stream()).toList();
} catch (final IOException e) {
LOGGER.error("Error collecting connectors information", e);
return null;
return Collections.emptyList();
}
}

Expand Down
Loading

0 comments on commit 7c6ebbb

Please sign in to comment.