Skip to content

Commit 28c3a56

Browse files
authored
feat: support templating for http sinks (#493)
1 parent dd181e7 commit 28c3a56

File tree

5 files changed

+235
-16
lines changed

5 files changed

+235
-16
lines changed

plugins/sinks/http/README.md

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,57 @@ sinks:
1010
config:
1111
method: POST
1212
success_code: 200
13-
url: https://compass.com/v1beta1/asset
13+
url: https://compass.requestcatcher.com/v1beta2/asset/{{ .Type }}/{{ .Urn }}
1414
headers:
1515
Header-1: value11,value12
16+
script:
17+
engine: tengo
18+
source: |
19+
payload := {
20+
details: {
21+
some_key: asset.urn,
22+
another_key: asset.name
23+
}
24+
}
25+
sink(payload)
1626
```
1727
1828
## Config Defination
1929
20-
| Key | Value | Example | Description | |
21-
| :-- | :---- | :------ | :---------- | :-- |
22-
|`url` | `string` | `http://compass.production.com/v1beta1/asset` | URL to the http server, contains all the info needed to make the request, like port and route | *required*|
23-
| `method` | `string` | `POST` | the method string of by which the request is to be made, e.g. POST/PATCH/GET | *required* |
24-
| `success_code` | `integer` | `200` | to identify the expected success code the http server returns, defult is `200` | *optional* |
25-
| `headers` | `map` | `"Content-Type": "application/json"` | to add any header/headers that may be required for making the request | *optional* |
30+
| Key | Value | Example | Description | |
31+
| :------------- | :-------- | :------------------------------------------------------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :--------- |
32+
| `url` | `string` | `https://compass.requestcatcher.com/v1beta2/asset/{{ .Type }}/{{ .Urn }}` | URL to the http server, contains all the info needed to make the request, like port and route, support go [text/template](https://pkg.go.dev/text/template) (see the properties in [v1beta2.Asset](https://github.com/goto/meteor/blob/main/models/gotocompany/assets/v1beta2/asset.pb.go#L25-L68)) | _required_ |
33+
| `method` | `string` | `POST` | the method string of by which the request is to be made, e.g. POST/PATCH/GET | _required_ |
34+
| `success_code` | `integer` | `200` | to identify the expected success code the http server returns, defult is `200` | _optional_ |
35+
| `headers` | `map` | `"Content-Type": "application/json"` | to add any header/headers that may be required for making the request | _optional_ |
36+
| `script` | `Object` | see [Script](#Script) | Script for building custom payload | \*optional |
37+
38+
## Script
39+
40+
| Key | Value | Example | Description | |
41+
| :------- | :------- | :------------------- | :----------------------------------------------------------------------------------- | :--------- |
42+
| `engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | _required_ |
43+
| `source` | `string` | see [Usage](#Usage). | [Tengo][tengo] script used to map the request into custom payload to be sent to url. | _required_ |
44+
45+
### Script Globals
46+
47+
- [`asset`](#recipe_scope)
48+
- [`sink(Payload)`](#sinkpayload)
49+
- [`exit`](#exit)
50+
51+
#### `asset`
52+
53+
The asset record emitted by the extractor and processors is made available in the script
54+
environment as `asset`. The field names will be as
55+
per the [`Asset` proto definition](https://github.com/goto/proton/blob/5b5dc72/gotocompany/assets/v1beta2/asset.proto#L14).
56+
57+
#### `sink(Payload)`
58+
59+
Send http request to url with payload.
60+
61+
#### `exit()`
62+
63+
Terminates the script execution.
2664

2765
## Contributing
2866

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
version: 1
3+
interactions:
4+
- request:
5+
body: '{"details":{"another_key":"index1","some_key":"elasticsearch.index1"}}'
6+
form: {}
7+
headers:
8+
Accept:
9+
- application/json
10+
Content-Type:
11+
- application/json
12+
url: http://127.0.0.1:54943/table/elasticsearch.index1
13+
method: POST
14+
response:
15+
body: ""
16+
headers:
17+
Content-Length:
18+
- "0"
19+
Date:
20+
- Mon, 11 Jul 2022 09:32:21 GMT
21+
status: 200 OK
22+
code: 200
23+
duration: 274.68µs
24+
- request:
25+
body: '{"details":{"another_key":"index2","some_key":"elasticsearch.index2"}}'
26+
form: {}
27+
headers:
28+
Accept:
29+
- application/json
30+
Content-Type:
31+
- application/json
32+
url: http://127.0.0.1:54943/table/elasticsearch.index2
33+
method: POST
34+
response:
35+
body: ""
36+
headers:
37+
Content-Length:
38+
- "0"
39+
Date:
40+
- Mon, 11 Jul 2022 09:32:21 GMT
41+
status: 200 OK
42+
code: 200
43+
duration: 173.792µs

plugins/sinks/http/http.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"io"
1010
"net/http"
1111
"strings"
12+
"text/template"
1213

1314
"github.com/MakeNowJust/heredoc"
1415
"github.com/raystack/meteor/metrics/otelhttpclient"
1516
"github.com/raystack/meteor/models"
17+
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
1618
"github.com/raystack/meteor/plugins"
1719
"github.com/raystack/meteor/registry"
1820
"github.com/raystack/salt/log"
@@ -26,6 +28,10 @@ type Config struct {
2628
Headers map[string]string `mapstructure:"headers"`
2729
Method string `mapstructure:"method" validate:"required"`
2830
SuccessCode int `mapstructure:"success_code" default:"200"`
31+
Script *struct {
32+
Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
33+
Source string `mapstructure:"source" validate:"required"`
34+
} `mapstructure:"script"`
2935
}
3036

3137
var info = plugins.Info{
@@ -34,11 +40,21 @@ var info = plugins.Info{
3440
Tags: []string{"http", "sink"},
3541
SampleConfig: heredoc.Doc(`
3642
# The url (hostname and route) of the http service
37-
url: https://compass.com/route
43+
url: https://compass.requestcatcher.com/{{ .Type }}/{{ .Urn }}
3844
method: "PUT"
3945
# Additional HTTP headers, multiple headers value are separated by a comma
4046
headers:
4147
X-Other-Header: value1, value2
48+
script:
49+
engine: tengo
50+
source: |
51+
payload := {
52+
details: {
53+
some_key: asset.urn,
54+
another_key: asset.name
55+
}
56+
}
57+
sink(payload)
4258
`),
4359
}
4460

@@ -75,12 +91,7 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
7591
for _, record := range batch {
7692
metadata := record.Data()
7793
s.logger.Info("sinking record to http", "record", metadata.Urn)
78-
payload, err := json.Marshal(metadata)
79-
if err != nil {
80-
return fmt.Errorf("build http payload: %w", err)
81-
}
82-
83-
if err = s.send(ctx, payload); err != nil {
94+
if err := s.send(ctx, metadata); err != nil {
8495
return fmt.Errorf("send data: %w", err)
8596
}
8697

@@ -92,9 +103,25 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
92103

93104
func (*Sink) Close() error { return nil }
94105

95-
func (s *Sink) send(ctx context.Context, payloadBytes []byte) error {
106+
func (s *Sink) send(ctx context.Context, asset *v1beta2.Asset) error {
107+
t := template.Must(template.New("url").Parse(s.config.URL))
108+
var buf bytes.Buffer
109+
if err := t.Execute(&buf, asset); err != nil {
110+
return fmt.Errorf("build http url: %w", err)
111+
}
112+
url := buf.String()
113+
if s.config.Script != nil {
114+
return s.executeScript(ctx, url, asset)
115+
}
116+
payload, err := json.Marshal(asset)
117+
if err != nil {
118+
return fmt.Errorf("build http payload: %w", err)
119+
}
96120
// send request
97-
req, err := http.NewRequestWithContext(ctx, s.config.Method, s.config.URL, bytes.NewBuffer(payloadBytes))
121+
return s.makeRequest(ctx, url, payload)
122+
}
123+
func (s *Sink) makeRequest(ctx context.Context, url string, payload []byte) error {
124+
req, err := http.NewRequestWithContext(ctx, s.config.Method, url, bytes.NewBuffer(payload))
98125
if err != nil {
99126
return err
100127
}

plugins/sinks/http/http_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,46 @@ func TestSink(t *testing.T) {
154154
err = httpSink.Sink(context.TODO(), getExpectedVal(t))
155155
assert.NoError(t, err)
156156
})
157+
158+
t.Run("should return no error when using templates", func(t *testing.T) {
159+
r, err := recorder.New("fixtures/response_with_script")
160+
if err != nil {
161+
t.Fatal(err)
162+
}
163+
defer func() {
164+
err := r.Stop()
165+
if err != nil {
166+
t.Fatal(err)
167+
}
168+
}()
169+
httpSink := h.New(&http.Client{Transport: r}, testutils.Logger)
170+
config := map[string]interface{}{
171+
"success_code": success_code,
172+
"url": "http://127.0.0.1:54943/{{ .Type }}/{{ .Urn }}",
173+
"method": "POST",
174+
"headers": map[string]string{
175+
"Content-Type": "application/json",
176+
"Accept": "application/json",
177+
},
178+
"script": map[string]interface{}{
179+
"engine": "tengo",
180+
"source": `
181+
payload := {
182+
details: {
183+
some_key: asset.urn,
184+
another_key: asset.name
185+
}
186+
}
187+
sink(payload)
188+
`,
189+
},
190+
}
191+
err = httpSink.Init(context.TODO(), plugins.Config{RawConfig: config})
192+
assert.NoError(t, err)
193+
defer httpSink.Close()
194+
err = httpSink.Sink(context.TODO(), getExpectedVal(t))
195+
assert.NoError(t, err)
196+
})
157197
}
158198

159199
func getExpectedVal(t *testing.T) []models.Record {

plugins/sinks/http/tengo_script.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package http
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
9+
"github.com/d5/tengo/v2"
10+
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
11+
"github.com/raystack/meteor/plugins/internal/tengoutil"
12+
"github.com/raystack/meteor/plugins/internal/tengoutil/structmap"
13+
)
14+
15+
var errUserExit = errors.New("user exit")
16+
17+
func (s *Sink) executeScript(ctx context.Context, url string, asset *v1beta2.Asset) error {
18+
scriptCfg := s.config.Script
19+
script, err := tengoutil.NewSecureScript(
20+
([]byte)(scriptCfg.Source), s.scriptGlobals(ctx, url),
21+
)
22+
if err != nil {
23+
return err
24+
}
25+
c, err := script.Compile()
26+
if err != nil {
27+
return fmt.Errorf("compile: %w", err)
28+
}
29+
assetMap, err := structmap.AsMap(asset)
30+
if err != nil {
31+
return fmt.Errorf("convert asset to map: %w", err)
32+
}
33+
if err := c.Set("asset", assetMap); err != nil {
34+
return fmt.Errorf("set asset into vm: %w", err)
35+
}
36+
if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) {
37+
return fmt.Errorf("run: %w", err)
38+
}
39+
return nil
40+
}
41+
func (s *Sink) scriptGlobals(ctx context.Context, url string) map[string]interface{} {
42+
return map[string]interface{}{
43+
"asset": map[string]interface{}{},
44+
"sink": &tengo.UserFunction{
45+
Name: "sink",
46+
Value: s.executeRequestWrapper(ctx, url),
47+
},
48+
"exit": &tengo.UserFunction{
49+
Name: "exit",
50+
Value: func(...tengo.Object) (tengo.Object, error) {
51+
return nil, errUserExit
52+
},
53+
},
54+
}
55+
}
56+
func (s *Sink) executeRequestWrapper(ctx context.Context, url string) tengo.CallableFunc {
57+
return func(args ...tengo.Object) (tengo.Object, error) {
58+
if len(args) != 1 {
59+
return nil, fmt.Errorf("execute request: invalid number of arguments of sink function, expected 1, got %d", len(args))
60+
}
61+
payloadObj, ok := args[0].(*tengo.Map)
62+
if !ok {
63+
return nil, fmt.Errorf("execute request: invalid type of argument of sink function, expected map, got %T", args[0])
64+
}
65+
payload, err := json.Marshal(tengo.ToInterface(payloadObj))
66+
if err != nil {
67+
return nil, fmt.Errorf("execute request: marshal payload: %w", err)
68+
}
69+
return nil, s.makeRequest(ctx, url, payload)
70+
}
71+
}

0 commit comments

Comments
 (0)