-
Notifications
You must be signed in to change notification settings - Fork 9
feat(worker): add requestgc api #88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
66206f4
58d94f5
c36ccad
4a5d8aa
3b354ed
71230ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,11 +104,11 @@ mutable struct Worker <: AbstractWorker | |
# Spawn process | ||
cmd = _get_worker_cmd(; env, exeflags) | ||
proc = open(Cmd( | ||
cmd; | ||
detach=true, | ||
windows_hide=true, | ||
), "w+") | ||
cmd; | ||
detach=true, | ||
windows_hide=true, | ||
), "w+") | ||
|
||
# Keep internal list | ||
__iNtErNaL_get_running_procs() | ||
push!(__iNtErNaL_running_procs, proc) | ||
|
@@ -125,11 +125,11 @@ mutable struct Worker <: AbstractWorker | |
# There's no reason to keep the worker process alive after the manager loses its handle. | ||
w = finalizer(w -> @async(stop(w)), | ||
new( | ||
port, | ||
proc, | ||
port, | ||
proc, | ||
getpid(proc), | ||
socket, | ||
MsgID(0), | ||
socket, | ||
MsgID(0), | ||
Dict{MsgID,Channel{WorkerResult}}(), | ||
) | ||
) | ||
|
@@ -158,15 +158,15 @@ function _exit_loop(worker::Worker) | |
end | ||
sleep(1) | ||
catch e | ||
@error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace()) | ||
@error "Unexpection error inside the exit loop" worker exception = (e, catch_backtrace()) | ||
end | ||
end | ||
end | ||
|
||
function _receive_loop(worker::Worker) | ||
io = worker.current_socket | ||
|
||
|
||
# Here we use: | ||
# `for _i in Iterators.countfrom(1)` | ||
# instead of | ||
|
@@ -620,7 +620,7 @@ function interrupt(w::Worker) | |
@warn "Tried to interrupt a worker that has already shut down." summary(w) | ||
else | ||
if Sys.iswindows() | ||
ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) | ||
ccall((:GenerateConsoleCtrlEvent, "Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) | ||
else | ||
Base.kill(w.proc, Base.SIGINT) | ||
end | ||
|
@@ -632,7 +632,31 @@ function interrupt(w::InProcessWorker) | |
nothing | ||
end | ||
|
||
"""
    Malt.requestgc(w::Worker)

Ask the worker `w` to run a full garbage collection in the background.

The call only signals the worker's GC event; it returns as soon as the worker
has been notified, not when the collection has finished. On the worker side the
collection is throttled: after being notified, the worker sleeps for 5 seconds
(so that a running computation can finish) and then collects. Requests that
arrive before that collection has completed are merged into it.

For a blocking GC, you can use `Malt.remote_call_wait(GC.gc, w)`.

If `w` has already shut down, a warning is logged and nothing is sent.
"""
function requestgc(w::Worker)
    # Guard clause: there is nobody to notify once the worker process is gone.
    if !isrunning(w)
        @warn "Tried to gc a worker that has already shut down." summary(w)
        return nothing
    end

    # Wake up the worker-side GC task by notifying its event.
    return remote_eval_wait(Main, w, :(Base.notify(Main._gc_event)))
end
|
||
"""
    Malt.autogc(w::Worker)

Turn on periodic garbage collection on the worker `w`.

Once activated, the worker runs a full GC every `ENV["MALT_AUTO_GC_SECONDS"]`
seconds (looked up in the worker's environment), or every 900 seconds when that
variable is not set.

The loop only needs to be activated once: the first call starts it, and
subsequent calls on the same worker have no additional effect.

If `w` has already shut down, a warning is logged and nothing is sent.
"""
function autogc(w::Worker)
    if !isrunning(w)
        @warn "Tried to gc a worker that has already shut down." summary(w)
    else
        # The event name must match the worker-side constant `_auto_gc_event`;
        # notifying it releases the worker's auto-GC task from its initial wait.
        remote_eval_wait(Main, w, :(Base.notify(Main._auto_gc_event)))
    end
end
|
||
|
||
# Based on `Base.task_done_hook` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,23 +73,23 @@ function serve(server::Sockets.TCPServer) | |
end | ||
# this next line can't fail | ||
msg_id = read(io, MsgID) | ||
|
||
msg_data, success = try | ||
(Base.invokelatest(deserialize, io), true) | ||
catch err | ||
(format_error(err, catch_backtrace()), false) | ||
finally | ||
_discard_until_boundary(io) | ||
end | ||
|
||
if !success | ||
if msg_type === MsgType.from_host_call_with_response | ||
msg_type = MsgType.special_serialization_failure | ||
else | ||
continue | ||
end | ||
end | ||
|
||
try | ||
@debug("WORKER: Received message", msg_data) | ||
handle(Val(msg_type), io, msg_data, msg_id) | ||
|
@@ -142,7 +142,7 @@ function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg | |
@async try | ||
f(args...; kwargs...) | ||
catch e | ||
@warn("WORKER: Got exception while running call without response", exception=(e, catch_backtrace())) | ||
@warn("WORKER: Got exception while running call without response", exception = (e, catch_backtrace())) | ||
# TODO: exception is ignored, is that what we want here? | ||
end | ||
end | ||
|
@@ -156,11 +156,40 @@ function handle(::Val{MsgType.special_serialization_failure}, socket, msg, msg_i | |
) | ||
end | ||
|
||
format_error(err, bt) = sprint() do io | ||
Base.invokelatest(showerror, io, err, bt) | ||
# Render the exception `err` together with its backtrace `bt` into a `String`.
# `showerror` is called through `invokelatest` so that methods defined after this
# code was compiled are still picked up.
function format_error(err, bt)
    buffer = IOBuffer()
    Base.invokelatest(showerror, buffer, err, bt)
    return String(take!(buffer))
end
|
||
const _channel_cache = Dict{UInt64,AbstractChannel}() | ||
# Event that is notified (from the host) to request one garbage collection.
const _gc_event = Base.Event()

# Background task serving GC requests: it blocks until `_gc_event` is notified,
# waits a little, runs a full collection, and then re-arms the event.
const _gc_task = Threads.@spawn begin
    for _i in Iterators.countfrom(1)
        wait(_gc_event)
        sleep(5) # throttle by 5 seconds, so the computation can finish
        bytes1 = Base.gc_live_bytes()
        Base.GC.gc(true)
        bytes2 = Base.gc_live_bytes()
        @debug "WORKER: gc retrieved $(round((bytes1-bytes2)/1024/1024))MB"
        # Drop every request that arrived while sleeping or collecting, so the
        # next iteration blocks until a fresh request comes in. `reset` is the
        # public API for this and does not depend on the layout of `Base.Event`.
        reset(_gc_event)
    end
end
|
||
const _channel_cache = Dict{UInt64, AbstractChannel}() | ||
# Event that is notified (from the host) to switch on periodic garbage collection.
const _auto_gc_event = Base.Event()

# Return the number of seconds to sleep between two automatic collections.
# The value is read from `ENV["MALT_AUTO_GC_SECONDS"]` and defaults to 900.
# A value that is not a positive integer is ignored (with a warning) instead of
# throwing, because an exception here would silently end the background task.
function _auto_gc_interval_seconds()
    raw = get(ENV, "MALT_AUTO_GC_SECONDS", "900")
    seconds = tryparse(Int, raw)
    if seconds === nothing || seconds <= 0
        @warn "WORKER: invalid MALT_AUTO_GC_SECONDS, falling back to 900 seconds" raw
        return 900
    end
    return seconds
end

const _auto_gc_task = Threads.@spawn begin
    # Notifying will activate the loop. Otherwise this task will just wait forever.
    wait(_auto_gc_event)
    for _i in Iterators.countfrom(1)
        # Leave the loop if the event has been cleared again.
        !_auto_gc_event.set && break
        # The interval is looked up on every cycle, so changes to the
        # environment variable take effect from the next sleep onwards.
        sleep(_auto_gc_interval_seconds())
        bytes1 = Base.gc_live_bytes()
        Base.GC.gc(true)
        bytes2 = Base.gc_live_bytes()
        @debug "WORKER: auto gc retrieved $(round((bytes1-bytes2)/1024/1024))MB"
    end
end
|
||
if abspath(PROGRAM_FILE) == @__FILE__ | ||
main() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could add to these docs: "For a blocking GC, you can use `Malt.remote_call_wait(GC.gc, w)`."
I think the sentence "This is a non-blocking call (on the worker)." could be expanded a bit more, and be more technical about timing and throttling.