什么是协程
新接触的人看了网上很多人的见解都是一头雾水,本人的理解,协程就是可中断的函数,这个函数在执行到某一时刻可以暂停,保存当前的上下文(比如当前作用域的变量,函数参数等等),在后来某一时刻可以手动恢复这个中断的函数,把保存的上下文恢复并从中断的地方继续执行。简而言之,协程就是可中断的函数,协程如何实现:保存上下文和恢复上下文。
你可能会说协程不会这么简单的吧,我这里来举例一下啊,如python的协程
def test(): print('begin') yield print('hello world') yield print('end') t = test() next(t)
以上就是一个协程,怎么调用它呢,如果直接使用test(),它不是调用,而是返回一个句柄(python中叫生成器),通过这个句柄就可以启动这个协程,以下是调用结果
很显然,这个函数只执行了一部分,继续执行下去只要继续调用next就可以,如上的test函数只有两次“中断”,调用三次next就会执行完毕(由于是主讲c++20协程,python协程的细节不会去讲)
调度器
如果是上面的这种协程是没有什么实际用途的,协程和调度器结合起来才是真正发挥作用的时候。调度器就是处理好协程之间的调用,知道所有协程调用的时机,通过调度器可以实现更多的功能,如定时协程,io协程,以下依旧拿python的协程来举例(各位请勿着急,实在是python太好举例了,前面先说明白,后面c++20的协程才好讲)
还是定义一个协程
async def test(): print('begin') print('end')
python为了区分迭代器生成器和协程,加入了新关键字async和await,并且在里面不能使用yield关键字,不过原理都是一样的,以上的协程中途没有中断(没有上下文的切换),一次便可以执行完毕。
好,现在开始说调度器,调度器简单理解为一个队列,将一个协程扔进调度器,调度器根据来执行所有的协程,那么调度器如何执行呢,简单来说就是使用一个循环,从队列中取出协程,然后“复苏”这个协程,如下
首先看main函数,asyncio.ensure_future(test())就是将main这个协程扔进调度器的队列中
asyncio.get_event_loop()就是获得这个循环,
loop.run_forever()就是开始这个循环,在循环中,会从队列中取出协程执行,先看执行结果
因为test协程没有进行上下文的切换,当循环直接复苏一次test协程后,test协程就直接执行完毕了,前面所讲,基于这个调度器可以实现很多额外的功能,如果说在这个循环中我加入一个睡眠的协程,用一个键值对(键为超时的时间戳,值为协程句柄),在循环中不停的获取当前的时间戳,然后从这个队列中比对时间戳,当时间戳相等后就表明这个协程就已经可以执行了,直接取出协程并复苏执行(当前可以这样理解,调度器肯定不是这样的步骤,还有很多很复杂的步骤,不过我们并不需要知道(一般来讲))
看如下的改造
在test协程中增加了一句await asyncio.sleep(1),这样就发生了一次上下文的切换,在循环中,开始从队列中取出这个test协程执行,执行途中遇到了asyncio.sleep(1),test协程就保存当前的上下文,然后“中断”,中断后,程序流程又回到了循环中,然后在队列中又增加一个键为时间戳值是test协程句柄的一项,下一次训换开始直接获取当前时间戳,然后比对,如果超时了,就继续拿出test协程进行执行(暂时这样理解),
所以执行结果如下
先打印begin,然后等待一秒中,然后再打印end,然后test协程执行完毕,从代码上看这个逻辑是这样的,如果调度器中有多个协程,在这等待的一秒时间又会上下文切换去执行别的协程,时间到了又会到test协程中从睡眠的地方恢复执行。
c++20的协程
c++20的标准中,新增了协程的支持,也就是可以在c++中定义一个协程了,但是看过的小伙伴肯定是知道的,要定义一个协程只要定义一些必要的函数,在这里,我推荐知乎的一篇文章,看一下要实现哪一些接口,C++20协程初探,然后有小伙伴肯定会说了,你这算什么意思,直接拿别人的结果,然后直接写一个标题,直接套用。
不会的,我当然也不会做这样的事情,首先我想说明的是,这些接口只是官方定义的,如果记这些简直跟死背书没有什么区别,我先表明的只是协程究竟是什么,以上python中讲到了如何定义一个协程
async def test(): pass
前面的那个async就是一个协程要实现的接口,20的标准中支持的就是如何定义一个类似async的东西,好,继续往下
如果在c++中能定义了这么一个协程,肯定也是没有什么作用的,需要一个调度器才是协程的真正强大之处,很抱歉,20官方并没有提供这样的东西,以下是我本人写的提供了类似这样功能的一套代码,有人肯定会说网上有那么多c++协程的代码,都写的乱七八糟,根本无法理解(可以这么说,不要喷我,反正我就是这么想的)。
先放链接吧libfuture,是的,没错,我把这个小工具库叫做libfuture(感觉有点兴奋,毕竟是自己真正意味上第一次写小工具库),下载0.0.6版本的就好(脸红,因为还在完善,只能不停的修bug,0.0.6算是修改的比较完善了,虽然还有一点)
下载好了就是一个解压包,直接打开libfuture/src/libfuture.sln(如果你是windows的话,当然,这个库我作了跨平台,因为涉及到了socket和数据收发,使用了windows平台的iocp和linux平台的epoll),如果你不想自己编译,那么可以使用我编译好了的库文件和动态链接库
开始使用
使用之前我说明一下,使用vs2019,而且要支持20标准的,一般直接下载最新的是支持的,打开创建一个项目,我这里直接叫做testlibfuture了,
直接点击添加现有项将lib文件添加进来
选择lib文件,点击直接添加,之后就是这样
你也可以使用其他的方法,然后随便写一个main函数编译一下,注意选择32位debug版本的,如果你是使用我编译的话
直接将dll文件放入和可执行文件一层的目录中
先说一下配置,语言标准要选择c++20,
附加包含目录直接把刚刚下载的libfuture源码的include目录包含进去就好了
然后重磅戏就来了,先来写一个栗子
#include <libfuture.h> #include <iostream> using namespace std; using namespace libfuture; future_t<> task1() { cout << "task1 begin" << endl; cout << "task1 end" << endl; co_return; } int main(int argc, char** argv) { auto sche = current_scheduler(); //开启一个协程 sche->ensure_future(task1()); sche->run_until_no_task(); return 0; }
我用了一个libfuture的命名空间,直接引用头文件加using namespace就可以,
首先定义一个协程,就是task1,前面的future_t<> 就是类似python的async的东西,auto sche = current_scheduler();就是得到调度器,
sche->ensure_future(task1());就是把task1协程丢进调度器,sche->run_until_no_task();就是启动调度器。看一下运行结果
好,也是上面一样的思路,将task1协程扔进调度器中存放协程的队列,调度器启动一个循环,直接得到这个循环执行,这个协程中没有进行上下文的切换,因此一下就执行完毕了。
由于我特别喜欢go语言开启协程的方式,我就定义了一个宏叫做cpp,跟ensure_future一样的功能,所以直接改成以下的
#include <libfuture.h> #include <iostream> using namespace std; using namespace libfuture; future_t<> task1() { cout << "task1 begin" << endl; cout << "task1 end" << endl; co_return; } int main(int argc, char** argv) { auto sche = current_scheduler(); //开启一个协程 cpp task1(); sche->run_until_no_task(); return 0; }
执行结果也是上面一样,现在再来加上一个协程的睡眠,
#include <libfuture.h> #include <iostream> using namespace std; using namespace libfuture; future_t<> task1() { cout << "task1 begin" << endl; co_await 1s; cout << "task1 end" << endl; co_return; } int main(int argc, char** argv) { auto sche = current_scheduler(); sche->init(); //开启一个协程 cpp task1(); sche->run_until_no_task(); return 0; }
说明一下,要使用睡眠功能要进行调度器要进行初始化,也就是init,在python中协程的睡眠是await asyncio.sleep(1),这样就是睡眠一秒,这里直接就是co_await 1s就是睡眠一秒,libfuture的睡眠的时间基准是使用标准库的chrono。执行是这样的
第一个的iocp。。。。。。。87可以先忽略哈,有些设计上的,和日志的打印还没有具体完善,先打印task1 begin 在等待一秒,再打印task1 end ,好,如果还有其他的协程,在这一秒的上下文切换中就会去执行其他的协程,当到时间了,会回到当前执行协程恢复执行,
再来介绍libfuture内置的非常重要的协程,
open_accept(接收一个客户端),返回值是一个sockaddr_in指针,是客户端的地址信息;
open_connection(连接一个服务端),返回值是是否连接上;
buffer_read(往一个socket中读数据),返回值是是否接收数据是否超时,
buffer_write(往一个socket中写数据),返回值是发送数据是否超时,
是的,我往其中添加的io的协程,为什么呢,像以上说的,一直判断队列中的时间戳,那么在时间没到的途中一直判断就会造成cpu的空转,浪费cpu,所以要把等待的时间让出去,让cpu去执行其他的程序。
先看一个客户端栗子
#include "libfuture.h" #include <string> #include <iostream> using namespace std; using namespace libfuture; #define BUF_LEN 10240 string send_str = "\ GET / HTTP/1.1\r\n\ Host: 42.192.165.127\r\n\ Connection: keep-alive\r\n\r\n"; future_t<> test_connect(const char* ip, unsigned short port) { //空间要大 buffer_t buffer(BUF_LEN + 1); socket_t client_socket(AF_INET, SOCK_STREAM, 0); bool has_c = co_await open_connection(&client_socket, ip, port); if (!has_c) { cout << "连接失败" << endl; co_return; } cout << "连接成功" << endl; buffer.push(send_str.c_str(), send_str.size()); bool is_timeout = co_await buffer_write(&buffer, &client_socket, 5s); if (is_timeout) { cout << "超时未发送" << endl; co_return; } cout << "发送消息成功" << endl; buffer.clear(); //看看回了什么消息 is_timeout = co_await buffer_read(&buffer, &client_socket, 5s); if (is_timeout) { cout << "超时未读取到消息" << endl; co_return; } if (buffer.has_data()) { //防止烫烫或屯屯 int len = buffer.data_len(); if (len >= BUF_LEN) len = BUF_LEN; buffer.data()[len] = 0; cout << buffer.data() << endl; } co_return; } int main(int argc, char** argv) { #ifdef _WIN32 WSADATA data; WSAStartup(MAKEWORD(2, 2), &data); #endif auto sche = current_scheduler(); sche->init(); for (int i = 0; i < 10; ++i) cpp test_connect("42.192.165.127", 80); sche->run_until_no_task(); #ifdef _WIN32 WSACleanup(); #endif return 0; }
以上,因为windows的socket要先进行初始化才能用,所以有WsaStartup之类的函数,首先
auto sche = current_scheduler(); sche->init();
两行代码是获得调度器,并初始化调度器,
for (int i = 0; i < 10; ++i)
cpp test_connect(“42.192.165.127”, 80);
是往调度器中扔进10个连接的协程,说明一下,这个ip地址是我服务器的ip地址,我没做防护,是的,没有做防护,所以我拿来测试,大家不要搞我啊(泪目)
,先来看test_connect协程
buffer_t和socket_t均是libfuture中定义的,在libfuture.h头文件中引入,
BUF_LEN是一个宏,被定义为10240,
open_connection是一个用于打开一个连接的协程,在连接成功之前会一直挂起当连接成功后会恢复执行,返回值为是否连接成功,由于是模拟http的请求,要发送的字符串为下
然后开始发送
buffer.push,见名知义,往缓冲区中推入数据,然后使用buffer_write发送数据,返回值为是否超时,因为我加上了超时的机制,同样,在消息发送出去前会一直挂起,
然后把缓冲区清空,然后再读取,同样流程,然后如何返回后有数据,将缓冲区的最后一位设置为字符串结尾,这个大家都应该知道吧,然后打印出来,具体流程就是这样,在上下的切换中回去执行其他的协程,因为在调度器中我加入了10个协程,以下是结果
瞬间所有请求处理完毕,看右边的拉条,10次的请求你全部打印出来了(毕竟是自己的服务器,没有防护。。。),至于为什么乱码,不用说,windows控制台gbk编码。看到这里,各位看官老爷是不是很有想法了呢,
继续,拿出一个服务器的栗子
#include "libfuture.h" #include <iostream> #include <sstream> #include <string> using namespace std; using namespace libfuture; #define BUF_LEN 10240 future_t<> test_send_and_recv(socket_t* client_socket, string addr) { buffer_t buffer(BUF_LEN + 1); while (true) { buffer.clear(); //超时时间为5秒 bool is_timeout = co_await buffer_read(&buffer, client_socket, 5s); if (is_timeout) { cout << "读取超时" << endl; break; } if (buffer.has_data()) { //防止烫烫烫烫烫烫烫烫烫烫烫烫烫或屯屯屯屯屯屯屯屯屯屯屯屯屯屯 int len = buffer.data_len(); if (len > BUF_LEN) len = BUF_LEN; buffer.data()[len] = 0; cout << "recv from " << addr << ":" << buffer.data() << endl; //超时时间为5秒 bool is_timeout = co_await buffer_write(&buffer, client_socket, 5000ms); if (is_timeout) { cout << "发送超时" << endl; break; } } else { client_socket->close(); break; } } cout << "client leave" << endl; delete client_socket; co_return; } future_t<> test_accept() { socket_t* client_socket = nullptr; while (true) { client_socket = new socket_t(); //在接收到客户端之前会一直挂起 sockaddr_in* client_addr = co_await open_accept(client_socket); stringstream ss; ss << inet_ntoa(client_addr->sin_addr) << ":"; ss << ntohs(client_addr->sin_port); cout << ss.str() << " join" << endl; //开启一个协程来处理这个socket的接收和发送数据 cpp test_send_and_recv(client_socket, ss.str()); } co_return; } int main(int argc, char** argv) { #ifdef _WIN32 WSADATA _data; WSAStartup(MAKEWORD(2, 2), &_data); #endif auto sche = current_scheduler(); socket_t* listen_socket = new socket_t(AF_INET, SOCK_STREAM, 0); listen_socket->reuse_addr(); listen_socket->bind(8000, "127.0.0.1"); listen_socket->listen(128); sche->set_init_sockfd(listen_socket->sockfd()); //要成为一个服务端必须要设置一个监听套接字进行初始化 sche->init(); //开启一个协程 cpp test_accept(); sche->run_until_no_task(); #ifdef _WIN32 WSACleanup(); #endif return 0; }
流程我就不再讲了,我直接运行走起,要说明的只有一点,要成为一个服务端,要设置一个监听套接字然后初始化,对,是因为坑*的windows,iocp简直不是人
再后来我直接打开两个telnet
用过windows的telnet的都知道windows的telnet每按一下就会发送出去,,,
都开始连接
都开始进入状态了,现在我连个客户端都可以发送信息,无堵塞,注意,我这个程序是单线程的,但使用协程方式的异步io就是
很强
后面我还没写到这里,直接就超时了,因为我代码里写的超时都是5秒,,,我直接改成100秒然后编译运行开始连接
开始异步收发消息
关掉telnet后,也是会提示退出,同样,buffer那一行提示可以忽视啊,我还没完善错误打印
我给大家准备好了本地下载地址
testlibfuture的代码
libfuture0.0.6代码
解压后的sample文件夹中有所有的栗子
最后
我觉得一个协程库要具备一个简单声明协程的方式,还有就是要有一个处理所有协程的调度器,协程能够直接调用另外一个协程,libfuture是能做到的,基于这个调度器要实现休眠协程,数据的协程,协程锁,要让用户能将自己写的协程嵌入这个调度器,实在是python的协程库实实在在的做到了这一点,但是要在c++中实现这些,真的是无比困难,只有一步一步的探索。
文件夹下有所有的栗子代码,可以一步一步调试,协程是如何创建的,调度器是怎么执行的;说明,linux上也可以直接编译使用的,我用的是gcc10.2.0,makefile直接在src下跟源代码一个路径,好了,到此为止,第一次写文,乱七八糟,敬请见谅。