盒子
盒子
Posts List
  1. SPSCQueue

SPSCQueue 介绍

SPSCQueue

A fast single-producer, single-consumer lock-free queue for C++, boost library

无锁队列之单生产者单消费者

现实应用场景中经常会用到单生产者单消费者或多生产者多消费者模型,例如一个写线程,接收到数据写入缓冲区,另一个线程读出并处理。为了尽可能减少锁所占用的时间,可使用gcc的一些原子操作来代替pthread_mutex_t或pthread_spinlock_t。

核心思想:预分配一段连续的内存,用minseq表示读的位置,maxseq表示写的位置,当写满或者读完时返回错误;如果不同步有可能发生的错误是:读线程读取maxseq时,刚好写线程在修改maxseq,minseq同上,故使用__sync_add_and_fetch 函数自增0或者1来读取或者修改变量。

注意事项:分配的内存大小一定要足够大,否则可能会造成数据的丢失。
就好比一件仓库,有人不停往里存放东西,有人不停往外取,当仓库较小时,空间满时,可能会出现存放失败,那么这件商品就丢失了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#pragma once
#include <windows.h>
#include <boost/utility.hpp>
#include "basic_types.h"
// One lock free queue with a great performance with following limitations:
// 1: Only support one consumer and one producer at most.
// 2: Put/Get times can't exceed MAX_UINT64.
// 3: Read/write of m_head/m_tail is atomic (single instruction).
template<typename T>
class SPSCQueue : private boost::noncopyable
{
public:
SPSCQueue() : m_ptrArray(NULL), m_maxQueueLength(0)
{
}
SPSCQueue(UInt64 p_maxQueueLength) : m_ptrArray(NULL)
{
Reset(p_maxQueueLength);
}
// Reset max queue length.
void Reset(UInt64 p_maxQueueLength)
{
LogAssert (p_maxQueueLength > 0);
delete []m_ptrArray;
m_maxQueueLength = p_maxQueueLength;
m_ptrArray = new T*[m_maxQueueLength];
m_head = m_tail = 0;
}
~SPSCQueue()
{
LogAssert(Empty());
delete []m_ptrArray;
}
// Add new object to queue's tail.
void PutWait(T *p_object)
{
int backoff = 0;
while (m_tail - m_head >= m_maxQueueLength)
{
SpinPauseWithBackoff (backoff);
}
m_ptrArray[m_tail % m_maxQueueLength] = p_object;
++m_tail;
}
// Get object from queue's head.
T* GetWait()
{
int backoff = 0;
while (m_tail == m_head)
{
SpinPauseWithBackoff (backoff);
}
T* ret = m_ptrArray[m_head % m_maxQueueLength];
++m_head;
return ret;
}
// Get object from queue's head, caller need to make sure queue is not empty,
// otherwise the result is not defined.
T* Get()
{
T* ret = m_ptrArray[m_head % m_maxQueueLength];
++m_head;
return ret;
}
T* Peek()
{
return m_ptrArray[m_head % m_maxQueueLength];
}
// Peek object from queue's head.
T* PeekWait()
{
int backoff = 0;
while (m_tail == m_head)
{
SpinPauseWithBackoff (backoff);
}
return m_ptrArray[m_head % m_maxQueueLength];
}
UInt64 Size() const
{
return m_tail - m_head;
}
bool Empty() const
{
return m_head == m_tail;
}
private:
static void SpinPauseWithBackoff(int& p_backoff)
{
int yieldCount = p_backoff;
do
{
YieldProcessor ();
}
while (--yieldCount >= 0);
// Backoff is always power of 2 minus 1.
if (p_backoff <= 511)
{
p_backoff = p_backoff * 2 + 1;
}
}
// Buffer array.
T** m_ptrArray;
// Max Queue length.
UInt64 m_maxQueueLength;
// Queue's head index.
__declspec(align(64)) volatile UInt64 m_head;
// Queue's tail index.
__declspec(align(64)) volatile UInt64 m_tail;
};