Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ let stdenv ~run_event_loop =
method backend_id = "linux"
end

let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
Sched.with_sched ?fallback config @@ fun st ->
let run_event_loop (type a) ?fallback ?eventfd config (main : _ -> a) arg : a =
Sched.with_sched ?fallback ?eventfd config @@ fun st ->
let open Effect.Deep in
let extra_effects : _ effect_handler = {
effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option ->
Expand Down Expand Up @@ -561,9 +561,9 @@ let run_event_loop (type a) ?fallback config (main : _ -> a) arg : a =
} in
Sched.run ~extra_effects st main arg

let run ?queue_depth ?n_blocks ?block_size ?polling_timeout ?fallback main =
let run ?queue_depth ?n_blocks ?block_size ?polling_timeout ?fallback ?eventfd main =
let config = Sched.config ?queue_depth ?n_blocks ?block_size ?polling_timeout () in
let stdenv = stdenv ~run_event_loop:(run_event_loop ?fallback:None config) in
let stdenv = stdenv ~run_event_loop:(run_event_loop ?fallback:None ?eventfd config) in
(* SIGPIPE makes no sense in a modern application. *)
Sys.(set_signal sigpipe Signal_ignore);
run_event_loop ?fallback config main stdenv
run_event_loop ?fallback ?eventfd config main stdenv
5 changes: 4 additions & 1 deletion lib_eio_linux/eio_linux.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ val run :
?block_size:int ->
?polling_timeout:int ->
?fallback:([`Msg of string] -> 'a) ->
?eventfd:(int -> Unix.file_descr) ->
(stdenv -> 'a) -> 'a
(** Run an event loop using io_uring.

Expand All @@ -45,6 +46,8 @@ val run :

@param fallback Call this instead if io_uring is not available for some reason.
The argument is a message describing the problem (for logging).
The default simply raises an exception. *)
The default simply raises an exception.
@param eventfd Call this instead of a binding to eventfd(2) to create an eventfd
for the scheduler. *)

module Low_level = Low_level
8 changes: 4 additions & 4 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,8 @@ external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"

let no_fallback (`Msg msg) = failwith msg

let with_eventfd fn =
let eventfd = Eio_unix.Private.Rcfd.make (eio_eventfd 0) in
let with_eventfd ?(eventfd = eio_eventfd) fn =
let eventfd = Eio_unix.Private.Rcfd.make (eventfd 0) in
let close () =
if not (Eio_unix.Private.Rcfd.close eventfd) then failwith "eventfd already closed!"
in
Expand All @@ -510,7 +510,7 @@ let with_eventfd fn =
close ();
Printexc.raise_with_backtrace ex bt

let with_sched ?(fallback=no_fallback) config fn =
let with_sched ?(fallback=no_fallback) ?eventfd config fn =
let { queue_depth; n_blocks; block_size; polling_timeout } = config in
match Uring.create ~queue_depth ?polling_timeout () with
| exception Unix.Unix_error(ENOSYS, _, _) -> fallback (`Msg "io_uring is not available on this system")
Expand Down Expand Up @@ -540,7 +540,7 @@ let with_sched ?(fallback=no_fallback) config fn =
let sleep_q = Zzz.create () in
let io_q = Queue.create () in
let mem_q = Lwt_dllist.create () in
with_eventfd @@ fun eventfd ->
with_eventfd ?eventfd @@ fun eventfd ->
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
with
Expand Down
18 changes: 10 additions & 8 deletions lib_eio_posix/domain_mgr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ let socketpair k ~sw ~domain ~ty ~protocol wrap_a wrap_b =
discontinue k (Err.wrap code name arg)

(* Run an event loop in the current domain, using [fn x] as the root fiber. *)
let run_event_loop fn x =
Sched.with_sched @@ fun sched ->
let run_event_loop ?pipe fn x =
Sched.with_sched ?pipe @@ fun sched ->
let open Effect.Deep in
let extra_effects : _ effect_handler = {
effc = fun (type a) (e : a Effect.t) : ((a, Sched.exit) continuation -> Sched.exit) option ->
Expand Down Expand Up @@ -94,34 +94,36 @@ let unwrap_backtrace = function
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt

module Impl = struct
type t = unit
type t = {
pipe : (cloexec:bool -> Unix.file_descr * Unix.file_descr) option;
}

let domain_spawn ctx enqueue fn =
Domain.spawn @@ fun () ->
Trace.domain_spawn ~parent:(Eio.Private.Fiber_context.tid ctx);
Fun.protect fn ~finally:(fun () -> enqueue (Ok ()))

let run_raw () fn =
let run_raw _t fn =
let domain = ref None in
Eio.Private.Suspend.enter "run-domain" (fun ctx enqueue ->
domain := Some (domain_spawn ctx enqueue (wrap_backtrace fn))
);
Trace.with_span "Domain.join" @@ fun () ->
unwrap_backtrace (Domain.join (Option.get !domain))

let run () fn =
let run t fn =
let domain = ref None in
Eio.Private.Suspend.enter "run-domain" (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in
Eio.Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled);
domain := Some (domain_spawn ctx enqueue (fun () ->
run_event_loop (wrap_backtrace (fun () -> fn ~cancelled)) ()
run_event_loop ?pipe:t.pipe (wrap_backtrace (fun () -> fn ~cancelled)) ()
))
);
Trace.with_span "Domain.join" @@ fun () ->
unwrap_backtrace (Domain.join (Option.get !domain))
end

let v =
let v ?pipe () =
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
Eio.Resource.T ((), handler)
Eio.Resource.T (Impl.{ pipe }, handler)
4 changes: 2 additions & 2 deletions lib_eio_posix/eio_posix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module Low_level = Low_level

type stdenv = Eio_unix.Stdenv.base

let run main =
let run ?pipe main =
(* SIGPIPE makes no sense in a modern application. *)
Sys.(set_signal sigpipe Signal_ignore);
Eio_unix.Process.install_sigchld_handler ();
Expand All @@ -34,7 +34,7 @@ let run main =
method mono_clock = Time.mono_clock
method net = Net.v
method process_mgr = Process.mgr
method domain_mgr = Domain_mgr.v
method domain_mgr = Domain_mgr.v ?pipe ()
method cwd = ((Fs.cwd, "") :> Eio.Fs.dir_ty Eio.Path.t)
method fs = ((Fs.fs, "") :> Eio.Fs.dir_ty Eio.Path.t)
method secure_random = Flow.secure_random
Expand Down
6 changes: 4 additions & 2 deletions lib_eio_posix/eio_posix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
type stdenv = Eio_unix.Stdenv.base
(** The type of the standard set of resources available on POSIX systems. *)

val run : (stdenv -> 'a) -> 'a
val run : ?pipe:(cloexec:bool -> Unix.file_descr * Unix.file_descr) -> (stdenv -> 'a) -> 'a
(** [run main] runs an event loop and calls [main stdenv] inside it.

For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *)
For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate.

@param pipe Configure how to create a pipe for the underlying scheduler (defaults to {! Unix.pipe}). *)

module Low_level = Low_level
(** Low-level API for making POSIX calls directly. *)
4 changes: 2 additions & 2 deletions lib_eio_posix/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ let rec next t : [`Exit_scheduler] =
next t
)

let with_sched fn =
let with_sched ?(pipe=(fun ~cloexec -> Unix.pipe ~cloexec ())) fn =
let run_q = Lf_queue.create () in
Lf_queue.push run_q IO;
let sleep_q = Zzz.create () in
let eventfd_r, eventfd_w = Unix.pipe ~cloexec:true () in
let eventfd_r, eventfd_w = pipe ~cloexec:true in
Unix.set_nonblock eventfd_r;
Unix.set_nonblock eventfd_w;
let eventfd = Rcfd.make eventfd_w in
Expand Down
6 changes: 4 additions & 2 deletions lib_eio_posix/sched.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ type exit
and so does not return until the whole event loop is finished. Such functions should normally
be called in tail position. *)

val with_sched : (t -> 'a) -> 'a
val with_sched : ?pipe:(cloexec:bool -> Unix.file_descr * Unix.file_descr) -> (t -> 'a) -> 'a
(** [with_sched fn] sets up a scheduler and calls [fn t].
Typically [fn] will call {!run}.
When [fn] returns, the scheduler's resources are freed. *)
When [fn] returns, the scheduler's resources are freed.

@param pipe Can be used to change the function that provides a pipe for the scheduler. *)

val run :
extra_effects:exit Effect.Deep.effect_handler ->
Expand Down
Loading