@@ -149,7 +149,7 @@ enum rte_ring_queue_behavior {
 * values in a modulo-32bit base: that's why the overflow of the indexes is not
 * a problem.
 */
- struct __CACHELINE_ALIGNED __rte_index {
+ struct __rte_index {
	atomic_uint_fast32_t head;
	atomic_uint_fast32_t tail;
};
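
The retained comment above leans on the head and tail indexes being free-running counters in a modulo-32-bit base. A standalone sketch of why that wraparound is harmless (not part of the patch; it uses plain uint32_t where the ring uses uint_fast32_t, and the free-entries line is only illustrative):

#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint32_t cons_tail = UINT32_MAX - 1;	/* consumer index about to wrap */
	uint32_t prod_head = cons_tail + 5;	/* wraps around to 3 */

	/* unsigned subtraction is itself modulo 2^32, so the distance survives the wrap */
	uint32_t used = prod_head - cons_tail;
	assert(used == 5);

	/* one way to derive free space from the same counters, for a ring with mask = size - 1 */
	uint32_t mask = 7;
	uint32_t free_entries = mask + cons_tail - prod_head;
	assert(free_entries == mask - used);
	return 0;
}
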
@@ -163,8 +163,8 @@ struct __CACHELINE_ALIGNED rte_ring {
	};

	/** Ring producer & consumer status. */
- 	struct __rte_index prod;
- 	struct __rte_index cons;
+ 	struct __rte_index prod __CACHELINE_ALIGNED;
+ 	struct __rte_index cons __CACHELINE_ALIGNED;

	void * volatile ring[0] __CACHELINE_ALIGNED;
};
@@ -230,6 +230,99 @@ rte_ring_destroy(struct rte_ring *r)
}


+ /*
+  * Unrolled store data to ring
+  */
+ static inline void
+ __store_ring(struct rte_ring *r, uint_fast32_t head, void * const *obj, unsigned n)
+ {
+ 	unsigned i;
+ 	unsigned loops = n & ~0x3;	/* entries handled by the unrolled loop: n rounded down to a multiple of 4 */
+ 	unsigned idx = head & r->mask;
+
+
+ 	// If we know we won't wrap around, we unroll 4 times
+ 	if (likely((idx + n) <= r->mask)) {
+ 		for (i = 0; i < loops; i += 4, idx += 4) {
+ 			r->ring[idx + 0] = obj[i + 0];
+ 			r->ring[idx + 1] = obj[i + 1];
+ 			r->ring[idx + 2] = obj[i + 2];
+ 			r->ring[idx + 3] = obj[i + 3];
+ 		}
+
+ 		// mop up remainder
+ 		switch (n & 0x3) {
+ 		case 3:
+ 			r->ring[idx + 0] = obj[i + 0];
+ 			r->ring[idx + 1] = obj[i + 1];
+ 			r->ring[idx + 2] = obj[i + 2];
+ 			break;
+
+ 		case 2:
+ 			r->ring[idx + 0] = obj[i + 0];
+ 			r->ring[idx + 1] = obj[i + 1];
+ 			break;
+
+ 		case 1:
+ 			r->ring[idx + 0] = obj[i + 0];
+ 			break;
+ 		}
+ 	} else {
+ 		const uint32_t mask = r->mask;
+
+ 		for (i = 0; i < n; i++, idx++) {
+ 			r->ring[idx & mask] = obj[i];
+ 		}
+ 	}
+ }
+
+
+ /*
+  * Unrolled load data from ring
+  */
+ static inline void
+ __load_ring(struct rte_ring *r, uint_fast32_t head, void **obj, unsigned n)
+ {
+ 	unsigned i;
+ 	unsigned loops = n & ~0x3;	/* entries handled by the unrolled loop: n rounded down to a multiple of 4 */
+ 	unsigned idx = head & r->mask;
+
+
+ 	// If we know we won't wrap around, we unroll 4 times
+ 	if (likely((idx + n) <= r->mask)) {
+ 		for (i = 0; i < loops; i += 4, idx += 4) {
+ 			obj[i + 0] = r->ring[idx + 0];
+ 			obj[i + 1] = r->ring[idx + 1];
+ 			obj[i + 2] = r->ring[idx + 2];
+ 			obj[i + 3] = r->ring[idx + 3];
+ 		}
+
+ 		// mop up remainder
+ 		switch (n & 0x3) {
+ 		case 3:
+ 			obj[i + 0] = r->ring[idx + 0];
+ 			obj[i + 1] = r->ring[idx + 1];
+ 			obj[i + 2] = r->ring[idx + 2];
+ 			break;
+
+ 		case 2:
+ 			obj[i + 0] = r->ring[idx + 0];
+ 			obj[i + 1] = r->ring[idx + 1];
+ 			break;
+
+ 		case 1:
+ 			obj[i + 0] = r->ring[idx + 0];
+ 			break;
+ 		}
+ 	} else {
+ 		const uint32_t mask = r->mask;
+ 		for (i = 0; i < n; i++, idx++) {
+ 			obj[i] = r->ring[idx & mask];
+ 		}
+ 	}
+ }
+
+

/**
 * @internal Enqueue several objects on the ring (multi-producers safe).
 *
@@ -265,7 +358,6 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
	uint_fast32_t prod_head, prod_next;
	uint_fast32_t cons_tail, free_entries;
	int success;
- 	unsigned i;

	/* move prod.head atomically */
	do {
@@ -298,8 +390,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
	} while (unlikely(success == 0));

	/* write entries in ring */
- 	for (i = 0; likely(i < n); i++)
- 		r->ring[(prod_head + i) & mask] = obj_table[i];
+ 	__store_ring(r, prod_head, obj_table, n);

	/*
	 * If there are other enqueues in progress that preceded us,
@@ -342,7 +433,6 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
	const uint32_t mask = r->mask;
	uint_fast32_t prod_head, cons_tail;
	uint_fast32_t prod_next, free_entries;
- 	unsigned i;

	prod_head = atomic_load_explicit(&r->prod.head, memory_order_acquire);
	cons_tail = atomic_load_explicit(&r->cons.tail, memory_order_acquire);
@@ -367,9 +457,9 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
	atomic_store_explicit(&r->prod.head, prod_next, memory_order_release);

	/* write entries in ring */
- 	for (i = 0; likely(i < n); i++)
- 		r->ring[(prod_head + i) & mask] = obj_table[i];
+ 	__store_ring(r, prod_head, obj_table, n);

+ 	assert(atomic_load(&r->prod.tail) == prod_head);
	atomic_store_explicit(&r->prod.tail, prod_next, memory_order_release);

	return n;
@@ -404,11 +494,9 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
		unsigned n, enum rte_ring_queue_behavior behavior)
{
	const unsigned max = n;
- 	const uint32_t mask = r->mask;
	uint_fast32_t cons_head, prod_tail;
	uint_fast32_t cons_next, entries;
	int success;
- 	unsigned i;

	/* move cons.head atomically */
	do {
@@ -438,8 +526,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
	} while (likely(success == 0));


- 	for (i = 0; likely(i < n); i++)
- 		obj_table[i] = r->ring[(cons_head + i) & mask];
+ 	__load_ring(r, cons_head, obj_table, n);

	/*
	 * If there are other dequeues in progress that preceded us,
@@ -480,10 +567,8 @@ static inline int
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
		unsigned n, enum rte_ring_queue_behavior behavior)
{
- 	const uint32_t mask = r->mask;
	uint_fast32_t cons_head, prod_tail;
	uint_fast32_t cons_next, entries;
- 	unsigned i;

	cons_head = atomic_load_explicit(&r->cons.head, memory_order_acquire);
	prod_tail = atomic_load_explicit(&r->prod.tail, memory_order_acquire);
@@ -503,12 +588,9 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
	cons_next = cons_head + n;
	atomic_store_explicit(&r->cons.head, cons_next, memory_order_release);

- 	for (i = 0; likely(i < n); i++) {
- 		/* WTF??? WHY DOES THIS CODE GIVE STRICT-ALIASING WARNINGS
- 		 * ON SOME GCC. THEY ARE FREAKING VOID* !!! */
- 		obj_table[i] = r->ring[(cons_head + i) & mask];
- 	}
+ 	__load_ring(r, cons_head, obj_table, n);

+ 	assert(atomic_load(&r->cons.tail) == cons_head);
	atomic_store_explicit(&r->cons.tail, cons_next, memory_order_release);
	return n;
}
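
For reference, a standalone sketch of the unroll-by-4 copy used by __store_ring and __load_ring, rewritten against a plain array so it can be compiled and checked on its own. The helper name copy_unrolled and the test in main are hypothetical, not part of the patch; the loop bound is n rounded down to a multiple of 4, with the switch mopping up the 0-3 leftover entries, mirroring the non-wrapping fast path above.

#include <assert.h>

/* Hypothetical stand-in for the non-wrapping fast path of __store_ring:
 * copy n pointers with a 4-way unrolled loop plus a remainder switch. */
static void copy_unrolled(void **dst, void * const *src, unsigned n)
{
	unsigned i;
	unsigned loops = n & ~0x3;	/* entries handled by the unrolled loop */

	for (i = 0; i < loops; i += 4) {
		dst[i + 0] = src[i + 0];
		dst[i + 1] = src[i + 1];
		dst[i + 2] = src[i + 2];
		dst[i + 3] = src[i + 3];
	}

	/* mop up the 0-3 remaining entries */
	switch (n & 0x3) {
	case 3:
		dst[i + 0] = src[i + 0];
		dst[i + 1] = src[i + 1];
		dst[i + 2] = src[i + 2];
		break;
	case 2:
		dst[i + 0] = src[i + 0];
		dst[i + 1] = src[i + 1];
		break;
	case 1:
		dst[i + 0] = src[i + 0];
		break;
	}
}

int main(void)
{
	int vals[7];
	void *src[7], *dst[7] = { 0 };
	unsigned i;

	for (i = 0; i < 7; i++)
		src[i] = &vals[i];

	copy_unrolled(dst, src, 7);	/* 4 entries unrolled, 3 via the switch */

	for (i = 0; i < 7; i++)
		assert(dst[i] == src[i]);
	return 0;
}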