在 C# .NET 中使用管道

Using Channels In C# .NET

这篇文章是关于 C# .NET 中 Channel 的系列文章的一部分。
当然,从第 1 部分开始总是更好,但您可以使用下面的链接跳过任何您想要的地方。

第 1 部分 – 入门
第 2 部分 – 高级管道
第 3 部分 – 了解Back Pressure背压

我最近一直在玩 .NET Core 3.X 中引入的新 Channel<T> 类型。我想我在它第一次发布时(连同管道)就玩弄了它,但是文档非常稀少,我无法理解它们与任何其他队列有什么不同。

和他们玩过之后,我终于看到了他们所拥有的吸引力和真正的力量。最值得注意的是大型异步后台操作,需要几乎两种方式的通信来同步他们正在做的事情。这句话有点拗口,但希望在本系列结束时,您应该清楚何时应该使用 Channel<T>,以及何时应该使用更基本的东西,例如 Queue<T>。

什么是管道?

从本质上讲,Channel 本质上是 .NET 中的一种新集合类型,它的行为非常类似于现有的 Queue<T> 类型(它是 ConcurrentQueue 的兄弟),但具有额外的好处。我在真正尝试研究该主题时发现的问题是,许多现有的外部队列技术(IBM MQ、Rabbit MQ 等)都有“通道”的概念,它们的范围从将其描述为完全抽象的思维过程与实际的他们系统中的物理类型。

现在也许我在这里完全偏离了基础,但是如果您将 .NET 中的 Channel 视为简单的队列,并在其周围附加逻辑以允许它等待新消息,请告诉生产者暂停,因为队列正在获取大而消费者跟不上,以及出色的线程安全支持,我认为很难出错。

现在我在这里提到了一个关键字,生产者/消费者。您可能以前听说过它,它是 Pub/Sub 的兄弟。它们不可互换。

Pub/Sub 描述了某人发布消息的行为,以及一个或多个“订阅者”收听该消息并对其采取行动。没有负载分配,因为当您添加订阅者时,他们基本上会获得与其他所有人相同的消息的副本。

在图表形式中,Pub/Sub 看起来有点像这样:

生产者/消费者描述了生产者发布消息的行为,并且有一个或多个消费者可以对该消息采取行动,但每条消息只能读取一次。它不会复制到每个订阅者。

当然还有图表形式:

考虑生产者/消费者的另一种方式是考虑你去超市结账。随着客户尝试结账并且队列变长,您可以简单地打开更多结账来处理这些客户。这个小小的思考过程实际上很重要,因为如果您无法再打开结帐会发生什么?队列是否应该越来越长?如果结账操作员坐在那里但没有顾客怎么办?他们应该把它打包回家,还是应该告诉他们只是坐下来等到有顾客。

这通常被称为生产者-消费者问题,也是 Channels 旨在解决的问题。

基本管道示例

与 Channels 相关的一切都存在于 System.Threading.Channels 中。在更高版本中,这似乎与您的标准 .NET Core 项目捆绑在一起,但如果没有,这里会有一个 nuget 包:https ://www.nuget.org/packages/System.Threading.Channels 。

一个非常简单的频道示例如下所示:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    for(int i=0; i < 10; i++)
    {
        await myChannel.Writer.WriteAsync(i);
    }

    while(true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

这里没有一大堆要谈的。我们创建了一个“无界”通道(这意味着它可以容纳无限项,但在本系列中会进一步介绍)。我们写入 10 项并读取 10 项,此时它与我们在 .NET 中看到的任何其他队列没有太大不同。

管道是线程安全的

没错,Channels 是线程安全的。这意味着多个线程可以毫无问题地读取/写入同一通道。如果我们在这里查看 Channels 源代码,我们可以看到它是线程安全的,因为它使用锁和内部“队列”的组合来同步读取器/写入器以一个接一个地读取/写入。

实际上,Channels 的预期用例是多线程场景。例如,如果我们从上面获取我们的基本代码,当我们实际上不需要它时,维护我们的线程安全性实际上会有一些开销。因此,在这种情况下,我们最好只使用 Queue<T>。但是这段代码呢?

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
            await Task.Delay(1000);
        }
    });

    while(true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

在这里,我们有一个单独的线程将消息泵入,而我们的主线程将消息读出。您会注意到有趣的事情是我们在消息之间添加了延迟。那么为什么我们可以调用 ReadAsync() 和事情只是……。工作?没有 TryDequeue 或 Dequeue,如果队列中没有消息,它会运行 null 对吗?

答案是 Channel Reader 的“ReadAsync()”方法实际上会*wait* 等待消息(但不会*block*)。因此,您在等待消息时不需要做一些可笑的紧密循环,也不需要在等待时完全阻塞线程。我们将在接下来的帖子中更多地讨论这个问题,但只知道您可以使用 ReadAsync 基本上等待新消息通过,而不是编写一些自定义的紧密缠绕的代码来做同样的事情。

下一步是什么?

现在您已经掌握了基础知识,让我们看看使用 Channels 的一些更高级的场景。

第 2 部分 – 高级通道

在我们之前的文章中,我们查看了一些关于 Channels 如何工作的简单示例,我们看到了一些非常漂亮的功能,但在大多数情况下,它与任何其他 XQueue 实现非常相似。因此,让我们深入探讨一些更高级的主题。好吧..我说的是先进的,但其中很多都非常简单。这可能读起来像是一个贯穿始终的功能,但有很多值得喜爱的地方!

分离读/写问题

如果你曾经在两个类之间共享一个队列,你就会知道任何一个类都可以读/写,即使它们不应该这样做。例如 :

class MyProducer
{
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue)
    {
        _queue = queue;
    }
}

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue)
    {
        _queue = queue;
    }
}

因此,虽然 Producer 应该只写入队列,而 Consumer 应该只读取,但在这两种情况下,它们都可以对队列执行所有操作。虽然您可能在自己的脑海中希望消费者只阅读,但另一个开发人员可能会出现并非常高兴地开始调用 Enqueue,除了代码审查之外别无他法来阻止他们犯这个错误。

但是有了 Channels,我们可以做不同的事情。

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter)
    {
        _channelWriter = channelWriter;
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader)
    {
        _channelReader = channelReader;
    }
}

在这个示例中,我添加了一个 main 方法来向您展示编写器/读取器的创建是如何发生的,但这非常简单。所以在这里我们可以看到,对于我们的Producer,我只给它传递了一个ChannelWriter,所以它只能做写操作。对于我们的消费者,我们给它传递了一个 ChannelReader,所以它只能读取。

当然,这并不意味着其他开发人员不能仅仅修改代码并开始注入根 Channel 对象,或者同时传入 ChannelWriter/ChannelReader,但它至少比代码的意图要好得多。

完成一个频道

我们之前看到,当我们在通道上调用 ReadAsync() 时,它实际上会坐在那里等待消息,但是如果没有更多消息到来怎么办?也许这是一次批处理作业,并且批处理已完成。通常对于 .NET 中的其他队列,我们​​必须传递某种共享布尔值和/或 CancellationToken。但有了 Channels,它就更容易了。

考虑以下 :

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    try
    {
        while (true)
        {
            var item = await myChannel.Reader.ReadAsync();
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }catch(ChannelClosedException e)
    {
        Console.WriteLine("Channel was closed!");
    }
}

我已经做到了,以便我们的第二个线程尽可能快地写入我们的频道,然后完成它。然后我们的阅读器慢慢阅读,阅读之间有 1 秒的延迟。请注意,我们捕获了 ChannelClosedExecption,当您尝试在最终消息之后*从关闭的通道中读取时调用它。

我只想说清楚。在通道上调用 Complete() 不会立即关闭通道并杀死所有读取它的人。相反,它是一种告诉任何读者,一旦阅读完最后一条消息,我们就完成了的方式。这很重要,因为这意味着在等待新项目、队列为空、已满等时是否调用 Complete() 并不重要。我们可以确定我们将完成所有可用的工作然后完成.

将 IAsyncEnumerable 与通道一起使用

如果我们在尝试关闭频道时以我们的例子为例,有两件事对我来说很突出。

  1. 我们有一个 while(true) 循环。这并没有那么糟糕,但它有点碍眼。
  2. 为了跳出这个循环,并且知道通道已经完成,我们必须捕获一个异常并基本上吞下它。

这些问题可以使用返回 IAsyncEnumerable 的命令“ReadAllAsync()”来解决(更多关于IAsyncEnumerable 的工作原理在这里)。代码看起来有点像这样:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    await foreach(var item in myChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
        await Task.Delay(1000);
    }
}

现在代码读起来好多了,并删除了一些围绕捕获异常的额外垃圾。因为我们使用的是 IAsyncEnumerable,我们仍然可以像以前一样等待每个项目,但我们不再需要捕获异常,因为当通道完成时,它只是说它没有更多内容并且循环退出。

同样,这消除了您过去在处理队列时必须编写的一些杂乱代码。以前您必须编写某种带有 breakout 子句的无限循环,现在它只是一个真正整洁的循环,可以处理引擎盖下的所有内容。

下一步是什么

到目前为止,我们一直在使用“无界”渠道。正如您可能已经猜到的那样,当然可以选择使用 BoundedChannel。但这是什么?“背压”一词与它有何关系?查看本系列的下一部分,以更好地了解背压。

第 3 部分 – 了解背压

到目前为止,我们一直在使用所谓的“无界”通道。当我们创建频道时,您会注意到它,我们会这样做:

var myChannel = Channel.CreateUnbounded<int>();

但实际上,我们可以这样做:

var myChannel = Channel.CreateBounded<int>(1000);

这与创建另一个集合类型(例如具有有限容量的 List 或 Array)并没有太大区别。在我们的示例中,我们创建了一个最多可容纳 1000 个项目的通道。但为什么要限制自己呢?嗯..这就是背压的用武之地。

什么是背压?

计算术语中的背压(特别是在消息传递/队列方面)是指资源(无论是内存、内存、网络容量还是例如所需外部 API 的 API 速率限制)是有限的。而且我们应该能够向链条施加“压力”,以尝试减轻一些负担。至少,让生态系统中的其他人知道我们正在承受负载,我们可能需要一些时间来处理他们的请求。

一般来说,当我们谈论队列的背压时。几乎普遍地,我们都在谈论一种方法来告诉任何试图在队列中添加更多项目的人,要么他们根本无法将更多项目加入队列,要么他们需要退出一段时间。更罕见的是,我们谈论的是队列在达到一定容量后纯粹丢弃消息。这些情况很少见(因为通常您不希望消息简单地死掉),但我们确实可以选择。

那么,.NET 频道如何工作呢?

通道的背压选项

在使用 Channels 时,我们实际上有一个非常简单的方法来增加背压。代码如下所示:

var channelOptions = new BoundedChannelOptions(5)
{
    FullMode = BoundedChannelFullMode.Wait
};

var myChannel = Channel.CreateBounded<int>(channelOptions);

我们可以指定以下完整模式:

等待
只需让调用者在打开 WriteAsync() 调用之前等待。

DropNewest/DropOldest
删除频道中最旧或最新的项目,以便为我们要添加的项目腾出空间。

DropWrite
简单地转储我们应该写的消息。

您还应该注意两段额外的代码。

您可以调用 WaitToWriteAsync() :

await myChannel.Writer.WaitToWriteAsync();

这让我们“等待”通道的有界限制。例如,当通道已满时,我们可以调用它来简单地等待直到有空间。这意味着即使打开了 DropWrite FullMode,我们也可以通过简单地等待直到有容量来限制我们在地面上丢弃的消息数量。

我们应该注意的另一段代码是:

var success = myChannel.Writer.TryWrite(i);

这允许我们尝试写入队列,并返回是否成功。请务必注意,此方法不是异步的。要么我们可以写信给频道,要么没有“好吧..如果你再等一会儿你也许可以”。