AWS WAF Advanced Protection: Beyond Basic Web Application Security

WAF, Web Security, DDoS Protection, Bot Management

AWS WAF has grown from a basic web application firewall into a platform that combines managed rule groups, rate limiting, bot control, geographic and IP-based filtering, and CloudWatch monitoring. This guide walks through enterprise-grade configurations of each layer, using boto3 and the WAFv2 API.

Advanced WAF Architecture

Multi-Layer Protection Strategy

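import json
from datetime import datetime, timedelta

import boto3

class AdvancedWAFManager:
    def __init__(self):
        # Web ACLs with Scope='CLOUDFRONT' must be managed through us-east-1
        self.wafv2_client = boto3.client('wafv2', region_name='us-east-1')
        self.cloudfront_client = boto3.client('cloudfront')
        self.alb_client = boto3.client('elbv2')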
        
    def create_comprehensive_web_acl(self):
        """Create a comprehensive WAF Web ACL with multiple protection layers"""
        
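        # Define rule priorities and configurations. Each rule needs a unique Priority,
        # and the builders not shown in this guide (known bad inputs, SQL injection, XSS,
        # geo blocking, IP reputation, custom application, size restriction) must be
        # implemented on this class before this method will run.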
        rules = [
            self.create_aws_managed_core_rule_set(),
            self.create_aws_managed_known_bad_inputs(),
            self.create_aws_managed_sql_injection_rule(),
            self.create_aws_managed_xss_rule(),
            self.create_rate_limiting_rule(),
            self.create_geo_blocking_rule(),
            self.create_ip_reputation_rule(),
            self.create_bot_control_rule(),
            self.create_custom_application_rules(),
            self.create_size_restriction_rule()
        ]
        
        web_acl_config = {
            'Name': 'ComprehensiveWebACL',
            'Scope': 'CLOUDFRONT',  # or 'REGIONAL' for ALB/API Gateway
            'DefaultAction': {'Allow': {}},
            'Description': 'Comprehensive WAF protection with multiple security layers',
            'Rules': rules,
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'ComprehensiveWebACL'
            },
            'Tags': [
                {'Key': 'Purpose', 'Value': 'WebApplicationSecurity'},
                {'Key': 'Environment', 'Value': 'Production'},
                {'Key': 'ManagedBy', 'Value': 'SecurityTeam'}
            ]
        }
        
        response = self.wafv2_client.create_web_acl(**web_acl_config)
        return response
    
    def create_aws_managed_core_rule_set(self):
        """AWS Managed Core Rule Set for common vulnerabilities"""
        return {
            'Name': 'AWSManagedRulesCommonRuleSet',
            'Priority': 1,
            'OverrideAction': {'None': {}},
            'Statement': {
                'ManagedRuleGroupStatement': {
                    'VendorName': 'AWS',
                    'Name': 'AWSManagedRulesCommonRuleSet',
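                    'RuleActionOverrides': [
                        # Count instead of block for rules that might cause false positives
                        # (RuleActionOverrides replaces the deprecated ExcludedRules setting)
                        {'Name': 'SizeRestrictions_BODY', 'ActionToUse': {'Count': {}}},
                        {'Name': 'GenericRFI_BODY', 'ActionToUse': {'Count': {}}}
                    ]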
                }
            },
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'CommonRuleSetMetric'
            }
        }
    
    def create_rate_limiting_rule(self):
        """Advanced rate limiting with different thresholds"""
        return {
            'Name': 'RateLimitingRule',
            'Priority': 2,
            'Action': {'Block': {}},
            'Statement': {
                'RateBasedStatement': {
                    'Limit': 2000,  # Requests per 5-minute window
                    'AggregateKeyType': 'IP',
                    'ScopeDownStatement': {
                        'NotStatement': {
                            'Statement': {
                                'IPSetReferenceStatement': {
                                    'ARN': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/AllowedIPs/12345678-1234-1234-1234-123456789012'
                                }
                            }
                        }
                    }
                }
            },
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'RateLimitingMetric'
            }
        }
    
    def create_bot_control_rule(self):
        """AWS Managed Bot Control for sophisticated bot detection"""
        return {
            'Name': 'AWSManagedRulesBotControlRuleSet',
            'Priority': 3,
            'OverrideAction': {'None': {}},
            'Statement': {
                'ManagedRuleGroupStatement': {
                    'VendorName': 'AWS',
                    'Name': 'AWSManagedRulesBotControlRuleSet',
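                    'ManagedRuleGroupConfigs': [
                        {
                            # Bot Control is configured with an inspection level
                            # ('COMMON' or 'TARGETED'). Login path and credential fields
                            # belong to the separate AWSManagedRulesATPRuleSet group.
                            'AWSManagedRulesBotControlRuleSet': {
                                'InspectionLevel': 'COMMON'
                            }
                        }
                    ]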
                }
            },
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'BotControlMetric'
            }
        }
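
Every rule in a web ACL needs its own Priority, and the builders in this guide hard-code theirs in several different places, so a collision is easy to introduce. The following is a minimal sketch of a pre-flight check that could run before `create_web_acl`; `find_priority_conflicts` is a hypothetical helper written for this guide, not part of boto3.

```python
from collections import defaultdict

def find_priority_conflicts(rules):
    """Return {priority: [rule names]} for priorities used by more than one rule."""
    by_priority = defaultdict(list)
    for rule in rules:
        by_priority[rule['Priority']].append(rule['Name'])
    return {p: names for p, names in by_priority.items() if len(names) > 1}

# Trimmed-down rule dicts; only the keys the check reads are included
rules = [
    {'Name': 'AWSManagedRulesCommonRuleSet', 'Priority': 1},
    {'Name': 'RateLimitingRule', 'Priority': 2},
    {'Name': 'DuplicateExample', 'Priority': 2},  # deliberate collision
]

conflicts = find_priority_conflicts(rules)
print(conflicts)
```

An empty result means the list is safe to submit as far as priorities go; it says nothing about capacity units or statement validity.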

Custom Rule Development

def create_advanced_custom_rules():
    """Create sophisticated custom WAF rules"""
    
    # Advanced SQL injection detection
    advanced_sqli_rule = {
        'Name': 'AdvancedSQLInjectionRule',
        'Priority': 10,
        'Action': {'Block': {}},
        'Statement': {
            'OrStatement': {
                'Statements': [
                    {
                        'ByteMatchStatement': {
                            'SearchString': b'union select',
                            'FieldToMatch': {'Body': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'LOWERCASE'},
                                {'Priority': 1, 'Type': 'REMOVE_NULLS'}
                            ],
                            'PositionalConstraint': 'CONTAINS'
                        }
                    },
                    {
                        'RegexMatchStatement': {
                            'RegexString': r'(?i)(union\s+select|select\s+.*\s+from|insert\s+into|update\s+.*\s+set|delete\s+from)',
                            'FieldToMatch': {'AllQueryArguments': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'URL_DECODE'},
                                {'Priority': 1, 'Type': 'HTML_ENTITY_DECODE'}
                            ]
                        }
                    },
                    {
                        'SqliMatchStatement': {
                            'FieldToMatch': {'Body': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'URL_DECODE'},
                                {'Priority': 1, 'Type': 'HTML_ENTITY_DECODE'}
                            ]
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'AdvancedSQLInjectionMetric'
        }
    }
    
    # Advanced XSS protection
    advanced_xss_rule = {
        'Name': 'AdvancedXSSRule',
        'Priority': 11,
        'Action': {'Block': {}},
        'Statement': {
            'OrStatement': {
                'Statements': [
                    {
                        'XssMatchStatement': {
                            'FieldToMatch': {'AllQueryArguments': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'URL_DECODE'},
                                {'Priority': 1, 'Type': 'HTML_ENTITY_DECODE'},
                                {'Priority': 2, 'Type': 'JS_DECODE'}
                            ]
                        }
                    },
                    {
                        'RegexMatchStatement': {
                            'RegexString': r'(?i)(<script|javascript:|vbscript:|onload=|onerror=|onclick=)',
                            'FieldToMatch': {'Body': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'LOWERCASE'},
                                {'Priority': 1, 'Type': 'REMOVE_NULLS'}
                            ]
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'AdvancedXSSMetric'
        }
    }
    
    # API abuse prevention
    api_abuse_rule = {
        'Name': 'APIAbusePreventionRule',
        'Priority': 12,
        'Action': {'Block': {}},
        'Statement': {
            # A rate-based statement cannot be nested inside And/Or/Not statements,
            # so the path match moves into its ScopeDownStatement
            'RateBasedStatement': {
                'Limit': 100,  # 100 requests per 5 minutes for API endpoints
                'AggregateKeyType': 'IP',
                'ScopeDownStatement': {
                    'ByteMatchStatement': {
                        'SearchString': b'/api/',
                        'FieldToMatch': {'UriPath': {}},
                        'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}],
                        'PositionalConstraint': 'STARTS_WITH'
                    }
                }
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'APIAbuseMetric'
        }
    }
    
    return [advanced_sqli_rule, advanced_xss_rule, api_abuse_rule]

def create_application_specific_rules():
    """Create rules specific to application behavior"""
    
    # Login protection rule
    login_protection_rule = {
        'Name': 'LoginProtectionRule',
        'Priority': 15,
        'Action': {'Block': {}},
        'Statement': {
            # A rate-based statement cannot be nested inside an AndStatement,
            # so the path match moves into its ScopeDownStatement
            'RateBasedStatement': {
                # 10 login attempts per 5 minutes (check the minimum Limit
                # currently allowed for rate-based rules)
                'Limit': 10,
                'AggregateKeyType': 'IP',
                'ScopeDownStatement': {
                    'ByteMatchStatement': {
                        'SearchString': b'/login',
                        'FieldToMatch': {'UriPath': {}},
                        'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}],
                        'PositionalConstraint': 'EXACTLY'
                    }
                }
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'LoginProtectionMetric'
        }
    }
    
    # File upload protection
    file_upload_rule = {
        'Name': 'FileUploadProtectionRule',
        'Priority': 16,
        'Action': {'Block': {}},
        'Statement': {
            'AndStatement': {
                'Statements': [
                    {
                        'ByteMatchStatement': {
                            'SearchString': b'multipart/form-data',
                            'FieldToMatch': {
                                'SingleHeader': {'Name': 'content-type'}
                            },
                            'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}],
                            'PositionalConstraint': 'CONTAINS'
                        }
                    },
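                    {
                        # Caveat: WAF inspects only the first 8 KB to 64 KB of the body
                        # (depending on resource type and configuration), so a 10MB
                        # comparison cannot be evaluated from the body alone. Enforce
                        # the hard upload limit at the origin as well.
                        'SizeConstraintStatement': {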
                            'FieldToMatch': {'Body': {}},
                            'ComparisonOperator': 'GT',
                            'Size': 10485760,  # 10MB limit
                            'TextTransformations': [{'Priority': 0, 'Type': 'NONE'}]
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'FileUploadProtectionMetric'
        }
    }
    
    return [login_protection_rule, file_upload_rule]
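
Broad regex rules like the one in AdvancedSQLInjectionRule tend to catch benign traffic, so it helps to try the pattern against sample inputs before deploying it. The sketch below does that locally with Python's `re` module. It assumes Python's regex engine and `unquote_plus` are close enough to WAF's matcher and its URL_DECODE transformation for a rough check, which is not guaranteed; `matches_sqli_pattern` and the sample strings are hypothetical.

```python
import re
from urllib.parse import unquote_plus

# Pattern copied from the RegexMatchStatement in AdvancedSQLInjectionRule
SQLI_PATTERN = re.compile(
    r'(?i)(union\s+select|select\s+.*\s+from|insert\s+into|update\s+.*\s+set|delete\s+from)'
)

def matches_sqli_pattern(raw_query_value):
    """Approximate the rule: URL-decode the input, then search for the pattern."""
    decoded = unquote_plus(raw_query_value)  # rough stand-in for URL_DECODE
    return SQLI_PATTERN.search(decoded) is not None

# Sample input -> whether the pattern matches it
samples = {
    'id=1%20UNION%20SELECT%20password': True,   # real attack shape
    'q=select+shoes+from+the+catalog': True,    # benign search text, still matched
    'q=red+shoes': False,
}

for value, expected in samples.items():
    print(value, matches_sqli_pattern(value), expected)
```

The second sample shows why such a rule is usually run in Count mode first: ordinary search text containing "select ... from" would be blocked.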

Advanced Bot Management

Sophisticated Bot Detection

def create_advanced_bot_management():
    """Create advanced bot management configuration"""
    
    # Custom bot detection rule
    custom_bot_rule = {
        'Name': 'CustomBotDetectionRule',
        'Priority': 5,
        'Action': {'Block': {}},
        'Statement': {
            'OrStatement': {
                'Statements': [
                    # Detect bots by User-Agent patterns
                    {
                        'RegexMatchStatement': {
                            'RegexString': r'(?i)(bot|crawler|spider|scraper|curl|wget|python|java|go-http)',
                            'FieldToMatch': {
                                'SingleHeader': {'Name': 'user-agent'}
                            },
                            'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}]
                        }
                    },
                    # Detect a missing Accept header. The size check cannot match when
                    # the header is absent, so negating it matches requests without it.
                    {
                        'NotStatement': {
                            'Statement': {
                                'SizeConstraintStatement': {
                                    'FieldToMatch': {
                                        'SingleHeader': {'Name': 'accept'}
                                    },
                                    'ComparisonOperator': 'GE',
                                    'Size': 0,
                                    'TextTransformations': [{'Priority': 0, 'Type': 'NONE'}]
                                }
                            }
                        }
                    },
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'CustomBotDetectionMetric'
        }
    }
    
    # Detect suspicious request patterns: a high rate of GET requests per IP.
    # A rate-based statement cannot be nested inside And/Or statements, so this
    # check is its own rule with a scope-down statement.
    get_flood_rule = {
        'Name': 'GetFloodRule',
        'Priority': 13,  # assumed to be unused elsewhere in the web ACL
        'Action': {'Block': {}},
        'Statement': {
            'RateBasedStatement': {
                'Limit': 500,
                'AggregateKeyType': 'IP',
                'ScopeDownStatement': {
                    'ByteMatchStatement': {
                        'SearchString': b'GET',
                        'FieldToMatch': {'Method': {}},
                        'TextTransformations': [{'Priority': 0, 'Type': 'NONE'}],
                        'PositionalConstraint': 'EXACTLY'
                    }
                }
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'GetFloodMetric'
        }
    }
    
    # Good bot allowlist
    good_bot_allowlist_rule = {
        'Name': 'GoodBotAllowlistRule',
        'Priority': 4,
        'Action': {'Allow': {}},
        'Statement': {
            # Require both signals: a User-Agent on its own is trivially spoofed
            'AndStatement': {
                'Statements': [
                    {
                        'RegexMatchStatement': {
                            'RegexString': r'(?i)(googlebot|bingbot|slurp|duckduckbot|baiduspider|yandexbot|facebookexternalhit|twitterbot|linkedinbot)',
                            'FieldToMatch': {
                                'SingleHeader': {'Name': 'user-agent'}
                            },
                            'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}]
                        }
                    },
                    # Verify legitimate search engine bots by IP ranges
                    {
                        'IPSetReferenceStatement': {
                            'ARN': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/SearchEngineIPs/12345678-1234-1234-1234-123456789012'
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'GoodBotAllowlistMetric'
        }
    }
    
    return [good_bot_allowlist_rule, custom_bot_rule, get_flood_rule]

def create_challenge_based_protection():
    """Create challenge-based protection for suspicious traffic"""
    
    # CAPTCHA challenge for suspicious behavior
    captcha_challenge_rule = {
        'Name': 'CAPTCHAChallengeRule',
        'Priority': 6,
        'Action': {
            'Captcha': {
                'CustomRequestHandling': {
                    'InsertHeaders': [
                        {
                            'Name': 'X-Challenge-Reason',
                            'Value': 'Suspicious-Activity-Detected'
                        }
                    ]
                }
            }
        },
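        'Statement': {
            # Challenge requests to admin paths that come from outside the admin IP set
            'AndStatement': {
                'Statements': [
                    {
                        'ByteMatchStatement': {
                            'SearchString': b'admin',
                            'FieldToMatch': {'UriPath': {}},
                            'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}],
                            'PositionalConstraint': 'CONTAINS'
                        }
                    },
                    {
                        'NotStatement': {
                            'Statement': {
                                'IPSetReferenceStatement': {
                                    'ARN': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/AdminIPs/12345678-1234-1234-1234-123456789012'
                                }
                            }
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'CAPTCHAChallengeMetric'
        }
    }
    
    # Challenge high-frequency requests. A rate-based statement cannot be nested
    # inside an OrStatement, so this check is a separate rule.
    captcha_rate_rule = {
        'Name': 'CAPTCHARateRule',
        'Priority': 14,  # assumed to be unused elsewhere in the web ACL
        'Action': {'Captcha': {}},
        'Statement': {
            'RateBasedStatement': {
                'Limit': 1000,
                'AggregateKeyType': 'IP'
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'CAPTCHARateMetric'
        }
    }
    
    return [captcha_challenge_rule, captcha_rate_rule]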

Geographic and IP-Based Controls

Advanced Geo-Blocking

def create_advanced_geo_controls():
    """Create sophisticated geographic access controls"""
    
    # Block high-risk countries
    geo_blocking_rule = {
        'Name': 'GeoBlockingRule',
        'Priority': 7,
        'Action': {'Block': {}},
        'Statement': {
            'GeoMatchStatement': {
                'CountryCodes': [
                    'CN',  # China
                    'RU',  # Russia
                    'KP',  # North Korea
                    'IR',  # Iran
                    'SY',  # Syria
                    'CU'   # Cuba
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'GeoBlockingMetric'
        }
    }
    
    # Allow specific countries for sensitive operations
    sensitive_geo_rule = {
        'Name': 'SensitiveOperationsGeoRule',
        'Priority': 8,
        'Action': {'Block': {}},
        'Statement': {
            'AndStatement': {
                'Statements': [
                    {
                        'ByteMatchStatement': {
                            'SearchString': b'/admin',
                            'FieldToMatch': {'UriPath': {}},
                            'TextTransformations': [{'Priority': 0, 'Type': 'LOWERCASE'}],
                            'PositionalConstraint': 'STARTS_WITH'
                        }
                    },
                    {
                        'NotStatement': {
                            'Statement': {
                                'GeoMatchStatement': {
                                    'CountryCodes': ['US', 'CA', 'GB', 'AU']  # Allowed countries
                                }
                            }
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'SensitiveGeoMetric'
        }
    }
    
    return [geo_blocking_rule, sensitive_geo_rule]

def create_ip_reputation_controls():
    """Create IP reputation-based controls"""
    
    # Block known malicious IPs
    ip_reputation_rule = {
        'Name': 'IPReputationRule',
        'Priority': 9,
        'Action': {'Block': {}},
        'Statement': {
            'OrStatement': {
                'Statements': [
                    {
                        'IPSetReferenceStatement': {
                            'ARN': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/MaliciousIPs/12345678-1234-1234-1234-123456789012'
                        }
                    },
                    {
                        'IPSetReferenceStatement': {
                            'ARN': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/TorExitNodes/12345678-1234-1234-1234-123456789012'
                        }
                    }
                ]
            }
        },
        'VisibilityConfig': {
            'SampledRequestsEnabled': True,
            'CloudWatchMetricsEnabled': True,
            'MetricName': 'IPReputationMetric'
        }
    }
    
    return ip_reputation_rule

def update_ip_reputation_lists():
    """Automatically update IP reputation lists"""
    
    import requests
    
    # Threat intelligence feeds
    threat_feeds = [
        {
            'name': 'malicious-ips',
            'url': 'https://reputation-feed.example.com/malicious-ips.txt',
            'ipset_arn': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/MaliciousIPs/12345678-1234-1234-1234-123456789012'
        },
        {
            'name': 'tor-exit-nodes',
            'url': 'https://check.torproject.org/torbulkexitlist',
            'ipset_arn': 'arn:aws:wafv2:us-east-1:123456789012:global/ipset/TorExitNodes/12345678-1234-1234-1234-123456789012'
        }
    ]
    
    import ipaddress  # standard library, used to validate feed entries
    
    # CLOUDFRONT-scoped IP sets are managed through the us-east-1 endpoint
    wafv2_client = boto3.client('wafv2', region_name='us-east-1')
    
    for feed in threat_feeds:
        try:
            # Download threat feed
            response = requests.get(feed['url'], timeout=30)
            response.raise_for_status()
            
            # Parse and validate entries; this assumes the IP sets are IPv4 sets
            ip_addresses = []
            for line in response.text.splitlines():
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                try:
                    network = ipaddress.ip_network(line, strict=False)
                except ValueError:
                    continue  # Skip malformed entries
                if network.version == 4:
                    ip_addresses.append(str(network))  # Single IPs become /32
            
            # Update IP set (limited to 10,000 IPs per set)
            if ip_addresses:
                ip_addresses = ip_addresses[:10000]  # WAF limit
                
                # The ARN ends in .../ipset/<Name>/<Id>. Name must be the IP set's
                # actual name, not the feed label.
                ipset_name, ipset_id = feed['ipset_arn'].split('/')[-2:]
                
                # Get current IP set to get lock token
                current_ipset = wafv2_client.get_ip_set(
                    Scope='CLOUDFRONT',
                    Id=ipset_id,
                    Name=ipset_name
                )
                
                # Update IP set (replaces the full address list)
                wafv2_client.update_ip_set(
                    Scope='CLOUDFRONT',
                    Id=ipset_id,
                    Name=ipset_name,
                    Addresses=ip_addresses,
                    LockToken=current_ipset['LockToken']
                )
                
                print(f"Updated {feed['name']} with {len(ip_addresses)} IP addresses")
                
        except Exception as e:
            print(f"Failed to update {feed['name']}: {e}")
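
The lock token used above implements optimistic locking: if another process updates the same IP set between the read and the write, the update is rejected. A minimal sketch of a retry wrapper follows, assuming a boto3 `wafv2` client is passed in; `update_ip_set_with_retry` and its `attempts` and `delay` parameters are hypothetical names, not part of boto3.

```python
import time

def update_ip_set_with_retry(client, scope, name, ipset_id, addresses,
                             attempts=3, delay=1.0):
    """Re-read the lock token and retry when another writer got there first."""
    for attempt in range(1, attempts + 1):
        # A fresh read returns the current LockToken
        current = client.get_ip_set(Scope=scope, Name=name, Id=ipset_id)
        try:
            return client.update_ip_set(
                Scope=scope,
                Name=name,
                Id=ipset_id,
                Addresses=addresses,
                LockToken=current['LockToken'],
            )
        except client.exceptions.WAFOptimisticLockException:
            if attempt == attempts:
                raise  # Give up after the last attempt
            time.sleep(delay)
```

A call would look like `update_ip_set_with_retry(wafv2_client, 'CLOUDFRONT', 'MaliciousIPs', ipset_id, ip_addresses)`. Other errors, such as invalid addresses, are deliberately not retried.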

WAF Monitoring and Analytics

Comprehensive Monitoring Setup

def setup_waf_monitoring():
    """Setup comprehensive WAF monitoring and alerting"""
    
    cloudwatch = boto3.client('cloudwatch')
    
    # Create CloudWatch alarms for WAF metrics
    alarms = [
        {
            'AlarmName': 'WAF-HighBlockedRequests',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 2,
            'MetricName': 'BlockedRequests',
            'Namespace': 'AWS/WAFV2',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 1000.0,
            'ActionsEnabled': True,
            'AlarmActions': [
                'arn:aws:sns:us-east-1:123456789012:waf-security-alerts'
            ],
            'AlarmDescription': 'High number of blocked requests detected',
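            'Dimensions': [
                {'Name': 'WebACL', 'Value': 'ComprehensiveWebACL'},
                # WAF metrics are also keyed by Rule; 'ALL' aggregates every rule.
                # Regional web ACLs carry a Region dimension as well, so confirm
                # the exact dimension set in the CloudWatch console.
                {'Name': 'Rule', 'Value': 'ALL'}
            ]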
        },
        {
            'AlarmName': 'WAF-SQLInjectionAttempts',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
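            # Per-rule counts are published under BlockedRequests, with the rule's
            # VisibilityConfig MetricName as the value of the Rule dimension
            'MetricName': 'BlockedRequests',
            'Namespace': 'AWS/WAFV2',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 10.0,
            'ActionsEnabled': True,
            'AlarmActions': [
                'arn:aws:sns:us-east-1:123456789012:waf-security-alerts'
            ],
            'AlarmDescription': 'SQL injection attempts detected',
            'Dimensions': [
                {'Name': 'WebACL', 'Value': 'ComprehensiveWebACL'},
                {'Name': 'Rule', 'Value': 'AdvancedSQLInjectionMetric'}
            ]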
        },
        {
            'AlarmName': 'WAF-RateLimitTriggered',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
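            # Per-rule counts are published under BlockedRequests, with the rule's
            # VisibilityConfig MetricName as the value of the Rule dimension
            'MetricName': 'BlockedRequests',
            'Namespace': 'AWS/WAFV2',
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 50.0,
            'ActionsEnabled': True,
            'AlarmActions': [
                'arn:aws:sns:us-east-1:123456789012:waf-security-alerts'
            ],
            'AlarmDescription': 'Rate limiting frequently triggered',
            'Dimensions': [
                {'Name': 'WebACL', 'Value': 'ComprehensiveWebACL'},
                {'Name': 'Rule', 'Value': 'RateLimitingMetric'}
            ]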
        }
    ]
    
    created_alarms = []
    for alarm_config in alarms:
        try:
            cloudwatch.put_metric_alarm(**alarm_config)
            created_alarms.append(alarm_config['AlarmName'])
        except Exception as e:
            print(f"Failed to create alarm {alarm_config['AlarmName']}: {e}")
    
    return created_alarms

def create_waf_dashboard():
    """Create comprehensive WAF monitoring dashboard"""
    
    cloudwatch = boto3.client('cloudwatch')
    
    dashboard_body = {
        "widgets": [
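            {
                "type": "metric",
                "properties": {
                    # Rule="ALL" aggregates every rule in the web ACL
                    "metrics": [
                        ["AWS/WAFV2", "AllowedRequests", "WebACL", "ComprehensiveWebACL", "Rule", "ALL"],
                        [".", "BlockedRequests", ".", ".", ".", "."],
                        [".", "CountedRequests", ".", ".", ".", "."]
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": "us-east-1",
                    "title": "WAF Request Overview"
                }
            },
            {
                "type": "metric",
                "properties": {
                    # Per-rule series: BlockedRequests with the rule's MetricName
                    # as the value of the Rule dimension
                    "metrics": [
                        ["AWS/WAFV2", "BlockedRequests", "WebACL", "ComprehensiveWebACL", "Rule", "AdvancedSQLInjectionMetric"],
                        [".", ".", ".", ".", ".", "AdvancedXSSMetric"],
                        [".", ".", ".", ".", ".", "RateLimitingMetric"],
                        [".", ".", ".", ".", ".", "BotControlMetric"]
                    ],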
                    "period": 300,
                    "stat": "Sum",
                    "region": "us-east-1",
                    "title": "Security Rule Triggers"
                }
            },
            {
                "type": "log",
                "properties": {
                    # WAF only logs to CloudWatch log groups whose names start with
                    # 'aws-waf-logs-'; the group name used here is a placeholder
                    "query": "SOURCE 'aws-waf-logs-webacl'\n| fields @timestamp, httpRequest.clientIp, action, terminatingRuleId\n| filter action = \"BLOCK\"\n| stats count(*) as blockedRequests by httpRequest.clientIp\n| sort blockedRequests desc\n| limit 20",
                    "region": "us-east-1",
                    "title": "Top Blocked IPs",
                    "view": "table"
                }
            },
            {
                "type": "log",
                "properties": {
                    "query": "SOURCE 'aws-waf-logs-webacl'\n| fields @timestamp, httpRequest.uri, terminatingRuleId, action\n| filter action = \"BLOCK\"\n| stats count(*) as blockedRequests by httpRequest.uri\n| sort blockedRequests desc\n| limit 20",
                    "region": "us-east-1",
                    "title": "Most Attacked Endpoints",
                    "view": "table"
                }
            }
        ]
    }
    
    response = cloudwatch.put_dashboard(
        DashboardName='WAF-Security-Dashboard',
        DashboardBody=json.dumps(dashboard_body)
    )
    
    return response

Advanced Log Analysis

def analyze_waf_logs():
    """Analyze WAF logs for security insights"""
    
    import time
    from datetime import datetime, timedelta
    
    # CloudWatch Logs Insights queries for security analysis. The log group is
    # passed to start_query, so the queries carry no SOURCE clause.
    security_queries = {
        'attack_patterns': """
            fields @timestamp, httpRequest.clientIp, httpRequest.uri, terminatingRuleId, action
            | filter action = "BLOCK"
            | stats count(*) as blockedRequests by terminatingRuleId
            | sort blockedRequests desc
        """,
        
        'geographic_threats': """
            fields @timestamp, httpRequest.clientIp, httpRequest.country, action
            | filter action = "BLOCK"
            | stats count(*) as blockedRequests by httpRequest.country
            | sort blockedRequests desc
        """,
        
        # WAF logs store headers as a list of name/value pairs, so the User-Agent
        # is extracted with parse. The header name's casing can vary by client;
        # treat this pattern as a sketch and adjust it to your logs.
        'user_agent_analysis': """
            filter action = "BLOCK"
            | parse @message '"name":"User-Agent","value":"*"' as userAgent
            | stats count(*) as blockedRequests by userAgent
            | sort blockedRequests desc
            | limit 50
        """,
        
        'rate_limit_analysis': """
            fields @timestamp, httpRequest.clientIp, terminatingRuleId
            | filter terminatingRuleId = "RateLimitingRule"
            | stats count(*) as blockedRequests by httpRequest.clientIp
            | sort blockedRequests desc
        """,
        
        'payload_analysis': """
            fields @timestamp, httpRequest.uri, httpRequest.args, terminatingRuleId
            | filter terminatingRuleId like /SQL/ or terminatingRuleId like /XSS/
            | limit 100
        """
    }
    
    logs_client = boto3.client('logs')
    
    # WAF log groups must be named with the 'aws-waf-logs-' prefix;
    # this particular name is a placeholder
    log_group_name = 'aws-waf-logs-webacl'
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)
    
    analysis_results = {}
    
    for query_name, query in security_queries.items():
        try:
            # Start query
            response = logs_client.start_query(
                logGroupName=log_group_name,
                startTime=int(start_time.timestamp()),
                endTime=int(end_time.timestamp()),
                queryString=query
            )
            
            query_id = response['queryId']
            
            # Poll until the query reaches a terminal state
            while True:
                result = logs_client.get_query_results(queryId=query_id)
                status = result['status']
                if status == 'Complete':
                    analysis_results[query_name] = result['results']
                    break
                if status in ('Failed', 'Cancelled', 'Timeout'):
                    print(f"Query {query_name} ended with status {status}")
                    break
                time.sleep(2)
                
        except Exception as e:
            print(f"Failed to execute query {query_name}: {e}")
    
    return analysis_results

def generate_security_report():
    """Generate comprehensive WAF security report"""
    from datetime import timezone

    # Metrics for CLOUDFRONT-scoped web ACLs are published in us-east-1.
    # For a REGIONAL web ACL, use its own Region and add a 'Region' dimension below.
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

    # Report on the last 7 days; timezone-aware UTC avoids local-time skew
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)

    metrics_to_fetch = [
        'AllowedRequests',
        'BlockedRequests',
        'CountedRequests'
    ]

    metric_data = {}

    for metric_name in metrics_to_fetch:
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/WAFV2',
            MetricName=metric_name,
            Dimensions=[
                {
                    'Name': 'WebACL',
                    'Value': 'ComprehensiveWebACL'
                },
                {
                    # 'ALL' aggregates across every rule in the web ACL
                    'Name': 'Rule',
                    'Value': 'ALL'
                }
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=3600,  # 1 hour periods
            Statistics=['Sum']
        )

        metric_data[metric_name] = response['Datapoints']
    
    # Analyze log data
    log_analysis = analyze_waf_logs()
    
    # Sum each metric once over the reporting period
    allowed_requests = sum(point['Sum'] for point in metric_data.get('AllowedRequests', []))
    blocked_requests = sum(point['Sum'] for point in metric_data.get('BlockedRequests', []))
    total_requests = allowed_requests + blocked_requests

    # Block rate as a percentage of evaluated requests (0 when there was no traffic)
    block_rate = (blocked_requests / total_requests) * 100 if total_requests > 0 else 0

    # Generate report
    report = {
        'report_period': {
            'start': start_time.isoformat(),
            'end': end_time.isoformat()
        },
        'metrics_summary': {
            'total_requests': total_requests,
            'blocked_requests': blocked_requests,
            'allowed_requests': allowed_requests,
            'block_rate': block_rate,
        },
        'security_analysis': log_analysis,
        'recommendations': generate_security_recommendations(log_analysis)
    }

    return report

def generate_security_recommendations(log_analysis):
    """Generate security recommendations based on log analysis"""
    
    recommendations = []
    
    # Analyze attack patterns
    if 'attack_patterns' in log_analysis:
        top_rules = log_analysis['attack_patterns'][:5]
        for rule_data in top_rules:
            rule_id = rule_data[0]['value'] if rule_data else 'Unknown'
            count = rule_data[1]['value'] if len(rule_data) > 1 else 0
            
            if int(count) > 1000:
                recommendations.append({
                    'type': 'high_volume_attacks',
                    'description': f'Rule {rule_id} triggered {count} times - consider tuning or additional protection',
                    'priority': 'HIGH'
                })
    
    # Analyze geographic threats
    if 'geographic_threats' in log_analysis:
        threat_countries = log_analysis['geographic_threats'][:3]
        for country_data in threat_countries:
            country = country_data[0]['value'] if country_data else 'Unknown'
            count = country_data[1]['value'] if len(country_data) > 1 else 0
            
            if int(count) > 500:
                recommendations.append({
                    'type': 'geographic_blocking',
                    'description': f'Consider blocking traffic from {country} ({count} blocked requests)',
                    'priority': 'MEDIUM'
                })
    
    return recommendations
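
The result rows consumed by the recommendation logic follow the shape returned by the CloudWatch Logs `get_query_results` call: each row is a list of `{'field': ..., 'value': ...}` cells, with values delivered as strings. Reading cells by field name rather than by position can make that logic less fragile. The following is a minimal sketch under stated assumptions: the helper names `rows_to_dicts` and `rules_over_threshold`, the `requestCount` column name, and the sample rows are all illustrative and are not part of the code above or real WAF output.

```python
from typing import Dict, List

# One Logs Insights row: a list of {'field': ..., 'value': ...} cells.
Row = List[Dict[str, str]]


def rows_to_dicts(results: List[Row]) -> List[Dict[str, str]]:
    """Flatten each row into a plain {field: value} dict, skipping @ptr cells."""
    return [
        {cell['field']: cell['value'] for cell in row if cell['field'] != '@ptr'}
        for row in results
    ]


def rules_over_threshold(results: List[Row], count_field: str, threshold: int = 1000):
    """Return (rule_id, count) pairs whose count exceeds the threshold.

    count_field must match the name the query's stats clause gives
    the aggregate (assumed here to be an alias such as 'requestCount').
    """
    flagged = []
    for record in rows_to_dicts(results):
        # Values arrive as strings, so convert before comparing
        count = int(float(record.get(count_field, 0)))
        if count > threshold:
            flagged.append((record.get('terminatingRuleId', 'Unknown'), count))
    return flagged


# Hypothetical sample rows, not real WAF output.
sample_results = [
    [{'field': 'terminatingRuleId', 'value': 'RateLimitingRule'},
     {'field': 'requestCount', 'value': '4200'}],
    [{'field': 'terminatingRuleId', 'value': 'ExampleGeoRule'},
     {'field': 'requestCount', 'value': '37'}],
]

flagged_rules = rules_over_threshold(sample_results, count_field='requestCount')
print(flagged_rules)
```

Looking up cells by name means a change in column order, or an extra field added to the query, does not silently swap the rule ID and the count.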

Conclusion

Advanced AWS WAF implementation requires a multi-layered approach that combines managed rules, custom logic, and continuous monitoring. Key takeaways include:

  • Layer multiple protection mechanisms for comprehensive coverage
  • Implement sophisticated bot management to distinguish between good and bad bots
  • Use geographic and IP reputation controls for additional security
  • Monitor and analyze logs continuously for threat intelligence
  • Automate rule updates based on threat feeds and analysis
  • Create comprehensive dashboards for real-time visibility

Effective WAF management is an ongoing process that requires regular tuning, monitoring, and adaptation to new threats. The patterns shown here provide a foundation for enterprise-grade web application protection that scales with your security needs.

Remember that WAF is just one component of a comprehensive security strategy. Combine it with other AWS services such as Shield, CloudFront, and Security Hub to build defense in depth against modern web application threats.

Comprehensive Security Visibility with AccessLens

While AWS WAF protects your applications at the application layer (Layer 7), comprehensive security also requires visibility into the identity and access management layer that controls who can modify your WAF configurations and access your protected resources.

AccessLens complements your WAF deployment by providing:

  • IAM policy analysis that identifies who can modify your WAF rules and configurations
  • Cross-account access visibility that reveals potential security risks in multi-account deployments
  • Resource policy analysis that ensures your WAF-protected resources maintain proper access controls
  • Compliance monitoring that verifies your security configurations meet regulatory requirements
  • Risk assessment that identifies potential privilege escalation paths that could bypass WAF protections

Your WAF protects against external threats, but AccessLens ensures that internal access controls don't become security vulnerabilities.

Strengthen your security posture with AccessLens and gain the IAM visibility that complements your WAF deployment.

Don't let IAM misconfigurations undermine your application security. Get the comprehensive access analysis you need for defense in depth.