DigitalWings 2019-09-10
近日有朋友问起线程安全队列的问题。本文基于stl的queue容器实现了线程安全的队列,可多线程生产,多线程消费。同时与基于boost的circular_buffer实现的环形缓冲区相比较,性能略优(实验测试下来优势也不大,不到5%)。源码比较简单,使用stl和boost实现,并且实现了超过队列最大长度丢弃消息的功能。
CPU主要参数:intel 2.3GHz,4核
内存:12G
操作系统:windows7
线程数为0,表示生产线程和消费线程是同一线程,统一生产完后,从消费第一条开始的计算时间。
其他的是在一个线程生产,N(N>=1)个线程消费的情况。
折线图显示如图所示,可以看到使用queue实现的队列性能最优,circular_buffer其次,list稍差。
在simplethreadqueue.h中注释的部分是使用boost的circular_buffer实现的。
simplethreadqueue.h
#ifndef CSIMPLETHREADQUEUE_H #define CSIMPLETHREADQUEUE_H #include <string> #include <queue> #include <iostream> #include "boost/timer.hpp" #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread/thread.hpp> #include <boost/circular_buffer.hpp> using namespace std; using namespace boost; extern long g_nMaxCount; template <class T> class CSimpleThreadQueue { public: CSimpleThreadQueue():m_nSize(0),m_nConsumed(0){/*m_container.resize(g_nMaxCount);*/} size_t size(){return m_nSize;} long GetConsumed(){return m_nConsumed;} void EnQueue(T item) { m_mutex.lock(); while(m_nSize >= g_nMaxCount) { m_container.pop(); --m_nSize; } m_container.push(item); //m_container.push_back(item); ++m_nSize; m_cond.notify_one(); m_mutex.unlock(); } void Dequeue(T& item) { while (true) { m_mutex.lock(); if ( m_nSize > 0) break; m_cond.wait_for(m_mutex, boost::chrono::microseconds(1)); m_mutex.unlock(); } item = m_container.front(); m_container.pop(); //m_container.pop_front(); -- m_nSize; ++m_nConsumed; m_mutex.unlock(); } private: std::queue<T> m_container; //circular_buffer<T> m_container; size_t m_nSize; boost::mutex m_mutex; condition_variable_any m_cond; long m_nConsumed; }; #endif
main.cpp
#include "simplethreadqueue.h" #include <boost/date_time/posix_time/posix_time.hpp> long g_nMaxCount = 500000;//100w bool g_bRunning = true; CSimpleThreadQueue<string> g_queue; boost::mutex g_mutex; void CallbackMethod(string& strMessage) { int sum = 0; for(int i = 0; i < 1000; ++ i) sum += i; //cout<<strMessage<<endl; } void ProduceMessageInit() { for(long long i = 0; i < g_nMaxCount; ++ i) g_queue.EnQueue("Test message."/*std::to_string(i)*/); } void ProduceMessage() { //static long long i = 0; while(g_bRunning) { g_queue.EnQueue("Test message."/*std::to_string(++i)*/); } } void ConsumeMessage() { string strMessge; //static timer t; static boost::posix_time::ptime t1 = boost::posix_time::microsec_clock::universal_time(); //static long nCount = 0; if(g_queue.size() > 0 && g_queue.GetConsumed() < g_nMaxCount) { g_queue.Dequeue(strMessge); //++ nCount; } else { g_mutex.lock(); if(g_bRunning) { g_bRunning = false; boost::posix_time::ptime t2 = boost::posix_time::microsec_clock::universal_time(); cout<<g_queue.GetConsumed()<<" consumed!"<<endl; cout<<t2 - t1 <<"s"<<endl; } g_mutex.unlock(); } CallbackMethod(strMessge); } void ConsumeAllMessage() { while(g_bRunning) { ConsumeMessage(); } } int main(int argc, char* argv[]) { if(argc <= 1)//单线程先生产消息再消费模型 { ProduceMessageInit(); ConsumeAllMessage(); return 0; } //单线程生产多线程消费模型 ProduceMessageInit(); thread_group tg; tg.create_thread(boost::bind(ProduceMessage)); int nThreadCount = atoi(argv[1]); if(nThreadCount <= 0) return -1; for(int i = 0; i < nThreadCount ; ++i) tg.create_thread(boost::bind(ConsumeAllMessage)); tg.join_all(); return 0; }