diff --git a/.github/actions/install-bitcoin/action.yml b/.github/actions/install-bitcoin/action.yml new file mode 100644 index 00000000..c84dbda1 --- /dev/null +++ b/.github/actions/install-bitcoin/action.yml @@ -0,0 +1,79 @@ +name: 'Install and Verify Bitcoin Core' +description: 'Downloads, Verify signatures and installs Bitcoin Core' + +inputs: + version: + description: 'Bitcoin Core version to install' + required: true + default: '31.0' + trusted_keys: + description: 'List of GPG fingerprints to validate' + required: false + # Signatures from the following GPG public keys checked during verification of the source code. + # The list can be found at https://github.com/bitcoin-core/guix.sigs/tree/main/builder-keys + # 15281230078...: achow101.gpg + # 9EDAFF80E08...: Emzy.gpg + # D1DBF2C4B96...: hebasto.gpg + default: >- + 152812300785C96444D3334D17565732E08E5E41 + 9EDAFF80E080659604F4A76B2EBB056FD847F8A7 + D1DBF2C4B96F2DEBF4C16654410108112E7EA81F + +runs: + using: "composite" + steps: + - name: Install verification dependencies + shell: bash + run: sudo apt-get update && sudo apt-get install -y gnupg wget + + - name: Download and Verify Bitcoin Core + shell: bash + run: | + VERSION="${{ inputs.version }}" + + # 1. Download Binaries & Manifests + wget -q "https://bitcoincore.org/bin/bitcoin-core-${VERSION}/bitcoin-${VERSION}-x86_64-linux-gnu.tar.gz" + wget -q "https://bitcoincore.org/bin/bitcoin-core-${VERSION}/SHA256SUMS" + wget -q "https://bitcoincore.org/bin/bitcoin-core-${VERSION}/SHA256SUMS.asc" + + # 2. Fetch builder keys + git clone --depth 1 https://github.com/bitcoin-core/guix.sigs + + # 3. Setup temporary GPG environment + export GNUPGHOME=$(mktemp -d) + gpg --batch --import guix.sigs/builder-keys/*.gpg + + # 4. Verify the SHA256SUMS signature + gpg --batch --verify --status-fd 1 SHA256SUMS.asc SHA256SUMS > verify.log + + # 5. Check against trusted fingerprints + TRUSTED_KEYS=(${{ inputs.trusted_keys }}) + MATCH=false + for KEY in "${TRUSTED_KEYS[@]}"; do + if grep -q "VALIDSIG .* $KEY" verify.log; then + echo "Verified signature from trusted developer: $KEY" + MATCH=true + break + fi + done + + if [ "$MATCH" != true ]; then + echo "No signatures from the trusted key list were found." + exit 1 + fi + + # 6. Verify checksum and extract + sha256sum --ignore-missing --check SHA256SUMS + tar -xzf bitcoin-${VERSION}-x86_64-linux-gnu.tar.gz + + # 7. Install to path (core v31.0 lists `bitcoin-node` in libexec/) + sudo install -m 0755 -o root -g root -t /usr/local/bin bitcoin-${VERSION}/bin/* + sudo install -m 0755 -o root -g root -t /usr/local/bin bitcoin-${VERSION}/libexec/* + + # Cleanup workspace + rm -rf guix.sigs bitcoin-${VERSION}* SHA256SUMS* verify.log + echo "Bitcoin Core v${VERSION} installed successfully." + + - name: Verify Installation + shell: bash + run: bitcoind --version diff --git a/.github/actions/setup-ubuntu-action/action.yml b/.github/actions/setup-ubuntu-action/action.yml index df1bb2aa..83bdbc26 100644 --- a/.github/actions/setup-ubuntu-action/action.yml +++ b/.github/actions/setup-ubuntu-action/action.yml @@ -17,7 +17,9 @@ runs: zstd \ binutils-dev \ elfutils \ - gcc-multilib + gcc-multilib \ + libcapnp-dev \ + capnproto - name: Cache cargo uses: actions/cache@v3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec6ec7c4..f84501dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -106,10 +106,14 @@ jobs: steps: - uses: actions/checkout@v6 - uses: ./.github/actions/setup-ubuntu-action + - name: Setup Bitcoin + uses: ./.github/actions/install-bitcoin + with: + version: '31.0' - name: Install integration test dependecies run: sudo apt-get install -y nats-server - name: Run tests and integration tests - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features ubuntu-test-detect-intermittent-failures: runs-on: ubuntu-latest @@ -117,29 +121,33 @@ jobs: steps: - uses: actions/checkout@v6 - uses: ./.github/actions/setup-ubuntu-action + - name: Setup Bitcoin + uses: ./.github/actions/install-bitcoin + with: + version: '31.0' - name: Install integration test dependecies run: sudo apt-get install -y nats-server # use separate steps for iterations here to easily see on GHA the time it took - name: Iteration 0 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 1 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 2 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 3 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 4 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 5 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 6 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 7 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 8 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features - name: Iteration 9 - run: NATS_SERVER_BINARY=$(which nats-server) cargo test --all-features + run: NATS_SERVER_BINARY=$(which nats-server) BITCOIN_NODE_EXE=$(which bitcoin-node) cargo test --all-features ubuntu-docs-tools: needs: ubuntu-build diff --git a/Cargo.lock b/Cargo.lock index abae9cf1..5f07e1e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "regex", "rustls-native-certs", "rustls-pki-types", - "rustls-webpki 0.103.10", + "rustls-webpki", "serde", "serde_json", "serde_repr", @@ -217,6 +217,24 @@ dependencies = [ "serde", ] +[[package]] +name = "bitcoind" +version = "0.38.0" +source = "git+https://github.com/0xb10c/corepc?rev=8798e7f9c240c626dc3aa61adadeaf42f8221e40#8798e7f9c240c626dc3aa61adadeaf42f8221e40" +dependencies = [ + "anyhow", + "bitcoin_hashes", + "bitreq", + "corepc-client", + "flate2", + "log", + "serde_json", + "tar", + "tempfile", + "which", + "zip", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -231,11 +249,11 @@ checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" [[package]] name = "bitreq" -version = "0.3.4" -source = "git+https://github.com/0xb10c/corepc?rev=9e4eb4d7181575ff75f9e0a6997b091bd5023b1d#9e4eb4d7181575ff75f9e0a6997b091bd5023b1d" +version = "0.3.5" +source = "git+https://github.com/0xb10c/corepc?rev=8798e7f9c240c626dc3aa61adadeaf42f8221e40#8798e7f9c240c626dc3aa61adadeaf42f8221e40" dependencies = [ - "rustls 0.21.12", - "rustls-webpki 0.101.7", + "rustls", + "rustls-webpki", "serde", "serde_json", "webpki-roots", @@ -300,6 +318,46 @@ dependencies = [ "serde", ] +[[package]] +name = "capnp" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ae0593da254d02d0e69525b5eec7a59586753aa3bd2e54842eca33c6786330c" +dependencies = [ + "embedded-io", +] + +[[package]] +name = "capnp-futures" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b69dfddccc57844f9a90f9d72b44b97c326914851ea94fb7da40ef9cad6e8d" +dependencies = [ + "capnp", + "futures-channel", + "futures-util", +] + +[[package]] +name = "capnp-rpc" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07ccca6d26009f4d6c12b741994f33b421da613b5dcf461508e236b53ef862f1" +dependencies = [ + "capnp", + "capnp-futures", + "futures", +] + +[[package]] +name = "capnpc" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6cdfa6b0df161a71201367910265b97180541ecdb48bd08e05ef8694c295d1f" +dependencies = [ + "capnp", +] + [[package]] name = "cargo-platform" version = "0.1.3" @@ -440,8 +498,8 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "corepc-client" -version = "0.12.0" -source = "git+https://github.com/0xb10c/corepc?rev=9e4eb4d7181575ff75f9e0a6997b091bd5023b1d#9e4eb4d7181575ff75f9e0a6997b091bd5023b1d" +version = "0.13.0" +source = "git+https://github.com/0xb10c/corepc?rev=8798e7f9c240c626dc3aa61adadeaf42f8221e40#8798e7f9c240c626dc3aa61adadeaf42f8221e40" dependencies = [ "bitcoin", "corepc-types", @@ -451,28 +509,10 @@ dependencies = [ "serde_json", ] -[[package]] -name = "corepc-node" -version = "0.12.0" -source = "git+https://github.com/0xb10c/corepc?rev=9e4eb4d7181575ff75f9e0a6997b091bd5023b1d#9e4eb4d7181575ff75f9e0a6997b091bd5023b1d" -dependencies = [ - "anyhow", - "bitcoin_hashes", - "bitreq", - "corepc-client", - "flate2", - "log", - "serde_json", - "tar", - "tempfile", - "which", - "zip", -] - [[package]] name = "corepc-types" version = "0.12.0" -source = "git+https://github.com/0xb10c/corepc?rev=9e4eb4d7181575ff75f9e0a6997b091bd5023b1d#9e4eb4d7181575ff75f9e0a6997b091bd5023b1d" +source = "git+https://github.com/0xb10c/corepc?rev=8798e7f9c240c626dc3aa61adadeaf42f8221e40#8798e7f9c240c626dc3aa61adadeaf42f8221e40" dependencies = [ "bitcoin", "serde", @@ -635,6 +675,12 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "embedded-io" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eb1aa714776b75c7e67e1da744b81a129b3ff919c8712b5e1b32252c1f07cc7" + [[package]] name = "equivalent" version = "1.0.1" @@ -1045,6 +1091,17 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "ipc-extractor" +version = "0.1.0" +dependencies = [ + "capnp", + "capnp-rpc", + "capnpc", + "shared", + "tokio-util", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1078,8 +1135,8 @@ dependencies = [ [[package]] name = "jsonrpc" -version = "0.19.0" -source = "git+https://github.com/0xb10c/corepc?rev=9e4eb4d7181575ff75f9e0a6997b091bd5023b1d#9e4eb4d7181575ff75f9e0a6997b091bd5023b1d" +version = "0.20.0" +source = "git+https://github.com/0xb10c/corepc?rev=8798e7f9c240c626dc3aa61adadeaf42f8221e40#8798e7f9c240c626dc3aa61adadeaf42f8221e40" dependencies = [ "base64 0.22.1", "bitreq", @@ -1691,24 +1748,14 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "ring", - "rustls-webpki 0.101.7", - "sct", -] - -[[package]] -name = "rustls" -version = "0.23.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "once_cell", + "ring", "rustls-pki-types", - "rustls-webpki 0.102.8", + "rustls-webpki", "subtle", "zeroize", ] @@ -1736,35 +1783,15 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "rustls-webpki" -version = "0.102.8" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "ring", "rustls-pki-types", "untrusted", ] -[[package]] -name = "rustls-webpki" -version = "0.103.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" -dependencies = [ - "rustls-pki-types", - "untrusted", -] - [[package]] name = "rustversion" version = "1.0.21" @@ -1792,16 +1819,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "secp256k1" version = "0.29.0" @@ -1934,9 +1951,9 @@ dependencies = [ "async-nats", "base32", "bitcoin", + "bitcoind", "clap", "corepc-client", - "corepc-node", "futures", "hex", "lazy_static", @@ -2196,7 +2213,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.23", + "rustls", "tokio", ] @@ -2225,9 +2242,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", @@ -2435,9 +2452,12 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.4" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "websocket" diff --git a/Cargo.toml b/Cargo.toml index bbff9548..b3e2c047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "extractors/rpc", "extractors/p2p", "extractors/log", + "extractors/ipc", "tools/logger", "tools/websocket", "tools/metrics", diff --git a/README.md b/README.md index 8f10c5e8..23508545 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,8 @@ selected P2P measurements as events into a NATS pub-sub queue. The `log-extractor` publishes them parsed `debug.log` log messages as events to NATS. +And finally an experimental `ipc-extractor` which periodically fetch data from a `bitcoin-node` binary through a UNIX socket created with the `-ipcbind` option. Publishes to NATS as IPC events. + The tools are written in Rust (or any other language that supports NATS and protobuf). They subscribe to the NATS server. For example, the `logger` tool simply prints out all messages that it receives, the `metrics` tool produces prometheus @@ -36,21 +38,27 @@ messages. For other languages, types can be generated directly from the Protobuf messages ┌─────────────┐ ┌───────────────┐ ┌─────────┐ ┌──────────────────────┐ │ ├────► ebpf-extractor├─────┤ │ │ │ -│ │ └───────────────┘ │ │ │ Tools │ -│ │ │ │ │ │ -│ │ ┌───────────────┐ │ ├──────┼──►logger │ +│ │ └───────────────┘ │ │ │ │ +│ │ │ │ │ Tools │ +│ │ ┌───────────────┐ │ │ │ │ │ ├────► rpc-extractor │─────┤ │ │ │ -│ Bitcoin │ └───────────────┘ │ NATS.io ├──────┼──►metrics │ +│ │ └───────────────┘ │ ├──────┼──►logger │ │ │ │ │ │ │ -│ Node │ ┌───────────────┐ │ PUB-SUB ├──────┼──►websocket │ +│ Bitcoin │ ┌───────────────┐ │ NATS.io ├──────┼──►metrics │ │ ├────► p2p-extractor │─────┤ │ │ │ -│ │ └───────────────┘ │ ├──────┼──►connectivity-check │ +│ Node │ └───────────────┘ │ PUB-SUB ├──────┼──►websocket │ │ │ │ │ │ │ -│ │ ┌───────────────┐ │ │ │ ... │ +│ │ ┌───────────────┐ │ ├──────┼──►connectivity-check │ │ ├────► log-extractor │─────┤ │ │ │ +│ │ └───────────────┘ │ │ │ ... │ +│ │ │ │ │ │ +│ │ ┌───────────────┐ │ │ │ │ +│ ├────► ipc-extractor ├─────┤ │ │ │ └─────────────┘ └───────────────┘ └─────────┘ └──────────────────────┘ (edit on asciiflow.com) + + ``` [nats.io]: https://nats.io @@ -69,6 +77,7 @@ NATS server. Each extractor connects to a different interface: | rpc | periodically fetches RPC for events | [extractors/rpc/](extractors/rpc) | | p2p |Bitcoin P2P events from an inbound node| [extractors/p2p/](extractors/p2p) | | log | parses the debug.log of a node | [extractors/log/](extractors/log) | +| ipc | Fetch data over via IPC socket (experimental) | [extractors/ipc/](extractors/ipc) | ## Tools @@ -101,7 +110,7 @@ I'll do my best to add more documentation. To run the integration tests, run with the feature `nats_integration_tests` and `node_integration_tests`. If you are not using the nix-shell, you need to set the -`NATS_SERVER_BINARY` to the path to your `nats-server` binary. Addtionally, +`NATS_SERVER_BINARY` to the path to your `nats-server` binary and `BITCOIN_NODE_EXE` to the path to a `bitcoin-node` binary. Additionally, `BITCOIND_EXE` can be set to a custom `bitcoind` binary. By default, a recent release will be downloaded and used if `BITCOIND_SKIP_DOWNLOAD` is unset. diff --git a/extractors/ipc/Cargo.toml b/extractors/ipc/Cargo.toml new file mode 100644 index 00000000..bf84227b --- /dev/null +++ b/extractors/ipc/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "ipc-extractor" +version = "0.1.0" +edition = "2024" + +[dependencies] +capnp = "0.25.1" +capnp-rpc = "0.25.0" +shared = { path = "../../shared" } +tokio-util = { version = "0.7.18", features = ["compat"] } + +[features] +# Treat warnings as a build error. +strict = [] + +# Run integration tests needing a NATS server. +nats_integration_tests = [] + +# Run integration tests needing a Bitcoin Core node. +node_integration_tests = [] + +[build-dependencies] +capnpc = "0.25.0" diff --git a/extractors/ipc/README.md b/extractors/ipc/README.md new file mode 100644 index 00000000..0a8657f2 --- /dev/null +++ b/extractors/ipc/README.md @@ -0,0 +1,34 @@ +# `ipc` extractor + +> publishes data fetched from IPC + +The peer-observer ipc-extractor periodically queries data from the Bitcoin Core IPC interface and publishes the results as events into a NATS pub-sub queue + +## Usage + +``` +$ cargo run --bin ipc-extractor -- --help +The peer-observer ipc-extractor periodically queries data from the Bitcoin Core IPC endpoint and publishes the results as events into a NATS pub-sub queue + +Usage: ipc-extractor [OPTIONS] --ipc-socket-path + +Options: + -i, --ipc-socket-path + A path to an UNIX socket to read IPC data from + -a, --nats-address
+ The NATS server address the extractor/tool should connect and subscribe to [default: 127.0.0.1:4222] + -u, --nats-username + The NATS username the extractor/tool should try to authentificate to the NATS server with + -p, --nats-password + The NATS password the extractor/tool should try to authentificate to the NATS server with + -f, --nats-password-file + A path to a file containing a password the extractor/tool should try to authentificate to the NATS server with + -l, --log-level + The log level the extractor should run with. Valid log levels are "trace", "debug", "info", "warn", "error". See https://docs.rs/log/latest/log/enum.Level.html [default: DEBUG] + --query-interval + Interval (in seconds) in which to query from the Bitcoin Core IPC interface [default: 10] + -h, --help + Print help + -V, --version + Print version +``` diff --git a/extractors/ipc/build.rs b/extractors/ipc/build.rs new file mode 100644 index 00000000..827cc9e9 --- /dev/null +++ b/extractors/ipc/build.rs @@ -0,0 +1,18 @@ +fn main() -> Result<(), Box> { + capnpc::CompilerCommand::new() + .file("capnp/mp/proxy.capnp") + .file("capnp/common.capnp") + .file("capnp/echo.capnp") + .file("capnp/mining.capnp") + .file("capnp/init.capnp") + .default_parent_module(vec![String::from("ipc")]) + .run()?; + + println!("cargo:rerun-if-changed=capnp/mp/proxy.capnp"); + println!("cargo:rerun-if-changed=capnp/common.capnp"); + println!("cargo:rerun-if-changed=capnp/echo.capnp"); + println!("cargo:rerun-if-changed=capnp/mining.capnp"); + println!("cargo:rerun-if-changed=capnp/init.capnp"); + + Ok(()) +} diff --git a/extractors/ipc/capnp/common.capnp b/extractors/ipc/capnp/common.capnp new file mode 100644 index 00000000..97d1fcec --- /dev/null +++ b/extractors/ipc/capnp/common.capnp @@ -0,0 +1,16 @@ +# Copyright (c) 2024-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +@0xcd2c6232cb484a28; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("ipc::capnp::messages"); + +using Proxy = import "./mp/proxy.capnp"; +$Proxy.includeTypes("ipc/capnp/common-types.h"); + +struct BlockRef $Proxy.wrap("interfaces::BlockRef") { + hash @0 :Data; + height @1 :Int32; +} \ No newline at end of file diff --git a/extractors/ipc/capnp/echo.capnp b/extractors/ipc/capnp/echo.capnp new file mode 100644 index 00000000..61cb5c11 --- /dev/null +++ b/extractors/ipc/capnp/echo.capnp @@ -0,0 +1,17 @@ +# Copyright (c) 2021-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +@0x888b4f7f51e691f7; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("ipc::capnp::messages"); + +using Proxy = import "./mp/proxy.capnp"; +$Proxy.include("interfaces/echo.h"); +$Proxy.includeTypes("ipc/capnp/echo-types.h"); + +interface Echo $Proxy.wrap("interfaces::Echo") { + destroy @0 (context :Proxy.Context) -> (); + echo @1 (context :Proxy.Context, echo: Text) -> (result :Text); +} \ No newline at end of file diff --git a/extractors/ipc/capnp/init.capnp b/extractors/ipc/capnp/init.capnp new file mode 100644 index 00000000..2d653635 --- /dev/null +++ b/extractors/ipc/capnp/init.capnp @@ -0,0 +1,26 @@ +# Copyright (c) 2021-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +@0xf2c5cfa319406aa6; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("ipc::capnp::messages"); + +using Proxy = import "./mp/proxy.capnp"; +$Proxy.include("interfaces/echo.h"); +$Proxy.include("interfaces/init.h"); +$Proxy.include("interfaces/mining.h"); +$Proxy.includeTypes("ipc/capnp/init-types.h"); + +using Echo = import "echo.capnp"; +using Mining = import "mining.capnp"; + +interface Init $Proxy.wrap("interfaces::Init") { + construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); + makeEcho @1 (context :Proxy.Context) -> (result :Echo.Echo); + makeMining @3 (context :Proxy.Context) -> (result :Mining.Mining); + + # DEPRECATED: no longer supported; server returns an error. + makeMiningOld2 @2 () -> (); +} \ No newline at end of file diff --git a/extractors/ipc/capnp/mining.capnp b/extractors/ipc/capnp/mining.capnp new file mode 100644 index 00000000..4f7bdf14 --- /dev/null +++ b/extractors/ipc/capnp/mining.capnp @@ -0,0 +1,67 @@ +# Copyright (c) 2024-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +@0xc77d03df6a41b505; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("ipc::capnp::messages"); + +using Common = import "common.capnp"; +using Proxy = import "./mp/proxy.capnp"; +$Proxy.include("interfaces/mining.h"); +$Proxy.includeTypes("ipc/capnp/mining-types.h"); + +const maxMoney :Int64 = 2100000000000000; +const maxDouble :Float64 = 1.7976931348623157e308; +const defaultBlockReservedWeight :UInt32 = 8000; +const defaultCoinbaseOutputMaxAdditionalSigops :UInt32 = 400; + +interface Mining $Proxy.wrap("interfaces::Mining") { + isTestChain @0 (context :Proxy.Context) -> (result: Bool); + isInitialBlockDownload @1 (context :Proxy.Context) -> (result: Bool); + getTip @2 (context :Proxy.Context) -> (result: Common.BlockRef, hasResult: Bool); + waitTipChanged @3 (context :Proxy.Context, currentTip: Data, timeout: Float64 = .maxDouble) -> (result: Common.BlockRef); + createNewBlock @4 (context :Proxy.Context, options: BlockCreateOptions, cooldown: Bool = true) -> (result: BlockTemplate); + checkBlock @5 (context :Proxy.Context, block: Data, options: BlockCheckOptions) -> (reason: Text, debug: Text, result: Bool); + interrupt @6 () -> (); +} + +interface BlockTemplate $Proxy.wrap("interfaces::BlockTemplate") { + destroy @0 (context :Proxy.Context) -> (); + getBlockHeader @1 (context: Proxy.Context) -> (result: Data); + getBlock @2 (context: Proxy.Context) -> (result: Data); + getTxFees @3 (context: Proxy.Context) -> (result: List(Int64)); + getTxSigops @4 (context: Proxy.Context) -> (result: List(Int64)); + getCoinbaseTx @5 (context: Proxy.Context) -> (result: CoinbaseTx); + getCoinbaseMerklePath @6 (context: Proxy.Context) -> (result: List(Data)); + submitSolution @7 (context: Proxy.Context, version: UInt32, timestamp: UInt32, nonce: UInt32, coinbase :Data) -> (result: Bool); + waitNext @8 (context: Proxy.Context, options: BlockWaitOptions) -> (result: BlockTemplate); + interruptWait @9() -> (); +} + +struct BlockCreateOptions $Proxy.wrap("node::BlockCreateOptions") { + useMempool @0 :Bool = true $Proxy.name("use_mempool"); + blockReservedWeight @1 :UInt64 = .defaultBlockReservedWeight $Proxy.name("block_reserved_weight"); + coinbaseOutputMaxAdditionalSigops @2 :UInt64 = .defaultCoinbaseOutputMaxAdditionalSigops $Proxy.name("coinbase_output_max_additional_sigops"); +} + +struct BlockWaitOptions $Proxy.wrap("node::BlockWaitOptions") { + timeout @0 : Float64 = .maxDouble $Proxy.name("timeout"); + feeThreshold @1 : Int64 = .maxMoney $Proxy.name("fee_threshold"); +} + +struct BlockCheckOptions $Proxy.wrap("node::BlockCheckOptions") { + checkMerkleRoot @0 :Bool = true $Proxy.name("check_merkle_root"); + checkPow @1 :Bool = true $Proxy.name("check_pow"); +} + +struct CoinbaseTx $Proxy.wrap("node::CoinbaseTx") { + version @0 :UInt32 $Proxy.name("version"); + sequence @1 :UInt32 $Proxy.name("sequence"); + scriptSigPrefix @2 :Data $Proxy.name("script_sig_prefix"); + witness @3 :Data $Proxy.name("witness"); + blockRewardRemaining @4 :Int64 $Proxy.name("block_reward_remaining"); + requiredOutputs @5 :List(Data) $Proxy.name("required_outputs"); + lockTime @6 :UInt32 $Proxy.name("lock_time"); +} \ No newline at end of file diff --git a/extractors/ipc/capnp/mp/proxy.capnp b/extractors/ipc/capnp/mp/proxy.capnp new file mode 100644 index 00000000..85178662 --- /dev/null +++ b/extractors/ipc/capnp/mp/proxy.capnp @@ -0,0 +1,65 @@ +# Copyright (c) The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +@0xcc316e3f71a040fb; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("mp"); + +annotation include(file): Text; +annotation includeTypes(file): Text; +# Extra include paths to add to generated files. + +annotation wrap(interface, struct): Text; +# Wrap capnp interface generating ProxyClient / ProxyServer C++ classes that +# forward calls to a C++ interface with same methods and parameters. Text +# string should be the name of the C++ interface. +# If applied to struct rather than an interface, this will generate a ProxyType +# struct with get methods for introspection and copying fields between C++ and +# capnp structs. + +annotation count(param, struct, interface): Int32; +# Indicate how many C++ method parameters there are corresponding to one capnp +# parameter (default is 1). If not 1, multiple C++ method arguments will be +# condensed into a single capnp parameter by the client and then expanded by +# the server by CustomReadField/CustomBuildField overloads which need to be +# provided separately. An example would be a capnp Text parameter initialized +# from C++ char* and size arguments. Can be 0 to fill an implicit capnp +# parameter from client or server side context. If annotation is applied to an +# interface or struct type it will apply to all parameters of that type. + +annotation exception(param): Text; +# Indicate that a result parameter corresponds to a C++ exception. Text string +# should be the name of a C++ exception type that the generated server class +# will catch and the client class will rethrow. + +annotation name(field, method): Text; +# Name of the C++ method or field corresponding to a capnp method or field. + +annotation skip(field): Void; +# Synonym for count(0). + +interface ThreadMap $count(0) { + # Interface letting clients control which thread a method call should + # execute on. Clients create and name threads and pass the thread handle as + # a call parameter. + makeThread @0 (name :Text) -> (result :Thread); +} + +interface Thread { + # Thread handle returned by makeThread corresponding to one server thread. + + getName @0 () -> (result: Text); +} + +struct Context $count(0) { + # Execution context passed as a parameter from the client class to the server class. + + thread @0 : Thread; + # Handle of the server thread the current method call should execute on. + + callbackThread @1 : Thread; + # Handle of the client thread that is calling the current method, and that + # any callbacks made by the server thread should be made on. +} \ No newline at end of file diff --git a/extractors/ipc/src/error.rs b/extractors/ipc/src/error.rs new file mode 100644 index 00000000..c9ec54cf --- /dev/null +++ b/extractors/ipc/src/error.rs @@ -0,0 +1,78 @@ +use shared::async_nats::{self, ConnectErrorKind}; +use shared::log::SetLoggerError; +use std::error; +use std::fmt; +use std::io; +use std::time::SystemTimeError; + +#[derive(Debug)] +pub enum RuntimeError { + SetLogger(SetLoggerError), + Io(io::Error), + IpcCall(capnp::Error), + SystemTime(SystemTimeError), + NatsConnect(async_nats::error::Error), + NatsPublish(async_nats::error::Error), +} + +impl fmt::Display for RuntimeError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + RuntimeError::SetLogger(e) => write!(f, "set logger error {}", e), + RuntimeError::Io(e) => write!(f, "IO error {}", e), + RuntimeError::IpcCall(e) => write!(f, "IPC error {}", e), + RuntimeError::SystemTime(e) => write!(f, "system time error {}", e), + RuntimeError::NatsConnect(e) => write!(f, "NATS connection error {}", e), + RuntimeError::NatsPublish(e) => write!(f, "NATS publish error {}", e), + } + } +} + +impl error::Error for RuntimeError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match *self { + RuntimeError::SetLogger(ref e) => Some(e), + RuntimeError::Io(ref e) => Some(e), + RuntimeError::IpcCall(ref e) => Some(e), + RuntimeError::SystemTime(ref e) => Some(e), + RuntimeError::NatsConnect(ref e) => Some(e), + RuntimeError::NatsPublish(ref e) => Some(e), + } + } +} + +impl From for RuntimeError { + fn from(e: capnp::Error) -> Self { + RuntimeError::IpcCall(e) + } +} + +impl From for RuntimeError { + fn from(e: SetLoggerError) -> Self { + RuntimeError::SetLogger(e) + } +} + +impl From for RuntimeError { + fn from(e: io::Error) -> Self { + RuntimeError::Io(e) + } +} + +impl From for RuntimeError { + fn from(e: SystemTimeError) -> Self { + RuntimeError::SystemTime(e) + } +} + +impl From> for RuntimeError { + fn from(e: async_nats::error::Error) -> Self { + RuntimeError::NatsConnect(e) + } +} + +impl From> for RuntimeError { + fn from(e: async_nats::error::Error) -> Self { + RuntimeError::NatsPublish(e) + } +} diff --git a/extractors/ipc/src/ipc.rs b/extractors/ipc/src/ipc.rs new file mode 100644 index 00000000..8fca5f86 --- /dev/null +++ b/extractors/ipc/src/ipc.rs @@ -0,0 +1,85 @@ +#[allow(dead_code)] +mod generated { + capnp::generated_code!(pub mod proxy_capnp, "capnp/mp/proxy_capnp.rs"); + capnp::generated_code!(pub mod common_capnp, "capnp/common_capnp.rs"); + capnp::generated_code!(pub mod mining_capnp, "capnp/mining_capnp.rs"); + capnp::generated_code!(pub mod echo_capnp, "capnp/echo_capnp.rs"); + capnp::generated_code!(pub mod init_capnp, "capnp/init_capnp.rs"); +} +use generated::*; + +use init_capnp::init::Client as InitClient; +use mining_capnp::mining::Client as MiningClient; +use proxy_capnp::thread::Client as ThreadClient; + +use capnp_rpc::{RpcSystem, rpc_twoparty_capnp, twoparty}; +use shared::{ + futures::AsyncReadExt, + protobuf::ipc_extractor::BlockTip, + tokio::{self, net::UnixStream, task::JoinHandle}, +}; + +use crate::error::RuntimeError; + +pub struct IpcClient { + pub mining: MiningClient, + pub thread: ThreadClient, + pub rpc_task: JoinHandle>, +} + +impl IpcClient { + pub async fn init(stream: UnixStream) -> Result { + let (reader, writer) = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split(); + let network = Box::new(twoparty::VatNetwork::new( + reader, + writer, + rpc_twoparty_capnp::Side::Client, + Default::default(), + )); + + let mut rpc_system = RpcSystem::new(network, None); + let init: InitClient = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server); + let rpc_task = tokio::task::spawn_local(rpc_system); + + let response = init.construct_request().send().promise.await?; + let thread_map = response.get()?.get_thread_map()?; + + let response = thread_map.make_thread_request().send().promise.await?; + let thread = response.get()?.get_result()?; + + let mut req = init.make_mining_request(); + set_context(req.get().get_context()?, &thread); + + let response = req.send().promise.await?; + let mining = response.get()?.get_result()?; + + Ok(Self { + rpc_task, + thread, + mining, + }) + } + + pub async fn get_tip(&self) -> Result, RuntimeError> { + let mut req = self.mining.get_tip_request(); + set_context(req.get().get_context()?, &self.thread); + + let response = req.send().promise.await?; + + let has_result = response.get()?.get_has_result(); + if !has_result { + return Ok(None); + } + + let tip = response.get()?.get_result()?; + let height = tip.get_height(); + let hash = tip.get_hash()?.to_vec(); + + Ok(Some(BlockTip { height, hash })) + } +} + +fn set_context(mut ctx: proxy_capnp::context::Builder<'_>, thread: &ThreadClient) { + ctx.set_thread(thread.clone()); + ctx.set_callback_thread(thread.clone()); +} diff --git a/extractors/ipc/src/lib.rs b/extractors/ipc/src/lib.rs new file mode 100644 index 00000000..80d7a934 --- /dev/null +++ b/extractors/ipc/src/lib.rs @@ -0,0 +1,147 @@ +use shared::{ + async_nats, + clap::{self, Parser}, + log, + nats_subjects::Subject, + nats_util::{self, NatsArgs}, + prost::Message, + protobuf::{ + event::{Event, event::PeerObserverEvent}, + ipc_extractor, + }, + tokio::{ + self, + net::UnixStream, + sync::watch, + time::{self, Duration}, + }, +}; +use std::io; + +mod error; +use error::RuntimeError; + +mod ipc; +use ipc::IpcClient; + +/// The peer-observer ipc-extractor periodically queries data from the +/// Bitcoin Core IPC interface and publishes the results as events into +/// a NATS pub-sub queue. +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Args { + /// Arguments for the connection to the NATS server. + #[command(flatten)] + pub nats: nats_util::NatsArgs, + + /// The log level the extractor should run with. Valid log levels are "trace", + /// "debug", "info", "warn", "error". See https://docs.rs/log/latest/log/enum.Level.html. + #[arg(short, long, default_value_t = log::Level::Debug)] + pub log_level: log::Level, + + /// A path to an UNIX socket to read IPC data from. + #[arg(short, long)] + pub ipc_socket_path: String, + + /// Interval (in seconds) in which to query from the Bitcoin Core IPC interface. + #[arg(long, default_value_t = 10)] + pub query_interval: u64, +} + +impl Args { + #[allow(clippy::too_many_arguments)] + pub fn new( + nats: NatsArgs, + log_level: log::Level, + ipc_socket_path: String, + query_interval: u64, + ) -> Args { + Self { + nats, + log_level, + ipc_socket_path, + query_interval, + } + } +} + +pub async fn run(args: Args, mut shutdown_rx: watch::Receiver) -> Result<(), RuntimeError> { + let nats_client = nats_util::prepare_connection(&args.nats)? + .connect(&args.nats.address) + .await?; + log::info!("Connected to NATS server at {}", &args.nats.address); + + let stream = UnixStream::connect(&args.ipc_socket_path) + .await + .map_err(|e| { + io::Error::new( + e.kind(), + format!( + "could not connect to IPC socket at --ipc-socket-path '{}': {}", + &args.ipc_socket_path, e + ), + ) + })?; + log::info!("Connected to IPC socket at {}", &args.ipc_socket_path); + + let ipc_session = IpcClient::init(stream).await?; + + let duration_sec = Duration::from_secs(args.query_interval); + let mut interval = time::interval(duration_sec); + log::info!( + "Querying the Bitcoin Core IPC interface every {:?}.", + duration_sec + ); + + loop { + tokio::select! { + _ = interval.tick() => { + if ipc_session.rpc_task.is_finished() { + log::error!("Lost connection to IPC socket. Exiting."); + break; + } + + if let Err(e) = get_tip(&ipc_session, &nats_client).await { + log::error!("Could not fetch and publish 'BlockTip': {}", e) + } + } + res = shutdown_rx.changed() => { + match res { + Ok(_) => { + if *shutdown_rx.borrow() { + log::info!("ipc_extractor received shutdown signal."); + ipc_session.rpc_task.abort(); + break; + } + } + Err(_) => { + // all senders dropped -> treat as shutdown + log::warn!("The shutdown notification sender was dropped. Shutting down."); + ipc_session.rpc_task.abort(); + break; + } + } + } + } + } + Ok(()) +} + +async fn get_tip( + ipc_client: &IpcClient, + nats_client: &async_nats::Client, +) -> Result<(), RuntimeError> { + let tip = match ipc_client.get_tip().await? { + Some(t) => t, + None => return Ok(()), // the node has no tip loaded yet, skip NATS publish + }; + + let proto = Event::new(PeerObserverEvent::IpcExtractor(ipc_extractor::Ipc { + ipc_event: Some(ipc_extractor::ipc::IpcEvent::BlockTip(tip)), + }))?; + + nats_client + .publish(Subject::Ipc.to_string(), proto.encode_to_vec().into()) + .await?; + Ok(()) +} diff --git a/extractors/ipc/src/main.rs b/extractors/ipc/src/main.rs new file mode 100644 index 00000000..970e41ad --- /dev/null +++ b/extractors/ipc/src/main.rs @@ -0,0 +1,40 @@ +use ipc_extractor::Args; +use shared::{ + clap::Parser, + log, simple_logger, + tokio::{self, signal, sync::watch, task::LocalSet}, +}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args = Args::parse(); + + if let Err(e) = simple_logger::init_with_level(args.log_level) { + eprintln!("ipc extractor error: {}", e); + } + + LocalSet::new() + .run_until(async move { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let run_future = ipc_extractor::run(args, shutdown_rx); + tokio::pin!(run_future); + + tokio::select! { + _ = signal::ctrl_c() => { + log::info!("Received Ctrl+C. Stopping..."); + let _ = shutdown_tx.send(true); + match run_future.await { + Ok(_) => log::info!("ipc-extractor task completed."), + Err(e) => log::error!("ipc-extractor task failed: {e}"), + } + } + result = &mut run_future => { + match result { + Ok(_) => log::info!("ipc-extractor task completed."), + Err(e) => log::error!("ipc-extractor task failed: {e}"), + } + } + } + }) + .await; +} diff --git a/extractors/ipc/tests/integration.rs b/extractors/ipc/tests/integration.rs new file mode 100644 index 00000000..ed82b79d --- /dev/null +++ b/extractors/ipc/tests/integration.rs @@ -0,0 +1,155 @@ +#![cfg(feature = "nats_integration_tests")] +#![cfg(feature = "node_integration_tests")] + +use ipc_extractor::{self, Args}; +use shared::{ + async_nats, + bitcoin::{self, hashes::Hash}, + bitcoind, + futures::StreamExt, + log::{self, info}, + nats_util::NatsArgs, + prost::Message, + protobuf::{ + event::{Event, event::PeerObserverEvent}, + ipc_extractor::ipc::IpcEvent::BlockTip, + }, + simple_logger::SimpleLogger, + testing::nats_server::NatsServerForTesting, + tokio::{self, sync::watch, task::LocalSet, time::sleep}, +}; +use std::sync::Once; +use std::time::Duration; + +pub const QUERY_INTERVAL_SECONDS: u64 = 1; + +// 5 second check() timeout. +const TEST_TIMEOUT_SECONDS: u64 = 5; + +pub fn make_test_args(nats_port: u16, ipc_socket_path: String) -> Args { + Args::new( + NatsArgs { + address: format!("127.0.0.1:{}", nats_port), + username: None, + password: None, + password_file: None, + }, + log::Level::Trace, + ipc_socket_path, + QUERY_INTERVAL_SECONDS, + ) +} + +static INIT: Once = Once::new(); + +pub fn setup() { + INIT.call_once(|| { + SimpleLogger::new() + .with_level(log::LevelFilter::Trace) + .init() + .unwrap(); + }); +} + +pub fn setup_node(conf: bitcoind::Conf) -> bitcoind::BitcoinD { + info!( + "env BITCOIN_NODE_EXE={:?}", + std::env::var("BITCOIN_NODE_EXE") + ); + let exe_path = std::env::var("BITCOIN_NODE_EXE").unwrap(); + + info!("Using bitcoin-node at '{}'", exe_path); + bitcoind::BitcoinD::with_conf(exe_path, &conf).unwrap() +} + +fn configure_node() -> bitcoind::BitcoinD { + let mut node_conf = bitcoind::Conf::default(); + node_conf.args = vec!["-regtest", "-ipcbind=unix"]; + // enabling this is useful for debugging, but enabling this by default will + // be quite spammy. + node_conf.view_stdout = false; + // node_conf.wallet is `true` by default, but since `bitcoin-node` binary doesn't + // have wallet capabilities we disable it + node_conf.wallet = None; + + setup_node(node_conf) +} + +async fn check(check_expected: fn(PeerObserverEvent) -> ()) { + setup(); + let node = configure_node(); + let nats_server = NatsServerForTesting::new(&[]).await; + + let ipc_socket_path = node + .workdir() + .as_path() + .join("regtest") + .join("node.sock") + .to_str() + .unwrap() + .into(); + + let args = make_test_args(nats_server.port, ipc_socket_path); + + let local = LocalSet::new(); + local + .run_until(async move { + let (shutdown_tx, shutdown_rx) = watch::channel(false); + + let ipc_extractor_future = ipc_extractor::run(args, shutdown_rx.clone()); + tokio::pin!(ipc_extractor_future); + + let nc = async_nats::connect(format!("127.0.0.1:{}", nats_server.port)) + .await + .unwrap(); + let mut sub = nc.subscribe("*").await.unwrap(); + + tokio::select! { + _ = sleep(Duration::from_secs(TEST_TIMEOUT_SECONDS)) => { + panic!("timed out waiting for check() to complete"); + } + msg = sub.next() => { + if let Some(msg) = msg { + let unwrapped = Event::decode(msg.payload).unwrap(); + if let Some(event) = unwrapped.peer_observer_event { + check_expected(event); + } + } else { + panic!("subscription ended"); + } + } + result = &mut ipc_extractor_future => { + panic!("ipc_extractor stopped unexpectedly: {:?}", result); + } + } + + shutdown_tx.send(true).unwrap(); + ipc_extractor_future.await.unwrap(); + }) + .await; +} + +#[tokio::test] +async fn test_integration_ipc() { + println!("test that we receive BlockTip IPC events"); + + check(|event| match event { + PeerObserverEvent::IpcExtractor(i) => { + if let Some(ref e) = i.ipc_event { + match e { + BlockTip(t) => { + assert_eq!(t.height, 0); + assert_eq!(t.hash.len(), 32); + assert_eq!( + bitcoin::BlockHash::from_slice(&t.hash).unwrap().to_string(), + // genesis blockhash in Regtest + "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206" + ); + } + } + } + } + _ => panic!("unexpected event {:?}", event), + }) + .await; +} diff --git a/extractors/log/tests/integration.rs b/extractors/log/tests/integration.rs index e11fc25f..c339998f 100644 --- a/extractors/log/tests/integration.rs +++ b/extractors/log/tests/integration.rs @@ -5,7 +5,7 @@ use log_extractor::Args; use shared::{ async_nats, bitcoin::{Block, consensus::Decodable, hashes::Hash, hex::FromHex}, - corepc_node, + bitcoind, futures::StreamExt, log::{Level, LevelFilter, info}, nats_util::NatsArgs, @@ -128,19 +128,19 @@ fn make_test_args(nats_port: u16, bitcoind_pipe: String) -> Args { /// Starts a regtest `bitcoind` node with the given configuration. /// /// The binary is resolved from the `BITCOIND_EXE` environment variable -/// (via `corepc_node::exe_path`); if unset, a pre-built binary is +/// (via `bitcoind::exe_path`); if unset, a pre-built binary is /// downloaded automatically. -fn setup_node(conf: corepc_node::Conf) -> corepc_node::Node { +fn setup_node(conf: bitcoind::Conf) -> bitcoind::BitcoinD { info!("env BITCOIND_EXE={:?}", std::env::var("BITCOIND_EXE")); - info!("exe_path={:?}", corepc_node::exe_path()); + info!("exe_path={:?}", bitcoind::exe_path()); - if let Ok(exe_path) = corepc_node::exe_path() { + if let Ok(exe_path) = bitcoind::exe_path() { info!("Using bitcoind at '{}'", exe_path); - return corepc_node::Node::with_conf(exe_path, &conf).unwrap(); + return bitcoind::BitcoinD::with_conf(exe_path, &conf).unwrap(); } info!("Trying to download a bitcoind.."); - corepc_node::Node::from_downloaded_with_conf(&conf).unwrap() + bitcoind::BitcoinD::from_downloaded_with_conf(&conf).unwrap() } /// Creates a two-node regtest topology: @@ -149,10 +149,10 @@ fn setup_node(conf: corepc_node::Conf) -> corepc_node::Node { /// /// `node1_args` are appended as extra `bitcoind` CLI flags (e.g. `-debug=validation`) so tests /// can enable the specific logging needed to trigger the events they want to observe. -fn setup_two_connected_nodes(node1_args: Vec<&str>) -> (corepc_node::Node, corepc_node::Node) { +fn setup_two_connected_nodes(node1_args: Vec<&str>) -> (bitcoind::BitcoinD, bitcoind::BitcoinD) { // node1 listens for p2p connections - let mut node1_conf = corepc_node::Conf::default(); - node1_conf.p2p = corepc_node::P2P::Yes; + let mut node1_conf = bitcoind::Conf::default(); + node1_conf.p2p = bitcoind::P2P::Yes; for arg in node1_args { info!("Running node1 with arg: {}", arg); node1_conf.args.push(arg); @@ -160,7 +160,7 @@ fn setup_two_connected_nodes(node1_args: Vec<&str>) -> (corepc_node::Node, corep let node1 = setup_node(node1_conf); // node2 connects to node1 - let mut node2_conf = corepc_node::Conf::default(); + let mut node2_conf = bitcoind::Conf::default(); node2_conf.p2p = node1.p2p_connect(true).unwrap(); let node2 = setup_node(node2_conf); @@ -193,7 +193,7 @@ fn setup_two_connected_nodes(node1_args: Vec<&str>) -> (corepc_node::Node, corep /// exit before returning. async fn check( args: Vec<&str>, - test_setup: fn(&corepc_node::Client), + test_setup: fn(&bitcoind::Client), check_event: fn(PeerObserverEvent) -> bool, ) { setup(); diff --git a/extractors/p2p/tests/integration.rs b/extractors/p2p/tests/integration.rs index d03a86b4..b75c89e6 100644 --- a/extractors/p2p/tests/integration.rs +++ b/extractors/p2p/tests/integration.rs @@ -4,7 +4,7 @@ use shared::{ async_nats, bitcoin::{self, Amount}, - corepc_node::{self}, + bitcoind::{self}, futures::StreamExt, log::{self, info}, nats_util::NatsArgs, @@ -72,21 +72,21 @@ fn make_test_args( ) } -fn setup_node(conf: corepc_node::Conf) -> corepc_node::Node { +fn setup_node(conf: bitcoind::Conf) -> bitcoind::BitcoinD { info!("env BITCOIND_EXE={:?}", std::env::var("BITCOIND_EXE")); - info!("exe_path={:?}", corepc_node::exe_path()); + info!("exe_path={:?}", bitcoind::exe_path()); - if let Ok(exe_path) = corepc_node::exe_path() { + if let Ok(exe_path) = bitcoind::exe_path() { info!("Using bitcoind at '{}'", exe_path); - return corepc_node::Node::with_conf(exe_path, &conf).unwrap(); + return bitcoind::BitcoinD::with_conf(exe_path, &conf).unwrap(); } info!("Trying to download a bitcoind.."); - corepc_node::Node::from_downloaded_with_conf(&conf).unwrap() + bitcoind::BitcoinD::from_downloaded_with_conf(&conf).unwrap() } -fn configure_node() -> corepc_node::Node { - let mut node_conf = corepc_node::Conf::default(); +fn configure_node() -> bitcoind::BitcoinD { + let mut node_conf = bitcoind::Conf::default(); node_conf.args = vec![ "-regtest", "-debug=net", @@ -100,7 +100,7 @@ fn configure_node() -> corepc_node::Node { // enabling this is useful for debugging, but enabling this by default will // be quite spammy. node_conf.view_stdout = false; - node_conf.p2p = corepc_node::P2P::Yes; + node_conf.p2p = bitcoind::P2P::Yes; setup_node(node_conf) } @@ -110,7 +110,7 @@ async fn check( disable_addrv2: bool, disable_invs: bool, disable_feefilter: bool, - test_setup: fn(&corepc_node::Node), + test_setup: fn(&bitcoind::BitcoinD), mut check_expected: impl FnMut(PeerObserverEvent) -> bool, ) { setup(); diff --git a/extractors/rpc/src/lib.rs b/extractors/rpc/src/lib.rs index d9434c6b..37bc346b 100644 --- a/extractors/rpc/src/lib.rs +++ b/extractors/rpc/src/lib.rs @@ -1,9 +1,9 @@ +use shared::bitcoind::mtype::{ + GetBlockchainInfo, GetChainTxStats, GetNetworkInfo, GetOrphanTxsVerboseTwo, +}; use shared::clap::{ArgGroup, Parser}; use shared::corepc_client::client_sync::Auth; use shared::corepc_client::client_sync::v30::{Client, FeeEstimateMode}; -use shared::corepc_node::mtype::{ - GetBlockchainInfo, GetChainTxStats, GetNetworkInfo, GetOrphanTxsVerboseTwo, -}; use shared::log; use shared::nats_subjects::Subject; use shared::nats_util::{self, NatsArgs}; diff --git a/extractors/rpc/tests/common/mod.rs b/extractors/rpc/tests/common/mod.rs index 2f6c71f9..cd3ab6f5 100644 --- a/extractors/rpc/tests/common/mod.rs +++ b/extractors/rpc/tests/common/mod.rs @@ -1,5 +1,5 @@ use shared::{ - corepc_node, + bitcoind, log::{self, info}, nats_util::NatsArgs, simple_logger::SimpleLogger, @@ -148,27 +148,27 @@ pub fn make_test_args( ) } -pub fn setup_node(conf: corepc_node::Conf) -> corepc_node::Node { +pub fn setup_node(conf: bitcoind::Conf) -> bitcoind::BitcoinD { info!("env BITCOIND_EXE={:?}", std::env::var("BITCOIND_EXE")); - info!("exe_path={:?}", corepc_node::exe_path()); + info!("exe_path={:?}", bitcoind::exe_path()); - if let Ok(exe_path) = corepc_node::exe_path() { + if let Ok(exe_path) = bitcoind::exe_path() { info!("Using bitcoind at '{}'", exe_path); - return corepc_node::Node::with_conf(exe_path, &conf).unwrap(); + return bitcoind::BitcoinD::with_conf(exe_path, &conf).unwrap(); } info!("Trying to download a bitcoind.."); - corepc_node::Node::from_downloaded_with_conf(&conf).unwrap() + bitcoind::BitcoinD::from_downloaded_with_conf(&conf).unwrap() } -pub fn setup_two_connected_nodes() -> (corepc_node::Node, corepc_node::Node) { +pub fn setup_two_connected_nodes() -> (bitcoind::BitcoinD, bitcoind::BitcoinD) { // node1 listens for p2p connections - let mut node1_conf = corepc_node::Conf::default(); - node1_conf.p2p = corepc_node::P2P::Yes; + let mut node1_conf = bitcoind::Conf::default(); + node1_conf.p2p = bitcoind::P2P::Yes; let node1 = setup_node(node1_conf); // node2 connects to node1 - let mut node2_conf = corepc_node::Conf::default(); + let mut node2_conf = bitcoind::Conf::default(); node2_conf.p2p = node1.p2p_connect(true).unwrap(); let node2 = setup_node(node2_conf); diff --git a/extractors/rpc/tests/integration.rs b/extractors/rpc/tests/integration.rs index 1a60331c..836609a8 100644 --- a/extractors/rpc/tests/integration.rs +++ b/extractors/rpc/tests/integration.rs @@ -13,7 +13,7 @@ use shared::{ Amount, OutPoint, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness, absolute, consensus, hashes::Hash, hex::DisplayHex, transaction, }, - corepc_node, + bitcoind, futures::StreamExt, prost::Message, protobuf::{ @@ -41,7 +41,7 @@ const TEST_TIMEOUT_SECONDS: u64 = 5; async fn check( rpcs: EnabledRPCsInTest, - test_setup: fn(&corepc_node::Node, &corepc_node::Node), + test_setup: fn(&bitcoind::BitcoinD, &bitcoind::BitcoinD), check_expected: fn(PeerObserverEvent) -> (), ) { setup(); diff --git a/protobuf/event.proto b/protobuf/event.proto index cf0b4cdd..dbe4c835 100644 --- a/protobuf/event.proto +++ b/protobuf/event.proto @@ -6,13 +6,15 @@ import "ebpf_extractor.proto"; import "rpc_extractor.proto"; import "p2p_extractor.proto"; import "log_extractor.proto"; +import "ipc_extractor.proto"; message Event { - required uint64 timestamp = 10; // Timestamp (milliseconds since UNIX epoch) when the event was constructed. + required uint64 timestamp = 10; // Timestamp (milliseconds since UNIX epoch) when the event was constructed. oneof peer_observer_event { ebpf_extractor.ebpf ebpf_extractor = 1; rpc_extractor.rpc rpc_extractor = 2; p2p_extractor.p2p p2p_extractor = 3; log_extractor.log log_extractor = 4; + ipc_extractor.ipc ipc_extractor = 5; } } diff --git a/protobuf/ipc_extractor.proto b/protobuf/ipc_extractor.proto new file mode 100644 index 00000000..cafb7f96 --- /dev/null +++ b/protobuf/ipc_extractor.proto @@ -0,0 +1,14 @@ +syntax = "proto2"; + +package ipc_extractor; + +message ipc { + oneof ipc_event { + BlockTip block_tip = 1; + } +} + +message BlockTip { + required int32 height = 1; + required bytes hash = 2; +} \ No newline at end of file diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 124b23b0..d07c428e 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -24,11 +24,11 @@ regex = "1.12" # Use custom commit to support: # - cpu_load and inv_to_send in getpeerinfo -# (branch 2026-04-corepc-0.12+custom-peer-observer-patches) +# (branch 2026-05-corepc-0.12+custom-peer-observer-patches) # When updating the Bitcoin Core version used, grep for CORE_VERSION_GREP # and update the corepc_client::types::v* to the new version. -corepc-node = { git = "https://github.com/0xb10c/corepc", rev = "9e4eb4d7181575ff75f9e0a6997b091bd5023b1d", features = ["download", "30_2"] } -corepc-client = { git = "https://github.com/0xb10c/corepc", rev = "9e4eb4d7181575ff75f9e0a6997b091bd5023b1d", features = ["client-sync"]} +bitcoind = { git = "https://github.com/0xb10c/corepc", rev = "8798e7f9c240c626dc3aa61adadeaf42f8221e40", features = ["download", "30_2"] } +corepc-client = { git = "https://github.com/0xb10c/corepc", rev = "8798e7f9c240c626dc3aa61adadeaf42f8221e40", features = ["client-sync"]} [build-dependencies] prost-build = "0.14" diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 4c97033a..a69039e4 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -2,9 +2,9 @@ pub extern crate async_nats; pub extern crate bitcoin; +pub extern crate bitcoind; pub extern crate clap; pub extern crate corepc_client; -pub extern crate corepc_node; pub extern crate futures; pub extern crate lazy_static; pub extern crate log; diff --git a/shared/src/nats_subjects.rs b/shared/src/nats_subjects.rs index d3360b43..cecbc9af 100644 --- a/shared/src/nats_subjects.rs +++ b/shared/src/nats_subjects.rs @@ -5,6 +5,7 @@ const NATS_SUBJECT_NETMSG: &str = "netmsg"; const NATS_SUBJECT_NETCONN: &str = "netconn"; const NATS_SUBJECT_VALIDATION: &str = "validation"; const NATS_SUBJECT_RPC: &str = "rpc"; +const NATS_SUBJECT_IPC: &str = "ipc"; const NATS_SUBJECT_P2P_EXTRACTOR: &str = "p2p-extractor"; const NATS_SUBJECT_LOG_EXTRACTOR: &str = "log-extractor"; @@ -14,6 +15,7 @@ pub enum Subject { NetConn, Validation, Rpc, + Ipc, P2PExtractor, LogExtractor, } @@ -26,6 +28,7 @@ impl fmt::Display for Subject { Subject::NetMsg => write!(f, "{}", NATS_SUBJECT_NETMSG), Subject::Validation => write!(f, "{}", NATS_SUBJECT_VALIDATION), Subject::Rpc => write!(f, "{}", NATS_SUBJECT_RPC), + Subject::Ipc => write!(f, "{}", NATS_SUBJECT_IPC), Subject::P2PExtractor => write!(f, "{}", NATS_SUBJECT_P2P_EXTRACTOR), Subject::LogExtractor => write!(f, "{}", NATS_SUBJECT_LOG_EXTRACTOR), } diff --git a/shared/src/protobuf/ipc_extractor.rs b/shared/src/protobuf/ipc_extractor.rs new file mode 100644 index 00000000..fea4ce85 --- /dev/null +++ b/shared/src/protobuf/ipc_extractor.rs @@ -0,0 +1,24 @@ +use crate::bitcoin::hashes::Hash; +use std::fmt; + +// structs are generated via the ipc_extractor.proto file +include!(concat!(env!("OUT_DIR"), "/ipc_extractor.rs")); + +impl fmt::Display for ipc::IpcEvent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ipc::IpcEvent::BlockTip(tip) => write!(f, "{}", tip), + } + } +} + +impl fmt::Display for BlockTip { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "BlockTip(height={}, hash={})", + self.height, + bitcoin::BlockHash::from_slice(&self.hash).unwrap() + ) + } +} diff --git a/shared/src/protobuf/mod.rs b/shared/src/protobuf/mod.rs index 664cf307..fa1cfcac 100644 --- a/shared/src/protobuf/mod.rs +++ b/shared/src/protobuf/mod.rs @@ -13,5 +13,8 @@ pub mod p2p_extractor; /// Protobuf types for rpc-extractor events. pub mod rpc_extractor; +/// Protobuf types for ipc-extractor events. +pub mod ipc_extractor; + /// Protobuf types for log-extractor events. pub mod log_extractor; diff --git a/shared/src/protobuf/rpc_extractor.rs b/shared/src/protobuf/rpc_extractor.rs index 8546eb44..5447fef4 100644 --- a/shared/src/protobuf/rpc_extractor.rs +++ b/shared/src/protobuf/rpc_extractor.rs @@ -12,7 +12,7 @@ use corepc_client::types::v30::{ }; // Ideally, all type imports should use the generic mtype types. -use corepc_node::mtype::{ +use bitcoind::mtype::{ EstimateSmartFee as RPCEstimateSmartFee, GetBlockchainInfo, GetChainTxStats, GetMempoolInfo, GetNetworkInfo, GetNetworkInfoAddress, GetNetworkInfoNetwork, GetOrphanTxsVerboseTwo, GetOrphanTxsVerboseTwoEntry, @@ -390,7 +390,7 @@ impl From for BlockchainInfo { size_on_disk: info.size_on_disk, pruned: info.pruned, prune_height: info.prune_height.unwrap_or_default(), - prune_target_size: info.prune_target_size.map(|s| s as u64).unwrap_or_default(), + prune_target_size: info.prune_target_size.unwrap_or_default(), warnings: info.warnings, } } diff --git a/shell.nix b/shell.nix index d648b4eb..41c7c614 100644 --- a/shell.nix +++ b/shell.nix @@ -18,6 +18,8 @@ pkgs.mkShell { pkgs.bpftools + pkgs.capnproto + # libbpf CO-RE pkgs # # use the unwrapped clang: @@ -42,6 +44,7 @@ pkgs.mkShell { # use the nix one instead export BITCOIND_SKIP_DOWNLOAD=1 export BITCOIND_EXE=${pkgs.bitcoind}/bin/bitcoind + export BITCOIN_NODE_EXE=${pkgs.bitcoind}/libexec/bitcoin-node # set the path of the Linux kernel headers. These are needed in # build.rs of the ebpf-extractor on Nix. diff --git a/tools/logger/README.md b/tools/logger/README.md index 4b310f66..07615e26 100644 --- a/tools/logger/README.md +++ b/tools/logger/README.md @@ -79,6 +79,8 @@ Options: If passed, show validation events --rpc If passed, show RPC events + --ipc + If passed, show IPC events --p2p-extractor If passed, show p2p-extractor events --log-extractor diff --git a/tools/logger/src/lib.rs b/tools/logger/src/lib.rs index 085c4caa..9ebca789 100644 --- a/tools/logger/src/lib.rs +++ b/tools/logger/src/lib.rs @@ -56,6 +56,10 @@ pub struct Args { #[arg(long)] pub rpc: bool, + /// If passed, show IPC events + #[arg(long)] + pub ipc: bool, + /// If passed, show p2p-extractor events #[arg(long)] pub p2p_extractor: bool, @@ -72,6 +76,7 @@ impl Args { || self.mempool || self.validation || self.rpc + || self.ipc || self.p2p_extractor || self.log_extractor) } @@ -85,6 +90,7 @@ impl Args { mempool: bool, validation: bool, rpc: bool, + ipc: bool, p2p_extractor: bool, log_extractor: bool, ) -> Self { @@ -96,6 +102,7 @@ impl Args { mempool, validation, rpc, + ipc, p2p_extractor, log_extractor, } @@ -112,6 +119,7 @@ pub async fn run(args: Args, mut shutdown_rx: watch::Receiver) -> Result<( log::info!("logging mempool events: {}", args.mempool); log::info!("logging validation events: {}", args.validation); log::info!("logging rpc events: {}", args.rpc); + log::info!("logging ipc events: {}", args.ipc); log::info!("logging p2p_extractor events: {}", args.p2p_extractor); log::info!("logging log_extractor events: {}", args.log_extractor); } @@ -183,6 +191,11 @@ fn log_event(event: Event, args: Args) { log::info!("rpc: {}", r.rpc_event.unwrap()); } } + PeerObserverEvent::IpcExtractor(i) => { + if log_all || args.ipc { + log::info!("ipc: {}", i.ipc_event.unwrap()); + } + } PeerObserverEvent::P2pExtractor(p) => { if log_all || args.p2p_extractor { log::info!("p2p event: {}", p.p2p_event.unwrap()); diff --git a/tools/logger/tests/integration.rs b/tools/logger/tests/integration.rs index 47752da5..9525210c 100644 --- a/tools/logger/tests/integration.rs +++ b/tools/logger/tests/integration.rs @@ -17,6 +17,7 @@ use shared::{ Ebpf, }, event::{event::PeerObserverEvent, Event}, + ipc_extractor::{self, BlockTip}, log_extractor::{self, LogDebugCategory}, p2p_extractor, rpc_extractor::{self, PeerInfo, PeerInfos}, @@ -75,6 +76,7 @@ fn make_test_args( rpc: bool, p2p_extractor: bool, log_extractor: bool, + ipc_extractor: bool, ) -> Args { Args::new( nats_util::NatsArgs { @@ -91,6 +93,7 @@ fn make_test_args( rpc, p2p_extractor, log_extractor, + ipc_extractor, ) } @@ -120,7 +123,17 @@ async fn publish_and_check(events: &[Event], subject: Subject, expected: &str) { let (shutdown_tx, shutdown_rx) = watch::channel(false); let logger_handle = tokio::spawn(async move { - let args = make_test_args(nats_server.port, true, true, true, true, true, true, true); + let args = make_test_args( + nats_server.port, + true, + true, + true, + true, + true, + true, + true, + true, + ); logger::run(args, shutdown_rx.clone()).await.unwrap(); }); // allow the logger tool to start @@ -163,7 +176,7 @@ async fn test_integration_logger_fail_if_no_nats() { let logger_handle = tokio::spawn(async move { let args = make_test_args( 65535, // There shouln't be a NATS server running on this port.. - true, true, true, true, true, true, true, + true, true, true, true, true, true, true, true, ); match logger::run(args, shutdown_rx.clone()).await { Ok(_) => panic!("We should fail when no NATS server is reachable."), @@ -450,6 +463,28 @@ async fn test_integration_logger_rpc_peerinfo() { .await; } +#[tokio::test] +async fn test_integration_logger_ipc_mining_get_tip() { + println!("test that IPC events are logged"); + + publish_and_check( + &[ + Event::new(PeerObserverEvent::IpcExtractor(ipc_extractor::Ipc { + ipc_event: Some(ipc_extractor::ipc::IpcEvent::BlockTip(BlockTip { + height: 111, + hash: ([0xff; 32]).to_vec(), + })), + })) + .unwrap(), + ], + Subject::Ipc, + r#" + ipc: BlockTip(height=111, hash=ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff) + "#, + ) + .await; +} + #[tokio::test] async fn test_integration_logger_p2pextractor_ping_duration() { println!("test that p2p-extractor events are logged"); diff --git a/tools/metrics/src/lib.rs b/tools/metrics/src/lib.rs index f8cf21f0..7c9000d1 100644 --- a/tools/metrics/src/lib.rs +++ b/tools/metrics/src/lib.rs @@ -19,6 +19,7 @@ use shared::protobuf::{ validation::validation_event, }, event::{event::PeerObserverEvent, Event}, + ipc_extractor, log_extractor::{log, Log, LogDebugCategory, LogLevel}, p2p_extractor::p2p, rpc_extractor::{rpc, Addrman, AddrmanBucket, FeeEstimateMode}, @@ -165,6 +166,11 @@ fn handle_event( PeerObserverEvent::LogExtractor(l) => { handle_log_event(&l, metrics); } + PeerObserverEvent::IpcExtractor(ipc) => { + if let Some(e) = ipc.ipc_event { + handle_ipc_event(&e, metrics); + } + } } } @@ -1465,3 +1471,11 @@ fn handle_log_event(log: &Log, metrics: metrics::Metrics) { } } } + +fn handle_ipc_event(e: &ipc_extractor::ipc::IpcEvent, metrics: metrics::Metrics) { + match e { + ipc_extractor::ipc::IpcEvent::BlockTip(tip) => { + metrics.ipc_block_tip_height.set(tip.height as i64); + } + } +} diff --git a/tools/metrics/src/metrics.rs b/tools/metrics/src/metrics.rs index 486e4f28..1ff3387d 100644 --- a/tools/metrics/src/metrics.rs +++ b/tools/metrics/src/metrics.rs @@ -232,6 +232,9 @@ pub struct Metrics { pub validation_block_connected_latest_transactions: IntGauge, pub validation_block_connected_connection_time: IntCounter, + // IPC-extractor + pub ipc_block_tip_height: IntGauge, + // RPC-extractor // getpeeinfo pub rpc_peer_info_list_peers_gmax_ban: IntGauge, @@ -447,6 +450,9 @@ impl Metrics { ig!(validation_block_connected_latest_transactions, "Last connected block transactions.", registry); ic!(validation_block_connected_connection_time, "Last connected block connection time in µs", registry); + // IPC-extractor + ig!(ipc_block_tip_height, "Tip height from IPC extractor.", registry); + // RPC-extractor // getpeerinfo ig!(rpc_peer_info_list_peers_gmax_ban, "Number of peers connected to us that are on the 2018 ban list by gmax.", registry); @@ -659,6 +665,9 @@ impl Metrics { validation_block_connected_latest_transactions, validation_block_connected_connection_time, + // IPC-extractor + ipc_block_tip_height, + // RPC-extractor // getpeerinfo rpc_peer_info_list_peers_gmax_ban, diff --git a/tools/metrics/tests/integration.rs b/tools/metrics/tests/integration.rs index 760264ef..7962a534 100644 --- a/tools/metrics/tests/integration.rs +++ b/tools/metrics/tests/integration.rs @@ -25,6 +25,7 @@ use shared::{ Ebpf, }, event::{event::PeerObserverEvent, Event}, + ipc_extractor, log_extractor::{self, LogDebugCategory}, p2p_extractor, rpc_extractor::{ @@ -3768,3 +3769,27 @@ async fn test_integration_metrics_rpc_estimatesmartfee() { ) .await; } + +#[tokio::test] +async fn test_integration_metrics_ipc_block_tip() { + println!("test that the IPC block tip metric works"); + + publish_and_check( + &[ + Event::new(PeerObserverEvent::IpcExtractor(ipc_extractor::Ipc { + ipc_event: Some(ipc_extractor::ipc::IpcEvent::BlockTip( + ipc_extractor::BlockTip { + height: 867530, + hash: vec![0x00, 0x01, 0x02, 0x03], + }, + )), + })) + .unwrap(), + ], + Subject::Ipc, + r#" + peerobserver_ipc_block_tip_height 867530 + "#, + ) + .await; +} diff --git a/tools/websocket/src/lib.rs b/tools/websocket/src/lib.rs index 5ad650e4..448cb45c 100644 --- a/tools/websocket/src/lib.rs +++ b/tools/websocket/src/lib.rs @@ -71,6 +71,7 @@ pub struct ClientSubscriptions { pub p2p: bool, pub log: bool, pub rpc: bool, + pub ipc: bool, } struct Client { @@ -254,6 +255,7 @@ async fn broadcast_to_clients(event: &PeerObserverEvent, clients: &Clients) { PeerObserverEvent::RpcExtractor(_) => client.subscriptions.rpc, PeerObserverEvent::P2pExtractor(_) => client.subscriptions.p2p, PeerObserverEvent::LogExtractor(_) => client.subscriptions.log, + PeerObserverEvent::IpcExtractor(_) => client.subscriptions.ipc, }; if !is_subscribed { diff --git a/tools/websocket/tests/integration.rs b/tools/websocket/tests/integration.rs index 0d725577..e7305a34 100644 --- a/tools/websocket/tests/integration.rs +++ b/tools/websocket/tests/integration.rs @@ -41,6 +41,7 @@ const SUBSCRIBE_NONE: ClientSubscriptions = ClientSubscriptions { p2p: false, log: false, rpc: false, + ipc: false, }; const SUBSCRIBE_ALL: ClientSubscriptions = ClientSubscriptions { @@ -53,6 +54,7 @@ const SUBSCRIBE_ALL: ClientSubscriptions = ClientSubscriptions { p2p: true, log: true, rpc: true, + ipc: true, }; #[derive(Debug, Clone)]