diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0a1adc3..ed694ef 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,12 +25,12 @@ jobs: os: ubuntu-latest arch: x64 steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: julia-actions/setup-julia@v1 with: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - - uses: actions/cache@v1 + - uses: actions/cache@v4 env: cache-name: cache-artifacts with: diff --git a/Project.toml b/Project.toml index 9ee33c9..d03ffae 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JobQueueMPI" uuid = "32d208e1-246e-420c-b6ff-18b71b410923" authors = ["pedroripper "] -version = "0.1.1" +version = "0.1.2" [deps] MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" diff --git a/src/JobQueueMPI.jl b/src/JobQueueMPI.jl index 0caf7c6..0325a86 100644 --- a/src/JobQueueMPI.jl +++ b/src/JobQueueMPI.jl @@ -2,6 +2,7 @@ module JobQueueMPI using MPI +include("debug_utils.jl") include("mpi_utils.jl") include("job.jl") include("worker.jl") diff --git a/src/controller.jl b/src/controller.jl index 81f676a..94cb36b 100644 --- a/src/controller.jl +++ b/src/controller.jl @@ -80,6 +80,9 @@ function send_jobs_to_any_available_workers(controller::Controller) job = _pick_job_to_send!(controller) controller.worker_status[worker] = WORKER_BUSY request = MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32) + if _is_debug_enabled() + _debug_message("Sending job $(job.id) to worker $worker") + end push!(controller.pending_jobs, JobRequest(worker, request)) end end @@ -96,9 +99,16 @@ function send_termination_message() for worker in 1:num_workers() request = MPI.isend(Job(0, TerminationMessage()), _mpi_comm(); dest = worker, tag = worker + 32) + if _is_debug_enabled() + _debug_message("Sending Termination message job to worker $worker") + end push!(requests, JobRequest(worker, request)) end - return _wait_all(requests) + if _is_debug_enabled() + _debug_message("Waiting for termination messsages to be received.") + end + _wait_all(requests) + return nothing end """ @@ -122,6 +132,11 @@ function check_for_job_answers(controller::Controller) source = controller.pending_jobs[j_i].worker, tag = controller.pending_jobs[j_i].worker + 32, ) + if _is_debug_enabled() + _debug_message( + "completed job $(job_answer.job_id)", + ) + end controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE deleteat!(controller.pending_jobs, j_i) return job_answer diff --git a/src/debug_utils.jl b/src/debug_utils.jl new file mode 100644 index 0000000..8a84b11 --- /dev/null +++ b/src/debug_utils.jl @@ -0,0 +1,26 @@ +const DEBUG_MODE = Ref{Bool}(false) + +function enable_debug_messages() + return DEBUG_MODE[] = true +end + +function disable_debug_messages() + return DEBUG_MODE[] = false +end + +function _is_debug_enabled() + return DEBUG_MODE[] +end + +function _debug_message(message) + if is_controller_process() + open("controller_job_queue_mpi.log", "a") do io + return println(io, "DEBUG (controller): ", message) + end + else + worker_rank = my_rank() + open("worker_rank_$(worker_rank)_job_queue_mpi.log", "a") do io + return println(io, "DEBUG (worker $worker_rank): ", message) + end + end +end diff --git a/src/worker.jl b/src/worker.jl index 941fffb..1d03c0f 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -28,7 +28,11 @@ function send_job_answer_to_controller(worker::Worker, message) error("Only the controller process can send job answers.") end job = JobAnswer(worker.job_id_running, message) - return MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32) + MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32) + if _is_debug_enabled() + _debug_message("Sending job answer $(job.job_id) to controller") + end + return nothing end """ @@ -41,6 +45,9 @@ function receive_job(worker::Worker) error("Only the controller process can receive jobs.") end job = MPI.recv(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32) + if _is_debug_enabled() + _debug_message("Received job $(job.id)") + end worker.job_id_running = job.id return job end diff --git a/test/test_1.jl b/test/test_1.jl index a26d8c9..76fe73b 100644 --- a/test/test_1.jl +++ b/test/test_1.jl @@ -7,6 +7,8 @@ mutable struct Message vector_idx::Int end +JQM.enable_debug_messages() + all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller) function sum_100(message::Message)