Serverless Architecture - Function-as-a-Service Design

Tổng quan

In a serverless architecture, code runs in stateless compute containers that the cloud provider provisions and manages, so you never manage servers yourself.

Core Concepts

Functions as a Service (FaaS)

# AWS Lambda function
import json

def lambda_handler(event, context):
    """Handle an order-creation request whose payload is in ``event['body']``.

    Args:
        event: Invocation payload; ``event['body']`` is expected to hold the
            order as a JSON string.
        context: Lambda runtime context (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``: 400 with an
        ``error`` message when the body is missing, is not valid JSON, or
        fails ``validate_order``; 201 with the new ``order_id`` otherwise.
    """
    # A missing/None body or malformed JSON is a client error, so answer 400
    # instead of letting the exception escape as an unhandled failure.
    try:
        order_data = json.loads(event.get('body') or '')
    except (TypeError, ValueError):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid JSON body'})
        }

    # Validate order
    if not validate_order(order_data):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid order'})
        }

    # Save to database
    order_id = save_order(order_data)

    # Return response
    return {
        'statusCode': 201,
        'body': json.dumps({'order_id': order_id})
    }

Event-Driven Triggers

# AWS SAM template
Resources:
  # Bucket that receives order uploads. SAM requires the bucket used by an
  # S3 event source to be declared in this same template and referenced with
  # !Ref; a bare bucket name is not accepted.
  OrderUploadsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: order-uploads

  ProcessOrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      # <module>.<function> inside CodeUri.
      Handler: order.handler
      Runtime: python3.9
      Events:
        # HTTP trigger: POST /orders
        ApiEvent:
          Type: Api
          Properties:
            Path: /orders
            Method: post
        # Storage trigger: any object created in the uploads bucket
        S3Event:
          Type: S3
          Properties:
            Bucket: !Ref OrderUploadsBucket
            Events: s3:ObjectCreated:*

Serverless Patterns

API Gateway + Lambda

# API Gateway integration
def create_user(event, context):
    """Create a user from the JSON request body and return the new user id.

    Args:
        event: API Gateway proxy event; ``event['body']`` carries the JSON
            payload.
        context: Lambda runtime context (unused).

    Returns:
        ``error_response(400, ...)`` when the body is not valid JSON, is not
        a JSON object, or has no ``email``; otherwise
        ``success_response({'user_id': <generated id>})``.
    """
    # Malformed or missing bodies are client errors, not handler crashes.
    try:
        user_data = json.loads(event.get('body') or '{}')
    except (TypeError, ValueError):
        return error_response(400, 'Invalid JSON body')

    # Validate input
    if not isinstance(user_data, dict) or not user_data.get('email'):
        return error_response(400, 'Email required')

    # Generate the id up front: put_item returns response metadata, not the
    # stored item, so the id must be kept here to hand back to the caller.
    user_id = str(uuid.uuid4())

    # Save user
    # NOTE(review): assumes `dynamodb` is the boto3 DynamoDB *resource*
    # (boto3.resource('dynamodb')), whose Table API accepts plain Python
    # values; a low-level client would need typed attribute values instead.
    dynamodb.Table('Users').put_item(
        Item={
            'id': user_id,
            'email': user_data['email'],
            'created_at': datetime.utcnow().isoformat()
        }
    )

    return success_response({'user_id': user_id})

Event Processing

# S3 trigger for image processing
def process_image(event, context):
    """Resize each image named in an S3 event and store it under ``processed/``.

    Args:
        event: S3 notification payload; every entry of ``event['Records']``
            supplies a bucket name and an object key.
        context: Lambda runtime context (unused).

    Returns:
        None. The resized bytes are written back to the source bucket at
        ``processed/<key>``.
    """
    from urllib.parse import unquote_plus

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Keys arrive URL-encoded in S3 notifications (a space becomes '+'),
        # so decode before using the key in API calls.
        key = unquote_plus(record['s3']['object']['key'])

        # Output lands in the same bucket that is read from; skip objects
        # already under processed/ so the function does not keep being
        # re-invoked on its own results.
        if key.startswith('processed/'):
            continue

        # Download image
        image = s3.get_object(Bucket=bucket, Key=key)

        # Resize image
        resized = resize_image(image['Body'].read())

        # Upload processed image
        s3.put_object(
            Bucket=bucket,
            Key=f"processed/{key}",
            Body=resized
        )

Step Functions (Workflow)

{
  "Comment": "Order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:ValidateOrder",
      "Next": "ProcessPayment"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:ProcessPayment",
      "Next": "UpdateInventory",
      "Catch": [{
        "ErrorEquals": ["PaymentFailedException"],
        "Next": "PaymentFailed"
      }]
    },
    "UpdateInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:UpdateInventory",
      "Next": "SendConfirmation"
    },
    "SendConfirmation": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:SendConfirmation",
      "End": true
    },
    "PaymentFailed": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:HandlePaymentFailure",
      "End": true
    }
  }
}

Serverless Databases

DynamoDB Integration

import boto3

# Handles are created once at import time and shared by every call.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')


def get_user(event, context):
    """Look up one user by the ``id`` path parameter.

    Args:
        event: Request event; ``event['pathParameters']['id']`` is the key.
        context: Lambda runtime context (unused).

    Returns:
        ``success_response(item)`` when the table holds the id, otherwise
        ``error_response(404, 'User not found')``.
    """
    lookup_key = {'id': event['pathParameters']['id']}
    result = table.get_item(Key=lookup_key)

    # get_item omits 'Item' from the response when nothing matched.
    if 'Item' in result:
        return success_response(result['Item'])
    return error_response(404, 'User not found')

Aurora Serverless

import boto3

rds_data = boto3.client('rds-data')


def query_orders(event, context):
    """Return the order rows for the user named in the request path.

    Args:
        event: Request event; ``event['pathParameters']['user_id']`` selects
            the user.
        context: Lambda runtime context (unused).

    Returns:
        ``success_response`` wrapping the ``records`` of the statement result.
    """
    user_id = event['pathParameters']['user_id']

    # The user id travels as a bound parameter, never spliced into the SQL.
    bound_params = [{'name': 'user_id', 'value': {'stringValue': user_id}}]

    result = rds_data.execute_statement(
        resourceArn='arn:aws:rds:region:account:cluster:aurora-cluster',
        secretArn='arn:aws:secretsmanager:region:account:secret:rds-secret',
        database='ecommerce',
        sql='SELECT * FROM orders WHERE user_id = :user_id',
        parameters=bound_params,
    )

    return success_response(result['records'])

Cold Start Optimization

Provisioned Concurrency

Resources:
  MyFunction:
    Type: AWS::Serverless::Function
    Properties:
      # Provisioned concurrency attaches to a published alias, so SAM
      # requires AutoPublishAlias whenever ProvisionedConcurrencyConfig is set.
      AutoPublishAlias: live
      ProvisionedConcurrencyConfig:
        # Number of execution environments kept initialized.
        ProvisionedConcurrentExecutions: 10

Connection Pooling

# Global connection pool
import pymongo

# Module-level slot for the client; filled on first use.
client = None


def get_db_client():
    """Return the module-level MongoClient, creating it on the first call.

    The connection string is read from the ``MONGO_URI`` environment variable.
    """
    global client
    if client is not None:
        return client
    client = pymongo.MongoClient(os.environ['MONGO_URI'])
    return client

def lambda_handler(event, context):
    """Example handler: obtain the shared client via get_db_client() per call."""
    db = get_db_client()
    # Use database connection

Multi-Cloud Serverless

Azure Functions

import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered function that creates an order from the JSON body.

    Args:
        req: Incoming HTTP request whose body carries the order as JSON.

    Returns:
        A 201 JSON response containing ``order_id``, or a 400 JSON response
        with an ``error`` message when the body is not valid JSON.
    """
    # get_json() raises ValueError when the body is not valid JSON; report
    # that as a client error rather than an unhandled exception.
    try:
        order_data = req.get_json()
    except ValueError:
        return func.HttpResponse(
            json.dumps({'error': 'Invalid JSON body'}),
            status_code=400,
            mimetype="application/json"
        )

    # Process order
    order_id = process_order(order_data)

    return func.HttpResponse(
        json.dumps({'order_id': order_id}),
        status_code=201,
        mimetype="application/json"
    )

Google Cloud Functions

from flask import Request

def process_order(request: Request):
    """HTTP function that creates an order from the JSON request body.

    Args:
        request: Incoming Flask request.

    Returns:
        ``({'order_id': ...}, 201)`` on success, or
        ``({'error': 'No data provided'}, 400)`` when the body is missing,
        empty, or cannot be parsed as JSON.
    """
    # silent=True makes get_json() return None for a non-JSON or malformed
    # body instead of aborting the request, so the check below covers every
    # "no usable data" case with the same 400 response.
    order_data = request.get_json(silent=True)

    # Validate and process
    if not order_data:
        return {'error': 'No data provided'}, 400

    order_id = create_order(order_data)
    return {'order_id': order_id}, 201

Best Practices

Function Design

  • Keep functions small and focused
  • Minimize cold start time
  • Use environment variables for configuration
  • Implement proper error handling
  • Design for idempotency

Security

# JWT token validation
import jwt

def validate_token(event, context):
    """Authorize a request by verifying the JWT in its Authorization header.

    Args:
        event: Authorizer event; reads ``headers`` (for the Authorization
            header) and ``methodArn``.
        context: Lambda runtime context (unused).

    Returns:
        A dict with ``principalId`` (the token's ``user_id`` claim) and an IAM
        ``policyDocument`` allowing ``execute-api:Invoke`` on ``methodArn``.

    Raises:
        Exception: with message ``'Unauthorized'`` when no token is supplied,
            the token fails verification, or it lacks a ``user_id`` claim.
    """
    # 'headers' may be absent or None, and HTTP header names are
    # case-insensitive, so look the header up without assuming its casing.
    headers = event.get('headers') or {}
    auth_header = next(
        (value for name, value in headers.items()
         if name.lower() == 'authorization'),
        '',
    ) or ''

    # Strip only a leading scheme; a bare token is still accepted.
    if auth_header.startswith('Bearer '):
        token = auth_header[len('Bearer '):]
    else:
        token = auth_header

    if not token:
        raise Exception('Unauthorized')

    # Read the secret outside the try so a missing JWT_SECRET surfaces as a
    # configuration error instead of being reported as a bad token.
    secret = os.environ['JWT_SECRET']

    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        raise Exception('Unauthorized')

    # A verified token without the expected claim must not crash the authorizer.
    principal_id = payload.get('user_id')
    if not principal_id:
        raise Exception('Unauthorized')

    return {
        'principalId': principal_id,
        'policyDocument': {
            'Version': '2012-10-17',
            'Statement': [{
                'Action': 'execute-api:Invoke',
                'Effect': 'Allow',
                'Resource': event['methodArn']
            }]
        }
    }

Monitoring

CloudWatch Integration

import boto3

cloudwatch = boto3.client('cloudwatch')


def _put_metric(name, value, unit):
    """Publish a single data point to the 'MyApp' CloudWatch namespace."""
    cloudwatch.put_metric_data(
        Namespace='MyApp',
        MetricData=[{
            'MetricName': name,
            'Value': value,
            'Unit': unit
        }]
    )


def lambda_handler(event, context):
    """Process the request and record latency or error metrics.

    Args:
        event: Invocation payload, passed through to ``process_request``.
        context: Lambda runtime context (unused).

    Returns:
        Whatever ``process_request(event)`` returns.

    Raises:
        Exception: any error from ``process_request`` is re-raised after an
            ``ErrorCount`` data point has been published.
    """
    start_time = time.time()

    # Only the processing call sits in the try body, so a failure while
    # publishing the latency metric is not miscounted as a processing error.
    try:
        # Process request
        result = process_request(event)
    except Exception:
        _put_metric('ErrorCount', 1, 'Count')
        raise
    else:
        # Custom metric
        _put_metric(
            'ProcessingLatency',
            (time.time() - start_time) * 1000,
            'Milliseconds'
        )
        return result

Cost Optimization

Right-sizing Functions

# Memory optimization
Resources:
  # Lightweight function: smallest memory setting and a short timeout.
  LightFunction:
    Type: AWS::Serverless::Function
    Properties:
      MemorySize: 128  # CPU scales with memory
      Timeout: 10  # seconds

  # Heavier function: more memory and a longer timeout.
  HeavyFunction:
    Type: AWS::Serverless::Function
    Properties:
      MemorySize: 1024  # MB
      Timeout: 60  # seconds

Next Steps

  1. 📚 Học về Distributed Systems
  2. 🎯 Practice serverless patterns
  3. 🏗️ Explore workflow orchestration
  4. 💻 Setup monitoring

Content sẽ được expand với detailed serverless implementation patterns.