Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
_build
api.txt
test.json
test.json
.capnp
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,17 @@ clarke monitor --meter=variorum

The Intelligent Platform Management Interface (IPMI) is another way we can query information about the power usage of a computer, or usually in this case a server. Using `--meter=ipmi` will try to use `ipmi-tool` to query sensors for power consumption statistics.

## Collecting Information

Clarke allows you to output the period power usage information to "flows" (like `stdout`, a file and a socket) and also to a capnp address. The command line tool can be used to generate an address.

```
clarke serve
+Server address: capnp://address
```

You can then point the monitor to this and it will send the information to a centralised server.

```
clarke monitor --machine=my-machine --period=60 -c gb --output=capnp:.capnp --reporter=log --report-period=10000
```
18 changes: 12 additions & 6 deletions clarke.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ dev-repo: "git+https://github.com/patricoferris/clarke.git"
doc: "https://patricoferris.github.io/clarke/"
build: ["dune" "build" "-p" name "-j" jobs]
depends: [
"dune" {>= "2.0"}
"eio_luv"
"dune" {>= "2.0"}
"eio_luv" {>= "0.7"}
"variorum"
"ptime"
"cmdliner"
Expand All @@ -21,14 +21,20 @@ depends: [
"lwt_eio"
"prometheus"
"prometheus-app"
"ISO3166"
"carbon"
"capnp-rpc-lwt"
"capnp-rpc-unix"
]
pin-depends: [
[ "hwloc.dev" "git+https://github.com/patricoferris/ocaml-hwloc#3447dc5c40f0d868529aafcc84b5d2d971062b30" ]
[ "jansson.dev" "git+https://github.com/patricoferris/ocaml-jansson#42cb429e722ec64807d75ae401758eb666c9d189" ]
[ "eio.dev" "git+https://github.com/patricoferris/eio#6e681fec1f5894c29c28a042ab107675dd0fd518" ]
[ "eio_luv.dev" "git+https://github.com/patricoferris/eio#6e681fec1f5894c29c28a042ab107675dd0fd518" ]
[ "variorum.dev" "git+https://github.com/patricoferris/ocaml-variorum#9128770e9df08ca7a90f317acc08b9cd49da6766" ]
[ "ISO3166.dev" "git+https://github.com/geocaml/ISO3166#425ce3790f8e0a94ad31dd3ff6033463d645b3db" ]
[ "carbon.dev" "git+https://github.com/geocaml/carbon-intensity#2d9e7b0376584fcf931f7847d32acfcde8a0b83c" ]
[ "carbon.dev" "git+https://github.com/geocaml/carbon-intensity#872bc97cf1e98c55a6ea6f72f485bcaa605d19b1" ]
[ "http.dev" "git+https://github.com/mirage/ocaml-cohttp#df6e8fc58edac41b285c9b6d2bbc0b23533fa713"]
[ "cohttp-eio.dev" "git+https://github.com/mirage/ocaml-cohttp#df6e8fc58edac41b285c9b6d2bbc0b23533fa713"]
[ "capnp-rpc.dev" "git+https://github.com/talex5/capnp-rpc#c06e767b8a4ec5a87a07e62958cdb55d0d59c59d" ]
[ "capnp-rpc-net.dev" "git+https://github.com/talex5/capnp-rpc#c06e767b8a4ec5a87a07e62958cdb55d0d59c59d" ]
[ "capnp-rpc-lwt.dev" "git+https://github.com/talex5/capnp-rpc#c06e767b8a4ec5a87a07e62958cdb55d0d59c59d" ]
[ "capnp-rpc-unix.dev" "git+https://github.com/talex5/capnp-rpc#c06e767b8a4ec5a87a07e62958cdb55d0d59c59d" ]
]
1 change: 1 addition & 0 deletions src/bin/dune
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
(libraries
eio_luv
eio.unix
capnp-rpc-unix
clarke
cmdliner
clarke.prometheus-eio
Expand Down
116 changes: 77 additions & 39 deletions src/bin/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,74 @@ module Metrics = struct
Prometheus.Gauge.v ~help ~namespace ~subsystem "intensity"
end

type output = O : (module S.Output with type t = 'a) * 'a -> output

let with_socket ~sw net v fn =
let listener = Net.listen ~sw ~backlog:5 net v in
while true do
Net.accept_fork ~sw listener ~on_error:(fun e ->
traceln "Err: %s" (Printexc.to_string e))
@@ fun socket _addr -> fn (socket :> Flow.sink)
@@ fun socket _addr ->
fn
(O
( (module Outputs.Flow : S.Output with type t = Flow.sink),
(socket :> Flow.sink) ))
done

module Specs = struct
type output_spec = [ `Stdout | Net.Sockaddr.stream | `File of string ]
type output_spec =
| Flow of [ `Stdout | Net.Sockaddr.stream | `File of string ]
| Capnp of string

let with_sink ~sw ~fs ~stdout ~net spec fn =
let with_output ~sw ~fs ~stdout ~net spec fn =
match spec with
| `Stdout -> fn (stdout :> Flow.sink)
| `File f ->
| Flow `Stdout ->
fn
(O
( (module Outputs.Flow : S.Output with type t = Flow.sink),
(stdout :> Flow.sink) ))
| Flow (`File f) ->
Path.with_open_out ~append:true ~create:(`If_missing 0o644)
Path.(fs / f)
@@ fun flow -> fn (flow :> Flow.sink)
| #Net.Sockaddr.stream as v -> with_socket ~sw net v fn
@@ fun flow ->
fn
(O
( (module Outputs.Flow : S.Output with type t = Flow.sink),
(flow :> Flow.sink) ))
| Flow (#Net.Sockaddr.stream as v) -> with_socket ~sw net v fn
| Capnp f ->
let uri = Path.(load (fs / f)) |> String.trim |> Uri.of_string in
let client_vat = Capnp_rpc_unix.client_only_vat ~sw net in
let sr = Capnp_rpc_unix.Vat.import_exn client_vat uri in
Capnp_rpc_lwt.Sturdy_ref.with_cap_exn sr @@ fun client ->
fn
(O
( (module Outputs.Capnp : S.Output with type t = Capnp_client.t),
client ))

let output_spec_of_string s =
match String.lowercase_ascii s with
| "stdout" -> Ok `Stdout
| "stdout" -> Ok (Flow `Stdout)
| v -> (
match String.split_on_char ':' v with
| [ "file"; path ] -> Ok (`File path)
| [ "file"; path ] -> Ok (Flow (`File path))
| [ "tcp"; addr; port ] -> (
try
let addr =
if addr = "loopback" then Net.Ipaddr.V4.loopback
else Net.Ipaddr.of_raw addr
in
Ok (`Tcp (addr, int_of_string port))
Ok (Flow (`Tcp (addr, int_of_string port)))
with _ -> Error (`Msg "Port parsing failed"))
| [ "unix"; addr ] -> Ok (`Unix addr)
| [ "unix"; addr ] -> Ok (Flow (`Unix addr))
| [ "capnp"; path ] -> Ok (Capnp path)
| _ -> Error (`Msg ("Unknown " ^ v)))

let pp_output_spec ppf = function
| `File f -> Fmt.pf ppf "file:%s" f
| `Stdout -> Format.pp_print_string ppf "stdout"
| #Net.Sockaddr.stream as v -> Net.Sockaddr.pp ppf v
| Flow (`File f) -> Fmt.pf ppf "file:%s" f
| Flow `Stdout -> Format.pp_print_string ppf "stdout"
| Flow (#Net.Sockaddr.stream as v) -> Net.Sockaddr.pp ppf v
| Capnp file -> Fmt.pf ppf "capnp:%s" file

let output_spec = Cmdliner.Arg.conv (output_spec_of_string, pp_output_spec)

Expand Down Expand Up @@ -91,8 +118,11 @@ let meter_spec_term =

let output_spec_term =
Arg.value
@@ Arg.opt Specs.output_spec `Stdout
@@ Arg.info ~doc:"The output location of the monitoring data"
@@ Arg.opt Specs.output_spec (Flow `Stdout)
@@ Arg.info
~doc:
"The output location of the monitoring data, which can be a capnp \
capability written to a file"
[ "o"; "output" ]

let period_term =
Expand Down Expand Up @@ -156,7 +186,7 @@ let get_intensity ?country ?api_code net =
| None, None -> None
| Some code, None ->
if `GB = code then
Carbon.Gb.get_intensity net
Carbon.Gb.(get_intensity (v net))
|> Carbon.Gb.Intensity.actual |> Option.map float_of_int
else (
Logs.warn (fun f ->
Expand All @@ -167,8 +197,8 @@ let get_intensity ?country ?api_code net =
Logs.info (fun f ->
f "Calculating carbon intensity for %s"
(ISO3166.alpha2_to_string country_code));
let v = Carbon.Co2_signal.v api in
let i = Co2_signal.get_intensity ~net v ~country_code in
let v = Carbon.Co2_signal.v ~api_key:api net in
let i = Co2_signal.get_intensity v ~country_code in
Co2_signal.Intensity.intensity i |> float_of_int |> Option.some
| _ -> None

Expand Down Expand Up @@ -197,14 +227,14 @@ let report ~env ~machine spec file =
let s =
Path.(with_open_in (env#fs / file)) @@ fun flow ->
let reader = Buf_read.of_flow ~max_size:max_int flow in
Clarke.Summary.summary reader |> Result.get_ok
Clarke.Summary.summary ~format:`Csv reader |> Result.get_ok
in
Path.(unlink (env#fs / file));
T.report ~machine conf s |> log_err

let monitor ~env ~stdout ~net ~clock =
let run () machine output meter period prom country api_code reporter_spec
reporter_period =
let run () machine output_spec meter period prom country api_code
reporter_spec reporter_period =
let api_code =
Option.map (fun f -> Path.(load (env#fs / f) |> String.trim)) api_code
in
Expand All @@ -215,6 +245,11 @@ let monitor ~env ~stdout ~net ~clock =
i
in
Logs.info (fun f -> f "Monitoring...");
(* The log file is used for summaries, it should be optional
if the reporter is defined or not. *)
let log_file =
Filename.(temp_file ~temp_dir:(get_temp_dir_name ()) "clarke" ".log")
in
Switch.run @@ fun sw ->
let intensity = ref (get_intensity ()) in
let (S.Meter ((module M), t) : S.meter) =
Expand All @@ -231,27 +266,29 @@ let monitor ~env ~stdout ~net ~clock =
intensity := get_intensity ()
done);
(fun () ->
match output with
| `File file ->
while true do
Eio_unix.sleep reporter_period;
report ~env ~machine reporter_spec file
done
| _ ->
Logs.warn (fun f ->
f
"Reporter will only work if data is being output to a \
file"));
(fun () ->
Logs.info (fun f -> f "Reporter log: %s" log_file);
while true do
Specs.with_sink ~sw ~fs ~stdout ~net output (fun sink ->
Eio_unix.sleep reporter_period;
report ~env ~machine reporter_spec log_file
done);
(fun () ->
(* The reporter deletes the file after reporting for similicty
so it needs to be recreated. *)
Specs.with_output ~sw ~fs ~stdout ~net output_spec
(fun (O ((module O), o) : output) ->
while true do
Path.(
with_open_out ~append:true ~create:(`If_missing 0o644)
(fs / log_file))
@@ fun log ->
let info = M.collect t in
let info = update_info_with_intensity !intensity info in
Outputs.Flow.send sink info;
O.send ~machine o info;
Flow.copy_string (Info.to_csv info) log;
Flow.copy_string "\n" log;
Prometheus.Gauge.set Clarke.Metrics.watts (Info.watts info);
Eio.Flow.copy_string "\n" sink;
Eio_unix.sleep (float_of_int period))
done);
Eio_unix.sleep (float_of_int period)
done));
];
Ok ())
in
Expand All @@ -265,6 +302,7 @@ let monitor ~env ~stdout ~net ~clock =
let cmds env =
[
monitor ~env ~stdout:env#stdout ~net:env#net ~clock:env#clock;
Server.cmd setup_log ~net:env#net;
Calc.cmd setup_log env#fs;
]

Expand Down
34 changes: 34 additions & 0 deletions src/bin/server.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
(* A Capnp Server that accepts machines reporting their
monitoring information *)
open Clarke

module Store = struct
let add_raw_string ~machine ~msg =
let v = Info.of_json msg in
Eio.traceln "Machine: %s, Info: %a" machine Info.pp v
end

let secret_key = `Ephemeral
let listen_address = `TCP ("127.0.0.1", 7000)

let start_server ~sw net =
let config = Capnp_rpc_unix.Vat_config.create ~secret_key listen_address in
let service_id = Capnp_rpc_unix.Vat_config.derived_id config "main" in
let restore =
Capnp_rpc_net.Restorer.single service_id
(Capnp_client.Reporter.local Store.add_raw_string)
in
let vat = Capnp_rpc_unix.serve ~sw ~net ~restore config in
Capnp_rpc_unix.Vat.sturdy_uri vat service_id

let run_server ~sw net =
let uri = start_server ~sw net in
Eio.traceln "Server address: %a" Uri.pp uri;
Ok ()

open Cmdliner

let cmd setup_log ~net =
let main () = Eio.Switch.run @@ fun sw -> run_server ~sw net in
let info = Cmd.info "serve" in
Cmd.v info Term.(const main $ setup_log)
40 changes: 40 additions & 0 deletions src/lib/capnp_client.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module Api = Carbon_api.MakeRPC (Capnp_rpc_lwt)
open Capnp_rpc_lwt

type t = Api.Client.Reporter.t Capability.t

module Reporter = struct
let local fn =
let module Reporter = Api.Service.Reporter in
Reporter.local
@@ object
inherit Reporter.service

method report_impl params release_param_caps =
let open Reporter.Report in
let machine = Params.machine_get params in
let msg = Params.msg_get params in
release_param_caps ();
fn ~machine ~msg;
let response, results =
Service.Response.create Results.init_pointer
in
Results.reply_set results "success";
Service.return response
end

let report t machine msg =
let open Api.Client.Reporter in
let request, params =
Capability.Request.create Report.Params.init_pointer
in
Report.Params.machine_set params machine;
Report.Params.msg_set params msg;
Capability.call_for_unit t Report.method_id request
end

let report ~machine t info =
let msg = Info.to_json info in
match Reporter.report t machine msg with
| Ok () -> Ok ()
| Error (`Capnp cap) -> Error (`Msg (Fmt.to_to_string Capnp_rpc.Error.pp cap))
5 changes: 5 additions & 0 deletions src/lib/carbon_api.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
@0xb8d9dcb6ce8d71bf;

interface Reporter {
report @0 (machine :Text, msg :Text) -> (reply :Text);
}
1 change: 1 addition & 0 deletions src/lib/clarke.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ module Models = Models
module Outputs = Outputs
module Metrics = Metrics
module Summary = Summary
module Capnp_client = Capnp_client
19 changes: 18 additions & 1 deletion src/lib/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
(rule
(targets carbon_api.ml carbon_api.mli)
(deps carbon_api.capnp)
(action
(run capnp compile -o %{bin:capnpc-ocaml} %{deps})))

(library
(name clarke)
(public_name clarke)
(libraries eio_luv eio.unix ptime variorum ezjsonm prometheus carbon))
(flags
(:standard -w -53-55))
(libraries
eio_luv
eio.unix
ptime
variorum
ezjsonm
prometheus
carbon
capnp
capnp-rpc-lwt))
Loading