Skip to content

Commit b9a7b72

Browse files
author
Jacob Lacouture
committed
briefly defer yielding the thread/gvl
1 parent a841c31 commit b9a7b72

File tree

8 files changed

+189
-26
lines changed

8 files changed

+189
-26
lines changed

ractor.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ static VALUE rb_eRactorMovedError;
3131
static VALUE rb_eRactorClosedError;
3232
static VALUE rb_cRactorMovedObject;
3333

34+
extern void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
35+
extern void rb_thread_sched_destroy(struct rb_thread_sched *);
36+
3437
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
3538

3639
// Ractor locking
@@ -257,6 +260,8 @@ ractor_free(void *ptr)
257260
r->newobj_cache = NULL;
258261
}
259262

263+
rb_thread_sched_destroy(&r->threads.sched);
264+
260265
ruby_xfree(r);
261266
}
262267

@@ -2097,8 +2102,6 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
20972102
}
20982103
#endif
20992104

2100-
void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
2101-
21022105
void
21032106
rb_ractor_living_threads_init(rb_ractor_t *r)
21042107
{

test/-ext-/thread/test_instrumentation_api.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,11 @@ def test_io_release_gvl
9898
thread = nil
9999
full_timeline = record do
100100
thread = Thread.new do
101-
w.write("Hello\n")
101+
r.readline
102102
end
103+
# Delay the write so the readline above blocks long enough for its thread to be descheduled.
104+
sleep(1)
105+
w.puts("Hello")
103106
thread.join
104107
end
105108

test/ruby/test_process.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1956,7 +1956,8 @@ def test_daemon_no_threads
19561956
puts Dir.entries("/proc/self/task") - %W[. ..]
19571957
end
19581958
bug4920 = '[ruby-dev:43873]'
1959-
assert_include(1..2, data.size, bug4920)
1959+
# On pthread builds there will be an extra thread for the deferred-thread-wait worker
1960+
assert_include(1..3, data.size, bug4920)
19601961
assert_not_include(data.map(&:to_i), pid)
19611962
end
19621963
else # darwin

thread.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
202202
/* Important that this is inlined into the macro, and not part of \
203203
* blocking_region_begin - see bug #20493 */ \
204204
RB_VM_SAVE_MACHINE_CONTEXT(th); \
205-
thread_sched_to_waiting(TH_SCHED(th), th); \
205+
thread_sched_blocking_region_enter(TH_SCHED(th), th); \
206206
exec; \
207207
blocking_region_end(th, &__region); \
208208
}; \
@@ -1519,9 +1519,9 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
15191519
/* entry to ubf_list impossible at this point, so unregister is safe: */
15201520
unregister_ubf_list(th);
15211521

1522-
thread_sched_to_running(TH_SCHED(th), th);
1523-
rb_ractor_thread_switch(th->ractor, th);
1522+
thread_sched_blocking_region_exit(TH_SCHED(th), th);
15241523

1524+
rb_ractor_thread_switch(th->ractor, th);
15251525
th->blocking_region_buffer = 0;
15261526
rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
15271527
if (th->status == THREAD_STOPPED) {

thread_none.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,25 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
3737
{
3838
}
3939

40+
// No-op: the no-thread build has a single thread and nothing to deschedule.
static void
thread_sched_blocking_region_enter(struct rb_thread_sched *sched, rb_thread_t *th)
{
}
44+
45+
// No-op counterpart of thread_sched_blocking_region_enter for the
// no-thread build.
static void
thread_sched_blocking_region_exit(struct rb_thread_sched *sched, rb_thread_t *th)
{
}
49+
4050
// No scheduler state to set up in the no-thread build.
void
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
{
}
4454

45-
#if 0
46-
static void
55+
// No scheduler state to tear down in the no-thread build; kept non-static
// so per-ractor teardown (ractor_free) can call it on every port.
void
rb_thread_sched_destroy(struct rb_thread_sched *sched)
{
}
50-
#endif
5159

5260
// Do nothing for mutex guard
5361
void

thread_pthread.c

Lines changed: 142 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ static const void *const condattr_monotonic = NULL;
7575
// #define HAVE_SYS_EPOLL_H 0
7676
#endif
7777

78+
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
79+
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
80+
#endif
81+
7882
#ifndef USE_MN_THREADS
7983
#if defined(__EMSCRIPTEN__) || defined(COROUTINE_PTHREAD_CONTEXT)
8084
// on __EMSCRIPTEN__ provides epoll* declarations, but no implementations.
@@ -692,10 +696,14 @@ rb_del_running_thread(rb_thread_t *th)
692696
static void
thread_sched_set_running(struct rb_thread_sched *sched, rb_thread_t *th)
{
    // Make `th` the scheduler's running thread. The sched lock must be held.
    ASSERT_thread_sched_locked(sched, NULL);
    RUBY_DEBUG_LOG("th:%u->th:%u", rb_th_serial(sched->running), rb_th_serial(th));
    VM_ASSERT(sched->running != th);

    sched->running = th;
    // This cancels any deferred scheduling action: clearing deferred_wait_th
    // withdraws the pending request, and bumping the generation counter
    // invalidates any snapshot the deferred-wait worker took of it.
    sched->deferred_wait_th = NULL;
    sched->deferred_wait_th_count += 1;
}
700708

701709
RBIMPL_ATTR_MAYBE_UNUSED()
@@ -1129,9 +1137,120 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
11291137
thread_sched_unlock(sched, th);
11301138
}
11311139

1140+
static void
thread_sched_blocking_region_enter(struct rb_thread_sched *sched, rb_thread_t *th)
{
    // Entry hook for a blocking region. Rather than descheduling `th`
    // immediately, publish a deferred-deschedule request that the
    // deferred-wait worker will act on shortly (per the commit intent:
    // "briefly defer yielding the thread/gvl"), so blocking calls that
    // return quickly never pay the full yield cost.
    thread_sched_lock(sched, th);
    if (sched->is_running) {
        VM_ASSERT(sched->running == th);

        // Record the request, bump the generation counter so stale
        // snapshots can be detected, and wake the worker thread.
        sched->deferred_wait_th = th;
        sched->deferred_wait_th_count += 1;
        rb_native_cond_signal(&sched->deferred_wait_cond);
    } else {
        // NOTE(review): assumes !is_running implies no thread owns this
        // sched (the assert below checks it) — confirm is_running semantics.
        VM_ASSERT(sched->running == NULL);
        thread_sched_to_waiting_common(sched, th);
    }
    thread_sched_unlock(sched, th);
}
1156+
1157+
static void
thread_sched_blocking_region_exit(struct rb_thread_sched *sched, rb_thread_t *th)
{
    // Exit hook for a blocking region; counterpart of
    // thread_sched_blocking_region_enter.
    thread_sched_lock(sched, th);

    if (sched->running == th && th == sched->deferred_wait_th) {
        // We never descheduled the thread. Cancel that request now.
        // Bumping the counter invalidates any snapshot the worker took
        // of (deferred_wait_th, count) before it dropped the lock.
        sched->deferred_wait_th_count += 1;
        sched->deferred_wait_th = NULL;
    } else {
        // The worker already descheduled us (or we were never the running
        // thread); reacquire through the normal scheduling path.
        thread_sched_to_running_common(sched, th);
    }

    thread_sched_unlock(sched, th);
}
1172+
1173+
// Logically hand sched-lock ownership from `current` to `th` without
// touching the underlying mutex. Ownership is only tracked (and checked)
// in VM_CHECK_MODE builds; elsewhere this is a no-op apart from the
// debug log.
static void
transfer_sched_lock(struct rb_thread_sched *sched, struct rb_thread_struct *current, struct rb_thread_struct *th)
{
    RUBY_DEBUG_LOG("Transferring sched ownership from:%u to th:%u", rb_th_serial(current), rb_th_serial(th));
#if VM_CHECK_MODE
    VM_ASSERT(sched->lock_owner == current);
    sched->lock_owner = th;
#else
    // Fix: in release builds (VM_CHECK_MODE off, RUBY_DEBUG_LOG compiled
    // out) all three parameters were unused, tripping -Wunused-parameter.
    (void)sched;
    (void)current;
    (void)th;
#endif
}
1182+
1183+
// Per-sched worker thread. It waits for a deferred-deschedule request
// (published by thread_sched_blocking_region_enter), sleeps briefly, and
// if the request is still current, descheduleds the target thread on its
// behalf. Runs until sched->stop is set by rb_thread_sched_destroy.
static void *
deferred_wait_thread_worker(void *arg)
{
#ifdef SET_CURRENT_THREAD_NAME
    SET_CURRENT_THREAD_NAME("rb_def_wait");
#endif
    struct rb_thread_sched *sched = (struct rb_thread_sched *) arg;
    // Dummy thread struct used purely as a lock-owner marker while the
    // worker holds the sched lock on nobody's behalf.
    struct rb_thread_struct *lock_owner = sched->deferred_wait_th_dummy;

    thread_sched_lock(sched, lock_owner);
    for (;;) {
        if (sched->stop) {
            break;
        }
        // The cond-wait will drop the lock. We'll need to update lock_owner manually.
        transfer_sched_lock(sched, lock_owner, NULL);
        rb_native_cond_wait(&sched->deferred_wait_cond, &sched->lock_);
        transfer_sched_lock(sched, NULL, lock_owner);
        for (;;) {
            if (!sched->deferred_wait_th) {
                break;
            }
            // Snapshot the request and its generation counter, then drop
            // the lock for the grace period. Any state change (cancel,
            // set_running, new request) bumps the counter.
            struct rb_thread_struct *th = sched->deferred_wait_th;
            int count = sched->deferred_wait_th_count;
            thread_sched_unlock(sched, lock_owner);
            usleep(50);
            // NOTE(review): 50us grace period is a magic number — worth a
            // named constant and a rationale.

            // th is not a stable reference here. Go back to our dummy thread.
            lock_owner = sched->deferred_wait_th_dummy;
            thread_sched_lock(sched, lock_owner);

            if (count != sched->deferred_wait_th_count) {
                // Request was cancelled or superseded while we slept;
                // re-evaluate from the top of the inner loop.
                continue;
            }
            VM_ASSERT(sched->is_running);
            VM_ASSERT(th == sched->deferred_wait_th);
            VM_ASSERT(sched->running == sched->deferred_wait_th);

            // Before calling into the scheduler we need to transfer lock ownership (logically) from the worker
            // thread to the target thread.
            transfer_sched_lock(sched, lock_owner, th);
            // We're now acting on behalf of the target thread.
            lock_owner = th;
            thread_sched_to_waiting_common(sched, th);
            break;
        }
    }
    // NOTE(review): if the stop flag is seen right after a deschedule,
    // lock_owner may still be the last target thread rather than the dummy
    // — confirm thread_sched_unlock only uses it as a debug marker here.
    thread_sched_unlock(sched, lock_owner);
    return NULL;
}
1233+
1234+
static void
1235+
start_deferred_wait_thread(struct rb_thread_sched *sched)
1236+
{
1237+
pthread_attr_t attr;
1238+
int r;
1239+
r = pthread_attr_init(&attr);
1240+
if (r) {
1241+
rb_bug_errno("start_deferred_wait_thread - pthread_attr_init", r);
1242+
}
1243+
r = pthread_create(&sched->deferred_wait_pthread, &attr, deferred_wait_thread_worker, sched);
1244+
if (r) {
1245+
rb_bug_errno("start_deferred_wait_thread - pthread_create", r);
1246+
}
1247+
pthread_attr_destroy(&attr);
1248+
}
1249+
11321250
void
11331251
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
11341252
{
1253+
sched->stop = false;
11351254
rb_native_mutex_initialize(&sched->lock_);
11361255

11371256
#if VM_CHECK_MODE
@@ -1144,6 +1263,13 @@ rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
11441263
#if USE_MN_THREADS
11451264
if (!atfork) sched->enable_mn_threads = true; // MN is enabled on Ractors
11461265
#endif
1266+
1267+
rb_native_cond_initialize(&sched->deferred_wait_cond);
1268+
sched->deferred_wait_th = NULL;
1269+
sched->deferred_wait_th_count = 0;
1270+
sched->deferred_wait_th_dummy = (struct rb_thread_struct *) malloc(sizeof(struct rb_thread_struct));
1271+
sched->deferred_wait_th_dummy->serial = 100000;
1272+
start_deferred_wait_thread(sched);
11471273
}
11481274

11491275
static void
@@ -1504,21 +1630,30 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
15041630
// TODO
15051631

15061632
static void clear_thread_cache_altstack(void);
1633+
#endif
15071634

1508-
static void
1635+
void
15091636
rb_thread_sched_destroy(struct rb_thread_sched *sched)
15101637
{
1638+
// Stop the deferred wait worker
1639+
rb_native_mutex_lock(&sched->lock_);
1640+
1641+
sched->stop = true;
1642+
rb_native_cond_signal(&sched->deferred_wait_cond);
1643+
rb_native_mutex_unlock(&sched->lock_);
1644+
1645+
pthread_join(sched->deferred_wait_pthread, NULL);
1646+
15111647
/*
1512-
* only called once at VM shutdown (not atfork), another thread
1648+
* only called once at VM/ractor shutdown (not atfork), another thread
15131649
* may still grab vm->gvl.lock when calling gvl_release at
15141650
* the end of thread_start_func_2
15151651
*/
1516-
if (0) {
1517-
rb_native_mutex_destroy(&sched->lock);
1518-
}
1519-
clear_thread_cache_altstack();
1652+
rb_native_mutex_destroy(&sched->lock_);
1653+
rb_native_cond_destroy(&sched->deferred_wait_cond);
1654+
1655+
//clear_thread_cache_altstack();
15201656
}
1521-
#endif
15221657

15231658
#ifdef RB_THREAD_T_HAS_NATIVE_ID
15241659
static int
@@ -1550,7 +1685,6 @@ thread_sched_atfork(struct rb_thread_sched *sched)
15501685
vm->ractor.sched.running_cnt = 0;
15511686

15521687
rb_native_mutex_initialize(&vm->ractor.sched.lock);
1553-
// rb_native_cond_destroy(&vm->ractor.sched.cond);
15541688
rb_native_cond_initialize(&vm->ractor.sched.cond);
15551689
rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond);
15561690
rb_native_cond_initialize(&vm->ractor.sched.barrier_release_cond);
@@ -2660,10 +2794,6 @@ setup_communication_pipe_internal(int pipes[2])
26602794
set_nonblock(pipes[1]);
26612795
}
26622796

2663-
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
2664-
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
2665-
#endif
2666-
26672797
enum {
26682798
THREAD_NAME_MAX =
26692799
#if defined(__linux__)

thread_pthread.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,14 @@ struct rb_thread_sched {
128128
int readyq_cnt;
129129
// ractor scheduling
130130
struct ccan_list_node grq_node;
131+
132+
// Deferred descheduler
133+
bool stop;
134+
pthread_t deferred_wait_pthread;
135+
rb_nativethread_cond_t deferred_wait_cond;
136+
int deferred_wait_th_count;
137+
struct rb_thread_struct *deferred_wait_th;
138+
struct rb_thread_struct *deferred_wait_th_dummy;
131139
};
132140

133141
#ifdef RB_THREAD_LOCAL_SPECIFIER

thread_win32.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,32 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
148148
thread_sched_to_running(sched, th);
149149
}
150150

151+
// win32 has no deferred-wait worker: entering a blocking region releases
// the GVL immediately, as before this change.
static void
thread_sched_blocking_region_enter(struct rb_thread_sched *sched, rb_thread_t *th)
{
    thread_sched_to_waiting(sched, th);
}
156+
157+
// win32 counterpart: reacquire the GVL through the normal path.
static void
thread_sched_blocking_region_exit(struct rb_thread_sched *sched, rb_thread_t *th)
{
    thread_sched_to_running(sched, th);
}
162+
151163
// Initialize the win32 scheduler: just the GVL mutex handle.
void
rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork)
{
    if (GVL_DEBUG) fprintf(stderr, "sched init\n");
    sched->lock = w32_mutex_create();
}
157169

158-
#if 0
159170
// per-ractor
160171
// per-ractor
void
rb_thread_sched_destroy(struct rb_thread_sched *sched)
{
    // NOTE(review): the mutex handle is deliberately leaked — another
    // thread may still be touching sched->lock during shutdown (see the
    // pthread variant's comment). TODO: confirm, then either CloseHandle
    // safely or document the leak as permanent.
    // if (GVL_DEBUG) fprintf(stderr, "sched destroy\n");
    // CloseHandle(sched->lock);
}
166-
#endif
167177

168178
rb_thread_t *
169179
ruby_thread_from_native(void)

0 commit comments

Comments
 (0)