Skip to content

Commit ef035df

Browse files
authored
fix: error jars parameters when edit v1.0 pipeline (#671)
1 parent ee1d524 commit ef035df

File tree

4 files changed

+52
-24
lines changed

4 files changed

+52
-24
lines changed

src/control-plane/backend/lambda/api/common/utils.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,21 +313,25 @@ function _getTransformerPluginInfo(pipeline: IPipeline, resources: CPipelineReso
313313
if (pipeline.dataCollectionSDK === DataCollectionSDK.CLICKSTREAM) {
314314
const defaultTransformer = resources.plugins?.filter(p => p.id === 'BUILT-IN-1')[0];
315315
if (defaultTransformer?.mainFunction) {
316-
transformerClassNames.push(defaultTransformer?.mainFunction);
316+
transformerClassNames.push(_getTransformClassNameByVersion(defaultTransformer?.mainFunction, pipeline.templateVersion));
317317
}
318318
} else {
319319
throw new ClickStreamBadRequestError('Transform plugin is required.');
320320
}
321321
} else {
322-
const { classNames, pluginJars, pluginFiles } = _getTransformerPluginInfoFromResources(resources, pipeline.dataProcessing?.transformPlugin);
322+
const { classNames, pluginJars, pluginFiles } = _getTransformerPluginInfoFromResources(
323+
resources,
324+
pipeline.dataProcessing?.transformPlugin,
325+
pipeline.templateVersion,
326+
);
323327
transformerClassNames= transformerClassNames.concat(classNames);
324328
transformerPluginJars = transformerPluginJars.concat(pluginJars);
325329
transformerPluginFiles = transformerPluginFiles.concat(pluginFiles);
326330
}
327331
return { transformerClassNames, transformerPluginJars, transformerPluginFiles };
328332
}
329333

330-
function _getTransformerPluginInfoFromResources(resources: CPipelineResources, transformPluginId: string) {
334+
function _getTransformerPluginInfoFromResources(resources: CPipelineResources, transformPluginId: string, templateVersion?: string) {
331335
const classNames: string[] = [];
332336
const pluginJars: string[] = [];
333337
const pluginFiles: string[] = [];
@@ -341,11 +345,21 @@ function _getTransformerPluginInfoFromResources(resources: CPipelineResources, t
341345
}
342346
}
343347
if (transform?.mainFunction) {
344-
classNames.push(transform?.mainFunction);
348+
classNames.push(_getTransformClassNameByVersion(transform?.mainFunction, templateVersion));
345349
}
346350
return { classNames, pluginJars, pluginFiles };
347351
}
348352

353+
function _getTransformClassNameByVersion(mainFunction: string, templateVersion?: string) {
354+
if (templateVersion?.startsWith('v1.0')) {
355+
return mainFunction.replace(
356+
'software.aws.solution.clickstream.TransformerV2',
357+
'software.aws.solution.clickstream.Transformer',
358+
);
359+
}
360+
return mainFunction;
361+
}
362+
349363
function getSubnetType(routeTable: RouteTable) {
350364
const routes = routeTable.Routes;
351365
let subnetType = SubnetType.ISOLATED;

src/control-plane/backend/lambda/api/model/pipeline.ts

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ export interface IPipeline {
236236
readonly version: string;
237237
readonly versionTag: string;
238238
readonly createAt: number;
239-
readonly updateAt: number;
239+
updateAt: number;
240240
readonly operator: string;
241241
readonly deleted: boolean;
242242
}
@@ -292,6 +292,7 @@ export class CPipeline {
292292
throw new ClickStreamBadRequestError('Pipeline Workflow can not empty.');
293293
}
294294
this.pipeline.lastAction = 'Update';
295+
this.pipeline.templateVersion = oldPipeline.templateVersion;
295296
validateIngestionServerNum(this.pipeline.ingestionServer.size);
296297
this.pipeline.executionName = `main-${uuidv4()}`;
297298

@@ -308,7 +309,6 @@ export class CPipeline {
308309
// create new execution
309310
const execWorkflow = this.stackManager.getExecWorkflow();
310311
this.pipeline.executionArn = await this.stackManager.execute(execWorkflow, this.pipeline.executionName);
311-
this.pipeline.templateVersion = oldPipeline.templateVersion;
312312
this.pipeline.tags = oldPipeline.tags;
313313
this.pipeline.workflow = this.stackManager.getWorkflow();
314314

@@ -347,19 +347,10 @@ export class CPipeline {
347347
if (!AllowedList.includes(paramName)) {
348348
notAllowEdit.push(paramName);
349349
} else {
350-
let parameterValue = diffParameters.edited.find(p => p[0] === key)?.[1];
351-
if (stackName.startsWith(`Clickstream-${PipelineStackType.DATA_PROCESSING}`) &&
352-
paramName === 'TransformerAndEnrichClassNames' &&
353-
oldPipeline.templateVersion?.startsWith('v1.0')) {
354-
parameterValue = parameterValue.replace(
355-
'software.aws.solution.clickstream.TransformerV2',
356-
'software.aws.solution.clickstream.Transformer',
357-
);
358-
}
359350
editParameters.push({
360351
stackName: stackName,
361352
parameterKey: paramName,
362-
parameterValue: parameterValue,
353+
parameterValue: diffParameters.edited.find(p => p[0] === key)?.[1],
363354
});
364355
}
365356
}
@@ -414,6 +405,7 @@ export class CPipeline {
414405
const execWorkflow = this.stackManager.getExecWorkflow();
415406
this.pipeline.executionArn = await this.stackManager.execute(execWorkflow, executionName);
416407
// update pipeline metadata
408+
this.pipeline.updateAt = Date.now();
417409
await store.updatePipelineAtCurrentVersion(this.pipeline);
418410
}
419411

@@ -427,6 +419,7 @@ export class CPipeline {
427419
const execWorkflow = this.stackManager.getExecWorkflow();
428420
this.pipeline.executionArn = await this.stackManager.execute(execWorkflow, executionName);
429421
// update pipeline metadata
422+
this.pipeline.updateAt = Date.now();
430423
await store.updatePipelineAtCurrentVersion(this.pipeline);
431424

432425
// bind plugin

src/control-plane/backend/lambda/api/store/dynamodb/dynamodb-store.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ export class DynamoDbStore implements ClickStreamStore {
690690
':executionArn': pipeline.executionArn ?? '',
691691
':templateVersion': pipeline.templateVersion ?? '',
692692
':lastAction': pipeline.lastAction ?? '',
693-
':updateAt': Date.now().toString(),
693+
':updateAt': pipeline.updateAt,
694694
':operator': pipeline.operator,
695695
},
696696
ReturnValues: 'ALL_NEW',

src/control-plane/backend/lambda/api/test/api/workflow.test.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,7 +1238,10 @@ describe('Workflow test', () => {
12381238
subnetsIsolated: true,
12391239
subnetsCross3AZ: true,
12401240
});
1241-
const pipeline: CPipeline = new CPipeline({ ...MSK_DATA_PROCESSING_ATHENA_PIPELINE });
1241+
const pipeline: CPipeline = new CPipeline({
1242+
...MSK_DATA_PROCESSING_ATHENA_PIPELINE,
1243+
templateVersion: FULL_SOLUTION_VERSION,
1244+
});
12421245
const wf = await pipeline.generateWorkflow();
12431246
const expected = {
12441247
Version: '2022-03-15',
@@ -1413,7 +1416,10 @@ describe('Workflow test', () => {
14131416
subnetsIsolated: true,
14141417
subnetsCross3AZ: true,
14151418
});
1416-
const pipeline: CPipeline = new CPipeline({ ...MSK_DATA_PROCESSING_NEW_SERVERLESS_PIPELINE });
1419+
const pipeline: CPipeline = new CPipeline({
1420+
...MSK_DATA_PROCESSING_NEW_SERVERLESS_PIPELINE,
1421+
templateVersion: FULL_SOLUTION_VERSION,
1422+
});
14171423
const wf = await pipeline.generateWorkflow();
14181424
const expected = {
14191425
Version: '2022-03-15',
@@ -1609,7 +1615,10 @@ describe('Workflow test', () => {
16091615
subnetsCross3AZ: true,
16101616
noVpcEndpoint: true,
16111617
});
1612-
const pipeline: CPipeline = new CPipeline({ ...KINESIS_DATA_PROCESSING_NEW_REDSHIFT_PIPELINE });
1618+
const pipeline: CPipeline = new CPipeline({
1619+
...KINESIS_DATA_PROCESSING_NEW_REDSHIFT_PIPELINE,
1620+
templateVersion: FULL_SOLUTION_VERSION,
1621+
});
16131622
const wf = await pipeline.generateWorkflow();
16141623
const expected = {
16151624
Version: '2022-03-15',
@@ -1762,7 +1771,10 @@ describe('Workflow test', () => {
17621771
publicAZContainPrivateAZ: true,
17631772
noVpcEndpoint: true,
17641773
});
1765-
const pipeline: CPipeline = new CPipeline({ ...KINESIS_DATA_PROCESSING_PROVISIONED_REDSHIFT_PIPELINE });
1774+
const pipeline: CPipeline = new CPipeline({
1775+
...KINESIS_DATA_PROCESSING_PROVISIONED_REDSHIFT_PIPELINE,
1776+
templateVersion: FULL_SOLUTION_VERSION,
1777+
});
17661778
const wf = await pipeline.generateWorkflow();
17671779
const expected = {
17681780
Version: '2022-03-15',
@@ -1936,7 +1948,10 @@ describe('Workflow test', () => {
19361948
publicAZContainPrivateAZ: true,
19371949
noVpcEndpoint: true,
19381950
});
1939-
const pipeline: CPipeline = new CPipeline({ ...KINESIS_DATA_PROCESSING_PROVISIONED_REDSHIFT_QUICKSIGHT_PIPELINE });
1951+
const pipeline: CPipeline = new CPipeline({
1952+
...KINESIS_DATA_PROCESSING_PROVISIONED_REDSHIFT_QUICKSIGHT_PIPELINE,
1953+
templateVersion: FULL_SOLUTION_VERSION,
1954+
});
19401955
const wf = await pipeline.generateWorkflow();
19411956
const expected = {
19421957
Version: '2022-03-15',
@@ -2306,7 +2321,10 @@ describe('Workflow test', () => {
23062321
subnetsCross3AZ: true,
23072322
noVpcEndpoint: true,
23082323
});
2309-
const pipeline: CPipeline = new CPipeline({ ...KINESIS_DATA_PROCESSING_NEW_REDSHIFT_QUICKSIGHT_PIPELINE });
2324+
const pipeline: CPipeline = new CPipeline({
2325+
...KINESIS_DATA_PROCESSING_NEW_REDSHIFT_QUICKSIGHT_PIPELINE,
2326+
templateVersion: FULL_SOLUTION_VERSION,
2327+
});
23102328
const wf = await pipeline.generateWorkflow();
23112329
const expected = {
23122330
Version: '2022-03-15',
@@ -2482,7 +2500,10 @@ describe('Workflow test', () => {
24822500
subnetsIsolated: true,
24832501
noApp: true,
24842502
});
2485-
const pipeline: CPipeline = new CPipeline({ ...MSK_DATA_PROCESSING_NEW_SERVERLESS_PIPELINE });
2503+
const pipeline: CPipeline = new CPipeline({
2504+
...MSK_DATA_PROCESSING_NEW_SERVERLESS_PIPELINE,
2505+
templateVersion: FULL_SOLUTION_VERSION,
2506+
});
24862507
const wf = await pipeline.generateWorkflow();
24872508
const expected = {
24882509
Version: '2022-03-15',

0 commit comments

Comments
 (0)