简介

最近我开发了 MessageWorkerPool 专案。其主要概念是提供一个平台框架,使使用者能够快速且轻鬆地在 Worker 内实作逻辑。该设计高度灵活,允许基于我创建的 Worker 通讯协议,以多种程式语言实作 Worker。目前,我已提供使用 C#、Rust 和 Python 编写的 Worker 范例。

这个函式库在多进程环境中处理任务表现优异。此外,它还支援优雅关闭 (graceful shutdown),确保在随时 consumer worker 能顺利终止处理程序。

MessageWorkerPool GitHub

为什么选择 ProcessPool 而非 ThreadPool ?

当你需要强大的隔离性,以防止某个任务影响其他任务时,应该选择 ProcessPool,特别是针对关键操作或容易崩溃的任务。虽然 ThreadPool 较为轻量(因为执行绪共用记忆体并且具有较低的上下文切换开销),但 ProcessPool 能够提供更灵活的解决方案,允许使用不同的程式语言来实作 Worker。

安装

要安装 MessageWorkerPool 套件,请使用以下 NuGet 指令:

PM > Install-Package MessageWorkerPool

若要手动安装此函式库,可克隆储存库并建置专案:

git clone https://github.com/isdaniel/MessageWorkerPool.git
cd MessageWorkerPool
dotnet build

架构概览

快速开始

这是部署 RabbitMQ 和相关服务的快速开始指南,使用提供的 docker-compose.yml 档案和 .env 中的环境变数。

docker-compose --env-file .\\env\\.env up --build -d

  • 检查 RabbitMQ 健康状态:在浏览器中开启 http://localhost:8888 以访问 RabbitMQ 管理面板。
    • 使用者名称: guest
    • 密码: guest
  • 检查 OrleansDashboard http://localhost:8899
    • 使用者名称: admin
    • 密码: test.123

    程式结构

    以下是创建并配置与 RabbitMQ 互动的 workerpool 的范例程式码。以下是其功能的解析:workerpool 将根据您的 RabbitMqSetting 设定从 RabbitMQ 伺服器获取讯息,并通过 Process.StandardInput 将讯息传递给用户创建的真实 worker node

    public class Program
    {
    public static async Task Main(string[] args)
    {
    CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
    .ConfigureLogging(logging =>
    {
    logging.ClearProviders();
    logging.AddConsole(options => {
    options.FormatterName = ConsoleFormatterNames.Simple;
    });
    logging.Services.Configure<SimpleConsoleFormatterOptions>(options => {
    options.IncludeScopes = true;
    options.TimestampFormat = " yyyy-MM-dd HH:mm:ss ";
    });
    }).AddRabbitMqWorkerPool(new RabbitMqSetting
    {
    UserName = Environment.GetEnvironmentVariable("USERNAME") ?? "guest",
    Password = Environment.GetEnvironmentVariable("PASSWORD") ?? "guest",
    HostName = Environment.GetEnvironmentVariable("RABBITMQ_HOSTNAME"),
    Port = ushort.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_PORT"), out ushort p) ? p : (ushort) 5672,
    PrefetchTaskCount = 3
    }, new WorkerPoolSetting() { WorkerUnitCount = 9, CommandLine = "dotnet", Arguments = @"./ProcessBin/WorkerProcessSample.dll", QueueName = Environment.GetEnvironmentVariable("QUEUENAME"), }
    );

    }

    worker process 与 workerPool 之间的协议

    worker node 与任务进程之间的协议使用 MessagePack 二进制格式来进行更快且更小的资料传输,标準输入将发送信号来控制 worker process。

    一开始 workerPool 将通过标準输入传递 NamedPipe 名称,因此 worker node 需要接收该名称并建立 worker process 和 workerPool 之间的 NamedPipe。

    workerPool 发送的操作指令

    目前,workerPool将通过标準输入向 worker process 发送操作信号或指令。

    • CLOSED_SIGNAL (__quit__): 代表 workerPool 发送关闭或关机信号给 worker node,worker process 应尽快执行优雅关机。通过 (Data Named Pipe Stream) 进行资料传输命名管道是一种强大的进程间通信 (IPC) 机制,它允许两个或更多的进程之间进行通信,即使它们运行在不同的机器上(例如 Windows 等支持的平台)。我们的 worker 使用此方式在 worker node 与 workerPool 之间传输资料。

    msgpack 协议支持的资料类型如下类别与 byte[] 格式。

    对应的 byte[] 资料是:

    [132,161,48,179,78,101,119,32,79,117,116,80,117,116,32,77,101,115,115,97,103,101,33,161,49,204,200,161,50,129,164,116,101,115,116,167,116,101,115,116,118,97,108,161,51,169,116,101,115,116,81,117,101,117,101]

    要将提供的伪 JSON 结构表示为 MsgPack 格式(byte[]),我们可以分解过程如下:

    Edit
    {
    "0": "New OutPut Message!",
    "1": 200,
    "2": {
    "test": "testval"
    },
    "3": "testQueue"
    }

    更多资讯,您可以使用 msgpack-converter 来解码和编码。

    /// <summary>
    /// 封装来自 MQ 服务的讯息
    /// </summary>
    [MessagePackObject]
    public class MessageOutputTask
    {
    /// <summary>
    /// 来自进程的输出讯息
    /// </summary>
    [Key("0")]
    public string Message { get; set; }
    [Key("1")]
    public MessageStatus Status { get; set; }
    /// <summary>
    /// 我们希望储存的回应资讯以便继续执行讯息。
    /// </summary>
    [Key("2")]
    [MessagePackFormatter(typeof(PrimitiveObjectResolver))]
    public IDictionary<string, object> Headers { get; set; }
    /// <summary>
    /// 预设使用 BasicProperties.Reply To 队列名称,任务处理器可以覆写回应队列名称。
    /// </summary>
    /// <value>预设使用 BasicProperties.Reply</value>
    [Key("3")]
    public string ReplyQueueName { get; set; }
    }

    我将在此介绍 MessageStatus 的含义。

    • IGNORE_MESSAGE (-1) : 将讯息附加到资料流管道中,而不进行进一步处理。
      • Status = -1: 任务处理告诉 worker process 这不是回应或确认讯息,只是回馈到资料流管道。
    • MESSAGE_DONE (200) : 通知 worker process 该案件可以由讯息队列服务进行确认。
      • Status = 200 任务处理告诉 worker process 该任务已完成并且可以确认。
    • MESSAGE_DONE_WITH_REPLY (201) : 请确保我们满足以下步骤以支援 RPC。
      • 客户端代码必须提供 ReplyTo 资讯。
      • 任务处理将使用 JSON 负载中的 Message 栏位来回应队列资讯。
      • 例如:当 Status = 201 透过资料流管道发送时,任务处理指示 worker process 输出,例如 1010,该数据必须然后发送到回应队列。

    我们可以通过不同的程式语言来编写自己的 worker node (我已经在此 github 提供了 Python, .NET, rust example code)。

    如何处理长时间运行的任务或涉及处理大量数据行的任务?

    类似于操作系统中的进程,发生上下文切换(中断等)。

    客户端可以通过 Header 发送一个 TimeoutMilliseconds 值:在取消之前等待的时间(毫秒)。如果任务执行超过该值,worker process 可以使用该值来设置中断,例如 CancellationToken。

    例如,MessageOutputTask 的 JSON 可以如下所示,status=201 代表此讯息将重新入队以便下次处理,并且讯息将携带 Headers 资讯再次重新入队。

    {
    "Message": "This is Mock Json Data",
    "Status": 201,
    "Headers": {
    "CreateTimestamp": "2025-01-01T14:35:00Z",
    "PreviousProcessingTimestamp": "2025-01-01T14:40:00Z",
    "Source": "OrderProcessingService",
    "PreviousExecutedRows": "123",
    "RequeueTimes": "3"
    }
    }

    此专案还包括 integration、unit test 和 github action pipeline。虽然 API 文件(专案仍在 beta 阶段),但我计划在未来逐步添加。如果您对此专案有任何想法或建议,请随时创建问题或发送 PR。