使用stl的queue实现线程安全队列

DigitalWings 2019-09-10

简介

近日有朋友问起线程安全队列的问题。本文基于stl的queue容器实现了线程安全的队列,可多线程生产,多线程消费。同时与基于boost的circular_buffer实现的环形缓冲区相比较,性能略优(实验测试下来优势也不大,不到5%)。源码比较简单,使用stl和boost实现,并且实现了超过队列最大长度丢弃消息的功能。

实验环境准备

CPU主要参数:intel 2.3GHz,4核
内存:12G
操作系统:windows7

实验结果

线程数为0,表示生产线程和消费线程是同一线程,统一生产完后,从消费第一条开始的计算时间。
其他的是在一个线程生产,N(N>=1)个线程消费的情况。
使用stl的queue实现线程安全队列
折线图显示如图所示,可以看到使用queue实现的队列性能最优,circular_buffer其次,list稍差。
使用stl的queue实现线程安全队列

源码

在simplethreadqueue.h中注释的部分是使用boost的circular_buffer实现的。
simplethreadqueue.h

#ifndef CSIMPLETHREADQUEUE_H
#define CSIMPLETHREADQUEUE_H
#include <string>
#include <queue>
#include <iostream>
#include "boost/timer.hpp"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp> 
#include <boost/circular_buffer.hpp>

using namespace std;
using namespace boost;
extern long g_nMaxCount;

template <class T>
class CSimpleThreadQueue
{
public:
    CSimpleThreadQueue():m_nSize(0),m_nConsumed(0){/*m_container.resize(g_nMaxCount);*/}
    size_t size(){return m_nSize;}
    long GetConsumed(){return m_nConsumed;}
    void EnQueue(T item)
    {
        m_mutex.lock();    
        while(m_nSize >= g_nMaxCount)
        {
            m_container.pop();
            --m_nSize;
        }
        m_container.push(item);

        //m_container.push_back(item);
        ++m_nSize;
        m_cond.notify_one();
        m_mutex.unlock();
    }
    void Dequeue(T& item)
    {
        while (true)
        {
            m_mutex.lock();
            if ( m_nSize > 0)
                break;
            m_cond.wait_for(m_mutex, boost::chrono::microseconds(1));
            m_mutex.unlock();
        }

        item = m_container.front();
        m_container.pop();
        //m_container.pop_front();
        -- m_nSize;
        ++m_nConsumed;
        m_mutex.unlock();
    }
    
private:
    std::queue<T> m_container;
    //circular_buffer<T> m_container;
    size_t m_nSize;
    boost::mutex m_mutex;
    condition_variable_any m_cond;
    long m_nConsumed;
};
#endif

main.cpp

#include "simplethreadqueue.h"
#include <boost/date_time/posix_time/posix_time.hpp>

long g_nMaxCount = 500000;//100w
bool g_bRunning = true;
CSimpleThreadQueue<string> g_queue;
boost::mutex g_mutex;

void CallbackMethod(string& strMessage)
{
    int sum = 0;
    for(int i = 0; i < 1000; ++ i)
         sum += i;
    //cout<<strMessage<<endl;
}

void ProduceMessageInit()
{
    for(long long i = 0; i < g_nMaxCount; ++ i)
        g_queue.EnQueue("Test message."/*std::to_string(i)*/);
}

void ProduceMessage()
{
    //static long long i = 0;
    while(g_bRunning)
    {
        g_queue.EnQueue("Test message."/*std::to_string(++i)*/);
    }
}

void ConsumeMessage()
{
    string strMessge;
    //static timer t;
    static boost::posix_time::ptime t1 = boost::posix_time::microsec_clock::universal_time();
    //static long nCount = 0;
    if(g_queue.size() > 0 && g_queue.GetConsumed() < g_nMaxCount)
    {
        g_queue.Dequeue(strMessge);
        //++ nCount;
    }
    else
    {
        g_mutex.lock();
        if(g_bRunning)
        {
            g_bRunning = false;
            boost::posix_time::ptime t2 = boost::posix_time::microsec_clock::universal_time();
            cout<<g_queue.GetConsumed()<<" consumed!"<<endl;
            cout<<t2 - t1 <<"s"<<endl;
        }
        g_mutex.unlock();
    }
    CallbackMethod(strMessge);
}

void ConsumeAllMessage()
{
    while(g_bRunning)
    {
        ConsumeMessage();
    }
}

int main(int argc, char* argv[])
{
    if(argc <= 1)//单线程先生产消息再消费模型
    {
        ProduceMessageInit();
        ConsumeAllMessage();
        return 0;
    }
    //单线程生产多线程消费模型
    ProduceMessageInit();
    
    thread_group tg;
    tg.create_thread(boost::bind(ProduceMessage));
    int nThreadCount = atoi(argv[1]);
    if(nThreadCount <= 0)
        return -1;
    for(int i = 0; i < nThreadCount ; ++i)
        tg.create_thread(boost::bind(ConsumeAllMessage));
    tg.join_all();
    return 0;
}

相关推荐