Real-Time Data Pipeline

Build a stream processing and analytics platform with Kinesis

Project Overview

Create a real-time data ingestion and processing pipeline that handles streaming data, performs transformations, and delivers insights through dashboards.

Difficulty: Advanced
AWS Services: Kinesis Data Streams, Kinesis Firehose, Lambda, OpenSearch, QuickSight
Cost: ~$100-200/month

Prerequisites

  • Understanding of streaming data concepts
  • Python or Node.js programming skills
  • Basic SQL and data analytics knowledge
  • Experience with Lambda functions

Architecture

📡 Data Sources (IoT/Apps/Logs) → 🌊 Kinesis Data Streams → Lambda (Transform) → 🔥 Firehose (Delivery) → 📦 S3 (Data Lake) → 🔍 OpenSearch (Analytics)
QuickSight or OpenSearch Dashboards (formerly Kibana) for visualization and dashboards

Step-by-Step Instructions

1. Create Kinesis Data Stream

  • Go to Kinesis and create a new Data Stream
  • Choose On-Demand capacity mode for variable loads
  • Or Provisioned mode for predictable throughput
  • Name it appropriately (e.g., clickstream-events)
  • Note the stream ARN for permissions (the snippet below shows how to retrieve it)
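
A minimal sketch for confirming the stream is ACTIVE and capturing its ARN with boto3 (the stream name matches the example above; adjust it to yours):

check_stream.py PYTHON
import boto3

kinesis = boto3.client('kinesis')

def wait_for_stream(stream_name):
    # Block until the stream exists and is ACTIVE, then return its ARN
    kinesis.get_waiter('stream_exists').wait(StreamName=stream_name)
    summary = kinesis.describe_stream_summary(StreamName=stream_name)['StreamDescriptionSummary']
    print(f"{stream_name}: {summary['StreamStatus']} ({summary['StreamARN']})")
    return summary['StreamARN']

if __name__ == '__main__':
    wait_for_stream('clickstream-events')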

2. Set Up Data Producers

  • Create a sample data generator using AWS SDK
  • Use PutRecord or PutRecords API
  • Choose an appropriate partition key strategy
  • Implement error handling and retries, as in the PutRecords sketch after this list
  • Consider Kinesis Data Generator for testing
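
When batching with PutRecords, individual records can fail even though the API call succeeds, so retry only the failed entries. A minimal sketch (the event shape follows the producer example later in this guide):

batch_producer.py PYTHON
import json
import time
import boto3

kinesis = boto3.client('kinesis')

def put_batch(stream_name, events, max_retries=3):
    # PutRecords accepts up to 500 records per call
    records = [
        {'Data': json.dumps(e).encode('utf-8'), 'PartitionKey': e['user_id']}
        for e in events
    ]
    for attempt in range(max_retries):
        response = kinesis.put_records(StreamName=stream_name, Records=records)
        if response['FailedRecordCount'] == 0:
            return
        # Keep only the records that came back with an error (e.g. throttling) and retry them
        records = [
            record for record, result in zip(records, response['Records'])
            if 'ErrorCode' in result
        ]
        time.sleep(2 ** attempt)  # simple exponential backoff
    raise RuntimeError(f'{len(records)} records still failing after {max_retries} attempts')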

3. Create Kinesis Firehose for S3 Delivery

  • Create a Firehose delivery stream
  • Source: Kinesis Data Stream
  • Destination: S3 bucket
  • Configure buffering (size and interval)
  • Enable data format conversion (Parquet) if needed
  • Add a transformation Lambda if required (a minimal handler is sketched below)
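
A Firehose transformation Lambda receives base64-encoded records and must return each one with its recordId, a result, and re-encoded data. A minimal sketch (the added field is only an illustration of enrichment):

firehose_transform.py PYTHON
import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = json.loads(base64.b64decode(record['data']))
        payload['processed'] = True  # placeholder enrichment; replace with real logic

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',  # or 'Dropped' / 'ProcessingFailed'
            # Append a newline so objects delivered to S3 are newline-delimited JSON
            'data': base64.b64encode((json.dumps(payload) + '\n').encode('utf-8')).decode('utf-8')
        })
    return {'records': output}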

4. Add Real-Time Processing with Lambda

  • Create Lambda function triggered by Kinesis
  • Process records in batches
  • Perform aggregations, filtering, or enrichment
  • Write results to DynamoDB or another stream
  • Handle partial batch failures properly, as in the sketch below
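
With ReportBatchItemFailures enabled on the event source mapping, returning the sequence number of the first failed record tells Lambda to retry from that point instead of replaying the whole batch. A minimal sketch (process_event is a placeholder for your own logic):

partial_failures.py PYTHON
import base64
import json

def process_event(data):
    # Placeholder for real processing (aggregation, enrichment, writes, ...)
    print(data['event_type'])

def lambda_handler(event, context):
    failures = []
    for record in event['Records']:
        try:
            data = json.loads(base64.b64decode(record['kinesis']['data']))
            process_event(data)
        except Exception:
            # Report this record; Lambda retries from here, so stop processing the batch
            failures.append({'itemIdentifier': record['kinesis']['sequenceNumber']})
            break
    return {'batchItemFailures': failures}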

5. Set Up OpenSearch for Analytics

  • Create an OpenSearch domain (see the sketch after this list)
  • Configure Firehose to deliver to OpenSearch
  • Define index mappings for your data
  • Set up index rotation (daily/weekly)
  • Use Kibana/OpenSearch Dashboards for visualization
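
A minimal sketch for creating a small development domain with boto3; the domain name, instance size, volume size, and engine version are assumptions, and you still need to configure an access policy or fine-grained access control before sending data:

create_domain.py PYTHON
import boto3

opensearch = boto3.client('opensearch')

response = opensearch.create_domain(
    DomainName='clickstream-analytics',
    EngineVersion='OpenSearch_2.11',  # pick a currently supported version
    ClusterConfig={
        'InstanceType': 't3.small.search',
        'InstanceCount': 1,
    },
    EBSOptions={
        'EBSEnabled': True,
        'VolumeType': 'gp3',
        'VolumeSize': 10,
    },
)
print(response['DomainStatus']['ARN'])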

6. Create Dashboards and Alerts

  • Build OpenSearch Dashboards visualizations
  • Or connect QuickSight to S3/Athena
  • Create real-time monitoring dashboards
  • Set up CloudWatch alarms for pipeline health (example below)
  • Configure anomaly detection if needed
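
For pipeline health, an alarm on the stream's iterator age catches a lagging consumer early. A minimal sketch (the alarm name and threshold are assumptions; add an SNS action if you want notifications):

iterator_age_alarm.py PYTHON
import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='clickstream-iterator-age',
    Namespace='AWS/Kinesis',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Dimensions=[{'Name': 'StreamName', 'Value': 'clickstream-events'}],
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=60000,  # alarm if the consumer is more than 60 seconds behind
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
    # AlarmActions=['arn:aws:sns:REGION:ACCOUNT_ID:pipeline-alerts'],  # optional SNS topic
)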

Tips

  • Use enhanced fan-out for multiple consumers - Each registered consumer gets its own dedicated 2 MB/s of read throughput per shard
  • Partition data effectively - Choose partition keys that distribute load evenly
  • Monitor iterator age - High iterator age indicates processing lag
  • Implement dead letter queues - Capture failed records for later analysis (see the configuration sketch below)
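
Retry limits, bisect-on-error, and an on-failure destination are all configured on the Kinesis event source mapping. A minimal sketch (the function name and ARNs are placeholders for your function, stream, and DLQ queue):

event_source_mapping.py PYTHON
import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    FunctionName='stream-processor',
    EventSourceArn='arn:aws:kinesis:REGION:ACCOUNT_ID:stream/clickstream-events',
    StartingPosition='LATEST',
    BatchSize=100,
    MaximumRetryAttempts=2,            # give up on a batch after two retries
    BisectBatchOnFunctionError=True,   # split failing batches to isolate bad records
    FunctionResponseTypes=['ReportBatchItemFailures'],
    DestinationConfig={
        'OnFailure': {
            'Destination': 'arn:aws:sqs:REGION:ACCOUNT_ID:clickstream-dlq'
        }
    },
)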

Code Examples

Create Kinesis Data Stream

Terminal Commands BASH
# Create Kinesis Data Stream (on-demand mode)
aws kinesis create-stream \
    --stream-name clickstream-events \
    --stream-mode-details StreamMode=ON_DEMAND

# Or create with provisioned capacity
aws kinesis create-stream \
    --stream-name clickstream-events \
    --shard-count 2

Data Producer (Python)

producer.py PYTHON
import boto3
import json
import random
from datetime import datetime

kinesis = boto3.client('kinesis')

def generate_event():
    return {
        'event_id': f'evt-{random.randint(1000, 9999)}',
        'user_id': f'user-{random.randint(1, 100)}',
        'event_type': random.choice(['click', 'view', 'purchase']),
        'page': random.choice(['/home', '/products', '/checkout']),
        'timestamp': datetime.utcnow().isoformat()
    }

def send_events(stream_name, count=100):
    for _ in range(count):
        event = generate_event()
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(event),
            PartitionKey=event['user_id']
        )
    print(f'Sent {count} events to {stream_name}')

if __name__ == '__main__':
    send_events('clickstream-events', 100)

Lambda Stream Processor

lambda_function.py PYTHON
import json
import base64
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('EventAggregates')

def lambda_handler(event, context):
    aggregates = {}

    for record in event['Records']:
        # Decode the Kinesis data
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)

        # Aggregate by event type
        event_type = data['event_type']
        if event_type not in aggregates:
            aggregates[event_type] = 0
        aggregates[event_type] += 1

    # Store aggregates
    for event_type, count in aggregates.items():
        table.update_item(
            Key={'event_type': event_type},
            UpdateExpression='ADD event_count :c',
            ExpressionAttributeValues={':c': count}
        )

    return {
        'statusCode': 200,
        'body': f'Processed {len(event["Records"])} records'
    }

Kinesis Firehose to S3 (CloudFormation)

firehose.yaml YAML
AWSTemplateFormatVersion: '2010-09-09'
# Note: DataLakeBucket (an S3 bucket) and FirehoseRole (an IAM role that can read the
# Kinesis stream and write to the bucket) are referenced below and must be defined in
# the same stack; they are omitted here for brevity.
Resources:
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: clickstream-to-s3
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/clickstream-events
        RoleARN: !GetAtt FirehoseRole.Arn
      S3DestinationConfiguration:
        BucketARN: !GetAtt DataLakeBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        Prefix: 'raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/'

OpenSearch Index Mapping

index-mapping.json JSON
{
    "mappings": {
        "properties": {
            "event_id": { "type": "keyword" },
            "user_id": { "type": "keyword" },
            "event_type": { "type": "keyword" },
            "page": { "type": "keyword" },
            "timestamp": { "type": "date" },
            "location": { "type": "geo_point" }
        }
    },
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    }
}
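
One way to apply this mapping is with the opensearch-py client and SigV4 request signing; a sketch assuming the opensearch-py and requests-aws4auth packages are installed and your credentials can reach the domain (the endpoint, region, and index name are placeholders):

create_index.py PYTHON
import json

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

region = 'us-east-1'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

client = OpenSearch(
    hosts=[{'host': 'search-clickstream-xxxxxxxx.us-east-1.es.amazonaws.com', 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

# Create the index using the mapping defined above
with open('index-mapping.json') as f:
    mapping = json.load(f)

client.indices.create(index='clickstream-events', body=mapping)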

What You'll Learn

  • Stream ingestion with Kinesis Data Streams
  • Real-time data transformation with Lambda
  • Data delivery with Kinesis Firehose
  • Search and analytics with OpenSearch
  • Dashboard creation and data visualization