Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kotlin coroutines prototype #1792

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
buildscript {
ext {
palantirGitVersionVersion = "${JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_11) ? '0.15.0' : '0.13.0'}"
kotlinVersion = "${project.hasProperty("edgeDepsTest") ? '1.8.20' : (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16) ? '1.5.32' : '1.4.32')}"
kotlinVersion = '1.8.20' //"${project.hasProperty("edgeDepsTest") ? '1.8.20' : (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16) ? '1.5.32' : '1.4.32')}"
}
}

Expand Down
3 changes: 2 additions & 1 deletion temporal-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ dependencies {
// this module shouldn't carry temporal-sdk with it, especially for situations when users may be using a shaded artifact
compileOnly project(':temporal-sdk')

implementation "org.jetbrains.kotlin:kotlin-reflect"
implementation "org.jetbrains.kotlin:kotlin-reflect:1.8.20"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.kotlin.interceptors.WorkflowInboundCallsInterceptor
import io.temporal.kotlin.interceptors.WorkflowOutboundCallsInterceptor

/**
* Provides core functionality for a root WorkflowInboundCallsInterceptor that is reused by specific
* root RootWorkflowInboundCallsInterceptor implementations inside [ ] and [POJOWorkflowImplementationFactory]
*
*
* Root `WorkflowInboundCallsInterceptor` is an interceptor that should be at the end of
* the [WorkflowInboundCallsInterceptor] interceptors chain and which encapsulates calls into
* Temporal internals while providing a WorkflowInboundCallsInterceptor interface for chaining on
* top of it.
*/
abstract class BaseRootKotlinWorkflowInboundCallsInterceptor(protected val workflowContext: KotlinWorkflowContext) :
WorkflowInboundCallsInterceptor {
override suspend fun init(outboundCalls: WorkflowOutboundCallsInterceptor) {
workflowContext.initHeadOutboundCallsInterceptor(outboundCalls)
}

override suspend fun handleSignal(input: WorkflowInboundCallsInterceptor.SignalInput) {
TODO("Not yet implemented")
// workflowContext.handleInterceptedSignal(input)
}

override fun handleQuery(input: WorkflowInboundCallsInterceptor.QueryInput): WorkflowInboundCallsInterceptor.QueryOutput {
TODO("Implement")
// return workflowContext.handleInterceptedQuery(input)
}

override fun validateUpdate(input: WorkflowInboundCallsInterceptor.UpdateInput) {
TODO("Implement")
// workflowContext.handleInterceptedValidateUpdate(input)
}

override suspend fun executeUpdate(input: WorkflowInboundCallsInterceptor.UpdateInput): WorkflowInboundCallsInterceptor.UpdateOutput {
TODO("Implement")
// return workflowContext.handleInterceptedExecuteUpdate(input)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
import io.temporal.common.converter.DataConverter
import io.temporal.common.converter.EncodedValues
import io.temporal.common.converter.Values
import io.temporal.common.interceptors.Header
import io.temporal.internal.sync.WorkflowInternal
import io.temporal.kotlin.interceptors.KotlinWorkerInterceptor
import io.temporal.kotlin.interceptors.WorkflowInboundCallsInterceptor
import io.temporal.kotlin.interceptors.WorkflowOutboundCallsInterceptor
import io.temporal.kotlin.workflow.KotlinDynamicWorkflow
import io.temporal.workflow.Functions.Func

internal class DynamicKotlinWorkflowDefinition(
private val factory: Func<out KotlinDynamicWorkflow>,
private val workerInterceptors: Array<KotlinWorkerInterceptor>,
private val dataConverter: DataConverter
) : KotlinWorkflowDefinition {
private var workflowInvoker: WorkflowInboundCallsInterceptor? = null

override suspend fun initialize() {
val workflowContext: KotlinWorkflowContext = KotlinWorkflowInternal.getRootWorkflowContext()
workflowInvoker = RootWorkflowInboundCallsInterceptor(workflowContext)
for (workerInterceptor in workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker!!)
}
workflowContext.initHeadInboundCallsInterceptor(workflowInvoker!!)
workflowInvoker!!.init(workflowContext)
}

override suspend fun execute(header: Header?, input: Payloads?): Payloads? {
val args: Values = EncodedValues(input, dataConverter)
val result = workflowInvoker!!.execute(
WorkflowInboundCallsInterceptor.WorkflowInput(header!!, arrayOf(args))
)
return dataConverter.toPayloads(result.result).orElse(null)
}

internal inner class RootWorkflowInboundCallsInterceptor(workflowContext: KotlinWorkflowContext?) :
BaseRootKotlinWorkflowInboundCallsInterceptor(
workflowContext!!
) {
private var workflow: KotlinDynamicWorkflow? = null

override suspend fun init(outboundCalls: WorkflowOutboundCallsInterceptor) {
super.init(outboundCalls)
newInstance()
WorkflowInternal.registerListener(workflow)
}

override suspend fun execute(input: WorkflowInboundCallsInterceptor.WorkflowInput): WorkflowInboundCallsInterceptor.WorkflowOutput {
val result = workflow!!.execute(input.arguments[0] as EncodedValues)
return WorkflowInboundCallsInterceptor.WorkflowOutput(result)
}

private fun newInstance() {
check(workflow == null) { "Already called" }
workflow = factory.apply()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import java.lang.reflect.Type

/**
* KotlinActivityStub is used to call an activity without referencing an interface it implements. This is
* useful to call activities when their type is not known at compile time or to execute activities
* implemented in other languages. Created through [Workflow.newActivityStub].
*/
interface KotlinActivityStub {
/**
* Executes an activity by its type name and arguments. Blocks until the activity completion.
*
* @param activityName name of an activity type to execute.
* @param resultClass the expected return type of the activity. Use Void.class for activities that
* return void type.
* @param args arguments of the activity.
* @param <R> return type.
* @return an activity result.
</R> */
suspend fun <R> execute(activityName: String, resultClass: Class<R>, vararg args: Any): R?

/**
* Executes an activity by its type name and arguments. Blocks until the activity completion.
*
* @param activityName name of an activity type to execute.
* @param resultClass the expected return class of the activity. Use Void.class for activities
* that return void type.
* @param resultType the expected return type of the activity. Differs from resultClass for
* generic types.
* @param args arguments of the activity.
* @param <R> return type.
* @return an activity result.
</R> */
suspend fun <R> execute(activityName: String, resultClass: Class<R>, resultType: Type, vararg args: Any): R?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.activity.ActivityOptions
import io.temporal.common.interceptors.Header
import io.temporal.kotlin.interceptors.WorkflowOutboundCallsInterceptor
import java.lang.reflect.Type

internal class KotlinActivityStubImpl(
options: ActivityOptions?,
private val activityExecutor: WorkflowOutboundCallsInterceptor
) : KotlinActivityStub {

private val options: ActivityOptions = ActivityOptions.newBuilder(options).validateAndBuildWithDefaults()

override suspend fun <R> execute(activityName: String, resultClass: Class<R>, vararg args: Any): R? {
return activityExecutor
.executeActivity(
WorkflowOutboundCallsInterceptor.ActivityInput(
activityName,
resultClass,
resultClass,
args,
options,
Header.empty()
)
).result
}

override suspend fun <R> execute(
activityName: String,
resultClass: Class<R>,
resultType: Type,
vararg args: Any
): R? {
return activityExecutor
.executeActivity(
WorkflowOutboundCallsInterceptor.ActivityInput(
activityName,
resultClass,
resultType,
args,
options,
Header.empty()
)
).result
}
}
Loading