@@ -22,7 +22,42 @@ function enable_stack_copying(t::Task)
2222 return ccall ((:jl_enable_stack_copying , libtask), Any, (Any,), t):: Task
2323end
2424
25- CTask (func) = Task (func) |> enable_stack_copying
25+ """
26+
27+ task_wrapper()
28+
29+ `task_wrapper` is a wordaround for set the result/exception to the
30+ correct task which maybe copied/forked from another one(the original
31+ one). Without this, the result/exception is always sent to the
32+ original task. That is done in `JULIA_PROJECT/src/task.c`, the
33+ function `start_task` and `finish_task`.
34+
35+ This workaround is not the proper way to do the work it does. The
36+ proper way is refreshing the `current_task` (the variable `t`) in
37+ `start_task` after the call to `jl_apply` returns.
38+
39+ """
40+ function task_wrapper (func)
41+ () ->
42+ try
43+ res = func ()
44+ ct = current_task ()
45+ ct. result = res
46+ isa (ct. storage, Nothing) && (ct. storage = IdDict ())
47+ ct. storage[:_libtask_state ] = :done
48+ wait ()
49+ catch ex
50+ ct = current_task ()
51+ ct. exception = ex
52+ ct. result = ex
53+ ct. backtrace = catch_backtrace ()
54+ isa (ct. storage, Nothing) && (ct. storage = IdDict ())
55+ ct. storage[:_libtask_state ] = :failed
56+ wait ()
57+ end
58+ end
59+
60+ CTask (func) = Task (task_wrapper (func)) |> enable_stack_copying
2661
2762function Base. copy (t:: Task )
2863 t. state != :runnable && t. state != :done &&
@@ -72,7 +107,10 @@ produce(v) = begin
72107 wait ()
73108 end
74109
75- t. state == :runnable || throw (AssertionError (" producer.consumer.state == :runnable" ))
110+ if ! (t. state in [:runnable , :queued ])
111+ throw (AssertionError (" producer.consumer.state in [:runnable, :queued]" ))
112+ end
113+ if t. state == :queued yield () end
76114 if empty
77115 Base. schedule_and_wait (t, v)
78116 ct = current_task () # When a task is copied, ct should be updated to new task ID.
@@ -129,5 +167,16 @@ consume(p::Task, values...) = begin
129167 push! (p. storage[:consumers ]. waitq, ct)
130168 end
131169
132- p. state == :runnable ? Base. schedule_and_wait (p) : wait () # don't attempt to queue it twice
170+ if p. state == :runnable
171+ Base. schedule (p)
172+ yield ()
173+
174+ isa (p. storage, IdDict) && haskey (p. storage, :_libtask_state ) &&
175+ (p. state = p. storage[:_libtask_state ])
176+
177+ if p. exception != nothing
178+ throw (p. exception)
179+ end
180+ end
181+ wait ()
133182end
0 commit comments