C#语言async await工作原理示例解析

来自:网络
时间:2023-07-24
阅读:
目录

正文

前不久,我们发布了《选择 .NET 的 n 个理由》。它提供了对平台的高层次概述,总结了各种组件和设计决策,并承诺对所涉及的领域发表更深入的文章。这是第一篇这样深入探讨 C# 和 .NET 中 async/await 的历史、背后的设计决策和实现细节的文章。

对 async/await 的支持已经存在了十年之久。在这段时间里,它改变了为 .NET 编写可扩展代码的方式,而在不了解其底层逻辑的情况下使用该功能是可行的,也是非常常见的。在这篇文章中,我们将深入探讨 await 在语言、编译器和库级别的工作原理,以便你可以充分利用这些有价值的功能。

不过,要做到这一点,我们需要追溯到 async/await 之前,以了解在没有它的情况下最先进的异步代码是什么样子的。

最初的样子

早在 .NET Framework 1.0中,就有异步编程模型模式,又称 APM 模式、Begin/End 模式、IAsyncResult 模式。在高层次上,该模式很简单。对于同步操作 DoStuff:

class Handler
{
    public int DoStuff(string arg);
}

作为模式的一部分,将有两个相应的方法:BeginDoStuff 方法和 EndDoStuff 方法:

class Handler
{
    public int DoStuff(string arg);
    public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state);
    public int EndDoStuff(IAsyncResult asyncResult);
}

BeginDoStuff 会像 DoStuff 一样接受所有相同的参数,但除此之外,它还会接受 AsyncCallback 委托和一个不透明的状态对象,其中一个或两个都可以为 null。Begin 方法负责初始化异步操作,如果提供了回调(通常称为初始操作的“延续”),它还负责确保在异步操作完成时调用回调。Begin 方法还将构造一个实现了 IAsyncResult 的类型实例,使用可选状态填充 IAsyncResult 的 AsyncState 属性:

namespace System
{
    public interface IAsyncResult
    {
        object? AsyncState { get; }
        WaitHandle AsyncWaitHandle { get; }
        bool IsCompleted { get; }
        bool CompletedSynchronously { get; }
    }
    public delegate void AsyncCallback(IAsyncResult ar);
}

然后,这个 IAsyncResult 实例将从 Begin 方法返回,并在最终调用 AsyncCallback 时传递给它。当准备使用操作的结果时,调用者将把 IAsyncResult 实例传递给 End 方法,该方法负责确保操作已完成(如果没有完成,则通过阻塞同步等待操作完成),然后返回操作的任何结果,包括传播可能发生的任何错误和异常。因此,不用像下面这样写代码来同步执行操作:

try
{
    int i = handler.DoStuff(arg);
    Use(i);
}
catch (Exception e)
{
    ... // handle exceptions from DoStuff and Use
}

可以按以下方式使用 Begin/End 方法异步执行相同的操作:

try
{
    handler.BeginDoStuff(arg, iar =>
    {
        try
        {
            Handler handler = (Handler)iar.AsyncState!;
            int i = handler.EndDoStuff(iar);
            Use(i);
        }
        catch (Exception e2)
        {
            ... // handle exceptions from EndDoStuff and Use
        }
    }, handler);
}
catch (Exception e)
{
    ... // handle exceptions thrown from the synchronous call to BeginDoStuff
}

对于在任何语言中处理过基于回调的 API 的人来说,这应该感觉很熟悉。

然而,事情从此变得更加复杂。例如,有一个"stack dives"的问题。stack dives 是指代码反复调用,在堆栈中越陷越深,以至于可能出现堆栈溢出。如果操作同步完成,Begin 方法被允许同步调用回调,这意味着对 Begin 的调用本身可能直接调用回调。同步完成的 "异步 "操作实际上是很常见的;它们不是 "异步",因为它们被保证异步完成,而只是被允许这样做。

这是一种真实的可能性,很容易再现。

.NET Core 运行

在 .NET Core 上试试这个程序

using System.NET;
using System.NET.Sockets;
using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
listener.Listen();
using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint!);
using Socket server = listener.Accept();
_ = server.SendAsync(new byte[100_000]);
var mres = new ManualResetEventSlim();
byte[] buffer = new byte[1];
var stream = new NetworkStream(client);
void ReadAgain()
{
    stream.BeginRead(buffer, 0, 1, iar =>
    {
        if (stream.EndRead(iar) != 0)
        {
            ReadAgain(); // uh oh!
        }
        else
        {
            mres.Set();
        }
    }, null);
};
ReadAgain();
mres.Wait();

在这里,我设置了一个相互连接的简单客户端套接字和服务器套接字。服务器向客户端发送100,000字节,然后客户端继续使用 BeginRead/EndRead 来“异步”地每次读取一个字节。传给 BeginRead 的回调函数通过调用 EndRead 来完成读取,然后如果它成功读取了所需的字节,它会通过递归调用 ReadAgain 局部函数来发出另一个 BeginRead。然而,在 .NET Core 中,套接字操作比在 .NET Framework 上快得多,并且如果操作系统能够满足同步操作,它将同步完成(注意内核本身有一个缓冲区用于满足套接字接收操作)。因此,这个堆栈会溢出:

C#语言async await工作原理示例解析

因此,APM 模型中内置了补偿机制。有两种可能的方法可以弥补这一点:

1.不要允许 AsyncCallback 被同步调用。如果一直异步调用它,即使操作以同步方式完成,那么 stack dives 的风险也会消失。但是性能也是如此,因为同步完成的操作(或者快到无法观察到它们的区别)是非常常见的,强迫每个操作排队回调会增加可测量的开销。

2.使用一种机制,允许调用方而不是回调方在操作同步完成时执行延续工作。这样,您就可以避开额外的方法框架,继续执行后续工作,而不深入堆栈。

APM 模式与方法2一起使用。为此,IAsyncResult 接口公开了两个相关但不同的成员:IsCompleted 和 CompletedSynchronously。IsCompleted 告诉你操作是否已经完成,可以多次检查它,最终它会从 false 转换为 true,然后保持不变。相比之下,CompletedSynchronously 永远不会改变(如果改变了,那就是一个令人讨厌的 bug)。它用于 Begin 方法的调用者和 AsyncCallback 之间的通信,他们中的一个负责执行任何延续工作。如果 CompletedSynchronously 为 false,则操作是异步完成的,响应操作完成的任何后续工作都应该留给回调;毕竟,如果工作没有同步完成,Begin 的调用方无法真正处理它,因为还不知道操作已经完成(如果调用方只是调用 End,它将阻塞直到操作完成)。然而,如果 CompletedSynchronously 为真,如果回调要处理延续工作,那么它就有 stack dives 的风险,因为它将在堆栈上执行比开始时更深的延续工作。因此,任何涉及到这种堆栈潜水的实现都需要检查 CompletedSynchronously,并让 Begin 方法的调用者执行延续工作(如果它为真),这意味着回调不需要执行延续工作。这也是 CompletedSynchronously 永远不能更改的原因,调用方和回调方需要看到相同的值,以确保不管竞争条件如何,延续工作只执行一次。

我们都习惯了现代语言中的控制流结构为我们提供的强大和简单性,一旦引入了任何合理的复杂性,而基于回调的方法通常会与这种结构相冲突。其他主流语言也没有更好的替代方案。

我们需要一种更好的方法,一种从 APM 模式中学习的方法,融合它正确的东西,同时避免它的陷阱。值得注意的是,APM 模式只是一种模式。运行时间、核心库和编译器在使用或实现该模式时并没有提供任何帮助。

基于事件的异步模式

.NET Framework 2.0引入了一些 API,实现了处理异步操作的不同模式,这种模式主要用于在客户端应用程序上下文中处理异步操作。这种基于事件的异步模式或 EAP 也作为一对成员出现,这次是一个用于初始化异步操作的方法和一个用于侦听其完成的事件。因此,我们之前的 DoStuff 示例可能被公开为一组成员,如下所示:

class Handler
{
    public int DoStuff(string arg);
    public void DoStuffAsync(string arg, object? userToken);
    public event DoStuffEventHandler? DoStuffCompleted;
}
public delegate void DoStuffEventHandler(object sender, DoStuffEventArgs e);
public class DoStuffEventArgs : AsyncCompletedEventArgs
{
    public DoStuffEventArgs(int result, Exception? error, bool canceled, object? userToken) :
        base(error, canceled, usertoken) => Result = result;
    public int Result { get; }
}

你需要用 DoStuffCompleted 事件注册你的后续工作,然后调用 DoStuffAsync 方法;它将启动该操作,并且在该操作完成时,调用者将异步地引发 DoStuffCompleted 事件。然后,处理程序可以继续执行后续工作,可能会验证所提供的 userToken 与它所期望的进行匹配,从而允许多个处理程序同时连接到事件。

这种模式使一些用例变得更简单,同时使其他用例变得更加困难(考虑到前面的 APM CopyStreamToStream 示例,这说明了一些问题)。它没有以广泛的方式推出,只是在一个单独的 .NET Framework 版本中匆匆的出现又消失了,尽管留下了它使用期间添加的 api,如 Ping.SendAsync/Ping.PingCompleted:

public class Ping : Component
{
    public void SendAsync(string hostNameOrAddress, object? userToken);
    public event PingCompletedEventHandler? PingCompleted;
    ...
}

然而,它确实取得了一个 APM 模式完全没有考虑到的显著进步,并且这一点一直延续到我们今天所接受的模型中: SynchronizationContext

考虑到像 Windows Forms 这样的 UI 框架。与 Windows 上的大多数 UI 框架一样,控件与特定的线程相关联,该线程运行一个消息泵,该消息泵运行能够与这些控件交互的工作,只有该线程应该尝试操作这些控件,而任何其他想要与控件交互的线程都应该通过发送消息由 UI 线程的泵消耗来完成操作。Windows 窗体使用 ControlBeginInvoke 等方法使这变得很容易,它将提供的委托和参数排队,由与该控件相关联的任何线程运行。因此,你可以这样编写代码:

private void button1_Click(object sender, EventArgs e)
{
    ThreadPool.QueueUserWorkItem(_ =>
    {
        string message = ComputeMessage();
        button1.BeginInvoke(() =>
        {
            button1.Text = message;
        });
    });
}

这将卸载在 ThreadPool 线程上完成的 ComputeMessage()工作(以便在处理 UI 的过程中保持 UI 的响应性),然后在工作完成时,将委托队列返回到与 button1 相关的线程,以更新 button1 的标签。这很简单,WPF 也有类似的东西,只是用它的 Dispatcher 类型:

private void button1_Click(object sender, RoutedEventArgs e){
ThreadPool.QueueUserWorkItem(_ =>
{
string message = ComputeMessage();
button1.Dispatcher.InvokeAsync(() =>
{
button1.Content = message;
});
});}

.NET MAUI 也有类似的功能。但如果我想把这个逻辑放到辅助方法中呢?

E.g.

// Call ComputeMessage and then invoke the update action to update controls.internal static void ComputeMessageAndInvokeUpdate(Action<string> update) { ... }

然后我可以这样使用它:

private void button1_Click(object sender, EventArgs e){
    ComputeMessageAndInvokeUpdate(message => button1.Text = message);}

但是如何实现 ComputeMessageAndInvokeUpdate,使其能够在这些应用程序中工作呢?是否需要硬编码才能了解每个可能的 UI 框架?这就是 SynchronizationContext 的魅力所在。我们可以这样实现这个方法:

internal static void ComputeMessageAndInvokeUpdate(Action<string> update){
    SynchronizationContext? sc = SynchronizationContext.Current;
    ThreadPool.QueueUserWorkItem(_ =>
    {
        string message = ComputeMessage();
        if (sc is not null)
        {
            sc.Post(_ => update(message), null);
        }
        else
        {
            update(message);
        }
    });}

它使用 SynchronizationContext 作为一个抽象,目标是任何“调度器”,应该用于回到与 UI 交互的必要环境。然后,每个应用程序模型确保它作为 SynchronizationContext.Current 发布一个 SynchronizationContext-derived 类型,去做 "正确的事情"。例如,Windows Forms 有这个:

public sealed class WindowsFormsSynchronizationContext : SynchronizationContext, IDisposable{
    public override void Post(SendOrPostCallback d, object? state) =>
        _controlToSendTo?.BeginInvoke(d, new object?[] { state });
    ...}

WPF 有这个:

public sealed class DispatcherSynchronizationContext : SynchronizationContext{
    public override void Post(SendOrPostCallback d, Object state) =>
        _dispatcher.BeginInvoke(_priority, d, state);
    ...}

ASP.NET 曾经有一个,它实际上并不关心工作在什么线程上运行,而是关心给定的请求相关的工作被序列化,这样多个线程就不会并发地访问给定的 HttpContext:

internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase{
    public override void Post(SendOrPostCallback callback, Object state) =>
        _state.Helper.QueueAsynchronous(() => callback(state));
    ...}

这也不限于这些主要的应用程序模型。例如,xunit 是一个流行的单元测试框架,是 .NET 核心存储库用于单元测试的框架,它也采用了多个自定义的 SynchronizationContext。例如,你可以允许并行运行测试,但限制允许并发运行的测试数量。这是如何实现的呢?通过 SynchronizationContext:

public class MaxConcurrencySyncContext : SynchronizationContext, IDisposable{
    public override void Post(SendOrPostCallback d, object? state)
    {
        var context = ExecutionContext.Capture();
        workQueue.Enqueue((d, state, context));
        workReady.Set();
    }}

MaxConcurrencySyncContext 的 Post 方法只是将工作排到自己的内部工作队列中,然后在它自己的工作线程上处理它,它根据所需的最大并发数来控制有多少工作线程。

这与基于事件的异步模式有什么联系?EAP 和 SynchronizationContext 是同时引入的,当异步操作被启动时,EAP 规定完成事件应该排队到当前任何 SynchronizationContext 中。为了稍微简化一下,System.ComponentModel 中也引入了一些辅助类型,尤其是 AsyncOperation 和 AsyncOperationManager。前者只是一个元组,封装了用户提供的状态对象和捕获的 SynchronizationContext,而后者只是作为一个简单的工厂来捕获并创建 AsyncOperation 实例。然后 EAP 实现将使用这些,例如 Ping.SendAsync 调用 AsyncOperationManager.CreateOperation 来捕获 SynchronizationContext。当操作完成时,AsyncOperation 的 PostOperationCompleted 方法将被调用,以调用存储的 SynchronizationContext 的 Post 方法。

我们需要比 APM 模式更好的东西,接下来出现的 EAP 引入了一些新的事务,但并没有真正解决我们面临的核心问题。我们仍然需要更好的东西。

输入任务

.NET Framework 4.0 引入了 System.Threading.Tasks.Task 类型。从本质上讲,Task 只是一个数据结构,表示某些异步操作的最终完成(其他框架将类似的类型称为“promise”或“future”)。创建 Task 是为了表示某些操作,然后当它表示的操作逻辑上完成时,结果存储到该 Task 中。但是 Task 提供的关键特性使它比 IAsyncResult 更有用,它在自己内部内置了 continuation 的概念。这一特性意味着您可以访问任何 Task,并在其完成时请求异步通知,由任务本身处理同步,以确保继续被调用,无论任务是否已经完成、尚未完成、还是与通知请求同时完成。为什么会有如此大的影响?如果你还记得我们对旧 APM 模式的讨论,有两个主要问题。

  • 你必须为每个操作实现一个自定义的 IAsyncResult 实现:没有内置的 IAsyncResult 实现,任何人都可以根据需要使用。
  • 在 Begin 方法被调用之前,你必须知道当它完成时要做什么。这使得实现组合器和其他用于消耗和组合任意异步实现的通用例程成为一个重大挑战。

现在,让我们更好地理解它的实际含义。我们先从几个字段开始:

class MyTask{
    private bool _completed;
    private Exception? _error;
    private Action<MyTask>? _continuation;
    private ExecutionContext? _ec;
    ...}

我们需要一个字段来知道任务是否完成(_completed),还需要一个字段来存储导致任务失败的任何错误(_error);如果我们还要实现一个通用的 MyTask<TResult>,那么也会有一个私有的 TResult _result 字段,用于存储操作的成功结果。到目前为止,这看起来很像我们之前自定义的 IAsyncResult 实现(当然,这不是巧合)。但是现在最重要的部分,是 _continuation 字段。在这个简单的实现中,我们只支持一个 continuation,但对于解释目的来说这已经足够了(真正的任务使用了一个对象字段,该字段可以是单个 continuation 对象,也可以是 continuation 对象的 List<>)。这是一个委托,将在任务完成时调用。

如前所述,与以前的模型相比,Task 的一个基本进步是能够在操作开始后提供延续工作(回调)。我们需要一个方法来做到这一点,所以让我们添加 ContinueWith:

public void ContinueWith(Action<MyTask> action){
    lock (this)
    {
        if (_completed)
        {
            ThreadPool.QueueUserWorkItem(_ => action(this));
        }
        else if (_continuation is not null)
        {
            throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation.");
        }
        else
        {
            _continuation = action;
            _ec = ExecutionContext.Capture();
        }
    }}

如果任务在 ContinueWith 被调用时已经被标记为完成,ContinueWith 只是排队执行委托。否则,该方法将存储该委托,以便在任务完成时可以排队继续执行(它还存储了一个叫做 ExecutionContext 的东西,然后在以后调用该委托时使用它)。

然后,我们需要能够将 MyTask 标记为完成,这意味着它所代表的异步操作已经完成。为此,我们将提供两个方法,一个用于标记完成(" SetResult "),另一个用于标记完成并返回错误(" SetException "):

public void SetResult() => Complete(null);
public void SetException(Exception error) => Complete(error);
private void Complete(Exception? error){
    lock (this)
    {
        if (_completed)
        {
            throw new InvalidOperationException("Already completed");
        }
        _error = error;
        _completed = true;
        if (_continuation is not null)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                if (_ec is not null)
                {
                    ExecutionContext.Run(_ec, _ => _continuation(this), null);
                }
                else
                {
                    _continuation(this);
                }
            });
        }
    }}

我们存储任何错误,将任务标记为已完成,然后如果之前已经注册了 continuation,则将其排队等待调用。

最后,我们需要一种方法来传播任务中可能发生的任何异常(并且,如果这是一个泛型 MyTask<T>,则返回其_result);为了方便某些情况,我们还允许此方法阻塞等待任务完成,这可以通过 ContinueWith 实现(continuation 只是发出 ManualResetEventSlim 信号,然后调用者阻塞等待完成)。

public void Wait(){
    ManualResetEventSlim? mres = null;
    lock (this)
    {
        if (!_completed)
        {
            mres = new ManualResetEventSlim();
            ContinueWith(_ => mres.Set());
        }
    }
    mres?.Wait();
    if (_error is not null)
    {
        ExceptionDispatchInfo.Throw(_error);
    }}

基本上就是这样。现在可以肯定的是,真正的 Task 要复杂得多,有更高效的实现,支持任意数量的 continuation,有大量关于它应该如何表现的按钮(例如,continuation 应该像这里所做的那样排队,还是应该作为任务完成的一部分同步调用),能够存储多个异常而不是一个异常,具有取消的特殊知识,有大量的辅助方法用于执行常见操作,例如 Task.Run,它创建一个 Task 来表示线程池上调用的委托队列等等。

你可能还注意到,我简单的 MyTask 直接有公共的 SetResult/SetException 方法,而 Task 没有。实际上,Task 确实有这样的方法,它们只是内部的,System.Threading.Tasks.TaskCompletionSource 类型作为任务及其完成的独立“生产者”;这样做不是出于技术上的需要,而是为了让完成方法远离只用于消费的东西。然后,你就可以把 Task 分发出去,而不必担心它会在你下面完成;完成信号是创建任务的实现细节,并且通过保留 TaskCompletionSource 本身来保留完成它的权利。(CancellationToken 和 CancellationTokenSource 遵循类似的模式:CancellationToken 只是 CancellationTokenSource 的一个结构封装器,只提供与消费取消信号相关的公共区域,但没有产生取消信号的能力,而产生取消信号的能力仅限于能够访问 CancellationTokenSource的人。)

当然,我们可以为这个 MyTask 实现组合器和辅助器,就像 Task 提供的那样。

想要一个简单的 MyTask.WhenAll?

public static MyTask WhenAll(MyTask t1, MyTask t2){
    var t = new MyTask();
    int remaining = 2;
    Exception? e = null;
    Action<MyTask> continuation = completed =>
    {
        e ??= completed._error; // just store a single exception for simplicity
        if (Interlocked.Decrement(ref remaining) == 0)
        {
            if (e is not null) t.SetException(e);
            else t.SetResult();
        }
    };
    t1.ContinueWith(continuation);
    t2.ContinueWith(continuation);
    return t;}

想要一个 MyTask.Run?你得到了它:

public static MyTask Run(Action action){
    var t = new MyTask();
    ThreadPool.QueueUserWorkItem(_ =>
    {
        try
        {
            action();
            t.SetResult();
        }
        catch (Exception e)
        {
            t.SetException(e);
        }
    });
    return t;}

一个 MyTask.Delay 怎么样?当然可以:

public static MyTask Delay(TimeSpan delay){
    var t = new MyTask();
    var timer = new Timer(_ => t.SetResult());
    timer.Change(delay, Timeout.InfiniteTimeSpan);
    return t;}

有了 Task,.NET 中之前的所有异步模式都将成为过去。在以前使用 APM 模式或 EAP 模式实现异步实现的地方,都会公开新的 Task 返回方法。

▌ValueTasks

时至今日,Task 仍然是 .NET 中异步处理的主力,每次发布都有新方法公开,并且在整个生态系统中都例行地返回 Task 和  Task<TResult>。然而,Task 是一个类,这意味着创建一个类需要分配内存。在大多数情况下,为一个长期异步操作额外分配内存是微不足道的,除了对性能最敏感的操作之外,不会对所有操作的性能产生有意义的影响。不过,如前所述,异步操作的同步完成是相当常见的。引入 Stream.ReadAsync 是为了返回一个 Task<int>,但如果你从一个 BufferedStream 中读取数据,很有可能很多读取都是同步完成的,因为只需要从内存中的缓冲区中读取数据,而不是执行系统调用和真正的 I/O 操作。不得不分配一个额外的对象来返回这样的数据是不幸的(注意,APM 也是这样的情况)。对于返回 Task 的非泛型方法,该方法可以只返回一个已经完成的单例任务,而实际上 Task.CompletedTask 提供了一个这样的单例 Task。但对于 Task<TResult> 来说,不可能为每个可能的结果缓存一个 Task。我们可以做些什么来让这种同步完成更快呢?

缓存一些 Task<TResult> 是可能的。例如,Task<bool> 非常常见,而且只有两个有意义的东西需要缓存:当结果为 true 时,一个 Task<bool>,当结果为 false 时,一个 Task<bool>。或者,虽然我们不想缓存40亿个 Task<int> 来容纳所有可能的 Int32 结果,但小的 Int32 值是非常常见的,因此我们可以缓存一些值,比如-1到8。或者对于任意类型,default 是一个合理的通用值,因此我们可以缓存每个相关类型的 Task<TResult>,其中 Result 为 default(TResult)。事实上,Task.FromResult 今天也是这样做的 (从最近的 .NET 版本开始),使用一个小型的可复用的 Task<TResult> 单例缓存,并在适当时返回其中一个,或者为准确提供的结果值分配一个新的 Task<TResult>。可以创建其他方案来处理其他合理的常见情况。例如,当使用 Stream.ReadAsync 时,在同一个流上多次调用它是合理的,而且每次调用时允许读取的字节数都是相同的。实现能够完全满足 count 请求是合理的。这意味着 Stream.ReadAsync 重复返回相同的 int 值是很常见的。为了避免这种情况下的多次分配,多个 Stream 类型(如 MemoryStream)会缓存它们最后成功返回的 Task<int>,如果下一次读取也同步完成并成功获得相同的结果,它可以只是再次返回相同的 Task<int>,而不是创建一个新的。但其他情况呢?在性能开销非常重要的情况下,如何更普遍地避免对同步完成的这种分配?

这就是 ValueTask<TResult> 的作用。ValueTask<TResult> 最初是作为 TResult 和 Task<TResult> 之间的一个区分并集。说到底,抛开那些花哨的东西,这就是它的全部 (或者,更确切地说,曾经是),是一个即时的结果,或者是对未来某个时刻的一个结果的承诺:

public readonly struct ValueTask<TResult>{
   private readonly Task<TResult>? _task;
   private readonly TResult _result;
   ...}

然后,一个方法可以返回这样一个 ValueTask<TResult>,而不是一个 Task<TResult>,如果 TResult 在需要返回的时候已经知道了,那么就可以避免 Task<TResult> 的分配,代价是一个更大的返回类型和稍微多一点间接性。

然而,在一些超级极端的高性能场景中,即使在异步完成的情况下,您也希望能够避免 Task<TResult> 分配。例如,Socket 位于网络堆栈的底部,Socket 上的 SendAsync 和 ReceiveAsync 对于许多服务来说是非常热门的路径,同步和异步完成都非常常见(大多数同步发送完成,许多同步接收完成,因为数据已经在内核中缓冲了)。如果在一个给定的 Socket 上,我们可以使这样的发送和接收不受分配限制,而不管操作是同步完成还是异步完成,这不是很好吗?

这就是 System.Threading.Tasks.Sources.IValueTaskSource<TResult> 进入的地方:

public interface IValueTaskSource<out TResult>{
    ValueTaskSourceStatus GetStatus(short token);
    void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags);
    TResult GetResult(short token);}

IValueTaskSource<TResult> 接口允许一个实现为 ValueTask<TResult> 提供自己的支持对象,使该对象能够实现像 GetResult 这样的方法来检索操作的结果,以及 OnCompleted 来连接操作的延续。就这样,ValueTask<TResult> 对其定义进行了一个小小的更改,其 Task<TResult>? _task 字段替换为 object? _obj 字段:

public readonly struct ValueTask<TResult>{
   private readonly object? _obj;
   private readonly TResult _result;
   ...}

以前 _task 字段要么是 Task<TResult> 要么是 null,现在 _obj 字段也可以是 IValueTaskSource<TResult>。一旦 Task<TResult> 被标记为已完成,它将保持完成状态,并且永远不会转换回未完成的状态。相比之下,实现 IValueTaskSource<TResult> 的对象对实现有完全的控制权,可以自由地在完成状态和不完成状态之间双向转换,因为 ValueTask<TResult> 的契约是一个给定的实例只能被消耗一次,因此从结构上看,它不应该观察到底层实例的消耗后变化(这就是 CA2012等分析规则存在的原因)。这就使得像 Socket 这样的类型能够将 IValueTaskSource<TResult> 的实例集中起来,用于重复调用。Socket 最多可以缓存两个这样的实例,一个用于读,一个用于写,因为99.999%的情况是在同一时间最多只有一个接收和一个发送。

我提到了 ValueTask<TResult>,但没有提到 ValueTask。当只处理避免同步完成的分配时,使用非泛型 ValueTask(代表无结果的无效操作)在性能上没有什么好处,因为同样的条件可以用 Task.CompletedTask 来表示。但是,一旦我们关心在异步完成的情况下使用可池化的底层对象来避免分配的能力,那么这对非泛型也很重要。因此,当 IValueTaskSource<TResult> 被引入时,IValueTaskSource 和 ValueTask 也被引入。

因此,我们有 Task、Task<TResult>、ValueTask 和 ValueTask<TResult>。我们能够以各种方式与它们交互,表示任意的异步操作,并连接 continuation 来处理这些异步操作的完成。

下期文章,我们将继续介绍 C# 迭代器,欢迎持续关注。

以上就是C#语言async await工作原理示例解析的详细内容,更多关于C# async await工作原理的资料请关注其它相关文章!

返回顶部
顶部