Skip to content

Commit b8ca368

Browse files
committed
Add CC-Heap, DSM-Heap, and H-Heap: Concurrent Heaps Based on CC-Synch, DSM-Synch, and H-Combining Objects.
This commit introduces three new concurrent heap implementations: - CC-Heap: Based on the CC-Synch synchronization method. - DSM-Heap: Built using DSM-Synch. - H-Heap: Based on H-Heap. All these implementations leverage the serial code `libconcurrent/serial/serialheap.c` and aim to offer options for heap-based concurrent data structures.
1 parent ac5b1d6 commit b8ca368

File tree

10 files changed

+578
-0
lines changed

10 files changed

+578
-0
lines changed

benchmarks/ccheapbench.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <pthread.h>
4+
#include <string.h>
5+
#include <stdint.h>
6+
7+
#include <config.h>
8+
#include <primitives.h>
9+
#include <fastrand.h>
10+
#include <threadtools.h>
11+
#include <ccheap.h>
12+
#include <barrier.h>
13+
#include <bench_args.h>
14+
15+
CCHeapStruct *object_struct CACHE_ALIGN;
16+
int64_t d1 CACHE_ALIGN, d2;
17+
SynchBarrier bar CACHE_ALIGN;
18+
SynchBenchArgs bench_args CACHE_ALIGN;
19+
20+
inline static void *Execute(void* Arg) {
21+
CCHeapThreadState *th_state;
22+
long i, rnum;
23+
volatile int j;
24+
long id = (long) Arg;
25+
26+
synchFastRandomSetSeed(id + 1);
27+
th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(CCHeapThreadState));
28+
CCHeapThreadStateInit(object_struct, th_state, (int)id);
29+
synchBarrierWait(&bar);
30+
if (id == 0)
31+
d1 = synchGetTimeMillis();
32+
33+
for (i = 0; i < bench_args.runs; i++) {
34+
// perform an insert operation
35+
CCHeapInsert(object_struct, th_state, bench_args.runs - i, id);
36+
rnum = synchFastRandomRange(1, bench_args.max_work);
37+
for (j = 0; j < rnum; j++)
38+
;
39+
// perform a delete min operation
40+
CCHeapDeleteMin(object_struct, th_state, id);
41+
rnum = synchFastRandomRange(1, bench_args.max_work);
42+
for (j = 0; j < rnum; j++)
43+
;
44+
}
45+
return NULL;
46+
}
47+
48+
int main(int argc, char *argv[]) {
49+
CCHeapThreadState th_state;
50+
int i;
51+
52+
synchParseArguments(&bench_args, argc, argv);
53+
object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(CCHeapStruct));
54+
CCHeapInit(object_struct, CCHEAP_TYPE_MIN, bench_args.nthreads);
55+
CCHeapThreadStateInit(object_struct, &th_state, 0);
56+
for (i = 0; i < SYNCH_HEAP_INITIAL_SIZE/2; i++)
57+
CCHeapInsert(object_struct, &th_state, i, 0);
58+
59+
synchBarrierSet(&bar, bench_args.nthreads);
60+
synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
61+
synchJoinThreadsN(bench_args.nthreads - 1);
62+
d2 = synchGetTimeMillis();
63+
64+
printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int) (d2 - d1), 2 * bench_args.runs * bench_args.nthreads/(1000.0*(d2 - d1)));
65+
synchPrintStats(bench_args.nthreads, bench_args.total_runs);
66+
67+
#ifdef DEBUG
68+
fprintf(stderr, "DEBUG: Object state: %lld\n", object_struct->heap.counter - SYNCH_HEAP_INITIAL_SIZE/2);
69+
fprintf(stderr, "DEBUG: rounds: %d\n", object_struct->heap.rounds);
70+
fprintf(stderr, "DEBUG: initial_items: %lld\n", SYNCH_HEAP_INITIAL_SIZE/2);
71+
fprintf(stderr, "DEBUG: remained_items: %ld\n", object_struct->state.items);
72+
fprintf(stderr, "DEBUG: last_level_used: %u\n", object_struct->state.last_used_level);
73+
fprintf(stderr, "DEBUG: last_used_level_pos: %u\n", object_struct->state.last_used_level_pos);
74+
fprintf(stderr, "DEBUG: Checking heap state: %s\n", ((serialHeapClearAndValidation(&object_struct->state) == true) ? "VALID" : "INVALID"));
75+
#endif
76+
77+
return 0;
78+
}

benchmarks/dsmheapbench.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <pthread.h>
4+
#include <string.h>
5+
#include <stdint.h>
6+
7+
#include <config.h>
8+
#include <primitives.h>
9+
#include <fastrand.h>
10+
#include <threadtools.h>
11+
#include <dsmheap.h>
12+
#include <barrier.h>
13+
#include <bench_args.h>
14+
15+
DSMHeapStruct *object_struct CACHE_ALIGN;
16+
int64_t d1 CACHE_ALIGN, d2;
17+
SynchBarrier bar CACHE_ALIGN;
18+
SynchBenchArgs bench_args CACHE_ALIGN;
19+
20+
inline static void *Execute(void* Arg) {
21+
DSMHeapThreadState *th_state;
22+
long i, rnum;
23+
volatile int j;
24+
long id = (long) Arg;
25+
26+
synchFastRandomSetSeed(id + 1);
27+
th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(DSMHeapThreadState));
28+
DSMHeapThreadStateInit(object_struct, th_state, (int)id);
29+
synchBarrierWait(&bar);
30+
if (id == 0)
31+
d1 = synchGetTimeMillis();
32+
33+
for (i = 0; i < bench_args.runs; i++) {
34+
// perform an insert operation
35+
DSMHeapInsert(object_struct, th_state, bench_args.runs - i, id);
36+
rnum = synchFastRandomRange(1, bench_args.max_work);
37+
for (j = 0; j < rnum; j++)
38+
;
39+
// perform a delete min operation
40+
DSMHeapDeleteMin(object_struct, th_state, id);
41+
rnum = synchFastRandomRange(1, bench_args.max_work);
42+
for (j = 0; j < rnum; j++)
43+
;
44+
}
45+
return NULL;
46+
}
47+
48+
int main(int argc, char *argv[]) {
49+
DSMHeapThreadState th_state;
50+
int i;
51+
52+
synchParseArguments(&bench_args, argc, argv);
53+
object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(DSMHeapStruct));
54+
DSMHeapInit(object_struct, DSMHEAP_TYPE_MIN, bench_args.nthreads);
55+
DSMHeapThreadStateInit(object_struct, &th_state, 0);
56+
for (i = 0; i < SYNCH_HEAP_INITIAL_SIZE/2; i++)
57+
DSMHeapInsert(object_struct, &th_state, i, 0);
58+
59+
synchBarrierSet(&bar, bench_args.nthreads);
60+
synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
61+
synchJoinThreadsN(bench_args.nthreads - 1);
62+
d2 = synchGetTimeMillis();
63+
64+
printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int) (d2 - d1), 2 * bench_args.runs * bench_args.nthreads/(1000.0*(d2 - d1)));
65+
synchPrintStats(bench_args.nthreads, bench_args.total_runs);
66+
67+
#ifdef DEBUG
68+
fprintf(stderr, "DEBUG: Object state: %lld\n", object_struct->heap.counter - SYNCH_HEAP_INITIAL_SIZE/2);
69+
fprintf(stderr, "DEBUG: rounds: %d\n", object_struct->heap.rounds);
70+
fprintf(stderr, "DEBUG: initial_items: %lld\n", SYNCH_HEAP_INITIAL_SIZE/2);
71+
fprintf(stderr, "DEBUG: remained_items: %ld\n", object_struct->state.items);
72+
fprintf(stderr, "DEBUG: last_level_used: %u\n", object_struct->state.last_used_level);
73+
fprintf(stderr, "DEBUG: last_used_level_pos: %u\n", object_struct->state.last_used_level_pos);
74+
fprintf(stderr, "DEBUG: Checking heap state: %s\n", ((serialHeapClearAndValidation(&object_struct->state) == true) ? "VALID" : "INVALID"));
75+
#endif
76+
77+
return 0;
78+
}

benchmarks/hheapbench.c

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <stdio.h>
2+
#include <stdlib.h>
3+
#include <pthread.h>
4+
#include <string.h>
5+
#include <stdint.h>
6+
7+
#include <config.h>
8+
#include <primitives.h>
9+
#include <fastrand.h>
10+
#include <threadtools.h>
11+
#include <hheap.h>
12+
#include <barrier.h>
13+
#include <bench_args.h>
14+
15+
HSynchHeapStruct *object_struct CACHE_ALIGN;
16+
int64_t d1 CACHE_ALIGN, d2;
17+
SynchBarrier bar CACHE_ALIGN;
18+
SynchBenchArgs bench_args CACHE_ALIGN;
19+
20+
inline static void *Execute(void* Arg) {
21+
HSynchHeapThreadState *th_state;
22+
long i, rnum;
23+
volatile int j;
24+
long id = (long) Arg;
25+
26+
synchFastRandomSetSeed(id + 1);
27+
th_state = synchGetAlignedMemory(CACHE_LINE_SIZE, sizeof(HSynchHeapThreadState));
28+
HSynchHeapThreadStateInit(object_struct, th_state, (int)id);
29+
synchBarrierWait(&bar);
30+
if (id == 0)
31+
d1 = synchGetTimeMillis();
32+
33+
for (i = 0; i < bench_args.runs; i++) {
34+
// perform an insert operation
35+
HSynchHeapInsert(object_struct, th_state, bench_args.runs - i, id);
36+
rnum = synchFastRandomRange(1, bench_args.max_work);
37+
for (j = 0; j < rnum; j++)
38+
;
39+
// perform a delete min operation
40+
HSynchHeapDeleteMin(object_struct, th_state, id);
41+
rnum = synchFastRandomRange(1, bench_args.max_work);
42+
for (j = 0; j < rnum; j++)
43+
;
44+
}
45+
return NULL;
46+
}
47+
48+
int main(int argc, char *argv[]) {
49+
HSynchHeapThreadState th_state;
50+
int i;
51+
52+
synchParseArguments(&bench_args, argc, argv);
53+
object_struct = synchGetAlignedMemory(S_CACHE_LINE_SIZE, sizeof(HSynchHeapStruct));
54+
HSynchHeapInit(object_struct, HHEAP_TYPE_MIN, bench_args.nthreads, bench_args.numa_nodes);
55+
HSynchHeapThreadStateInit(object_struct, &th_state, 0);
56+
for (i = 0; i < SYNCH_HEAP_INITIAL_SIZE/2; i++)
57+
HSynchHeapInsert(object_struct, &th_state, i, 0);
58+
59+
synchBarrierSet(&bar, bench_args.nthreads);
60+
synchStartThreadsN(bench_args.nthreads, Execute, bench_args.fibers_per_thread);
61+
synchJoinThreadsN(bench_args.nthreads - 1);
62+
d2 = synchGetTimeMillis();
63+
64+
printf("time: %d (ms)\tthroughput: %.2f (millions ops/sec)\t", (int) (d2 - d1), 2 * bench_args.runs * bench_args.nthreads/(1000.0*(d2 - d1)));
65+
synchPrintStats(bench_args.nthreads, bench_args.total_runs);
66+
67+
#ifdef DEBUG
68+
fprintf(stderr, "DEBUG: Object state: %lld\n", object_struct->heap.counter - SYNCH_HEAP_INITIAL_SIZE/2);
69+
fprintf(stderr, "DEBUG: rounds: %d\n", object_struct->heap.rounds);
70+
fprintf(stderr, "DEBUG: initial_items: %lld\n", SYNCH_HEAP_INITIAL_SIZE/2);
71+
fprintf(stderr, "DEBUG: remained_items: %ld\n", object_struct->state.items);
72+
fprintf(stderr, "DEBUG: last_level_used: %u\n", object_struct->state.last_used_level);
73+
fprintf(stderr, "DEBUG: last_used_level_pos: %u\n", object_struct->state.last_used_level_pos);
74+
fprintf(stderr, "DEBUG: Checking heap state: %s\n", ((serialHeapClearAndValidation(&object_struct->state) == true) ? "VALID" : "INVALID"));
75+
#endif
76+
77+
return 0;
78+
}

libconcurrent/concurrent/ccheap.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include <serialheap.h>
2+
#include <ccheap.h>
3+
4+
void CCHeapInit(CCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads) {
5+
serialHeapInit(&heap_struct->state, type);
6+
CCSynchStructInit(&heap_struct->heap, nthreads);
7+
}
8+
9+
void CCHeapThreadStateInit(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid) {
10+
CCSynchThreadStateInit(&heap_struct->heap, &lobject_struct->thread_state, pid);
11+
}
12+
13+
void CCHeapInsert(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid) {
14+
CCSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, arg | SYNCH_HEAP_INSERT_OP, pid);
15+
}
16+
17+
SynchHeapElement CCHeapDeleteMin(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid) {
18+
return CCSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_DELETE_MIN_MAX_OP, pid);
19+
}
20+
21+
SynchHeapElement CCHeapGetMin(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid) {
22+
return CCSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_GET_MIN_MAX_OP, pid);
23+
}

libconcurrent/concurrent/dsmheap.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include <serialheap.h>
2+
#include <dsmheap.h>
3+
4+
void DSMHeapInit(DSMHeapStruct *heap_struct, uint32_t type, uint32_t nthreads) {
5+
serialHeapInit(&heap_struct->state, type);
6+
DSMSynchStructInit(&heap_struct->heap, nthreads);
7+
}
8+
9+
void DSMHeapThreadStateInit(DSMHeapStruct *heap_struct, DSMHeapThreadState *lobject_struct, int pid) {
10+
DSMSynchThreadStateInit(&heap_struct->heap, &lobject_struct->thread_state, pid);
11+
}
12+
13+
void DSMHeapInsert(DSMHeapStruct *heap_struct, DSMHeapThreadState *lobject_struct, SynchHeapElement arg, int pid) {
14+
DSMSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, arg | SYNCH_HEAP_INSERT_OP, pid);
15+
}
16+
17+
SynchHeapElement DSMHeapDeleteMin(DSMHeapStruct *heap_struct, DSMHeapThreadState *lobject_struct, int pid) {
18+
return DSMSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_DELETE_MIN_MAX_OP, pid);
19+
}
20+
21+
SynchHeapElement DSMHeapGetMin(DSMHeapStruct *heap_struct, DSMHeapThreadState *lobject_struct, int pid) {
22+
return DSMSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_GET_MIN_MAX_OP, pid);
23+
}

libconcurrent/concurrent/hheap.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include <serialheap.h>
2+
#include <hheap.h>
3+
4+
void HSynchHeapInit(HSynchHeapStruct *heap_struct, uint32_t type, uint32_t nthreads, uint32_t numa_nodes) {
5+
serialHeapInit(&heap_struct->state, type);
6+
HSynchStructInit(&heap_struct->heap, nthreads, numa_nodes);
7+
}
8+
9+
void HSynchHeapThreadStateInit(HSynchHeapStruct *heap_struct, HSynchHeapThreadState *lobject_struct, int pid) {
10+
HSynchThreadStateInit(&heap_struct->heap, &lobject_struct->thread_state, pid);
11+
}
12+
13+
void HSynchHeapInsert(HSynchHeapStruct *heap_struct, HSynchHeapThreadState *lobject_struct, SynchHeapElement arg, int pid) {
14+
HSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, arg | SYNCH_HEAP_INSERT_OP, pid);
15+
}
16+
17+
SynchHeapElement HSynchHeapDeleteMin(HSynchHeapStruct *heap_struct, HSynchHeapThreadState *lobject_struct, int pid) {
18+
return HSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_DELETE_MIN_MAX_OP, pid);
19+
}
20+
21+
SynchHeapElement HSynchHeapGetMin(HSynchHeapStruct *heap_struct, HSynchHeapThreadState *lobject_struct, int pid) {
22+
return HSynchApplyOp(&heap_struct->heap, &lobject_struct->thread_state, serialHeapApplyOperation, &heap_struct->state, SYNCH_HEAP_GET_MIN_MAX_OP, pid);
23+
}

libconcurrent/includes/ccheap.h

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/// @file ccheap.h
2+
/// @author Nikolaos D. Kallimanis
3+
/// @brief This file exposes the API of the CCHeap, which is a concurrent heap object of fixed size.
4+
/// The provided implementation uses the dynamic serial heap implementation provided by `serialheap.h` combined with CCSynch combining object
5+
/// provided by `ccsynch.h`. This dynamic heap implementation is based on the static heap implementation provided
6+
/// in https://github.com/ConcurrentDistributedLab/PersistentCombining repository.
7+
/// An example of use of this API is provided in benchmarks/ccheapbench.c file.
8+
///
9+
/// @copyright Copyright (c) 2024
10+
#ifndef _CCHEAP_H_
11+
#define _CCHEAP_H_
12+
13+
#include <limits.h>
14+
#include <ccsynch.h>
15+
16+
#include <serialheap.h>
17+
18+
19+
/// @brief The type of heap is max-heap
20+
#define CCHEAP_TYPE_MIN SYNCH_HEAP_TYPE_MIN
21+
/// @brief The type of heap is min-heap
22+
#define CCHEAP_TYPE_MAX SYNCH_HEAP_TYPE_MAX
23+
24+
/// @brief CCHeapStruct stores the state of an instance of the CCHeap concurrent heap implementation.
25+
/// CCHeapStruct should be initialized using the CCHeapStructInit function.
26+
typedef struct CCHeapStruct {
27+
/// @brief An instance of CCSynch.
28+
CCSynchStruct heap CACHE_ALIGN;
29+
/// @brief An instance of a serial heap implementation (see `serialheap.h`).
30+
SerialHeapStruct state;
31+
} CCHeapStruct;
32+
33+
/// @brief CCHeapThreadState stores each thread's local state for a single instance of CCHeap.
34+
/// For each instance of CCHeap, a discrete instance of CCHeapThreadStateState should be used.
35+
typedef struct CCHeapThreadState {
36+
/// @brief A CCSynchThreadState struct for the instance of CCSynch.
37+
CCSynchThreadState thread_state;
38+
} CCHeapThreadState;
39+
40+
/// @brief This function initializes an instance of the CCHeap concurrent heap implementation.
41+
///
42+
/// This function should be called once (by a single thread) before any other thread tries to
43+
/// apply any operation on the heap object.
44+
///
45+
/// @param heap_struct A pointer to an instance of the CCHeap concurrent heap implementation.
46+
/// @param type Identifies the type of heap (i.e. min-heap or max-heap). In case that the argument is equal to CCHEAP_TYPE_MIN,
47+
/// the type of heap is min. In case that the argument is equal to CCHEAP_TYPE_MAX, the type of heap is max.
48+
/// @param nthreads The number of threads that will use the CCHeap concurrent heap implementation.
49+
void CCHeapInit(CCHeapStruct *heap_struct, uint32_t type, uint32_t nthreads);
50+
51+
/// @brief This function should be called once by every thread before it applies any operation to the CCHeap concurrent heap implementation.
52+
///
53+
/// @param heap_struct A pointer to an instance of the CCHeap concurrent heap implementation.
54+
/// @param lobject_struct A pointer to thread's local state of CCHeap.
55+
/// @param pid The pid of the calling thread.
56+
void CCHeapThreadStateInit(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid);
57+
58+
/// @brief This function inserts a new element with value `arg` to the heap.
59+
///
60+
/// @param heap_struct A pointer to an instance of the CCHeap concurrent heap implementation.
61+
/// @param lobject_struct A pointer to thread's local state of CCHeap.
62+
/// @param arg The value of the element that will be inserted in the heap.
63+
/// @param pid The pid of the calling thread.
64+
void CCHeapInsert(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, SynchHeapElement arg, int pid);
65+
66+
/// @brief This function removes the element of the heap that has the minimum value.
67+
///
68+
/// @param heap_struct A pointer to an instance of the CCHeap concurrent heap implementation.
69+
/// @param lobject_struct A pointer to thread's local state of CCHeap.
70+
/// @param pid The pid of the calling thread.
71+
/// @return The value of the removed element. In case that the heap is empty `EMPTY_HEAP` is returned.
72+
SynchHeapElement CCHeapDeleteMin(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid);
73+
74+
/// @brief This function returns (without removing) the element of the heap that has the minimum value.
75+
///
76+
/// @param heap_struct A pointer to an instance of the CCHeap concurrent heap implementation.
77+
/// @param lobject_struct A pointer to thread's local state of CCHeap.
78+
/// @param pid The pid of the calling thread.
79+
/// @return The value of the minimum element contained in the heap. In case that the heap is empty `EMPTY_HEAP` is returned.
80+
SynchHeapElement CCHeapGetMin(CCHeapStruct *heap_struct, CCHeapThreadState *lobject_struct, int pid);
81+
82+
#endif

0 commit comments

Comments
 (0)