AWS Secrets Manager: Advanced Patterns for Enterprise Secret Management
AWS Secrets Manager goes beyond simple password storage to provide a comprehensive secret management platform. This guide explores advanced patterns for implementing enterprise-grade secret management that scales with your organization's security requirements.
Secrets Manager Architecture Patterns
Hierarchical Secret Organization
import io
import json
import logging
import time
import zipfile
from datetime import datetime, timedelta, timezone

import boto3
class EnterpriseSecretsManager:
    """Create Secrets Manager secrets named ``environment/category/name``.

    Attributes:
        secrets_client: Secrets Manager client used for every API call.
    """

    # Illustrative placeholder data only: real credentials must never be
    # committed to source control. Pass your own mapping to
    # ``create_hierarchical_secrets`` in real deployments.
    DEFAULT_SECRET_HIERARCHY = {
        'production': {
            'databases': {
                'primary-db': {
                    'username': 'prod_db_user',
                    'password': 'secure_password_123',
                    'engine': 'mysql',
                    'host': 'prod-db.cluster-xyz.us-east-1.rds.amazonaws.com',
                    'port': 3306,
                    'dbname': 'production',
                },
                'analytics-db': {
                    'username': 'analytics_user',
                    'password': 'analytics_pass_456',
                    'engine': 'postgresql',
                    'host': 'analytics-db.cluster-abc.us-east-1.rds.amazonaws.com',
                    'port': 5432,
                    'dbname': 'analytics',
                },
            },
            'apis': {
                'payment-gateway': {
                    'api_key': 'pk_live_abcdef123456',
                    'secret_key': 'sk_live_fedcba654321',
                    'webhook_secret': 'whsec_xyz789',
                },
                'email-service': {
                    'api_key': 'SG.abcdef123456.xyz789',
                    # The published sample had an obfuscated address here;
                    # replaced with a neutral placeholder.
                    'sender_email': 'noreply@example.com',
                },
            },
        },
        'staging': {
            'databases': {
                'staging-db': {
                    'username': 'staging_user',
                    'password': 'staging_pass_789',
                    'engine': 'mysql',
                    'host': 'staging-db.cluster-def.us-east-1.rds.amazonaws.com',
                    'port': 3306,
                    'dbname': 'staging',
                },
            },
        },
    }

    def __init__(self, secrets_client=None):
        """Store the client to use.

        Args:
            secrets_client: Optional pre-built Secrets Manager client (for a
                custom region/session). A default boto3 client is created
                when omitted.
        """
        if secrets_client is None:
            secrets_client = boto3.client('secretsmanager')
        self.secrets_client = secrets_client

    def create_hierarchical_secrets(self, secret_hierarchy=None):
        """Create secrets with hierarchical naming convention.

        Args:
            secret_hierarchy: Mapping of
                ``{environment: {category: {secret_name: secret_value}}}``
                where ``secret_value`` is a JSON-serialisable dict. Defaults
                to ``DEFAULT_SECRET_HIERARCHY``.

        Returns:
            list: ARNs of the secrets that were created, in iteration order.
            Secrets whose creation failed are reported on stdout and skipped.
        """
        if secret_hierarchy is None:
            secret_hierarchy = self.DEFAULT_SECRET_HIERARCHY

        created_secrets = []
        for environment, categories in secret_hierarchy.items():
            for category, secrets in categories.items():
                for secret_name, secret_value in secrets.items():
                    full_secret_name = f"{environment}/{category}/{secret_name}"
                    # Only secrets that carry a password are tagged as
                    # rotation candidates.
                    auto_rotation = 'enabled' if 'password' in secret_value else 'disabled'
                    try:
                        response = self.secrets_client.create_secret(
                            Name=full_secret_name,
                            Description=f"{secret_name} for {environment} {category}",
                            SecretString=json.dumps(secret_value),
                            Tags=[
                                {'Key': 'Environment', 'Value': environment},
                                {'Key': 'Category', 'Value': category},
                                {'Key': 'ManagedBy', 'Value': 'SecretsManager'},
                                {'Key': 'AutoRotation', 'Value': auto_rotation},
                            ],
                        )
                        created_secrets.append(response['ARN'])
                    except Exception as e:
                        # Deliberately best-effort: one failure (for example
                        # a name that already exists) must not abort the
                        # rest of the batch.
                        print(f"Failed to create secret {full_secret_name}: {e}")
        return created_secrets
Cross-Account Secret Sharing
def setup_cross_account_secret_sharing(secrets_client=None):
    """Attach a cross-account resource policy to every ``Environment=shared`` secret.

    Args:
        secrets_client: Optional Secrets Manager client; a default boto3
            client is created when omitted.

    Returns:
        dict: The resource policy document that was applied.
    """
    # Resource policy for cross-account access
    cross_account_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowCrossAccountAccess",
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::987654321098:role/ApplicationRole",
                        "arn:aws:iam::987654321098:role/DeploymentRole"
                    ]
                },
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "secretsmanager:ResourceTag/Environment": "shared"
                    }
                }
            },
            {
                "Sid": "AllowRotationFunction",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::123456789012:role/SecretsManagerRotationRole"
                },
                "Action": [
                    "secretsmanager:RotateSecret",
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:PutSecretValue",
                    "secretsmanager:UpdateSecretVersionStage"
                ],
                "Resource": "*"
            }
        ]
    }
    if secrets_client is None:
        secrets_client = boto3.client('secretsmanager')
    policy_document = json.dumps(cross_account_policy)

    # Paginate: a single list_secrets call returns only one page of results.
    pages = secrets_client.get_paginator('list_secrets').paginate(
        Filters=[
            {'Key': 'tag-key', 'Values': ['Environment']},
            {'Key': 'tag-value', 'Values': ['shared']},
        ]
    )
    for page in pages:
        for secret in page['SecretList']:
            # The tag-key and tag-value filters are matched independently of
            # each other, so confirm the exact Environment=shared pair before
            # widening access to the secret.
            tags = {tag['Key']: tag['Value'] for tag in secret.get('Tags', [])}
            if tags.get('Environment') != 'shared':
                continue
            secrets_client.put_resource_policy(
                SecretId=secret['ARN'],
                ResourcePolicy=policy_document,
                # Have the service reject the policy if it would grant broad
                # (public) access.
                BlockPublicPolicy=True,
            )
    return cross_account_policy
Automatic Secret Rotation
Database Secret Rotation
import boto3
import json
import pymysql
from datetime import datetime, timedelta
def create_database_rotation_function():
    """Return the source of a Lambda handler that rotates an RDS MySQL secret.

    The returned text is a complete ``lambda_function.py`` module. Its
    ``lambda_handler`` dispatches the four rotation steps (``createSecret``,
    ``setSecret``, ``testSecret``, ``finishSecret``) to one function each.
    Every step is written to be safe to retry.

    Returns:
        str: Python source code. Nothing is deployed by this function.
    """
    # Raw string so the backslashes in ExcludeCharacters reach the generated
    # module unchanged; with a normal string the emitted literal was
    # unterminated and the generated module could not be imported.
    lambda_code = r'''
import json
import boto3
import pymysql
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Handle secret rotation for RDS MySQL"""
    # Secrets Manager supplies these fields at the top level of the event.
    secret_arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    secrets_client = boto3.client('secretsmanager')
    try:
        if step == "createSecret":
            create_secret(secrets_client, secret_arn, token)
        elif step == "setSecret":
            set_secret(secrets_client, secret_arn, token)
        elif step == "testSecret":
            test_secret(secrets_client, secret_arn, token)
        elif step == "finishSecret":
            finish_secret(secrets_client, secret_arn, token)
        else:
            raise ValueError(f"Invalid step parameter: {step}")
        return {"statusCode": 200}
    except Exception as e:
        logger.error(f"Rotation failed: {str(e)}")
        raise


def _connect(secret_dict):
    """Open a MySQL connection using the fields stored in a secret"""
    return pymysql.connect(
        host=secret_dict['host'],
        user=secret_dict['username'],
        password=secret_dict['password'],
        database=secret_dict['dbname'],
        port=secret_dict['port']
    )


def create_secret(secrets_client, secret_arn, token):
    """Create new secret version with new password"""
    # A retried step must reuse the AWSPENDING version it already created
    # instead of generating a second, different password.
    try:
        secrets_client.get_secret_value(
            SecretId=secret_arn,
            VersionId=token,
            VersionStage="AWSPENDING"
        )
        return
    except ClientError as error:
        if error.response['Error']['Code'] != 'ResourceNotFoundException':
            raise
    # Get current secret
    current_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionStage="AWSCURRENT"
    )
    secret_dict = json.loads(current_secret['SecretString'])
    # Generate new password
    new_password = secrets_client.get_random_password(
        PasswordLength=32,
        ExcludeCharacters='"@/\\'
    )['RandomPassword']
    # Create new secret version
    secret_dict['password'] = new_password
    secrets_client.put_secret_value(
        SecretId=secret_arn,
        ClientRequestToken=token,
        SecretString=json.dumps(secret_dict),
        VersionStages=['AWSPENDING']
    )


def set_secret(secrets_client, secret_arn, token):
    """Set the new password in the database"""
    # Get pending secret
    pending_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionId=token,
        VersionStage="AWSPENDING"
    )
    pending_dict = json.loads(pending_secret['SecretString'])
    # If the pending password already works, an earlier attempt changed it.
    try:
        _connect(pending_dict).close()
        return
    except pymysql.MySQLError:
        pass
    # Get current secret for connection
    current_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionStage="AWSCURRENT"
    )
    current_dict = json.loads(current_secret['SecretString'])
    # Connect to database with current credentials
    connection = _connect(current_dict)
    try:
        with connection.cursor() as cursor:
            # Pass values as parameters so the driver escapes them; the
            # password may contain quote characters.
            cursor.execute(
                "ALTER USER %s@'%%' IDENTIFIED BY %s",
                (current_dict['username'], pending_dict['password'])
            )
        connection.commit()
    finally:
        connection.close()


def test_secret(secrets_client, secret_arn, token):
    """Test the new secret by connecting to database"""
    pending_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionId=token,
        VersionStage="AWSPENDING"
    )
    secret_dict = json.loads(pending_secret['SecretString'])
    # Test connection with new credentials
    connection = _connect(secret_dict)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            if result[0] != 1:
                raise Exception("Database connection test failed")
    finally:
        connection.close()


def finish_secret(secrets_client, secret_arn, token):
    """Finalize the rotation by updating version stages"""
    # Find the version that currently holds AWSCURRENT.
    metadata = secrets_client.describe_secret(SecretId=secret_arn)
    current_version = None
    for version_id, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            if version_id == token:
                # Already promoted by an earlier attempt.
                return
            current_version = version_id
            break
    # Move AWSCURRENT from the old version to the new one. AWSPENDING is
    # left alone here; it may stay on the version that is now current.
    secrets_client.update_secret_version_stage(
        SecretId=secret_arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version
    )
'''
    return lambda_code
def setup_automatic_rotation():
    """Setup automatic rotation for database secrets.

    Deploys the rotation handler produced by
    ``create_database_rotation_function`` as the Lambda function
    ``rds-mysql-rotation``, allows Secrets Manager to invoke it, and enables
    30-day rotation on every secret tagged ``Category=databases``.

    Returns:
        dict: The ``create_function`` response for the rotation Lambda.
    """
    secrets_client = boto3.client('secretsmanager')
    lambda_client = boto3.client('lambda')
    function_name = 'rds-mysql-rotation'

    # Lambda's ZipFile field must hold a zip archive, not raw source text.
    # NOTE(review): the handler imports pymysql, which presumably has to be
    # supplied through a layer or bundled into this archive — confirm.
    archive_buffer = io.BytesIO()
    with zipfile.ZipFile(archive_buffer, 'w') as archive:
        handler_file = zipfile.ZipInfo('lambda_function.py')
        handler_file.compress_type = zipfile.ZIP_DEFLATED
        # Make the module readable by the Lambda runtime user.
        handler_file.external_attr = 0o644 << 16
        archive.writestr(handler_file, create_database_rotation_function())

    # Create rotation Lambda function
    rotation_function = lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.9',
        Role='arn:aws:iam::123456789012:role/SecretsManagerRotationRole',
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': archive_buffer.getvalue()},
        Description='Rotate RDS MySQL secrets',
        Timeout=60,
        Environment={
            'Variables': {
                'SECRETS_MANAGER_ENDPOINT': 'https://secretsmanager.us-east-1.amazonaws.com'
            }
        }
    )
    # rotate_secret starts a rotation right away by default, so the function
    # has to be invocable first.
    lambda_client.get_waiter('function_active').wait(FunctionName=function_name)
    # Secrets Manager can only call the function once it is granted invoke
    # permission.
    lambda_client.add_permission(
        FunctionName=function_name,
        StatementId='SecretsManagerInvoke',
        Action='lambda:InvokeFunction',
        Principal='secretsmanager.amazonaws.com'
    )

    # Configure rotation for database secrets
    pages = secrets_client.get_paginator('list_secrets').paginate(
        Filters=[
            {'Key': 'tag-key', 'Values': ['Category']},
            {'Key': 'tag-value', 'Values': ['databases']},
        ]
    )
    for page in pages:
        for secret in page['SecretList']:
            # The two tag filters match independently of each other, so
            # confirm the exact Category=databases pair.
            tags = {tag['Key']: tag['Value'] for tag in secret.get('Tags', [])}
            if tags.get('Category') != 'databases':
                continue
            secrets_client.rotate_secret(
                SecretId=secret['ARN'],
                RotationLambdaARN=rotation_function['FunctionArn'],
                RotationRules={
                    'AutomaticallyAfterDays': 30
                }
            )
    return rotation_function
API Key Rotation
def create_api_key_rotation():
    """Return the source of a Lambda handler that rotates a provider API key.

    The returned text is a complete Lambda module. Its ``lambda_handler``
    dispatches the four rotation steps to ``create_api_key``,
    ``activate_api_key``, ``test_api_key`` and ``deactivate_old_key``. The
    pending secret version stores the new key under ``api_key`` so readers
    of the secret pick it up once the version becomes current.

    Returns:
        str: Python source code. Nothing is deployed by this function.
    """
    api_rotation_code = '''
import json
import boto3
import requests
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Never let a stalled provider call run until the Lambda timeout.
REQUEST_TIMEOUT_SECONDS = 10


def lambda_handler(event, context):
    """Handle API key rotation"""
    # Secrets Manager supplies these fields at the top level of the event.
    secret_arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    secrets_client = boto3.client('secretsmanager')
    if step == "createSecret":
        create_api_key(secrets_client, secret_arn, token)
    elif step == "setSecret":
        activate_api_key(secrets_client, secret_arn, token)
    elif step == "testSecret":
        test_api_key(secrets_client, secret_arn, token)
    elif step == "finishSecret":
        deactivate_old_key(secrets_client, secret_arn, token)
    else:
        raise ValueError(f"Invalid step parameter: {step}")
    return {"statusCode": 200}


def _get_pending(secrets_client, secret_arn, token):
    """Load the AWSPENDING version created for this rotation"""
    pending_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionId=token,
        VersionStage="AWSPENDING"
    )
    return json.loads(pending_secret['SecretString'])


def create_api_key(secrets_client, secret_arn, token):
    """Create new API key via provider API"""
    # A retried step must not mint a second key at the provider.
    try:
        _get_pending(secrets_client, secret_arn, token)
        return
    except secrets_client.exceptions.ResourceNotFoundException:
        pass
    current_secret = secrets_client.get_secret_value(
        SecretId=secret_arn,
        VersionStage="AWSCURRENT"
    )
    secret_dict = json.loads(current_secret['SecretString'])
    # Call API provider to create new key
    response = requests.post(
        f"{secret_dict['api_base_url']}/keys",
        headers={
            'Authorization': f"Bearer {secret_dict['api_key']}",
            'Content-Type': 'application/json'
        },
        json={
            'name': f"rotated-key-{token[:8]}",
            'permissions': secret_dict.get('permissions', [])
        },
        timeout=REQUEST_TIMEOUT_SECONDS
    )
    if response.status_code != 201:
        raise Exception(f"Failed to create new API key: {response.text}")
    new_key_data = response.json()
    # The pending version carries the new key under the same field readers
    # already use, so promoting it actually switches them over.
    pending_dict = dict(secret_dict)
    pending_dict['api_key'] = new_key_data['key']
    pending_dict['key_id'] = new_key_data['id']
    if 'key_id' in secret_dict:
        pending_dict['previous_key_id'] = secret_dict['key_id']
    secrets_client.put_secret_value(
        SecretId=secret_arn,
        ClientRequestToken=token,
        SecretString=json.dumps(pending_dict),
        VersionStages=['AWSPENDING']
    )


def activate_api_key(secrets_client, secret_arn, token):
    """Confirm the pending key exists before it is tested"""
    # NOTE(review): assumes the provider activates a key on creation, so no
    # provider call is made here - confirm against the provider's API.
    _get_pending(secrets_client, secret_arn, token)


def test_api_key(secrets_client, secret_arn, token):
    """Test new API key"""
    secret_dict = _get_pending(secrets_client, secret_arn, token)
    # Test new API key
    response = requests.get(
        f"{secret_dict['api_base_url']}/test",
        headers={
            'Authorization': f"Bearer {secret_dict['api_key']}"
        },
        timeout=REQUEST_TIMEOUT_SECONDS
    )
    if response.status_code != 200:
        raise Exception(f"New API key test failed: {response.text}")


def deactivate_old_key(secrets_client, secret_arn, token):
    """Promote the new key to AWSCURRENT"""
    metadata = secrets_client.describe_secret(SecretId=secret_arn)
    current_version = None
    for version_id, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            if version_id == token:
                # Already promoted by an earlier attempt.
                return
            current_version = version_id
            break
    secrets_client.update_secret_version_stage(
        SecretId=secret_arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version
    )
    # TODO(review): revoke the previous key at the provider once its
    # revocation endpoint is known; previous_key_id is kept in the secret
    # for that purpose when the old version recorded a key_id.
'''
    return api_rotation_code
Secret Access Patterns
Application Integration
class SecureApplicationConfig:
    """Read JSON secrets from Secrets Manager through a small in-memory cache.

    Attributes:
        secrets_client: Secrets Manager client used for lookups.
        cache: Maps a secret name to an ``(expires_at, value)`` pair, where
            ``expires_at`` is a ``time.monotonic()`` reading.
        cache_ttl: Seconds a cached value stays valid.
    """

    def __init__(self, region='us-east-1'):
        """Create the client for ``region`` and start with an empty cache."""
        self.secrets_client = boto3.client('secretsmanager', region_name=region)
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

    def get_secret(self, secret_name, use_cache=True):
        """Get secret with caching.

        Args:
            secret_name: Name or ARN of the secret.
            use_cache: When true, serve an unexpired cached value and cache
                a freshly fetched one. When false, always call the service
                and leave the cache untouched.

        Returns:
            The secret's ``SecretString`` parsed as JSON.

        Raises:
            Any error raised by ``get_secret_value`` propagates unchanged
            (for example DecryptionFailureException,
            InternalServiceErrorException, InvalidParameterException,
            InvalidRequestException or ResourceNotFoundException).
        """
        now = time.monotonic()
        if use_cache:
            cached = self.cache.get(secret_name)
            if cached is not None and cached[0] > now:
                return cached[1]

        response = self.secrets_client.get_secret_value(SecretId=secret_name)
        secret_value = json.loads(response['SecretString'])
        if use_cache:
            # One entry per secret: refreshing overwrites the old value, so
            # the cache cannot grow beyond the number of distinct secrets.
            self.cache[secret_name] = (now + self.cache_ttl, secret_value)
        return secret_value

    def get_database_config(self, environment, db_name):
        """Get database configuration stored at ``{environment}/databases/{db_name}``."""
        secret_name = f"{environment}/databases/{db_name}"
        return self.get_secret(secret_name)

    def get_api_credentials(self, environment, service_name):
        """Get API credentials stored at ``{environment}/apis/{service_name}``."""
        secret_name = f"{environment}/apis/{service_name}"
        return self.get_secret(secret_name)

    def refresh_cache(self):
        """Clear cache to force refresh"""
        self.cache.clear()
# Usage in application
def database_connection_example():
    """Example of using secrets in database connection.

    Loads the ``production`` / ``primary-db`` configuration through
    ``SecureApplicationConfig`` and opens a pymysql connection with it.

    Returns:
        The open pymysql connection; the caller is responsible for closing it.

    Raises:
        Any exception from the secret lookup or the connection attempt is
        logged and re-raised unchanged.
    """
    # Resolved locally so the error path never fails on an undefined name.
    logger = logging.getLogger(__name__)
    config = SecureApplicationConfig()
    try:
        db_config = config.get_database_config('production', 'primary-db')
        connection = pymysql.connect(
            host=db_config['host'],
            user=db_config['username'],
            password=db_config['password'],
            database=db_config['dbname'],
            port=db_config['port'],
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        return connection
    except Exception as e:
        logger.error("Database connection failed: %s", e)
        raise
Container Integration
def create_ecs_task_definition_with_secrets():
    """Register the ``secure-app`` Fargate task definition.

    The single container receives ``DB_PASSWORD``, ``DB_USERNAME`` and
    ``API_KEY`` through the task definition's ``secrets`` section, each
    pointing at one JSON key of a Secrets Manager secret.

    Returns:
        The ``register_task_definition`` response.
    """
    # Values injected from Secrets Manager rather than plain environment.
    injected_secrets = [
        {
            "name": "DB_PASSWORD",
            "valueFrom": "arn:aws:secretsmanager:us-east-1:123456789012:secret:production/databases/primary-db:password::"
        },
        {
            "name": "DB_USERNAME",
            "valueFrom": "arn:aws:secretsmanager:us-east-1:123456789012:secret:production/databases/primary-db:username::"
        },
        {
            "name": "API_KEY",
            "valueFrom": "arn:aws:secretsmanager:us-east-1:123456789012:secret:production/apis/payment-gateway:api_key::"
        },
    ]
    log_configuration = {
        "logDriver": "awslogs",
        "options": {
            "awslogs-group": "/ecs/secure-app",
            "awslogs-region": "us-east-1",
            "awslogs-stream-prefix": "ecs",
        },
    }
    app_container = {
        "name": "app-container",
        "image": "myapp:latest",
        "essential": True,
        "portMappings": [{"containerPort": 8080, "protocol": "tcp"}],
        "environment": [{"name": "APP_ENV", "value": "production"}],
        "secrets": injected_secrets,
        "logConfiguration": log_configuration,
    }

    ecs_client = boto3.client('ecs')
    return ecs_client.register_task_definition(
        family="secure-app",
        networkMode="awsvpc",
        requiresCompatibilities=["FARGATE"],
        cpu="256",
        memory="512",
        executionRoleArn="arn:aws:iam::123456789012:role/ecsTaskExecutionRole",
        taskRoleArn="arn:aws:iam::123456789012:role/ecsTaskRole",
        containerDefinitions=[app_container],
    )
Security and Compliance
Audit and Monitoring
def setup_secrets_monitoring():
    """Setup comprehensive monitoring for Secrets Manager.

    Creates one CloudWatch alarm on the volume of ``GetSecretValue`` calls
    and one EventBridge rule per critical secret operation. Each rule
    forwards matching CloudTrail events to the ``secrets-audit-processor``
    Lambda function, which is also granted permission to be invoked by
    that rule.
    """
    cloudwatch = boto3.client('cloudwatch')
    events = boto3.client('events')
    lambda_client = boto3.client('lambda')
    audit_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:secrets-audit-processor'

    # CloudWatch alarms for secret access
    alarms = [
        {
            'AlarmName': 'SecretsManager-UnauthorizedAccess',
            'ComparisonOperator': 'GreaterThanThreshold',
            'EvaluationPeriods': 1,
            # API call counts are published as CallCount in the AWS/Usage
            # namespace, keyed by the operation name.
            'MetricName': 'CallCount',
            'Namespace': 'AWS/Usage',
            'Dimensions': [
                {'Name': 'Service', 'Value': 'Secrets Manager'},
                {'Name': 'Type', 'Value': 'API'},
                {'Name': 'Resource', 'Value': 'GetSecretValue'},
                {'Name': 'Class', 'Value': 'None'},
            ],
            'Period': 300,
            'Statistic': 'Sum',
            'Threshold': 100.0,
            'ActionsEnabled': True,
            'AlarmActions': [
                'arn:aws:sns:us-east-1:123456789012:security-alerts'
            ],
            'AlarmDescription': 'High volume of secret access attempts'
        }
    ]
    for alarm_config in alarms:
        cloudwatch.put_metric_alarm(**alarm_config)

    # EventBridge rules for secret operations
    critical_events = [
        'CreateSecret',
        'DeleteSecret',
        'PutSecretValue',
        'UpdateSecret',
        'RestoreSecret'
    ]
    for event_name in critical_events:
        rule_name = f'SecretsManager-{event_name}'
        rule = events.put_rule(
            Name=rule_name,
            EventPattern=json.dumps({
                'source': ['aws.secretsmanager'],
                'detail-type': ['AWS API Call via CloudTrail'],
                'detail': {
                    'eventSource': ['secretsmanager.amazonaws.com'],
                    'eventName': [event_name]
                }
            }),
            State='ENABLED'
        )
        # Without this grant the rule matches events but cannot invoke the
        # target function.
        try:
            lambda_client.add_permission(
                FunctionName=audit_function_arn,
                StatementId=f'{rule_name}-invoke',
                Action='lambda:InvokeFunction',
                Principal='events.amazonaws.com',
                SourceArn=rule['RuleArn']
            )
        except lambda_client.exceptions.ResourceConflictException:
            # The statement exists from an earlier run; nothing to add.
            pass
        events.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': '1',
                    'Arn': audit_function_arn
                }
            ]
        )
def audit_secret_access(days=7, cloudtrail=None):
    """Audit secret access patterns from CloudTrail.

    Args:
        days: Size of the look-back window, in days.
        cloudtrail: Optional CloudTrail client; a default boto3 client is
            created when omitted.

    Returns:
        dict: Maps a user key to ``{'total_events': int, 'event_types':
        {event_name: count}, 'secrets_accessed': [secret_id, ...]}``. The
        secret list is sorted and JSON-serialisable.
    """
    if cloudtrail is None:
        cloudtrail = boto3.client('cloudtrail')

    # Timezone-aware bounds keep the window unambiguous.
    end_time = datetime.now(timezone.utc)
    # Query for secret access events; paginate so the whole window is read.
    pages = cloudtrail.get_paginator('lookup_events').paginate(
        LookupAttributes=[
            {
                'AttributeKey': 'EventSource',
                'AttributeValue': 'secretsmanager.amazonaws.com'
            }
        ],
        StartTime=end_time - timedelta(days=days),
        EndTime=end_time
    )

    access_patterns = {}
    for page in pages:
        for event in page['Events']:
            # Identity and request parameters live inside the JSON string
            # in 'CloudTrailEvent', not at the top level of the record.
            try:
                detail = json.loads(event.get('CloudTrailEvent') or '{}')
            except ValueError:
                detail = {}
            user_identity = detail.get('userIdentity') or {}
            event_name = event['EventName']
            user_key = (
                user_identity.get('userName')
                or event.get('Username')
                or user_identity.get('type', 'Unknown')
            )

            user_stats = access_patterns.setdefault(user_key, {
                'total_events': 0,
                'event_types': {},
                'secrets_accessed': set()
            })
            user_stats['total_events'] += 1
            event_types = user_stats['event_types']
            event_types[event_name] = event_types.get(event_name, 0) + 1

            # Extract secret name from event; requestParameters can be null.
            secret_id = (detail.get('requestParameters') or {}).get('secretId')
            if secret_id:
                user_stats['secrets_accessed'].add(secret_id)

    # Convert sets to sorted lists for JSON serialization
    for user_data in access_patterns.values():
        user_data['secrets_accessed'] = sorted(user_data['secrets_accessed'])
    return access_patterns
Compliance Reporting
def generate_compliance_report(rotation_policy_days=30, grace_days=5, secrets_client=None):
    """Generate compliance report for secrets management.

    Args:
        rotation_policy_days: Rotation interval assumed for a secret whose
            own rotation rules do not state ``AutomaticallyAfterDays``.
        grace_days: Days past the interval before a secret is reported as
            overdue.
        secrets_client: Optional Secrets Manager client; a default boto3
            client is created when omitted.

    Returns:
        dict: Counters and lists with the keys ``total_secrets``,
        ``rotation_enabled``, ``rotation_disabled``, ``encrypted_secrets``,
        ``cross_account_secrets``, ``secrets_by_environment``,
        ``secrets_without_tags`` and ``rotation_overdue``.
    """
    if secrets_client is None:
        secrets_client = boto3.client('secretsmanager')

    # Get all secrets
    all_secrets = []
    paginator = secrets_client.get_paginator('list_secrets')
    for page in paginator.paginate():
        all_secrets.extend(page['SecretList'])

    compliance_report = {
        'total_secrets': len(all_secrets),
        'rotation_enabled': 0,
        'rotation_disabled': 0,
        'encrypted_secrets': 0,
        # NOTE(review): never computed here; it would need the resource
        # policy of each secret. Left at 0 as in the original report shape.
        'cross_account_secrets': 0,
        'secrets_by_environment': {},
        'secrets_without_tags': [],
        'rotation_overdue': []
    }

    for secret in all_secrets:
        # Check rotation status
        if secret.get('RotationEnabled'):
            compliance_report['rotation_enabled'] += 1
            # Check if rotation is overdue, measured against the secret's
            # own interval when it declares one.
            last_rotated = secret.get('LastRotatedDate')
            if last_rotated:
                rotation_rules = secret.get('RotationRules') or {}
                interval_days = rotation_rules.get('AutomaticallyAfterDays') or rotation_policy_days
                days_since_rotation = (datetime.now(last_rotated.tzinfo) - last_rotated).days
                if days_since_rotation > interval_days + grace_days:
                    compliance_report['rotation_overdue'].append({
                        'name': secret['Name'],
                        'days_overdue': days_since_rotation - interval_days
                    })
        else:
            compliance_report['rotation_disabled'] += 1

        # Counts secrets that report an explicit KmsKeyId.
        # NOTE(review): the listing appears to omit KmsKeyId for the default
        # AWS managed key, so this is presumably a count of
        # customer-specified keys rather than of encrypted secrets — confirm.
        if secret.get('KmsKeyId'):
            compliance_report['encrypted_secrets'] += 1

        # Check tags
        tags = secret.get('Tags', [])
        if not tags:
            compliance_report['secrets_without_tags'].append(secret['Name'])

        # Group by environment
        env_tag = next((tag['Value'] for tag in tags if tag['Key'] == 'Environment'), 'untagged')
        by_environment = compliance_report['secrets_by_environment']
        by_environment[env_tag] = by_environment.get(env_tag, 0) + 1

    return compliance_report
Cost Optimization
Secret Lifecycle Management
def implement_secret_lifecycle_management(unused_after_days=90, delete_after_days=180,
                                          dry_run=True, secrets_client=None):
    """Implement lifecycle management for cost optimization.

    Args:
        unused_after_days: A secret last accessed more than this many days
            ago is reported as unused.
        delete_after_days: An unused secret older than this is announced
            for deletion (and deleted when ``dry_run`` is false).
        dry_run: When true (the default) nothing is deleted. When false,
            deletion is scheduled with a 30-day recovery window.
        secrets_client: Optional Secrets Manager client; a default boto3
            client is created when omitted.

    Returns:
        list: One dict per unused secret with ``name``, ``arn``,
        ``last_accessed`` and ``days_unused``.
    """
    if secrets_client is None:
        secrets_client = boto3.client('secretsmanager')

    # Find unused secrets
    unused_secrets = []
    for page in secrets_client.get_paginator('list_secrets').paginate():
        for secret in page['SecretList']:
            # The listing already carries the last-access date.
            last_accessed = secret.get('LastAccessedDate')
            if last_accessed is None:
                # No recorded access: skipped, as entries without a date
                # were before.
                continue
            # Use the date's own timezone so aware and naive values are
            # never mixed in the subtraction.
            days_unused = (datetime.now(last_accessed.tzinfo) - last_accessed).days
            if days_unused > unused_after_days:
                unused_secrets.append({
                    'name': secret['Name'],
                    'arn': secret['ARN'],
                    'last_accessed': last_accessed,
                    'days_unused': days_unused
                })

    # Schedule deletion for unused secrets (only when dry_run is disabled)
    for secret in unused_secrets:
        if secret['days_unused'] > delete_after_days:
            print(f"Scheduling deletion for unused secret: {secret['name']}")
            if not dry_run:
                secrets_client.delete_secret(
                    SecretId=secret['arn'],
                    RecoveryWindowInDays=30
                )
    return unused_secrets
def optimize_secret_storage():
    """Return the catalogue of cost-saving strategies for secret storage.

    Returns:
        dict: Maps a strategy identifier to a dict of descriptive strings.
    """
    consolidation = {
        'description': 'Combine related secrets into single JSON objects',
        'example': 'Store all database credentials in one secret',
    }
    parameter_store = {
        'description': 'Use Systems Manager Parameter Store for non-sensitive config',
        'cost_savings': 'Parameter Store is free for standard parameters',
    }
    sharing = {
        'description': 'Share secrets across accounts instead of duplicating',
        'implementation': 'Use resource policies for cross-account access',
    }
    return {
        'consolidate_similar_secrets': consolidation,
        'use_parameter_store_for_non_sensitive': parameter_store,
        'implement_secret_sharing': sharing,
    }
Conclusion
AWS Secrets Manager provides enterprise-grade secret management capabilities that go far beyond simple password storage. Key takeaways include:
- Implement hierarchical organization for scalable secret management
- Enable automatic rotation for database credentials and API keys
- Use proper caching strategies to optimize performance and costs
- Integrate with container platforms for secure application deployment
- Monitor and audit secret access for security and compliance
- Implement lifecycle management to optimize costs
Effective secrets management is crucial for maintaining security at scale. The patterns shown here provide a foundation for implementing enterprise-grade secret management that balances security, operational efficiency, and cost optimization.
Remember that secrets management is not just about storage - it's about the entire lifecycle of sensitive data from creation to rotation to retirement. A comprehensive approach ensures your secrets remain secure throughout their entire lifecycle.
Securing Secrets Access with AccessLens
While Secrets Manager provides robust secret storage and rotation capabilities, the security of your secrets ultimately depends on who has access to them. Overly permissive IAM policies and misconfigured resource policies can expose your most sensitive data to unauthorized access.
AccessLens helps secure your Secrets Manager implementation by providing:
- Secret access policy analysis that identifies overly permissive policies
- Cross-account secret sharing visibility that reveals potential security risks
- IAM permission analysis that shows who can access your secrets
- Compliance monitoring that ensures secret access aligns with your security policies
- Risk assessment that identifies potential privilege escalation paths through secrets
Your secrets are only as secure as your access controls. AccessLens ensures that your Secrets Manager deployment maintains strong security while enabling the business functionality you need.
Secure your secrets management with AccessLens and gain the visibility you need to maintain strong secret access controls.
Don't let IAM misconfigurations compromise your secrets security. Get the access visibility and control you need for comprehensive secrets protection.