软件设计 2017-06-11
ezCoroutine协程原型库只是个原型库,但是已经能够支持1000K以上数量的协程运行,而且是stackful模式。基本的对外接口有两类,一类是类似Posix线程的接口:协程创建threadCreat,协程回收threadJoin,调度函数switch_to;另外一类是类似lua协程接口,但是有所不同,我们的返回规则更加简单,所以功能也有所限制:挂起协程yield,恢复协程resume;然后是一些辅助函数接口:通过协程号寻找到协程结构体函数findThread,初始化协程函数init,销毁协程函数destroy,destroyAll函数,以及printThread,printLoop,printSharedStack等打印函数。
具体代码放在GitHub上:https://github.com/Yuandong-Chen/ezCoroutine
注意,该库只能在X86/X64 Linux环境下用gnu/gcc集成的编译工具(不能用llvm/clang),代码只支持32位,得用-m32参数,而且得加上-fno-stack-protector参数关闭栈检测机制,否则我不能保证不出现bus IO error 和 segment fault。因为具体实现我用了一些特殊的手段,smash了栈空间,以后会逐步改善。
下面通过几个问题来介绍:
这里给出经典的生产者-消费者模式的例子:
1 /************************* Test *************************/ 2 3 #include "ezco.h" 4 #include <stdio.h> 5 #include <stdlib.h> 6 7 #define NUM 2 8 #define CAL 2 9 Thread_t *tid[2]; 10 int *res1[2]; 11 12 void *produceFunc(void *d) 13 { 14 int j=0; 15 while(1) 16 { 17 j = (int)yield(tid[1],233); 18 printf("produce get: %d\n", j); 19 } 20 21 return ((void *)j); 22 } 23 24 void *consumeFunc(void *d) 25 { 26 int j=0; 27 28 while(1) 29 { 30 j = resume(d,332); 31 printf("consume get: %d\n", j); 32 } 33 34 return ((void *)j); 35 } 36 37 int main(int argc, char const *argv[]) 38 { 39 int i = 0; 40 init(); 41 42 threadCreat(&tid[1],produceFunc,NULL); 43 threadCreat(&tid[0],consumeFunc,tid[1]); 44 45 while (1){ 46 printf("main is grunting... %d\n", i++); 47 sleep(1); 48 switch_to(0); //Give up control to next thread 49 } 50 51 for(i = 0; i<NUM; i++){ 52 threadJoin(tid[i], &res1[i]); //Collect and Release the resourse of tid1 53 } 54 55 printLoop(&dead);printLoop(&live); 56 destroyAll(); 57 return 0; 58 } 59
测试结果如下:
设计与实现的基本原理请参考ucontext.h实现协程(用ucontext组件,所以代码量很小),或者参考longjmp实现协程,或者用内联汇编实现协程,在这里不累述。这里说下为何能支持大量协程。其实我们用了共享栈技术,每个协程共享一个4K或者更大的空间(你可以用堆,用匿名内存映射或者直接开个数组也都是可以的,总之得保证4K页对齐的空间),每个协程自己有私有栈空间指针privateptr,每个时刻只有一个协程在运行,此时栈空间在这个4K共享空间中(当然除了main以外),当切换协程时,动态分配一个堆内存,大小为此时协程栈实际大小(一般都很小,小的只有几十个Bytes, 大的有几百个Bytes,完全不用4KB),然后用privateptr指向它,把现在的栈里头的值复制进去。切换到下一个协程,当然下一个协程和上一个切出的一样,已经有privateptr指向了自己的私有栈空间,而这私有栈空间保存在堆内存中(有点拗口),我们只要用memcpy把这个堆内存中的栈复制回到4K共享空间中,再恢复些寄存器的值(保存在任务控制块结构体中),然后用该协程私有的寄存器的值跳转即可,这样一套操作就完成了运行时上下文切换工作。这样实现最大的开销就是不停的copy进copy出,不停地malloc和不停地free栈空间,这样的代价换来的是空间的节约。有多节约?我写了个测试用例如下:
1 /************************* Test *************************/ 2 3 #include "ezthread.h" 4 #include <stdio.h> 5 #include <stdlib.h> 6 7 #define NUM 1000000 8 #define CAL 2 9 10 void *sum1tod(void *d) 11 { 12 int i, j=0; 13 14 for (i = 1; i <= d; ++i) 15 { 16 j += i; 17 printf("thread %d is grunting... %d\n",live.current->data->tid , i); 18 switch_to(0); // Give up control to next thread 19 } 20 21 return ((void *)j); 22 } 23 24 void *hello(void *d) 25 { 26 int i, j=0; 27 28 for (i = 1; i <= d; ++i) 29 { 30 printf("hello\n"); 31 switch_to(0); // Give up control to next thread 32 } 33 34 return ((void *)j); 35 } 36 37 int main(int argc, char const *argv[]) 38 { 39 int res = 0; 40 int i; 41 init(); 42 Thread_t *tid1[NUM]; 43 int *res1[NUM]; 44 for(i = 0; i<NUM; i++){ 45 threadCreat(&tid1[i], (i%2)?sum1tod:hello, CAL); 46 } 47 48 49 50 for (i = 1; i <= CAL; ++i){ 51 res+=i; 52 printf("main is grunting... %d\n", i); 53 switch_to(0); //Give up control to next thread 54 } 55 56 for(i = 0; i<NUM; i++){ 57 threadJoin(tid1[i], &res1[i]); //Collect and Release the resourse of tid1 58 res += (int)res1[i]; 59 } 60 61 printf("parallel compute: %d\n", (int)res); 62 printLoop(&dead);printLoop(&live); 63 destroyAll(); 64 return 0; 65 }Test
测试结果如下:
简单地解释一下:50W个协程做1+2,并返回计算结果3,另外50W个打印hello,用top指令看了下内存占用率,基本是1022404KB*27.2%/1000000 = 285Bytes/Coroutine, 即峰值是平均每个协程占用285Bytes左右。1500003这个结果是3*500000+3(main也在无聊的计算1+2)得来的。无独有偶,我们这种共享栈实现的原理和云风的协程库是一样的。
对于这个问题,我的理解是,把任务交给编写标准库的人去做吧,当然你也可以实现自己的malloc,free内存分配器,用上slab等流行的方式分配内存,但是这不是我们所要考虑的任务。
理论上,可以拓展成X64,ARM,MIPS,SPARC等等版本,并且能够配合C++使用,但是其中共享栈实现依赖了编译器对volatile关键字处理的行为,能否实现还得看这类CPU的操作系统有没有提供gnu/gcc编译工具。
(a)不依赖编译器行为,重构共享栈的实现。(难度大,本人不希望用ucontext.h组件实现,但以后stackless模式会用setjmp,longjmp实现)
(b)用hash表结合原有的循环链表使得搜索速度达到O(1)。(几乎没难度)
(d)拓展到X64版本等版本。(难度一般)
(e)提供多种实战用例,在实际开发项目中用上。(可行性未知,后果未知)