LeoHan 2020-06-13
好的,TinyWebServer我们讲了八个模块中的5个,还剩下数据库mysql模块,定时器timer模块,日记log模块。
(更新中~~~~~~)
mysql模块
项目中有简单的注册和登录功能,所以要使用到数据库。那么mysql模块就是数据库相关的模块,主要的其实就是数据库连接池。
首先数据库连接池是只有一个的,那么怎么保证从项目的每一个地方获取都是这个唯一的一个数据库连接池呢?欸,想到什么了?单例模式。在这里我们使用Cpp11下简洁的静态局部变量实现单例模式:
connection_pool* connection_pool::GetInstance() { static connection_pool connPool; return &connPool; }
原理是是C++11标准规定了当一个线程正在初始化一个变量的时候,其他线程必须得等到该初始化完成以后才能访问它。
OK我们上面解决了池子的问题,接下来我们考虑到,WebServer要有一定的并发度,所以我们要有多个数据库连接资源放在数据库连接池,当任务线程需要数据库连接的时候就向池子申请一个。好的,那么我们便有了一个问题:怎样保证数据库连接的线程安全?
我们先得有一个保存数据库连接的数据结构list,当然我们得先保证池子(或者说list的安全)的线程安全,所以有一个池子的互斥锁lock,然后我们保证list中连接资源的安全,所以再有一个信号量reserve,用于管理池子的空闲连接数。
有了这两个池子接下来的就是比较常规的做法了:在获取/归还数据库连接资源前先用互斥锁对池子加锁,然后用信号量保证list空闲连接资源数。
#include<stdio.h> #include<mysql/mysql.h> #include<string.h> #include<list> #include<pthread.h> #include<iostream> #include"sql_connection_pool.h" using namespace std; //构造函数 connection_pool::connection_pool() { this->CurConn = 0; this->FreeConn = 0; this->MaxConn = 0; } //单例模式,静态 connection_pool* connection_pool::GetInstance() { static connection_pool connPool; return &connPool; } //真正的初始化函数 void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, unsigned int MaxConn) { this->url = url; this->Port = Port; this->User = User; this->PassWord = PassWord; this->DatabaseName = DBName; //先互斥锁锁住池子,创造MaxConn数据库链接 lock.lock(); for (int i = 0; i < MaxConn; i++) { //新创建一个连接资源 MYSQL* con = NULL; con = mysql_init(con); if (con == NULL) { cout << "mysqlinit Error:" << mysql_error(con)<<endl; exit(1); } con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0); if (con == NULL) { cout << "mysql connect Error:" << mysql_error(con) << endl; exit(1); } //把这个资源放入链表 connList.push_back(con); ++FreeConn; } //初始化信号量和池子数量 reserve = sem(FreeConn); this->MaxConn = FreeConn; lock.unlock(); } //请求获取一个连接资源 MYSQL* connection_pool::GetConnection() { MYSQL* con = NULL; if (connList.size() == 0) return NULL; //请求一个资源,互斥锁/信号量 准备 reserve.wait(); lock.lock(); con = connList.front(); //从链表头取得一个资源 connList.pop_front(); --FreeConn; ++CurConn; lock.unlock(); return con; } //获取空闲连接数 int connection_pool::GetFreeConn() { return this->FreeConn; } //释放当前连接资源con bool connection_pool::ReleaseConnection(MYSQL* con) { if (con == NULL) return false; lock.lock(); connList.push_back(con); ++FreeConn; --CurConn; reserve.post(); lock.unlock(); return true; } //析构函数 connection_pool::~connection_pool() { DestroyPool(); } //销毁整个数据库连接池 void connection_pool::DestroyPool() { lock.lock(); if (connList.size() > 0) { list<MYSQL*>::iterator it; for (it = connList.begin(); it != connList.end(); it++) { MYSQL* con = *it; mysql_close(con); //获得每一个连接资源close掉 } CurConn = 0; FreeConn = 0; connList.clear(); } lock.unlock(); } //连接池RAII connectionRAII::connectionRAII(MYSQL** SQL, connection_pool* connPool) { *SQL = connPool->GetConnection(); conRAII = *SQL; poolRAII = connPool; } connectionRAII::~connectionRAII() { poolRAII->ReleaseConnection(conRAII); }
mysql模块
timer模块
定时器模块的功能是定时检查长时间无反应的连接,如果有服务器这边就主动断开连接。那么我们怎样做到这一点呢?博主个人感觉有两个关键问题:
① 定时器事件应该以一种怎样的方式去触发
这个问题其实很有意思,通常我们以前学习到处理信号的方式是把信号发生之后的要处理的逻辑全部放在信号的回调函数中。在这时候我们也许忽略了一个事实:在Linux环境下当我们回调一个信号的回调函数时候这段时间系统会忽略至少这个同样的信号(这是当然的不然就有可能死循环等出错),那么我们为了不让这些被忽略的信号被忽略太久,我们得想尽办法尽量缩短这个回调函数的执行时间。那么怎样才能做到这样呢?
一个理所当然的思路是:把回调函数的逻辑搬到主函数执行。那么怎样做到这一点:统一事件源。原理很简单,这时我们的信号回调函数不要处理逻辑,而是在回调函数中通过管道给主函数发送信息,那么当主函数监听到读时间并且判断到是从管道读端来的,那就知道这个信号到了我主函数应该处理了。
② 定时器以及应该以怎么样的数据结构来保存
在游双的《高性能服务器编程》这本书里面写到三种定时器的存储结构:链表、时间轮、时间堆。这个TinyWebServer使用的是最好实现的链表定时器。
我们有一个定时器结点类util_timer,每个结点表示一个客户连接,它保存了双向链表的前后指针,客户数据client_data和回调函数。如果我们判断到这个结点长时间无反应,所以我们调用这个回调函数传入client_data,然后回调函数就会把这个客户断开,并且做一些善后工作。
我们还有链表类sort_timer_lst,这个链表是一个时间递增的结点链表,即从链表头到尾这个客户的最后一次反应时间是递增的。这个链表类当然有插入和删除结点函数。并且还有adjust_timer调整链表位置函数,作用是当一个客户有了反应,那么我们需要更新他的最后一次反应时间,那么为了维护链表的递增特性,我们需要这么一个调整位置的函数。此外,这个类还有一个检查函数(定时清扫),作用是我们上文提到统一了事件源,把信号回调函数逻辑搬到主函数执行,所以这个定时清扫检查逻辑就是在这个检查函数。主函数判断到信号来了,就执行这个函数进行检查链表中长时间无反应的结点进行清扫。
#ifndef LST_TIMER #define LST_TIMER #include<time.h> #include<netinet/in.h> class util_timer; struct client_data { sockaddr_in address; int sockfd; util_timer* timer; //客户对应的定时器,和25行相互 }; //链表结点,包含事件和客户数据 class util_timer { public: util_timer() : prev(NULL),next(NULL) {} public: time_t expire; //记录时间 //!!!定时器的执行函数,到时间就调用这个 void (*cb_func)(client_data*); client_data* user_data; //客户数据,和12行相互 util_timer* prev; //双向链表 util_timer* next; //双向链表 }; //链表,按事件升序排序 class sort_timer_lst { public: //链表的构造与析构函数 sort_timer_lst() : head(NULL), tail(NULL) {}; ~sort_timer_lst() { util_timer* tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; } } //插入结点 void add_timer(util_timer* timer) { if (!timer) return; if (!head) { head = tail = timer; return; } //如果新的定时器超时时间小于当前头部结点 //直接将当前定时器结点作为头部结点 if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } //至少不是插入到头,调用函数继续插入 add_timer(timer, head); } //调整定时器,任务发生变化时,调整定时器在链表中的位置 void adjust_timer(util_timer* timer) { if (!timer) return; util_timer* tmp = timer->next; //因为只会增加,所以如果在最后肯定无需调整 if (!tmp || (timer->expire < tmp->expire)) return; //分两种情况:头/非头。思路都是先删除,再调用插入函数重新插入 if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } } //删除定时器 void del_timer(util_timer* timer) { if (!timer) return; //即整个链表就剩下一个结点,直接删除 if (timer == head && timer == tail) { delete timer; head = NULL; tail = NULL; return; } //被删除的定时器为头结点 if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } //被删除的是尾结点 if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } //不是头尾,普通删除 timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; return; } //定时任务处理函数 void tick() { if (!head) return; time_t cur = time(NULL); //获取当前时间 util_timer* tmp = head; while (tmp) { if (cur < tmp->expire) break; //就到这里了,后面的执行时间都还没到 tmp->cb_func(tmp->user_data); //满足超市条件,调用cb_func删除连接 //执行完之后,删除链表头并移动头 head = tmp->next; if (head) head->prev = NULL; delete tmp; tmp = head; } } private: //把timer插入到链表中,经过上面的检测到这里至少不是插入到头 void add_timer(util_timer* timer, util_timer* lst_head) { util_timer* prev = lst_head; util_timer* tmp = prev->next; //遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作 while (tmp) { //插入到prev后,tmp之前 if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } //上面没有插入成功,证明要插入到最后面 if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } private: util_timer* head; util_timer* tail; }; #endif
timer模块
log模块
log是日志模块,一个合格的服务器当然少不了日志来记录错误异常等等信息。我们想设计一个日志模块,他能顺利写日志但是又不要占用主线程时间去写,所以我们设计异步写日志的模块。
怎么是异步写日志呢?我们考虑设计一个日志队列,这个队列主要是用一个循环数组模拟队列来存储日志,这里要注意这个队列只是存储我们真正的目的是要写到文件里,所以只是存储并未达到目的。但是考虑到文件IO操作是比较慢的,所以我们采用的异步IO就是先写到内存里,然后日志线程自己有空的时候写到文件里。
所以这一模块的关键就是日志队列和写日志的线程。
先来思考日志队列,他的需求就是时不时会有一段日志塞到这个队列中,又时不时会有这其中的一段日志被取出来,那么当然是队列不满才能往里塞,队列不空才能有东西取出来。稍加思考这是什么?欸,就是经典的生产者消费者模型。所以也就没什么好说的了,常规处理:要一个互斥锁和信号量,操作前都加锁就行。
#ifndef BLOCK_QUEUE_H #define BLOCK_QUEUE_H #include <iostream> #include <stdlib.h> #include <pthread.h> #include <sys/time.h> #include "../lock/locker.h" using namespace std; template <class T> class block_queue { public: block_queue(int max_size = 1000) { if (max_size <= 0) { exit(-1); } m_max_size = max_size; m_array = new T[max_size]; m_size = 0; m_front = -1; m_back = -1; } void clear() { m_mutex.lock(); m_size = 0; m_front = -1; m_back = -1; m_mutex.unlock(); } ~block_queue() { m_mutex.lock(); if (m_array != NULL) delete[] m_array; m_mutex.unlock(); } //判断队列是否满了 bool full() { m_mutex.lock(); if (m_size >= m_max_size) { m_mutex.unlock(); return true; } m_mutex.unlock(); return false; } //判断队列是否为空 bool empty() { m_mutex.lock(); if (0 == m_size) { m_mutex.unlock(); return true; } m_mutex.unlock(); return false; } //返回队首元素 bool front(T& value) { m_mutex.lock(); if (0 == m_size) { m_mutex.unlock(); return false; } value = m_array[m_front]; m_mutex.unlock(); return true; } //返回队尾元素 bool back(T& value) { m_mutex.lock(); if (0 == m_size) { m_mutex.unlock(); return false; } value = m_array[m_back]; m_mutex.unlock(); return true; } int size() { int tmp = 0; m_mutex.lock(); tmp = m_size; m_mutex.unlock(); return tmp; } int max_size() { int tmp = 0; m_mutex.lock(); tmp = m_max_size; m_mutex.unlock(); return tmp; } //往队列添加元素,需要将所有使用队列的线程先唤醒 //当有元素push进队列,相当于生产者生产了一个元素 //若当前没有线程等待条件变量,则唤醒无意义 bool push(const T& item) { m_mutex.lock(); if (m_size >= m_max_size) { m_cond.broadcast(); m_mutex.unlock(); return false; } m_back = (m_back + 1) % m_max_size; m_array[m_back] = item; m_size++; m_cond.broadcast(); m_mutex.unlock(); return true; } //pop时,如果当前队列没有元素,将会等待条件变量 bool pop(T& item) { m_mutex.lock(); while (m_size <= 0) { if (!m_cond.wait(m_mutex.get())) { m_mutex.unlock(); return false; } } m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; m_size--; m_mutex.unlock(); return true; } //增加了超时处理 bool pop(T& item, int ms_timeout) { struct timespec t = { 0, 0 }; struct timeval now = { 0, 0 }; gettimeofday(&now, NULL); m_mutex.lock(); if (m_size <= 0) { t.tv_sec = now.tv_sec + ms_timeout / 1000; t.tv_nsec = (ms_timeout % 1000) * 1000; if (!m_cond.timewait(m_mutex.get(), t)) { m_mutex.unlock(); return false; } } if (m_size <= 0) { m_mutex.unlock(); return false; } m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; m_size--; m_mutex.unlock(); return true; } private: locker m_mutex; cond m_cond; T* m_array; int m_size; int m_max_size; int m_front; int m_back; }; #endif
日志队列
那么剩下的就是写日志线程,这一部分也比较简单就是新建一个线程,这个线程不断while当日志队列有日志就从里面取出来写到文件去,这个过程记得加锁就行。
#include <string.h> #include <time.h> #include <sys/time.h> #include <stdarg.h> #include "log.h" #include <pthread.h> using namespace std; Log::Log() { m_count = 0; m_is_async = false; } Log::~Log() { if (m_fp != NULL) { fclose(m_fp); } } //异步需要设置阻塞队列的长度,同步不需要设置 bool Log::init(const char *file_name, int log_buf_size, int split_lines, int max_queue_size) { //如果设置了max_queue_size,则设置为异步 if (max_queue_size >= 1) { m_is_async = true; m_log_queue = new block_queue<string>(max_queue_size); pthread_t tid; //flush_log_thread为回调函数,这里表示创建线程异步写日志 pthread_create(&tid, NULL, flush_log_thread, NULL); } m_log_buf_size = log_buf_size; m_buf = new char[m_log_buf_size]; memset(m_buf, ‘\0‘, m_log_buf_size); m_split_lines = split_lines; time_t t = time(NULL); struct tm *sys_tm = localtime(&t); struct tm my_tm = *sys_tm; const char *p = strrchr(file_name, ‘/‘); char log_full_name[256] = {0}; if (p == NULL) { snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name); } else { strcpy(log_name, p + 1); strncpy(dir_name, file_name, p - file_name + 1); snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name); } m_today = my_tm.tm_mday; m_fp = fopen(log_full_name, "a"); if (m_fp == NULL) { return false; } return true; } void Log::write_log(int level, const char *format, ...) { struct timeval now = {0, 0}; gettimeofday(&now, NULL); time_t t = now.tv_sec; struct tm *sys_tm = localtime(&t); struct tm my_tm = *sys_tm; char s[16] = {0}; switch (level) { case 0: strcpy(s, "[debug]:"); break; case 1: strcpy(s, "[info]:"); break; case 2: strcpy(s, "[warn]:"); break; case 3: strcpy(s, "[erro]:"); break; default: strcpy(s, "[info]:"); break; } //写入一个log,对m_count++, m_split_lines最大行数 m_mutex.lock(); m_count++; if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //everyday log { char new_log[256] = {0}; fflush(m_fp); fclose(m_fp); char tail[16] = {0}; snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday); if (m_today != my_tm.tm_mday) { snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name); m_today = my_tm.tm_mday; m_count = 0; } else { snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines); } m_fp = fopen(new_log, "a"); } m_mutex.unlock(); va_list valst; va_start(valst, format); string log_str; m_mutex.lock(); //写入的具体时间内容格式 int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s); int m = vsnprintf(m_buf + n, m_log_buf_size - 1, format, valst); m_buf[n + m] = ‘\n‘; m_buf[n + m + 1] = ‘\0‘; log_str = m_buf; m_mutex.unlock(); if (m_is_async && !m_log_queue->full()) { m_log_queue->push(log_str); } else { m_mutex.lock(); fputs(log_str.c_str(), m_fp); m_mutex.unlock(); } va_end(valst); } void Log::flush(void) { m_mutex.lock(); //强制刷新写入流缓冲区 fflush(m_fp); m_mutex.unlock(); }
日志模块
日志模块本身不难理解,其实难理解的是写日志函数中的各种宏以及文件/字符串函数的灵活应用。
参考资料:
TinyWebServer项目地址:https://github.com/qinguoyi/TinyWebServer
单例模:https://light-city.club/sc/design_pattern/singleton/singleton/