AWS Security Hub: Centralized Security Management at Enterprise Scale
AWS Security Hub transforms fragmented security tools into a unified security operations center, but many organizations struggle to realize its full potential. The challenge isn't just technical—it's organizational. Security Hub can aggregate findings from dozens of security tools across hundreds of accounts, but without proper configuration and integration, it becomes just another source of alert fatigue.
The promise of Security Hub is compelling: centralized security posture management, automated compliance monitoring, and unified incident response. The reality for many organizations is overwhelming volumes of findings, false positives, and difficulty prioritizing remediation efforts.
This guide explores enterprise-grade Security Hub implementations that provide comprehensive visibility and control across complex AWS environments. We'll cover advanced configuration patterns, custom finding generation, and integration strategies that transform Security Hub from a finding aggregator into a strategic security platform.
Security Hub Architecture for Enterprise
Multi-Account Security Orchestration
import boto3
import json
from datetime import datetime, timedelta
class EnterpriseSecurityHub:
def __init__(self):
self.securityhub_client = boto3.client('securityhub')
self.organizations_client = boto3.client('organizations')
def setup_organization_security_hub(self):
"""Setup Security Hub across AWS Organization"""
# Enable Security Hub in master account
try:
self.securityhub_client.enable_security_hub(
Tags={
'Purpose': 'CentralizedSecurityManagement',
'ManagedBy': 'SecurityTeam',
'Environment': 'Organization'
},
EnableDefaultStandards=True
)
except self.securityhub_client.exceptions.ResourceConflictException:
print("Security Hub already enabled in master account")
# Get organization accounts
accounts = self.organizations_client.list_accounts()['Accounts']
master_account_id = self.organizations_client.describe_organization()['Organization']['MasterAccountId']
# Enable Security Hub in member accounts
member_accounts = []
for account in accounts:
if account['Status'] == 'ACTIVE' and account['Id'] != master_account_id:
member_accounts.append({
'AccountId': account['Id'],
'Email': account['Email']
})
# Create members and send invitations
if member_accounts:
try:
self.securityhub_client.create_members(
AccountDetails=member_accounts
)
# Send invitations
account_ids = [account['AccountId'] for account in member_accounts]
self.securityhub_client.invite_members(
AccountIds=account_ids,
Message='Join organization Security Hub for centralized security management'
)
except Exception as e:
print(f"Error setting up member accounts: {e}")
return member_accounts
def configure_security_standards(self):
"""Configure security standards and compliance frameworks"""
        # Security standards to enable. Standards ARNs are region- and version-specific;
        # use describe_standards() to confirm the exact ARNs available in your region.
        region = self.securityhub_client.meta.region_name
        standards_to_enable = [
            'arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0',
            f'arn:aws:securityhub:{region}::standards/aws-foundational-security-best-practices/v/1.0.0',
            f'arn:aws:securityhub:{region}::standards/pci-dss/v/3.2.1',
            f'arn:aws:securityhub:{region}::standards/aws-resource-tagging-standard/v/1.0.0'
        ]
enabled_standards = []
for standard_arn in standards_to_enable:
try:
response = self.securityhub_client.batch_enable_standards(
StandardsSubscriptionRequests=[
{
'StandardsArn': standard_arn,
'StandardsInput': {}
}
]
)
enabled_standards.append(response)
except Exception as e:
print(f"Failed to enable standard {standard_arn}: {e}")
return enabled_standards
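A minimal usage sketch for the class above, assuming it runs with credentials for the Organizations management account; the entry point and print statements are illustrative:
if __name__ == '__main__':
    hub = EnterpriseSecurityHub()

    # Enable Security Hub organization-wide and invite member accounts
    members = hub.setup_organization_security_hub()
    print(f"Invited {len(members)} member accounts to Security Hub")

    # Enable the compliance standards configured above
    enabled = hub.configure_security_standards()
    print(f"Enabled {len(enabled)} security standards")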
Custom Security Standards
def create_custom_security_standards():
"""Create custom security standards for organization-specific requirements"""
# Custom control for encryption requirements
encryption_control = {
'Id': 'CustomEncryption.1',
'Title': 'All data at rest must be encrypted with customer-managed KMS keys',
'Description': 'Ensures all data storage services use customer-managed KMS keys for encryption',
'RemediationUrl': 'https://docs.company.com/security/encryption-standards',
'SeverityRating': 'HIGH',
'ComplianceStatus': 'PASSED' # Will be updated based on findings
}
# Custom control for network security
network_control = {
'Id': 'CustomNetwork.1',
'Title': 'All resources must be deployed in private subnets with proper security groups',
'Description': 'Ensures resources follow network security best practices',
'RemediationUrl': 'https://docs.company.com/security/network-standards',
'SeverityRating': 'MEDIUM',
'ComplianceStatus': 'PASSED'
}
# Custom control for access management
access_control = {
'Id': 'CustomAccess.1',
'Title': 'All IAM roles must have maximum session duration of 1 hour',
'Description': 'Limits session duration to reduce credential exposure risk',
'RemediationUrl': 'https://docs.company.com/security/access-standards',
'SeverityRating': 'MEDIUM',
'ComplianceStatus': 'PASSED'
}
custom_standard = {
'Name': 'CompanySecurityStandard',
'Description': 'Custom security standard for company-specific requirements',
'Controls': [encryption_control, network_control, access_control]
}
return custom_standard
def implement_custom_findings_generator():
"""Generate custom findings for Security Hub"""
custom_findings_code = '''
import boto3
import json
from datetime import datetime
import uuid
class CustomFindingsGenerator:
def __init__(self):
self.securityhub_client = boto3.client('securityhub')
self.account_id = boto3.client('sts').get_caller_identity()['Account']
self.region = boto3.Session().region_name
def generate_encryption_findings(self):
"""Generate findings for encryption compliance"""
findings = []
# Check S3 buckets for encryption
s3_client = boto3.client('s3')
buckets = s3_client.list_buckets()['Buckets']
for bucket in buckets:
bucket_name = bucket['Name']
try:
# Check bucket encryption
encryption = s3_client.get_bucket_encryption(Bucket=bucket_name)
# Check if using customer-managed KMS key
rules = encryption['ServerSideEncryptionConfiguration']['Rules']
uses_customer_kms = False
for rule in rules:
sse_config = rule['ApplyServerSideEncryptionByDefault']
if (sse_config['SSEAlgorithm'] == 'aws:kms' and
'KMSMasterKeyID' in sse_config and
not sse_config['KMSMasterKeyID'].startswith('alias/aws/')):
uses_customer_kms = True
break
if not uses_customer_kms:
finding = self.create_finding(
finding_id=f"custom-encryption-{bucket_name}",
title=f"S3 bucket {bucket_name} not using customer-managed KMS key",
description=f"Bucket {bucket_name} is not encrypted with customer-managed KMS key",
severity_label='HIGH',
resource_type='AwsS3Bucket',
resource_id=bucket_name,
compliance_status='FAILED',
control_id='CustomEncryption.1'
)
findings.append(finding)
except s3_client.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
# Bucket not encrypted at all
finding = self.create_finding(
finding_id=f"custom-encryption-{bucket_name}",
title=f"S3 bucket {bucket_name} not encrypted",
description=f"Bucket {bucket_name} has no server-side encryption configured",
severity_label='CRITICAL',
resource_type='AwsS3Bucket',
resource_id=bucket_name,
compliance_status='FAILED',
control_id='CustomEncryption.1'
)
findings.append(finding)
return findings
def generate_network_findings(self):
"""Generate findings for network security compliance"""
findings = []
ec2_client = boto3.client('ec2')
# Check EC2 instances in public subnets
instances = ec2_client.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
if instance['State']['Name'] != 'terminated':
subnet_id = instance['SubnetId']
# Check if subnet is public
subnet = ec2_client.describe_subnets(SubnetIds=[subnet_id])['Subnets'][0]
# Get route table for subnet
route_tables = ec2_client.describe_route_tables(
Filters=[
{'Name': 'association.subnet-id', 'Values': [subnet_id]}
]
)['RouteTables']
is_public = False
for rt in route_tables:
for route in rt['Routes']:
if (route.get('GatewayId', '').startswith('igw-') and
route.get('DestinationCidrBlock') == '0.0.0.0/0'):
is_public = True
break
if is_public:
finding = self.create_finding(
finding_id=f"custom-network-{instance['InstanceId']}",
title=f"EC2 instance {instance['InstanceId']} in public subnet",
description=f"Instance {instance['InstanceId']} is deployed in public subnet {subnet_id}",
severity_label='MEDIUM',
resource_type='AwsEc2Instance',
resource_id=instance['InstanceId'],
compliance_status='FAILED',
control_id='CustomNetwork.1'
)
findings.append(finding)
return findings
def create_finding(self, finding_id, title, description, severity_label,
resource_type, resource_id, compliance_status, control_id):
"""Create a Security Hub finding"""
severity_score = {
'INFORMATIONAL': 0,
'LOW': 1,
'MEDIUM': 40,
'HIGH': 70,
'CRITICAL': 90
}
finding = {
'SchemaVersion': '2018-10-08',
'Id': finding_id,
'ProductArn': f'arn:aws:securityhub:{self.region}:{self.account_id}:product/{self.account_id}/default',
'GeneratorId': 'custom-security-scanner',
'AwsAccountId': self.account_id,
'Types': ['Sensitive Data Identifications/Compliance/Custom'],
            # Use UTC so the trailing 'Z' (UTC) suffix is accurate
            'FirstObservedAt': datetime.utcnow().isoformat() + 'Z',
            'LastObservedAt': datetime.utcnow().isoformat() + 'Z',
            'CreatedAt': datetime.utcnow().isoformat() + 'Z',
            'UpdatedAt': datetime.utcnow().isoformat() + 'Z',
'Severity': {
'Label': severity_label,
'Normalized': severity_score[severity_label]
},
'Title': title,
'Description': description,
'Resources': [
{
'Type': resource_type,
'Id': f'arn:aws:{resource_type.lower().replace("aws", "")}:{self.region}:{self.account_id}:{resource_id}',
'Region': self.region,
'Partition': 'aws'
}
],
'Compliance': {
'Status': compliance_status,
'RelatedRequirements': [control_id]
},
'WorkflowState': 'NEW',
'RecordState': 'ACTIVE'
}
return finding
def submit_findings(self, findings):
"""Submit findings to Security Hub"""
if not findings:
return
# Security Hub accepts max 100 findings per batch
batch_size = 100
for i in range(0, len(findings), batch_size):
batch = findings[i:i + batch_size]
try:
response = self.securityhub_client.batch_import_findings(
Findings=batch
)
if response['FailedCount'] > 0:
print(f"Failed to import {response['FailedCount']} findings")
for failure in response['FailedFindings']:
print(f"Failed finding: {failure}")
except Exception as e:
print(f"Error submitting findings batch: {e}")
'''
return custom_findings_code
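The generator above is returned as source text so it can be packaged and deployed separately. Below is a sketch of how it might run once deployed, here as a scheduled Lambda handler; the custom_findings module name is an assumption about how the class gets packaged:
from custom_findings import CustomFindingsGenerator  # hypothetical packaging of the class above

def lambda_handler(event, context):
    """Scheduled (for example daily) scan that pushes custom compliance findings to Security Hub."""
    generator = CustomFindingsGenerator()

    findings = []
    findings.extend(generator.generate_encryption_findings())
    findings.extend(generator.generate_network_findings())

    # submit_findings batches BatchImportFindings calls at 100 findings per request
    generator.submit_findings(findings)

    return {'custom_findings_submitted': len(findings)}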
Advanced Insights and Analytics
Custom Security Insights
def create_custom_insights():
"""Create custom insights for security analysis"""
securityhub_client = boto3.client('securityhub')
# Insight for high-severity findings by resource type
resource_severity_insight = {
'Name': 'High-Severity Findings by Resource Type',
'Filters': {
'SeverityLabel': [
{
'Value': 'HIGH',
'Comparison': 'EQUALS'
},
{
'Value': 'CRITICAL',
'Comparison': 'EQUALS'
}
],
'RecordState': [
{
'Value': 'ACTIVE',
'Comparison': 'EQUALS'
}
]
},
'GroupByAttribute': 'ResourceType'
}
# Insight for compliance status by standard
compliance_insight = {
'Name': 'Compliance Status by Security Standard',
'Filters': {
'ComplianceStatus': [
{
'Value': 'FAILED',
'Comparison': 'EQUALS'
}
],
'RecordState': [
{
'Value': 'ACTIVE',
'Comparison': 'EQUALS'
}
]
},
'GroupByAttribute': 'ComplianceSecurityControlId'
}
# Insight for findings by AWS account
account_insight = {
'Name': 'Security Findings by AWS Account',
'Filters': {
'RecordState': [
{
'Value': 'ACTIVE',
'Comparison': 'EQUALS'
}
]
},
'GroupByAttribute': 'AwsAccountId'
}
# Insight for trending findings
trending_insight = {
'Name': 'Trending Security Issues',
'Filters': {
        'CreatedAt': [
            {
                # A DateFilter takes either Start/End timestamps or a relative DateRange, not both
                'DateRange': {
                    'Value': 7,
                    'Unit': 'DAYS'
                }
            }
        ],
'RecordState': [
{
'Value': 'ACTIVE',
'Comparison': 'EQUALS'
}
]
},
'GroupByAttribute': 'Type'
}
insights = [resource_severity_insight, compliance_insight, account_insight, trending_insight]
created_insights = []
for insight_config in insights:
try:
response = securityhub_client.create_insight(**insight_config)
created_insights.append(response)
except Exception as e:
print(f"Failed to create insight {insight_config['Name']}: {e}")
return created_insights
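# Once insights exist, their aggregated results can be retrieved programmatically.
# A minimal sketch using get_insight_results, assuming the InsightArn values come from
# the create_insight responses returned above.
def summarize_insights(insight_arns):
    """Fetch the grouped result counts for each Security Hub insight."""
    securityhub_client = boto3.client('securityhub')
    summaries = {}
    for insight_arn in insight_arns:
        results = securityhub_client.get_insight_results(InsightArn=insight_arn)
        insight_results = results['InsightResults']
        # Each result value is one group-by bucket and its matching finding count
        summaries[insight_arn] = {
            'group_by': insight_results['GroupByAttribute'],
            'counts': [
                (value['GroupByAttributeValue'], value['Count'])
                for value in insight_results['ResultValues']
            ]
        }
    return summaries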
def create_security_metrics_dashboard():
"""Create CloudWatch dashboard for Security Hub metrics"""
cloudwatch = boto3.client('cloudwatch')
dashboard_widgets = [
{
"type": "metric",
"properties": {
"metrics": [
["AWS/SecurityHub", "Findings", "ComplianceType", "PASSED"],
[".", ".", ".", "FAILED"],
[".", ".", ".", "WARNING"],
[".", ".", ".", "NOT_AVAILABLE"]
],
"period": 300,
"stat": "Sum",
"region": "us-east-1",
"title": "Compliance Status Overview"
}
},
{
"type": "metric",
"properties": {
"metrics": [
["AWS/SecurityHub", "Findings", "SeverityLabel", "CRITICAL"],
[".", ".", ".", "HIGH"],
[".", ".", ".", "MEDIUM"],
[".", ".", ".", "LOW"],
[".", ".", ".", "INFORMATIONAL"]
],
"period": 300,
"stat": "Sum",
"region": "us-east-1",
"title": "Findings by Severity"
}
},
{
"type": "log",
"properties": {
"query": "SOURCE '/aws/securityhub/findings'\n| fields @timestamp, AwsAccountId, Title, Severity.Label\n| filter Severity.Label = \"CRITICAL\" or Severity.Label = \"HIGH\"\n| stats count() by AwsAccountId\n| sort count desc",
"region": "us-east-1",
"title": "High-Severity Findings by Account",
"view": "table"
}
}
]
dashboard_body = {
"widgets": dashboard_widgets
}
response = cloudwatch.put_dashboard(
DashboardName='SecurityHub-Overview',
DashboardBody=json.dumps(dashboard_body)
)
return response
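Security Hub does not emit CloudWatch metrics on its own, so the widgets above only have data if a scheduled job publishes it. A sketch of such a publisher follows, relying on the boto3 imports from the top of the article; Custom/SecurityHub is an assumed namespace, and the dashboard metric definitions would need to reference whichever namespace the publisher actually writes to:
def publish_findings_metrics(namespace='Custom/SecurityHub'):
    """Publish active finding counts per severity so the dashboard widgets have data to plot."""
    securityhub_client = boto3.client('securityhub')
    cloudwatch = boto3.client('cloudwatch')
    paginator = securityhub_client.get_paginator('get_findings')

    metric_data = []
    for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW', 'INFORMATIONAL']:
        # Count active findings at this severity level
        count = 0
        for page in paginator.paginate(
            Filters={
                'SeverityLabel': [{'Value': severity, 'Comparison': 'EQUALS'}],
                'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}]
            }
        ):
            count += len(page['Findings'])

        metric_data.append({
            'MetricName': 'Findings',
            'Dimensions': [{'Name': 'SeverityLabel', 'Value': severity}],
            'Value': count,
            'Unit': 'Count'
        })

    cloudwatch.put_metric_data(Namespace=namespace, MetricData=metric_data)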
Automated Remediation Integration
Security Hub Response Automation
def create_automated_remediation_system():
"""Create automated remediation system for Security Hub findings"""
remediation_lambda_code = '''
import json
import boto3
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Handle Security Hub findings with automated remediation"""
# Parse Security Hub finding
finding = event['detail']['findings'][0]
finding_type = finding.get('Types', [])
severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
resource_type = finding.get('Resources', [{}])[0].get('Type', '')
logger.info(f"Processing finding: {finding_type} with severity {severity}")
try:
# Route to appropriate remediation handler
if 'AwsS3Bucket' in resource_type:
handle_s3_findings(finding)
elif 'AwsEc2SecurityGroup' in resource_type:
handle_security_group_findings(finding)
elif 'AwsIamRole' in resource_type or 'AwsIamUser' in resource_type:
handle_iam_findings(finding)
elif 'AwsEc2Instance' in resource_type:
handle_ec2_findings(finding)
else:
handle_generic_finding(finding)
# Update finding workflow status
update_finding_workflow(finding, 'RESOLVED', 'Automatically remediated')
return {
'statusCode': 200,
'body': json.dumps('Finding processed successfully')
}
except Exception as e:
logger.error(f"Error processing finding: {str(e)}")
update_finding_workflow(finding, 'NEW', f'Remediation failed: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps(f'Error processing finding: {str(e)}')
}
def handle_s3_findings(finding):
"""Handle S3-related security findings"""
resource_id = finding['Resources'][0]['Id']
bucket_name = resource_id.split(':')[-1]
s3_client = boto3.client('s3')
# Check finding type and apply appropriate remediation
finding_types = finding.get('Types', [])
if any('Public' in ftype for ftype in finding_types):
# Block public access
s3_client.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': True
}
)
logger.info(f"Blocked public access for bucket {bucket_name}")
if any('Encryption' in ftype for ftype in finding_types):
# Enable default encryption
s3_client.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': 'alias/aws/s3'
}
}
]
}
)
logger.info(f"Enabled encryption for bucket {bucket_name}")
def handle_security_group_findings(finding):
"""Handle security group findings"""
resource_id = finding['Resources'][0]['Id']
sg_id = resource_id.split('/')[-1]
ec2_client = boto3.client('ec2')
# Get security group details
sg_response = ec2_client.describe_security_groups(GroupIds=[sg_id])
security_group = sg_response['SecurityGroups'][0]
# Remove overly permissive rules
problematic_rules = []
for rule in security_group['IpPermissions']:
for ip_range in rule.get('IpRanges', []):
if ip_range['CidrIp'] == '0.0.0.0/0':
# Check if it's a problematic port
from_port = rule.get('FromPort', 0)
if from_port in [22, 3389, 1433, 3306, 5432]: # SSH, RDP, SQL ports
problematic_rules.append(rule)
if problematic_rules:
ec2_client.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=problematic_rules
)
logger.info(f"Removed {len(problematic_rules)} problematic rules from {sg_id}")
def handle_iam_findings(finding):
"""Handle IAM-related findings"""
resource_id = finding['Resources'][0]['Id']
resource_type = finding['Resources'][0]['Type']
iam_client = boto3.client('iam')
if 'AwsIamUser' in resource_type:
username = resource_id.split('/')[-1]
# Check for unused access keys
access_keys = iam_client.list_access_keys(UserName=username)
for key in access_keys['AccessKeyMetadata']:
# Get last used information
last_used = iam_client.get_access_key_last_used(
AccessKeyId=key['AccessKeyId']
)
# If key hasn't been used in 90 days, deactivate it
if 'LastUsedDate' in last_used['AccessKeyLastUsed']:
days_since_use = (datetime.now() - last_used['AccessKeyLastUsed']['LastUsedDate'].replace(tzinfo=None)).days
if days_since_use > 90:
iam_client.update_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId'],
Status='Inactive'
)
logger.info(f"Deactivated unused access key {key['AccessKeyId']} for user {username}")
def update_finding_workflow(finding, workflow_status, note):
"""Update Security Hub finding workflow status"""
securityhub_client = boto3.client('securityhub')
finding_id = finding['Id']
product_arn = finding['ProductArn']
try:
securityhub_client.batch_update_findings(
FindingIdentifiers=[
{
'Id': finding_id,
'ProductArn': product_arn
}
],
Workflow={
'Status': workflow_status
},
Note={
'Text': note,
'UpdatedBy': 'AutomatedRemediation'
}
)
logger.info(f"Updated finding {finding_id} workflow status to {workflow_status}")
except Exception as e:
logger.error(f"Failed to update finding workflow: {str(e)}")
'''
return remediation_lambda_code
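The remediation handler above still needs to be subscribed to findings. A sketch of the EventBridge wiring, assuming the Lambda has already been deployed; the rule name and the HIGH/CRITICAL severity filter are assumptions to adjust for your environment:
def connect_remediation_lambda(lambda_function_arn, rule_name='securityhub-auto-remediation'):
    """Route high-severity Security Hub findings to the remediation Lambda via EventBridge."""
    events_client = boto3.client('events')
    lambda_client = boto3.client('lambda')

    # Match imported findings with HIGH or CRITICAL severity that are still active
    event_pattern = {
        'source': ['aws.securityhub'],
        'detail-type': ['Security Hub Findings - Imported'],
        'detail': {
            'findings': {
                'Severity': {'Label': ['HIGH', 'CRITICAL']},
                'RecordState': ['ACTIVE']
            }
        }
    }

    rule_arn = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(event_pattern),
        State='ENABLED',
        Description='Trigger automated remediation for high-severity Security Hub findings'
    )['RuleArn']

    # Allow EventBridge to invoke the remediation function
    lambda_client.add_permission(
        FunctionName=lambda_function_arn,
        StatementId=f'{rule_name}-invoke',
        Action='lambda:InvokeFunction',
        Principal='events.amazonaws.com',
        SourceArn=rule_arn
    )

    events_client.put_targets(
        Rule=rule_name,
        Targets=[{'Id': 'remediation-lambda', 'Arn': lambda_function_arn}]
    )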
Security Hub Reporting and Analytics
Comprehensive Security Reporting
def generate_security_posture_report():
"""Generate comprehensive security posture report"""
    securityhub_client = boto3.client('securityhub')
    # Collect all active findings; get_findings returns at most 100 per page, so paginate
    paginator = securityhub_client.get_paginator('get_findings')
    findings = []
    for page in paginator.paginate(
        Filters={
            'RecordState': [
                {
                    'Value': 'ACTIVE',
                    'Comparison': 'EQUALS'
                }
            ]
        }
    ):
        findings.extend(page['Findings'])
# Analyze findings
report = {
'report_generated': datetime.now().isoformat(),
'summary': {
'total_findings': len(findings),
'critical_findings': 0,
'high_findings': 0,
'medium_findings': 0,
'low_findings': 0
},
'compliance_summary': {},
'resource_analysis': {},
'trend_analysis': {},
'recommendations': []
}
# Categorize findings by severity
for finding in findings:
severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
if severity == 'CRITICAL':
report['summary']['critical_findings'] += 1
elif severity == 'HIGH':
report['summary']['high_findings'] += 1
elif severity == 'MEDIUM':
report['summary']['medium_findings'] += 1
elif severity == 'LOW':
report['summary']['low_findings'] += 1
# Analyze compliance status
compliance_findings = {}
for finding in findings:
compliance = finding.get('Compliance', {})
status = compliance.get('Status', 'UNKNOWN')
if status not in compliance_findings:
compliance_findings[status] = 0
compliance_findings[status] += 1
report['compliance_summary'] = compliance_findings
# Analyze by resource type
resource_findings = {}
for finding in findings:
for resource in finding.get('Resources', []):
resource_type = resource.get('Type', 'Unknown')
if resource_type not in resource_findings:
resource_findings[resource_type] = 0
resource_findings[resource_type] += 1
report['resource_analysis'] = resource_findings
# Generate recommendations
report['recommendations'] = generate_security_recommendations(findings)
return report
def generate_security_recommendations(findings):
"""Generate security recommendations based on findings analysis"""
recommendations = []
# Analyze finding patterns
finding_types = {}
severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
for finding in findings:
# Count finding types
for ftype in finding.get('Types', []):
if ftype not in finding_types:
finding_types[ftype] = 0
finding_types[ftype] += 1
# Count severities
severity = finding.get('Severity', {}).get('Label', 'MEDIUM')
if severity in severity_counts:
severity_counts[severity] += 1
# Generate recommendations based on patterns
if severity_counts['CRITICAL'] > 0:
recommendations.append({
'priority': 'IMMEDIATE',
'category': 'Critical Security Issues',
'description': f'{severity_counts["CRITICAL"]} critical security findings require immediate attention',
'action': 'Review and remediate all critical findings within 24 hours'
})
if severity_counts['HIGH'] > 10:
recommendations.append({
'priority': 'HIGH',
'category': 'High-Severity Issues',
'description': f'{severity_counts["HIGH"]} high-severity findings detected',
'action': 'Implement automated remediation for common high-severity issues'
})
# Check for common finding types
common_types = sorted(finding_types.items(), key=lambda x: x[1], reverse=True)[:5]
for ftype, count in common_types:
if count > 5:
recommendations.append({
'priority': 'MEDIUM',
'category': 'Pattern Analysis',
'description': f'Recurring finding type: {ftype} ({count} occurrences)',
'action': f'Investigate root cause and implement preventive controls for {ftype}'
})
return recommendations
def create_executive_dashboard():
"""Create executive-level security dashboard"""
cloudwatch = boto3.client('cloudwatch')
executive_dashboard = {
"widgets": [
{
"type": "metric",
"properties": {
"metrics": [
["AWS/SecurityHub", "ComplianceScore"]
],
"period": 86400, # Daily
"stat": "Average",
"region": "us-east-1",
"title": "Overall Security Compliance Score",
"yAxis": {
"left": {
"min": 0,
"max": 100
}
}
}
},
{
"type": "number",
"properties": {
"metrics": [
["AWS/SecurityHub", "Findings", "SeverityLabel", "CRITICAL"]
],
"period": 300,
"stat": "Sum",
"region": "us-east-1",
"title": "Critical Findings"
}
},
{
"type": "number",
"properties": {
"metrics": [
["AWS/SecurityHub", "Findings", "SeverityLabel", "HIGH"]
],
"period": 300,
"stat": "Sum",
"region": "us-east-1",
"title": "High-Severity Findings"
}
},
{
"type": "log",
"properties": {
"query": "SOURCE '/aws/securityhub/findings'\n| fields @timestamp, Title, Severity.Label, Compliance.Status\n| filter Severity.Label = \"CRITICAL\"\n| sort @timestamp desc\n| limit 10",
"region": "us-east-1",
"title": "Recent Critical Findings",
"view": "table"
}
}
]
}
response = cloudwatch.put_dashboard(
DashboardName='Security-Executive-Dashboard',
DashboardBody=json.dumps(executive_dashboard)
)
return response
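To make the posture report useful over time, it can be generated on a schedule and archived. A small sketch, where the bucket name and key layout are placeholders:
def publish_posture_report(bucket_name='security-posture-reports'):
    """Generate the security posture report and archive it to S3 for trend tracking."""
    s3_client = boto3.client('s3')

    report = generate_security_posture_report()
    report_key = f"security-hub/{datetime.now().strftime('%Y-%m-%d')}-posture-report.json"

    s3_client.put_object(
        Bucket=bucket_name,
        Key=report_key,
        Body=json.dumps(report, default=str),
        ContentType='application/json',
        ServerSideEncryption='aws:kms'
    )
    return report_key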
Conclusion
AWS Security Hub provides a powerful platform for centralized security management, but its effectiveness depends on proper configuration and integration. Key takeaways include:
- Enable organization-wide Security Hub for comprehensive visibility
- Configure multiple security standards for thorough compliance coverage
- Create custom findings and insights for organization-specific requirements
- Implement automated remediation for rapid response to security issues
- Build comprehensive dashboards for different stakeholder needs
- Generate regular reports for continuous security posture improvement
Effective Security Hub implementation requires balancing automation with human oversight. While automated remediation can handle routine security issues, complex findings still require skilled security analysts for proper investigation and resolution.
The patterns shown here provide a foundation for building enterprise-grade security operations that scale with your organization's growth while maintaining strong security posture across all AWS accounts and resources.
Enhancing Security Hub with Specialized IAM Analysis
While Security Hub provides excellent general security monitoring capabilities, IAM security requires specialized analysis that goes beyond standard compliance checks. This is where AccessLens complements Security Hub perfectly.
AccessLens provides the deep IAM analysis capabilities that Security Hub lacks:
- Advanced trust relationship analysis that identifies complex privilege escalation paths
- Cross-account IAM risk assessment that spans your entire AWS organization
- Policy analysis that goes beyond basic compliance to identify subtle security risks
- Custom findings integration that enhances Security Hub with IAM-specific insights
By integrating AccessLens with Security Hub, you get the best of both worlds: comprehensive security monitoring from Security Hub and specialized IAM analysis from AccessLens.
Explore how AccessLens can enhance your Security Hub implementation and provide the specialized IAM security analysis that your organization needs.
Security Hub provides the platform, but AccessLens provides the IAM expertise. Together, they deliver comprehensive security visibility that scales with your organization.