
AWS Security Hub: Centralized Security Management at Enterprise Scale


AWS Security Hub transforms fragmented security tools into a unified security operations center, but many organizations struggle to realize its full potential. The challenge isn't just technical—it's organizational. Security Hub can aggregate findings from dozens of security tools across hundreds of accounts, but without proper configuration and integration, it becomes just another source of alert fatigue.

The promise of Security Hub is compelling: centralized security posture management, automated compliance monitoring, and unified incident response. The reality for many organizations is overwhelming volumes of findings, false positives, and difficulty prioritizing remediation efforts.

This guide explores enterprise-grade Security Hub implementations that provide comprehensive visibility and control across complex AWS environments. We'll cover advanced configuration patterns, custom finding generation, and integration strategies that transform Security Hub from a finding aggregator into a strategic security platform.

Security Hub Architecture for Enterprise

Multi-Account Security Orchestration

import boto3
import json
from datetime import datetime, timedelta

class EnterpriseSecurityHub:
    def __init__(self):
        self.securityhub_client = boto3.client('securityhub')
        self.organizations_client = boto3.client('organizations')
        
    def setup_organization_security_hub(self):
        """Setup Security Hub across AWS Organization"""
        
        # Enable Security Hub in the management account (formerly called the master account)
        try:
            self.securityhub_client.enable_security_hub(
                Tags={
                    'Purpose': 'CentralizedSecurityManagement',
                    'ManagedBy': 'SecurityTeam',
                    'Environment': 'Organization'
                },
                EnableDefaultStandards=True
            )
        except self.securityhub_client.exceptions.ResourceConflictException:
            print("Security Hub already enabled in management account")
        
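        # Get organization accounts (list_accounts is paginated, so walk every page)
        paginator = self.organizations_client.get_paginator('list_accounts')
        accounts = [account for page in paginator.paginate() for account in page['Accounts']]
        master_account_id = self.organizations_client.describe_organization()['Organization']['MasterAccountId']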
        
        # Enable Security Hub in member accounts
        member_accounts = []
        for account in accounts:
            if account['Status'] == 'ACTIVE' and account['Id'] != master_account_id:
                member_accounts.append({
                    'AccountId': account['Id'],
                    'Email': account['Email']
                })
        
        # Create members and send invitations
        if member_accounts:
            try:
                # Both calls accept a limited number of accounts per request,
                # so send them in chunks of 50. Note that Security Hub's
                # invite_members takes only AccountIds (no Message parameter).
                for i in range(0, len(member_accounts), 50):
                    chunk = member_accounts[i:i + 50]
                    self.securityhub_client.create_members(AccountDetails=chunk)
                    self.securityhub_client.invite_members(
                        AccountIds=[account['AccountId'] for account in chunk]
                    )
                
            except Exception as e:
                print(f"Error setting up member accounts: {e}")
        
        return member_accounts
    
    def configure_security_standards(self):
        """Configure security standards and compliance frameworks"""
        
        # Standard ARNs are Region- and version-specific; confirm the exact
        # values for your Region with describe_standards() before relying on them
        region = self.securityhub_client.meta.region_name
        standards_to_enable = [
            f'arn:aws:securityhub:{region}::standards/aws-foundational-security-best-practices/v/1.0.0',
            'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0',
            f'arn:aws:securityhub:{region}::standards/pci-dss/v/3.2.1',
            f'arn:aws:securityhub:{region}::standards/aws-resource-tagging-standard/v/1.0.0'
        ]
        
        enabled_standards = []
        
        for standard_arn in standards_to_enable:
            try:
                response = self.securityhub_client.batch_enable_standards(
                    StandardsSubscriptionRequests=[
                        {
                            'StandardsArn': standard_arn,
                            'StandardsInput': {}
                        }
                    ]
                )
                enabled_standards.append(response)
                
            except Exception as e:
                print(f"Failed to enable standard {standard_arn}: {e}")
        
        return enabled_standards
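
Because the ARNs in `standards_to_enable` vary by Region and version, it can be safer to look them up than to hard-code them. The sketch below does this with `describe_standards`. The helper name `find_standard_arns` and the keyword matching are illustrative assumptions, and the client is passed in (for example `boto3.client('securityhub')`) so the function is easy to exercise with a stub.

```python
def find_standard_arns(securityhub_client, name_keywords):
    """Return ARNs of available standards whose name contains any keyword."""
    matches = []
    kwargs = {}
    while True:
        page = securityhub_client.describe_standards(**kwargs)
        for standard in page.get('Standards', []):
            name = standard.get('Name', '').lower()
            if any(keyword.lower() in name for keyword in name_keywords):
                matches.append(standard['StandardsArn'])
        # Follow NextToken until the service reports no further pages
        token = page.get('NextToken')
        if not token:
            return matches
        kwargs = {'NextToken': token}
```

The returned ARNs can then be passed to `batch_enable_standards` in place of a fixed list.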

Custom Security Standards

def create_custom_security_standards():
    """Create custom security standards for organization-specific requirements"""
    
    # Custom control for encryption requirements
    encryption_control = {
        'Id': 'CustomEncryption.1',
        'Title': 'All data at rest must be encrypted with customer-managed KMS keys',
        'Description': 'Ensures all data storage services use customer-managed KMS keys for encryption',
        'RemediationUrl': 'https://docs.company.com/security/encryption-standards',
        'SeverityRating': 'HIGH',
        'ComplianceStatus': 'PASSED'  # Will be updated based on findings
    }
    
    # Custom control for network security
    network_control = {
        'Id': 'CustomNetwork.1', 
        'Title': 'All resources must be deployed in private subnets with proper security groups',
        'Description': 'Ensures resources follow network security best practices',
        'RemediationUrl': 'https://docs.company.com/security/network-standards',
        'SeverityRating': 'MEDIUM',
        'ComplianceStatus': 'PASSED'
    }
    
    # Custom control for access management
    access_control = {
        'Id': 'CustomAccess.1',
        'Title': 'All IAM roles must have maximum session duration of 1 hour',
        'Description': 'Limits session duration to reduce credential exposure risk',
        'RemediationUrl': 'https://docs.company.com/security/access-standards',
        'SeverityRating': 'MEDIUM',
        'ComplianceStatus': 'PASSED'
    }
    
    custom_standard = {
        'Name': 'CompanySecurityStandard',
        'Description': 'Custom security standard for company-specific requirements',
        'Controls': [encryption_control, network_control, access_control]
    }
    
    return custom_standard
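
The dictionary returned above is local bookkeeping: nothing in that function registers it with Security Hub, and the `ComplianceStatus` values are placeholders. One way to keep them meaningful is to derive each control's status from findings that reference the control ID in `Compliance.RelatedRequirements`. A minimal sketch, assuming findings shaped like the ones this guide generates (`summarize_controls` is a hypothetical helper):

```python
def summarize_controls(custom_standard, findings):
    """Derive each control's status from findings that reference its Id."""
    failed_ids = set()
    for finding in findings:
        compliance = finding.get('Compliance', {})
        if compliance.get('Status') == 'FAILED':
            failed_ids.update(compliance.get('RelatedRequirements', []))
    # A control passes only when no failed finding points at it
    return {
        control['Id']: 'FAILED' if control['Id'] in failed_ids else 'PASSED'
        for control in custom_standard['Controls']
    }
```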

def implement_custom_findings_generator():
    """Generate custom findings for Security Hub"""
    
    custom_findings_code = '''
import boto3
import json
from datetime import datetime
import uuid

class CustomFindingsGenerator:
    def __init__(self):
        self.securityhub_client = boto3.client('securityhub')
        self.account_id = boto3.client('sts').get_caller_identity()['Account']
        self.region = boto3.Session().region_name
        
    def generate_encryption_findings(self):
        """Generate findings for encryption compliance"""
        
        findings = []
        
        # Check S3 buckets for encryption
        s3_client = boto3.client('s3')
        buckets = s3_client.list_buckets()['Buckets']
        
        for bucket in buckets:
            bucket_name = bucket['Name']
            
            try:
                # Check bucket encryption
                encryption = s3_client.get_bucket_encryption(Bucket=bucket_name)
                
                # Check if using customer-managed KMS key
                rules = encryption['ServerSideEncryptionConfiguration']['Rules']
                uses_customer_kms = False
                
                for rule in rules:
                    sse_config = rule['ApplyServerSideEncryptionByDefault']
                    if (sse_config['SSEAlgorithm'] == 'aws:kms' and 
                        'KMSMasterKeyID' in sse_config and
                        not sse_config['KMSMasterKeyID'].startswith('alias/aws/')):
                        uses_customer_kms = True
                        break
                
                if not uses_customer_kms:
                    finding = self.create_finding(
                        finding_id=f"custom-encryption-{bucket_name}",
                        title=f"S3 bucket {bucket_name} not using customer-managed KMS key",
                        description=f"Bucket {bucket_name} is not encrypted with customer-managed KMS key",
                        severity_label='HIGH',
                        resource_type='AwsS3Bucket',
                        resource_id=bucket_name,
                        compliance_status='FAILED',
                        control_id='CustomEncryption.1'
                    )
                    findings.append(finding)
                    
            except s3_client.exceptions.ClientError as e:
                if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
                    # Bucket not encrypted at all
                    finding = self.create_finding(
                        finding_id=f"custom-encryption-{bucket_name}",
                        title=f"S3 bucket {bucket_name} not encrypted",
                        description=f"Bucket {bucket_name} has no server-side encryption configured",
                        severity_label='CRITICAL',
                        resource_type='AwsS3Bucket',
                        resource_id=bucket_name,
                        compliance_status='FAILED',
                        control_id='CustomEncryption.1'
                    )
                    findings.append(finding)
        
        return findings
    
    def generate_network_findings(self):
        """Generate findings for network security compliance"""
        
        findings = []
        ec2_client = boto3.client('ec2')
        
        # Check EC2 instances in public subnets
        instances = ec2_client.describe_instances()
        
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] != 'terminated':
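                    subnet_id = instance.get('SubnetId')
                    if not subnet_id:
                        continue  # no subnet recorded for this instance; skip it

                    # Get route tables explicitly associated with the subnet.
                    # Subnets that fall back to the VPC main route table are not
                    # matched by this filter and are treated as private here.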
                    route_tables = ec2_client.describe_route_tables(
                        Filters=[
                            {'Name': 'association.subnet-id', 'Values': [subnet_id]}
                        ]
                    )['RouteTables']
                    
                    is_public = False
                    for rt in route_tables:
                        for route in rt['Routes']:
                            if (route.get('GatewayId', '').startswith('igw-') and 
                                route.get('DestinationCidrBlock') == '0.0.0.0/0'):
                                is_public = True
                                break
                    
                    if is_public:
                        finding = self.create_finding(
                            finding_id=f"custom-network-{instance['InstanceId']}",
                            title=f"EC2 instance {instance['InstanceId']} in public subnet",
                            description=f"Instance {instance['InstanceId']} is deployed in public subnet {subnet_id}",
                            severity_label='MEDIUM',
                            resource_type='AwsEc2Instance',
                            resource_id=instance['InstanceId'],
                            compliance_status='FAILED',
                            control_id='CustomNetwork.1'
                        )
                        findings.append(finding)
        
        return findings
    
    def create_finding(self, finding_id, title, description, severity_label, 
                      resource_type, resource_id, compliance_status, control_id):
        """Create a Security Hub finding"""
        
        severity_score = {
            'INFORMATIONAL': 0,
            'LOW': 1,
            'MEDIUM': 40,
            'HIGH': 70,
            'CRITICAL': 90
        }
        
        finding = {
            'SchemaVersion': '2018-10-08',
            'Id': finding_id,
            'ProductArn': f'arn:aws:securityhub:{self.region}:{self.account_id}:product/{self.account_id}/default',
            'GeneratorId': 'custom-security-scanner',
            'AwsAccountId': self.account_id,
            'Types': ['Software and Configuration Checks/Industry and Regulatory Standards/Custom'],
            # Timestamps carry a 'Z' suffix, so they must be generated in UTC
            'FirstObservedAt': datetime.utcnow().isoformat() + 'Z',
            'LastObservedAt': datetime.utcnow().isoformat() + 'Z',
            'CreatedAt': datetime.utcnow().isoformat() + 'Z',
            'UpdatedAt': datetime.utcnow().isoformat() + 'Z',
            'Severity': {
                'Label': severity_label,
                'Normalized': severity_score[severity_label]
            },
            'Title': title,
            'Description': description,
            'Resources': [
                {
                    'Type': resource_type,
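                    # Use the real ARN format for each supported resource type
                    # and fall back to the raw identifier for anything else
                    'Id': {
                        'AwsS3Bucket': f'arn:aws:s3:::{resource_id}',
                        'AwsEc2Instance': f'arn:aws:ec2:{self.region}:{self.account_id}:instance/{resource_id}'
                    }.get(resource_type, resource_id),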
                    'Region': self.region,
                    'Partition': 'aws'
                }
            ],
            'Compliance': {
                'Status': compliance_status,
                'RelatedRequirements': [control_id]
            },
            'Workflow': {'Status': 'NEW'},  # WorkflowState is deprecated in favor of Workflow.Status
            'RecordState': 'ACTIVE'
        }
        
        return finding
    
    def submit_findings(self, findings):
        """Submit findings to Security Hub"""
        
        if not findings:
            return
        
        # Security Hub accepts max 100 findings per batch
        batch_size = 100
        
        for i in range(0, len(findings), batch_size):
            batch = findings[i:i + batch_size]
            
            try:
                response = self.securityhub_client.batch_import_findings(
                    Findings=batch
                )
                
                if response['FailedCount'] > 0:
                    print(f"Failed to import {response['FailedCount']} findings")
                    for failure in response['FailedFindings']:
                        print(f"Failed finding: {failure}")
                        
            except Exception as e:
                print(f"Error submitting findings batch: {e}")
'''
    
    return custom_findings_code
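
`batch_import_findings` rejects malformed findings one at a time, so a cheap local check before submission can save a round trip. The sketch below checks only a subset of the finding format: the field list is an assumption based on the fields the generator above populates, not the full schema, and `validate_finding` is a hypothetical helper.

```python
REQUIRED_FINDING_FIELDS = (
    'SchemaVersion', 'Id', 'ProductArn', 'GeneratorId', 'AwsAccountId',
    'CreatedAt', 'UpdatedAt', 'Severity', 'Title', 'Description', 'Resources',
)
VALID_SEVERITY_LABELS = {'INFORMATIONAL', 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL'}

def validate_finding(finding):
    """Return a list of problems; an empty list means the finding looks importable."""
    problems = [f'missing field: {name}' for name in REQUIRED_FINDING_FIELDS
                if name not in finding]
    label = finding.get('Severity', {}).get('Label')
    if label not in VALID_SEVERITY_LABELS:
        problems.append(f'invalid severity label: {label}')
    if not finding.get('Resources'):
        problems.append('at least one resource is required')
    return problems
```

Findings that return a non-empty list can be logged and dropped before the batch is built, which keeps `FailedCount` for genuinely unexpected rejections.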

Advanced Insights and Analytics

Custom Security Insights

def create_custom_insights():
    """Create custom insights for security analysis"""
    
    securityhub_client = boto3.client('securityhub')
    
    # Insight for high-severity findings by resource type
    resource_severity_insight = {
        'Name': 'High-Severity Findings by Resource Type',
        'Filters': {
            'SeverityLabel': [
                {
                    'Value': 'HIGH',
                    'Comparison': 'EQUALS'
                },
                {
                    'Value': 'CRITICAL', 
                    'Comparison': 'EQUALS'
                }
            ],
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        },
        'GroupByAttribute': 'ResourceType'
    }
    
    # Insight for failed compliance checks, grouped by security control
    compliance_insight = {
        'Name': 'Failed Compliance Checks by Security Control',
        'Filters': {
            'ComplianceStatus': [
                {
                    'Value': 'FAILED',
                    'Comparison': 'EQUALS'
                }
            ],
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        },
        'GroupByAttribute': 'ComplianceSecurityControlId'
    }
    
    # Insight for findings by AWS account
    account_insight = {
        'Name': 'Security Findings by AWS Account',
        'Filters': {
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        },
        'GroupByAttribute': 'AwsAccountId'
    }
    
    # Insight for trending findings
    trending_insight = {
        'Name': 'Trending Security Issues',
        'Filters': {
            # Use a relative window only: a date filter takes either Start/End
            # or DateRange, and DateRange keeps a saved insight rolling
            'CreatedAt': [
                {
                    'DateRange': {
                        'Value': 7,
                        'Unit': 'DAYS'
                    }
                }
            ],
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        },
        'GroupByAttribute': 'Type'
    }
    
    insights = [resource_severity_insight, compliance_insight, account_insight, trending_insight]
    created_insights = []
    
    for insight_config in insights:
        try:
            response = securityhub_client.create_insight(**insight_config)
            created_insights.append(response)
        except Exception as e:
            print(f"Failed to create insight {insight_config['Name']}: {e}")
    
    return created_insights
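
Creating an insight only stores the query. To read the grouped counts programmatically, call `get_insight_results` with the ARN that `create_insight` returned. A minimal sketch (the helper name `top_insight_results` and the `limit` parameter are assumptions for illustration):

```python
def top_insight_results(securityhub_client, insight_arn, limit=5):
    """Return the largest (group value, count) pairs for one insight."""
    response = securityhub_client.get_insight_results(InsightArn=insight_arn)
    values = response['InsightResults']['ResultValues']
    # Sort locally so the output does not depend on the service's ordering
    ranked = sorted(values, key=lambda value: value['Count'], reverse=True)
    return [(value['GroupByAttributeValue'], value['Count']) for value in ranked[:limit]]
```

For example, `top_insight_results(client, created_insights[0]['InsightArn'])` would list the resource types with the most high-severity findings.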

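def create_security_metrics_dashboard():
    """Create CloudWatch dashboard for Security Hub metrics"""

    # NOTE: the widgets below assume finding metrics are available in the
    # AWS/SecurityHub namespace and that findings are delivered to a
    # /aws/securityhub/findings log group; confirm both in your account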
    
    cloudwatch = boto3.client('cloudwatch')
    
    dashboard_widgets = [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/SecurityHub", "Findings", "ComplianceType", "PASSED"],
                    [".", ".", ".", "FAILED"],
                    [".", ".", ".", "WARNING"],
                    [".", ".", ".", "NOT_AVAILABLE"]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Compliance Status Overview"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/SecurityHub", "Findings", "SeverityLabel", "CRITICAL"],
                    [".", ".", ".", "HIGH"],
                    [".", ".", ".", "MEDIUM"],
                    [".", ".", ".", "LOW"],
                    [".", ".", ".", "INFORMATIONAL"]
                ],
                "period": 300,
                "stat": "Sum",
                "region": "us-east-1",
                "title": "Findings by Severity"
            }
        },
        {
            "type": "log",
            "properties": {
                # Alias the count so the sort clause has a field name to reference
                "query": "SOURCE '/aws/securityhub/findings'\n| fields @timestamp, AwsAccountId, Title, Severity.Label\n| filter Severity.Label = \"CRITICAL\" or Severity.Label = \"HIGH\"\n| stats count() as finding_count by AwsAccountId\n| sort finding_count desc",
                "region": "us-east-1",
                "title": "High-Severity Findings by Account",
                "view": "table"
            }
        }
    ]
    
    dashboard_body = {
        "widgets": dashboard_widgets
    }
    
    response = cloudwatch.put_dashboard(
        DashboardName='SecurityHub-Overview',
        DashboardBody=json.dumps(dashboard_body)
    )
    
    return response
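
The dashboard widgets read from the `AWS/SecurityHub` namespace. If those metrics turn out not to be available in your account, one alternative is to publish finding counts yourself and point the widgets at that namespace instead. The sketch below is one way to do that under stated assumptions: the `Custom/SecurityHub` namespace and both helper names are hypothetical, and the findings are assumed to have been retrieved already.

```python
from collections import Counter

def build_severity_metric_data(findings):
    """Turn a list of findings into CloudWatch MetricData entries, one per severity."""
    counts = Counter(f.get('Severity', {}).get('Label', 'UNKNOWN') for f in findings)
    return [
        {
            'MetricName': 'Findings',
            'Dimensions': [{'Name': 'SeverityLabel', 'Value': label}],
            'Value': count,
            'Unit': 'Count',
        }
        for label, count in sorted(counts.items())
    ]

def publish_severity_metrics(cloudwatch_client, findings, namespace='Custom/SecurityHub'):
    """Publish the counts; the namespace is an assumed custom one, not an AWS-owned one."""
    metric_data = build_severity_metric_data(findings)
    if metric_data:
        cloudwatch_client.put_metric_data(Namespace=namespace, MetricData=metric_data)
    return metric_data
```

Running this on a schedule gives the dashboard a time series to chart.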

Automated Remediation Integration

Security Hub Response Automation

def create_automated_remediation_system():
    """Create automated remediation system for Security Hub findings"""
    
    remediation_lambda_code = '''
import json
import boto3
import logging
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """Handle Security Hub findings with automated remediation"""
    
    # Parse Security Hub finding
    finding = event['detail']['findings'][0]
    
    finding_type = finding.get('Types', [])
    severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
    resource_type = finding.get('Resources', [{}])[0].get('Type', '')
    
    logger.info(f"Processing finding: {finding_type} with severity {severity}")
    
    try:
        # Route to appropriate remediation handler
        if 'AwsS3Bucket' in resource_type:
            handle_s3_findings(finding)
        elif 'AwsEc2SecurityGroup' in resource_type:
            handle_security_group_findings(finding)
        elif 'AwsIamRole' in resource_type or 'AwsIamUser' in resource_type:
            handle_iam_findings(finding)
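        else:
            # No automated handler exists for this resource type (EC2 instances
            # included), so flag the finding for an analyst instead of resolving it
            logger.info(f"No automated remediation for resource type {resource_type}")
            update_finding_workflow(finding, 'NOTIFIED', 'No automated remediation available')
            return {
                'statusCode': 200,
                'body': json.dumps('No automated remediation available')
            }

        # Update finding workflow status
        update_finding_workflow(finding, 'RESOLVED', 'Automatically remediated')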
        
        return {
            'statusCode': 200,
            'body': json.dumps('Finding processed successfully')
        }
        
    except Exception as e:
        logger.error(f"Error processing finding: {str(e)}")
        update_finding_workflow(finding, 'NEW', f'Remediation failed: {str(e)}')
        
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error processing finding: {str(e)}')
        }

def handle_s3_findings(finding):
    """Handle S3-related security findings"""
    
    resource_id = finding['Resources'][0]['Id']
    bucket_name = resource_id.split(':')[-1]
    
    s3_client = boto3.client('s3')
    
    # Check finding type and apply appropriate remediation
    finding_types = finding.get('Types', [])
    
    if any('Public' in ftype for ftype in finding_types):
        # Block public access
        s3_client.put_public_access_block(
            Bucket=bucket_name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )
        logger.info(f"Blocked public access for bucket {bucket_name}")
    
    if any('Encryption' in ftype for ftype in finding_types):
        # Enable default encryption
        s3_client.put_bucket_encryption(
            Bucket=bucket_name,
            ServerSideEncryptionConfiguration={
                'Rules': [
                    {
                        'ApplyServerSideEncryptionByDefault': {
                            'SSEAlgorithm': 'aws:kms',
                            'KMSMasterKeyID': 'alias/aws/s3'
                        }
                    }
                ]
            }
        )
        logger.info(f"Enabled encryption for bucket {bucket_name}")

def handle_security_group_findings(finding):
    """Handle security group findings"""
    
    resource_id = finding['Resources'][0]['Id']
    sg_id = resource_id.split('/')[-1]
    
    ec2_client = boto3.client('ec2')
    
    # Get security group details
    sg_response = ec2_client.describe_security_groups(GroupIds=[sg_id])
    security_group = sg_response['SecurityGroups'][0]
    
    # Remove overly permissive rules
    problematic_rules = []
    
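    sensitive_ports = [22, 3389, 1433, 3306, 5432]  # SSH, RDP, SQL ports
    for rule in security_group['IpPermissions']:
        from_port, to_port = rule.get('FromPort'), rule.get('ToPort')
        if from_port is None or to_port is None:
            continue  # all-traffic rules carry no port range; leave for manual review
        open_to_world = any(r.get('CidrIp') == '0.0.0.0/0' for r in rule.get('IpRanges', []))
        if open_to_world and any(from_port <= p <= to_port for p in sensitive_ports):
            # Revoke only the 0.0.0.0/0 grant so other CIDRs on the rule survive
            problematic_rules.append({
                'IpProtocol': rule['IpProtocol'], 'FromPort': from_port,
                'ToPort': to_port, 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
            })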
    
    if problematic_rules:
        ec2_client.revoke_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=problematic_rules
        )
        logger.info(f"Removed {len(problematic_rules)} problematic rules from {sg_id}")

def handle_iam_findings(finding):
    """Handle IAM-related findings"""
    
    resource_id = finding['Resources'][0]['Id']
    resource_type = finding['Resources'][0]['Type']
    
    iam_client = boto3.client('iam')
    
    if 'AwsIamUser' in resource_type:
        username = resource_id.split('/')[-1]
        
        # Check for unused access keys
        access_keys = iam_client.list_access_keys(UserName=username)
        
        for key in access_keys['AccessKeyMetadata']:
            # Get last used information
            last_used = iam_client.get_access_key_last_used(
                AccessKeyId=key['AccessKeyId']
            )
            
            # If key hasn't been used in 90 days, deactivate it
            if 'LastUsedDate' in last_used['AccessKeyLastUsed']:
                # LastUsedDate is in UTC, so compare it against the current UTC time
                days_since_use = (datetime.utcnow() - last_used['AccessKeyLastUsed']['LastUsedDate'].replace(tzinfo=None)).days
                
                if days_since_use > 90:
                    iam_client.update_access_key(
                        UserName=username,
                        AccessKeyId=key['AccessKeyId'],
                        Status='Inactive'
                    )
                    logger.info(f"Deactivated unused access key {key['AccessKeyId']} for user {username}")

def update_finding_workflow(finding, workflow_status, note):
    """Update Security Hub finding workflow status"""
    
    securityhub_client = boto3.client('securityhub')
    
    finding_id = finding['Id']
    product_arn = finding['ProductArn']
    
    try:
        securityhub_client.batch_update_findings(
            FindingIdentifiers=[
                {
                    'Id': finding_id,
                    'ProductArn': product_arn
                }
            ],
            Workflow={
                'Status': workflow_status
            },
            Note={
                'Text': note,
                'UpdatedBy': 'AutomatedRemediation'
            }
        )
        
        logger.info(f"Updated finding {finding_id} workflow status to {workflow_status}")
        
    except Exception as e:
        logger.error(f"Failed to update finding workflow: {str(e)}")
'''
    
    return remediation_lambda_code
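
The handler above expects an EventBridge event with the finding under `detail.findings`, so something has to route Security Hub events to the function. A minimal sketch of that wiring follows. The rule name, target ID, and both helper names are assumptions, and the Lambda function still needs a resource-based permission allowing EventBridge to invoke it, which is not shown.

```python
import json

def build_finding_event_pattern(severity_labels=('CRITICAL', 'HIGH')):
    """Match newly imported, active findings at the given severities."""
    return {
        'source': ['aws.securityhub'],
        'detail-type': ['Security Hub Findings - Imported'],
        'detail': {
            'findings': {
                'Severity': {'Label': list(severity_labels)},
                'Workflow': {'Status': ['NEW']},
                'RecordState': ['ACTIVE'],
            }
        },
    }

def create_remediation_rule(events_client, lambda_arn, rule_name='securityhub-auto-remediation'):
    """Create the rule and attach the remediation Lambda as its target."""
    events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(build_finding_event_pattern()),
        State='ENABLED',
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{'Id': 'remediation-lambda', 'Arn': lambda_arn}],
    )
    return rule_name
```

Filtering on `Workflow.Status` in the pattern keeps findings that were already resolved or suppressed from invoking the function again.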

Security Hub Reporting and Analytics

Comprehensive Security Reporting

def generate_security_posture_report():
    """Generate comprehensive security posture report"""
    
    securityhub_client = boto3.client('securityhub')
    
    # Page through every active finding; a single get_findings call returns at most 100
    paginator = securityhub_client.get_paginator('get_findings')
    findings = []
    for page in paginator.paginate(
        Filters={
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        }
    ):
        findings.extend(page['Findings'])
    
    # Analyze findings
    report = {
        'report_generated': datetime.now().isoformat(),
        'summary': {
            'total_findings': len(findings),
            'critical_findings': 0,
            'high_findings': 0,
            'medium_findings': 0,
            'low_findings': 0
        },
        'compliance_summary': {},
        'resource_analysis': {},
        'trend_analysis': {},
        'recommendations': []
    }
    
    # Categorize findings by severity
    for finding in findings:
        severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
        
        if severity == 'CRITICAL':
            report['summary']['critical_findings'] += 1
        elif severity == 'HIGH':
            report['summary']['high_findings'] += 1
        elif severity == 'MEDIUM':
            report['summary']['medium_findings'] += 1
        elif severity == 'LOW':
            report['summary']['low_findings'] += 1
    
    # Analyze compliance status
    compliance_findings = {}
    for finding in findings:
        compliance = finding.get('Compliance', {})
        status = compliance.get('Status', 'UNKNOWN')
        
        if status not in compliance_findings:
            compliance_findings[status] = 0
        compliance_findings[status] += 1
    
    report['compliance_summary'] = compliance_findings
    
    # Analyze by resource type
    resource_findings = {}
    for finding in findings:
        for resource in finding.get('Resources', []):
            resource_type = resource.get('Type', 'Unknown')
            
            if resource_type not in resource_findings:
                resource_findings[resource_type] = 0
            resource_findings[resource_type] += 1
    
    report['resource_analysis'] = resource_findings
    
    # Generate recommendations
    report['recommendations'] = generate_security_recommendations(findings)
    
    return report

def generate_security_recommendations(findings):
    """Generate security recommendations based on findings analysis"""
    
    recommendations = []
    
    # Analyze finding patterns
    finding_types = {}
    severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
    
    for finding in findings:
        # Count finding types
        for ftype in finding.get('Types', []):
            if ftype not in finding_types:
                finding_types[ftype] = 0
            finding_types[ftype] += 1
        
        # Count severities
        severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
        if severity in severity_counts:
            severity_counts[severity] += 1
    
    # Generate recommendations based on patterns
    if severity_counts['CRITICAL'] > 0:
        recommendations.append({
            'priority': 'IMMEDIATE',
            'category': 'Critical Security Issues',
            'description': f'{severity_counts["CRITICAL"]} critical security findings require immediate attention',
            'action': 'Review and remediate all critical findings within 24 hours'
        })
    
    if severity_counts['HIGH'] > 10:
        recommendations.append({
            'priority': 'HIGH',
            'category': 'High-Severity Issues',
            'description': f'{severity_counts["HIGH"]} high-severity findings detected',
            'action': 'Implement automated remediation for common high-severity issues'
        })
    
    # Check for common finding types
    common_types = sorted(finding_types.items(), key=lambda x: x[1], reverse=True)[:5]
    
    for ftype, count in common_types:
        if count > 5:
            recommendations.append({
                'priority': 'MEDIUM',
                'category': 'Pattern Analysis',
                'description': f'Recurring finding type: {ftype} ({count} occurrences)',
                'action': f'Investigate root cause and implement preventive controls for {ftype}'
            })
    
    return recommendations
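
The report dictionary is convenient for machines but awkward to paste into an email or ticket. The sketch below renders it as plain text. It assumes the keys produced by `generate_security_posture_report` above, and `format_report_summary` is a hypothetical helper name.

```python
def format_report_summary(report):
    """Render the severity counts and recommendations as a short text block."""
    summary = report['summary']
    lines = [
        f"Security posture report ({report['report_generated']})",
        f"Total active findings: {summary['total_findings']}",
    ]
    for level in ('critical', 'high', 'medium', 'low'):
        lines.append(f"  {level.capitalize()}: {summary[level + '_findings']}")
    # One line per recommendation, highest-level detail only
    for rec in report.get('recommendations', []):
        lines.append(f"[{rec['priority']}] {rec['category']}: {rec['action']}")
    return '\n'.join(lines)
```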

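def create_executive_dashboard():
    """Create executive-level security dashboard"""

    # NOTE: ComplianceScore and Findings in the AWS/SecurityHub namespace are
    # assumed to exist; verify them in CloudWatch before relying on this dashboard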
    
    cloudwatch = boto3.client('cloudwatch')
    
    executive_dashboard = {
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "metrics": [
                        ["AWS/SecurityHub", "ComplianceScore"]
                    ],
                    "period": 86400,  # Daily
                    "stat": "Average",
                    "region": "us-east-1",
                    "title": "Overall Security Compliance Score",
                    "yAxis": {
                        "left": {
                            "min": 0,
                            "max": 100
                        }
                    }
                }
            },
            {
                # Single-value tiles are "metric" widgets rendered with the
                # "singleValue" view; "number" is not a valid widget type.
                "type": "metric",
                "properties": {
                    "view": "singleValue",
                    "metrics": [
                        ["AWS/SecurityHub", "Findings", "SeverityLabel", "CRITICAL"]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "us-east-1",
                    "title": "Critical Findings"
                }
            },
            {
                "type": "metric",
                "properties": {
                    "view": "singleValue",
                    "metrics": [
                        ["AWS/SecurityHub", "Findings", "SeverityLabel", "HIGH"]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "us-east-1",
                    "title": "High-Severity Findings"
                }
            },
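            {
                # Assumption: findings are already delivered to a CloudWatch Logs
                # group with this name (for example, by an EventBridge rule that
                # targets the log group). Adjust the field names to match how the
                # events are written to that log group.
                "type": "log",
                "properties": {
                    "query": "SOURCE '/aws/securityhub/findings'\n| fields @timestamp, Title, Severity.Label, Compliance.Status\n| filter Severity.Label = \"CRITICAL\"\n| sort @timestamp desc\n| limit 10",
                    "region": "us-east-1",
                    "title": "Recent Critical Findings",
                    "view": "table"
                }
            }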
        ]
    }
    
    response = cloudwatch.put_dashboard(
        DashboardName='Security-Executive-Dashboard',
        DashboardBody=json.dumps(executive_dashboard)
    )
    
    return response
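
The dashboard above only renders data if the metrics it references actually exist. As a minimal sketch, assuming you choose to publish finding counts yourself, the helpers below count active findings by severity and push them to CloudWatch. The namespace `Custom/SecurityHub` and the function names are hypothetical and not part of the original implementation; the widgets would need to reference that namespace for these values to appear.

```python
from datetime import datetime, timezone

# Hypothetical custom namespace; pick any name that does not start with "AWS/".
CUSTOM_NAMESPACE = "Custom/SecurityHub"
SEVERITY_LABELS = ("CRITICAL", "HIGH", "MEDIUM", "LOW", "INFORMATIONAL")


def count_by_severity(findings):
    """Count findings per severity label, ignoring findings without a known label."""
    counts = {label: 0 for label in SEVERITY_LABELS}
    for finding in findings:
        label = finding.get("Severity", {}).get("Label")
        if label in counts:
            counts[label] += 1
    return counts


def build_metric_data(severity_counts, timestamp=None):
    """Turn severity counts into the MetricData list expected by put_metric_data."""
    timestamp = timestamp or datetime.now(timezone.utc)
    return [
        {
            "MetricName": "Findings",
            "Dimensions": [{"Name": "SeverityLabel", "Value": label}],
            "Timestamp": timestamp,
            "Value": float(count),
            "Unit": "Count",
        }
        for label, count in severity_counts.items()
    ]


def publish_finding_counts(region_name="us-east-1"):
    """Fetch active findings and publish per-severity counts as custom metrics."""
    import boto3  # imported here so the pure helpers above run without AWS access

    securityhub = boto3.client("securityhub", region_name=region_name)
    cloudwatch = boto3.client("cloudwatch", region_name=region_name)

    # Only count findings that are still active.
    filters = {"RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}]}

    findings = []
    paginator = securityhub.get_paginator("get_findings")
    for page in paginator.paginate(Filters=filters):
        findings.extend(page["Findings"])

    metric_data = build_metric_data(count_by_severity(findings))
    cloudwatch.put_metric_data(Namespace=CUSTOM_NAMESPACE, MetricData=metric_data)
    return metric_data
```

Running `publish_finding_counts()` on a schedule (for example, from a Lambda function) keeps the single-value tiles current. Separating the counting and formatting helpers from the AWS calls makes them easy to unit test without credentials.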

Conclusion

AWS Security Hub provides a powerful platform for centralized security management, but its effectiveness depends on proper configuration and integration. Key takeaways include:

  • Enable organization-wide Security Hub for comprehensive visibility
  • Configure multiple security standards for thorough compliance coverage
  • Create custom findings and insights for organization-specific requirements
  • Implement automated remediation for rapid response to security issues
  • Build comprehensive dashboards for different stakeholder needs
  • Generate regular reports for continuous security posture improvement

Effective Security Hub implementation requires balancing automation with human oversight. While automated remediation can handle routine security issues, complex findings still require skilled security analysts for proper investigation and resolution.

The patterns shown here provide a foundation for building enterprise-grade security operations that scale with your organization's growth while maintaining strong security posture across all AWS accounts and resources.

Enhancing Security Hub with Specialized IAM Analysis

While Security Hub provides excellent general security monitoring capabilities, IAM security requires specialized analysis that goes beyond standard compliance checks. This is where AccessLens complements Security Hub perfectly.

AccessLens provides the deep IAM analysis capabilities that Security Hub lacks:

  • Advanced trust relationship analysis that identifies complex privilege escalation paths
  • Cross-account IAM risk assessment that spans your entire AWS organization
  • Policy analysis that goes beyond basic compliance to identify subtle security risks
  • Custom findings integration that enhances Security Hub with IAM-specific insights

By integrating AccessLens with Security Hub, you get the best of both worlds: comprehensive security monitoring from Security Hub and specialized IAM analysis from AccessLens.
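
The article does not describe the AccessLens API, so the following does not call it. As a hedged sketch of what "custom findings integration" generally involves, it shows how an IAM finding produced by any external analysis tool could be expressed in the AWS Security Finding Format (ASFF) and imported with `batch_import_findings`. The function names, the `GeneratorId` value, and the finding ID scheme are illustrative assumptions.

```python
from datetime import datetime, timezone


def build_iam_finding(account_id, region, role_arn, title, description, severity="HIGH"):
    """Build an ASFF finding for an IAM role flagged by an external analysis tool."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "SchemaVersion": "2018-10-08",
        # Hypothetical ID scheme: stable per role and title so re-imports update in place.
        "Id": f"iam-analysis/{role_arn}/{title}",
        # Default product ARN used for custom findings in an account.
        "ProductArn": f"arn:aws:securityhub:{region}:{account_id}:product/{account_id}/default",
        "GeneratorId": "external-iam-analysis",  # hypothetical generator name
        "AwsAccountId": account_id,
        "Types": ["Software and Configuration Checks/AWS Security Best Practices"],
        "CreatedAt": now,
        "UpdatedAt": now,
        "Severity": {"Label": severity},
        "Title": title,
        "Description": description,
        "Resources": [{"Type": "AwsIamRole", "Id": role_arn, "Region": region}],
    }


def import_findings(findings, region):
    """Send findings to Security Hub; the API accepts at most 100 per call."""
    import boto3  # imported here so build_iam_finding works without AWS access

    client = boto3.client("securityhub", region_name=region)
    success, failed = 0, 0
    for start in range(0, len(findings), 100):
        response = client.batch_import_findings(Findings=findings[start:start + 100])
        success += response["SuccessCount"]
        failed += response["FailedCount"]
    return success, failed
```

Once imported, such findings appear alongside native Security Hub findings and can be filtered by `GeneratorId` in insights and automation rules.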

Explore how AccessLens can enhance your Security Hub implementation and provide the specialized IAM security analysis that your organization needs.

Security Hub provides the platform, but AccessLens provides the IAM expertise. Together, they deliver comprehensive security visibility that scales with your organization.