Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
name = "Malt"
uuid = "36869731-bdee-424d-aa32-cab38c994e3b"
authors = ["Sergio Alejandro Vargas <[email protected]>"]
version = "1.1.2"
version = "1.1.3"

[deps]
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
RelocatableFolders = "05181044-ff0b-4ac5-8273-598c1e38db00"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
Compat = "3.47.0, 4.10.0"
Distributed = "1"
Logging = "1"
RelocatableFolders = "1"
Serialization = "1"
Sockets = "1"
julia = "^1.6"

[extras]
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,22 @@ julia> @time Malt.Worker();
0.964955 seconds (537 allocations: 308.734 KiB)
```

Performance of `Malt.remote_eval` is similar to `Distributed.remote_call`. We have automatic performance testing in place to ensure this.

### Limitations

In contrast to Distributed.jl, Malt.jl currently does not support launching workers on another machine (e.g. SSH remote workers).

## Worker Types

Malt.jl provides different types of workers to suit various use cases:

- **`Malt.InProcessWorker`**: Runs in the same Julia process as the host. Useful for debugging or when process isolation is not required. Lightweight and avoids the overhead of launching a separate process.
- **`Malt.DistributedStdlibWorker`**: Uses Julia's `Distributed` standard library under the hood, allowing compatibility with Malt's API while leveraging `Distributed` features.
- **`Malt.Worker`**: The default worker type that creates a new Julia process with full process isolation. Ideal for scenarios requiring sandboxing.

It is also possible to extend the `Malt.AbstractWorker` interface to create custom worker types. (E.g. `InterpretedWorker`, `WASMWorker`, `DockerWorker`, etc.)

# Sponsors

Development of Malt.jl is sponsored by:
Expand Down
22 changes: 12 additions & 10 deletions src/DistributedStdlibWorker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const Distributed_expr = quote
end

"""
Malt.DistributedStdlibWorker()
Malt.DistributedStdlibWorker(; env=String[], exeflags=[])

This implements the same functions as `Malt.Worker` but it uses the Distributed stdlib as a backend. Can be used for backwards compatibility.
This implements the same functions as `Malt.Worker` but it uses the Distributed stdlib as a backend. Can be used for backwards compatibility. The `env` and `exeflags` are passed on to `Distributed.addprocs`.
"""
mutable struct DistributedStdlibWorker <: AbstractWorker
pid::Int64
Expand Down Expand Up @@ -40,15 +40,17 @@ Base.summary(io::IO, w::DistributedStdlibWorker) = write(io, "Malt.DistributedSt


macro transform_exception(worker, ex)
:(try
$(esc(ex))
catch e
if e isa Distributed.RemoteException
throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured)))
else
rethrow(e)
:(
try
$(esc(ex))
catch e
if e isa Distributed.RemoteException
throw($(RemoteException)($(esc(worker)), sprint(showerror, e.captured)))
else
rethrow(e)
end
end
end)
)
end

function remote_call(f, w::DistributedStdlibWorker, args...; kwargs...)
Expand Down
33 changes: 20 additions & 13 deletions src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ module Malt
# using Logging: Logging, @debug
using Serialization: serialize, deserialize
using Sockets: Sockets
using Compat

using RelocatableFolders: RelocatableFolders

@compat public remote_call, remote_call_fetch, remote_call_wait, remote_do, remote_eval, remote_eval_fetch, remote_eval_wait, worker_channel, isrunning, stop, interrupt

include("./shared.jl")

# ENV["JULIA_DEBUG"] = @__MODULE__
Expand Down Expand Up @@ -104,11 +107,11 @@ mutable struct Worker <: AbstractWorker
# Spawn process
cmd = _get_worker_cmd(; env, exeflags)
proc = open(Cmd(
cmd;
detach=true,
windows_hide=true,
), "w+")
cmd;
detach=true,
windows_hide=true,
), "w+")

# Keep internal list
__iNtErNaL_get_running_procs()
push!(__iNtErNaL_running_procs, proc)
Expand All @@ -125,11 +128,11 @@ mutable struct Worker <: AbstractWorker
# There's no reason to keep the worker process alive after the manager loses its handle.
w = finalizer(w -> @async(stop(w)),
new(
port,
proc,
port,
proc,
getpid(proc),
socket,
MsgID(0),
socket,
MsgID(0),
Dict{MsgID,Channel{WorkerResult}}(),
)
)
Expand Down Expand Up @@ -158,15 +161,15 @@ function _exit_loop(worker::Worker)
end
sleep(1)
catch e
@error "Unexpection error inside the exit loop" worker exception=(e,catch_backtrace())
@error "Unexpection error inside the exit loop" worker exception = (e, catch_backtrace())
end
end
end

function _receive_loop(worker::Worker)
io = worker.current_socket


# Here we use:
# `for _i in Iterators.countfrom(1)`
# instead of
Expand Down Expand Up @@ -366,6 +369,10 @@ julia> promise = Malt.remote_call(uppercase ∘ *, w, "I ", "declare ", "bankrup
julia> fetch(promise)
"I DECLARE BANKRUPTCY!"
```

# Notes
- Use `Base.fetch` to retrieve the result of the computation.
- If the worker encounters an exception, it will be rethrown when `fetch` is called.
"""
function remote_call(f, w::Worker, args...; kwargs...)
_send_receive_async(
Expand Down Expand Up @@ -620,7 +627,7 @@ function interrupt(w::Worker)
@warn "Tried to interrupt a worker that has already shut down." summary(w)
else
if Sys.iswindows()
ccall((:GenerateConsoleCtrlEvent,"Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
ccall((:GenerateConsoleCtrlEvent, "Kernel32"), Bool, (UInt32, UInt32), UInt32(1), UInt32(getpid(w.proc)))
else
Base.kill(w.proc, Base.SIGINT)
end
Expand Down
Loading