From 9a70d6f4f0f9ab39d744edd0ab9da74af401456d Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Mon, 14 Apr 2025 10:54:48 +0200 Subject: [PATCH 1/4] Update DistributedStdlibWorker.jl --- src/DistributedStdlibWorker.jl | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/DistributedStdlibWorker.jl b/src/DistributedStdlibWorker.jl index 2d57ab4..5b5cbd7 100644 --- a/src/DistributedStdlibWorker.jl +++ b/src/DistributedStdlibWorker.jl @@ -6,9 +6,9 @@ const Distributed_expr = quote end """ - Malt.DistributedStdlibWorker() + Malt.DistributedStdlibWorker(; env=String[], exeflags=[]) -This implements the same functions as `Malt.Worker` but it uses the Distributed stdlib as a backend. Can be used for backwards compatibility. +This implements the same functions as `Malt.Worker` but it uses the Distributed stdlib as a backend. Can be used for backwards compatibility. The `env` and `exeflags` are passed on to `Distributed.addprocs`. """ mutable struct DistributedStdlibWorker <: AbstractWorker pid::Int64 @@ -40,15 +40,17 @@ Base.summary(io::IO, w::DistributedStdlibWorker) = write(io, "Malt.DistributedSt macro transform_exception(worker, ex) - :(try - $(esc(ex)) - catch e - if e isa Distributed.RemoteException - throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured))) - else - rethrow(e) + :( + try + $(esc(ex)) + catch e + if e isa Distributed.RemoteException + throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured))) + else + rethrow(e) + end end - end) + ) end function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...) From eb3c7b82f30d30255cb1755abca498dbf9c6256e Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Mon, 14 Apr 2025 10:54:50 +0200 Subject: [PATCH 2/4] Update README.md --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index c7fdafa..1ace94f 100644 --- a/README.md +++ b/README.md @@ -85,10 +85,22 @@ julia> @time Malt.Worker(); 0.964955 seconds (537 allocations: 308.734 KiB) ``` +Performance of `Malt.remote_eval` is similar to `Distributed.remote_call`. We have automatic performance testing in place to ensure this. + ### Limitations In contrast to Distributed.jl, Malt.jl currently does not support launching workers on another machine (e.g. SSH remote workers). +## Worker Types + +Malt.jl provides different types of workers to suit various use cases: + +- **`Malt.InProcessWorker`**: Runs in the same Julia process as the host. Useful for debugging or when process isolation is not required. Lightweight and avoids the overhead of launching a separate process. +- **`Malt.DistributedStdlibWorker`**: Uses Julia's `Distributed` standard library under the hood, allowing compatibility with Malt's API while leveraging `Distributed` features. +- **`Malt.Worker`**: The default worker type that creates a new Julia process with full process isolation. Ideal for scenarios requiring sandboxing. + +It is also possible to extend the `Malt.AbstractWorker` interface to create custom worker types. (E.g. `InterpretedWorker`, `WASMWorker`, `DockerWorker`, etc.) + # Sponsors Development of Malt.jl is sponsored by: From d25126a6d38b73370f3c67d173695d72f3f93247 Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Mon, 14 Apr 2025 10:54:52 +0200 Subject: [PATCH 3/4] Update Project.toml --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 171486c..cb4cabe 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Malt" uuid = "36869731-bdee-424d-aa32-cab38c994e3b" authors = ["Sergio Alejandro Vargas "] -version = "1.1.2" +version = "1.1.3" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" From 39a7ba55978e1de44a1cd65b527dede744d55221 Mon Sep 17 00:00:00 2001 From: Fons van der Plas Date: Mon, 14 Apr 2025 11:00:01 +0200 Subject: [PATCH 4/4] asdf --- Project.toml | 6 ++++++ src/Malt.jl | 33 ++++++++++++++++++++------------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/Project.toml b/Project.toml index cb4cabe..d008a67 100644 --- a/Project.toml +++ b/Project.toml @@ -4,6 +4,7 @@ authors = ["Sergio Alejandro Vargas "] version = "1.1.3" [deps] +Compat = "34da2185-b29b-5c13-b0c7-acf172513d20" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" RelocatableFolders = "05181044-ff0b-4ac5-8273-598c1e38db00" @@ -11,7 +12,12 @@ Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" [compat] +Compat = "3.47.0, 4.10.0" +Distributed = "1" +Logging = "1" RelocatableFolders = "1" +Serialization = "1" +Sockets = "1" julia = "^1.6" [extras] diff --git a/src/Malt.jl b/src/Malt.jl index 043392e..080e28e 100644 --- a/src/Malt.jl +++ b/src/Malt.jl @@ -8,9 +8,12 @@ module Malt # using Logging: Logging, @debug using Serialization: serialize, deserialize using Sockets: Sockets +using Compat using RelocatableFolders: RelocatableFolders +@compat public remote_call, remote_call_fetch, remote_call_wait, remote_do, remote_eval, remote_eval_fetch, remote_eval_wait, worker_channel, isrunning, stop, interrupt + include("./shared.jl") # ENV["JULIA_DEBUG"] = @__MODULE__ @@ -104,11 +107,11 @@ mutable struct Worker <: AbstractWorker # Spawn process cmd = _get_worker_cmd(; env, exeflags) proc = open(Cmd( - cmd; - detach=true, - windows_hide=true, - ), "w+") - + cmd; + detach=true, + windows_hide=true, + ), "w+") + # Keep internal list __iNtErNaL_get_running_procs() push!(__iNtErNaL_running_procs, proc) @@ -125,11 +128,11 @@ mutable struct Worker <: AbstractWorker # There's no reason to keep the worker process alive after the manager loses its handle. w = finalizer(w -> @async(stop(w)), new( - port, - proc, + port, + proc, getpid(proc), - socket, - MsgID(0), + socket, + MsgID(0), Dict{MsgID,Channel{WorkerResult}}(), ) ) @@ -158,15 +161,15 @@ function _exit_loop(worker::Worker) end sleep(1) catch e - @error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace()) + @error "Unexpection error inside the exit loop" worker exception = (e, catch_backtrace()) end end end function _receive_loop(worker::Worker) io = worker.current_socket - - + + # Here we use: # `for _i in Iterators.countfrom(1)` # instead of @@ -366,6 +369,10 @@ julia> promise = Malt.remote_call(uppercase ∘ *, w, "I ", "declare ", "bankrup julia> fetch(promise) "I DECLARE BANKRUPTCY!" ``` + +# Notes +- Use `Base.fetch` to retrieve the result of the computation. +- If the worker encounters an exception, it will be rethrown when `fetch` is called. """ function remote_call(f, w::Worker, args...; kwargs...) _send_receive_async( @@ -620,7 +627,7 @@ function interrupt(w::Worker) @warn "Tried to interrupt a worker that has already shut down." summary(w) else if Sys.iswindows() - ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) + ccall((:GenerateConsoleCtrlEvent, "Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc))) else Base.kill(w.proc, Base.SIGINT) end