第33章 协程(C++20)

协程并不是一个新鲜的概念,如果读者熟悉C#或者Python,可能或多或少对它有所了解。概括来讲,协程是一种可以被挂起和恢复的函数,它提供了一种创建异步代码的方法。

事实上,C++中协程的概念早在2017年已经被提出,并且作为技术规范(Technical Specification)加入C++扩展中。不过协程迟迟没有加入C++标准中,因为很多专家质疑和反对这项新特性,他们认为协程过于复杂让人难以理解,并且有过多需要自定义的代码才能让其正常工作。所以这项提案直到2019年3月的C++标准委员会上才通过投票加入C++20标准中,并且最终采用了微软的实现方案。

在我看来,协程的实现确实非常复杂,只想通过文字描述来理解协程有一定的困难,想要充分理解协程甚至可能需要深入汇编层面去观察其代码的生成详情。为了便于读者理解,我将本章分为两个部分,首先讨论协程的使用方法,由于C++20标准库提供了一系列的辅助代码,因此在协程的使用的理解上不会有太大难度。然后我们再去讨论协程的实现原理,这部分我会通过实现一套辅助代码的方法尽可能理清协程的实现原理。最后请注意,本章中的示例代码均采用MSVC进行编译。

刚刚提到过,协程是一种可以被挂起和恢复的函数,那么究竟如何让函数挂起和恢复呢?请看下面的代码:

#include <iostream>
#include <chrono>
#include <future>

using namespace std::chrono_literals;

std::future<int> foo()
{
  std::cout << "call foo\n";
    std::this_thread::sleep_for(3s);
    co_return 5;
}

std::future<std::future<int>> bar()
{
  std::cout << "call bar\n";
  std::cout << "before foo\n";
  auto n = co_await std::async(foo);       // 挂起点
  std::cout << "after foo\n";
  co_return n;
}

int main()
{
  std::cout << "before bar\n";
  auto i = bar();
  std::cout << "after bar\n";
    i.wait();
  std::cout << "result = " << i.get().get();
}

编译运行以上代码,输出结果如下:

before bar
call bar
before foo
after bar
call foo
after foo
result = 5

仔细观察输出结果就会发现,代码在输出before foo以后的运行流程不同寻常。普通情况下,即使异步调用foo()函数,before foo之后输出的应该也是after foo,但是真实结果输出的却是after bar。也就是说线程执行到auto n = co_await std::async (foo);之后跳出了bar函数,如同从函数中间return了一般,返回后执行了std::cout << "after bar\n";,最后等待std::future<std::future<int>>的结果。实际上,我们看到的这个过程就是协程的挂起操作,而auto n = co_await std::async(foo);是协程的挂起点。观察这句代码我们会发现一个新的关键字co_await,读者暂时不需要理会它的具体功能,只需要将其当作是挂起点的标识即可。继续观察输出结果会看到call foo和紧随其后的after foo,说明函数foo执行结束之后,bar函数从当前的挂起点恢复了执行。进一步来说,co_await会触发一个挂起点,在触发挂起点后执行流程会返回到调用者继续执行,同时异步执行co_await所等待的对象,在等待对象执行完毕后,挂起点恢复执行流程继续执行后续代码。观察foo函数的代码,也会发现一个新的关键字co_return,我们暂时不需要关心它的详情,只需要知道它设置了返回值的结果,并且触发了挂起点的恢复,伪代码如下:

std::cout << "call foo\n";
std::this_thread::sleep_for(3s);
set_return_future_value(5);
set_future_ready();

需要说明的是,这里没有使用<< std::endl换行是因为异步执行会打乱输出,为了防止输出格式混乱,这里直接在字符串中使用\n换行。

在理解co_awaitco_return挂起和恢复协程之后,我们再来讨论另一种情况:

#include <iostream>
#include <experimental/generator>

std::experimental::generator<int> foo()
{
  std::cout << "begin" << std::endl;
  for (int i = 0; i < 10; i++) {
       co_yield i;
  }
  std::cout << "end" << std::endl;
}

int main()
{
  for (auto i : foo()) {
       std::cout << i << std::endl;
  }
}

编译运行以上代码,输出结果如下:

begin
0
1
2
3
4
5
6
7
8
9
end

这次读者应该能猜到挂起点的位置了,就是co_yield i;。代码在执行到for (auto i : foo())的时候调用函数foo(),并且输出字符串begin。然后进入循环,当执行到co_yield i;时协程被挂起,并将i的值返回给调用者。对于第一次执行,i的值为0,紧跟在begin之后的输出为0。再次进入循环并调用foo()的时候,函数并不会从头开始执行,而是从上次执行的挂起点恢复执行,于是0之后不会再次输出begin,而是输出数字1。依此类推,执行到输出9后再次进入foo函数,从挂起点恢复后跳出循环并执行std::cout << "end" << std::endl;

上面的两个例子中出现了3个新关键字,分别是co_awaitco_returnco_yield。C++20标准规定,具有以上3个关键字中任意一个的函数就是协程。请注意,因为main函数不能为协程,所以函数体中不能出现这3个关键字。通常情况下,建议将协程和标准库中的futuregenerator一起使用,因为协程辅助代码较为复杂,所以应该尽量避免自定义它们。

请注意,协程虽然提供了一种异步代码的编写方法,但是并不会自动执行异步操作,例如:

#include <iostream>
#include <chrono>
#include <future>
using namespace std::chrono_literals;

std::packaged_task<int()> task(
  []() {
       std::cout << "call task\n";
       std::this_thread::sleep_for(3s); 
       return 5; 
  }
);

std::future<int> bar()
{
  return task.get_future();
}

std::future<void> foo()
{
  std::cout << "call foo\n";
  std::cout << "before bar\n";
  auto i = co_await bar();
  std::cout << "after bar\n";
  std::cout << "result = " << i;
}

int main()
{
  std::cout << "before foo\n";
  auto w = foo();
  std::cout << "after foo\n";
  w.wait();
}

在上面的代码中,虽然使用auto i = co_await bar();挂起了协程,但是并没有其他线程执行异步操作,造成的结果就是w.wait();一直等待。代码运行的结果如下:

before foo
call foo
before bar
after foo

除了编写协程代码,我们还需要为协程创建异步执行环境,让我们修改上面代码的bar函数:

std::future<int> bar()
{
  std::future<int> r = task.get_future();
  std::thread t(std::move(task));
  t.detach();
  return r;
}

再次编译运行代码,可以获得正确的输出结果如下:

before foo
call foo
before bar
after foo
call task
after bar
result = 5

为了更容易地讨论协程的实现原理,我打算从协程的3个关键字入手,依次说明其背后的原理,最后就能呈现协程实现的整个面貌,让我们先从co_await开始吧。

从前面的代码示例,我们知道co_await运算符可以创建一个挂起点将协程挂起并等待协程恢复。那么co_await运算符所针对的操作数具体是什么呢?

auto n = co_await std::async(foo);

这句代码可以拆解分析:

std::future<std::future<int>> expr = std::async(foo);
auto n = co_await expr;

这里我们将表达式expr命名为可等待体,顾名思义是指该对象是可以被等待的。请注意,并非所有对象都是可等待体,例如下面的代码就一定会报错:

co_await std::string{ "hello" };

编译该代码编译器提示:

error C3312: no callable 'await_resume' function found for type 'std::string'
error C3312: no callable 'await_ready' function found for type 'std::string'
error C3312: no callable 'await_suspend' function found for type 'std::string'

错误提示说std::string缺少3个函数,目标对象可被等待需要实现await_resumeawait_readyawait_suspen这3个成员函数。具备这3个函数的对象可以称为等待器,也就是说等待器和可等待体可以是同一个对象。那么等待器是做什么的,为什么要给同一个对象两种命名呢?我们需要从以上的3个函数开始讨论。

1.await_ready函数叫作is_ready或许更加容易理解,该函数用于判定可等待体是否已经准备好,也就是说可等待体是否已经完成了目标任务,如果已经完成,则返回true;否则返回false

2.await_suspend这个函数名则更加令人难以理解,命名为schedule_ continuation应该会更加清晰,它的作用就是调度协程的执行流程,比如异步等待可等待体的结果、恢复协程以及将执行的控制权返回调用者。

3.await_resume实际上用于接收异步执行结果,可以叫作retrieve_value

了解了这3个函数的作用,现在就动手让std::string支持co_await

class awaitable_string : public std::string {
public:
  using std::string::string;
  bool await_ready() const { return true; }
  void await_suspend(std::experimental::coroutine_handle<> h) const {}
  std::string await_resume() const { return *this; }
};

std::future<std::string> foo()
{
  auto str = co_await awaitable_string{ "hello" };
  co_return str;
}

int main()
{
  auto s = foo();
  std::cout << s.get();
}

上面的代码可以编译成功并且输出字符串hello,因为我们实现的awaitable_string公有继承了std::string并且实现了await_readyawait_suspendawait_resume这3个函数。不过读者应该也猜到了,这个实现并没有异步功能。但这并不妨碍我们进一步理解它们,足够简单的代码反而更容易让人理解。

1.bool await_ready()返回true表明目标对象已经准备好了,也就是说协程无须在此挂起,执行流会继续按照代码编写顺序同步执行后续代码,在这种情况下await_suspend会被忽略,直接执行await_resume函数获得结果。如果函数返回false,则标识目标对象没有准备好,需要执行后续操作。

2.所谓的后续操作即调用void await_suspend(std::experimental:: coroutine_handle <> h)函数,这里有一个特殊的形参coroutine_handle<>,正如它的类型名所示,它是协程的句柄,可以用于控制协程的运行流程。读者不必了解其细节,只需要知道该句柄由编译器生成,其中包含协程挂起和恢复的上下文信息即可,coroutine_ handle<>operator()resume()函数,它们可以执行挂起点之后的代码。回到await_suspend函数本身,它可以借助coroutine_handle<>控制协程的执行流程。值得注意的是,await_suspend不一定返回void类型,还可以返回boolcoroutine_handle类型。

1)返回void类型表示协程需要将执行流的控制权交给调用者,协程保持挂起状态。

2)返回bool类型则又会出现两种情况,当返回值为true时,效果和返回类型与void相同;当返回false的时候,则恢复当前协程运行。

3)返回coroutine_handle类型的时候,则会恢复该句柄对应的协程。

值得注意的是,如果在await_suspend中捕获到了异常,那么协程也会恢复并且在协程中抛出该异常。

3.std::string await_resume()实际上和恢复本身没有关系,可以看到它只是返回最终结果而已。

了解了以上知识点之后,我们可以尝试让await_ready返回false,看一看会发生什么事情:

class awaitable_string : public std::string {
public:
  …
  bool await_ready() const { return false; }
  …
};

编译运行修改后的代码会发现程序被无限期挂起了,原因是虽然awaitable_ string{ "hello" }在构造的时候已经准备好了,但是由于await_ready返回false,因此编译器认为目标对象没有准备好,需要await_suspend来做协程的调度,但是这个函数什么也没做。这样就不会恢复协程的执行,co_return str;自然不会执行,程序在std::cout << s.get();被无限期挂起了。要解决这个问题,我们需要实现一些await_ suspend的代码,例如:

class awaitable_string : public std::string {
public:
  using std::string::string;
  bool await_ready() const { return false; }
  void await_suspend(std::experimental::coroutine_handle<> h) const {
       std::thread t{ [h] {
            // 模拟复杂操作,用时3s
            std::this_thread::sleep_for(3s);
            h(); }
       };
       t.detach();
  }
  std::string await_resume() const { return *this; }
};

修改代码后再次编译运行代码会发现,程序运行3s后输出hello字符串。这是因为await_suspend函数创建了新线程,并且在线程中等待3s后执行恢复流程,该恢复流程执行到co_return str;导致s.get()获得结果,最终输出hello字符串。

1.co_await运算符的重载

看到这里相信读者已经知道co_await运算符是干什么的了。但是我们还有一个问题没有解决,那就是为什么有可等待体和等待器两种名称,实际上我们在上文中看到的awaitable_string是一种特殊的情况,也就是可等待体和等待器是同一个对象,但是这不是必需的。我们可以重载co_await运算符,让它从可等待体转换为等待器,还是以std::string为例:

awaitable_string operator co_await(std::string&& str)
{
  return awaitable_string{ str };
}

std::future<std::string> foo()
{
  auto str = co_await std::string{ "hello" };
  co_return str;
}

在上面的代码中,awaitable_string operator co_await(std:: string&& str)co_await的重载,它将std::string转换为awaitable_ string后返回,这样我们就可以在foo函数中直接使用co_await std::string { "hello" };,而不必担心编译报错了。

除了使用非成员的方式重载co_await之外,还可以使用成员重载co_await。只不过对于std::string来说,修改STL的代码明显是不可行的,所以这里采用非成员方式。

2.可等待体和等待器的完整实现

最后让我们实现一个完整的可等待体和等待器来结束co_await的讨论:

#include <iostream>
#include <fstream>
#include <streambuf>
#include <future>

class file_io_string {
public:
  file_io_string(const char* file_name) {
       t_ = std::thread{ [file_name, this]() mutable {
            std::ifstream f(file_name);
            std::string str((std::istreambuf_iterator<char>(f)),
            std::istreambuf_iterator<char>());
            result_ = str;
            ready_ = true;
       } };
  }
  bool await_ready() const { return ready_; }
  void await_suspend(std::experimental::coroutine_handle<> h) {
       std::thread r{ [h, t = std::move(t_)] () mutable {
            t.join();
            h(); }
       };
       r.detach();
  }
  std::string await_resume() const { return result_; }
private:
  bool ready_ = false;
  std::thread t_;
  std::string result_;
};

std::future<std::string> foo()
{
  auto str = co_await file_io_string{ "test.txt" };
  co_return str;
}

int main()
{
  auto s = foo();
  std::cout << s.get();
}

在上面的代码中,file_io_string既是一个可等待体也是一个等待器,它可以异步读取一个文件数据到std::string中。file_io_string在构造函数中创建新线程执行文件读取操作并且设置ready_true。一般情况下,主线程的执行会比IO线程快,所以主线程调用await_ready的时候ready_更可能为false,这时代码会执行await_suspend函数,await_suspend函数创建新线程等待文件IO线程执行完毕,并且从挂起点恢复执行foo函数。

为了弄清楚co_yield运算符的实现原理,我们还是得从一段代码开始:

struct my_int_generator {};

my_int_generator foo()
{
  for (int i = 0; i < 10; i++) {
       co_yield i;
  }
}

编译上面的代码会获得两条相同的错误提示:

error C2039: 'promise_type': is not a member of 'std::experimental::coroutine_traits<my_int_generator>'

编译器表示std::experimental::coroutine_traits<my_int_generator>中没有promise_type成员类型。现在问题来了,我们的代码里并没有所谓的promise_type,更不知道coroutine_traits是什么了。解开这两个谜团,co_yield运算符的实现原理也就清晰了。

promise_type

现在让我们聚焦到promise_type类型上,这是一个非常关键的结构,实际上它不仅能影响co_yield的行为,co_awaitco_return也会被其影响。简单来说,promise_type可以用于自定义协程自身行为,代码的编写者可以自定义协程的多种状态以及自定义协程中任何co_awaitco_returnco_yield表达式的行为,比如挂起前和恢复后的处理、如何返回最终结果等。

通常情况下promise_type会作为函数的嵌套类型存在,比如在std::experimental:: generator类模板中就存在嵌套类型promise_type。当然,我们不能期待所有已经存在的代码都有嵌套类型promise_type。所以C++标准提供了另外一种方式获取promise_type,那就是std::experimental:: coroutine_traits<T>。比如std::future就是这么做的:

template <class _Ty, class… _ArgTypes>
struct coroutine_traits<future<_Ty>, _ArgTypes…> {…}

说动手就动手,为了让上一节的代码正常编译,我们这就来实现一个promise_type

struct my_int_generator {
  struct promise_type {};
};

my_int_generator foo()
{
  for (int i = 0; i < 10; i++) {
       co_yield i;
  }
}

编译上面的代码,编译器依然会报错。显然,promise_type不能是空结构体。让我们看一看错误提示:

error C2039: 'yield_value': is not a member of 'my_int_generator::promise_ type'
message : see declaration of 'my_int_generator::promise_type'
error C3789: this function cannot be a coroutine: 'my_int_generator::promise_type' does not declare the member 'get_return_object()'

实际上,想要实现一个generator可用的promise_type,有几个成员函数是必须实现的:

#include <experimental/resumable>
using namespace std::experimental;
struct my_int_generator {
  struct promise_type {
       int* value_ = nullptr;

       my_int_generator get_return_object() {
            return my_int_generator{ *this };
       }
       auto initial_suspend() const noexcept {
            return suspend_always{};
       }
       auto final_suspend() const noexcept {
            return suspend_always{};
       }
       auto yield_value(int& value) {
            value_ = &value;
            return suspend_always{};
       }
       void return_void() {}
  };

  explicit my_int_generator(promise_type& p) 
       : handle_(coroutine_handle<promise_type>::from_promise(p)) {}
  ~my_int_generator() {
       if (handle_) {
            handle_.destroy();
       }
  }
  coroutine_handle<promise_type> handle_;
};

my_int_generator foo()
{
  for (int i = 0; i < 10; i++) {
       co_yield i;
  }
}

int main()
{
  auto obj = foo();
}

我知道上面这份代码不是那么容易理解,不过没关系,接下来我们来逐个讨论。

1.get_return_object是一个非常关键的函数,理解这个函数名我们需要从调用者的角度来看问题。可以看到调用者是main函数,它使用obj接受foo()执行的返回值。那么问题来了,foo()函数并没有return任何值。这时协程需要promise_type帮助它返回一个对象,这个辅助函数就是get_return_object。现在就好理解了,get_return_object就是通过my_int_generator的构造函数创建了一个对象并且返回给调用者,其中构造函数的形参接受一个promise_type的引用类型,并将其转换为coroutine_handle <promise_ type>类型。前文已经讨论过,coroutine_handle的作用是控制协程执行流,这里也不例外,我们后面需要用它来恢复协程的执行。

2.通常情况下我们不需要在意initial_suspendfinal_suspend这两个函数,它们是C++标准给予代码库编写者在协程执行前后的挂起机会,程序员可以利用这些机会做一些额外的逻辑处理,大多数情况下是用不到的。值得注意的是,这两个函数的返回类型必须是一个等代器,为了代码编写的方便,标准为我们准备了两种等待器suspend_alwayssuspend_never,分别表示必然挂起和从不挂起:

struct suspend_always {
  bool await_ready() noexcept {
       return false;
  }
  void await_suspend(coroutine_handle<>) noexcept {}
  void await_resume() noexcept {}
};
struct suspend_never {
  bool await_ready() noexcept {
       return true;
  }
  void await_suspend(coroutine_handle<>) noexcept {}
  void await_resume() noexcept {}
};

库的编写者可以根据实际情况选择返回类型。这里的my_int_generator选择返回suspend_always的具体原因后面会提到。

3.yield_value的意思很简单,保存co_yield操作数的值并且返回等待器,generator通常返回suspend_always。事实上,co_yield i;可以等价于代码:

co_await promise.yield_value(i);

4.return_void用于实现没有co_return的情况。promise_type中必须存在return_void或者return_value

现在代码已经可以顺利编译通过了,不过my_int_generator还没有任何generator的动作,这里我们需要操作协程句柄恢复执行协程代码并且返回生成值:

struct my_int_generator {
  …
  int next() {
       if (!handle_) {
            return -1;
       }
       handle_();
       if (handle_.done()) {
            handle_.destroy();
            handle_ = nullptr;
            return -1;
       }
       return handle_.promise().value_;
  }
  …
};

int main()
{
  auto obj = foo();
  std::cout << obj.next() << std::endl;
  std::cout << obj.next() << std::endl;
  std::cout << obj.next() << std::endl;
}

在上面的代码中,成员函数next先使用if(!handle_)检查协程句柄的有效性,然后执行恢复协程handle_();并通过handle_.done()检查协程是否执行完毕,如果执行完毕则销毁句柄,否则返回生成的值handle_.promise().value_。值得注意的是,这里需要先恢复协程,至于原因读者是否还记得initial_suspendfinal_suspend返回的都是suspend_always,这个返回类型让协程在进入循环前就挂起了,所以需要让协程先恢复运行。运行以上代码,程序顺利输出:

0
1
2

如果读者想让自己的generator支持基于范围的for循环:

for (auto i : foo()) {
  std::cout << i << std::endl;
}

可以回顾一下第17章,为generator实现一套迭代器即可。

理解了复杂的co_awaitco_yield后,co_return运算符的原理就很容易理解了。和co_yield相同,co_return也需要promise_type的支持,请看下面的代码:

struct my_int_return {
  struct promise_type {
       int value_ = 0;
       my_int_return get_return_object() {
            return my_int_return{ *this };
       }

       auto initial_suspend() const noexcept {
            return suspend_never{};
       }
       auto final_suspend() const noexcept {
            return suspend_always{};
       }

       void return_value(int value) {
            value_ = value;
       }
  };

  explicit my_int_return(promise_type& p)
       : handle_(coroutine_handle<promise_type>::from_promise(p)) {}

  ~my_int_return() {
       if (handle_) {
            handle_.destroy();
       }
  }

  int get() {
       if (!ready_) {
            value_ = handle_.promise().value_;
            ready_ = true;
            if (handle_.done()) {
                 handle_.destroy();
                 handle_ = nullptr;
            }
       }

       return value_;
  }

  coroutine_handle<promise_type> handle_;
  int value_ = 0;
  bool ready_ = false;
};

my_int_return foo()
{
  co_return 5;
}

int main()
{
  auto obj = foo();
  std::cout << obj.get();
  std::cout << obj.get();
  std::cout << obj.get();
}

这段代码和上一节的代码示例非常相似,读者可以对比着去理解。唯一需要说明的是成员函数void return_value(int value),函数foo中的co_return 5实际上就是调用的return_value(5)。如果co_return没有任何返回值,则需要用成员函数void return_void()代替void return_value(int value)

promise_type还有一个额外的功能,即可对co_await的操作数进行转换处理。为此我们需要给promise_type添加一个成员函数await_transform,例如:

struct promise_type {
    …
    awaitable await_transform(expr e) {
        return awaitable(e);
    }
};

这样做的结果是代码co_await expr;最终会转换为:co_await promise.await_transform(expr);

除此之外,promise_type还可以对异常进行处理,为此我们需要给promise_type添加一个成员函数void set_exception,例如:

struct promise_type {
    …
    void unhandled_exception() {
       eptr_ = std::current_exception();
  }
};

当协程运行过程中发生异常时,代码会捕获异常并且调用unhandled_ exception函数,这个过程代码类似于:

co_await promise.initial_suspend();
try
{
  …
}
catch (…)
{
  promise.unhandled_exception();
}
FinalSuspend:
co_await promise.final_suspend();
}

捕获异常后我们可以选择在调用者上下文中重新抛出该异常。

总的来说,协程的实现原理非常复杂,编译器为每个协程生成大量的代码,同时也需要程序员配合编写辅助代码才能正确使用。稍有疏忽就可能引发未定义的行为,尤其要注意协程上下文创建和销毁的时机、协程句柄的生命周期、运行的代码是否与线程强关联等。我的建议是,没有特别的需求,通常情况下应该使用成熟的协程库帮助我们完成协程函数的编写,比如STL、cppcoro等。最后还是提醒读者,在生产中使用协程编写代码请确保对协程机制和执行流有深入理解。