Skip to content

Commit 54e99ea

Browse files
committed
Introducing the FC-Heap concurrent heap implementation based on
flat-combining.
1 parent c176986 commit 54e99ea

File tree

4 files changed

+184
-1
lines changed

4 files changed

+184
-1
lines changed

benchmarks/fcheapbench.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <pthread.h>
4+
#include <string.h>
5+
#include <stdint.h>
6+
7+
#include <config.h>
8+
#include <primitives.h>
9+
#include <fastrand.h>
10+
#include <threadtools.h>
11+
#include <fcheap.h>
12+
#include <barrier.h>
13+
#include <bench_args.h>
14+
15+
FCHeapStruct *object_struct CACHE_ALIGN;
16+
int64_t d1 CACHE_ALIGN, d2;
17+
SynchBarrier bar CACHE_ALIGN;
18+
SynchBenchArgs bench_args CACHE_ALIGN;
19+
20+
inline static void *Execute(void* Arg) {
21+
FCHeapThreadState *th_state;
22+
long i, rnum;
23+
volatile int j;
24+
long id = (long) Arg;
25+
26+
synchFastRandomSetSeed(id + 1);
27+
th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(FCHeapThreadState));
28+
FCHeapThreadStateInit(object_struct, th_state, (int)id);
29+
if (id == 0) {
30+
for (i = 0; i < SYNCH_HEAP_INITIAL_SIZE/2; i++)
31+
FCHeapInsert(object_struct, th_state, i, 0);
32+
}
33+
synchBarrierWait(&bar);
34+
if (id == 0)
35+
d1 = synchGetTimeMillis();
36+
37+
for (i = 0; i < bench_args.runs; i++) {
38+
// perform an insert operation
39+
FCHeapInsert(object_struct, th_state, bench_args.runs - i, id);
40+
rnum = synchFastRandomRange(1, bench_args.max_work);
41+
for (j = 0; j < rnum; j++)
42+
;
43+
// perform a delete min operation
44+
SynchHeapElement h = FCHeapDeleteMin(object_struct, th_state, id);
45+
if (h==SYNCH_HEAP_EMPTY)
46+
fprintf(stderr, "DEBUG: Empty!\n");
47+
rnum = synchFastRandomRange(1, bench_args.max_work);
48+
for (j = 0; j < rnum; j++)
49+
;
50+
}
51+
52+
return NULL;
53+
}
54+
55+
int main(int argc, char *argv[]) {
56+
synchParseArguments(&bench_args, argc, argv);
57+
object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(FCHeapStruct));
58+
FCHeapInit(object_struct, FCHEAP_TYPE_MIN, bench_args.nthreads);
59+
synchBarrierSet(&bar, bench_args.nthreads);
60+
synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
61+
synchJoinThreadsN(bench_args.nthreads - 1);
62+
d2 = synchGetTimeMillis();
63+
64+
printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int) (d2 - d1), 2 * bench_args.runs * bench_args.nthreads/(1000.0*(d2 - d1)));
65+
synchPrintStats(bench_args.nthreads, bench_args.total_runs);
66+
67+
#ifdef DEBUG
68+
fprintf(stderr, "DEBUG: Object state: %lld\n", object_struct->heap.counter - SYNCH_HEAP_INITIAL_SIZE/2);
69+
fprintf(stderr, "DEBUG: rounds: %ld\n", object_struct->heap.rounds);
70+
fprintf(stderr, "DEBUG: initial_items: %lld\n", SYNCH_HEAP_INITIAL_SIZE/2);
71+
fprintf(stderr, "DEBUG: remained_items: %ld\n", object_struct->state.items);
72+
fprintf(stderr, "DEBUG: last_level_used: %u\n", object_struct->state.last_used_level);
73+
fprintf(stderr, "DEBUG: last_used_level_pos: %u\n", object_struct->state.last_used_level_pos);
74+
fprintf(stderr, "DEBUG: Checking heap state: %s\n", ((serialHeapClearAndValidation(&object_struct->state) == true) ? "VALID" : "INVALID"));
75+
#endif
76+
77+
return 0;
78+
}

libconcurrent/concurrent/fcheap.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include <serialheap.h>
2+
#include <fcheap.h>
3+
4+
void FCHeapInit(FCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads) {
5+
FCStructInit(&heap_struct->heap, nthreads);
6+
serialHeapInit(&heap_struct->state, type);
7+
}
8+
9+
void FCHeapThreadStateInit(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
10+
FCThreadStateInit(&heap_struct->heap, &lobject_struct->thread_state, pid);
11+
}
12+
13+
void FCHeapInsert(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid) {
14+
FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, arg | SYNCH_HEAP_INSERT_OP, pid);
15+
}
16+
17+
SynchHeapElement FCHeapDeleteMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
18+
return FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_DELETE_MIN_MAX_OP, pid);
19+
}
20+
21+
SynchHeapElement FCHeapGetMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid) {
22+
return FCApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_GET_MIN_MAX_OP, pid);
23+
}

libconcurrent/includes/fcheap.h

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/// @file fcheap.h
2+
/// @author Nikolaos D. Kallimanis
3+
/// @brief This file exposes the API of the FCHeap, which is a concurrent heap object of fixed size.
4+
/// The provided implementation uses the dynamic serial heap implementation provided by `serialheap.h` combined with the flat-combining object
5+
/// provided by `fc.h`. This dynamic heap implementation is based on the static heap implementation provided
6+
/// in https://github.com/ConcurrentDistributedLab/PersistentCombining repository.
7+
/// An example of use of this API is provided in benchmarks/fcheapbench.c file.
8+
///
9+
/// @copyright Copyright (c) 2024
10+
#ifndef _FCHEAP_H_
11+
#define _FCHEAP_H_
12+
13+
#include <limits.h>
14+
#include <fc.h>
15+
16+
#include <serialheap.h>
17+
18+
19+
/// @brief The type of heap is max-heap
20+
#define FCHEAP_TYPE_MIN SYNCH_HEAP_TYPE_MIN
21+
/// @brief The type of heap is min-heap
22+
#define FCHEAP_TYPE_MAX SYNCH_HEAP_TYPE_MAX
23+
24+
/// @brief FCHeapStruct stores the state of an instance of the FCHeap concurrent heap implementation.
25+
/// FCHeapStruct should be initialized using the FCHeapStructInit function.
26+
typedef struct FCHeapStruct {
27+
/// @brief An instance of FC.
28+
FCStruct heap CACHE_ALIGN;
29+
/// @brief An instance of a serial heap implementation (see `serialheap.h`).
30+
SerialHeapStruct state;
31+
} FCHeapStruct;
32+
33+
/// @brief FCHeapThreadState stores each thread's local state for a single instance of FCHeap.
34+
/// For each instance of FCHeap, a discrete instance of FCHeapThreadStateState should be used.
35+
typedef struct FCHeapThreadState {
36+
/// @brief A FCThreadState struct for the instance of FC.
37+
FCThreadState thread_state;
38+
} FCHeapThreadState;
39+
40+
/// @brief This function initializes an instance of the FCHeap concurrent heap implementation.
41+
///
42+
/// This function should be called once (by a single thread) before any other thread tries to
43+
/// apply any operation on the heap object.
44+
///
45+
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
46+
/// @param type Identifies the type of heap (i.e. min-heap or max-heap). In case that the argument is equal to FCHEAP_TYPE_MIN,
47+
/// the type of heap is min. In case that the argument is equal to FCHEAP_TYPE_MAX, the type of heap is max.
48+
/// @param nthreads The number of threads that will use the FCHeap concurrent heap implementation.
49+
void FCHeapInit(FCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads);
50+
51+
/// @brief This function should be called once by every thread before it applies any operation to the FCHeap concurrent heap implementation.
52+
///
53+
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
54+
/// @param lobject_struct A pointer to thread's local state of FCHeap.
55+
/// @param pid The pid of the calling thread.
56+
void FCHeapThreadStateInit(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);
57+
58+
/// @brief This function inserts a new element with value `arg` to the heap.
59+
///
60+
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
61+
/// @param lobject_struct A pointer to thread's local state of FCHeap.
62+
/// @param arg The value of the element that will be inserted in the heap.
63+
/// @param pid The pid of the calling thread.
64+
void FCHeapInsert(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid);
65+
66+
/// @brief This function removes the element of the heap that has the minimum value.
67+
///
68+
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
69+
/// @param lobject_struct A pointer to thread's local state of FCHeap.
70+
/// @param pid The pid of the calling thread.
71+
/// @return The value of the removed element. In case that the heap is empty `SYNCH_HEAP_EMPTY` is returned.
72+
SynchHeapElement FCHeapDeleteMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);
73+
74+
/// @brief This function returns (without removing) the element of the heap that has the minimum value.
75+
///
76+
/// @param heap_struct A pointer to an instance of the FCHeap concurrent heap implementation.
77+
/// @param lobject_struct A pointer to thread's local state of FCHeap.
78+
/// @param pid The pid of the calling thread.
79+
/// @return The value of the minimum element contained in the heap. In case that the heap is empty `SYNCH_HEAP_EMPTY` is returned.
80+
SynchHeapElement FCHeapGetMin(FCHeapStruct *heap_struct, FCHeapThreadState *lobject_struct, int pid);
81+
82+
#endif

validate.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ COLOR_FAIL="[ \e[31mFAIL\e[39m ]"
3636
declare -a uobjects=( "ccsynchbench.run" "dsmsynchbench.run" "hsynchbench.run" "oscibench.run" "simbench.run" "fcbench.run" "oyamabench.run" "mcsbench.run" "clhbench.run" "pthreadsbench.run" "fadbench.run")
3737
declare -a queues=( "ccqueuebench.run" "clhqueuebench.run" "dsmqueuebench.run" "hqueuebench.run" "osciqueuebench.run" "simqueuebench.run" "fcqueuebench.run" "lcrqbench.run")
3838
declare -a stacks=( "ccstackbench.run" "clhstackbench.run" "dsmstackbench.run" "hstackbench.run" "oscistackbench.run" "simstackbench.run" "fcstackbench.run")
39-
declare -a heaps=( "ccheapbench.run" "dsmheapbench.run" "hheapbench.run")
39+
declare -a heaps=( "ccheapbench.run" "dsmheapbench.run" "hheapbench.run" "fcheapbench.run")
4040
declare -a hashtables=("clhhashbench.run" "dsmhashbench.run")
4141

4242
if [ "$1" = "--help" ] || [ "$1" = "-h" ]; then

0 commit comments

Comments
 (0)