Security Design trong System Architecture
Tổng Quan
Security Design bao gồm các nguyên tắc, patterns và best practices để xây dựng hệ thống an toàn, bảo vệ hệ thống khỏi các mối đe dọa (threats) và lỗ hổng bảo mật (vulnerabilities).
Security Principles
Defense in Depth
Multiple layers of security:
1. Network Security (Firewalls, VPN)
2. Application Security (Authentication, Authorization)
3. Data Security (Encryption, Access Control)
4. Infrastructure Security (OS hardening, Monitoring)
Zero Trust Architecture
class ZeroTrustGateway:
    """Gateway that admits a request only when its combined trust score is high enough.

    The identity, device and context helpers, as well as ``trust_threshold``,
    are read from ``self`` but are not defined in this class as shown here.
    """

    def authenticate_request(self, request):
        """Return whether the request's trust score exceeds the threshold.

        Args:
            request: Object exposing ``token``, ``device_info``, ``location``
                and ``time`` attributes.

        Returns:
            The result of ``trust_score > self.trust_threshold``.
        """
        # Collect the three signals: who is asking, from which device, in what context.
        identity = self.verify_identity(request.token)
        device_trust = self.assess_device_trust(request.device_info)
        context_trust = self.evaluate_context(request.location, request.time)

        # Strict comparison: a score exactly equal to the threshold is rejected.
        combined = self.calculate_trust_score(identity, device_trust, context_trust)
        return combined > self.trust_threshold
Authentication & Authorization
Multi-Factor Authentication
class MFAService:
    """Start the second authentication step after the primary factor is verified.

    ``verify_primary_factor``, ``get_user``, ``require_totp``,
    ``send_sms_code`` and ``send_email_code`` are looked up on ``self`` and
    are expected to be provided elsewhere; they are not defined in this class
    as shown here.
    """

    # Maps a user's preferred second factor to the method that starts it.
    _MFA_HANDLERS = {
        'totp': 'require_totp',
        'sms': 'send_sms_code',
        'email': 'send_email_code',
    }

    def __init__(self):
        self.totp_generator = TOTPGenerator()
        self.sms_service = SMSService()
        self.email_service = EmailService()

    def initiate_mfa(self, user_id, primary_factor):
        """Verify the primary factor, then trigger the user's preferred second factor.

        Args:
            user_id: Identifier of the user who is logging in.
            primary_factor: Credential passed to ``verify_primary_factor``.

        Returns:
            Whatever the selected second-factor method returns.

        Raises:
            AuthenticationError: If the primary factor is rejected, or if the
                user's ``preferred_mfa`` is not one of ``'totp'``, ``'sms'``
                or ``'email'``.
        """
        if not self.verify_primary_factor(user_id, primary_factor):
            raise AuthenticationError("Invalid credentials")

        # The original body read ``user.preferred_mfa`` without ever binding
        # ``user``; load the user record before consulting its preference.
        # NOTE(review): ``get_user`` is assumed to exist next to the other
        # helpers - confirm the real lookup name.
        user = self.get_user(user_id)

        handler_name = self._MFA_HANDLERS.get(user.preferred_mfa)
        if handler_name is None:
            # Fail closed instead of silently returning None for an unknown method.
            raise AuthenticationError("Unsupported MFA method")
        return getattr(self, handler_name)(user_id)
Role-Based Access Control (RBAC)
class RBACSystem:
    """Role-based access control backed by a static role -> actions table.

    ``get_user_roles`` and ``map_http_method_to_action`` are looked up on
    ``self`` but are not defined in this class as shown here.
    """

    def __init__(self):
        # Actions each role is allowed to perform.
        self.role_permissions = {
            'admin': ['read', 'write', 'delete', 'manage_users'],
            'editor': ['read', 'write'],
            'viewer': ['read'],
        }

    def check_permission(self, user_id, resource, action):
        """Return ``True`` if any of the user's roles grants ``action``.

        ``resource`` is accepted but not consulted: permissions are decided
        per role only. Roles missing from the table grant nothing.
        """
        return any(
            action in self.role_permissions.get(role_name, [])
            for role_name in self.get_user_roles(user_id)
        )

    def authorize_request(self, user_id, resource_path, http_method):
        """Translate an HTTP method into an action and check it for the user."""
        return self.check_permission(
            user_id,
            resource_path,
            self.map_http_method_to_action(http_method),
        )
Data Protection
Encryption at Rest và in Transit
class EncryptionService:
    """Encrypt data at rest with AES-CBC and build a TLS 1.3 client context.

    ``load_encryption_key`` and ``pad_data`` are looked up on ``self`` but are
    not defined in this class as shown here.
    """

    def __init__(self):
        self.aes_key = self.load_encryption_key()
        # The class defines ``setup_secure_connection``; the previously called
        # ``setup_tls_context`` does not exist here and raised AttributeError.
        self.tls_context = self.setup_secure_connection()

    def encrypt_sensitive_data(self, data):
        """Encrypt ``data`` and return base64 of ``IV || ciphertext``.

        Args:
            data: Plaintext handed to ``pad_data`` before encryption.

        Returns:
            Base64-encoded bytes: a fresh 16-byte IV followed by the ciphertext.
        """
        # NOTE(review): the original comment says AES-256; the effective key
        # size depends on what ``load_encryption_key`` returns - confirm.
        # A fresh random IV per message; it is prepended so it travels with the data.
        iv = os.urandom(16)
        cipher = AES.new(self.aes_key, AES.MODE_CBC, iv)
        # CBC operates on whole blocks, so the plaintext is padded first.
        padded_data = self.pad_data(data)
        encrypted_data = cipher.encrypt(padded_data)
        return base64.b64encode(iv + encrypted_data)

    def setup_secure_connection(self):
        """Return a client-side ``ssl.SSLContext`` that only accepts TLS 1.3.

        Hostname checking and certificate verification are set explicitly.
        """
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
        return context
Data Loss Prevention (DLP)
class DLPSystem:
    """Detect and mask sensitive values (SSN, credit card, email) in text.

    ``get_severity`` is looked up on ``self`` but is not defined in this class
    as shown here.
    """

    def __init__(self):
        # Regex per sensitive data type; insertion order is the scan order.
        self.sensitive_patterns = {
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            # The TLD class used to be ``[A-Z|a-z]``; inside a character class
            # ``|`` is a literal, so strings like "user@example.c|m" matched.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        }

    def scan_content(self, content):
        """Return one violation record per data type found in ``content``.

        Args:
            content: Text to scan.

        Returns:
            A list of dicts with keys ``'type'``, ``'matches'`` (number of
            hits) and ``'severity'`` (from ``get_severity``). The list is
            empty when nothing matches.
        """
        violations = []
        for data_type, pattern in self.sensitive_patterns.items():
            matches = re.findall(pattern, content)
            if matches:
                violations.append({
                    'type': data_type,
                    'matches': len(matches),
                    'severity': self.get_severity(data_type),
                })
        return violations

    def apply_data_masking(self, content):
        """Return ``content`` with SSNs and credit card numbers masked.

        Only ``'ssn'`` and ``'credit_card'`` have a mask; other data types,
        such as email, are left untouched.
        """
        masks = {
            'credit_card': '****-****-****-****',
            'ssn': '***-**-****',
        }
        for data_type, pattern in self.sensitive_patterns.items():
            mask = masks.get(data_type)
            if mask is not None:
                content = re.sub(pattern, mask, content)
        return content
Network Security
Web Application Firewall (WAF)
class WAFRules:
    """Pattern lists and a request check for a simple web application firewall.

    ``detect_sql_injection``, ``detect_xss`` and ``check_rate_limit`` are
    looked up on ``self`` but are not defined in this class as shown here.
    """

    def __init__(self):
        # Regexes for SQL keywords, UNION/JOIN ... SELECT, and quote + "or".
        self.sql_injection_patterns = [
            r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)",
            r"(\b(UNION|JOIN)\b.*\b(SELECT)\b)",
            r"((\%27)|(\'))\s*(\%6F|o|O)(\%72|r|R)",
        ]
        # Regexes for script tags, javascript: URLs and inline event handlers.
        self.xss_patterns = [
            r"<script[^>]*>.*?</script>",
            r"javascript:",
            r"on\w+\s*=",
        ]

    def analyze_request(self, request):
        """Return the labels of all threats detected for ``request``.

        Args:
            request: Object exposing ``body``, ``query_string``, ``headers``
                and ``ip``.

        Returns:
            A list drawn, in this order, from ``'sql_injection'``, ``'xss'``
            and ``'rate_limit_exceeded'``.
        """
        # Each probe is deferred so the checks run one at a time, in order.
        # NOTE(review): ``body + headers`` assumes both operands can be
        # concatenated (e.g. both strings) - confirm the request type.
        probes = (
            ('sql_injection',
             lambda: self.detect_sql_injection(request.body + request.query_string)),
            ('xss',
             lambda: self.detect_xss(request.body + request.headers)),
            # A truthy result from check_rate_limit is reported as exceeded.
            ('rate_limit_exceeded',
             lambda: self.check_rate_limit(request.ip)),
        )
        return [label for label, probe in probes if probe()]
DDoS Protection
class DDoSProtection:
    """React to traffic spikes and to traffic concentrated on a few source IPs.

    ``normal_threshold``, ``analyze_ip_distribution``,
    ``apply_ip_based_filtering``, ``enable_captcha_challenge`` and
    ``send_security_alert`` are read from ``self`` but are not defined in this
    class as shown here.
    """

    # A Gini coefficient is 0 for a perfectly even distribution and approaches
    # 1 when a few sources account for most of the traffic. Above this value
    # the traffic is treated as concentrated.
    # NOTE(review): 0.7 is a chosen cut-off - tune it against real traffic.
    CONCENTRATED_GINI_THRESHOLD = 0.7

    def __init__(self):
        self.rate_limiter = RedisRateLimiter()
        self.anomaly_detector = AnomalyDetector()

    def analyze_traffic(self, requests_per_second, source_ips):
        """Trigger protection on a spike and filter by IP when traffic is concentrated.

        Args:
            requests_per_second: Current request rate.
            source_ips: Source addresses of the observed requests.
        """
        # More than ten times the normal rate is treated as a potential DDoS.
        if requests_per_second > self.normal_threshold * 10:
            self.trigger_protection_mode()

        ip_distribution = self.analyze_ip_distribution(source_ips)
        # The original tested ``gini_coefficient < 0.3``, which selects evenly
        # spread traffic - the opposite of the stated intent (few IPs sending
        # most of the traffic). IP filtering only helps in the high-Gini case.
        if ip_distribution.gini_coefficient > self.CONCENTRATED_GINI_THRESHOLD:
            self.apply_ip_based_filtering(source_ips)

    def trigger_protection_mode(self):
        """Tighten rate limits, enable CAPTCHA challenges and alert the security team."""
        self.rate_limiter.set_strict_limits()
        self.enable_captcha_challenge()
        self.send_security_alert("DDoS attack detected")
Security Monitoring
Security Information and Event Management (SIEM)
class SIEMSystem:
    """Normalize, correlate and analyze security events, alerting on risky threats.

    ``normalize_event``, ``correlate_events`` and ``alert_threshold`` are read
    from ``self`` but are not defined in this class as shown here.
    """

    def __init__(self):
        self.log_aggregator = LogAggregator()
        self.threat_detector = ThreatDetector()
        self.alert_manager = AlertManager()

    def process_security_events(self, events):
        """Run each event through the pipeline and raise alerts as needed.

        Args:
            events: Iterable of raw security events.

        An alert is created for every detected threat whose ``risk_score`` is
        strictly greater than ``self.alert_threshold``.
        """
        for raw_event in events:
            # Bring the event into the common format before any analysis.
            normalized = self.normalize_event(raw_event)
            related = self.correlate_events(normalized)
            detected = self.threat_detector.analyze(normalized, related)

            # Lazily keep only the threats that are risky enough to alert on.
            high_risk = (
                threat for threat in detected
                if threat.risk_score > self.alert_threshold
            )
            for threat in high_risk:
                self.alert_manager.create_alert(threat)
Intrusion Detection
class IntrusionDetectionSystem:
    """Score user activity with behavioral analysis plus an anomaly model.

    ``load_baseline``, ``load_anomaly_model``, ``analyze_user_behavior``,
    ``extract_features``, ``create_security_incident`` and
    ``anomaly_threshold`` are read from ``self`` but are not defined in this
    class as shown here.
    """

    def __init__(self):
        self.baseline_behavior = self.load_baseline()
        self.ml_model = self.load_anomaly_model()

    def detect_anomalies(self, user_activity):
        """Return a security incident for risky activity, otherwise ``None``.

        Args:
            user_activity: Activity record handed to the scoring helpers.

        Returns:
            The result of ``create_security_incident`` when the averaged score
            is strictly greater than ``self.anomaly_threshold``; ``None``
            otherwise.
        """
        behavioral = self.analyze_user_behavior(user_activity)
        model_based = self.ml_model.predict_anomaly(
            self.extract_features(user_activity)
        )

        # Both signals carry equal weight in the final risk score.
        combined = (behavioral + model_based) / 2
        return (
            self.create_security_incident(user_activity, combined)
            if combined > self.anomaly_threshold
            else None
        )
Compliance & Governance
GDPR Compliance
class GDPRCompliance:
    """Entry points for GDPR data-subject requests and data minimization.

    ``export_user_data``, ``delete_user_data``, ``update_user_data``,
    ``export_portable_data`` and ``get_essential_fields`` are looked up on
    ``self`` but are not defined in this class as shown here.
    """

    def handle_data_subject_request(self, request_type, user_id, corrections=None):
        """Dispatch a data-subject request to the matching handler.

        Args:
            request_type: One of ``'access'``, ``'deletion'``,
                ``'rectification'`` or ``'portability'``.
            user_id: Identifier of the data subject.
            corrections: Corrected values for a ``'rectification'`` request.
                The original body read these from an undefined ``request``
                name, so they are now passed in explicitly.

        Returns:
            The selected handler's result, or ``None`` for an unrecognized
            ``request_type``.

        Raises:
            ValueError: If ``request_type`` is ``'rectification'`` and no
                ``corrections`` are supplied.
        """
        if request_type == 'access':
            return self.export_user_data(user_id)
        elif request_type == 'deletion':
            return self.delete_user_data(user_id)
        elif request_type == 'rectification':
            if corrections is None:
                raise ValueError("rectification request requires corrections")
            return self.update_user_data(user_id, corrections)
        elif request_type == 'portability':
            return self.export_portable_data(user_id)
        return None

    def ensure_data_minimization(self, data_collection):
        """Return a copy of ``data_collection`` restricted to the essential fields."""
        essential_fields = self.get_essential_fields()
        return {k: v for k, v in data_collection.items() if k in essential_fields}

    def implement_privacy_by_design(self, system_design):
        """Placeholder; currently does nothing and returns ``None``.

        Planned steps: encrypt PII by default, apply data retention policies,
        add consent management and enable audit logging.
        """
        pass
Incident Response
Security Incident Management
class SecurityIncidentResponse:
    """Coordinate the response to a security incident.

    ``classify_severity``, ``notify_response_team``, ``start_investigation``,
    ``document_response``, ``isolate_systems``, ``revoke_credentials`` and
    ``block_ips`` are looked up on ``self`` but are not defined in this class
    as shown here.
    """

    def __init__(self):
        self.incident_queue = PriorityQueue()
        self.response_team = SecurityTeam()

    def handle_security_incident(self, incident):
        """Classify, contain if critical, notify, investigate and document.

        Args:
            incident: Incident object passed on to every helper.
        """
        level = self.classify_severity(incident)

        # Only critical incidents are contained before the team is notified.
        if level == 'critical':
            self.immediate_containment(incident)

        self.notify_response_team(incident, level)
        self.document_response(incident, self.start_investigation(incident))

    def immediate_containment(self, incident):
        """Contain a data breach; incidents of any other type are left untouched.

        For ``incident.type == 'data_breach'`` this isolates the affected
        systems, revokes the compromised credentials and blocks the malicious
        IPs, in that order.
        """
        if incident.type == 'data_breach':
            containment_steps = (
                (self.isolate_systems, incident.affected_systems),
                (self.revoke_credentials, incident.compromised_accounts),
                (self.block_ips, incident.malicious_ips),
            )
            for step, target in containment_steps:
                step(target)
Security Testing
Penetration Testing
class SecurityTesting:
    """Run automated vulnerability scans across a set of systems.

    ``scan_network_vulnerabilities``, ``scan_web_vulnerabilities`` and
    ``audit_security_configuration`` are looked up on ``self`` but are not
    defined in this class as shown here.
    """

    def automated_vulnerability_scan(self, target_systems):
        """Scan every target and return one result record per system.

        Args:
            target_systems: Iterable of systems to scan.

        Returns:
            A list of dicts with keys ``'system'``,
            ``'network_vulnerabilities'``, ``'web_vulnerabilities'`` and
            ``'configuration_issues'``, in input order.
        """
        # Dict values are evaluated in key order: network scan, web scan, config audit.
        return [
            {
                'system': target,
                'network_vulnerabilities': self.scan_network_vulnerabilities(target),
                'web_vulnerabilities': self.scan_web_vulnerabilities(target),
                'configuration_issues': self.audit_security_configuration(target),
            }
            for target in target_systems
        ]
Next Steps
Nội dung này sẽ được mở rộng thêm với:
- Cloud security best practices
- Container security patterns
- API security frameworks
- Blockchain security considerations
- AI/ML security challenges