Skip to content

Commit fdda5b3

Browse files
Add get search attributes and data objects APIs (#18)
1 parent c137796 commit fdda5b3

File tree

4 files changed

+95
-45
lines changed

4 files changed

+95
-45
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ See [contribution guide](CONTRIBUTION.md)
1717
- [x] DataObjectRW
1818
- [x] StateLocal
1919
- [x] Signal workflow API
20-
- [ ] Get workflow DataObjects/SearchAttributes API
20+
- [x] Get workflow DataObjects/SearchAttributes API
2121
- [x] Get workflow result API
2222
- [x] Search workflow API
2323
- [x] Describe workflow API

iwf/client.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,11 @@ type Client interface {
2323
// GetWorkflowDataObjects returns the data objects of a workflow execution
2424
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
2525
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowDataObjects API instead
26-
// It returns data objects in format of iwfidl.EncodedObject and user code have to use ObjectEncoder to deserialize
27-
GetWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]iwfidl.EncodedObject, error)
28-
// GetAllWorkflowDataObjects returns all the data objects of a workflow execution
29-
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
30-
// It returns data objects in format of iwfidl.EncodedObject and user code have to use ObjectEncoder to deserialize
31-
GetAllWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]iwfidl.EncodedObject, error)
26+
GetWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]Object, error)
3227
// GetWorkflowSearchAttributes returns search attributes of a workflow execution
3328
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
34-
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowDataObjects API instead
35-
GetWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]interface{}, error)
29+
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowSearchAttributes API instead
30+
GetWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]interface{}, error)
3631
// GetAllWorkflowSearchAttributes returns all search attributes of a workflow execution
3732
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
3833
GetAllWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]interface{}, error)
@@ -66,6 +61,9 @@ type clientCommon interface {
6661
// https://cadenceworkflow.io/docs/concepts/search-workflows/
6762
// https://docs.temporal.io/concepts/what-is-a-search-attribute/
6863
SearchWorkflow(ctx context.Context, request iwfidl.WorkflowSearchRequest) (*iwfidl.WorkflowSearchResponse, error)
64+
// GetAllWorkflowDataObjects returns all the data objects of a workflow execution
65+
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
66+
GetAllWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string) (map[string]Object, error)
6967
}
7068

7169
// UnregisteredClient is a client without workflow registry
@@ -86,19 +84,11 @@ type UnregisteredClient interface {
8684
// GetWorkflowDataObjects returns the data objects of a workflow execution
8785
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
8886
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowDataObjects API instead
89-
// It returns data objects in format of iwfidl.EncodedObject and user code have to use ObjectEncoder to deserialize
90-
GetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]iwfidl.EncodedObject, error)
91-
// GetAllWorkflowDataObjects returns all the data objects of a workflow execution
92-
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
93-
// It returns data objects in format of iwfidl.EncodedObject and user code have to use ObjectEncoder to deserialize
94-
GetAllWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.EncodedObject, error)
87+
GetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error)
9588
// GetWorkflowSearchAttributes returns search attributes of a workflow execution
9689
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
97-
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowDataObjects API instead
98-
GetWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.SearchAttribute, error)
99-
// GetAllWorkflowSearchAttributes returns all search attributes of a workflow execution
100-
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
101-
GetAllWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.SearchAttribute, error)
90+
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowSearchAttributes API instead
91+
GetWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string, keys []iwfidl.SearchAttributeKeyAndType) (map[string]iwfidl.SearchAttribute, error)
10292
}
10393

10494
// NewUnregisteredClient returns a UnregisteredClient

iwf/client_impl.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/iworkflowio/iwf-golang-sdk/gen/iwfidl"
7+
"github.com/iworkflowio/iwf-golang-sdk/iwf/ptr"
78
)
89

910
type clientImpl struct {
@@ -43,23 +44,50 @@ func (c *clientImpl) SignalWorkflow(ctx context.Context, workflow Workflow, work
4344
return c.UnregisteredClient.SignalWorkflow(ctx, workflowId, workflowRunId, signalChannelName, signalValue)
4445
}
4546

46-
func (c *clientImpl) GetWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]iwfidl.EncodedObject, error) {
47-
//TODO implement me
48-
panic("implement me")
49-
}
50-
51-
func (c *clientImpl) GetAllWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]iwfidl.EncodedObject, error) {
52-
//TODO implement me
53-
panic("implement me")
47+
func (c *clientImpl) GetWorkflowDataObjects(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]Object, error) {
48+
wfType := GetDefaultWorkflowType(workflow)
49+
doTypeMap := c.registry.getWorkflowDataObjectKeyStore(wfType)
50+
for _, k := range keys {
51+
_, ok := doTypeMap[k]
52+
if !ok {
53+
return nil, fmt.Errorf("data object type %v is not registered", k)
54+
}
55+
}
56+
return c.UnregisteredClient.GetWorkflowDataObjects(ctx, workflowId, workflowRunId, keys)
5457
}
5558

56-
func (c *clientImpl) GetWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]interface{}, error) {
57-
//TODO implement me
58-
panic("implement me")
59+
func (c *clientImpl) GetWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string, keys []string) (map[string]interface{}, error) {
60+
wfType := GetDefaultWorkflowType(workflow)
61+
allTypes := c.registry.getSearchAttributeTypeStore(wfType)
62+
var keyAndTypes []iwfidl.SearchAttributeKeyAndType
63+
for _, k := range keys {
64+
keyAndTypes = append(keyAndTypes, iwfidl.SearchAttributeKeyAndType{
65+
Key: &k,
66+
ValueType: ptr.Any(allTypes[k]),
67+
})
68+
}
69+
vals, err := c.UnregisteredClient.GetWorkflowSearchAttributes(ctx, workflowId, workflowRunId, keyAndTypes)
70+
if err != nil {
71+
return nil, err
72+
}
73+
out := make(map[string]interface{}, len(vals))
74+
for _, val := range vals {
75+
v, err := getSearchAttributeValue(val)
76+
if err != nil {
77+
return nil, err
78+
}
79+
out[val.GetKey()] = v
80+
}
81+
return out, nil
5982
}
6083

6184
func (c *clientImpl) GetAllWorkflowSearchAttributes(ctx context.Context, workflow Workflow, workflowId, workflowRunId string) (map[string]interface{}, error) {
62-
//TODO implement me
63-
panic("implement me")
85+
wfType := GetDefaultWorkflowType(workflow)
86+
allTypes := c.registry.getSearchAttributeTypeStore(wfType)
87+
var keys []string
88+
for k := range allTypes {
89+
keys = append(keys, k)
90+
}
91+
return c.GetWorkflowSearchAttributes(ctx, workflow, workflowId, workflowRunId, keys)
6492
}
6593

iwf/unregistered_client_impl.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,24 +73,56 @@ func (u *unregisteredClientImpl) SignalWorkflow(ctx context.Context, workflowId,
7373
return u.processError(err, httpResp)
7474
}
7575

76-
func (u *unregisteredClientImpl) GetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]iwfidl.EncodedObject, error) {
77-
//TODO implement me
78-
panic("implement me")
76+
func (u *unregisteredClientImpl) GetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error) {
77+
if len(keys) == 0 {
78+
return nil, fmt.Errorf("must specify keys to return, use GetAllWorkflowDataObjects if intended to get all keys")
79+
}
80+
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, keys)
81+
}
82+
83+
func (u *unregisteredClientImpl) GetAllWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string) (map[string]Object, error) {
84+
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, nil)
7985
}
8086

81-
func (u *unregisteredClientImpl) GetAllWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.EncodedObject, error) {
82-
//TODO implement me
83-
panic("implement me")
87+
func (u *unregisteredClientImpl) doGetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error) {
88+
reqPost := u.apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(ctx)
89+
resp, httpResp, err := reqPost.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
90+
WorkflowId: workflowId,
91+
WorkflowRunId: iwfidl.PtrString(workflowRunId),
92+
Keys: keys,
93+
}).Execute()
94+
if err := u.processError(err, httpResp); err != nil {
95+
return nil, err
96+
}
97+
out := make(map[string]Object, len(resp.Objects))
98+
for _, kv := range resp.Objects {
99+
out[kv.GetKey()] = NewObject(ptr.Any(kv.GetValue()), u.options.ObjectEncoder)
100+
}
101+
return out, nil
84102
}
85103

86-
func (u *unregisteredClientImpl) GetWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.SearchAttribute, error) {
87-
//TODO implement me
88-
panic("implement me")
104+
func (u *unregisteredClientImpl) GetWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string, keys []iwfidl.SearchAttributeKeyAndType) (map[string]iwfidl.SearchAttribute, error) {
105+
if len(keys) == 0 {
106+
return nil, fmt.Errorf("must specify keys to return, use GetAllWorkflowSearchAttributes if intended to get all keys")
107+
}
108+
return u.doGetWorkflowSearchAttributes(ctx, workflowId, workflowRunId, keys)
89109
}
90110

91-
func (u *unregisteredClientImpl) GetAllWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string) (map[string]iwfidl.SearchAttribute, error) {
92-
//TODO implement me
93-
panic("implement me")
111+
func (u *unregisteredClientImpl) doGetWorkflowSearchAttributes(ctx context.Context, workflowId, workflowRunId string, keys []iwfidl.SearchAttributeKeyAndType) (map[string]iwfidl.SearchAttribute, error) {
112+
reqPost := u.apiClient.DefaultApi.ApiV1WorkflowSearchattributesGetPost(ctx)
113+
resp, httpResp, err := reqPost.WorkflowGetSearchAttributesRequest(iwfidl.WorkflowGetSearchAttributesRequest{
114+
WorkflowId: workflowId,
115+
WorkflowRunId: iwfidl.PtrString(workflowRunId),
116+
Keys: keys,
117+
}).Execute()
118+
if err := u.processError(err, httpResp); err != nil {
119+
return nil, err
120+
}
121+
out := make(map[string]iwfidl.SearchAttribute, len(resp.SearchAttributes))
122+
for _, kv := range resp.SearchAttributes {
123+
out[kv.GetKey()] = kv
124+
}
125+
return out, nil
94126
}
95127

96128
func (u *unregisteredClientImpl) StopWorkflow(ctx context.Context, workflowId, workflowRunId string, options *WorkflowStopOptions) error {

0 commit comments

Comments
 (0)