Security Design trong System Architecture

Tổng Quan

Security Design bao gồm các nguyên tắc, patterns và best practices để xây dựng hệ thống an toàn và bảo vệ hệ thống khỏi các threats và vulnerabilities.

Security Principles

Defense in Depth

Multiple layers of security:
1. Network Security (Firewalls, VPN)
2. Application Security (Authentication, Authorization)
3. Data Security (Encryption, Access Control)
4. Infrastructure Security (OS hardening, Monitoring)

Zero Trust Architecture

class ZeroTrustGateway:
    """Gateway that scores every request instead of trusting it implicitly."""

    def authenticate_request(self, request):
        """Decide whether ``request`` is trusted enough to be let through.

        Three signals are collected in order: identity (from
        ``request.token``), device trust (from ``request.device_info``) and
        context (from ``request.location`` and ``request.time``). They are
        combined by ``calculate_trust_score``.

        Returns:
            The result of ``trust_score > self.trust_threshold``.
        """
        identity = self.verify_identity(request.token)
        device_trust = self.assess_device_trust(request.device_info)
        context_trust = self.evaluate_context(request.location, request.time)

        # A score exactly equal to the threshold is rejected (strict ">").
        combined = self.calculate_trust_score(identity, device_trust, context_trust)
        return combined > self.trust_threshold

Authentication & Authorization

Multi-Factor Authentication

class MFAService:
    """Two-step login: check the primary factor, then trigger a second factor."""

    def __init__(self):
        self.totp_generator = TOTPGenerator()
        self.sms_service = SMSService()
        self.email_service = EmailService()

    def initiate_mfa(self, user_id, primary_factor):
        """Verify the primary factor and start the user's preferred second factor.

        Args:
            user_id: Identifier of the user who is logging in.
            primary_factor: Credential passed to ``verify_primary_factor``.

        Returns:
            Whatever the selected second-factor helper returns
            (``require_totp``, ``send_sms_code`` or ``send_email_code``).

        Raises:
            AuthenticationError: If the primary factor is rejected.
            ValueError: If the user's preferred method is not one of
                ``'totp'``, ``'sms'`` or ``'email'``.
        """
        if not self.verify_primary_factor(user_id, primary_factor):
            raise AuthenticationError("Invalid credentials")

        # The user record must be loaded before its preference can be read;
        # previously `user` was referenced without ever being bound.
        # NOTE(review): `get_user` is the assumed lookup helper -- confirm its name.
        user = self.get_user(user_id)
        method = user.preferred_mfa

        # Generate and send second factor
        if method == 'totp':
            return self.require_totp(user_id)
        if method == 'sms':
            return self.send_sms_code(user_id)
        if method == 'email':
            return self.send_email_code(user_id)

        # Fail loudly rather than returning None, which a caller could
        # mistake for a completed MFA step.
        raise ValueError(f"Unsupported MFA method: {method!r}")

Role-Based Access Control (RBAC)

class RBACSystem:
    """Role-based access control driven by a static role -> actions table."""

    def __init__(self):
        # Each role maps to the list of actions it is allowed to perform.
        self.role_permissions = {
            'admin': ['read', 'write', 'delete', 'manage_users'],
            'editor': ['read', 'write'],
            'viewer': ['read'],
        }

    def check_permission(self, user_id, resource, action):
        """Return True if any role held by ``user_id`` allows ``action``.

        ``resource`` is accepted but not consulted; permissions are granted
        per action only. Roles absent from ``role_permissions`` grant nothing.
        """
        return any(
            action in self.role_permissions.get(held_role, [])
            for held_role in self.get_user_roles(user_id)
        )

    def authorize_request(self, user_id, resource_path, http_method):
        """Translate ``http_method`` to an action and check it for ``user_id``."""
        requested_action = self.map_http_method_to_action(http_method)
        allowed = self.check_permission(user_id, resource_path, requested_action)
        return allowed

Data Protection

Encryption at Rest và in Transit

class EncryptionService:
    """Encrypts data at rest with AES-CBC and builds a TLS 1.3 context for transit."""

    def __init__(self):
        self.aes_key = self.load_encryption_key()
        # Use the TLS builder this class actually defines; the previous call
        # targeted `setup_tls_context`, which does not exist on the class.
        self.tls_context = self.setup_secure_connection()

    def encrypt_sensitive_data(self, data):
        """Encrypt ``data`` and return it base64-encoded.

        A fresh random 16-byte IV is generated per call and prepended to the
        ciphertext, so the result decodes to ``iv + ciphertext``.

        Args:
            data: Plaintext handed to ``pad_data`` before encryption.

        Returns:
            ``base64.b64encode(iv + ciphertext)``.
        """
        # NOTE(review): this is AES-256 only if `load_encryption_key` returns
        # a 32-byte key -- confirm.
        iv = os.urandom(16)
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)

        # CBC works on whole blocks, so the plaintext is padded first.
        padded_data = self.pad_data(data)
        encrypted_data = cipher.encrypt(padded_data)

        return base64.b64encode(iv + encrypted_data)

    def setup_secure_connection(self):
        """Build an ``ssl.SSLContext`` that requires TLS 1.3 and a verified peer.

        Returns:
            A context created for ``ssl.Purpose.SERVER_AUTH`` with hostname
            checking enabled and ``verify_mode`` set to ``ssl.CERT_REQUIRED``.
        """
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED

        return context

Data Loss Prevention (DLP)

class DLPSystem:
    """Regex-based scanner and masker for sensitive data in text."""

    # Replacement text per data type. Types without an entry (currently
    # 'email') are reported by scan_content but left unmasked.
    _MASKS = {
        'credit_card': '****-****-****-****',
        'ssn': '***-**-****',
    }

    def __init__(self):
        self.sensitive_patterns = {
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            # The TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, which made strings like "a@b.c|d" match.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        }

    def scan_content(self, content):
        """Report which sensitive data types occur in ``content``.

        Args:
            content: Text to scan.

        Returns:
            A list with one dict per data type that matched at least once,
            holding ``'type'``, ``'matches'`` (the match count) and
            ``'severity'`` (from ``get_severity``). Empty if nothing matched.
        """
        violations = []

        for data_type, pattern in self.sensitive_patterns.items():
            matches = re.findall(pattern, content)
            if matches:
                violations.append({
                    'type': data_type,
                    'matches': len(matches),
                    'severity': self.get_severity(data_type)
                })

        return violations

    def apply_data_masking(self, content):
        """Return ``content`` with credit-card numbers and SSNs masked.

        Patterns are applied in the order they appear in
        ``sensitive_patterns``; data types without a mask are left as-is.
        """
        for data_type, pattern in self.sensitive_patterns.items():
            mask = self._MASKS.get(data_type)
            if mask is not None:
                content = re.sub(pattern, mask, content)

        return content

Network Security

Web Application Firewall (WAF)

class WAFRules:
    """Pattern tables and a request check for a simple web application firewall."""

    def __init__(self):
        # Regexes intended to flag SQL injection attempts.
        self.sql_injection_patterns = [
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)",
            r"(\b(UNION|JOIN)\b.*\b(SELECT)\b)",
            r"((\%27)|(\'))\s*(\%6F|o|O)(\%72|r|R)",
        ]

        # Regexes intended to flag cross-site scripting payloads.
        self.xss_patterns = [
            r"<script[^>]*>.*?</script>",
            r"javascript:",
            r"on\w+\s*=",
        ]

    def analyze_request(self, request):
        """Run the SQL injection, XSS and rate-limit checks on ``request``.

        Returns:
            A list holding, in this order, any of ``'sql_injection'``,
            ``'xss'`` and ``'rate_limit_exceeded'`` whose check was truthy.
        """
        # NOTE(review): the "+" below presumably expects body, query_string
        # and headers to all be strings -- verify against the request type.
        sqli_hit = self.detect_sql_injection(request.body + request.query_string)
        xss_hit = self.detect_xss(request.body + request.headers)
        over_limit = self.check_rate_limit(request.ip)

        findings = (
            ('sql_injection', sqli_hit),
            ('xss', xss_hit),
            ('rate_limit_exceeded', over_limit),
        )
        return [label for label, hit in findings if hit]

DDoS Protection

class DDoSProtection:
    """Detects traffic spikes and concentrated sources, then tightens defences."""

    # Gini coefficient above which traffic counts as concentrated in a few
    # source IPs. A Gini of 0 is a perfectly even spread and values near 1
    # mean a few sources dominate, so concentration is a HIGH value.
    # NOTE(review): 0.7 mirrors the earlier 0.3 cut-off; tune on real traffic.
    CONCENTRATION_GINI_THRESHOLD = 0.7

    def __init__(self):
        # NOTE(review): `normal_threshold` is read by analyze_traffic but is
        # not set here; it must be assigned before that method is called.
        self.rate_limiter = RedisRateLimiter()
        self.anomaly_detector = AnomalyDetector()

    def analyze_traffic(self, requests_per_second, source_ips):
        """Inspect current traffic and apply mitigations where needed.

        Args:
            requests_per_second: Current request rate.
            source_ips: Source addresses passed to ``analyze_ip_distribution``
                and, if filtering is applied, to ``apply_ip_based_filtering``.

        Protection mode is triggered when the rate exceeds ten times
        ``self.normal_threshold``. IP-based filtering is applied when the
        distribution's ``gini_coefficient`` exceeds
        ``CONCENTRATION_GINI_THRESHOLD``.
        """
        # Detect traffic anomalies
        if requests_per_second > self.normal_threshold * 10:
            # Potential DDoS attack
            self.trigger_protection_mode()

        # Analyze source IP distribution
        ip_distribution = self.analyze_ip_distribution(source_ips)
        if ip_distribution.gini_coefficient > self.CONCENTRATION_GINI_THRESHOLD:
            # Traffic is highly concentrated from few IPs, so per-IP
            # filtering can be effective.
            self.apply_ip_based_filtering(source_ips)

    def trigger_protection_mode(self):
        """Switch to strict rate limits, enable CAPTCHA and alert the security team."""
        # Enable aggressive rate limiting
        self.rate_limiter.set_strict_limits()

        # Activate CAPTCHA for suspicious requests
        self.enable_captcha_challenge()

        # Alert security team
        self.send_security_alert("DDoS attack detected")

Security Monitoring

Security Information and Event Management (SIEM)

class SIEMSystem:
    """Pipeline that normalizes, correlates and scores security events."""

    def __init__(self):
        self.log_aggregator = LogAggregator()
        self.threat_detector = ThreatDetector()
        self.alert_manager = AlertManager()

    def process_security_events(self, events):
        """Run each event through normalize -> correlate -> detect -> alert.

        Events are handled one at a time, in the order given. An alert is
        created for every detected threat whose ``risk_score`` is strictly
        greater than ``self.alert_threshold``.
        """
        for raw_event in events:
            event = self.normalize_event(raw_event)
            related = self.correlate_events(event)
            detected = self.threat_detector.analyze(event, related)
            self._alert_on_high_risk(detected)

    def _alert_on_high_risk(self, threats):
        """Create an alert for each threat scoring above the alert threshold."""
        for threat in threats:
            if threat.risk_score > self.alert_threshold:
                self.alert_manager.create_alert(threat)

Intrusion Detection

class IntrusionDetectionSystem:
    """Flags suspicious user activity by averaging two independent scores."""

    def __init__(self):
        self.baseline_behavior = self.load_baseline()
        self.ml_model = self.load_anomaly_model()

    def detect_anomalies(self, user_activity):
        """Score ``user_activity`` and open an incident if it looks anomalous.

        The risk score is the plain average of the behavioural score and the
        model's anomaly score.

        Returns:
            The result of ``create_security_incident`` when the risk score is
            strictly greater than ``self.anomaly_threshold``, otherwise None.
        """
        behavioural = self.analyze_user_behavior(user_activity)
        model_score = self.ml_model.predict_anomaly(
            self.extract_features(user_activity)
        )

        # Both signals carry equal weight.
        risk = (behavioural + model_score) / 2

        is_anomalous = risk > self.anomaly_threshold
        return self.create_security_incident(user_activity, risk) if is_anomalous else None

Compliance & Governance

GDPR Compliance

class GDPRCompliance:
    """Handlers for GDPR data-subject requests and data-minimization rules."""

    def handle_data_subject_request(self, request_type, user_id, corrections=None):
        """Dispatch a data-subject request to the matching handler.

        Args:
            request_type: One of ``'access'``, ``'deletion'``,
                ``'rectification'`` or ``'portability'``.
            user_id: Identifier of the data subject.
            corrections: The corrected data for a ``'rectification'`` request;
                ignored for every other request type.

        Returns:
            The selected handler's result, or None for an unrecognized
            ``request_type``.

        Raises:
            ValueError: If ``request_type`` is ``'rectification'`` and no
                ``corrections`` were supplied.
        """
        if request_type == 'access':
            return self.export_user_data(user_id)
        elif request_type == 'deletion':
            return self.delete_user_data(user_id)
        elif request_type == 'rectification':
            # The corrections come in as an explicit argument; the earlier
            # code read them from a `request` name that was never defined.
            if corrections is None:
                raise ValueError(
                    "corrections are required for a rectification request"
                )
            return self.update_user_data(user_id, corrections)
        elif request_type == 'portability':
            return self.export_portable_data(user_id)

        # Unrecognized request types keep returning None.
        return None

    def ensure_data_minimization(self, data_collection):
        """Return only the entries of ``data_collection`` whose key is essential.

        The set of essential keys comes from ``get_essential_fields``.
        """
        # Only collect necessary data
        essential_fields = self.get_essential_fields()
        return {k: v for k, v in data_collection.items() if k in essential_fields}

    def implement_privacy_by_design(self, system_design):
        """Placeholder; currently does nothing with ``system_design``."""
        # Encrypt PII by default
        # Implement data retention policies
        # Add consent management
        # Enable audit logging
        pass

Incident Response

Security Incident Management

class SecurityIncidentResponse:
    """Coordinates the response workflow for a security incident."""

    def __init__(self):
        self.incident_queue = PriorityQueue()
        self.response_team = SecurityTeam()

    def handle_security_incident(self, incident):
        """Drive ``incident`` through classification, notification and follow-up.

        Steps, in order: classify severity; contain immediately if the
        severity is ``'critical'``; notify the response team; start an
        investigation; document the response.
        """
        level = self.classify_severity(incident)

        # Only critical incidents are contained before anyone is notified.
        needs_containment = level == 'critical'
        if needs_containment:
            self.immediate_containment(incident)

        self.notify_response_team(incident, level)

        findings = self.start_investigation(incident)
        self.document_response(incident, findings)

    def immediate_containment(self, incident):
        """Contain a data breach; incidents of any other type are left alone.

        For a ``'data_breach'`` this isolates the affected systems, revokes
        the compromised credentials and blocks the malicious IPs, in that order.
        """
        if incident.type != 'data_breach':
            return

        self.isolate_systems(incident.affected_systems)
        self.revoke_credentials(incident.compromised_accounts)
        self.block_ips(incident.malicious_ips)

Security Testing

Penetration Testing

class SecurityTesting:
    """Runs automated security scans across a set of systems."""

    def automated_vulnerability_scan(self, target_systems):
        """Scan every system and return one report dict per system.

        For each system the network scan, the web application scan and the
        configuration audit run in that order.

        Returns:
            A list of dicts, in input order, with the keys ``'system'``,
            ``'network_vulnerabilities'``, ``'web_vulnerabilities'`` and
            ``'configuration_issues'``.
        """
        # Dict values are evaluated top to bottom, which keeps the three
        # scans in their original per-system order.
        return [
            {
                'system': target,
                'network_vulnerabilities': self.scan_network_vulnerabilities(target),
                'web_vulnerabilities': self.scan_web_vulnerabilities(target),
                'configuration_issues': self.audit_security_configuration(target),
            }
            for target in target_systems
        ]

Next Steps

Nội dung này sẽ được mở rộng thêm với:
- Cloud security best practices
- Container security patterns
- API security frameworks
- Blockchain security considerations
- AI/ML security challenges