Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ jobs:
os: ubuntu-latest
arch: x64
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v1
with:
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: actions/cache@v1
- uses: actions/cache@v4
env:
cache-name: cache-artifacts
with:
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JobQueueMPI"
uuid = "32d208e1-246e-420c-b6ff-18b71b410923"
authors = ["pedroripper <pedros.ripper@gmail.com>"]
version = "0.1.1"
version = "0.1.2"

[deps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
Expand Down
1 change: 1 addition & 0 deletions src/JobQueueMPI.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module JobQueueMPI

using MPI

include("debug_utils.jl")
include("mpi_utils.jl")
include("job.jl")
include("worker.jl")
Expand Down
17 changes: 16 additions & 1 deletion src/controller.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ function send_jobs_to_any_available_workers(controller::Controller)
job = _pick_job_to_send!(controller)
controller.worker_status[worker] = WORKER_BUSY
request = MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32)
if _is_debug_enabled()
_debug_message("Sending job $(job.id) to worker $worker")
end
push!(controller.pending_jobs, JobRequest(worker, request))
end
end
Expand All @@ -96,9 +99,16 @@ function send_termination_message()
for worker in 1:num_workers()
request =
MPI.isend(Job(0, TerminationMessage()), _mpi_comm(); dest = worker, tag = worker + 32)
if _is_debug_enabled()
_debug_message("Sending Termination message job to worker $worker")
end
push!(requests, JobRequest(worker, request))
end
return _wait_all(requests)
if _is_debug_enabled()
_debug_message("Waiting for termination messsages to be received.")
end
_wait_all(requests)
return nothing
end

"""
Expand All @@ -122,6 +132,11 @@ function check_for_job_answers(controller::Controller)
source = controller.pending_jobs[j_i].worker,
tag = controller.pending_jobs[j_i].worker + 32,
)
if _is_debug_enabled()
_debug_message(
"completed job $(job_answer.job_id)",
)
end
controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE
deleteat!(controller.pending_jobs, j_i)
return job_answer
Expand Down
26 changes: 26 additions & 0 deletions src/debug_utils.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const DEBUG_MODE = Ref{Bool}(false)

function enable_debug_messages()
return DEBUG_MODE[] = true
end

function disable_debug_messages()
return DEBUG_MODE[] = false
end

function _is_debug_enabled()
return DEBUG_MODE[]
end

function _debug_message(message)
if is_controller_process()
open("controller_job_queue_mpi.log", "a") do io
return println(io, "DEBUG (controller): ", message)
end
else
worker_rank = my_rank()
open("worker_rank_$(worker_rank)_job_queue_mpi.log", "a") do io
return println(io, "DEBUG (worker $worker_rank): ", message)
end
end
end
9 changes: 8 additions & 1 deletion src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ function send_job_answer_to_controller(worker::Worker, message)
error("Only the controller process can send job answers.")
end
job = JobAnswer(worker.job_id_running, message)
return MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32)
MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32)
if _is_debug_enabled()
_debug_message("Sending job answer $(job.job_id) to controller")
end
return nothing
end

"""
Expand All @@ -41,6 +45,9 @@ function receive_job(worker::Worker)
error("Only the controller process can receive jobs.")
end
job = MPI.recv(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32)
if _is_debug_enabled()
_debug_message("Received job $(job.id)")
end
worker.job_id_running = job.id
return job
end
2 changes: 2 additions & 0 deletions test/test_1.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mutable struct Message
vector_idx::Int
end

JQM.enable_debug_messages()

all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function sum_100(message::Message)
Expand Down
Loading