Advanced
Real-Time Data Pipeline
Build a stream processing and analytics platform with Kinesis
Project Overview
Create a real-time data ingestion and processing pipeline that handles streaming data, performs transformations, and delivers insights through dashboards.
Prerequisites
- Understanding of streaming data concepts
- Python or Node.js programming skills
- Basic SQL and data analytics knowledge
- Experience with Lambda functions
Architecture
Data Sources (IoT devices, applications, logs) → Kinesis Data Streams → Lambda (transform) → Kinesis Firehose (delivery) → S3 data lake and OpenSearch (analytics)
QuickSight or OpenSearch Dashboards (Kibana) for visualization and dashboards
Step-by-Step Instructions
1. Create Kinesis Data Stream
- Go to Kinesis and create a new Data Stream
- Choose On-Demand capacity mode for variable loads
- Or Provisioned mode for predictable throughput
- Name it appropriately (e.g., clickstream-events)
- Note the stream ARN for permissions
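If the stream was created from the console, one way to look up its ARN and status afterwards is the DescribeStreamSummary API. A minimal boto3 sketch, assuming the example stream name above (the file name is illustrative):
get_stream_arn.py
PYTHON
import boto3

kinesis = boto3.client('kinesis')

# Fetch the stream summary and print the ARN needed for IAM policies
summary = kinesis.describe_stream_summary(StreamName='clickstream-events')['StreamDescriptionSummary']
print(summary['StreamARN'])
print(summary['StreamStatus'])  # wait for ACTIVE before sending data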
2. Set Up Data Producers
- Create a sample data generator using AWS SDK
- Use PutRecord or PutRecords API
- Choose an appropriate partition key strategy
- Implement error handling and retries (see the batch producer sketch after this list)
- Consider the Kinesis Data Generator for test traffic
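As referenced above, a minimal sketch of batch publishing with PutRecords that retries only the entries the API reports as failed; the file name, batch handling, and backoff policy are illustrative choices rather than the only correct ones:
batch_producer.py
PYTHON
import json
import time
import boto3

kinesis = boto3.client('kinesis')

def put_batch_with_retries(stream_name, events, max_attempts=3):
    """Send a batch with PutRecords, retrying only the failed entries."""
    # PutRecords accepts up to 500 records per call; chunk larger batches before calling this
    entries = [
        {'Data': json.dumps(e), 'PartitionKey': e['user_id']}
        for e in events
    ]
    for attempt in range(max_attempts):
        response = kinesis.put_records(StreamName=stream_name, Records=entries)
        if response['FailedRecordCount'] == 0:
            return
        # Keep only the entries whose result carries an ErrorCode (throttling, etc.)
        entries = [
            entry for entry, result in zip(entries, response['Records'])
            if 'ErrorCode' in result
        ]
        time.sleep(2 ** attempt)  # simple exponential backoff between attempts
    raise RuntimeError(f'{len(entries)} records still failing after {max_attempts} attempts')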
3. Create Kinesis Firehose for S3 Delivery
- Create a Firehose delivery stream
- Source: Kinesis Data Stream
- Destination: S3 bucket
- Configure buffering (size and interval)
- Enable data format conversion (Parquet) if needed
- Add transformation Lambda if required
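If a transformation Lambda is attached, Firehose invokes it with base64-encoded records and expects each record back with its recordId, a result of Ok, Dropped, or ProcessingFailed, and base64-encoded data. A minimal sketch, assuming the enrichment is simply a processing timestamp (the file name is illustrative):
firehose_transform.py
PYTHON
import base64
import json
from datetime import datetime, timezone

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        try:
            payload = json.loads(base64.b64decode(record['data']))
            # Example enrichment: stamp each record with the processing time
            payload['processed_at'] = datetime.now(timezone.utc).isoformat()
            # Newline-delimited JSON keeps the S3 objects friendly to Athena
            data_out = (json.dumps(payload) + '\n').encode('utf-8')
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(data_out).decode('utf-8')
            })
        except (ValueError, KeyError):
            # Malformed records are marked so Firehose routes them to the error output prefix
            output.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']
            })
    return {'records': output}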
4. Add Real-Time Processing with Lambda
- Create Lambda function triggered by Kinesis
- Process records in batches
- Perform aggregations, filtering, or enrichment
- Write results to DynamoDB or another stream
- Handle partial batch failures properly
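For the last point, one common approach is to enable ReportBatchItemFailures on the event source mapping and return the sequence numbers of the records that failed, so Lambda retries from the first failure instead of reprocessing the whole batch. A minimal sketch; process_record is a hypothetical placeholder for your own logic:
partial_batch_handler.py
PYTHON
import base64
import json

def process_record(data):
    # Hypothetical placeholder for real per-record work (enrichment, writes, etc.)
    if 'event_type' not in data:
        raise ValueError('missing event_type')

def lambda_handler(event, context):
    failures = []
    for record in event['Records']:
        try:
            data = json.loads(base64.b64decode(record['kinesis']['data']))
            process_record(data)
        except Exception:
            # Report the sequence number so Lambda retries from this record onward
            failures.append({'itemIdentifier': record['kinesis']['sequenceNumber']})
    # Requires FunctionResponseTypes=['ReportBatchItemFailures'] on the event source mapping
    return {'batchItemFailures': failures}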
5. Set Up OpenSearch for Analytics
- Create OpenSearch domain
- Configure Firehose to deliver to OpenSearch
- Define index mappings for your data
- Set up index rotation (daily/weekly)
- Use Kibana/OpenSearch Dashboards for visualization
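A minimal sketch of creating a date-suffixed index with the mapping from the Code Examples section, using the opensearch-py client. The endpoint, credentials, and index-name pattern are placeholders; a production setup would typically sign requests with SigV4 or rely on fine-grained access control rather than hard-coded basic auth:
create_index.py
PYTHON
from datetime import datetime, timezone
from opensearchpy import OpenSearch

# Placeholder domain endpoint and master-user credentials
client = OpenSearch(
    hosts=[{'host': 'search-clickstream-example.us-east-1.es.amazonaws.com', 'port': 443}],
    http_auth=('admin', 'ReplaceWithMasterPassword'),
    use_ssl=True,
)

mapping = {
    'mappings': {
        'properties': {
            'event_id': {'type': 'keyword'},
            'user_id': {'type': 'keyword'},
            'event_type': {'type': 'keyword'},
            'page': {'type': 'keyword'},
            'timestamp': {'type': 'date'},
        }
    },
    'settings': {'number_of_shards': 2, 'number_of_replicas': 1},
}

# Daily index (e.g. clickstream-2024.05.01) to support rotation and retention
index_name = 'clickstream-' + datetime.now(timezone.utc).strftime('%Y.%m.%d')
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body=mapping)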
6. Create Dashboards and Alerts
- Build OpenSearch Dashboards visualizations
- Or connect QuickSight to S3/Athena
- Create real-time monitoring dashboards
- Set up CloudWatch alarms for pipeline health (see the alarm sketch after this list)
- Configure anomaly detection if needed
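As referenced above, a minimal sketch of a pipeline-health alarm on the stream's GetRecords.IteratorAgeMilliseconds metric; the alarm name, threshold, and SNS topic ARN are illustrative:
create_alarm.py
PYTHON
import boto3

cloudwatch = boto3.client('cloudwatch')

# Alarm when consumers fall more than five minutes behind the tip of the stream
cloudwatch.put_metric_alarm(
    AlarmName='clickstream-iterator-age-high',
    Namespace='AWS/Kinesis',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Dimensions=[{'Name': 'StreamName', 'Value': 'clickstream-events'}],
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=300000,  # five minutes, in milliseconds
    ComparisonOperator='GreaterThanThreshold',
    TreatMissingData='notBreaching',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:pipeline-alerts'],  # placeholder topic
)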
Tips
- Use enhanced fan-out for multiple consumers: each consumer gets dedicated read throughput
- Partition data effectively: choose partition keys that distribute load evenly across shards
- Monitor iterator age: a high iterator age indicates your consumers are falling behind
- Implement dead-letter queues: capture failed records for later analysis (see the trigger configuration sketch after this list)
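For the dead-letter-queue tip, a minimal sketch of attaching the stream to the processing function with an on-failure destination, bounded retries, and partial-batch reporting; the ARNs, function name, and batch settings are placeholders:
configure_trigger.py
PYTHON
import boto3

lambda_client = boto3.client('lambda')

# Attach the stream to the processing function with failure handling enabled
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/clickstream-events',
    FunctionName='stream-processor',
    StartingPosition='LATEST',
    BatchSize=100,
    MaximumBatchingWindowInSeconds=5,
    BisectBatchOnFunctionError=True,   # split failing batches to isolate bad records
    MaximumRetryAttempts=3,            # stop retrying after three attempts
    FunctionResponseTypes=['ReportBatchItemFailures'],
    DestinationConfig={                # metadata about failed batches goes to this SQS queue
        'OnFailure': {'Destination': 'arn:aws:sqs:us-east-1:123456789012:clickstream-dlq'}
    },
)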
Code Examples
Create Kinesis Data Stream
Terminal Commands
BASH
# Create a Kinesis Data Stream in on-demand capacity mode
aws kinesis create-stream \
  --stream-name clickstream-events \
  --stream-mode-details StreamMode=ON_DEMAND

# Or create it with provisioned capacity (2 shards)
aws kinesis create-stream \
  --stream-name clickstream-events \
  --shard-count 2
Data Producer (Python)
producer.py
PYTHON
import boto3
import json
import random
from datetime import datetime

kinesis = boto3.client('kinesis')

def generate_event():
    # Build a random clickstream event
    return {
        'event_id': f'evt-{random.randint(1000, 9999)}',
        'user_id': f'user-{random.randint(1, 100)}',
        'event_type': random.choice(['click', 'view', 'purchase']),
        'page': random.choice(['/home', '/products', '/checkout']),
        'timestamp': datetime.utcnow().isoformat()
    }

def send_events(stream_name, count=100):
    # Write events one at a time; the user ID is the partition key, so each
    # user's events land on the same shard in order
    for _ in range(count):
        event = generate_event()
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(event),
            PartitionKey=event['user_id']
        )
    print(f'Sent {count} events to {stream_name}')

if __name__ == '__main__':
    send_events('clickstream-events', 100)
Lambda Stream Processor
lambda_function.py
PYTHON
import json
import base64
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('EventAggregates')

def lambda_handler(event, context):
    aggregates = {}

    for record in event['Records']:
        # Decode the base64-encoded Kinesis payload
        payload = base64.b64decode(record['kinesis']['data'])
        data = json.loads(payload)

        # Aggregate by event type within this batch
        event_type = data['event_type']
        if event_type not in aggregates:
            aggregates[event_type] = 0
        aggregates[event_type] += 1

    # Add the batch counts to the running totals in DynamoDB
    for event_type, count in aggregates.items():
        table.update_item(
            Key={'event_type': event_type},
            UpdateExpression='ADD event_count :c',
            ExpressionAttributeValues={':c': count}
        )

    return {
        'statusCode': 200,
        'body': f'Processed {len(event["Records"])} records'
    }
Kinesis Firehose to S3 (CloudFormation)
firehose.yaml
YAML
AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # Assumes FirehoseRole (an IAM role Firehose can assume) and DataLakeBucket
  # (the destination S3 bucket) are defined elsewhere in this template
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: clickstream-to-s3
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/clickstream-events
        RoleARN: !GetAtt FirehoseRole.Arn
      S3DestinationConfiguration:
        BucketARN: !GetAtt DataLakeBucket.Arn
        RoleARN: !GetAtt FirehoseRole.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        Prefix: 'raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/'
        ErrorOutputPrefix: 'errors/'
OpenSearch Index Mapping
index-mapping.json
JSON
{
  "mappings": {
    "properties": {
      "event_id": { "type": "keyword" },
      "user_id": { "type": "keyword" },
      "event_type": { "type": "keyword" },
      "page": { "type": "keyword" },
      "timestamp": { "type": "date" },
      "location": { "type": "geo_point" }
    }
  },
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  }
}
What You'll Learn
- Stream ingestion with Kinesis Data Streams
- Real-time data transformation with Lambda
- Data delivery with Kinesis Firehose
- Search and analytics with OpenSearch
- Dashboard creation and data visualization