kinesis-stream-processor by dengineproblem/agents-monorepo
npx skills add https://github.com/dengineproblem/agents-monorepo --skill kinesis-stream-processor
Expert in building real-time data streaming applications with AWS Kinesis.
| Component | Purpose | Use Case |
|---|---|---|
| Data Streams | Real-time data ingestion | Custom processing, low latency |
| Data Firehose | Delivery to destinations | S3, Redshift, Elasticsearch |
| Data Analytics | SQL-based processing | Real-time analytics |
| Video Streams | Video streaming | IoT, media processing |
Kinesis Data Streams:
per_shard:
write: "1,000 records/sec OR 1 MB/sec"
read: "5 transactions/sec, up to 10,000 records"
read_throughput: "2 MB/sec"
per_stream:
max_shards: "500 (soft limit)"
retention: "24 hours (default) to 365 days"
per_record:
max_size: "1 MB"
partition_key: "256 bytes max"
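These per-shard ceilings drive capacity planning: take peak write traffic, divide by the per-shard record and byte limits, and provision for the larger quotient. A minimal sketch of that arithmetic (the traffic figures in the example are made-up assumptions):

```python
import math

def estimate_shards(peak_records_per_sec: float, peak_mb_per_sec: float) -> int:
    """Estimate a provisioned shard count from peak write traffic,
    using the per-shard write limits above (1,000 records/sec, 1 MB/sec)."""
    by_records = peak_records_per_sec / 1_000
    by_bytes = peak_mb_per_sec / 1.0
    return max(1, math.ceil(max(by_records, by_bytes)))

# Example: 12,000 records/sec at ~0.5 KB each (~6 MB/sec) -> 12 shards
print(estimate_shards(12_000, 6.0))
```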
import boto3
import json
import time
from typing import List, Dict, Any
class KinesisProducer:
"""Optimized Kinesis producer with batching and error handling."""
def __init__(self, stream_name: str, region: str = 'us-east-1'):
self.stream_name = stream_name
self.client = boto3.client('kinesis', region_name=region)
self.buffer: List[Dict] = []
self.buffer_size = 500 # Max records per batch
self.buffer_time = 0.1 # Flush every 100ms
self.last_flush = time.time()
def put_record(self, data: Dict[str, Any], partition_key: str) -> None:
"""Add record to buffer, flush if needed."""
self.buffer.append({
'Data': json.dumps(data).encode('utf-8'),
'PartitionKey': partition_key
})
if len(self.buffer) >= self.buffer_size:
self.flush()
elif time.time() - self.last_flush > self.buffer_time:
self.flush()
def flush(self) -> None:
"""Send buffered records to Kinesis."""
if not self.buffer:
return
records = self.buffer[:500] # PutRecords limit
self.buffer = self.buffer[500:]
try:
response = self.client.put_records(
StreamName=self.stream_name,
Records=records
)
# Handle partial failures
failed_count = response.get('FailedRecordCount', 0)
if failed_count > 0:
self._handle_failures(response, records)
except Exception as e:
print(f"Kinesis put_records error: {e}")
# Implement retry logic or dead letter queue
raise
self.last_flush = time.time()
def _handle_failures(self, response: Dict, records: List[Dict]) -> None:
"""Retry failed records with exponential backoff."""
failed_records = []
for i, record_response in enumerate(response['Records']):
if 'ErrorCode' in record_response:
failed_records.append(records[i])
print(f"Failed record: {record_response['ErrorCode']} - {record_response.get('ErrorMessage')}")
# Retry failed records
if failed_records:
time.sleep(0.1) # Brief backoff
self.client.put_records(
StreamName=self.stream_name,
Records=failed_records
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.flush()
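The class is written as a context manager so any final partial batch is flushed on exit. A hypothetical usage sketch (the stream name and payloads are placeholders, and the stream is assumed to already exist):

```python
# Assumes a stream named "my-data-stream" exists in us-east-1
with KinesisProducer('my-data-stream') as producer:
    for i in range(1_000):
        producer.put_record(
            {'id': i, 'value': i * 2},
            partition_key=str(i % 10)  # spread writes across shards
        )
```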
const { KinesisClient, PutRecordsCommand } = require('@aws-sdk/client-kinesis');
class KinesisProducer {
constructor(streamName, region = 'us-east-1') {
this.streamName = streamName;
this.client = new KinesisClient({ region });
this.buffer = [];
this.bufferSize = 500;
this.flushInterval = 100; // ms
// Auto-flush timer
setInterval(() => this.flush(), this.flushInterval);
}
async putRecord(data, partitionKey) {
this.buffer.push({
Data: Buffer.from(JSON.stringify(data)),
PartitionKey: partitionKey
});
if (this.buffer.length >= this.bufferSize) {
await this.flush();
}
}
async flush() {
if (this.buffer.length === 0) return;
const records = this.buffer.splice(0, 500);
try {
const command = new PutRecordsCommand({
StreamName: this.streamName,
Records: records
});
const response = await this.client.send(command);
if (response.FailedRecordCount > 0) {
await this.handleFailures(response, records);
}
} catch (error) {
console.error('Kinesis error:', error);
throw error;
}
}
async handleFailures(response, records) {
const failedRecords = response.Records
.map((r, i) => r.ErrorCode ? records[i] : null)
.filter(Boolean);
if (failedRecords.length > 0) {
      // Brief fixed backoff before a single retry (not exponential)
await new Promise(resolve => setTimeout(resolve, 100));
const command = new PutRecordsCommand({
StreamName: this.streamName,
Records: failedRecords
});
await this.client.send(command);
}
}
}
import json
import base64
from datetime import datetime
from typing import Dict, Any
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Process Kinesis records from Lambda trigger."""
processed_records = []
failed_records = []
for record in event['Records']:
try:
# Decode Kinesis record
payload = base64.b64decode(record['kinesis']['data'])
data = json.loads(payload)
# Process record
result = process_record(data)
processed_records.append({
'sequenceNumber': record['kinesis']['sequenceNumber'],
'result': result
})
except Exception as e:
print(f"Error processing record: {e}")
failed_records.append({
'sequenceNumber': record['kinesis']['sequenceNumber'],
'error': str(e)
})
# Report results
print(f"Processed: {len(processed_records)}, Failed: {len(failed_records)}")
# Return batch item failures for partial batch response
return {
'batchItemFailures': [
{'itemIdentifier': r['sequenceNumber']}
for r in failed_records
]
}
def process_record(data: Dict) -> Dict:
"""Business logic for processing each record."""
# Transform data
transformed = {
'id': data.get('id'),
'timestamp': data.get('timestamp'),
'processed_at': datetime.utcnow().isoformat(),
'value': data.get('value', 0) * 2 # Example transformation
}
# Write to downstream (DynamoDB, S3, etc.)
write_to_downstream(transformed)
return transformed
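Note that batchItemFailures is only honored when the event source mapping enables partial batch responses (ReportBatchItemFailures, included in the templates below). The handler can also be exercised locally with a hand-built event; a sketch with fabricated data, stubbing the undefined write_to_downstream helper:

```python
import base64
import json

def write_to_downstream(item):
    # Stub so process_record can run locally
    print(f"would write: {item}")

# Fabricated event mimicking the Kinesis trigger payload shape
fake_event = {
    'Records': [{
        'kinesis': {
            'sequenceNumber': '4950',
            'data': base64.b64encode(json.dumps(
                {'id': 1, 'timestamp': '2026-01-29T00:00:00Z', 'value': 21}
            ).encode()).decode()
        }
    }]
}
print(lambda_handler(fake_event, None))  # expect no batchItemFailures
```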
import boto3
import json
import time
from typing import Dict
class KinesisConsumer:
"""KCL-style consumer with checkpointing."""
def __init__(self, stream_name: str, region: str = 'us-east-1'):
self.stream_name = stream_name
self.client = boto3.client('kinesis', region_name=region)
self.checkpoint_interval = 60 # seconds
self.last_checkpoint = time.time()
def process_shard(self, shard_id: str) -> None:
"""Process records from a single shard."""
# Get shard iterator
iterator_response = self.client.get_shard_iterator(
StreamName=self.stream_name,
ShardId=shard_id,
ShardIteratorType='LATEST' # or 'TRIM_HORIZON', 'AT_SEQUENCE_NUMBER'
)
shard_iterator = iterator_response['ShardIterator']
while True:
try:
response = self.client.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in response['Records']:
self.process_record(record)
                # Checkpoint periodically (skip empty batches)
                if response['Records'] and time.time() - self.last_checkpoint > self.checkpoint_interval:
                    self.checkpoint(shard_id, response['Records'][-1]['SequenceNumber'])
# Get next iterator
shard_iterator = response.get('NextShardIterator')
if not shard_iterator:
break
# Respect rate limits
if len(response['Records']) == 0:
time.sleep(0.5)
except Exception as e:
print(f"Error processing shard {shard_id}: {e}")
time.sleep(1)
def process_record(self, record: Dict) -> None:
"""Process individual record."""
data = json.loads(record['Data'])
# Business logic here
print(f"Processing: {data}")
def checkpoint(self, shard_id: str, sequence_number: str) -> None:
"""Save checkpoint for recovery."""
# Store in DynamoDB or other persistent store
print(f"Checkpoint: shard={shard_id}, seq={sequence_number}")
self.last_checkpoint = time.time()
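The checkpoint method above only logs; a real consumer would persist the position so it can resume with ShardIteratorType='AFTER_SEQUENCE_NUMBER'. A minimal sketch using DynamoDB (the table name and key schema are assumptions, not part of this skill):

```python
import boto3
from typing import Optional

# Hypothetical table "kinesis-checkpoints" with partition key "shard_key"
table = boto3.resource('dynamodb').Table('kinesis-checkpoints')

def save_checkpoint(stream_name: str, shard_id: str, sequence_number: str) -> None:
    """Persist the last processed sequence number for crash recovery."""
    table.put_item(Item={
        'shard_key': f'{stream_name}/{shard_id}',
        'sequence_number': sequence_number
    })

def load_checkpoint(stream_name: str, shard_id: str) -> Optional[str]:
    """Return the saved sequence number, or None on first run."""
    item = table.get_item(
        Key={'shard_key': f'{stream_name}/{shard_id}'}
    ).get('Item')
    return item['sequence_number'] if item else None
```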
import boto3
import time
def setup_enhanced_fanout(stream_arn: str, consumer_name: str) -> str:
"""Register enhanced fan-out consumer for dedicated throughput."""
client = boto3.client('kinesis')
# Register consumer
response = client.register_stream_consumer(
StreamARN=stream_arn,
ConsumerName=consumer_name
)
consumer_arn = response['Consumer']['ConsumerARN']
    # Wait for the consumer to become ACTIVE; boto3 has no built-in waiter
    # for stream consumers, so poll describe_stream_consumer instead
    while True:
        status = client.describe_stream_consumer(
            StreamARN=stream_arn,
            ConsumerName=consumer_name
        )['ConsumerDescription']['ConsumerStatus']
        if status == 'ACTIVE':
            break
        time.sleep(1)
return consumer_arn
def subscribe_to_shard(consumer_arn: str, shard_id: str):
"""Subscribe to shard with enhanced fan-out."""
client = boto3.client('kinesis')
response = client.subscribe_to_shard(
ConsumerARN=consumer_arn,
ShardId=shard_id,
StartingPosition={
'Type': 'LATEST'
}
)
# Process events from subscription
for event in response['EventStream']:
if 'SubscribeToShardEvent' in event:
records = event['SubscribeToShardEvent']['Records']
for record in records:
process_record(record)
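To consume a whole stream this way, enumerate its shards and open one subscription per shard; each SubscribeToShard call pushes records for up to five minutes and must then be renewed. A sketch tying the two helpers together (names are placeholders):

```python
import boto3
from concurrent.futures import ThreadPoolExecutor

def consume_stream(stream_name: str, stream_arn: str, consumer_name: str) -> None:
    """Fan out an enhanced fan-out consumer across all shards."""
    client = boto3.client('kinesis')
    consumer_arn = setup_enhanced_fanout(stream_arn, consumer_name)
    shard_ids = [
        shard['ShardId']
        for shard in client.list_shards(StreamName=stream_name)['Shards']
    ]
    # One worker per shard; renew subscriptions after the 5-minute window
    with ThreadPoolExecutor(max_workers=len(shard_ids)) as pool:
        for shard_id in shard_ids:
            pool.submit(subscribe_to_shard, consumer_arn, shard_id)
```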
AWSTemplateFormatVersion: '2010-09-09'
Description: Kinesis Data Stream with Lambda Consumer
Parameters:
StreamName:
Type: String
Default: my-data-stream
ShardCount:
Type: Number
Default: 2
RetentionPeriod:
Type: Number
Default: 24
Resources:
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Ref StreamName
ShardCount: !Ref ShardCount
RetentionPeriodHours: !Ref RetentionPeriod
StreamEncryption:
EncryptionType: KMS
KeyId: alias/aws/kinesis
Tags:
- Key: Environment
Value: production
ProcessorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: kinesis-processor
Runtime: python3.11
Handler: index.lambda_handler
MemorySize: 256
Timeout: 60
Role: !GetAtt ProcessorRole.Arn
Code:
ZipFile: |
import json
import base64
def lambda_handler(event, context):
for record in event['Records']:
payload = base64.b64decode(record['kinesis']['data'])
print(f"Processed: {payload}")
return {'statusCode': 200}
EventSourceMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn: !GetAtt KinesisStream.Arn
FunctionName: !Ref ProcessorFunction
StartingPosition: LATEST
BatchSize: 100
MaximumBatchingWindowInSeconds: 5
MaximumRetryAttempts: 3
BisectBatchOnFunctionError: true
      ParallelizationFactor: 1
      # Enables the partial-batch-response pattern used by the handler above
      FunctionResponseTypes:
        - ReportBatchItemFailures
ProcessorRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
# CloudWatch Alarms
IteratorAgeAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: kinesis-iterator-age
MetricName: GetRecords.IteratorAgeMilliseconds
Namespace: AWS/Kinesis
Dimensions:
- Name: StreamName
Value: !Ref StreamName
Statistic: Maximum
Period: 60
EvaluationPeriods: 5
Threshold: 60000 # 1 minute
ComparisonOperator: GreaterThanThreshold
AlarmActions:
- !Ref AlertTopic
AlertTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: kinesis-alerts
Outputs:
StreamArn:
Value: !GetAtt KinesisStream.Arn
StreamName:
Value: !Ref KinesisStream
resource "aws_kinesis_stream" "main" {
name = var.stream_name
shard_count = var.shard_count
retention_period = var.retention_hours
encryption_type = "KMS"
kms_key_id = "alias/aws/kinesis"
shard_level_metrics = [
"IncomingBytes",
"IncomingRecords",
"OutgoingBytes",
"OutgoingRecords",
"WriteProvisionedThroughputExceeded",
"ReadProvisionedThroughputExceeded",
"IteratorAgeMilliseconds"
]
tags = {
Environment = var.environment
}
}
resource "aws_lambda_event_source_mapping" "kinesis" {
event_source_arn = aws_kinesis_stream.main.arn
function_name = aws_lambda_function.processor.arn
starting_position = "LATEST"
batch_size = 100
maximum_batching_window_in_seconds = 5
maximum_retry_attempts = 3
bisect_batch_on_function_error = true
  parallelization_factor = 1
  # Honor batchItemFailures returned by the handler (partial batch response)
  function_response_types = ["ReportBatchItemFailures"]
}
| Metric | Description | Alert Threshold |
|---|---|---|
| IncomingRecords | Records put per second | Monitor for traffic patterns |
| IncomingBytes | Bytes put per second | 80% of shard limit |
| WriteProvisionedThroughputExceeded | Throttled writes | >0 |
| ReadProvisionedThroughputExceeded | Throttled reads | >0 |
| GetRecords.IteratorAgeMilliseconds | Consumer lag | >60000 ms |
| GetRecords.Success | Successful GetRecords calls | Monitor for drops |
import boto3
from datetime import datetime, timedelta
def get_stream_metrics(stream_name: str, period_minutes: int = 5):
"""Get key Kinesis metrics for monitoring."""
cloudwatch = boto3.client('cloudwatch')
metrics = [
'IncomingRecords',
'IncomingBytes',
'WriteProvisionedThroughputExceeded',
'GetRecords.IteratorAgeMilliseconds'
]
results = {}
for metric in metrics:
response = cloudwatch.get_metric_statistics(
Namespace='AWS/Kinesis',
MetricName=metric,
Dimensions=[{'Name': 'StreamName', 'Value': stream_name}],
StartTime=datetime.utcnow() - timedelta(minutes=period_minutes),
EndTime=datetime.utcnow(),
Period=60,
Statistics=['Sum', 'Average', 'Maximum']
)
results[metric] = response['Datapoints']
return results
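A usage sketch that checks consumer lag against the 60-second alarm threshold from the table above (the stream name is a placeholder):

```python
metrics = get_stream_metrics('my-data-stream')

# Flag any datapoint where consumers fell more than 60s behind the stream head
for point in metrics['GetRecords.IteratorAgeMilliseconds']:
    if point.get('Maximum', 0) > 60_000:
        print(f"consumer lag {point['Maximum']:.0f} ms at {point['Timestamp']}")
```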
Weekly Installs: 37
GitHub Stars: 3
First Seen: Jan 29, 2026
Security Audits: Gen Agent Trust Hub (Pass), Socket (Pass), Snyk (Pass)
Installed on: github-copilot (37), opencode (36), gemini-cli (36), claude-code (36), codex (36), kimi-cli (36)
Related skills:
Supabase Postgres best-practices guide: performance-optimization rules in 8 categories with SQL examples
89,100 weekly installs
Pattern library (pattern-library) design-pattern expert: reusable UX solutions and best practices
194 weekly installs
PlayCanvas engine: a lightweight WebGL/WebGPU game engine with ECS architecture and a visual editor
187 weekly installs
Event-sourcing architect: expert in designing and implementing event sourcing, CQRS, and event-driven architecture
189 weekly installs
Pandoc document-conversion tutorial: commands for converting Markdown to Word/PDF/HTML
186 weekly installs
Storyboard Manager: an AI-driven creative-writing assistant for character development and story planning
212 weekly installs
Apple HIG quick reference and design-decision guide: an iOS/macOS interface-design checklist
189 weekly installs