Skip to content

Commit

Permalink
Create public API thread pool (#12178)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonsSpaghetti committed Apr 19, 2024
1 parent 0340b27 commit 1487103
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface AirbyteTaskExecutors extends TaskExecutors {
*/
String SCHEDULER = "scheduler";

/**
* The name of the {@link java.util.concurrent.ExecutorService} used for endpoints that belong to
* the public API.
*/
String PUBLIC_API = "public-api";

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ open class ConnectionsController(
private val apiAuthorizationHelper: ApiAuthorizationHelper,
private val currentUserService: CurrentUserService,
) : PublicConnectionsApi {
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCreateConnection(connectionCreateRequest: ConnectionCreateRequest): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -177,7 +177,7 @@ open class ConnectionsController(
}

@Path("/{connectionId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicDeleteConnection(connectionId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -210,7 +210,7 @@ open class ConnectionsController(
}

@Path("/{connectionId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicGetConnection(connectionId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand All @@ -237,7 +237,7 @@ open class ConnectionsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun listConnections(
workspaceIds: List<UUID>?,
includeDeleted: Boolean?,
Expand Down Expand Up @@ -275,7 +275,7 @@ open class ConnectionsController(

@Patch
@Path("/{connectionId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun patchConnection(
@PathParam(value = "connectionId") connectionId: UUID,
@Valid @Body @NotNull connectionPatchRequest:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ open class DefaultController() : PublicRootApi {
@Value("\${airbyte.internal.documentation.host}")
var documentationHost: String? = null

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun getDocumentation(): Response {
return Response
.status(302)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ open class DestinationsController(
private val apiAuthorizationHelper: ApiAuthorizationHelper,
private val currentUserService: CurrentUserService,
) : PublicDestinationsApi {
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCreateDestination(destinationCreateRequest: DestinationCreateRequest): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -99,7 +99,7 @@ open class DestinationsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicDeleteDestination(destinationId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -131,7 +131,7 @@ open class DestinationsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicGetDestination(destinationId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -163,7 +163,7 @@ open class DestinationsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun listDestinations(
workspaceIds: MutableList<UUID>?,
includeDeleted: Boolean?,
Expand Down Expand Up @@ -201,7 +201,7 @@ open class DestinationsController(

@Path("/{destinationId}")
@Patch
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun patchDestination(
destinationId: UUID,
destinationPatchRequest: DestinationPatchRequest,
Expand Down Expand Up @@ -241,7 +241,7 @@ open class DestinationsController(
}

@Path("/{destinationId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun putDestination(
destinationId: UUID,
destinationPutRequest: DestinationPutRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ open class JobsController(
) : PublicJobsApi {
@DELETE
@Path("/{jobId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCancelJob(
@PathParam("jobId") jobId: Long,
): Response {
Expand Down Expand Up @@ -83,7 +83,7 @@ open class JobsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCreateJob(jobCreateRequest: JobCreateRequest): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -160,7 +160,7 @@ open class JobsController(

@GET
@Path("/{jobId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun getJob(
@PathParam("jobId") jobId: Long,
): Response {
Expand Down Expand Up @@ -195,7 +195,7 @@ open class JobsController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun listJobs(
connectionId: UUID?,
limit: Int?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ open class SourcesController(
private val apiAuthorizationHelper: ApiAuthorizationHelper,
private val currentUserService: CurrentUserService,
) : PublicSourcesApi {
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCreateSource(sourceCreateRequest: SourceCreateRequest): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -102,7 +102,7 @@ open class SourcesController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicDeleteSource(sourceId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -135,7 +135,7 @@ open class SourcesController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicGetSource(sourceId: UUID): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand Down Expand Up @@ -168,7 +168,7 @@ open class SourcesController(
.build()
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun initiateOAuth(initiateOauthRequest: InitiateOauthRequest): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand All @@ -180,7 +180,7 @@ open class SourcesController(
return sourceService.controllerInitiateOAuth(initiateOauthRequest)
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun listSources(
workspaceIds: MutableList<UUID>?,
includeDeleted: Boolean?,
Expand Down Expand Up @@ -219,7 +219,7 @@ open class SourcesController(

@Patch
@Path("/{sourceId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun patchSource(
sourceId: UUID,
sourcePatchRequest: SourcePatchRequest,
Expand Down Expand Up @@ -259,7 +259,7 @@ open class SourcesController(
}

@Path("/{sourceId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun putSource(
sourceId: UUID,
sourcePutRequest: SourcePutRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StreamsController(
private val log: org.slf4j.Logger? = LoggerFactory.getLogger(StreamsController::class.java)
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun getStreamProperties(
sourceId: UUID,
destinationId: UUID?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ open class WorkspacesController(
private val currentUserService: CurrentUserService,
) : PublicWorkspacesApi {
@Path("/{workspaceId}/oauthCredentials")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun createOrUpdateWorkspaceOAuthCredentials(
workspaceId: UUID?,
workspaceOAuthCredentialsRequest: WorkspaceOAuthCredentialsRequest?,
Expand All @@ -55,7 +55,7 @@ open class WorkspacesController(
)
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicCreateWorkspace(workspaceCreateRequest: WorkspaceCreateRequest?): Response {
// Now that we have orgs everywhere, ensure the user is at least an organization editor
apiAuthorizationHelper.ensureUserHasAnyRequiredRoleOrThrow(
Expand All @@ -67,7 +67,7 @@ open class WorkspacesController(
}

@Path("/{workspaceId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicDeleteWorkspace(workspaceId: UUID?): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand All @@ -80,7 +80,7 @@ open class WorkspacesController(
}

@Path("/{workspaceId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicGetWorkspace(workspaceId: UUID?): Response {
val userId: UUID = currentUserService.currentUser.userId
apiAuthorizationHelper.checkWorkspacePermissions(
Expand All @@ -92,7 +92,7 @@ open class WorkspacesController(
return workspaceService.controllerGetWorkspace(workspaceId)
}

@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicListWorkspaces(
workspaceIds: MutableList<UUID>?,
includeDeleted: Boolean?,
Expand All @@ -117,7 +117,7 @@ open class WorkspacesController(

@Patch
@Path("/{workspaceId}")
@ExecuteOn(AirbyteTaskExecutors.IO)
@ExecuteOn(AirbyteTaskExecutors.PUBLIC_API)
override fun publicUpdateWorkspace(
workspaceId: UUID?,
workspaceUpdateRequest: WorkspaceUpdateRequest?,
Expand Down
8 changes: 6 additions & 2 deletions airbyte-server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ micronaut:
env:
cloud-deduction: true
executors:
# We set our max pool size for the config DB to 20 in 10-values.yml files at the time of writing this.
health:
type: fixed
n-threads: ${HEALTH_TASK_EXECUTOR_THREADS:10} # Match the data source max pool size below
n-threads: ${HEALTH_TASK_EXECUTOR_THREADS:3}
io:
type: fixed
n-threads: ${IO_TASK_EXECUTOR_THREADS:10} # Match the data source max pool size below
n-threads: ${IO_TASK_EXECUTOR_THREADS:10}
public-api:
type: fixed
n-threads: ${PUBLIC_API_EXECUTOR_THREADS:5}
scheduler:
type: fixed
n-threads: ${SCHEDULER_TASK_EXECUTOR_THREADS:25}
Expand Down

0 comments on commit 1487103

Please sign in to comment.