简介
最近我开发了 MessageWorkerPool 专案。其主要概念是提供一个平台框架,使使用者能够快速且轻鬆地在 Worker 内实作逻辑。该设计高度灵活,允许基于我创建的 Worker 通讯协议,以多种程式语言实作 Worker。目前,我已提供使用 C#、Rust 和 Python 编写的 Worker 范例。
这个函式库在多进程环境中处理任务表现优异。此外,它还支援优雅关闭 (graceful shutdown),确保在随时 consumer worker 能顺利终止处理程序。
MessageWorkerPool GitHub
为什么选择 ProcessPool 而非 ThreadPool ?
当你需要强大的隔离性,以防止某个任务影响其他任务时,应该选择 ProcessPool,特别是针对关键操作或容易崩溃的任务。虽然 ThreadPool 较为轻量(因为执行绪共用记忆体并且具有较低的上下文切换开销),但 ProcessPool 能够提供更灵活的解决方案,允许使用不同的程式语言来实作 Worker。
安装
要安装 MessageWorkerPool 套件,请使用以下 NuGet 指令:
PM > Install-Package MessageWorkerPool
若要手动安装此函式库,可克隆储存库并建置专案:
git clone https://github.com/isdaniel/MessageWorkerPool.git
cd MessageWorkerPool
dotnet build
架构概览
快速开始
这是部署 RabbitMQ 和相关服务的快速开始指南,使用提供的 docker-compose.yml 档案和 .env 中的环境变数。
docker-compose --env-file .\\env\\.env up --build -d
- 使用者名称: guest
- 密码: guest
- 使用者名称: admin
- 密码: test.123
程式结构
以下是创建并配置与 RabbitMQ 互动的 workerpool 的范例程式码。以下是其功能的解析:workerpool 将根据您的 RabbitMqSetting 设定从 RabbitMQ 伺服器获取讯息,并通过 Process.StandardInput 将讯息传递给用户创建的真实 worker node
public class Program
{
public static async Task Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole(options => {
options.FormatterName = ConsoleFormatterNames.Simple;
});
logging.Services.Configure<SimpleConsoleFormatterOptions>(options => {
options.IncludeScopes = true;
options.TimestampFormat = " yyyy-MM-dd HH:mm:ss ";
});
}).AddRabbitMqWorkerPool(new RabbitMqSetting
{
UserName = Environment.GetEnvironmentVariable("USERNAME") ?? "guest",
Password = Environment.GetEnvironmentVariable("PASSWORD") ?? "guest",
HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOSTNAME"),
Port = ushort.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_PORT"), out ushort p) ? p : (ushort) 5672,
PrefetchTaskCount = 3
}, new WorkerPoolSetting() { WorkerUnitCount = 9, CommandLine = "dotnet", Arguments = @"./ProcessBin/WorkerProcessSample.dll", QueueName = Environment.GetEnvironmentVariable("QUEUENAME"), }
);
}
worker process 与 workerPool 之间的协议
worker node 与任务进程之间的协议使用 MessagePack 二进制格式来进行更快且更小的资料传输,标準输入将发送信号来控制 worker process。
一开始 workerPool 将通过标準输入传递 NamedPipe 名称,因此 worker node 需要接收该名称并建立 worker process 和 workerPool 之间的 NamedPipe。
workerPool 发送的操作指令
目前,workerPool将通过标準输入向 worker process 发送操作信号或指令。
- CLOSED_SIGNAL (__quit__): 代表 workerPool 发送关闭或关机信号给 worker node,worker process 应尽快执行优雅关机。通过 (Data Named Pipe Stream) 进行资料传输命名管道是一种强大的进程间通信 (IPC) 机制,它允许两个或更多的进程之间进行通信,即使它们运行在不同的机器上(例如 Windows 等支持的平台)。我们的 worker 使用此方式在 worker node 与 workerPool 之间传输资料。
msgpack 协议支持的资料类型如下类别与 byte[] 格式。
对应的 byte[] 资料是:
[132,161,48,179,78,101,119,32,79,117,116,80,117,116,32,77,101,115,115,97,103,101,33,161,49,204,200,161,50,129,164,116,101,115,116,167,116,101,115,116,118,97,108,161,51,169,116,101,115,116,81,117,101,117,101]
要将提供的伪 JSON 结构表示为 MsgPack 格式(byte[]),我们可以分解过程如下:
Edit
{
"0": "New OutPut Message!",
"1": 200,
"2": {
"test": "testval"
},
"3": "testQueue"
}
更多资讯,您可以使用 msgpack-converter 来解码和编码。
/// <summary>
/// 封装来自 MQ 服务的讯息
/// </summary>
[MessagePackObject]
public class MessageOutputTask
{
/// <summary>
/// 来自进程的输出讯息
/// </summary>
[Key("0")]
public string Message { get; set; }
[Key("1")]
public MessageStatus Status { get; set; }
/// <summary>
/// 我们希望储存的回应资讯以便继续执行讯息。
/// </summary>
[Key("2")]
[MessagePackFormatter(typeof(PrimitiveObjectResolver))]
public IDictionary<string, object> Headers { get; set; }
/// <summary>
/// 预设使用 BasicProperties.Reply To 队列名称,任务处理器可以覆写回应队列名称。
/// </summary>
/// <value>预设使用 BasicProperties.Reply</value>
[Key("3")]
public string ReplyQueueName { get; set; }
}
我将在此介绍 MessageStatus 的含义。
- IGNORE_MESSAGE (-1) : 将讯息附加到资料流管道中,而不进行进一步处理。
- Status = -1: 任务处理告诉 worker process 这不是回应或确认讯息,只是回馈到资料流管道。
- MESSAGE_DONE (200) : 通知 worker process 该案件可以由讯息队列服务进行确认。
- Status = 200 任务处理告诉 worker process 该任务已完成并且可以确认。
- MESSAGE_DONE_WITH_REPLY (201) : 请确保我们满足以下步骤以支援 RPC。
- 客户端代码必须提供 ReplyTo 资讯。
- 任务处理将使用 JSON 负载中的 Message 栏位来回应队列资讯。
- 例如:当 Status = 201 透过资料流管道发送时,任务处理指示 worker process 输出,例如 1010,该数据必须然后发送到回应队列。
我们可以通过不同的程式语言来编写自己的 worker node (我已经在此 github 提供了 Python, .NET, rust example code)。
如何处理长时间运行的任务或涉及处理大量数据行的任务?
类似于操作系统中的进程,发生上下文切换(中断等)。
客户端可以通过 Header 发送一个 TimeoutMilliseconds 值:在取消之前等待的时间(毫秒)。如果任务执行超过该值,worker process 可以使用该值来设置中断,例如 CancellationToken。
例如,MessageOutputTask 的 JSON 可以如下所示,status=201 代表此讯息将重新入队以便下次处理,并且讯息将携带 Headers 资讯再次重新入队。
{
"Message": "This is Mock Json Data",
"Status": 201,
"Headers": {
"CreateTimestamp": "2025-01-01T14:35:00Z",
"PreviousProcessingTimestamp": "2025-01-01T14:40:00Z",
"Source": "OrderProcessingService",
"PreviousExecutedRows": "123",
"RequeueTimes": "3"
}
}
此专案还包括 integration、unit test 和 github action pipeline。虽然 API 文件(专案仍在 beta 阶段),但我计划在未来逐步添加。如果您对此专案有任何想法或建议,请随时创建问题或发送 PR。