剖析C#中的 async 方法

本文手动翻译自:Dissecting the async methods in C#

C#是开发者的生产力利器,并且它还在被不断地推动着进化,变得更适合开发高性能应用程序,这更是开发者们喜闻乐见的消息。

举个例子:C# 5 给我们带来了 async 异步方法,它的出现标志着【次世代】的来临——它可以将多个基于 Task 的操作结合到一起,使代码编写更加直观容易。但 async 也存在额外开销,Task 是引用类型,创建 Task 对象的时候会向堆区申请内存,即使 async 以同步的方式运行,也会有性能开销。到了C# 7,async 方法开始支持返回类Task (task-like)的类型,如 ValueTask ,来减少申请堆内存的次数或在某些情况下完全避免。

为了理解这一套运行机制,我们需要深入了解 async 方法是如何实现的。

但首先,来看看一些关于历史的小姿势:

TaskTask<T> 类于 .NET 4.0 中引入,以我的观点来看,它让 .NET 在异步和并行编程领域的思想产生了巨大转变,它是 asyncawait 的基石。跟以往的异步编程模式不一样,比如 .NET1.0BeginXXX/EndXXX 模式(AKA同步编程模型)或者基于事件的异步编程模式,例如 .NET2.0 中 的 BackgroundWorker,和它们相比 Task 可以互相组合,合作完成任务。

Task 对象能够在当它负责的任务完成后,将结果返回给你。这里的任务可以是 IO 操作,或者计算密集型任务。任务的内容不重要,重要的是它返回的结果,它是自给自足的,且是 Task 中妥妥的一等公民——试想,你的后续工作需要 Task 执行的结果才能继续开展,因此你得先等待它完成并返回。Task 对象是接收【未来】结果的邮箱——当 Task 完成工作后,它会将结果保存下来,放进 Result 属性中。你可以将 Task 对象存放在某个变量里(比如数组),将它作为方法的返回值返回,或者还能将它作为参数传递给其他方法。甚至,你可以将两个 Task 结合在一起,形成一个新的 Task ,例如当前置任务完成后,自动开启新任务,构成一条简单的工作流水线。通过 Task 库提供的方法,你也可以决定当某个 Task 执行成功、失败或者被取消后,后续应该做的事情。

Task 库(Task Parallel Library)改变了我们对【并行】的认识,并且C# 5带来的 asyncawait 让其发扬光大。async/awaitTask 对象们串连起来,并且让开发者在平时常见的结构中使用它们,如 try/catchusing 等等。但美中不足的是,async/await 有自身带来的额外性能开销。为了理解这部分开销是如何造成的,我们需要先了解它的工作原理。

你做了什么!Async 关键字

常规的方法只有一个入口和一个出口(可能存在多个 return 关键字,但运行时只会有一个执行)。但是 async 方法和迭代器方法(有 yield return 的方法)是不一样的。这里我们只讨论 async 方法,当它被调用后,其会立即返回结果(TaskTask<T>)给调用者,然后调用者可以使用 await 等待 Task 将真正的结果返回回来。

为了方便,后面我们将 async 方法定义为被 async 关键字标记的的方法。被 async 标记不代表该方法就以异步的方式执行,它只是做个标记,让编译器在遇到该标记时对 async 方法做一些特殊的处理,这一点很重要,不要忘记啰!

来看看下面的 async 方法:

class StockPrices
{
    private Dictionary<string, decimal> _stockPrices;
    public async Task<decimal> GetStockPriceForAsync(string companyId)
    {
        await InitializeMapIfNeededAsync();
        _stockPrices.TryGetValue(companyId, out var result);
        return result;
    }
 
    private async Task InitializeMapIfNeededAsync()
    {
        if (_stockPrices != null)
            return;
 
        await Task.Delay(42);
        // Getting the stock prices from the external source and cache in memory.
        _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
    }
}

GetStockPriceForAsync 方法确保字典 _stockPrices 在返回结果之前已被初始化。

为了更好地理解编译器给代码施加了什么魔法,我们先试着将上面的代码手动转换为编译器优化过后的版本。

手撕 async 方法

Task 类库提供两种方法让我们创建和连接:Task.ContinueWith 用于在 Task 对象完成任务后指定接下来的任务;TaskCompletionSource<T> 类用于手动创建 Task 对象。

class GetStockPriceForAsync_StateMachine
{
    enum State { Start, Step1, }
    private readonly StockPrices @this;
    private readonly string _companyId;
    private readonly TaskCompletionSource<decimal> _tcs;
    private Task _initializeMapIfNeededTask;
    private State _state = State.Start;
 
    public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
    {
        this.@this = @this;
        _companyId = companyId;
    }
 
    public void Start()
    {
        try
        {
            if (_state == State.Start)
            {
                // The code from the start of the method to the first 'await'.
 
                if (string.IsNullOrEmpty(_companyId))
                    throw new ArgumentNullException();
 
                _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
 
                // Update state and schedule continuation
                _state = State.Step1;
                _initializeMapIfNeededTask.ContinueWith(_ => Start());
            }
            else if (_state == State.Step1)
            {
                // Need to check the error and the cancel case first
                if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
                    _tcs.SetCanceled();
                else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
                    _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
                else
                {
                    // The code between first await and the rest of the method
 
                    @this._store.TryGetValue(_companyId, out var result);
                    _tcs.SetResult(result);
                }
            }
        }
        catch (Exception e)
        {
            _tcs.SetException(e);
        }
    }
 
    public Task<decimal> Task => _tcs.Task;
}
 
public Task<decimal> GetStockPriceForAsync(string companyId)
{
    var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
    stateMachine.Start();
    return stateMachine.Task;
}

代码比较冗长,但并不太复杂。之前 GetStockPriceForAsync 方法的所有逻辑都移动到了 GetStockPriceForAsync_StateMachine.Start 方法里了,该方法采取了【续体传递风格来编写】(continuation-pass style)。异步算法,大致上是根据 await 边界将原方法拆分,构建成一个状态机。第一个区块是从方法开始到第一个 await 关键字的部分;第二个区块是从第一个 await 开始到第二个 await 部分;由此可见,第三个区块就是从第二个 await 开始,到第三个 await 或到方法结束(若后续没有 await 关键字);理解到这里就比较简单了,await 关键字将方法分割成不同的部分。

// Step 1 of the generated state machine:
 
if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

每个被等待的 Task 对象现在变成了状态机的成员字段,而且 Start 方法将自己申请为 Task 对象的后续任务:

_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());

然后再看,当 Task 对象工作完毕,Start 方法会被重新调用,_state 字段将被再次判断,以确认现在进行到了哪一步。之后的逻辑将判断 Task 是否执行成功、被取消或者失败。之后,状态机将运行至下一状态,执行下一个区块的代码。当所有任务都执行完毕后,状态机会设置 TaskCompletionSorce<T> 对象的结果(这里的结果为 Task 对象),并且 GetStockPriceForAsync 方法会将作为结果的 Task 返回。

// The code between first await and the rest of the method
 
@this._stockPrices.TryGetValue(_companyId, out var result);
_tcs.SetResult(result); // The caller gets the result back

上面的实现方式有一些致命缺点:

  • 频繁地申请堆内存:一次给状态机,一次给 TaskCompletionSource<T> ,一次给 TaskCompletionSource<T> 里的 Task 对象,一次给 Task.ContinueWith 中的委托。

  • 没有【热路径优化】(hot path optimizations):如果被等待的 Task 早就已经完成了,那就没必要再执行 Task.ContinueWith

  • 缺少扩展性:上面的实现方式与 Task 类具有非常强的耦合关系,这样就没办法将代码用在其他地方,比如等待与之不同的类型,或者返回 TaskTask<T> 以外的类型。

下面我们将解开真正的异步状态机的面纱,看看它是怎么解决这些问题的。

Async 状态机原理

编译器转换 Async 函数的手法已经与上一节的非常相似了,其原理可以总结为以下几点:

  • 生成的状态机就像异步方法的堆栈帧,包含原异步方法的所有逻辑。

  • AsyncTaskMethodBuilder<T> 用于保存完成后的 Task (和 TaskCompletionSource<T> 非常相似),并且管理状态机的状态切换。

  • TaskAwaiter<T> 包装了 Task ,并且在必要时管理 Task 的后续任务。

  • MoveNextRunner 在给定的上下文中调用状态机的 IAsyncStateMachine.MoveNext 方法。

生成的状态机在 debug 模式下为 class 类型,在 release 模式下为 struct 类型。所有类型(除了 MoveNextRunner 类)都在 BCL (Base Class Library)中定义为 struct

编译器给状态机成员的命名会类似 <YourMethodNameAsync>d__1 。为了避免命名冲突,生成的成员名包含了我们无法在编译器中使用的非法字符。但为了简单起见,后面的例子我会将非法字符尖括号 <> 都替换为 下划线 _

异步方法?让我康康!

原始的【异步】方法构建了一个状态机实例,并将它初始化为 captured 状态(如果方法为非静态方法,会包含 this 指针),然后调用 AsyncTaskMethodBuilder<T>.Start 方法启动状态机,调用时会将状态机的引用作为参数传递进去。

[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
    _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
    _GetStockPriceFor_d__.__this = this;
    _GetStockPriceFor_d__.companyId = companyId;
    _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
    _GetStockPriceFor_d__.__state = -1;
    var __t__builder = _GetStockPriceFor_d__.__builder;
    __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
    return _GetStockPriceFor_d__.__builder.Task;
}

传递引用是非常重要的优化点,因为状态机的 struct 占用十分大的内存(大于100 bytes),引用传参避免了多余的拷贝。

要上了!状——态——机

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
    public StockPrices __this;
    public string companyId;
    public AsyncTaskMethodBuilder<decimal> __builder;
    public int __state;
    private TaskAwaiter __task1Awaiter;
 
    public void MoveNext()
    {
        decimal result;
        try
        {
            TaskAwaiter awaiter;
            if (__state != 0)
            {
                // State 1 of the generated state machine:
                if (string.IsNullOrEmpty(companyId))
                    throw new ArgumentNullException();
 
                awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
 
                // Hot path optimization: if the task is completed,
                // the state machine automatically moves to the next step
                if (!awaiter.IsCompleted)
                {
                    __state = 0;
                    __task1Awaiter = awaiter;
 
                    // The following call will eventually cause boxing of the state machine.
                    __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                    return;
                }
            }
            else
            {
                awaiter = __task1Awaiter;
                __task1Awaiter = default(TaskAwaiter);
                __state = -1;
            }
 
            // GetResult returns void, but it'll throw if the awaited task failed.
            // This exception is catched later and changes the resulting task.
            awaiter.GetResult();
            __this._stocks.TryGetValue(companyId, out result);
        }
        catch (Exception exception)
        {
            // Final state: failure
            __state = -2;
            __builder.SetException(exception);
            return;
        }
 
        // Final state: success
        __state = -2;
        __builder.SetResult(result);
    }
 
    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        __builder.SetStateMachine(stateMachine);
    }
}

编译器生成的状态机代码看起来很复杂,但本质上和我们手动编写的非常相似。

即使原理上差不多,但它还是有一些改进后的亮点:

1.Hot path优化

与我们单纯地想法不同,生成的状态机意识到被等待的 Task 可能已经完成。

awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
 
// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
    // Irrelevant stuff
 
    // The following call will eventually cause boxing of the state machine.
    __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
    return;
}

如果被等待的 Task 已经完成(无论成功与否),状态机将继续移动到下一步:

// GetResult returns void, but it'll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);

这意味着即使所有 Task 都已经完成,整个状态机也将继续待在堆栈中。直到今天(作文时间2017年11月),即便一个 async 方法中的 Task 已完成或完全按照同步方式运行,它也将造成微小的内存消耗。这些内存消耗只会是来自 Task 对象!

2.错误处理

状态机中没有专门处理 Task 对象执行失败或被取消的逻辑。当 Task 被取消或者失败后,状态机在调用 awaiter.GetResult() 的时候会抛出 TaskCancelledException 异常。这是一种优雅的解决方式,它得益于 GetResult() 方法中,与传统的通过 task.Wait()task.Result 来处理的不同之处。

对于 task.Wait()task.Result 这俩卧龙凤雏,当它们只遇到导致 Task 失败的单个异常时,仍然会抛出 AggregateException 异常。这样做的原因十分简单:Task 的任务可能不仅是具有一个错误的 IO 密集型操作,也可能是并行计算的结果。在后者中,可能会出现不止一个错误,AggregateException 就是设计来处理这种情况的。

但是 async/await 模式是为最多只会出现一个错误的异步操作而设计的。所以C# 开发者认为 awaiter.GetResult()AggregateException 拆解并只返回第一个错误是更有意义的。老实讲这样的设计思路并不完美,具体的地方我们将在下一篇文章中讨论。

异步状态机只是其中一块拼图,为了理解整块拼图,我们需要看看状态机对象是如何与 TaskAwaiter<T>AsyncTaskMethodBuilder<T> 交互的。

好兄弟们是怎样合作工♂作的?

这张图看起来复杂到姥姥家了,但每个环节都设计得和姥姥家的烤饼干一样完美,并且它们在整个流程中不可或缺。其中最有趣的合作部分发生在被等待的 Task 还未完成时(被棕色矩形标记起来的部分):

  • 状态机调用 builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); 来将自己设置为 Task 对象们未完成时的后续任务(续体),推动状态机的状态转换。

  • builder 确保当 Task 任务完成时将调用 IAsyncStateMachine.MoveNext

    • builder捕获(captures)当前的执行上下文(ExecutionContext),接着创建一个 MoveNextRunner 对象(当前状态机实例将作为参数之一)来与之关联,然后再把 MoveNextRunner.Run() 包装为一个 Action 委托,该委托将在被捕获的执行上下文中向前推动状态机执行。

    • builder 通过调用 TaskAwaiter.UnsafeOnCompleted(action) 来将委托 action 处理为 Task 对象完成后的续体(注意与第一步中续体的区别嗷)。

当被等待的 Task 任务完成后,会执行之前给定的回调方法,并且状态机会执行到 async 方法的下一个代码区块。

等一下!执行上下文是啥?

有人可能会想,啥是执行上下文,为啥要弄得这么复杂?

在没有异步的世界里,每个线程将环境信息保存在线程的本地存储中。被保存的可以是安全相关信息、特定的数据(culture-specific data)等等。当一个线程中有三个方法被调用时,共用的数据将在方法之间很自然地传递,但将环境切换到异步模式下就没这么简单了。异步方法中的每个【部分】(section)都能在不同的线程中执行,这就导致线程的本地存储变得不可用(A线程无法访问B线程的本地信息)。

执行上下文保存了一个逻辑控制流信息,即使跨越多个线程,有了执行上下文就能继续执行之前的任务。

Task.RunThreadPool.QueueUserWorkItem 会自己处理执行上下文。Task.Run 方法会在调用线程时保存执行上下文,然后将它保存在 Task 对象里。当 TaskSchedulerTask 对象下达指令调用其委托时,会通过 ExecutionContext.Run 方法将之前保存的上下文取出来。

我们可以用 AsyncLocal<T> 来演示一遍上述的过程:

static Task ExecutionContextInAction()
{
    var li = new AsyncLocal<int>();
    li.Value = 42;
 
    return Task.Run(() =>
    {
        // Task.Run restores the execution context
        Console.WriteLine("In Task.Run: " + li.Value);
    }).ContinueWith(_ =>
    {
        // The continuation restores the execution context as well
        Console.WriteLine("In Task.ContinueWith: " + li.Value);
    });
}

在上面的代码中,执行上下文经过 Task.RunTask.ContinueWith 方法传递。如果运行代码,将输出:

In Task.Run: 42
In Task.ContinueWith: 42

但在基础类库中(BCL)不是所有方法都会自动保存执行上下文,比如 TaskAwaiter<T>.UnsafeOnCompleteAsyncMethodBuilder<T>.AwaitUnsafeOnComplete 。这看起来很奇怪,为什么C# 开发者要采取 AsyncMethodBuilder<T>MoveNextRunner 中的【不安全】方法来手动传递上下文,而不是采用内置的 AwaitTaskContinuetion 。我猜是考虑到性能问题或者其他限制。

用【不安全】方法的实现方式:

static async Task ExecutionContextInAsyncMethod()
        {
            var li = new AsyncLocal<int>();
            li.Value = 42;
            await Task.Delay(42);
 
            // The context is implicitely captured. li.Value is 42
            Console.WriteLine("After first await: " + li.Value);
 
            var tsk2 = Task.Yield();//创建当前执行上下文中的task
            tsk2.GetAwaiter().UnsafeOnCompleted(() =>
            {
                // The context is not captured: li.Value is 0
                Console.WriteLine("Inside UnsafeOnCompleted: " + li.Value);
            });
 
            await tsk2;
 
            // The context is captured: li.Value is 42
            Console.WriteLine("After second await: " + li.Value);
        }

输出:

After first await: 42
Inside UnsafeOnCompleted: 0
After second await: 42

总结划重点

  • Async 方法和传统的同步方法有着天差地别。

  • 编译器将给每个异步方法生成一个状态机,并将方法中所有代码(逻辑)放在状态机中。

  • 生成的状态机代码针对同步执行的情况做出了优化:当被等待的 Task 对象已经完成,那么该异步方法的开销将降低至最小。

  • 如果被等待的 Task 对象还没有完成,那么后续将会有其它类参与进来完成工作。

课外阅读

如果你想要了解更多有关执行上下文的知识,强烈推荐下面的两篇博客:

之后,我们将携手探索 C# 异步方法的可扩展性。

上一篇
下一篇