Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ struct iperf_stream
pthread_t thr;
int thread_created;
int done;
int affinity;

/* configurable members */
int local_port;
Expand Down Expand Up @@ -293,6 +294,7 @@ enum debug_level {
};


#define MAX_STREAMS 128
struct iperf_test
{
pthread_mutex_t print_mutex;
Expand All @@ -314,6 +316,9 @@ struct iperf_test
int duration; /* total duration of test (-t flag) */
char *diskfile_name; /* -F option */
int affinity, server_affinity; /* -A option */
#if defined(HAVE_SCHED_SETAFFINITY)
cpu_set_t cpumask;
#endif /* HAVE_CPUSET_SETAFFINITY */
#if defined(HAVE_CPUSET_SETAFFINITY)
cpuset_t cpumask;
#endif /* HAVE_CPUSET_SETAFFINITY */
Expand Down Expand Up @@ -385,7 +390,8 @@ struct iperf_test
double cpu_util[3]; /* cpu utilization of the test - total, user, system */
double remote_cpu_util[3]; /* cpu utilization for the remote host/client - total, user, system */

int num_streams; /* total streams in the test (-P) */
int num_streams; /* total streams in the test (-P) */
int streams_affinity[MAX_STREAMS];

atomic_iperf_size_t bytes_sent;
atomic_iperf_size_t blocks_sent;
Expand Down Expand Up @@ -463,7 +469,6 @@ struct iperf_test
#define MAX_OMIT_TIME 600
#define MAX_BURST 1000
#define MAX_MSS (9 * 1024)
#define MAX_STREAMS 128

#define TIMESTAMP_FORMAT "%c "

Expand Down
86 changes: 83 additions & 3 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -3876,7 +3876,7 @@ iperf_print_intermediate(struct iperf_test *test)
if (test->json_output)
cJSON_AddItemToObject(json_interval, sum_name, iperf_json_printf("start: %f end: %f seconds: %f bytes: %d bits_per_second: %f jitter_ms: %f lost_packets: %d packets: %d lost_percent: %f omitted: %b sender: %b", (double) start_time, (double) end_time, (double) irp->interval_duration, (int64_t) bytes, bandwidth * 8, (double) avg_jitter * 1000.0, (int64_t) lost_packets, (int64_t) total_packets, (double) lost_percent, test->omitting, stream_must_be_sender));
else
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, end_time, ubuf, nbuf, avg_jitter * 1000.0, lost_packets, total_packets, lost_percent, test->omitting?report_omitted:"");
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, end_time, ubuf, nbuf, avg_jitter * 1000.0, lost_packets, total_packets, lost_percent, (int)((total_packets - lost_packets) / (end_time - start_time)), test->omitting?report_omitted:"");
}
}
}
Expand Down Expand Up @@ -4340,7 +4340,7 @@ iperf_print_results(struct iperf_test *test)
*/
if (! (test->role == 's' && !stream_must_be_sender) ) {
unit_snprintf(ubuf, UNIT_LEN, (double) total_sent, 'A');
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, sender_time, ubuf, nbuf, 0.0, (int64_t) 0, sender_total_packets, 0.0, report_sender);
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, sender_time, ubuf, nbuf, 0.0, (int64_t) 0, sender_total_packets, 0.0, (int)(sender_total_packets / (sender_time - start_time)), report_sender);
}
if (! (test->role == 's' && stream_must_be_sender) ) {

Expand All @@ -4353,7 +4353,7 @@ iperf_print_results(struct iperf_test *test)
bandwidth = 0.0;
}
unit_snprintf(nbuf, UNIT_LEN, bandwidth, test->settings->unit_format);
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, receiver_time, ubuf, nbuf, avg_jitter * 1000.0, lost_packets, receiver_total_packets, lost_percent, report_receiver);
iperf_printf(test, report_sum_bw_udp_format, mbuf, start_time, receiver_time, ubuf, nbuf, avg_jitter * 1000.0, lost_packets, receiver_total_packets, lost_percent, (int)((receiver_total_packets - lost_packets) / (receiver_time - start_time)), report_receiver);
}
}
}
Expand Down Expand Up @@ -4635,6 +4635,7 @@ iperf_new_stream(struct iperf_test *test, int s, int sender)

memset(sp, 0, sizeof(struct iperf_stream));

sp->affinity = -1;
sp->sender = sender;
sp->test = test;
sp->settings = test->settings;
Expand Down Expand Up @@ -5169,6 +5170,85 @@ iperf_json_finish(struct iperf_test *test)


/* CPU affinity stuff - Linux, FreeBSD, and Windows only. */
/* init streams cpu map
If the number of CPUs >= streams (-P), map each stream to a specific CPU core.
*/
void
iperf_affinity_streams_init(struct iperf_test *test)
{
#if defined(HAVE_SCHED_SETAFFINITY)
if (sched_getaffinity(0, sizeof(cpu_set_t), &test->cpumask) != 0) {
return;
}

/* Assign bindings according to main process affinity */
int available_cpu_num = CPU_COUNT(&test->cpumask);
int affinity_stream_cnt = 0;
if (available_cpu_num > 0) {
affinity_stream_cnt = (test->num_streams / available_cpu_num) * available_cpu_num;
}

for (int i = 0; i < test->num_streams; i++) {
if (affinity_stream_cnt > 0) {
int offset = i % available_cpu_num;
int j = 0;
for (; j < CPU_SETSIZE; j++) {
if (CPU_ISSET(j, &test->cpumask)) {
if (offset == 0) {
break;
}
offset--;
}
}
test->streams_affinity[i] = j;
affinity_stream_cnt--;
} else {
test->streams_affinity[i] = -1;
}
}
#else
for (int i = 0; i < test->num_streams; i++) {
test->streams_affinity[i] = -1;
}
#endif
}

void
iperf_setaffinity_streams_raw(int affinity)
{
#if defined(HAVE_SCHED_SETAFFINITY)
cpu_set_t cpu_set;
CPU_ZERO(&cpu_set);
CPU_SET(affinity, &cpu_set);
if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) != 0) {
return;
}
#endif
}

void
iperf_setaffinity_streams(struct iperf_test *test, int index)
{
#if defined(HAVE_SCHED_SETAFFINITY)
iperf_setaffinity_streams_raw(test->streams_affinity[index]);
#endif
}

void
iperf_setaffinity_streams_post(struct iperf_test *test)
{
#if defined(HAVE_SCHED_SETAFFINITY)
if (sched_setaffinity(0, sizeof(cpu_set_t), &test->cpumask) != 0) {
return;
}

struct iperf_stream *sp;
int index = 0;
SLIST_FOREACH(sp, &test->streams, streams){
sp->affinity = test->streams_affinity[index++ % test->num_streams];
}
#endif
}

int
iperf_setaffinity(struct iperf_test *test, int affinity)
Expand Down
4 changes: 4 additions & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ int iperf_json_finish(struct iperf_test *);
/* CPU affinity routines */
int iperf_setaffinity(struct iperf_test *, int affinity);
int iperf_clearaffinity(struct iperf_test *);
void iperf_affinity_streams_init(struct iperf_test *test);
void iperf_setaffinity_streams(struct iperf_test *test, int index);
void iperf_setaffinity_streams_raw(int affinity);
void iperf_setaffinity_streams_post(struct iperf_test *test);

/* Custom printf routine. */
int iperf_printf(struct iperf_test *test, const char *format, ...) __attribute__ ((format(printf,2,3)));
Expand Down
13 changes: 13 additions & 0 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ iperf_client_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

if (sp->affinity >= 0) {
iperf_setaffinity_streams_raw(sp->affinity);
}

/* Blocking signal to make sure that signal will be handled by main thread */
sigset_t set;
sigemptyset(&set);
Expand Down Expand Up @@ -112,6 +116,13 @@ iperf_create_streams(struct iperf_test *test, int sender)

int orig_bind_port = test->bind_port;
for (i = 0; i < test->num_streams; ++i) {
/* -> Create streams on different cpus
The network protocol stack will save the cpu core when creating the socket.
With a multi-queue NIC, the queue selection may relate to the cpu core num
(XPS, RPS, sk_*_queue_mapping...),
which helps improve the performance of multi-queue physical network cards.
*/
iperf_setaffinity_streams(test,i);

test->bind_port = orig_bind_port;
if (orig_bind_port) {
Expand Down Expand Up @@ -339,6 +350,7 @@ iperf_handle_message_client(struct iperf_test *test)
test->on_connect(test);
break;
case CREATE_STREAMS:
iperf_affinity_streams_init(test);
if (test->mode == BIDIRECTIONAL)
{
if (iperf_create_streams(test, 1) < 0)
Expand All @@ -348,6 +360,7 @@ iperf_handle_message_client(struct iperf_test *test)
}
else if (iperf_create_streams(test, test->mode) < 0)
return -1;
iperf_setaffinity_streams_post(test);
break;
case TEST_START:
if (iperf_init_test(test) < 0)
Expand Down
2 changes: 1 addition & 1 deletion src/iperf_locale.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ const char report_sum_bw_retrans_format[] =
"[SUM]%s %6.2f-%-6.2f sec %ss %ss/sec %3"PRId64" %s\n";

const char report_sum_bw_udp_format[] =
"[SUM]%s %6.2f-%-6.2f sec %ss %ss/sec %5.3f ms %" PRId64 "/%" PRId64 " (%.2g%%) %s\n";
"[SUM]%s %6.2f-%-6.2f sec %ss %ss/sec %5.3f ms %" PRId64 "/%" PRId64 " (%.2g%%) %" PRId64 " pps %s\n";

const char report_sum_bw_udp_sender_format[] =
"[SUM]%s %6.2f-%-6.2f sec %ss %ss/sec %s %" PRId64 " %s\n";
Expand Down
9 changes: 8 additions & 1 deletion src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ iperf_server_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

if (sp->affinity >= 0) {
iperf_setaffinity_streams_raw(sp->affinity);
}

/* Blocking signal to make sure that signal will be handled by main thread */
sigset_t set;
sigemptyset(&set);
Expand Down Expand Up @@ -713,6 +717,7 @@ iperf_run_server(struct iperf_test *test)
streams_to_send = test->num_streams;
streams_to_rec = 0;
}
iperf_affinity_streams_init(test);
}
}
if (FD_ISSET(test->ctrl_sck, &read_set)) {
Expand All @@ -725,7 +730,7 @@ iperf_run_server(struct iperf_test *test)

if (test->state == CREATE_STREAMS) {
if (FD_ISSET(test->prot_listener, &read_set)) {

iperf_setaffinity_streams(test,rec_streams_accepted);
if ((s = test->protocol->accept(test)) < 0) {
cleanup_server(test);
return -1;
Expand Down Expand Up @@ -844,6 +849,8 @@ iperf_run_server(struct iperf_test *test)


if (rec_streams_accepted == streams_to_rec && send_streams_accepted == streams_to_send) {
iperf_setaffinity_streams_post(test);

if (test->protocol->id != Ptcp) {
FD_CLR(test->prot_listener, &test->read_set);
close(test->prot_listener);
Expand Down