Skip to content

Commit a575332

Browse files
authored
Read DMS replication instance identifier from the DMS API (#492)
The replication instance and replication task ARNs do not contain the value that DMS uses for the ReplicationInstanceIdentifier dimension. Fixes #485
1 parent 8ba0f05 commit a575332

File tree

9 files changed

+381
-2
lines changed

9 files changed

+381
-2
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,13 @@ The following IAM permission is required to discover tagged API Gateway REST API
406406
"apigateway:GET"
407407
```
408408

409+
The following IAM permissions are required to discover tagged Database Migration Service (DMS) replication instances and tasks:
410+
411+
```json
412+
"dms:DescribeReplicationInstances",
413+
"dms:DescribeReplicationTasks"
414+
```
415+
409416
## Running locally
410417

411418
```shell

pkg/abstract.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func scrapeAwsData(
4747
client: cache.GetTagging(&region, role),
4848
apiGatewayClient: cache.GetAPIGateway(&region, role),
4949
asgClient: cache.GetASG(&region, role),
50+
dmsClient: cache.GetDMS(&region, role),
5051
ec2Client: cache.GetEC2(&region, role),
5152
}
5253

pkg/aws_tags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/aws/aws-sdk-go/aws"
99
"github.com/aws/aws-sdk-go/service/apigateway/apigatewayiface"
1010
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
11+
"github.com/aws/aws-sdk-go/service/databasemigrationservice/databasemigrationserviceiface"
1112
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
1213
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
1314
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
@@ -81,6 +82,7 @@ type tagsInterface struct {
8182
asgClient autoscalingiface.AutoScalingAPI
8283
apiGatewayClient apigatewayiface.APIGatewayAPI
8384
ec2Client ec2iface.EC2API
85+
dmsClient databasemigrationserviceiface.DatabaseMigrationServiceAPI
8486
}
8587

8688
func (iface tagsInterface) get(job *Job, region string) ([]*taggedResource, error) {

pkg/prometheus.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ var (
4545
Name: "yace_cloudwatch_ec2api_requests_total",
4646
Help: "Help is not implemented yet.",
4747
})
48+
dmsAPICounter = prometheus.NewCounter(prometheus.CounterOpts{
49+
Name: "yace_cloudwatch_dmsapi_requests_total",
50+
Help: "Help is not implemented yet.",
51+
})
4852
)
4953

5054
type PrometheusMetric struct {

pkg/services.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/aws/aws-sdk-go/aws"
99
"github.com/aws/aws-sdk-go/service/apigateway"
1010
"github.com/aws/aws-sdk-go/service/autoscaling"
11+
"github.com/aws/aws-sdk-go/service/databasemigrationservice"
1112
"github.com/aws/aws-sdk-go/service/ec2"
1213
)
1314

@@ -189,7 +190,60 @@ var (
189190
aws.String("dms"),
190191
},
191192
DimensionRegexps: []*string{
192-
aws.String("rep:(?P<ReplicationInstanceIdentifier>[^/]+)"),
193+
aws.String("rep:[^/]+/(?P<ReplicationInstanceIdentifier>[^/]+)"),
194+
aws.String("task:(?P<ReplicationTaskIdentifier>[^/]+)/(?P<ReplicationInstanceIdentifier>[^/]+)"),
195+
},
196+
// Append the replication instance identifier to DMS task and instance ARNs
197+
FilterFunc: func(iface tagsInterface, inputResources []*taggedResource) (outputResources []*taggedResource, err error) {
198+
if len(inputResources) == 0 {
199+
return inputResources, nil
200+
}
201+
202+
ctx := context.Background()
203+
replicationInstanceIdentifiers := make(map[string]string)
204+
pageNum := 0
205+
if err := iface.dmsClient.DescribeReplicationInstancesPagesWithContext(ctx, nil,
206+
func(page *databasemigrationservice.DescribeReplicationInstancesOutput, lastPage bool) bool {
207+
pageNum++
208+
dmsAPICounter.Inc()
209+
210+
for _, instance := range page.ReplicationInstances {
211+
replicationInstanceIdentifiers[aws.StringValue(instance.ReplicationInstanceArn)] = aws.StringValue(instance.ReplicationInstanceIdentifier)
212+
}
213+
214+
return pageNum < 100
215+
},
216+
); err != nil {
217+
return nil, err
218+
}
219+
pageNum = 0
220+
if err := iface.dmsClient.DescribeReplicationTasksPagesWithContext(ctx, nil,
221+
func(page *databasemigrationservice.DescribeReplicationTasksOutput, lastPage bool) bool {
222+
pageNum++
223+
dmsAPICounter.Inc()
224+
225+
for _, task := range page.ReplicationTasks {
226+
taskInstanceArn := aws.StringValue(task.ReplicationInstanceArn)
227+
if instanceIdentifier, ok := replicationInstanceIdentifiers[taskInstanceArn]; ok {
228+
replicationInstanceIdentifiers[aws.StringValue(task.ReplicationTaskArn)] = instanceIdentifier
229+
}
230+
}
231+
232+
return pageNum < 100
233+
},
234+
); err != nil {
235+
return nil, err
236+
}
237+
238+
for _, resource := range inputResources {
239+
r := resource
240+
// Append the replication instance identifier to replication instance and task ARNs
241+
if instanceIdentifier, ok := replicationInstanceIdentifiers[r.ARN]; ok {
242+
r.ARN = fmt.Sprintf("%s/%s", r.ARN, instanceIdentifier)
243+
}
244+
outputResources = append(outputResources, r)
245+
}
246+
return
193247
},
194248
}, {
195249
Namespace: "AWS/DDoSProtection",

pkg/services_test.go

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package exporter
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
"github.com/aws/aws-sdk-go/aws"
8+
"github.com/aws/aws-sdk-go/aws/request"
9+
"github.com/aws/aws-sdk-go/service/databasemigrationservice"
10+
"github.com/aws/aws-sdk-go/service/databasemigrationservice/databasemigrationserviceiface"
11+
)
12+
13+
func TestDMSFilterFunc(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
iface tagsInterface
17+
inputResources []*taggedResource
18+
outputResources []*taggedResource
19+
}{
20+
{
21+
"empty input resources",
22+
tagsInterface{},
23+
[]*taggedResource{},
24+
[]*taggedResource{},
25+
},
26+
{
27+
"replication tasks and instances",
28+
tagsInterface{
29+
dmsClient: dmsClient{
30+
describeReplicationInstancesOutput: &databasemigrationservice.DescribeReplicationInstancesOutput{
31+
ReplicationInstances: []*databasemigrationservice.ReplicationInstance{
32+
{
33+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:ABCDEFG1234567890"),
34+
ReplicationInstanceIdentifier: aws.String("repl-instance-identifier-1"),
35+
},
36+
{
37+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:ZZZZZZZZZZZZZZZZZ"),
38+
ReplicationInstanceIdentifier: aws.String("repl-instance-identifier-2"),
39+
},
40+
{
41+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:YYYYYYYYYYYYYYYYY"),
42+
ReplicationInstanceIdentifier: aws.String("repl-instance-identifier-3"),
43+
},
44+
},
45+
},
46+
describeReplicationTasksOutput: &databasemigrationservice.DescribeReplicationTasksOutput{
47+
ReplicationTasks: []*databasemigrationservice.ReplicationTask{
48+
{
49+
ReplicationTaskArn: aws.String("arn:aws:dms:us-east-1:123123123123:task:9999999999999999"),
50+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:ZZZZZZZZZZZZZZZZZ"),
51+
},
52+
{
53+
ReplicationTaskArn: aws.String("arn:aws:dms:us-east-1:123123123123:task:2222222222222222"),
54+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:ZZZZZZZZZZZZZZZZZ"),
55+
},
56+
{
57+
ReplicationTaskArn: aws.String("arn:aws:dms:us-east-1:123123123123:task:3333333333333333"),
58+
ReplicationInstanceArn: aws.String("arn:aws:dms:us-east-1:123123123123:rep:WWWWWWWWWWWWWWWWW"),
59+
},
60+
},
61+
},
62+
},
63+
},
64+
[]*taggedResource{
65+
{
66+
ARN: "arn:aws:dms:us-east-1:123123123123:rep:ABCDEFG1234567890",
67+
Namespace: "dms",
68+
Region: "us-east-1",
69+
Tags: []Tag{
70+
{
71+
Key: "Test",
72+
Value: "Value",
73+
},
74+
},
75+
},
76+
{
77+
ARN: "arn:aws:dms:us-east-1:123123123123:rep:WXYZ987654321",
78+
Namespace: "dms",
79+
Region: "us-east-1",
80+
Tags: []Tag{
81+
{
82+
Key: "Test",
83+
Value: "Value 2",
84+
},
85+
},
86+
},
87+
{
88+
ARN: "arn:aws:dms:us-east-1:123123123123:task:9999999999999999",
89+
Namespace: "dms",
90+
Region: "us-east-1",
91+
Tags: []Tag{
92+
{
93+
Key: "Test",
94+
Value: "Value 3",
95+
},
96+
},
97+
},
98+
{
99+
ARN: "arn:aws:dms:us-east-1:123123123123:task:5555555555555555",
100+
Namespace: "dms",
101+
Region: "us-east-1",
102+
Tags: []Tag{
103+
{
104+
Key: "Test",
105+
Value: "Value 4",
106+
},
107+
},
108+
},
109+
{
110+
ARN: "arn:aws:dms:us-east-1:123123123123:subgrp:demo-subgrp",
111+
Namespace: "dms",
112+
Region: "us-east-1",
113+
Tags: []Tag{
114+
{
115+
Key: "Test",
116+
Value: "Value 5",
117+
},
118+
},
119+
},
120+
{
121+
ARN: "arn:aws:dms:us-east-1:123123123123:endpoint:1111111111111111",
122+
Namespace: "dms",
123+
Region: "us-east-1",
124+
Tags: []Tag{
125+
{
126+
Key: "Test",
127+
Value: "Value 6",
128+
},
129+
},
130+
},
131+
},
132+
[]*taggedResource{
133+
{
134+
ARN: "arn:aws:dms:us-east-1:123123123123:rep:ABCDEFG1234567890/repl-instance-identifier-1",
135+
Namespace: "dms",
136+
Region: "us-east-1",
137+
Tags: []Tag{
138+
{
139+
Key: "Test",
140+
Value: "Value",
141+
},
142+
},
143+
},
144+
{
145+
ARN: "arn:aws:dms:us-east-1:123123123123:rep:WXYZ987654321",
146+
Namespace: "dms",
147+
Region: "us-east-1",
148+
Tags: []Tag{
149+
{
150+
Key: "Test",
151+
Value: "Value 2",
152+
},
153+
},
154+
},
155+
{
156+
ARN: "arn:aws:dms:us-east-1:123123123123:task:9999999999999999/repl-instance-identifier-2",
157+
Namespace: "dms",
158+
Region: "us-east-1",
159+
Tags: []Tag{
160+
{
161+
Key: "Test",
162+
Value: "Value 3",
163+
},
164+
},
165+
},
166+
{
167+
ARN: "arn:aws:dms:us-east-1:123123123123:task:5555555555555555",
168+
Namespace: "dms",
169+
Region: "us-east-1",
170+
Tags: []Tag{
171+
{
172+
Key: "Test",
173+
Value: "Value 4",
174+
},
175+
},
176+
},
177+
{
178+
ARN: "arn:aws:dms:us-east-1:123123123123:subgrp:demo-subgrp",
179+
Namespace: "dms",
180+
Region: "us-east-1",
181+
Tags: []Tag{
182+
{
183+
Key: "Test",
184+
Value: "Value 5",
185+
},
186+
},
187+
},
188+
{
189+
ARN: "arn:aws:dms:us-east-1:123123123123:endpoint:1111111111111111",
190+
Namespace: "dms",
191+
Region: "us-east-1",
192+
Tags: []Tag{
193+
{
194+
Key: "Test",
195+
Value: "Value 6",
196+
},
197+
},
198+
},
199+
},
200+
},
201+
}
202+
for _, test := range tests {
203+
t.Run(test.name, func(t *testing.T) {
204+
dms := SupportedServices.GetService("dms")
205+
206+
outputResources, err := dms.FilterFunc(test.iface, test.inputResources)
207+
if err != nil {
208+
t.Logf("Error from FilterFunc: %v", err)
209+
t.FailNow()
210+
}
211+
if len(outputResources) != len(test.outputResources) {
212+
t.Logf("len(outputResources) = %d, want %d", len(outputResources), len(test.outputResources))
213+
t.Fail()
214+
}
215+
for i, resource := range outputResources {
216+
if len(test.outputResources) <= i {
217+
break
218+
}
219+
wantResource := *test.outputResources[i]
220+
if !reflect.DeepEqual(*resource, wantResource) {
221+
t.Errorf("outputResources[%d] = %+v, want %+v", i, *resource, wantResource)
222+
}
223+
}
224+
})
225+
}
226+
}
227+
228+
type dmsClient struct {
229+
databasemigrationserviceiface.DatabaseMigrationServiceAPI
230+
describeReplicationInstancesOutput *databasemigrationservice.DescribeReplicationInstancesOutput
231+
describeReplicationTasksOutput *databasemigrationservice.DescribeReplicationTasksOutput
232+
}
233+
234+
func (dms dmsClient) DescribeReplicationInstancesPagesWithContext(ctx aws.Context, input *databasemigrationservice.DescribeReplicationInstancesInput, fn func(*databasemigrationservice.DescribeReplicationInstancesOutput, bool) bool, opts ...request.Option) error {
235+
fn(dms.describeReplicationInstancesOutput, true)
236+
return nil
237+
}
238+
func (dms dmsClient) DescribeReplicationTasksPagesWithContext(ctx aws.Context, input *databasemigrationservice.DescribeReplicationTasksInput, fn func(*databasemigrationservice.DescribeReplicationTasksOutput, bool) bool, opts ...request.Option) error {
239+
fn(dms.describeReplicationTasksOutput, true)
240+
return nil
241+
}

0 commit comments

Comments
 (0)