akka-net-best-practices by aaronontheweb/dotnet-skills
npx skills add https://github.com/aaronontheweb/dotnet-skills --skill akka-net-best-practices在以下情况下使用此技能:
Context.System.EventStream 仅限于单个 ActorSystem 进程。它不能跨集群节点工作。
// 错误示例:这仅在单个服务器上有效
// 当你添加第二个服务器时,服务器 2 上的订阅者将无法收到来自服务器 1 的事件
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
EventStream 适用场景:
对于必须跨多个集群节点到达 Actor 的事件,请使用 Akka.Cluster.Tools.PublishSubscribe:
using Akka.Cluster.Tools.PublishSubscribe;
public class TimelineUpdatePublisher : ReceiveActor
{
private readonly IActorRef _mediator;
public TimelineUpdatePublisher()
{
// 获取 DistributedPubSub 中介者
_mediator = DistributedPubSub.Get(Context.System).Mediator;
Receive<PublishTimelineUpdate>(msg =>
{
// 发布到主题 - 到达所有节点上的所有订阅者
_mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
});
}
}
builder.WithDistributedPubSub(role: null); // 在所有角色上可用,或指定一个角色
| 模式 | 主题格式 | 使用场景 |
|---|---|---|
| 按用户 | timeline:{userId} | 时间线更新、通知 |
| 按实体 | post:{postId} | 帖子互动更新 |
| 广播 | system:announcements | 系统范围通知 |
| 基于角色 | workers:rss-poller | 工作分配 |
在 Actor 上定义的监督策略规定了 该 Actor 如何监督其子级,而不是该 Actor 自身如何被监督。
public class ParentActor : ReceiveActor
{
// 此策略适用于 ParentActor 的子级,而非 ParentActor 自身
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
decider: ex => ex switch
{
ArithmeticException => Directive.Resume,
NullReferenceException => Directive.Restart,
ArgumentException => Directive.Stop,
_ => Directive.Escalate
});
}
}
默认的 OneForOneStrategy 已经包含了速率限制:
除非有特定需求,否则很少需要自定义策略。
合理原因:
AllForOneStrategy不合理原因:
在以下情况下使用 try-catch:
public class RssFeedPollerActor : ReceiveActor
{
public RssFeedPollerActor()
{
ReceiveAsync<PollFeed>(async msg =>
{
try
{
var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
var items = ParseFeed(feed);
// 处理项目...
}
catch (HttpRequestException ex)
{
// 预期故障 - 记录日志并安排重试
_log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
Context.System.Scheduler.ScheduleTellOnce(
TimeSpan.FromMinutes(5), Self, msg, Self);
}
catch (XmlException ex)
{
// 无效的 Feed 格式 - 记录日志并标记为错误
_log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
}
});
}
}
让异常传播(触发监督)当:
// 错误示例:吞掉异常会隐藏问题
catch (Exception ex)
{
_log.Error(ex, "Error processing work");
// Actor 继续运行,但状态可能已损坏
}
// 正确示例:处理已知异常,让未知异常传播
catch (HttpRequestException ex)
{
// 已知的、预期的故障 - 优雅处理
_log.Warning("HTTP request failed: {Error}", ex.Message);
Sender.Tell(new WorkResult.TransientFailure());
}
// 未知异常传播给监督机制
在以下情况下使用 Props.Create():
IServiceProvider 或 IRequiredActor<T>// 没有 DI 需求的简单 Actor
public static Props Props(PostId postId, IPostWriteStore store)
=> Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));
在以下情况下使用 resolver.Props<T>():
IServiceProvider 来创建作用域服务IRequiredActor<T> 获取其他 Actor 的引用// 使用 DI 注册
builder.WithActors((system, registry, resolver) =>
{
var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
registry.Register<OrderProcessorActor>(actor);
});
你几乎不需要远程部署。 如果你没有进行远程部署(很可能你没有):
Props.Create(() => new Actor(...)) 是可以的对于大多数应用程序,请使用集群分片而不是远程部署 - 它会自动处理分发。
当你有很多后台作业(RSS 源、邮件发送等)时,不要一次性处理所有作业 - 这会导致惊群问题。
解决此问题的三种模式:
FOR UPDATE SKIP LOCKED 实现自然的跨节点分发完整代码示例请参阅 work-distribution-patterns.md。
| 错误 | 为何错误 | 修复方法 |
|---|---|---|
| 使用 EventStream 进行跨节点发布/订阅 | EventStream 仅限本地 | 使用 DistributedPubSub |
| 定义监督以“保护”一个 Actor | 监督保护的是子级 | 理解层次结构 |
| 捕获所有异常 | 隐藏错误,损坏状态 | 仅捕获预期错误 |
| 总是使用 DependencyResolver | 增加不必要的复杂性 | 可能时使用普通 Props |
| 一次性处理所有后台作业 | 惊群问题,资源耗尽 | 使用数据库队列 + 速率限制 |
| 为预期故障抛出异常 | 触发不必要的重启 | 返回结果类型,使用消息传递 |
需要在 Actor 之间通信吗?
├── 仅限同一进程? -> EventStream 即可
├── 跨集群节点?
│ ├── 点对点? -> 使用 ActorSelection 或已知的 IActorRef
│ └── 发布/订阅? -> 使用 DistributedPubSub
└── 发送即忘到外部系统? -> 考虑发件箱模式
Actor 中发生异常?
├── 预期故障(HTTP 超时、无效输入)?
│ └── Try-catch,优雅处理,继续
├── 状态可能已损坏?
│ └── 让监督重启
├── 未知原因?
│ └── 让监督重启
└── 编程错误(空引用、逻辑错误)?
└── 让监督重启,修复错误
创建 Actor Props?
├── Actor 需要 IServiceProvider?
│ └── 使用 resolver.Props<T>()
├── Actor 需要 IRequiredActor<T>?
│ └── 使用 resolver.Props<T>()
├── 具有构造函数参数的简单 Actor?
│ └── 使用 Props.Create(() => new Actor(...))
└── 需要远程部署?
└── 可能不需要 - 改用集群分片
对于需要在集群化生产环境和本地/测试环境中运行的应用程序,使用抽象模式在不同实现之间切换:
AkkaExecutionMode 枚举 - 控制使用哪些实现(LocalTest 与 Clustered)GenericChildPerEntityParent - 使用相同的 IMessageExtractor 在本地模拟分片行为IPubSubMediator - 为可交换的本地/集群实现抽象 DistributedPubSub完整实现代码请参阅 cluster-local-abstractions.md。
在 Actor 中,使用来自 Context.GetLogger() 的 ILoggingAdapter,而不是 DI 注入的 ILogger<T>:
public class MyActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
public MyActor()
{
Receive<MyMessage>(msg =>
{
_log.Info("Processing message for user {UserId}", msg.UserId);
_log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
});
}
}
为何使用 ILoggingAdapter:
Info()、Debug()、Warning()、Error()(不是 Log* 变体)不要将 ILogger 注入到 Actor 中 - 这会绕过 Akka 的日志基础设施。
// 使用命名占位符,便于日志聚合和查询
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);
// 优先使用命名占位符而非位置占位符
// 正确:{OrderId}, {CustomerId}
// 避免:{0}, {1}
当 Actor 通过 PipeTo 启动异步操作时,如果管理不当,这些操作可能会比 Actor 存活更久。关键实践:
PostStop() 中取消并释放完整实现代码请参阅 async-cancellation-patterns.md。
每周安装次数
83
代码仓库
GitHub 星标数
491
首次出现
2026 年 1 月 28 日
安全审计
安装于
claude-code66
codex54
opencode53
github-copilot51
gemini-cli51
kimi-cli46
Use this skill when:
Context.System.EventStream is local to a single ActorSystem process. It does NOT work across cluster nodes.
// BAD: This only works on a single server
// When you add a second server, subscribers on server 2 won't receive events from server 1
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));
When EventStream is appropriate:
For events that must reach actors across multiple cluster nodes, use Akka.Cluster.Tools.PublishSubscribe:
using Akka.Cluster.Tools.PublishSubscribe;
public class TimelineUpdatePublisher : ReceiveActor
{
private readonly IActorRef _mediator;
public TimelineUpdatePublisher()
{
// Get the DistributedPubSub mediator
_mediator = DistributedPubSub.Get(Context.System).Mediator;
Receive<PublishTimelineUpdate>(msg =>
{
// Publish to a topic - reaches all subscribers across all nodes
_mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
});
}
}
builder.WithDistributedPubSub(role: null); // Available on all roles, or specify a role
| Pattern | Topic Format | Use Case |
|---|---|---|
| Per-user | timeline:{userId} | Timeline updates, notifications |
| Per-entity | post:{postId} | Post engagement updates |
| Broadcast | system:announcements | System-wide notifications |
| Role-based | workers:rss-poller | Work distribution |
A supervision strategy defined on an actor dictates how that actor supervises its children , NOT how the actor itself is supervised.
public class ParentActor : ReceiveActor
{
// This strategy applies to children of ParentActor, NOT to ParentActor itself
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 10,
withinTimeRange: TimeSpan.FromSeconds(30),
decider: ex => ex switch
{
ArithmeticException => Directive.Resume,
NullReferenceException => Directive.Restart,
ArgumentException => Directive.Stop,
_ => Directive.Escalate
});
}
}
The default OneForOneStrategy already includes rate limiting:
You rarely need a custom strategy unless you have specific requirements.
Good reasons:
AllForOneStrategyBad reasons:
Use try-catch when:
The failure is expected (network timeout, invalid input, external service down)
You know exactly why the exception occurred
You can handle it gracefully (retry, return error response, log and continue)
Restarting would not help (same error would occur again)
public class RssFeedPollerActor : ReceiveActor { public RssFeedPollerActor() { ReceiveAsync<PollFeed>(async msg => { try { var feed = await _httpClient.GetStringAsync(msg.FeedUrl); var items = ParseFeed(feed); // Process items... } catch (HttpRequestException ex) { // Expected failure - log and schedule retry _log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message); Context.System.Scheduler.ScheduleTellOnce( TimeSpan.FromMinutes(5), Self, msg, Self); } catch (XmlException ex) { // Invalid feed format - log and mark as bad _log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message); Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl)); } }); } }
Let exceptions propagate (trigger supervision) when:
// BAD: Swallowing exceptions hides problems
catch (Exception ex)
{
_log.Error(ex, "Error processing work");
// Actor continues with potentially corrupt state
}
// GOOD: Handle known exceptions, let unknown ones propagate
catch (HttpRequestException ex)
{
// Known, expected failure - handle gracefully
_log.Warning("HTTP request failed: {Error}", ex.Message);
Sender.Tell(new WorkResult.TransientFailure());
}
// Unknown exceptions propagate to supervision
UseProps.Create() when:
Actor doesn't need IServiceProvider or IRequiredActor<T>
All dependencies can be passed via constructor
Actor is simple and self-contained
// Simple actor with no DI needs public static Props Props(PostId postId, IPostWriteStore store) => Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));
Useresolver.Props<T>() when:
Actor needs IServiceProvider to create scoped services
Actor uses IRequiredActor<T> to get references to other actors
Actor has many dependencies that are already in DI container
// Registration with DI builder.WithActors((system, registry, resolver) => { var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor"); registry.Register<OrderProcessorActor>(actor); });
You almost never need remote deployment. If you're not doing remote deployment (and you probably aren't):
Props.Create(() => new Actor(...)) with closures is fineFor most applications, use cluster sharding instead of remote deployment - it handles distribution automatically.
When you have many background jobs (RSS feeds, email sending, etc.), don't process them all at once - this causes thundering herd problems.
Three patterns to solve this:
FOR UPDATE SKIP LOCKED for natural cross-node distributionSee work-distribution-patterns.md for full code samples.
| Mistake | Why It's Wrong | Fix |
|---|---|---|
| Using EventStream for cross-node pub/sub | EventStream is local only | Use DistributedPubSub |
| Defining supervision to "protect" an actor | Supervision protects children | Understand the hierarchy |
| Catching all exceptions | Hides bugs, corrupts state | Only catch expected errors |
| Always using DependencyResolver | Adds unnecessary complexity | Use plain Props when possible |
| Processing all background jobs at once | Thundering herd, resource exhaustion | Use database queue + rate limiting |
| Throwing exceptions for expected failures | Triggers unnecessary restarts | Return result types, use messaging |
Need to communicate between actors?
├── Same process only? -> EventStream is fine
├── Across cluster nodes?
│ ├── Point-to-point? -> Use ActorSelection or known IActorRef
│ └── Pub/sub? -> Use DistributedPubSub
└── Fire-and-forget to external system? -> Consider outbox pattern
Exception occurred in actor?
├── Expected failure (HTTP timeout, invalid input)?
│ └── Try-catch, handle gracefully, continue
├── State might be corrupt?
│ └── Let supervision restart
├── Unknown cause?
│ └── Let supervision restart
└── Programming error (null ref, bad logic)?
└── Let supervision restart, fix the bug
Creating actor Props?
├── Actor needs IServiceProvider?
│ └── Use resolver.Props<T>()
├── Actor needs IRequiredActor<T>?
│ └── Use resolver.Props<T>()
├── Simple actor with constructor params?
│ └── Use Props.Create(() => new Actor(...))
└── Remote deployment needed?
└── Probably not - use cluster sharding instead
For applications that need to run both in clustered production and local/test environments, use abstraction patterns to toggle between implementations:
AkkaExecutionMode enum - Controls which implementations are used (LocalTest vs Clustered)GenericChildPerEntityParent - Mimics sharding behavior locally using the same IMessageExtractorIPubSubMediator - Abstracts DistributedPubSub for swappable local/cluster implementationsSee cluster-local-abstractions.md for complete implementation code.
In actors, use ILoggingAdapter from Context.GetLogger() instead of DI-injected ILogger<T>:
public class MyActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
public MyActor()
{
Receive<MyMessage>(msg =>
{
_log.Info("Processing message for user {UserId}", msg.UserId);
_log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
});
}
}
Why ILoggingAdapter:
Info(), Debug(), Warning(), Error() (not Log* variants)Don't inject ILogger into actors - it bypasses Akka's logging infrastructure.
// Named placeholders for better log aggregation and querying
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);
// Prefer named placeholders over positional
// Good: {OrderId}, {CustomerId}
// Avoid: {0}, {1}
When actors launch async operations via PipeTo, those operations can outlive the actor if not properly managed. Key practices:
PostStop()See async-cancellation-patterns.md for complete implementation code.
Weekly Installs
83
Repository
GitHub Stars
491
First Seen
Jan 28, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykWarn
Installed on
claude-code66
codex54
opencode53
github-copilot51
gemini-cli51
kimi-cli46
Sentry .NET SDK 设置指南:错误监控、性能追踪与日志记录
328 周安装