diff --git a/README.md b/README.md
index f6e03a12..fef99574 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ This repository is a part of [Cloud Intelligence Dashboards](https://docs.aws.am
 This repository contains the following elements:
 * [data-exports](/data-exports) - CloudFormation templates for AWS Data Exports, such as Cost and Usage Report 2.0 and others. These allow replication of Exports from your Management Account(s) to a dedicated Data Collection Account, as well as aggregation of multiple Exports from a set of Linked Accounts.
-* [data-collection](/data-collection) - a set of Cloud Formation Templates for collecting infrastructure operational data from Management and Linked Accounts. Such as data from AWS Trusted Advisor, AWS Compute Optimizer, Inventories, Pricing, AWS Health, AWS Support Cases etc. See more about types of data collected [here](/data-collection).
+* [data-collection](/data-collection) - a set of CloudFormation templates for collecting infrastructure operational data from Management and Linked Accounts, such as data from AWS Trusted Advisor, AWS Compute Optimizer, Inventories, Pricing, AWS Health, AWS Support Cases, CloudWatch Database Insights, etc. See more about the types of data collected [here](/data-collection).
 * [case-summarization](/case-summarization) - an additional CloudFormation template for deploying the AWS Support Case Summarization plugin, which offers the capability to summarize cases through Generative AI powered by Amazon Bedrock.
 * [rls](/rls) - a stack for managing Row Level Security for CID Dashboards.
 * [security-hub](/security-hub) - Collection of data from AWS Security Hub.
diff --git a/data-collection/README.md b/data-collection/README.md
index c3468ebb..fd3efa71 100644
--- a/data-collection/README.md
+++ b/data-collection/README.md
@@ -44,6 +44,7 @@ List of modules and objects collected:
 | `quicksight` | [Amazon QuickSight](https://aws.amazon.com/quicksight/) | Data Collection Account | Collects QuickSight User and Group information in the Data Collection Account only |
 | `resilience-hub` | [AWS Resilience Hub](https://aws.amazon.com/resilience-hub/) | Linked Accounts | |
 | `reference` | Various services | Data Collection Account | Collects reference data for other modules and dashboards to function |
+| `rds-multitenant` | [Amazon RDS](https://aws.amazon.com/rds/) | Linked Accounts | Collects CloudWatch Database Insights metrics for multi-tenant RDS instances to enable cost allocation by tenant |
 
 ### Deployment Overview
 
@@ -58,6 +59,5 @@ For deployment and further information please reference to this [documentation](
 
 [![Documentation](/.images/documentation.svg)](https://docs.aws.amazon.com/guidance/latest/cloud-intelligence-dashboards/data-exports.html)
 
-
 ### Contributing
 See [CONTRIBUTING.md](CONTRIBUTING.md)
diff --git a/data-collection/deploy/deploy-data-collection.yaml b/data-collection/deploy/deploy-data-collection.yaml
index 8192cd52..77a267f0 100644
--- a/data-collection/deploy/deploy-data-collection.yaml
+++ b/data-collection/deploy/deploy-data-collection.yaml
@@ -41,7 +41,8 @@ Metadata:
           - IncludeServiceQuotasModule
           - IncludeEUCUtilizationModule
           - IncludeResilienceHubModule
-          - IncludeReferenceModule
+          - IncludeReferenceModule
+          - IncludeRdsMultitenantModule
       - Label:
          default: 'EUC (End User Compute) Module Configuration'
        Parameters:
@@ -294,6 +295,11 @@ Parameters:
     Description: Collects Reference data for other modules
     AllowedValues: ['yes', 'no']
     Default: 'no'
+  IncludeRdsMultitenantModule:
+    Type: String
Description: Collects RDS Performance Insights data for multi-tenant cost allocation + AllowedValues: ['yes', 'no'] + Default: 'no' Conditions: DeployTAModule: !Equals [ !Ref IncludeTAModule, "yes"] DeployRightsizingModule: !Equals [ !Ref IncludeRightsizingModule, "yes"] @@ -315,6 +321,7 @@ Conditions: DeployQuickSightModule: !Equals [ !Ref IncludeQuickSightModule, "yes"] DeployServiceQuotasModule: !Equals [ !Ref IncludeServiceQuotasModule, "yes"] DeployResilienceHubModule: !Equals [ !Ref IncludeResilienceHubModule, "yes"] + DeployRdsMultitenantModule: !Equals [ !Ref IncludeRdsMultitenantModule, "yes"] DeployPricingModule: !Or - !Condition DeployInventoryCollectorModule - !Condition DeployRDSUtilizationModule @@ -340,6 +347,7 @@ Conditions: - !Condition DeployServiceQuotasModule - !Condition DeployEUCUtilizationModule - !Condition DeployComputeOptimizerModule + - !Condition DeployRdsMultitenantModule RegionsInScopeIsEmpty: !Equals - !Join [ '', !Split [ ' ', !Ref RegionsInScope ] ] # remove spaces - "" @@ -1528,6 +1536,32 @@ Resources: - RegionsInScopeIsEmpty - !Sub "${AWS::Region}" - !Join [ '', !Split [ ' ', !Ref RegionsInScope ] ] # remove spaces + + RdsMultitenantModule: + Type: AWS::CloudFormation::Stack + Condition: DeployRdsMultitenantModule + Properties: + TemplateURL: "https://dcoccia-test-static-website.s3.eu-central-1.amazonaws.com/module-rds-multitenant.yaml" + Parameters: + DatabaseName: !Ref DatabaseName + DataBucketsKmsKeysArns: !Ref DataBucketsKmsKeysArns + DestinationBucket: !Ref S3Bucket + DestinationBucketARN: !GetAtt S3Bucket.Arn + Schedule: !Ref Schedule + GlueRoleARN: !GetAtt GlueRole.Arn + ResourcePrefix: !Ref ResourcePrefix + LambdaAnalyticsARN: !GetAtt LambdaAnalytics.Arn + MultiAccountRoleName: !Sub "${ResourcePrefix}${MultiAccountRoleName}" + AccountCollectorLambdaARN: !Sub "${AccountCollector.Outputs.LambdaFunctionARN}" + CodeBucket: !If [ ProdCFNTemplateUsed, !FindInMap [RegionMap, !Ref "AWS::Region", CodeBucket], !Ref CFNSourceBucket ] + StepFunctionTemplate: !FindInMap [StepFunctionCode, main-state-machine, TemplatePath] + StepFunctionExecutionRoleARN: !GetAtt StepFunctionExecutionRole.Arn + SchedulerExecutionRoleARN: !GetAtt SchedulerExecutionRole.Arn + RegionsInScope: + Fn::If: + - RegionsInScopeIsEmpty + - !Sub "${AWS::Region}" + - !Join [ '', !Split [ ' ', !Ref RegionsInScope ] ] # remove spaces AccountCollector: Type: AWS::CloudFormation::Stack diff --git a/data-collection/deploy/deploy-data-read-permissions.yaml b/data-collection/deploy/deploy-data-read-permissions.yaml index bb6e3471..9b0771f0 100644 --- a/data-collection/deploy/deploy-data-read-permissions.yaml +++ b/data-collection/deploy/deploy-data-read-permissions.yaml @@ -33,6 +33,7 @@ Metadata: - IncludeLicenseManagerModule - IncludeServiceQuotasModule - IncludeResilienceHubModule + - IncludeRdsMultitenantModule ParameterLabels: ManagementAccountRole: default: "Management account role" @@ -191,6 +192,11 @@ Parameters: Description: Collects Resilience Hub information AllowedValues: ['yes', 'no'] Default: 'no' + IncludeRdsMultitenantModule: + Type: String + Description: Collects RDS Performance Insights data for multi-tenant cost allocation + AllowedValues: ['yes', 'no'] + Default: 'no' Conditions: DeployModuleReadInMgmt: !Equals [!Ref AllowModuleReadInMgmt, "yes"] @@ -230,6 +236,7 @@ Resources: IncludeTransitGatewayModule: !Ref IncludeTransitGatewayModule IncludeServiceQuotasModule: !Ref IncludeServiceQuotasModule IncludeResilienceHubModule: !Ref IncludeResilienceHubModule + 
IncludeRdsMultitenantModule: !Ref IncludeRdsMultitenantModule
DataCollectorOrgAccountModulesReadStackSet: Type: AWS::CloudFormation::StackSet @@ -272,6 +279,8 @@ Resources: ParameterValue: !Ref IncludeServiceQuotasModule - ParameterKey: IncludeResilienceHubModule ParameterValue: !Ref IncludeResilienceHubModule + - ParameterKey: IncludeRdsMultitenantModule + ParameterValue: !Ref IncludeRdsMultitenantModule StackInstancesGroup: - DeploymentTargets: OrganizationalUnitIds: !Split [",", !Ref OrganizationalUnitIds] diff --git a/data-collection/deploy/deploy-in-linked-account.yaml b/data-collection/deploy/deploy-in-linked-account.yaml index e86b8776..de674c6d 100644 --- a/data-collection/deploy/deploy-in-linked-account.yaml +++ b/data-collection/deploy/deploy-in-linked-account.yaml @@ -22,6 +22,7 @@ Metadata: - IncludeTransitGatewayModule - IncludeServiceQuotasModule - IncludeResilienceHubModule + - IncludeRdsMultitenantModule ParameterLabels: DataCollectionAccountID: default: 'Data Collection Account ID' @@ -49,6 +50,8 @@ Metadata: default: 'Include Service Quotas Module' IncludeResilienceHubModule: default: 'Include Resilience Hub Module' + IncludeRdsMultitenantModule: + default: 'Include RDS Multitenant Module' Parameters: DataCollectionAccountID: @@ -112,6 +115,11 @@ Parameters: Description: Collects Resilience Hub data from your accounts AllowedValues: ['yes', 'no'] Default: 'no' + IncludeRdsMultitenantModule: + Type: String + Description: Collects RDS Performance Insights data for multi-tenant cost allocation + AllowedValues: ['yes', 'no'] + Default: 'no' Conditions: IncludeTAModulePolicy: !Equals [!Ref IncludeTAModule, "yes"] @@ -124,6 +132,7 @@ Conditions: IncludeTransitGatewayModulePolicy: !Equals [!Ref IncludeTransitGatewayModule, "yes"] IncludeServiceQuotasModulePolicy: !Equals [!Ref IncludeServiceQuotasModule, "yes"] IncludeResilienceHubModulePolicy: !Equals [!Ref IncludeResilienceHubModule, "yes"] + IncludeRdsMultitenantModulePolicy: !Equals [!Ref IncludeRdsMultitenantModule, "yes"] Outputs: LambdaRole: @@ -155,6 +164,7 @@ Resources: - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}support-cases-LambdaRole" - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}service-quotas-LambdaRole" - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}resilience-hub-LambdaRole" + - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}RDSMultitenant-Lambda-Role" Path: / Metadata: cfn_nag: @@ -460,6 +470,31 @@ Resources: Resource: "*" # Wildcard required as actions do not support resource-level permissions Roles: - Ref: LambdaRole + Metadata: + cfn_nag: + rules_to_suppress: + - id: W12 + reason: "Policy is used for scanning of a wide range of resources" + # RDS Multitenant policy + RdsMultitenantPolicy: + Type: 'AWS::IAM::Policy' + Condition: IncludeRdsMultitenantModulePolicy + Properties: + PolicyName: RdsMultitenantPolicy + PolicyDocument: + Version: "2012-10-17" + Statement: + - Effect: "Allow" + Action: + - "rds:DescribeDBInstances" + Resource: !Sub "arn:${AWS::Partition}:rds:*:${AWS::AccountId}:db:*" + - Effect: "Allow" + Action: + - "pi:GetResourceMetrics" + - "ec2:DescribeRegions" + Resource: "*" + Roles: + - Ref: LambdaRole Metadata: cfn_nag: rules_to_suppress: diff --git a/data-collection/deploy/deploy-in-management-account.yaml b/data-collection/deploy/deploy-in-management-account.yaml index 33cf3896..a9bbaabe 100644 --- a/data-collection/deploy/deploy-in-management-account.yaml +++ 
b/data-collection/deploy/deploy-in-management-account.yaml
@@ -18,6 +18,7 @@ Metadata:
           - IncludeHealthEventsModule
           - IncludeRightsizingModule
           - IncludeLicenseManagerModule
+          - IncludeRdsMultitenantModule
           - IncludeServiceQuotasModule
     ParameterLabels:
       ManagementAccountRole:
@@ -38,6 +39,8 @@ Metadata:
         default: "Include Health Events Module"
       IncludeLicenseManagerModule:
         default: "Include Marketplace Licensing Module"
+      IncludeRdsMultitenantModule:
+        default: "Include RDS Multi-tenant Module"
       IncludeServiceQuotasModule:
         default: "Include Service Quotas Module"
 Parameters:
@@ -82,6 +85,11 @@ Parameters:
     Description: Collects Marketplace Licensing Information from your accounts
     AllowedValues: ['yes', 'no']
     Default: 'no'
+  IncludeRdsMultitenantModule:
+    Type: String
+    Description: Collects RDS Multi-tenant Performance Insights data from your accounts
+    AllowedValues: ['yes', 'no']
+    Default: 'no'
   IncludeServiceQuotasModule:
     Type: String
     Description: Collects Service Quotas Information from your accounts
@@ -95,6 +103,7 @@ Conditions:
   EnableBackupModule: !Equals [!Ref IncludeBackupModule, "yes"]
   EnableHealthEventsModule: !Equals [!Ref IncludeHealthEventsModule, "yes"]
   EnableLicenseManagerModule: !Equals [!Ref IncludeLicenseManagerModule, "yes"]
+  EnableRdsMultitenantModule: !Equals [!Ref IncludeRdsMultitenantModule, "yes"]
   EnableServiceQuotasModule: !Equals [!Ref IncludeServiceQuotasModule, "yes"]
 
 Outputs:
@@ -128,6 +137,7 @@ Resources:
             - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}backup-LambdaRole"
             - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}health-events-LambdaRole"
             - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}license-manager-LambdaRole"
+            - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}RDSMultitenant-Lambda-Role"
             - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}RLS-LambdaRole"
             - !Sub "arn:${AWS::Partition}:iam::${DataCollectionAccountID}:role/${ResourcePrefix}service-quotas-LambdaRole"
       Path: /
@@ -339,6 +349,29 @@ Resources:
         rules_to_suppress:
           - id: W12
             reason: "Policy is used for scanning of a wide range of resources"
+  RdsMultitenantPolicy:
+    Type: "AWS::IAM::Policy"
+    Condition: EnableRdsMultitenantModule
+    Properties:
+      PolicyName: RdsMultitenantPolicy
+      PolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: "Allow"
+            Action:
+              - "pi:GetResourceMetrics"
+              - "pi:DescribeDimensionKeys"
+              - "pi:GetDimensionKeyDetails"
+              - "rds:DescribeDBInstances"
+              - "rds:DescribeDBClusters"
+            Resource: "*"
+      Roles:
+        - Ref: LambdaRole
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W12
+            reason: "Policy is used for scanning of a wide range of resources"
   ServiceQuotasPolicy:
     Type: "AWS::IAM::Policy"
     Condition: EnableServiceQuotasModule
diff --git a/data-collection/deploy/module-rds-multitenant.yaml b/data-collection/deploy/module-rds-multitenant.yaml
new file mode 100644
index 00000000..e1d03262
--- /dev/null
+++ b/data-collection/deploy/module-rds-multitenant.yaml
@@ -0,0 +1,788 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Description: |
+  RDS Multi-Tenant Cost Visibility Module
+  Collects CloudWatch Database Insights metrics to enable cost allocation for multi-tenant RDS instances.
+  Collects user-level metrics on all RDS engines, and database-level metrics on engines that expose the db.name dimension (all except Oracle and SQL Server).
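+# Data flow (summary): EventBridge Scheduler -> Step Function -> collector Lambda,
+# which assumes MultiAccountRoleName in each in-scope account, pulls the metrics,
+# and writes Parquet to the destination bucket; Glue crawlers catalog the data and
+# a custom resource creates the pi_data_view Athena view for cost allocation.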
+ +Parameters: + DatabaseName: + Type: String + Description: Name of the Athena database to be created to hold lambda information + AllowedPattern: ([a-z0-9_]*?$) + Default: optimization_data + + DataBucketsKmsKeysArns: + Type: String + Description: "ARNs of KMS Keys for data buckets and/or Glue Catalog. Comma separated list, no spaces. Keep empty if data Buckets and Glue Catalog are not Encrypted with KMS. You can also set it to '*' to grant decrypt permission for all the keys." + Default: "" + + DestinationBucket: + Type: String + Description: S3 bucket for storing data + + DestinationBucketARN: + Type: String + Description: ARN of the S3 bucket for storing data + + Schedule: + Type: String + Description: Schedule expression for the Lambda function + Default: rate(1 hour) + + GlueRoleARN: + Type: String + Description: ARN of the Glue role + + ResourcePrefix: + Type: String + Description: Prefix for resource names + + LambdaAnalyticsARN: + Type: String + Description: ARN of the Lambda Analytics function + + CodeBucket: + Type: String + Description: S3 bucket containing the code + + StepFunctionTemplate: + Type: String + Description: Path to the Step Function template + + StepFunctionExecutionRoleARN: + Type: String + Description: ARN of the Step Function execution role + + SchedulerExecutionRoleARN: + Type: String + Description: ARN of the Scheduler execution role + + RegionsInScope: + Type: String + Description: Comma-delimited list of regions to collect data from + Default: "" + + MultiAccountRoleName: + Type: String + Description: Name of the IAM role deployed in all accounts which can retrieve AWS Data. + + AccountCollectorLambdaARN: + Type: String + Description: Arn of the Account Collector Lambda + +Resources: + # IAM Role for the Lambda function to collect CloudWatch Database Insights metrics + RDSMetricsLambdaRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub "${ResourcePrefix}RDSMultitenant-Lambda-Role" + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + Policies: + - PolicyName: "AssumeMultiAccountRole" + PolicyDocument: + Version: "2012-10-17" + Statement: + - Effect: "Allow" + Action: "sts:AssumeRole" + Resource: !Sub "arn:${AWS::Partition}:iam::*:role/${MultiAccountRoleName}" + - PolicyName: RDSMetricsAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - rds:DescribeDBInstances + Resource: '*' + - Effect: Allow + Action: + - pi:GetResourceMetrics + Resource: '*' + - Effect: Allow + Action: + - ec2:DescribeRegions + Resource: '*' + - Effect: Allow + Action: + - s3:PutObject + Resource: !Sub "${DestinationBucketARN}/*" + + # Lambda function that collects CloudWatch Database Insights metrics for multi-tenant cost allocation + RDSPerformanceInsightsFnHourly: + Type: AWS::Lambda::Function + Properties: + FunctionName: !Sub "${ResourcePrefix}rds-multitenant-Lambda" + Handler: index.lambda_handler + Runtime: python3.13 + Timeout: 300 + MemorySize: 1024 + Role: !GetAtt RDSMetricsLambdaRole.Arn + Code: + ZipFile: | + from io import BytesIO + from datetime import datetime + import boto3 + from datetime import datetime, timedelta + import json + import os + import subprocess + import sys + from collections import defaultdict + + + def install_packages(): + subprocess.check_call([sys.executable, "-m", "pip", + "install", 
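+                                           # pyarrow is not bundled with the Lambda Python runtime, so it is pip-installed
+                                           # into /tmp on every cold start; packaging it as a Lambda layer would avoid this.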
"--target", "/tmp", "pyarrow"]) + sys.path.append('/tmp') + + + install_packages() + + import pyarrow as pa + import pyarrow.parquet as pq + + + metrics_period_in_seconds = int(os.environ['METRICS_PERIOD_IN_SECONDS']) + metrics_s3_prefix = os.environ['METRICS_S3_PREFIX'] + # the number of hours to get performance insights data. + # default 1 hour, can be used to initially load past data + hour_delta = int(os.environ.get("DELTA_HOUR") or 1) + + + def lambda_handler(event, context): + """ + Main Lambda handler for collecting CloudWatch Database Insights metrics from RDS instances. + Enables multi-tenant cost allocation by collecting db.load metrics by user and database dimensions. + """ + if 'account' not in event: + raise ValueError( + "Please do not trigger this Lambda manually." + "Find the corresponding state machine in Step Functions and Trigger from there." + ) + + try: + account = json.loads(event["account"]) + account_id = account["account_id"] + account_name = account["account_name"] + payer_id = account["payer_id"] + + print(f"Collecting CloudWatch Database Insights data for account: {account_id}") + + # Get regions in scope from environment variable + regions_in_scope = os.environ.get('REGIONS_IN_SCOPE', '') + regions = [region.strip() for region in regions_in_scope.split(',') if region.strip()] + + # Create S3 client + s3_client = boto3.client('s3') + + # Dictionary to accumulate metrics across all instances in a region + region_metrics = {} + + # Iterate through all regions + for region in regions: + try: + # Create region-specific RDS and Performance Insights clients using cross-account role + rds_client = assume_role(account_id, 'rds', region) + pi_client = assume_role(account_id, 'pi', region) + + # Get RDS instances in this region + rds_instances = get_rds_instances(rds_client) + + # Initialize metrics for this region if not already present + if region not in region_metrics: + region_metrics[region] = [] + + # Process each RDS instance + for instance in rds_instances: + instance_id = instance['DbiResourceId'] + instance_arn = instance['DBInstanceArn'] + engine = instance.get('Engine', '') + print(f"Processing metrics for instance {instance_arn} (engine: {engine}) in region {region}") + + # Check if Performance Insights is enabled + if instance.get('PerformanceInsightsEnabled', False): + # Get Performance Insights metrics + metrics = get_performance_metrics(pi_client, instance_id, engine) + + # Collect metrics for this instance + region_metrics[region].extend( + process_metrics(instance_id, instance_arn, metrics, region, engine) + ) + else: + print(f"Performance Insights not enabled for instance {instance_id} in {region}") + + except Exception as e: + print(f"Error processing region {region}: {str(e)}") + + # Write accumulated metrics to S3 + write_metrics_to_s3(s3_client, region_metrics, account_id, payer_id) + + except Exception as e: + print(f"Error processing account {account_id}: {str(e)}") + raise + + return { + 'statusCode': 200, + 'body': json.dumps(f'Processed RDS instances across {len(regions)} regions') + } + + + def get_rds_instances(rds_client): + instances = [] + paginator = rds_client.get_paginator('describe_db_instances') + + for page in paginator.paginate(): + instances.extend(page['DBInstances']) + + return instances + + + def should_collect_database_metrics(engine): + """ + Determines if database dimension metrics are supported for the given engine. + Oracle and SQL Server engines don't support db.name dimension in CloudWatch Database Insights. 
+ https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PerfInsights.UsingDashboard.Components.html#USER_PerfInsights.UsingDashboard.Components.AvgActiveSessions + """ + excluded_engines = ['oracle-ee', 'oracle-se2', 'oracle-se1', 'oracle-se', + 'sqlserver-ee', 'sqlserver-se', 'sqlserver-ex', 'sqlserver-web'] + return engine.lower() not in excluded_engines + + def get_performance_metrics(pi_client, instance_id, engine): + """ + Retrieves CloudWatch Database Insights metrics for the specified RDS instance. + Collects CPU metrics and db.load metrics grouped by user and database dimensions. + """ + current_time = datetime.utcnow() + end_time = current_time.replace(minute=0, second=0, microsecond=0) # Round to top of hour + start_time = end_time - timedelta(hours=hour_delta) + + # Base metrics: CPU utilization (no dimensions) + metric_queries = [ + { + 'Metric': 'os.general.numVCPUs.avg' # Used for cost allocation calculations + }, + { + 'Metric': 'db.load.avg', # Database load by user (tenant) + 'GroupBy': { + 'Group': 'db.user', + 'Dimensions': ['db.user.name'] + } + } + ] + + # Add database dimension if supported by engine + # Oracle and SQL Server don't support database-level grouping + if should_collect_database_metrics(engine): + metric_queries.append({ + 'Metric': 'db.load.avg', # Database load by database name + 'GroupBy': { + 'Group': 'db', + 'Dimensions': ['db.name'] + } + }) + + response = pi_client.get_resource_metrics( + ServiceType='RDS', + Identifier=instance_id, + MetricQueries=metric_queries, + StartTime=start_time, + EndTime=end_time, + PeriodInSeconds=metrics_period_in_seconds + ) + + return response['MetricList'] + + + def process_metrics(instance_id, instance_arn, metrics, region, engine): + """ + Processes CloudWatch Database Insights metrics and flattens them for storage. + Creates records for each metric data point with proper dimension mapping. 
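+          Example output record (illustrative values only):
+              {"metric": "db.load.avg", "dimension_type": "user", "db_user_name": "app_user",
+               "db_database_name": None, "engine": "postgres", "num_vcpus": 4.0,
+               "timestamp": "2025-01-01 00:00:00+0000", "value": 1.25, ...}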
+ """ + all_flattened_metrics = [] + num_cpus = '' + + print(f"Processing metrics for instance {instance_id} in region {region}") + print(f"Total metrics received: {len(metrics)}") + + for metric in metrics: + print(f"Processing metric: {metric['Key']['Metric']}") + + # Extract CPU count for cost allocation calculations + if metric["Key"]["Metric"] == 'os.general.numVCPUs.avg': + for datapoint in metric["DataPoints"]: + num_cpus = datapoint.get("Value", 0) + if num_cpus != '': + break + + # Process metrics with dimensions (user or database level) + if "Dimensions" in metric["Key"]: + dimensions = metric["Key"]["Dimensions"] + + # Determine dimension type for multi-tenant cost allocation + if 'db.user.name' in dimensions: + dimension_type = 'user' # Cost allocation by database user + elif 'db.name' in dimensions: + dimension_type = 'database' # Cost allocation by database name + else: + dimension_type = 'unknown' + + # Create base record with all metadata + base_entry = { + "metric": metric["Key"]["Metric"], + "resourcearn": instance_arn, + "instance_id": instance_id, + "engine": engine, # Database engine type + "num_vcpus": num_cpus, # For cost calculations + "dimension_type": dimension_type, # user|database|unknown + "db_user_name": dimensions.get('db.user.name', None), # Tenant user + "db_database_name": dimensions.get('db.name', None) # Database name + } + + for datapoint in metric["DataPoints"]: + flattened_entry = base_entry.copy() + flattened_entry.update({ + "timestamp": datapoint["Timestamp"].strftime("%Y-%m-%d %H:%M:%S%z"), + "value": datapoint["Value"] + }) + + all_flattened_metrics.append(flattened_entry) + + print(f"Total metrics processed for instance {instance_id}: {len(all_flattened_metrics)}") + return all_flattened_metrics + + + def assume_role(account_id, service, region): + partition = boto3.session.Session().get_partition_for_region(region_name=region) + role_name = os.environ['ROLENAME'] + assumed = boto3.client('sts', region_name=region).assume_role( + RoleArn=f"arn:{partition}:iam::{account_id}:role/{role_name}", + RoleSessionName='rds_multitenant_data_collection' + ) + creds = assumed['Credentials'] + return boto3.client(service, region_name=region, + aws_access_key_id=creds['AccessKeyId'], + aws_secret_access_key=creds['SecretAccessKey'], + aws_session_token=creds['SessionToken'] + ) + + def write_metrics_to_s3(s3_client, region_metrics, account_id, payer_id): + """ + Writes collected metrics to S3 in Parquet format with proper partitioning. + Groups metrics by timestamp and stores them in hourly partitions for efficient querying. 
+ """ + # Process metrics for each region + for region, metrics in region_metrics.items(): + if not metrics: + print(f"No metrics to process for region {region}") + continue + + # Group metrics by hourly timestamps for efficient storage + timestamp_grouped_metrics = {} + for metric in metrics: + timestamp = datetime.strptime(metric['timestamp'], "%Y-%m-%d %H:%M:%S%z") + year = timestamp.strftime('%Y') + month = timestamp.strftime('%m') + day = timestamp.strftime('%d') + hour = timestamp.strftime('%H') + + # Create partition key for S3 organization + timestamp_key = f"{year}/{month}/{day}/{hour}" + + if timestamp_key not in timestamp_grouped_metrics: + timestamp_grouped_metrics[timestamp_key] = [] + + timestamp_grouped_metrics[timestamp_key].append(metric) + + # Write each hourly batch to S3 as separate Parquet files + for timestamp_key, grouped_metrics in timestamp_grouped_metrics.items(): + year, month, day, hour = timestamp_key.split('/') + + # S3 key with Hive-style partitioning for Athena compatibility + s3_key = f"{metrics_s3_prefix}/payer_id={payer_id}/account_id={account_id}/region={region}/year={year}/month={month}/day={day}/hour={hour}/metrics.parquet" + + print(f"Writing metrics to S3 key: {s3_key}") + print(f"Total number of metrics: {len(grouped_metrics)}") + + # Convert to Apache Arrow table for efficient Parquet storage + table = pa.Table.from_pylist(grouped_metrics) + + # Serialize to Parquet format in memory + buf = BytesIO() + pq.write_table(table, buf) + buf.seek(0) + + # Upload to S3 bucket + s3_client.put_object( + Bucket=os.environ['METRICS_BUCKET'], + Key=s3_key, + Body=buf.getvalue() + ) + print(f"Successfully wrote metrics to {s3_key}") + Environment: + Variables: + METRICS_BUCKET: !Ref DestinationBucket + METRICS_PERIOD_IN_SECONDS: '3600' + METRICS_S3_PREFIX: 'rds-multitenant' + ROLENAME: !Ref MultiAccountRoleName + REGIONS_IN_SCOPE: !Ref RegionsInScope + + # EventBridge Scheduler to run the data collection hourly + ModuleRefreshSchedule: + Type: 'AWS::Scheduler::Schedule' + Properties: + Description: !Sub 'Scheduler for the ODC RDS Multitenant module' + Name: !Sub '${ResourcePrefix}RDSMultitenant-RefreshSchedule' + ScheduleExpression: !Ref Schedule + State: ENABLED + FlexibleTimeWindow: + MaximumWindowInMinutes: 30 + Mode: 'FLEXIBLE' + Target: + Arn: !GetAtt ModuleStepFunction.Arn + RoleArn: !Ref SchedulerExecutionRoleARN + Input: !Sub '{"module_lambda":"${RDSPerformanceInsightsFnHourly.Arn}","crawlers": ["${ResourcePrefix}PerformanceInsightsRDSCrawler", "${ResourcePrefix}PerformanceInsightsRDSCrawlerHourly"]}' + + # Glue Database to store CloudWatch Database Insights table metadata + GlueDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: rds_performance_insights_db + Description: Database for CloudWatch Database Insights multi-tenant cost allocation data + + # Glue Table for RDS multitenant data + RDSMultitenantTable: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref GlueDatabase + TableInput: + Name: hourly_rds_multitenant + Description: Table for RDS Performance Insights multi-tenant cost allocation data + TableType: EXTERNAL_TABLE + Parameters: + classification: parquet + PartitionKeys: + - Name: payer_id + Type: string + - Name: account_id + Type: string + - Name: region + Type: string + - Name: year + Type: string + - Name: month + Type: string + - Name: day + Type: string + - Name: hour + Type: string + StorageDescriptor: + Columns: + - Name: metric + Type: 
string
+          - Name: resourcearn
+            Type: string
+          - Name: instance_id
+            Type: string
+          - Name: engine
+            Type: string
+          - Name: num_vcpus
+            Type: double
+          - Name: dimension_type
+            Type: string
+          - Name: db_user_name
+            Type: string
+          - Name: db_database_name
+            Type: string
+          - Name: timestamp
+            Type: string
+          - Name: value
+            Type: double
+          Location: !Sub "s3://${DestinationBucket}/rds-multitenant/"
+          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
+          SerdeInfo:
+            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
+
+  GlueServiceRole:
+    Type: AWS::IAM::Role
+    Properties:
+      RoleName: !Sub "${ResourcePrefix}RDSMultitenant-Glue-Role"
+      AssumeRolePolicyDocument:
+        Version: '2012-10-17'
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: glue.amazonaws.com
+            Action: sts:AssumeRole
+      ManagedPolicyArns:
+        - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSGlueServiceRole"
+      Policies:
+        - PolicyName: S3Access
+          PolicyDocument:
+            Version: '2012-10-17'
+            Statement:
+              - Effect: Allow
+                Action:
+                  - s3:GetObject
+                  - s3:PutObject
+                Resource: !Sub "${DestinationBucketARN}/*"
+
+  # Glue Crawler to automatically discover and catalog the Parquet data schema
+  PerformanceInsightsRDSCrawler:
+    Type: AWS::Glue::Crawler
+    Properties:
+      Name: !Sub "${ResourcePrefix}PerformanceInsightsRDSCrawler"
+      Role: !GetAtt GlueServiceRole.Arn
+      DatabaseName: !Ref GlueDatabase
+      Targets:
+        S3Targets:
+          - Path: !Sub "s3://${DestinationBucket}/rds-multitenant/"
+      Schedule:
+        ScheduleExpression: cron(50 */12 * * ? *)
+      SchemaChangePolicy:
+        UpdateBehavior: UPDATE_IN_DATABASE
+        DeleteBehavior: LOG
+
+  PerformanceInsightsRDSCrawlerHourly:
+    Type: AWS::Glue::Crawler
+    Properties:
+      Name: !Sub "${ResourcePrefix}PerformanceInsightsRDSCrawlerHourly"
+      Role: !GetAtt GlueServiceRole.Arn
+      DatabaseName: !Ref GlueDatabase
+      TablePrefix: "hourly_"
+      Targets:
+        S3Targets:
+          - Path: !Sub "s3://${DestinationBucket}/rds-multitenant/"
+      Schedule:
+        ScheduleExpression: cron(50 */12 * * ?
*) + SchemaChangePolicy: + UpdateBehavior: UPDATE_IN_DATABASE + DeleteBehavior: LOG + + + + AthenaViewsLambdaRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub "${ResourcePrefix}RDSMultitenant-Athena-Role" + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + Policies: + - PolicyName: AthenaAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - athena:StartQueryExecution + - athena:GetQueryExecution + Resource: '*' + - Effect: Allow + Action: + - glue:GetDatabase + - glue:GetDatabases + - glue:GetTable + - glue:CreateTable + - glue:UpdateTable + Resource: + - !Sub "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog" + - !Sub "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDatabase}" + - !Sub "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDatabase}/*" + - Effect: Allow + Action: + - s3:GetBucketLocation + - s3:GetObject + - s3:PutObject + Resource: + - !Sub "${DestinationBucketARN}" + - !Sub "${DestinationBucketARN}/*" + + # Lambda function to create Athena views for cost allocation analysis + CreateAthenaViewsLambda: + Type: AWS::Lambda::Function + Properties: + FunctionName: !Sub "${ResourcePrefix}RDSMultitenant-Views" + Handler: index.handler + Runtime: python3.13 + Timeout: 300 + MemorySize: 256 + Role: !GetAtt AthenaViewsLambdaRole.Arn + Code: + ZipFile: | + import boto3 + import json + import urllib3 + import time + + def handler(event, context): + print(f"ResponseURL: {event.get('ResponseURL', 'Not found')}") + print(f"Event: {json.dumps(event)}") + response_status = 'SUCCESS' + response_data = {} + + try: + print(f"Request type: {event['RequestType']}") + if event['RequestType'] in ['Create', 'Update']: + print("Starting Athena view creation") + athena = boto3.client('athena') + + # Check if table exists first + glue = boto3.client('glue') + try: + glue.get_table( + DatabaseName=event['ResourceProperties']['GlueDatabase'], + Name='hourly_rds_multitenant' + ) + table_exists = True + except glue.exceptions.EntityNotFoundException: + table_exists = False + print("Table hourly_rds_multitenant does not exist yet, skipping view creation") + + if table_exists: + view_sql = f""" + create or replace view "AwsDataCatalog"."{event['ResourceProperties']['GlueDatabase']}"."pi_data_view" as + WITH aggregate_load_data AS ( + SELECT + timestamp, + resourcearn, + dimension_type, + AVG(num_vcpus) AS num_vcpus, + SUM(value) AS total_db_load, + greatest(AVG(num_vcpus), SUM(value)) total_compute_power, + count(1) AS num_entities + FROM "AwsDataCatalog"."{event['ResourceProperties']['GlueDatabase']}"."hourly_rds_multitenant" + GROUP BY 1, 2, 3 + ) + SELECT + b.timestamp, + b.account_id, + b.resourcearn, + b.engine, + b.num_vcpus, + b.dimension_type, + CASE WHEN b.dimension_type = 'user' THEN b.db_user_name END as user_name, + CASE WHEN b.dimension_type = 'database' THEN b.db_database_name END as database_name, + b.value db_load, + a.total_db_load, + a.total_compute_power, + a.num_entities, + case when a.total_db_load = 0 then 0 else b.value / a.total_db_load end AS perc_utilization, + (b.value / a.total_compute_power) perc_utilization_rebased + FROM "AwsDataCatalog"."{event['ResourceProperties']['GlueDatabase']}"."hourly_rds_multitenant" b + JOIN aggregate_load_data a + ON 
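+                        -- pair each per-tenant row with its hourly per-instance aggregate (same hour, instance, and dimension type)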
a.timestamp = b.timestamp AND a.resourcearn = b.resourcearn AND a.dimension_type = b.dimension_type + """ + + response = athena.start_query_execution( + QueryString=view_sql, + QueryExecutionContext={'Database': event['ResourceProperties']['GlueDatabase']}, + ResultConfiguration={'OutputLocation': event['ResourceProperties']['S3OutputLocation']} + ) + + # Wait for completion + query_id = response['QueryExecutionId'] + while True: + result = athena.get_query_execution(QueryExecutionId=query_id) + status = result['QueryExecution']['Status']['State'] + if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']: + break + time.sleep(2) + + if status != 'SUCCEEDED': + raise Exception(f"Query failed: {result['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')}") + + response_data['Message'] = 'Views created successfully' + else: + response_data['Message'] = 'Table does not exist yet, view creation skipped' + + elif event['RequestType'] == 'Delete': + response_data['Message'] = 'Views deleted successfully' + except Exception as e: + print(f"ERROR: {str(e)}") + print(f"ERROR TYPE: {type(e)}") + import traceback + print(f"TRACEBACK: {traceback.format_exc()}") + response_status = 'FAILED' + response_data['Message'] = str(e) + + send_response(event, context, response_status, response_data) + + def send_response(event, context, response_status, response_data): + response_body = { + 'Status': response_status, + 'Reason': f'See CloudWatch Log Stream: {context.log_stream_name}', + 'PhysicalResourceId': context.log_stream_name, + 'StackId': event['StackId'], + 'RequestId': event['RequestId'], + 'LogicalResourceId': event['LogicalResourceId'], + 'Data': response_data + } + + http = urllib3.PoolManager() + response = http.request('PUT', event['ResponseURL'], + body=json.dumps(response_body), + headers={'Content-Type': 'application/json'}) + print(f'Response status: {response.status}') + + CreateAthenaViewsCustomResource: + Type: Custom::CreateAthenaViews + Properties: + ServiceToken: !GetAtt CreateAthenaViewsLambda.Arn + GlueDatabase: !Ref GlueDatabase + S3OutputLocation: !Sub 's3://${DestinationBucket}/athena_output/' + + # Step Function to orchestrate the data collection workflow across accounts + ModuleStepFunction: + Type: AWS::StepFunctions::StateMachine + Properties: + StateMachineName: !Sub '${ResourcePrefix}RDSMultitenant-StateMachine' + StateMachineType: STANDARD + RoleArn: !Ref StepFunctionExecutionRoleARN + DefinitionS3Location: + Bucket: !Ref CodeBucket + Key: !Ref StepFunctionTemplate + DefinitionSubstitutions: + AccountCollectorLambdaARN: !Ref AccountCollectorLambdaARN + ModuleLambdaARN: !GetAtt RDSPerformanceInsightsFnHourly.Arn + Crawlers: !Sub '["${ResourcePrefix}PerformanceInsightsRDSCrawler", "${ResourcePrefix}PerformanceInsightsRDSCrawlerHourly"]' + CollectionType: "LINKED" + Params: '' + Module: rds-multitenant + DeployRegion: !Ref AWS::Region + Account: !Ref AWS::AccountId + Prefix: !Ref ResourcePrefix + Bucket: !Ref DestinationBucket + +Outputs: + StepFunctionARN: + Description: ARN for the module's Step Function + Value: !GetAtt ModuleStepFunction.Arn + + + + GlueDatabaseName: + Description: Glue database for RDS Performance Insights data + Value: !Ref GlueDatabase \ No newline at end of file diff --git a/data-collection/deploy/source/rds-multitenant/README.md b/data-collection/deploy/source/rds-multitenant/README.md new file mode 100644 index 00000000..e7bf3ebd --- /dev/null +++ b/data-collection/deploy/source/rds-multitenant/README.md @@ -0,0 +1,81 @@ +# RDS Multi-Tenant 
Cost Visibility Module + +## Overview + +This module helps improve cost visibility for Amazon RDS multi-tenant instances by leveraging Performance Insights and Amazon Athena. It enables organizations to accurately allocate costs to different tenants and optimize database performance. + +## Features + +- Collects Amazon RDS Performance Insights data for multi-tenant databases +- Creates Athena views to analyze tenant-specific resource usage +- Integrates with Cost and Usage Report (CUR 2.0) data for cost allocation +- Provides visibility into database resource consumption by tenant + +## Prerequisites + +- [CUR 2.0](https://docs.aws.amazon.com/cur/latest/userguide/data-exports-migrate-two.html) enabled and mapped to a Glue table +- Amazon RDS instances with Performance Insights enabled +- Proper IAM permissions to access RDS Performance Insights API + +## How It Works + +1. A Lambda function collects Performance Insights metrics from all RDS instances across regions +2. The metrics are stored in an S3 bucket with proper partitioning +3. Glue crawlers create and maintain tables in the Glue Data Catalog +4. Athena views are created to join Performance Insights data with CUR data +5. The final view provides tenant-specific cost allocation based on database usage + +## Athena Views + +The module creates the following Athena views: + +1. **rds_pi_consolidated**: Consolidates Performance Insights data +2. **rds_pi_with_cur**: Joins Performance Insights data with CUR data +3. **rds_tenant_cost_allocation**: Provides tenant-specific cost allocation + +## Usage + +After deployment, you can use the Athena views to: +- Analyze database resource usage by tenant +- Allocate costs based on actual resource consumption +- Identify optimization opportunities for multi-tenant databases + +## Example Queries + +### Get tenant usage by database instance +```sql +SELECT + instance_id, + tenant_id, + usage_date, + SUM(total_db_load) as total_load, + SUM(allocated_cost) as total_cost +FROM + rds_performance_insights_db.rds_tenant_cost_allocation +WHERE + usage_date BETWEEN DATE '2023-01-01' AND DATE '2023-01-31' +GROUP BY + instance_id, tenant_id, usage_date +ORDER BY + instance_id, total_cost DESC; +``` + +### Get daily cost allocation for a specific tenant +```sql +SELECT + usage_date, + SUM(allocated_cost) as daily_cost +FROM + rds_performance_insights_db.rds_tenant_cost_allocation +WHERE + tenant_id = 'app_user' + AND usage_date BETWEEN DATE '2023-01-01' AND DATE '2023-01-31' +GROUP BY + usage_date +ORDER BY + usage_date; +``` + +## Based On + +This module is based on the AWS Database Blog post: [Improve cost visibility of an Amazon RDS multi-tenant instance with Performance Insights and Amazon Athena](https://aws.amazon.com/blogs/database/improve-cost-visibility-of-an-amazon-rds-multi-tenant-instance-with-performance-insights-and-amazon-athena/) \ No newline at end of file diff --git a/data-collection/deploy/source/rds-multitenant/rds_multitenant_handler.py b/data-collection/deploy/source/rds-multitenant/rds_multitenant_handler.py new file mode 100644 index 00000000..91bf6177 --- /dev/null +++ b/data-collection/deploy/source/rds-multitenant/rds_multitenant_handler.py @@ -0,0 +1,196 @@ +import boto3 +import json +import os +import datetime +import logging +import pandas as pd +import io +import pyarrow as pa +import pyarrow.parquet as pq + +# Configure logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def lambda_handler(event, context): + """ + Lambda function to collect RDS Performance 
Insights metrics and store them in S3
+    """
+    logger.info("Starting RDS Performance Insights data collection")
+
+    # Get environment variables
+    metrics_bucket = os.environ.get('METRICS_BUCKET')
+    metrics_period = int(os.environ.get('METRICS_PERIOD_IN_SECONDS', '3600'))
+    metrics_s3_prefix = os.environ.get('METRICS_S3_PREFIX', 'rds_pi_data')
+
+    if not metrics_bucket:
+        raise ValueError("METRICS_BUCKET environment variable is required")
+
+    # Get current timestamp
+    now = datetime.datetime.utcnow()
+    end_time = now
+    start_time = end_time - datetime.timedelta(seconds=metrics_period)
+
+    # Get list of regions
+    ec2_client = boto3.client('ec2')
+    regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']]
+
+    # Process each region
+    for region in regions:
+        try:
+            process_region(region, metrics_bucket, metrics_s3_prefix, start_time, end_time, metrics_period)
+        except Exception as e:
+            logger.error(f"Error processing region {region}: {str(e)}")
+
+    return {
+        'statusCode': 200,
+        'body': json.dumps('RDS Performance Insights data collection completed')
+    }
+
+def process_region(region, metrics_bucket, metrics_s3_prefix, start_time, end_time, metrics_period):
+    """Process a single AWS region"""
+    logger.info(f"Processing region: {region}")
+
+    # Create RDS client for the region
+    rds_client = boto3.client('rds', region_name=region)
+
+    # Get list of DB instances in the region, following pagination markers
+    try:
+        paginator = rds_client.get_paginator('describe_db_instances')
+        for page in paginator.paginate():
+            for db_instance in page['DBInstances']:
+                try:
+                    process_db_instance(db_instance, region, metrics_bucket, metrics_s3_prefix, start_time, end_time, metrics_period)
+                except Exception as e:
+                    logger.error(f"Error processing DB instance {db_instance['DBInstanceIdentifier']}: {str(e)}")
+    except Exception as e:
+        logger.error(f"Error listing DB instances in region {region}: {str(e)}")
+
+def process_db_instance(db_instance, region, metrics_bucket, metrics_s3_prefix, start_time, end_time, metrics_period):
+    """Process a single DB instance"""
+    db_instance_id = db_instance['DBInstanceIdentifier']
+    logger.info(f"Processing DB instance: {db_instance_id}")
+
+    # Check if Performance Insights is enabled
+    if not db_instance.get('PerformanceInsightsEnabled', False):
+        logger.info(f"Performance Insights not enabled for {db_instance_id}, skipping")
+        return
+
+    # Get Performance Insights metrics. The PI API identifies an instance by its
+    # DbiResourceId (db-XXXXXXXXXX); describe_db_instances does not return a
+    # dedicated Performance Insights ARN.
+    pi_client = boto3.client('pi', region_name=region)
+    resource_id = db_instance.get('DbiResourceId')
+    resource_arn = db_instance.get('DBInstanceArn')
+
+    if not resource_id:
+        logger.info(f"Performance Insights resource ID not available for {db_instance_id}, skipping")
+        return
+
+    # Get database load (db.load.avg) grouped by database user
+    try:
+        metrics_response = pi_client.get_resource_metrics(
+            ServiceType='RDS',
+            Identifier=resource_id,
+            MetricQueries=[
+                {
+                    'Metric': 'db.load.avg',
+                    'GroupBy': {
+                        'Group': 'db.user',
+                        'Dimensions': ['db.user.name']
+                    }
+                }
+            ],
+            StartTime=start_time,
+            EndTime=end_time,
+            PeriodInSeconds=metrics_period
+        )
+
+        # Process and store metrics
+        process_metrics(metrics_response, db_instance, resource_arn, metrics_bucket, metrics_s3_prefix)
+
+    except Exception as e:
+        logger.error(f"Error getting Performance Insights metrics for {db_instance_id}: {str(e)}")
+
+def process_metrics(metrics_response, db_instance, resource_arn, metrics_bucket, metrics_s3_prefix):
+    """Process and store Performance Insights metrics"""
+    db_instance_id = db_instance['DBInstanceIdentifier']
+    account_id = boto3.client('sts').get_caller_identity().get('Account')
+    # The vCPU count is not part of the describe_db_instances response; it is left as
+    # 0.0 here (the deployed module derives it from the os.general.numVCPUs.avg metric).
+    num_vcpus = 0.0
+
+    # Extract metrics data. get_resource_metrics returns one MetricList entry per
+    # metric/dimension-group combination; the group values are in Key.Dimensions,
+    # while the datapoints only carry Timestamp and Value.
+    metrics_data = []
+    latest_timestamp = None
+
+    for metric_result in metrics_response.get('MetricList', []):
+        metric_name = metric_result.get('Key', {}).get('Metric')
+        dimensions = metric_result.get('Key', {}).get('Dimensions', {})
+        user_name = dimensions.get('db.user.name', 'unknown')
+
+        for datapoint in metric_result.get('DataPoints', []):
+            timestamp = datapoint.get('Timestamp')
+            value = datapoint.get('Value')
+            if timestamp is None or value is None:
+                continue
+            if latest_timestamp is None or timestamp > latest_timestamp:
+                latest_timestamp = timestamp
+
+            # Add to metrics data, keeping the partition columns alongside the values
+            metrics_data.append({
+                'metric': metric_name,
+                'resourcearn': resource_arn,
+                'instance_id': db_instance_id,
+                'num_vcpus': float(num_vcpus),
+                'db.user.name': user_name,
+                'timestamp': timestamp.isoformat(),
+                'value': float(value),
+                'account_id': account_id,
+                'year': timestamp.strftime('%Y'),
+                'month': timestamp.strftime('%m'),
+                'day': timestamp.strftime('%d'),
+                'hour': timestamp.strftime('%H')
+            })
+
+    if not metrics_data:
+        logger.info(f"No metrics data found for {db_instance_id}")
+        return
+
+    # Convert to DataFrame and save as Parquet
+    df = pd.DataFrame(metrics_data)
+
+    # Create S3 key with partitioning, derived from the most recent datapoint
+    year = latest_timestamp.strftime('%Y')
+    month = latest_timestamp.strftime('%m')
+    day = latest_timestamp.strftime('%d')
+    hour = latest_timestamp.strftime('%H')
+    s3_key = f"{metrics_s3_prefix}/account_id={account_id}/year={year}/month={month}/day={day}/hour={hour}/{db_instance_id}_{latest_timestamp.strftime('%Y%m%d%H%M%S')}.parquet"
+
+    # Convert to Parquet and upload to S3
+    table = pa.Table.from_pandas(df)
+    buf = io.BytesIO()
+    pq.write_table(table, buf)
+    buf.seek(0)
+
+    s3_client = boto3.client('s3')
+    s3_client.put_object(
+        Bucket=metrics_bucket,
+        Key=s3_key,
+        Body=buf.getvalue()
+    )
+
+    logger.info(f"Uploaded metrics data for {db_instance_id} to s3://{metrics_bucket}/{s3_key}")
\ No newline at end of file
diff --git a/data-collection/deploy/source/rds-multitenant/rds_multitenant_views.py b/data-collection/deploy/source/rds-multitenant/rds_multitenant_views.py
new file mode 100644
index 00000000..777223ee
--- /dev/null
+++ b/data-collection/deploy/source/rds-multitenant/rds_multitenant_views.py
@@ -0,0 +1,268 @@
+import boto3
+import os
+import logging
+import time
+
+# Configure logging
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+def handler(event, context):
+    """
+    Lambda function to create Athena views for RDS Performance Insights data
+    """
+    logger.info("Starting Athena views creation")
+
+    # Get environment variables
+    glue_database_name = os.environ.get('GLUE_DATABASE_NAME')
+    cur_database_name = os.environ.get('CUR_DATABASE_NAME')
+    cur_table_name = os.environ.get('CUR_TABLE_NAME')
+
+    if not glue_database_name or not cur_database_name or not cur_table_name:
+        raise ValueError("GLUE_DATABASE_NAME, CUR_DATABASE_NAME, and CUR_TABLE_NAME environment variables are required")
+
+    # Create Athena client
+    athena_client = boto3.client('athena')
+
+    # Create workgroup if it doesn't exist
+    workgroup_name = "RdsMultitenantWorkgroup"
+    try:
+        create_workgroup_if_not_exists(athena_client, workgroup_name)
+    except Exception as e:
+        logger.error(f"Error creating workgroup: {str(e)}")
+        raise
+
+    # Create views
+    try:
+        # Create view for RDS Performance Insights data
+        create_pi_view(athena_client, workgroup_name, glue_database_name)
+
+        # Create view for joining Performance Insights data with CUR data
+        create_pi_cur_view(athena_client, workgroup_name, glue_database_name, cur_database_name, cur_table_name)
+
+        # Create view for tenant cost allocation
+        create_tenant_cost_view(athena_client, workgroup_name, glue_database_name)
+
+    except Exception as e:
+        logger.error(f"Error creating views: {str(e)}")
+        raise
+
+    return {
+        'statusCode': 200,
+        'body': 'Athena views created successfully'
+    }
+
+def create_workgroup_if_not_exists(athena_client, workgroup_name):
+    """Create Athena workgroup if it doesn't exist"""
+    try:
+        athena_client.get_work_group(WorkGroup=workgroup_name)
+        logger.info(f"Workgroup {workgroup_name} already exists")
+    except athena_client.exceptions.InvalidRequestException:
+        logger.info(f"Creating workgroup {workgroup_name}")
+        athena_client.create_work_group(
+            Name=workgroup_name,
+            Configuration={
+                'ResultConfiguration': {
+                    'EncryptionConfiguration': {
+                        'EncryptionOption': 'SSE_S3'
+                    }
+                },
+                'EnforceWorkGroupConfiguration': True,
+                'PublishCloudWatchMetricsEnabled': True,
+                'BytesScannedCutoffPerQuery': 10000000000,
+                'RequesterPaysEnabled': False
+            },
+            Description='Workgroup for RDS Multitenant Performance Insights views'
+        )
+
+def execute_query(athena_client, workgroup_name, database, query):
+    """Execute Athena query and wait for completion"""
+    response = athena_client.start_query_execution(
+        QueryString=query,
+        QueryExecutionContext={
+            'Database': database
+        },
+        WorkGroup=workgroup_name
+    )
+
+    query_execution_id = response['QueryExecutionId']
+
+    # Wait for query to complete (queries start in QUEUED before moving to RUNNING)
+    state = 'QUEUED'
+    max_retries = 10
+    retry_count = 0
+
+    while state in ('QUEUED', 'RUNNING') and retry_count < max_retries:
+        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
+        state = response['QueryExecution']['Status']['State']
+
+        if state in ('QUEUED', 'RUNNING'):
+            retry_count += 1
+            time.sleep(3)
+
+    if state == 'SUCCEEDED':
+        logger.info(f"Query executed successfully: {query_execution_id}")
+    else:
+        error_message = response['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
+        logger.error(f"Query failed: {error_message}")
+        raise Exception(f"Query failed: {error_message}")
+
+    return query_execution_id
+
+def create_pi_view(athena_client, workgroup_name, database):
+    """Create view for RDS Performance Insights data"""
+    logger.info("Creating RDS Performance Insights view")
+
+    # Athena view DDL uses Presto syntax: identifiers are quoted with double
+    # quotes, not backticks.
+    query = f"""
+    CREATE OR REPLACE VIEW {database}.rds_pi_consolidated AS
+    SELECT
+        account_id,
+        resourcearn,
+        instance_id,
+        num_vcpus,
+        "db.user.name" as db_user_name,
+        timestamp,
+        metric,
+        value
+    FROM
+        {database}.rds_pi_data_hourly
+    """
+
+    execute_query(athena_client, workgroup_name, database, query)
+
+def create_pi_cur_view(athena_client, workgroup_name, pi_database, cur_database, cur_table):
+    """Create view joining Performance Insights data with CUR data"""
+    logger.info("Creating RDS Performance Insights with CUR view")
+
+    query = f"""
+    CREATE OR REPLACE VIEW {pi_database}.rds_pi_with_cur AS
+    WITH pi_data AS (
+        SELECT
+            account_id,
+            resourcearn,
+            instance_id,
num_vcpus, + db_user_name, + timestamp, + metric, + value, + SUBSTR(resourcearn, STRPOS(resourcearn, ':db:') + 4) AS db_identifier + FROM + {pi_database}.rds_pi_consolidated + ), + cur_data AS ( + SELECT + line_item_usage_account_id as account_id, + line_item_resource_id, + line_item_usage_type, + product_database_engine, + line_item_usage_start_date, + line_item_usage_end_date, + line_item_unblended_cost, + CASE + WHEN line_item_resource_id LIKE 'arn:aws:rds:%' THEN SUBSTR(line_item_resource_id, STRPOS(line_item_resource_id, ':db:') + 4) + ELSE NULL + END AS db_identifier + FROM + {cur_database}.{cur_table} + WHERE + product_servicecode = 'AmazonRDS' + AND line_item_line_item_type = 'Usage' + ) + SELECT + pi.account_id, + pi.resourcearn, + pi.instance_id, + pi.num_vcpus, + pi.db_user_name, + pi.timestamp, + pi.metric, + pi.value, + cur.line_item_usage_type, + cur.product_database_engine, + cur.line_item_usage_start_date, + cur.line_item_usage_end_date, + cur.line_item_unblended_cost + FROM + pi_data pi + JOIN + cur_data cur + ON + pi.db_identifier = cur.db_identifier + AND pi.account_id = cur.account_id + AND pi.timestamp BETWEEN cur.line_item_usage_start_date AND cur.line_item_usage_end_date + """ + + execute_query(athena_client, workgroup_name, pi_database, query) + +def create_tenant_cost_view(athena_client, workgroup_name, database): + """Create view for tenant cost allocation""" + logger.info("Creating tenant cost allocation view") + + query = f""" + CREATE OR REPLACE VIEW {database}.rds_tenant_cost_allocation AS + WITH tenant_usage AS ( + SELECT + account_id, + instance_id, + db_user_name as tenant_id, + DATE(timestamp) as usage_date, + SUM(value) as total_db_load, + COUNT(*) as sample_count + FROM + {database}.rds_pi_consolidated + WHERE + metric = 'db.load.avg' + GROUP BY + account_id, instance_id, db_user_name, DATE(timestamp) + ), + instance_usage AS ( + SELECT + account_id, + instance_id, + usage_date, + SUM(total_db_load) as instance_total_load, + MAX(sample_count) as sample_count + FROM + tenant_usage + GROUP BY + account_id, instance_id, usage_date + ), + instance_costs AS ( + SELECT + account_id, + instance_id, + DATE(line_item_usage_start_date) as usage_date, + SUM(line_item_unblended_cost) as daily_cost + FROM + {database}.rds_pi_with_cur + GROUP BY + account_id, instance_id, DATE(line_item_usage_start_date) + ) + SELECT + tu.account_id, + tu.instance_id, + tu.tenant_id, + tu.usage_date, + tu.total_db_load, + iu.instance_total_load, + CASE + WHEN iu.instance_total_load > 0 THEN tu.total_db_load / iu.instance_total_load + ELSE 0 + END as usage_percentage, + ic.daily_cost, + CASE + WHEN iu.instance_total_load > 0 THEN (tu.total_db_load / iu.instance_total_load) * ic.daily_cost + ELSE 0 + END as allocated_cost + FROM + tenant_usage tu + JOIN + instance_usage iu ON tu.account_id = iu.account_id AND tu.instance_id = iu.instance_id AND tu.usage_date = iu.usage_date + LEFT JOIN + instance_costs ic ON tu.account_id = ic.account_id AND tu.instance_id = ic.instance_id AND tu.usage_date = ic.usage_date + ORDER BY + tu.usage_date, tu.instance_id, allocated_cost DESC + """ + + execute_query(athena_client, workgroup_name, database, query) \ No newline at end of file diff --git a/data-collection/deploy/source/rds-multitenant/requirements.txt b/data-collection/deploy/source/rds-multitenant/requirements.txt new file mode 100644 index 00000000..e77bb919 --- /dev/null +++ b/data-collection/deploy/source/rds-multitenant/requirements.txt @@ -0,0 +1,2 @@ +pandas==1.5.3 +pyarrow==11.0.0 \ No 
newline at end of file