Skip to content

Commit f5a623d

Browse files
authored
[Fix #1002] Implementing custom inline function call (#1012)
* [Fix #1002] Adding custom function Signed-off-by: fjtirado <[email protected]> * [Fix #1002] Moving test files Signed-off-by: fjtirado <[email protected]> * [Fix #1002] Adding test Signed-off-by: fjtirado <[email protected]> --------- Signed-off-by: fjtirado <[email protected]>
1 parent 68d85d8 commit f5a623d

File tree

54 files changed

+677
-306
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+677
-306
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,28 @@
2121
import io.serverlessworkflow.impl.WorkflowContext;
2222
import io.serverlessworkflow.impl.WorkflowDefinition;
2323
import io.serverlessworkflow.impl.WorkflowModel;
24+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2425
import io.serverlessworkflow.impl.executors.CallableTask;
26+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2527
import java.util.Optional;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.function.Consumer;
2830

29-
public class JavaConsumerCallExecutor<T> implements CallableTask<CallJava.CallJavaConsumer<T>> {
31+
public class JavaConsumerCallExecutor<T>
32+
implements CallableTaskBuilder<CallJava.CallJavaConsumer<T>> {
3033

3134
private Consumer<T> consumer;
32-
private Optional<Class<T>> inputClass = Optional.empty();
35+
private Optional<Class<T>> inputClass;
3336

34-
public void init(CallJava.CallJavaConsumer<T> task, WorkflowDefinition definition) {
37+
public void init(
38+
CallJava.CallJavaConsumer<T> task,
39+
WorkflowDefinition definition,
40+
WorkflowMutablePosition position) {
3541
consumer = task.consumer();
3642
inputClass = task.inputClass();
3743
}
3844

39-
@Override
40-
public CompletableFuture<WorkflowModel> apply(
45+
private CompletableFuture<WorkflowModel> apply(
4146
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4247
T typed = JavaFuncUtils.convertT(input, inputClass);
4348
consumer.accept(typed);
@@ -48,4 +53,9 @@ public CompletableFuture<WorkflowModel> apply(
4853
public boolean accept(Class<? extends TaskBase> clazz) {
4954
return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz);
5055
}
56+
57+
@Override
58+
public CallableTask build() {
59+
return this::apply;
60+
}
5161
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaContextFunctionCallExecutor.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,44 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import io.serverlessworkflow.api.types.func.CallJava;
20+
import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction;
2021
import io.serverlessworkflow.api.types.func.JavaContextFunction;
2122
import io.serverlessworkflow.impl.TaskContext;
2223
import io.serverlessworkflow.impl.WorkflowContext;
2324
import io.serverlessworkflow.impl.WorkflowDefinition;
2425
import io.serverlessworkflow.impl.WorkflowModel;
26+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2527
import io.serverlessworkflow.impl.executors.CallableTask;
28+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2629
import java.util.Optional;
2730
import java.util.concurrent.CompletableFuture;
2831

2932
public class JavaContextFunctionCallExecutor<T, V>
30-
implements CallableTask<CallJava.CallJavaContextFunction<T, V>> {
33+
implements CallableTaskBuilder<CallJava.CallJavaContextFunction<T, V>> {
3134

3235
private JavaContextFunction<T, V> function;
3336
private Optional<Class<T>> inputClass;
3437

3538
@Override
36-
public void init(CallJava.CallJavaContextFunction<T, V> task, WorkflowDefinition definition) {
39+
public boolean accept(Class<? extends TaskBase> clazz) {
40+
return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz);
41+
}
42+
43+
@Override
44+
public void init(
45+
CallJavaContextFunction<T, V> task,
46+
WorkflowDefinition definition,
47+
WorkflowMutablePosition position) {
3748
this.function = task.function();
3849
this.inputClass = task.inputClass();
3950
}
4051

4152
@Override
42-
public CompletableFuture<WorkflowModel> apply(
53+
public CallableTask build() {
54+
return this::apply;
55+
}
56+
57+
private CompletableFuture<WorkflowModel> apply(
4358
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4459
return CompletableFuture.completedFuture(
4560
workflowContext
@@ -49,9 +64,4 @@ public CompletableFuture<WorkflowModel> apply(
4964
.fromAny(
5065
input, function.apply(JavaFuncUtils.convertT(input, inputClass), workflowContext)));
5166
}
52-
53-
@Override
54-
public boolean accept(Class<? extends TaskBase> clazz) {
55-
return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz);
56-
}
5767
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFilterFunctionCallExecutor.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,25 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import io.serverlessworkflow.api.types.func.CallJava;
20+
import io.serverlessworkflow.api.types.func.CallJava.CallJavaFilterFunction;
2021
import io.serverlessworkflow.api.types.func.JavaFilterFunction;
2122
import io.serverlessworkflow.impl.TaskContext;
2223
import io.serverlessworkflow.impl.WorkflowContext;
2324
import io.serverlessworkflow.impl.WorkflowDefinition;
2425
import io.serverlessworkflow.impl.WorkflowModel;
26+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2527
import io.serverlessworkflow.impl.executors.CallableTask;
28+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2629
import java.util.Optional;
2730
import java.util.concurrent.CompletableFuture;
2831

2932
public class JavaFilterFunctionCallExecutor<T, V>
30-
implements CallableTask<CallJava.CallJavaFilterFunction<T, V>> {
33+
implements CallableTaskBuilder<CallJava.CallJavaFilterFunction<T, V>> {
3134

3235
private JavaFilterFunction<T, V> function;
3336
private Optional<Class<T>> inputClass;
3437

35-
@Override
36-
public void init(CallJava.CallJavaFilterFunction<T, V> task, WorkflowDefinition definition) {
37-
this.function = task.function();
38-
this.inputClass = task.inputClass();
39-
}
40-
41-
@Override
42-
public CompletableFuture<WorkflowModel> apply(
38+
private CompletableFuture<WorkflowModel> apply(
4339
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4440
return CompletableFuture.completedFuture(
4541
workflowContext
@@ -56,4 +52,18 @@ public CompletableFuture<WorkflowModel> apply(
5652
public boolean accept(Class<? extends TaskBase> clazz) {
5753
return CallJava.CallJavaFilterFunction.class.isAssignableFrom(clazz);
5854
}
55+
56+
@Override
57+
public void init(
58+
CallJavaFilterFunction<T, V> task,
59+
WorkflowDefinition definition,
60+
WorkflowMutablePosition position) {
61+
this.function = task.function();
62+
this.inputClass = task.inputClass();
63+
}
64+
65+
@Override
66+
public CallableTask build() {
67+
return this::apply;
68+
}
5969
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,25 @@
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
1919
import io.serverlessworkflow.api.types.func.CallJava;
20+
import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction;
2021
import io.serverlessworkflow.impl.TaskContext;
2122
import io.serverlessworkflow.impl.WorkflowContext;
2223
import io.serverlessworkflow.impl.WorkflowDefinition;
2324
import io.serverlessworkflow.impl.WorkflowModel;
25+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2426
import io.serverlessworkflow.impl.executors.CallableTask;
27+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2528
import java.util.Optional;
2629
import java.util.concurrent.CompletableFuture;
2730
import java.util.function.Function;
2831

2932
public class JavaFunctionCallExecutor<T, V>
30-
implements CallableTask<CallJava.CallJavaFunction<T, V>> {
33+
implements CallableTaskBuilder<CallJava.CallJavaFunction<T, V>> {
3134

3235
private Function<T, V> function;
3336
private Optional<Class<T>> inputClass;
3437

35-
public void init(CallJava.CallJavaFunction<T, V> task, WorkflowDefinition definition) {
36-
function = task.function();
37-
inputClass = task.inputClass();
38-
}
39-
40-
@Override
41-
public CompletableFuture<WorkflowModel> apply(
38+
private CompletableFuture<WorkflowModel> apply(
4239
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4340
return CompletableFuture.completedFuture(
4441
workflowContext
@@ -52,4 +49,18 @@ public CompletableFuture<WorkflowModel> apply(
5249
public boolean accept(Class<? extends TaskBase> clazz) {
5350
return CallJava.CallJavaFunction.class.isAssignableFrom(clazz);
5451
}
52+
53+
@Override
54+
public void init(
55+
CallJavaFunction<T, V> task,
56+
WorkflowDefinition definition,
57+
WorkflowMutablePosition position) {
58+
function = task.function();
59+
inputClass = task.inputClass();
60+
}
61+
62+
@Override
63+
public CallableTask build() {
64+
return this::apply;
65+
}
5566
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,25 @@
1919

2020
import io.serverlessworkflow.api.types.TaskBase;
2121
import io.serverlessworkflow.api.types.func.CallJava;
22+
import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunction;
2223
import io.serverlessworkflow.api.types.func.LoopFunction;
2324
import io.serverlessworkflow.impl.TaskContext;
2425
import io.serverlessworkflow.impl.WorkflowContext;
2526
import io.serverlessworkflow.impl.WorkflowDefinition;
2627
import io.serverlessworkflow.impl.WorkflowModel;
2728
import io.serverlessworkflow.impl.WorkflowModelFactory;
29+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2830
import io.serverlessworkflow.impl.executors.CallableTask;
31+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2932
import java.util.concurrent.CompletableFuture;
3033

31-
public class JavaLoopFunctionCallExecutor implements CallableTask<CallJava.CallJavaLoopFunction> {
34+
public class JavaLoopFunctionCallExecutor
35+
implements CallableTaskBuilder<CallJava.CallJavaLoopFunction> {
3236

3337
private LoopFunction function;
3438
private String varName;
3539

36-
public void init(CallJava.CallJavaLoopFunction task, WorkflowDefinition definition) {
37-
function = task.function();
38-
varName = task.varName();
39-
}
40-
41-
@Override
42-
public CompletableFuture<WorkflowModel> apply(
40+
private CompletableFuture<WorkflowModel> apply(
4341
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4442
WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory();
4543
return CompletableFuture.completedFuture(
@@ -51,7 +49,18 @@ public CompletableFuture<WorkflowModel> apply(
5149

5250
@Override
5351
public boolean accept(Class<? extends TaskBase> clazz) {
54-
5552
return CallJava.CallJavaLoopFunction.class.isAssignableFrom(clazz);
5653
}
54+
55+
@Override
56+
public void init(
57+
CallJavaLoopFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
58+
function = task.function();
59+
varName = task.varName();
60+
}
61+
62+
@Override
63+
public CallableTask build() {
64+
return this::apply;
65+
}
5766
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,26 @@
1919

2020
import io.serverlessworkflow.api.types.TaskBase;
2121
import io.serverlessworkflow.api.types.func.CallJava;
22+
import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunctionIndex;
2223
import io.serverlessworkflow.api.types.func.LoopFunctionIndex;
2324
import io.serverlessworkflow.impl.TaskContext;
2425
import io.serverlessworkflow.impl.WorkflowContext;
2526
import io.serverlessworkflow.impl.WorkflowDefinition;
2627
import io.serverlessworkflow.impl.WorkflowModel;
2728
import io.serverlessworkflow.impl.WorkflowModelFactory;
29+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2830
import io.serverlessworkflow.impl.executors.CallableTask;
31+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
2932
import java.util.concurrent.CompletableFuture;
3033

3134
public class JavaLoopFunctionIndexCallExecutor
32-
implements CallableTask<CallJava.CallJavaLoopFunctionIndex> {
35+
implements CallableTaskBuilder<CallJava.CallJavaLoopFunctionIndex> {
3336

3437
private LoopFunctionIndex function;
3538
private String varName;
3639
private String indexName;
3740

38-
public void init(CallJava.CallJavaLoopFunctionIndex task, WorkflowDefinition definition) {
39-
function = task.function();
40-
varName = task.varName();
41-
indexName = task.indexName();
42-
}
43-
44-
@Override
45-
public CompletableFuture<WorkflowModel> apply(
41+
private CompletableFuture<WorkflowModel> apply(
4642
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
4743
WorkflowModelFactory modelFactory = workflowContext.definition().application().modelFactory();
4844

@@ -59,4 +55,19 @@ public CompletableFuture<WorkflowModel> apply(
5955
public boolean accept(Class<? extends TaskBase> clazz) {
6056
return CallJava.CallJavaLoopFunctionIndex.class.isAssignableFrom(clazz);
6157
}
58+
59+
@Override
60+
public void init(
61+
CallJavaLoopFunctionIndex task,
62+
WorkflowDefinition definition,
63+
WorkflowMutablePosition position) {
64+
function = task.function();
65+
varName = task.varName();
66+
indexName = task.indexName();
67+
}
68+
69+
@Override
70+
public CallableTask build() {
71+
return this::apply;
72+
}
6273
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.CallFunction;
19+
import io.serverlessworkflow.api.types.FunctionArguments;
20+
import io.serverlessworkflow.api.types.Task;
21+
import io.serverlessworkflow.api.types.TaskBase;
22+
import io.serverlessworkflow.impl.WorkflowDefinition;
23+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
24+
import io.serverlessworkflow.impl.WorkflowUtils;
25+
import io.serverlessworkflow.impl.WorkflowValueResolver;
26+
import java.util.Map;
27+
import java.util.Optional;
28+
29+
public class CallFunctionExecutor implements CallableTaskBuilder<CallFunction> {
30+
31+
private TaskExecutorBuilder<? extends TaskBase> executorBuilder;
32+
private WorkflowValueResolver<Map<String, Object>> args;
33+
34+
@Override
35+
public void init(
36+
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
37+
String functionName = task.getCall();
38+
FunctionArguments functionArgs = task.getWith();
39+
args =
40+
functionArgs != null
41+
? WorkflowUtils.buildMapResolver(
42+
definition.application(), functionArgs.getAdditionalProperties())
43+
: (w, t, m) -> Map.of();
44+
Task function = null;
45+
if (definition.workflow().getUse() != null
46+
&& definition.workflow().getUse().getFunctions() != null
47+
&& definition.workflow().getUse().getFunctions().getAdditionalProperties() != null) {
48+
function =
49+
definition.workflow().getUse().getFunctions().getAdditionalProperties().get(functionName);
50+
}
51+
if (function == null) {
52+
// TODO search in catalog
53+
throw new UnsupportedOperationException("Function Catalog not supported yet");
54+
}
55+
executorBuilder =
56+
definition.application().taskFactory().getTaskExecutor(position, function, definition);
57+
}
58+
59+
@Override
60+
public boolean accept(Class<? extends TaskBase> clazz) {
61+
return clazz.equals(CallFunction.class);
62+
}
63+
64+
@Override
65+
public CallableTask build() {
66+
TaskExecutor<? extends TaskBase> executor = executorBuilder.build();
67+
return (w, t, m) ->
68+
executor
69+
.apply(
70+
w,
71+
Optional.of(t),
72+
w.definition().application().modelFactory().fromAny(args.apply(w, t, m)))
73+
.thenApply(o -> o.output());
74+
}
75+
}

0 commit comments

Comments
 (0)