Monitoring and Incident Response
Monitoring and Incident Response
Effective security requires continuous monitoring and rapid incident response. Authentication systems generate valuable security signals that, when properly analyzed, reveal attack patterns and system compromises. Implementing comprehensive logging and alerting enables proactive security rather than reactive breach discovery.
Authentication event logging must balance completeness with privacy. Log successful and failed authentication attempts, password changes, and account modifications. Include metadata like timestamps, IP addresses, and user agents. However, never log passwords, even failed attempts, as these logs could become attack vectors. Hash any sensitive data before logging to maintain audit trails without exposing secrets.
import json
import logging
from datetime import datetime
from collections import defaultdict
import smtplib
from email.mime.text import MIMEText
class AuthenticationMonitor:
"""Real-time monitoring and alerting for authentication events"""
def __init__(self, alert_config):
self.alert_config = alert_config
self.threshold_tracker = defaultdict(list)
# Configure secure logging
self.logger = logging.getLogger('auth_security')
handler = logging.handlers.RotatingFileHandler(
'auth_security.log',
maxBytes=50*1024*1024, # 50MB
backupCount=10
)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_auth_event(self, event_type, username, success,
ip_address=None, additional_data=None):
"""Log authentication event securely"""
event = {
'timestamp': datetime.utcnow().isoformat(),
'event_type': event_type,
'username': self._hash_username(username), # Hash for privacy
'success': success,
'ip_address': ip_address,
'additional_data': additional_data or {}
}
# Log event
self.logger.info(json.dumps(event))
# Check for suspicious patterns
self._check_thresholds(event)
def _hash_username(self, username):
"""Hash username for privacy in logs"""
# Use HMAC with secret key for consistent hashing
import hmac
secret = self.alert_config.get('log_secret', 'default-secret').encode()
return hmac.new(secret, username.encode(), 'sha256').hexdigest()[:16]
def _check_thresholds(self, event):
"""Check for suspicious patterns requiring alerts"""
alerts = []
# Failed login spike detection
if event['event_type'] == 'login' and not event['success']:
key = f"failed_login_{event['ip_address']}"
self.threshold_tracker[key].append(event['timestamp'])
# Clean old entries (last hour)
cutoff = (datetime.utcnow() - timedelta(hours=1)).isoformat()
self.threshold_tracker[key] = [
ts for ts in self.threshold_tracker[key] if ts > cutoff
]
# Check threshold
if len(self.threshold_tracker[key]) > 50:
alerts.append({
'type': 'failed_login_spike',
'severity': 'high',
'details': f"50+ failed logins from {event['ip_address']} in 1 hour"
})
# Successful login from new location
if event['event_type'] == 'login' and event['success']:
if self._is_new_location(event['username'], event['ip_address']):
alerts.append({
'type': 'new_location_login',
'severity': 'medium',
'details': f"Login from new IP: {event['ip_address']}"
})
# Password spray detection
if event['event_type'] == 'login' and not event['success']:
self._check_password_spray(event, alerts)
# Send alerts
for alert in alerts:
self._send_alert(alert)
def _check_password_spray(self, event, alerts):
"""Detect password spray attacks"""
# Track unique usernames per IP
key = f"spray_{event['ip_address']}"
if key not in self.threshold_tracker:
self.threshold_tracker[key] = set()
self.threshold_tracker[key].add(event['username'])
# Alert if same IP tries many different accounts
if len(self.threshold_tracker[key]) > 20:
alerts.append({
'type': 'password_spray',
'severity': 'critical',
'details': f"Possible password spray from {event['ip_address']}"
})
def _send_alert(self, alert):
"""Send security alert"""
# Log alert
self.logger.warning(f"SECURITY ALERT: {json.dumps(alert)}")
# Email notification for critical alerts
if alert['severity'] == 'critical':
self._send_email_alert(alert)
# Could also send to SIEM, Slack, PagerDuty, etc.
def generate_security_report(self):
"""Generate periodic security report"""
# Analyze logs for patterns
report = {
'period': 'last_24_hours',
'summary': {
'total_logins': 0,
'failed_logins': 0,
'unique_ips': set(),
'locked_accounts': 0,
'password_resets': 0
},
'top_failed_usernames': defaultdict(int),
'top_failed_ips': defaultdict(int)
}
# Process logs (simplified for example)
# In production, use log aggregation tools
return report