GCC Code Coverage Report

File:      src/ringbuf.c
Directory: ./
Date:      2020-12-10 21:44:00

              Exec    Total    Coverage
Lines:           0      159       0.0 %
Branches:        0       68       0.0 %

Source:
/*
 * Copyright (c) 2016-2017 Mindaugas Rasiukevicius <rmind at noxt eu>
 * All rights reserved.
 *
 * Use is subject to license terms, as specified in the LICENSE file.
 */

/*
 * Atomic multi-producer single-consumer ring buffer, which supports
 * contiguous range operations and which can be conveniently used for
 * message passing.
 *
 * There are three offsets -- think of clock hands:
 * - NEXT: marks the beginning of the available space,
 * - WRITTEN: the point up to which the data is actually written,
 * - Observed READY: the point up to which the data is ready to be
 *   written out by the consumer.
 *
 * Producers
 *
 *	Observe and save the 'next' offset, then request N bytes from
 *	the ring buffer by atomically advancing the 'next' offset.  Once
 *	the data is written into the "reserved" buffer space, the thread
 *	clears the saved value; these observed values are used to compute
 *	the 'ready' offset.
 *
 * Consumer
 *
 *	Writes out the data between the 'written' and 'ready' offsets and
 *	updates the 'written' value.  The consumer thread scans for the
 *	lowest 'seen' value among the producers to compute the 'ready'
 *	offset.
 *
 * Key invariant
 *
 *	Producers cannot go beyond the 'written' offset; producers are
 *	also not allowed to catch up with the consumer.  Only the consumer
 *	is allowed to catch up with the producer, i.e. to set the 'written'
 *	offset equal to the 'next' offset.
 *
 * Wrap-around
 *
 *	If the producer cannot acquire the requested length due to
 *	insufficient space at the end of the buffer, then it will wrap
 *	around.  WRAP_LOCK_BIT in the 'next' offset is used to lock the
 *	'end' offset.
 *
 *	There is an ABA problem if one producer stalls while a pair of
 *	producer and consumer both successfully wrap around and set the
 *	'next' offset to the stale value of the first producer, thus
 *	letting it perform a successful CAS violating the invariant.  A
 *	counter in the 'next' offset (masked by WRAP_COUNTER) is used to
 *	prevent this problem.  It is incremented on wrap-arounds.
 *
 *	The same ABA problem could also cause a stale 'ready' offset,
 *	which could be observed by the consumer.  We set WRAP_LOCK_BIT in
 *	the 'seen' value before advancing the 'next' offset and clear this
 *	bit after the successful advance; this ensures that only a stable
 *	'ready' offset is observed by the consumer.
 */
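
/*
 * Illustration (an added sketch, not part of the original file): one
 * possible state of the ring without wrap-around.  Data in the range
 * [WRITTEN, READY) is complete and may be consumed; [READY, NEXT) is
 * reserved by producers but possibly still being filled; the rest is
 * free space which producers may claim.
 *
 *	0         WRITTEN            READY            NEXT       space
 *	|  free   |=== consumable ===|### reserved ###|   free   |
 */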

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdbool.h>
#include <inttypes.h>
#include <string.h>
#include <limits.h>
#include <errno.h>

#include "ringbuf.h"
#include "ringbuf_utils.h"

#define	RBUF_OFF_MASK	(0x00000000ffffffffUL)
#define	WRAP_LOCK_BIT	(0x8000000000000000UL)
#define	RBUF_OFF_MAX	(UINT64_MAX & ~WRAP_LOCK_BIT)

#define	WRAP_COUNTER	(0x7fffffff00000000UL)
#define	WRAP_INCR(x)	(((x) + 0x100000000UL) & WRAP_COUNTER)
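
/*
 * Layout of the 64-bit 'next' word implied by the masks above (an
 * added illustration, not in the original file):
 *
 *	bit 63          bits 62..32            bits 31..0
 *	+---------------+----------------------+------------------------+
 *	| WRAP_LOCK_BIT | WRAP_COUNTER         | offset into the buffer |
 *	|               | (ABA generation tag) | (RBUF_OFF_MASK)        |
 *	+---------------+----------------------+------------------------+
 *
 * WRAP_INCR() bumps only the counter field and discards the rest,
 * e.g. WRAP_INCR(0x00000001deadbeefUL) == 0x0000000200000000UL.
 */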

typedef uint64_t	ringbuf_off_t;
typedef uint64_t	worker_off_t;
typedef uint64_t	registered_t;

enum {
	not_registered,
	being_registered,	/* Being registered in register_worker() */
	perm_registered,	/* Registered in ringbuf_register() */
	temp_registered		/* Registered in ringbuf_acquire() */
};

struct ringbuf_worker {
	volatile ringbuf_off_t	seen_off;
	registered_t		registered;
};

struct ringbuf {
	/* Ring buffer space. */
	size_t			space;

	/*
	 * The NEXT hand is atomically updated by the producer.
	 * WRAP_LOCK_BIT is set in case of wrap-around; in such a case,
	 * the producer can update the 'end' offset.
	 */
	volatile ringbuf_off_t	next;
	ringbuf_off_t		end;

	/* The index of the first potentially free worker-record. */
	worker_off_t		first_free_worker;

	/* The following are updated by the consumer. */
	ringbuf_off_t		written;
	unsigned		nworkers, ntempworkers;
	ringbuf_worker_t	workers[];
};

/*
 * ringbuf_setup: initialise a new ring buffer of a given length.
 */
int
ringbuf_setup(ringbuf_t *rbuf, unsigned nworkers, unsigned ntempworkers,
    size_t length)
{
	if (length >= RBUF_OFF_MASK) {
		errno = EINVAL;
		return -1;
	}
	memset(rbuf, 0, offsetof(ringbuf_t, workers[nworkers + ntempworkers]));
	rbuf->space = length;
	rbuf->end = RBUF_OFF_MAX;
	rbuf->nworkers = nworkers;
	rbuf->ntempworkers = ntempworkers;
	return 0;
}

/*
 * ringbuf_get_sizes: return the sizes of the ringbuf_t and ringbuf_worker_t.
 */
void
ringbuf_get_sizes(unsigned nworkers, unsigned ntempworkers,
    size_t *ringbuf_size, size_t *ringbuf_worker_size)
{
	if (ringbuf_size)
		*ringbuf_size = offsetof(ringbuf_t, workers[nworkers + ntempworkers]);
	if (ringbuf_worker_size)
		*ringbuf_worker_size = sizeof(ringbuf_worker_t);
}
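
#ifdef RINGBUF_EXAMPLE	/* Added usage sketches; not part of the library. */
/*
 * example_alloc: a minimal sketch of how a ring buffer is typically
 * created with the two functions above.  Note that this control
 * structure and the data buffer of 'length' bytes are allocated
 * separately by the caller; ringbuf_acquire() returns offsets into
 * that data buffer.
 */
static ringbuf_t *
example_alloc(unsigned nworkers, unsigned ntempworkers, size_t length)
{
	size_t ringbuf_size;
	ringbuf_t *rbuf;

	/* Query the size of the variable-length control structure. */
	ringbuf_get_sizes(nworkers, ntempworkers, &ringbuf_size, NULL);

	if ((rbuf = malloc(ringbuf_size)) == NULL)
		return NULL;
	if (ringbuf_setup(rbuf, nworkers, ntempworkers, length) == -1) {
		free(rbuf);
		return NULL;
	}
	return rbuf;
}
#endif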

/*
 * register_worker: allocate a worker-record for a thread/process
 * and pass back the pointer to its local store.
 * Returns NULL if none are available.
 */
static ringbuf_worker_t *
register_worker(ringbuf_t *rbuf, unsigned registration_type)
{
	worker_off_t volatile *p_free_worker;
	int acquired, state;
	ringbuf_worker_t *w = NULL;

	/* Try to find a worker-record that can be registered. */
	p_free_worker = &rbuf->first_free_worker;
	acquired = false;
	while (!acquired) {
		worker_off_t prev_free_worker, i;

		/* Get the index of the first worker-record to try registering. */
		prev_free_worker = *p_free_worker;

		for (i = 0; !acquired && i < rbuf->ntempworkers; ++i) {
			worker_off_t new_free_worker;

			/* Prepare to acquire a worker-record index. */
			new_free_worker = ((prev_free_worker & RBUF_OFF_MASK)
			    + i) % rbuf->ntempworkers;

			/* Try to acquire a worker-record. */
			w = &rbuf->workers[new_free_worker + rbuf->nworkers];
			state = not_registered;
			if (!atomic_compare_exchange_weak(&w->registered,
			    &state, being_registered))
				continue;
			acquired = true;
			w->seen_off = RBUF_OFF_MAX;
			atomic_thread_fence(memory_order_release);
			w->registered = registration_type;

			/* Advance the index if no one else has. */
			new_free_worker |= WRAP_INCR(prev_free_worker);
			atomic_compare_exchange_weak(p_free_worker,
			    &prev_free_worker, new_free_worker);
		}

		/*
		 * If no worker-record could be registered and no one else
		 * was trying to register at the same time, then stop
		 * searching.
		 */
		if (!acquired && (*p_free_worker) == prev_free_worker)
			break;
	}

	/*
	 * Return the acquired worker-record; return NULL if none was
	 * acquired, since 'w' may point at a record owned by someone else.
	 */
	return acquired ? w : NULL;
}

/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and pass back the pointer to its local store.
 */
ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
	ASSERT(i < rbuf->nworkers);

	ringbuf_worker_t *w = &rbuf->workers[i];

	w->seen_off = RBUF_OFF_MAX;
	atomic_thread_fence(memory_order_release);
	w->registered = perm_registered;
	return w;
}

void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	w->registered = not_registered;
	(void)rbuf;
}

/*
 * stable_nextoff: capture and return a stable value of the 'next' offset.
 */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
	unsigned count = SPINLOCK_BACKOFF_MIN;
	ringbuf_off_t next;

	while ((next = rbuf->next) & WRAP_LOCK_BIT) {
		SPINLOCK_BACKOFF(count);
	}
	atomic_thread_fence(memory_order_acquire);
	ASSERT((next & RBUF_OFF_MASK) < rbuf->space);
	return next;
}

/*
 * ringbuf_acquire: request a space of a given length in the ring buffer.
 *
 * => On success: returns the offset at which the space is available.
 * => On failure: returns -1.
 */
ssize_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t **pw, size_t len)
{
	ringbuf_off_t seen, next, target;
	ringbuf_worker_t *w;

	ASSERT(len > 0 && len <= rbuf->space);

	/* If necessary, acquire a worker-record. */
	if (*pw == NULL) {
		w = register_worker(rbuf, temp_registered);
		if (w == NULL)
			return -1;
		*pw = w;
	} else {
		w = *pw;
	}
	ASSERT(w->seen_off == RBUF_OFF_MAX);

	do {
		ringbuf_off_t written;

		/*
		 * Get the stable 'next' offset.  Save the observed 'next'
		 * value (i.e. the 'seen' offset), but mark the value as
		 * unstable (set WRAP_LOCK_BIT).
		 *
		 * Note: CAS will issue a memory_order_release for us and
		 * thus ensures that the 'seen' offset reaches global
		 * visibility together with the new 'next'.
		 */
		seen = stable_nextoff(rbuf);
		next = seen & RBUF_OFF_MASK;
		ASSERT(next < rbuf->space);
		w->seen_off = next | WRAP_LOCK_BIT;

		/*
		 * Compute the target offset.  Key invariant: we cannot
		 * go beyond the WRITTEN offset or catch up with it.
		 */
		target = next + len;
		written = rbuf->written;
		if (__predict_false(next < written && target >= written)) {
			/* The producer must wait. */
			w->seen_off = RBUF_OFF_MAX;
			if (w->registered == temp_registered) {
				*pw = NULL;
				atomic_thread_fence(memory_order_release);
				w->registered = not_registered;
			}
			return -1;
		}

		if (__predict_false(target >= rbuf->space)) {
			const bool exceed = target > rbuf->space;

			/*
			 * Wrap-around and start from the beginning.
			 *
			 * If we would exceed the buffer, then attempt to
			 * acquire the WRAP_LOCK_BIT and use the space at
			 * the beginning.  If we used all the space exactly
			 * to the end, then reset to 0.
			 *
			 * Check the invariant again.
			 */
			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
			if ((target & RBUF_OFF_MASK) >= written) {
				w->seen_off = RBUF_OFF_MAX;
				if (w->registered == temp_registered) {
					*pw = NULL;
					atomic_thread_fence(memory_order_release);
					w->registered = not_registered;
				}
				return -1;
			}
			/* Increment the wrap-around counter. */
			target |= WRAP_INCR(seen & WRAP_COUNTER);
		} else {
			/* Preserve the wrap-around counter. */
			target |= seen & WRAP_COUNTER;
		}
	} while (!atomic_compare_exchange_weak(&rbuf->next, &seen, target));

	/*
	 * Acquired the range.  Clear WRAP_LOCK_BIT in the 'seen' value,
	 * thus indicating that it is stable now.
	 */
	w->seen_off &= ~WRAP_LOCK_BIT;

	/*
	 * If we set the WRAP_LOCK_BIT in the 'next' offset (because we
	 * exceeded the remaining space and need to wrap around), then
	 * save the 'end' offset and release the lock.
	 */
	if (__predict_false(target & WRAP_LOCK_BIT)) {
		/* Cannot wrap around again if the consumer did not catch up. */
		ASSERT(rbuf->written <= next);
		ASSERT(rbuf->end == RBUF_OFF_MAX);
		rbuf->end = next;
		next = 0;

		/*
		 * Unlock: ensure the 'end' offset reaches global
		 * visibility before the lock is released.
		 */
		atomic_thread_fence(memory_order_release);
		rbuf->next = (target & ~WRAP_LOCK_BIT);
	}
	ASSERT((target & RBUF_OFF_MASK) <= rbuf->space);
	return (ssize_t)next;
}
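
#ifdef RINGBUF_EXAMPLE	/* Added usage sketch; not part of the library. */
/*
 * example_produce: a minimal producer-side sketch.  'buf' is the
 * caller's data buffer of the length passed to ringbuf_setup(); 'w'
 * may be NULL, in which case ringbuf_acquire() transparently registers
 * a temporary worker-record.
 */
static int
example_produce(ringbuf_t *rbuf, ringbuf_worker_t *w, char *buf,
    const void *payload, size_t len)
{
	ssize_t off;

	/* Stake a claim on 'len' bytes; -1 means no space is available. */
	if ((off = ringbuf_acquire(rbuf, &w, len)) == -1)
		return -1;

	/* Fill the reserved range, then mark it ready for the consumer. */
	memcpy(&buf[off], payload, len);
	ringbuf_produce(rbuf, &w);
	return 0;
}
#endif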

/*
 * ringbuf_produce: indicate that the acquired range in the buffer is
 * produced and is ready to be consumed.
 */
void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t **pw)
{
	ringbuf_worker_t *w = *pw;

	(void)rbuf;
	ASSERT(w->registered != not_registered
	    && w->registered != being_registered);
	ASSERT(w->seen_off != RBUF_OFF_MAX);
	atomic_thread_fence(memory_order_release);
	w->seen_off = RBUF_OFF_MAX;

	/* Free any temporarily-allocated worker-record. */
	if (w->registered == temp_registered) {
		w->registered = not_registered;
		*pw = NULL;
	}
}

/*
 * ringbuf_consume: get a contiguous range which is ready to be consumed.
 */
size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
	ringbuf_off_t written = rbuf->written, next, ready;
	size_t towrite;
	unsigned total_workers;
retry:
	/*
	 * Get the stable 'next' offset.  Note: stable_nextoff() issued
	 * a load memory barrier.  The area between the 'written' offset
	 * and the 'next' offset will be the *preliminary* target buffer
	 * area to be consumed.
	 */
	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
	if (written == next) {
		/* If producers did not advance, then nothing to do. */
		return 0;
	}

	/*
	 * Observe the 'ready' offset of each producer.
	 *
	 * At this point, some producer might have already triggered the
	 * wrap-around and some (or all) seen 'ready' values might be in
	 * the range between 0 and 'written'.  We have to skip them.
	 */
	ready = RBUF_OFF_MAX;

	total_workers = rbuf->nworkers + rbuf->ntempworkers;
	for (unsigned i = 0; i < total_workers; i++) {
		ringbuf_worker_t *w = &rbuf->workers[i];
		unsigned count = SPINLOCK_BACKOFF_MIN;
		ringbuf_off_t seen_off;

		/* Skip if the worker has not registered. */
		if (w->registered == not_registered
		    || w->registered == being_registered) {
			continue;
		}

		/*
		 * Get a stable 'seen' value.  This is necessary since we
		 * want to discard the stale 'seen' values.
		 */
		while ((seen_off = w->seen_off) & WRAP_LOCK_BIT) {
			SPINLOCK_BACKOFF(count);
		}

		/*
		 * Ignore the offsets after the possible wrap-around.
		 * We are interested in the smallest seen offset that is
		 * not behind the 'written' offset.
		 */
		if (seen_off >= written) {
			ready = MIN(seen_off, ready);
		}
		ASSERT(ready >= written);
	}

	/*
	 * Finally, we need to determine whether wrap-around occurred
	 * and deduce the safe 'ready' offset.
	 */
	if (next < written) {
		const ringbuf_off_t end = MIN(rbuf->space, rbuf->end);

		/*
		 * Wrap-around case.  Check for the cut-off first.
		 *
		 * Reset the 'written' offset if it reached the end of
		 * the buffer or the 'end' offset (if set by a producer).
		 * However, we must check that the producer is actually
		 * done (the observed 'ready' offsets are clear).
		 */
		if (ready == RBUF_OFF_MAX && written == end) {
			/*
			 * Clear the 'end' offset if it was set.
			 */
			if (rbuf->end != RBUF_OFF_MAX) {
				rbuf->end = RBUF_OFF_MAX;
				atomic_thread_fence(memory_order_release);
			}
			/* Wrap around the consumer and start from zero. */
			rbuf->written = written = 0;
			goto retry;
		}

		/*
		 * We cannot wrap around yet; there is data to consume at
		 * the end.  The ready range is the smallest of the observed
		 * 'ready' or the 'end' offset.  If neither is set, then
		 * it is the actual end of the buffer.
		 */
		ASSERT(ready > next);
		ready = MIN(ready, end);
		ASSERT(ready >= written);
	} else {
		/*
		 * Regular case.  Up to the observed 'ready' (if set)
		 * or the 'next' offset.
		 */
		ready = MIN(ready, next);
	}
	towrite = ready - written;
	*offset = written;

	ASSERT(ready >= written);
	ASSERT(towrite <= rbuf->space);
	return towrite;
}

/*
 * ringbuf_release: indicate that the consumed range can now be released.
 */
void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
	const size_t nwritten = rbuf->written + nbytes;

	ASSERT(rbuf->written <= rbuf->space);
	ASSERT(rbuf->written <= rbuf->end);
	ASSERT(nwritten <= rbuf->space);

	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}
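
#ifdef RINGBUF_EXAMPLE	/* Added usage sketch; not part of the library. */
/*
 * example_consume: a minimal consumer-side sketch.  Drains everything
 * that is currently ready, one contiguous range at a time; the
 * process_bytes() callback stands in for the caller's logic.
 */
static void
example_consume(ringbuf_t *rbuf, const char *buf,
    void (*process_bytes)(const char *, size_t))
{
	size_t off, len;

	/* Each iteration yields one contiguous, fully-written range. */
	while ((len = ringbuf_consume(rbuf, &off)) != 0) {
		process_bytes(&buf[off], len);
		ringbuf_release(rbuf, len);
	}
}
#endif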