Monitoring and Incident Response

Monitoring and Incident Response

Effective security requires continuous monitoring and rapid incident response. Authentication systems generate valuable security signals that, when properly analyzed, reveal attack patterns and system compromises. Implementing comprehensive logging and alerting enables proactive security rather than reactive breach discovery.

Authentication event logging must balance completeness with privacy. Log successful and failed authentication attempts, password changes, and account modifications. Include metadata like timestamps, IP addresses, and user agents. However, never log passwords, even failed attempts, as these logs could become attack vectors. Hash any sensitive data before logging to maintain audit trails without exposing secrets.

import json
import logging
from datetime import datetime
from collections import defaultdict
import smtplib
from email.mime.text import MIMEText

class AuthenticationMonitor:
    """Real-time monitoring and alerting for authentication events"""
    
    def __init__(self, alert_config):
        self.alert_config = alert_config
        self.threshold_tracker = defaultdict(list)
        
        # Configure secure logging
        self.logger = logging.getLogger('auth_security')
        handler = logging.handlers.RotatingFileHandler(
            'auth_security.log',
            maxBytes=50*1024*1024,  # 50MB
            backupCount=10
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_auth_event(self, event_type, username, success, 
                      ip_address=None, additional_data=None):
        """Log authentication event securely"""
        
        event = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'username': self._hash_username(username),  # Hash for privacy
            'success': success,
            'ip_address': ip_address,
            'additional_data': additional_data or {}
        }
        
        # Log event
        self.logger.info(json.dumps(event))
        
        # Check for suspicious patterns
        self._check_thresholds(event)
    
    def _hash_username(self, username):
        """Hash username for privacy in logs"""
        
        # Use HMAC with secret key for consistent hashing
        import hmac
        secret = self.alert_config.get('log_secret', 'default-secret').encode()
        return hmac.new(secret, username.encode(), 'sha256').hexdigest()[:16]
    
    def _check_thresholds(self, event):
        """Check for suspicious patterns requiring alerts"""
        
        alerts = []
        
        # Failed login spike detection
        if event['event_type'] == 'login' and not event['success']:
            key = f"failed_login_{event['ip_address']}"
            self.threshold_tracker[key].append(event['timestamp'])
            
            # Clean old entries (last hour)
            cutoff = (datetime.utcnow() - timedelta(hours=1)).isoformat()
            self.threshold_tracker[key] = [
                ts for ts in self.threshold_tracker[key] if ts > cutoff
            ]
            
            # Check threshold
            if len(self.threshold_tracker[key]) > 50:
                alerts.append({
                    'type': 'failed_login_spike',
                    'severity': 'high',
                    'details': f"50+ failed logins from {event['ip_address']} in 1 hour"
                })
        
        # Successful login from new location
        if event['event_type'] == 'login' and event['success']:
            if self._is_new_location(event['username'], event['ip_address']):
                alerts.append({
                    'type': 'new_location_login',
                    'severity': 'medium',
                    'details': f"Login from new IP: {event['ip_address']}"
                })
        
        # Password spray detection
        if event['event_type'] == 'login' and not event['success']:
            self._check_password_spray(event, alerts)
        
        # Send alerts
        for alert in alerts:
            self._send_alert(alert)
    
    def _check_password_spray(self, event, alerts):
        """Detect password spray attacks"""
        
        # Track unique usernames per IP
        key = f"spray_{event['ip_address']}"
        if key not in self.threshold_tracker:
            self.threshold_tracker[key] = set()
            
        self.threshold_tracker[key].add(event['username'])
        
        # Alert if same IP tries many different accounts
        if len(self.threshold_tracker[key]) > 20:
            alerts.append({
                'type': 'password_spray',
                'severity': 'critical',
                'details': f"Possible password spray from {event['ip_address']}"
            })
    
    def _send_alert(self, alert):
        """Send security alert"""
        
        # Log alert
        self.logger.warning(f"SECURITY ALERT: {json.dumps(alert)}")
        
        # Email notification for critical alerts
        if alert['severity'] == 'critical':
            self._send_email_alert(alert)
        
        # Could also send to SIEM, Slack, PagerDuty, etc.
    
    def generate_security_report(self):
        """Generate periodic security report"""
        
        # Analyze logs for patterns
        report = {
            'period': 'last_24_hours',
            'summary': {
                'total_logins': 0,
                'failed_logins': 0,
                'unique_ips': set(),
                'locked_accounts': 0,
                'password_resets': 0
            },
            'top_failed_usernames': defaultdict(int),
            'top_failed_ips': defaultdict(int)
        }
        
        # Process logs (simplified for example)
        # In production, use log aggregation tools
        
        return report