-
Notifications
You must be signed in to change notification settings - Fork 0
/
RingBuffer_v1.hpp
173 lines (147 loc) · 4.79 KB
/
RingBuffer_v1.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright 2018 Kaspar Daugaard. For educational purposes only.
// See http://daugaard.org/blog/writing-a-fast-and-versatile-spsc-ring-buffer
#include <algorithm>
#include <atomic>
#include <cassert>  // assert (power-of-two size precondition)
#include <cstddef>  // size_t, ptrdiff_t
#include <new>      // placement new used by Write/WriteArray
#ifndef FORCE_INLINE
# if defined(_MSC_VER)
# define FORCE_INLINE __forceinline
# elif defined(__GNUC__)
# define FORCE_INLINE inline __attribute__ ((always_inline))
# else
# define FORCE_INLINE inline
# endif
#endif
#define CACHE_LINE_SIZE 64
class RingBuffer {
public:
// Allocate buffer space for writing.
FORCE_INLINE void* PrepareWrite(size_t size, size_t alignment);
// Publish written data.
FORCE_INLINE void FinishWrite();
// Write an element to the buffer.
template <typename T>
FORCE_INLINE void Write(const T& value) {
void* dest = PrepareWrite(sizeof(T), alignof(T));
new(dest) T(value);
}
// Write an array of elements to the buffer.
template <typename T>
FORCE_INLINE void WriteArray(const T* values, size_t count) {
void* dest = PrepareWrite(sizeof(T) * count, alignof(T));
for (size_t i = 0; i < count; i++)
new(static_cast<T*>(dest) + i) T(values[i]);
}
// Get read pointer. Size and alignment should match written data.
FORCE_INLINE void* PrepareRead(size_t size, size_t alignment);
// Finish and make buffer space available to writer.
FORCE_INLINE void FinishRead();
// Read an element from the buffer.
template <typename T>
FORCE_INLINE const T& Read() {
void* src = PrepareRead(sizeof(T), alignof(T));
return *static_cast<T*>(src);
}
// Read an array of elements from the buffer.
template <typename T>
FORCE_INLINE const T* ReadArray(size_t count) {
void* src = PrepareRead(sizeof(T) * count, alignof(T));
return static_cast<T*>(src);
}
// Initialize. Buffer must have required alignment. Size must be a power of two.
void Initialize(void* buffer, size_t size) {
Reset();
m_Reader.buffer = m_Writer.buffer = static_cast<char*>(buffer);
m_Reader.size = m_Writer.size = m_Writer.end = size;
}
void Reset() {
m_Reader = m_Writer = LocalState();
m_ReaderShared.pos = m_WriterShared.pos = 0;
}
private:
FORCE_INLINE static size_t Align(size_t pos, size_t alignment) {
#ifdef RINGBUFFER_DO_NOT_ALIGN
alignment = 1;
#endif
return (pos + alignment - 1) & ~(alignment - 1);
}
FORCE_INLINE void GetBufferSpaceToWriteTo(size_t& pos, size_t& end);
FORCE_INLINE void GetBufferSpaceToReadFrom(size_t& pos, size_t& end);
// Writer and reader's local state.
struct alignas(CACHE_LINE_SIZE) LocalState {
LocalState() : buffer(nullptr), pos(0), end(0), base(0), size(0) {}
char* buffer;
size_t pos;
size_t end;
size_t base;
size_t size;
};
LocalState m_Writer;
LocalState m_Reader;
// Try to disable CACHE_LINE_SIZE alignment
// Writer and reader's shared positions.
#if 0
struct alignas(CACHE_LINE_SIZE) SharedState {
std::atomic<size_t> pos;
};
#else
struct SharedState {
std::atomic<size_t> pos;
};
#endif
SharedState m_WriterShared;
SharedState m_ReaderShared;
};
// Reserve `size` bytes at the requested alignment and return a pointer to
// them. The fast path touches only the writer's local state; the slow path
// handles wrapping and waiting for the reader to free space.
void* RingBuffer::PrepareWrite(size_t size, size_t alignment) {
    size_t writePos = Align(m_Writer.pos, alignment);
    size_t writeEnd = writePos + size;
    if (writeEnd > m_Writer.end) {
        // Not enough known-free space; may rewrite both bounds (wrap) and spin.
        GetBufferSpaceToWriteTo(writePos, writeEnd);
    }
    m_Writer.pos = writeEnd;
    return m_Writer.buffer + writePos;
}
// Publish everything written since the last FinishWrite(). The release store
// pairs with the reader's acquire load of m_WriterShared.pos.
void RingBuffer::FinishWrite() {
    const size_t absoluteWritePos = m_Writer.base + m_Writer.pos;
    m_WriterShared.pos.store(absoluteWritePos, std::memory_order_release);
}
// Return a pointer to the next `size` bytes of data. Size and alignment
// should match what was written, so this side makes the same alignment and
// wrapping decisions as the writer did.
void* RingBuffer::PrepareRead(size_t size, size_t alignment) {
    size_t readPos = Align(m_Reader.pos, alignment);
    size_t readEnd = readPos + size;
    if (readEnd > m_Reader.end) {
        // Past the region known to be published; may wrap and spin on the writer.
        GetBufferSpaceToReadFrom(readPos, readEnd);
    }
    m_Reader.pos = readEnd;
    return m_Reader.buffer + readPos;
}
// Hand everything read since the last FinishRead() back to the writer via a
// release store of the reader's absolute position.
void RingBuffer::FinishRead() {
    const size_t absoluteReadPos = m_Reader.base + m_Reader.pos;
    m_ReaderShared.pos.store(absoluteReadPos, std::memory_order_release);
}
// Slow path for PrepareWrite: called when the requested span [pos, end) does
// not fit inside the region the writer already knows is free. On entry pos/end
// are the aligned bounds of the requested write; both may be rewritten if the
// write has to wrap to the start of the buffer. Spins until the reader has
// released enough space.
void RingBuffer::GetBufferSpaceToWriteTo(size_t& pos, size_t& end) {
    // Wrap: the span would run past the physical end of the buffer, so place
    // it at offset 0 instead. Advancing base by a full buffer size keeps
    // (base + pos) monotonically increasing across the wrap.
    if (end > m_Writer.size) {
        end -= pos;
        pos = 0;
        m_Writer.base += m_Writer.size;
    }
    // Busy-wait until the reader's published position leaves enough room.
    for (;;) {
        size_t readerPos = m_ReaderShared.pos.load(std::memory_order_acquire);
        // Space the writer may use relative to the current base; computed in
        // unsigned arithmetic, so it wraps "negative" while the reader's
        // position is still behind the new base.
        size_t available = readerPos - m_Writer.base + m_Writer.size;
        // Signed comparison (available can be negative)
        if (static_cast<ptrdiff_t>(available) >= static_cast<ptrdiff_t>(end)) {
            // Cache how far we can write before needing to sync again,
            // clamped to the physical buffer size.
            m_Writer.end = std::min(available, m_Writer.size);
            break;
        }
    }
}
// Slow path for PrepareRead: called when the requested span [pos, end) runs
// past the region known to contain published data. Mirrors the writer's wrap
// logic (reads use the same sizes/alignments, so the reader wraps exactly
// where the writer did) and spins until the writer has published enough.
void RingBuffer::GetBufferSpaceToReadFrom(size_t& pos, size_t& end) {
    // Wrap to offset 0, advancing base a full buffer size so (base + pos)
    // keeps increasing monotonically — same accounting as the writer's.
    if (end > m_Reader.size) {
        end -= pos;
        pos = 0;
        m_Reader.base += m_Reader.size;
    }
    // Busy-wait until the writer has published at least `end` bytes past base.
    for (;;) {
        size_t writerPos = m_WriterShared.pos.load(std::memory_order_acquire);
        // Published bytes relative to the reader's base; unsigned subtraction
        // wraps "negative" while the writer is still behind the new base.
        size_t available = writerPos - m_Reader.base;
        // Signed comparison (available can be negative)
        if (static_cast<ptrdiff_t>(available) >= static_cast<ptrdiff_t>(end)) {
            // Cache the readable limit, clamped to the physical buffer size.
            m_Reader.end = std::min(available, m_Reader.size);
            break;
        }
    }
}