SPSCQueue 介绍
2019.03.26
呼呼呼山
 热度
℃
SPSCQueue
A fast single-producer, single-consumer lock-free queue for C++, boost library
无锁队列之单生产者单消费者
现实应用场景中经常会用到单生产者单消费者或多生产者多消费者模型,例如一个写线程,接收到数据写入缓冲区,另一个线程读出并处理。为了尽可能减少锁所占用的时间,可使用gcc的一些原子操作来代替pthread_mutex_t或pthread_spinlock_t。
核心思想:预分配一段连续的内存,用minseq表示读的位置,maxseq表示写的位置,当写满或者读完时返回错误;如果不同步有可能发生的错误是:读线程读取maxseq时,刚好写线程在修改maxseq,minseq同上,故使用__sync_add_and_fetch 函数自增0或者1来读取或者修改变量。
注意事项:分配的内存大小一定要足够大,否则可能会造成数据的丢失。
就好比一件仓库,有人不停往里存放东西,有人不停往外取,当仓库较小时,空间满时,可能会出现存放失败,那么这件商品就丢失了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| #pragma once #include <windows.h> #include <boost/utility.hpp> #include "basic_types.h" template<typename T> class SPSCQueue : private boost::noncopyable { public: SPSCQueue() : m_ptrArray(NULL), m_maxQueueLength(0) { } SPSCQueue(UInt64 p_maxQueueLength) : m_ptrArray(NULL) { Reset(p_maxQueueLength); } void Reset(UInt64 p_maxQueueLength) { LogAssert (p_maxQueueLength > 0); delete []m_ptrArray; m_maxQueueLength = p_maxQueueLength; m_ptrArray = new T*[m_maxQueueLength]; m_head = m_tail = 0; } ~SPSCQueue() { LogAssert(Empty()); delete []m_ptrArray; } void PutWait(T *p_object) { int backoff = 0; while (m_tail - m_head >= m_maxQueueLength) { SpinPauseWithBackoff (backoff); } m_ptrArray[m_tail % m_maxQueueLength] = p_object; ++m_tail; } T* GetWait() { int backoff = 0; while (m_tail == m_head) { SpinPauseWithBackoff (backoff); } T* ret = m_ptrArray[m_head % m_maxQueueLength]; ++m_head; return ret; } T* Get() { T* ret = m_ptrArray[m_head % m_maxQueueLength]; ++m_head; return ret; } T* Peek() { return m_ptrArray[m_head % m_maxQueueLength]; } T* PeekWait() { int backoff = 0; while (m_tail == m_head) { SpinPauseWithBackoff (backoff); } return m_ptrArray[m_head % m_maxQueueLength]; } UInt64 Size() const { return m_tail - m_head; } bool Empty() const { return m_head == m_tail; } private: static void SpinPauseWithBackoff(int& p_backoff) { int yieldCount = p_backoff; do { YieldProcessor (); } while (--yieldCount >= 0); if (p_backoff <= 511) { p_backoff = p_backoff * 2 + 1; } } T** m_ptrArray; UInt64 m_maxQueueLength; __declspec(align(64)) volatile UInt64 m_head; __declspec(align(64)) volatile UInt64 m_tail; };
|