Skip to content

Commit 767660c

Browse files
7 send historical device data via websocket (#76)
* feat: historical data request * fix: using payload key as websocket request * feat: publish historical response to ws * test: group ws messages by context * test: rephrase the test sentense * test: comparing time on historical data response * test: using the same format in end to end test * fix: remove ts from historical request * refactor: end to end test * feat: define historical chart types in backend * feat: validate request and add metrics * Jest error when mock @hello.nrfcloud.com (#87) * test: demo jest error * fix(jest): tell @swc/jest to understand ESM syntax in types --------- Co-authored-by: Markus Tacker <[email protected]> * feat: remove unused code * refactor: convert response in the backend * feat: add error message * test: add end-to-end test for location historical request * test: removed unused code * test: catch error in clean up process * feat: change proto historical data response context * fix(deps): update proto * feat: using bdd markdown feature * fix: using try..catch * Update features/HistoricalRequest.feature.md * Update feature-runner/steps/websocket.ts * test(e2e): renamed step --------- Co-authored-by: Markus Tacker <[email protected]> Co-authored-by: Markus Tacker <[email protected]>
1 parent 8872574 commit 767660c

22 files changed

+1435
-48
lines changed

cdk/BackendLambdas.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ type BackendLambdas = {
1111
onDeviceMessage: PackedLambda
1212
storeMessagesInTimestream: PackedLambda
1313
healthCheck: PackedLambda
14+
historicalDataRequest: PackedLambda
1415
kpis: PackedLambda
1516
}

cdk/packBackendLambdas.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,9 @@ export const packBackendLambdas = async (): Promise<BackendLambdas> => ({
3030
'lambda/storeMessagesInTimestream.ts',
3131
),
3232
healthCheck: await packLambdaFromPath('healthCheck', 'lambda/healthCheck.ts'),
33+
historicalDataRequest: await packLambdaFromPath(
34+
'historicalDataRequest',
35+
'lambda/historicalDataRequest.ts',
36+
),
3337
kpis: await packLambdaFromPath('kpis', 'lambda/kpis.ts'),
3438
})

cdk/resources/HistoricalData.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export class HistoricalData extends Construct {
2828
websocketAPI: WebsocketAPI
2929
lambdaSources: {
3030
storeMessagesInTimestream: PackedLambda
31+
historicalDataRequest: PackedLambda
3132
}
3233
layers: Lambda.ILayerVersion[]
3334
},
@@ -91,5 +92,55 @@ export class HistoricalData extends Construct {
9192
targets: [new EventTargets.LambdaFunction(storeMessagesInTimestream)],
9293
eventBus: websocketAPI.eventBus,
9394
})
95+
96+
const historicalDataRequest = new Lambda.Function(
97+
this,
98+
'historicalDataRequest',
99+
{
100+
handler: lambdaSources.historicalDataRequest.handler,
101+
architecture: Lambda.Architecture.ARM_64,
102+
runtime: Lambda.Runtime.NODEJS_18_X,
103+
timeout: Duration.seconds(5),
104+
memorySize: 1792,
105+
code: new LambdaSource(this, lambdaSources.historicalDataRequest).code,
106+
description: 'Handle historical data request',
107+
environment: {
108+
VERSION: this.node.tryGetContext('version'),
109+
LOG_LEVEL: this.node.tryGetContext('logLevel'),
110+
HISTORICAL_DATA_TABLE_INFO: this.table.ref,
111+
EVENTBUS_NAME: websocketAPI.eventBus.eventBusName,
112+
NODE_NO_WARNINGS: '1',
113+
},
114+
layers,
115+
initialPolicy: [
116+
new IAM.PolicyStatement({
117+
actions: [
118+
'timestream:Select',
119+
'timestream:DescribeTable',
120+
'timestream:ListMeasures',
121+
],
122+
resources: [this.table.attrArn],
123+
}),
124+
new IAM.PolicyStatement({
125+
actions: [
126+
'timestream:DescribeEndpoints',
127+
'timestream:SelectValues',
128+
'timestream:CancelQuery',
129+
],
130+
resources: ['*'],
131+
}),
132+
],
133+
logRetention: Logs.RetentionDays.ONE_WEEK,
134+
},
135+
)
136+
new Events.Rule(this, 'historicalDataRequestRule', {
137+
eventPattern: {
138+
source: ['thingy.ws'],
139+
detailType: ['request'],
140+
},
141+
targets: [new EventTargets.LambdaFunction(historicalDataRequest)],
142+
eventBus: websocketAPI.eventBus,
143+
})
144+
websocketAPI.eventBus.grantPutEventsTo(historicalDataRequest)
94145
}
95146
}

cdk/resources/WebsocketAPI.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,15 @@ export class WebsocketAPI extends Construct {
110110
environment: {
111111
VERSION: this.node.tryGetContext('version'),
112112
WEBSOCKET_CONNECTIONS_TABLE_NAME: this.connectionsTable.tableName,
113+
EVENTBUS_NAME: this.eventBus.eventBusName,
113114
LOG_LEVEL: this.node.tryGetContext('logLevel'),
114115
NODE_NO_WARNINGS: '1',
115116
DISABLE_METRICS: this.node.tryGetContext('isTest') === true ? '1' : '0',
116117
},
117118
layers,
118119
logRetention: Logs.RetentionDays.ONE_WEEK,
119120
})
121+
this.eventBus.grantPutEventsTo(onMessage)
120122
this.connectionsTable.grantWriteData(onMessage)
121123

122124
// OnDisconnect
@@ -331,7 +333,7 @@ export class WebsocketAPI extends Construct {
331333
new Events.Rule(this, 'publishToWebsocketClientsRule', {
332334
eventPattern: {
333335
source: ['thingy.ws'],
334-
detailType: ['message', 'connect'],
336+
detailType: ['message', 'connect', 'error'],
335337
},
336338
targets: [new EventsTargets.LambdaFunction(publishToWebsocketClients)],
337339
eventBus: this.eventBus,

feature-runner/lib/websocket.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { ulid } from '../../util/ulid.js'
44
export type WebSocketClient = {
55
connect: () => Promise<any>
66
close: () => void
7+
send: (message: Record<string, unknown>) => Promise<void>
78
messages: Record<string, unknown>
89
}
910
const clients: Record<string, WebSocketClient> = {}
@@ -41,6 +42,13 @@ export const createWebsocketClient = ({
4142
delete clients[id]
4243
},
4344
messages,
45+
send: async (message) =>
46+
new Promise<void>((resolve, reject) => {
47+
client.send(JSON.stringify(message), (error) => {
48+
if (error) return reject(error)
49+
resolve()
50+
})
51+
}),
4452
}
4553
}
4654

feature-runner/run-features.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { CloudFormationClient } from '@aws-sdk/client-cloudformation'
22
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
33
import { SSMClient } from '@aws-sdk/client-ssm'
4+
import { TimestreamQueryClient } from '@aws-sdk/client-timestream-query'
5+
import { TimestreamWriteClient } from '@aws-sdk/client-timestream-write'
46
import { runFolder } from '@nordicsemiconductor/bdd-markdown'
57
import { stackOutput } from '@nordicsemiconductor/cloudformation-helpers'
6-
import { queryClient } from '@nordicsemiconductor/timestream-helpers'
78
import chalk from 'chalk'
89
import { randomUUID } from 'node:crypto'
910
import path from 'node:path'
@@ -13,12 +14,13 @@ import {
1314
TEST_RESOURCES_STACK_NAME,
1415
} from '../cdk/stacks/stackConfig.js'
1516
import type { StackOutputs as TestStackOutputs } from '../cdk/test-resources/TestResourcesStack.js'
17+
import { storeRecordsInTimestream } from '../historicalData/storeRecordsInTimestream.js'
1618
import { getSettings as nrfCloudSettings } from '../nrfcloud/settings.js'
1719
import { deleteSettings, getSettings, putSettings } from '../util/settings.js'
1820
import type { WebSocketClient } from './lib/websocket.js'
1921
import { configStepRunners } from './steps/config.js'
2022
import { steps as deviceSteps } from './steps/device.js'
21-
import { steps as historicalSteps } from './steps/historicalData.js'
23+
import { steps as historicalDataSteps } from './steps/historicalData.js'
2224
import { steps as mocknRFCloudSteps } from './steps/mocknRFCloud.js'
2325
import { steps as storageSteps } from './steps/storage.js'
2426
import { websocketStepRunners } from './steps/websocket.js'
@@ -76,8 +78,16 @@ const configSettings = getSettings({
7678
})
7779

7880
const db = new DynamoDBClient({})
79-
const timestream = await queryClient()
80-
81+
const timestream = new TimestreamQueryClient({})
82+
const writeTimestream = new TimestreamWriteClient({})
83+
const [DatabaseName, TableName] = config.historicalDataTableInfo.split('|')
84+
if (DatabaseName === undefined || TableName === undefined)
85+
throw Error('historicalDataTableInfo is not configured')
86+
const storeTimestream = storeRecordsInTimestream({
87+
timestream: writeTimestream,
88+
DatabaseName,
89+
TableName,
90+
})
8191
const print = (arg: unknown) =>
8292
typeof arg === 'object' ? JSON.stringify(arg) : arg
8393

@@ -125,7 +135,7 @@ runner
125135
.addStepRunners(...webSocketSteps)
126136
.addStepRunners(...deviceSteps(accountDeviceSettings, db))
127137
.addStepRunners(...mocknRFCloudSteps({ db }))
128-
.addStepRunners(...historicalSteps({ timestream }))
138+
.addStepRunners(...historicalDataSteps({ timestream, storeTimestream }))
129139
.addStepRunners(...storageSteps())
130140
.addStepRunners(...configSteps)
131141

feature-runner/steps/config.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,13 @@ export const configStepRunners = ({
6565
} => ({
6666
steps: createConfigStepRunners({ configWriter }),
6767
cleanup: async () => {
68-
const settings = await configSettings()
69-
for (const property in settings) {
70-
await configRemover({ property })
68+
try {
69+
const settings = await configSettings()
70+
for (const property in settings) {
71+
await configRemover({ property })
72+
}
73+
} catch {
74+
// just ignore any error if there is no configure
7175
}
7276
},
7377
})

feature-runner/steps/device.ts

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
1+
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'
2+
import { unmarshall } from '@aws-sdk/util-dynamodb'
23
import {
34
codeBlockOrThrow,
45
matchGroups,
@@ -8,6 +9,7 @@ import {
89
type StepRunnerArgs,
910
} from '@nordicsemiconductor/bdd-markdown'
1011
import { Type } from '@sinclair/typebox'
12+
import assert from 'assert/strict'
1113
import mqtt from 'mqtt'
1214
import { randomUUID } from 'node:crypto'
1315
import { readFileSync } from 'node:fs'
@@ -41,8 +43,8 @@ const createDeviceForModel =
4143
step.title,
4244
)
4345
if (match === null) return noMatch
44-
const { model, storageName } = match
4546

47+
const { model, storageName } = match
4648
const fingerprint = `92b.${generateCode()}`
4749
const id = randomUUID()
4850

@@ -93,8 +95,45 @@ const createDeviceForModel =
9395
progress(`Device registered: ${fingerprint} (${id})`)
9496
}
9597

98+
const getDevice =
99+
({ db }: { db: DynamoDBClient }) =>
100+
async ({
101+
step,
102+
log: {
103+
step: { progress },
104+
},
105+
context: { devicesTable },
106+
}: StepRunnerArgs<World>): Promise<StepRunResult> => {
107+
const match = matchGroups(
108+
Type.Object({
109+
key: Type.String(),
110+
}),
111+
)(/^The device id `(?<key>[^`]+)` should equal$/, step.title)
112+
113+
if (match === null) return noMatch
114+
115+
progress(`Get data with id ${match.key} from ${devicesTable}`)
116+
const res = await db.send(
117+
new GetItemCommand({
118+
TableName: devicesTable,
119+
Key: {
120+
deviceId: { S: match.key ?? '' },
121+
},
122+
}),
123+
)
124+
125+
progress(
126+
`Data returned from query: `,
127+
JSON.stringify(res.Item ?? {}, null, 2),
128+
)
129+
assert.deepEqual(
130+
unmarshall(res.Item ?? {}),
131+
JSON.parse(codeBlockOrThrow(step).code),
132+
)
133+
}
134+
96135
const publishDeviceMessage =
97-
(bridgeInfo: Settings) =>
136+
(nRFCloudSettings: Settings) =>
98137
async ({
99138
step,
100139
log: {
@@ -117,14 +156,14 @@ const publishDeviceMessage =
117156
progress(`Device id ${match.id} publishes to topic ${match.topic}`)
118157
await new Promise((resolve, reject) => {
119158
const mqttClient = mqtt.connect({
120-
host: bridgeInfo.mqttEndpoint,
159+
host: nRFCloudSettings.mqttEndpoint,
121160
port: 8883,
122161
protocol: 'mqtts',
123162
protocolVersion: 4,
124163
clean: true,
125164
clientId: match.id,
126-
key: bridgeInfo.accountDevicePrivateKey,
127-
cert: bridgeInfo.accountDeviceClientCert,
165+
key: nRFCloudSettings.accountDevicePrivateKey,
166+
cert: nRFCloudSettings.accountDeviceClientCert,
128167
ca: readFileSync(
129168
path.join(process.cwd(), 'data', 'AmazonRootCA1.pem'),
130169
'utf-8',
@@ -133,7 +172,7 @@ const publishDeviceMessage =
133172

134173
mqttClient.on('connect', () => {
135174
progress('connected')
136-
const topic = `${bridgeInfo.mqttTopicPrefix}${match.topic}`
175+
const topic = `${nRFCloudSettings.mqttTopicPrefix}${match.topic}`
137176
progress('publishing', message, topic)
138177
mqttClient.publish(topic, JSON.stringify(message), (error) => {
139178
if (error) return reject(error)
@@ -150,9 +189,10 @@ const publishDeviceMessage =
150189
}
151190

152191
export const steps = (
153-
bridgeInfo: Settings,
192+
nRFCloudSettings: Settings,
154193
db: DynamoDBClient,
155194
): StepRunner<World & Record<string, string>>[] => [
156195
createDeviceForModel({ db }),
157-
publishDeviceMessage(bridgeInfo),
196+
getDevice({ db }),
197+
publishDeviceMessage(nRFCloudSettings),
158198
]

feature-runner/steps/historicalData.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import * as chai from 'chai'
1616
import { expect } from 'chai'
1717
import chaiSubset from 'chai-subset'
1818
import pRetry from 'p-retry'
19+
import { convertMessageToTimestreamRecords } from '../../historicalData/convertMessageToTimestreamRecords.js'
20+
import { storeRecordsInTimestream } from '../../historicalData/storeRecordsInTimestream.js'
1921
import type { World } from '../run-features.js'
2022

2123
chai.use(chaiSubset)
@@ -104,8 +106,46 @@ const assertResult = async ({
104106
)
105107
}
106108

109+
const writeTimestream =
110+
(store: ReturnType<typeof storeRecordsInTimestream>) =>
111+
async ({
112+
step,
113+
log: {
114+
step: { progress },
115+
},
116+
context: { historicalDataTableInfo },
117+
}: StepRunnerArgs<World>): Promise<StepRunResult> => {
118+
const match = matchGroups(
119+
Type.Object({
120+
deviceId: Type.String(),
121+
}),
122+
)(
123+
/^I write Timestream for the device `(?<deviceId>[^`]+)` with this message$/,
124+
step.title,
125+
)
126+
if (match === null) return noMatch
127+
128+
const message = JSON.parse(codeBlockOrThrow(step).code)
129+
await store(convertMessageToTimestreamRecords(message), {
130+
Dimensions: [
131+
{
132+
Name: 'deviceId',
133+
Value: match.deviceId,
134+
},
135+
],
136+
})
137+
138+
progress(`Write to timestream: ${JSON.stringify(message, null, 2)}`)
139+
}
140+
107141
export const steps = ({
108142
timestream,
143+
storeTimestream,
109144
}: {
110145
timestream: TimestreamQueryClient
111-
}): StepRunner<World>[] => [queryTimestream(timestream), assertResult]
146+
storeTimestream: ReturnType<typeof storeRecordsInTimestream>
147+
}): StepRunner<World>[] => [
148+
queryTimestream(timestream),
149+
writeTimestream(storeTimestream),
150+
assertResult,
151+
]

0 commit comments

Comments
 (0)