From 31a464759c69fb05c474e490690fca9b9a911621 Mon Sep 17 00:00:00 2001 From: Bogdan Katishev Date: Tue, 31 Aug 2021 13:35:52 +0200 Subject: [PATCH 1/5] Add new custom resource: elasticsearch.IngestPipeline --- custom_resources/elasticsearch.py | 22 +++++++++++ .../elasticsearch/IngestPipeline/index.py | 38 +++++++++++++++++++ .../IngestPipeline/requirements.txt | 2 + 3 files changed, 62 insertions(+) create mode 100644 custom_resources/elasticsearch.py create mode 100644 lambda_code/elasticsearch/IngestPipeline/index.py create mode 100644 lambda_code/elasticsearch/IngestPipeline/requirements.txt diff --git a/custom_resources/elasticsearch.py b/custom_resources/elasticsearch.py new file mode 100644 index 0000000..f0848a7 --- /dev/null +++ b/custom_resources/elasticsearch.py @@ -0,0 +1,22 @@ +"""Custom resources related to Elasticsearch.""" +from six import string_types +from .LambdaBackedCustomResource import LambdaBackedCustomResource + + +class IngestPipeline(LambdaBackedCustomResource): + props = { + 'EsHost': (string_types, True), + 'PipelineName': (string_types, True), + 'IngestDocument': (dict, True), + } + + @classmethod + def _lambda_policy(cls): + return { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "es:ESHttp*", + "Resource": "*", + }], + } diff --git a/lambda_code/elasticsearch/IngestPipeline/index.py b/lambda_code/elasticsearch/IngestPipeline/index.py new file mode 100644 index 0000000..3a2ba5a --- /dev/null +++ b/lambda_code/elasticsearch/IngestPipeline/index.py @@ -0,0 +1,38 @@ +""" +Custom resource to create an ingest pipeline in your AWS Elasticsearch Cluster. +""" + +import json +import requests +from cfn_custom_resource import CloudFormationCustomResource +from _metadata import CUSTOM_RESOURCE_NAME + + +class IngestPipeline(CloudFormationCustomResource): + RESOURCE_TYPE_SPEC = CUSTOM_RESOURCE_NAME + + def validate(self): + self.es_host = self.resource_properties['EsHost'] + self.pipeline_name = self.resource_properties['PipelineName'] + self.ingest_doc = self.resource_properties['IngestDocument'] + + def create(self): + url = 'https://' + self.es_host + '/_ingest/pipeline/' + self.pipeline_name + requests.put(url, json.dumps(self.ingest_doc)) + return {} + + def update(self): + return self.create() + + def delete(self): + url = 'https://' + self.es_host + '/_ingest/pipeline/' + self.pipeline_name + try: + requests.delete(url) + except (requests.exceptions.Timeout, + requests.exceptions.TooManyRedirects, + requests.exceptions.RequestException): + # Assume already deleted + pass + + +handler = IngestPipeline.get_handler() diff --git a/lambda_code/elasticsearch/IngestPipeline/requirements.txt b/lambda_code/elasticsearch/IngestPipeline/requirements.txt new file mode 100644 index 0000000..519e058 --- /dev/null +++ b/lambda_code/elasticsearch/IngestPipeline/requirements.txt @@ -0,0 +1,2 @@ +git+https://github.com/iRobotCorporation/cfn-custom-resource#egg=cfn-custom-resource +requests \ No newline at end of file From 438fdda2d7fbcf887fe50a268ab5e020821fc86a Mon Sep 17 00:00:00 2001 From: Ben Bridts Date: Tue, 31 Aug 2021 17:44:30 +0200 Subject: [PATCH 2/5] update ReadMe --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 53924da..1db9d25 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ the custom resource. The code corresponding to a class `Resource` in the module Building -------- -The build script gathers all custom resources in a single (generated) +The build script (`build.py`) gathers all custom resources in a single (generated) CloudFormation template. Each resource inside `lambda_code` is zipped. The following (relative) paths are treated specially: From 911393f5ff081ee7f7adc5148a875b1a0f3eca8e Mon Sep 17 00:00:00 2001 From: Ben Bridts Date: Tue, 31 Aug 2021 17:46:43 +0200 Subject: [PATCH 3/5] Add Vpc Support to ElasticSearch --- build.py | 65 ++++++++++++++++--- custom_resources/elasticsearch.py | 13 +++- .../index.py | 0 .../requirements.txt | 0 4 files changed, 68 insertions(+), 10 deletions(-) rename lambda_code/elasticsearch/{IngestPipeline => IngestPipelineViaVpc}/index.py (100%) rename lambda_code/elasticsearch/{IngestPipeline => IngestPipelineViaVpc}/requirements.txt (100%) diff --git a/build.py b/build.py index 2b966cf..69c0528 100644 --- a/build.py +++ b/build.py @@ -22,10 +22,10 @@ from pip._internal import main as pipmain # pip 10 import troposphere -from troposphere import Template, awslambda, logs, Sub, Output, Export, GetAtt, constants +from troposphere import Template, awslambda, logs, Sub, Output, Export, GetAtt, constants, Ref, Not, Equals, Join, ec2 from custom_resources.LambdaBackedCustomResource import LambdaBackedCustomResource -parser = argparse.ArgumentParser(description='Build custom resources CloudForamtion template') +parser = argparse.ArgumentParser(description='Build custom resources CloudFormation template') parser.add_argument('--class-dir', help='Where to look for the CustomResource classes', default='custom_resources') parser.add_argument('--lambda-dir', help='Where to look for defined Lambda functions', @@ -55,6 +55,18 @@ template.set_parameter_label(s3_path, "S3 path") template.add_parameter_to_group(s3_path, lambda_code_location) +vpc_subnets = template.add_parameter(troposphere.Parameter( + "VpcSubnets", + # Type cannot be a list of subnets ids if we want them to also support being empty + Type=constants.COMMA_DELIMITED_LIST, + Default="", + Description="(optional) VPC subnets for Custom Resources that run attached to a VPC" +)) +template.set_parameter_label(vpc_subnets, "VPC Subnets") +vpc_settings = template.add_parameter_to_group(vpc_subnets, "VPC Settings") + +has_vpc_subnets = template.add_condition("HasVpcSubnets", Not(Equals(Join("", Ref(vpc_subnets)), ""))) + def rec_split_path(path: str) -> typing.List[str]: """ @@ -248,9 +260,38 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str): zip_filename = create_zip_file(custom_resource, args.output_dir) + function_settings = custom_resource.troposphere_class.function_settings() + needs_vpc = False + created_aws_objects: list[troposphere.BaseAWSObject] = [] + if "VpcConfig" in function_settings: + needs_vpc = True + security_group = template.add_resource(ec2.SecurityGroup( + "{custom_resource_name}SecurityGroup".format(custom_resource_name=custom_resource_name_cfn), + GroupDescription="Security Group for the {custom_resource_name} custom resource".format( + custom_resource_name='.'.join(custom_resource.name) + ), + )) + created_aws_objects.append(security_group) + created_aws_objects.append(template.add_output(Output( + "{custom_resource_name}SecurityGroup".format(custom_resource_name=custom_resource_name_cfn), + Value=Ref(security_group), + Description="Security Group used by the {custom_resource_name} custom resource".format( + custom_resource_name='.'.join(custom_resource.name) + ), + Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}SecurityGroup".format( + custom_resource_name=custom_resource_name_cfn, + ))), + ))) + + function_settings["VpcConfig"] = awslambda.VPCConfig( + SecurityGroupIds=[Ref(security_group)], + SubnetIds=Ref(vpc_subnets) + ) + role = template.add_resource(custom_resource.troposphere_class.lambda_role( "{custom_resource_name}Role".format(custom_resource_name=custom_resource_name_cfn), )) + created_aws_objects.append(role) awslambdafunction = template.add_resource(awslambda.Function( "{custom_resource_name}Function".format(custom_resource_name=custom_resource_name_cfn), Code=awslambda.Code( @@ -259,14 +300,15 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str): zip_filename]), ), Role=GetAtt(role, 'Arn'), - **custom_resource.troposphere_class.function_settings() + **function_settings )) - template.add_resource(logs.LogGroup( + created_aws_objects.append(awslambdafunction) + created_aws_objects.append(template.add_resource(logs.LogGroup( "{custom_resource_name}Logs".format(custom_resource_name=custom_resource_name_cfn), LogGroupName=troposphere.Join('', ["/aws/lambda/", troposphere.Ref(awslambdafunction)]), RetentionInDays=90, - )) - template.add_output(Output( + ))) + created_aws_objects.append(template.add_output(Output( "{custom_resource_name}ServiceToken".format(custom_resource_name=custom_resource_name_cfn), Value=GetAtt(awslambdafunction, 'Arn'), Description="ServiceToken for the {custom_resource_name} custom resource".format( @@ -275,8 +317,8 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str): Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}ServiceToken".format( custom_resource_name=custom_resource_name_cfn ))) - )) - template.add_output(Output( + ))) + created_aws_objects.append(template.add_output(Output( "{custom_resource_name}Role".format(custom_resource_name=custom_resource_name_cfn), Value=GetAtt(role, 'Arn'), Description="Role used by the {custom_resource_name} custom resource".format( @@ -285,7 +327,12 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str): Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}Role".format( custom_resource_name=custom_resource_name_cfn, ))), - )) + ))) + if needs_vpc: + for aws_object in created_aws_objects: + if aws_object.resource.get('Condition'): + raise ValueError("Can't handle multiple conditions") + aws_object.Condition = has_vpc_subnets with open(os.path.join(args.output_dir, 'cfn.json'), 'w') as f: f.write(template.to_json()) diff --git a/custom_resources/elasticsearch.py b/custom_resources/elasticsearch.py index f0848a7..9d90a45 100644 --- a/custom_resources/elasticsearch.py +++ b/custom_resources/elasticsearch.py @@ -3,7 +3,7 @@ from .LambdaBackedCustomResource import LambdaBackedCustomResource -class IngestPipeline(LambdaBackedCustomResource): +class IngestPipelineViaVpc(LambdaBackedCustomResource): props = { 'EsHost': (string_types, True), 'PipelineName': (string_types, True), @@ -20,3 +20,14 @@ def _lambda_policy(cls): "Resource": "*", }], } + + @classmethod + def _update_lambda_settings(cls, settings): + """ + Update the default settings for the lambda function. + + :param settings: The default settings that will be used + :return: updated settings + """ + settings['VpcConfig'] = {} # build.py adds the config if the key is present + return settings diff --git a/lambda_code/elasticsearch/IngestPipeline/index.py b/lambda_code/elasticsearch/IngestPipelineViaVpc/index.py similarity index 100% rename from lambda_code/elasticsearch/IngestPipeline/index.py rename to lambda_code/elasticsearch/IngestPipelineViaVpc/index.py diff --git a/lambda_code/elasticsearch/IngestPipeline/requirements.txt b/lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt similarity index 100% rename from lambda_code/elasticsearch/IngestPipeline/requirements.txt rename to lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt From d69a85f443e195e91420f1ddca2af38eb2c59c34 Mon Sep 17 00:00:00 2001 From: Bogdan Katishev Date: Wed, 1 Sep 2021 11:49:51 +0200 Subject: [PATCH 4/5] Sign HTTP requests to Amazon Elasticsearch Service --- .../IngestPipelineViaVpc/index.py | 45 ++++++++++++++----- .../IngestPipelineViaVpc/requirements.txt | 3 +- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/lambda_code/elasticsearch/IngestPipelineViaVpc/index.py b/lambda_code/elasticsearch/IngestPipelineViaVpc/index.py index 3a2ba5a..372a001 100644 --- a/lambda_code/elasticsearch/IngestPipelineViaVpc/index.py +++ b/lambda_code/elasticsearch/IngestPipelineViaVpc/index.py @@ -3,12 +3,17 @@ """ import json -import requests +import os + from cfn_custom_resource import CloudFormationCustomResource from _metadata import CUSTOM_RESOURCE_NAME +from elasticsearch import Elasticsearch, RequestsHttpConnection, ElasticsearchException +from requests_aws4auth import AWS4Auth + +REGION = os.environ['AWS_REGION'] -class IngestPipeline(CloudFormationCustomResource): +class IngestPipelineViaVpc(CloudFormationCustomResource): RESOURCE_TYPE_SPEC = CUSTOM_RESOURCE_NAME def validate(self): @@ -17,22 +22,42 @@ def validate(self): self.ingest_doc = self.resource_properties['IngestDocument'] def create(self): - url = 'https://' + self.es_host + '/_ingest/pipeline/' + self.pipeline_name - requests.put(url, json.dumps(self.ingest_doc)) + service = 'es' + credentials = self.get_boto3_session().get_credentials() + awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, REGION, service, session_token=credentials.token) + + es = Elasticsearch( + hosts = [{'host': self.es_host, 'port': 443}], + http_auth = awsauth, + use_ssl = True, + verify_certs = True, + connection_class = RequestsHttpConnection + ) + + es.ingest.put_pipeline(id=self.pipeline_name, body=json.dumps(self.ingest_doc)) return {} def update(self): return self.create() def delete(self): - url = 'https://' + self.es_host + '/_ingest/pipeline/' + self.pipeline_name + service = 'es' + credentials = self.get_boto3_session().get_credentials() + awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, REGION, service, session_token=credentials.token) + + es = Elasticsearch( + hosts = [{'host': self.es_host, 'port': 443}], + http_auth = awsauth, + use_ssl = True, + verify_certs = True, + connection_class = RequestsHttpConnection + ) + try: - requests.delete(url) - except (requests.exceptions.Timeout, - requests.exceptions.TooManyRedirects, - requests.exceptions.RequestException): + es.ingest.delete_pipeline(id=self.pipeline_name) + except ElasticsearchException: # Assume already deleted pass -handler = IngestPipeline.get_handler() +handler = IngestPipelineViaVpc.get_handler() diff --git a/lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt b/lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt index 519e058..64f891e 100644 --- a/lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt +++ b/lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt @@ -1,2 +1,3 @@ git+https://github.com/iRobotCorporation/cfn-custom-resource#egg=cfn-custom-resource -requests \ No newline at end of file +elasticsearch +requests-aws4auth \ No newline at end of file From cf0b80fcfa718bbf227c4370f4fc63d0ebebdefd Mon Sep 17 00:00:00 2001 From: Bogdan Katishev Date: Wed, 1 Sep 2021 11:54:50 +0200 Subject: [PATCH 5/5] Only allow ESHttpPut and ESHttpDelete in lambda_policy --- custom_resources/elasticsearch.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/custom_resources/elasticsearch.py b/custom_resources/elasticsearch.py index 9d90a45..8aac86c 100644 --- a/custom_resources/elasticsearch.py +++ b/custom_resources/elasticsearch.py @@ -16,7 +16,10 @@ def _lambda_policy(cls): "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", - "Action": "es:ESHttp*", + "Action": [ + "es:ESHttpPut", + "es:ESHttpDelete", + ], "Resource": "*", }], }