diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 82db0bfa..a67cea0a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -185,6 +185,100 @@ jobs: # https://github.com/actions/cache/blob/main/tips-and-workarounds.md#update-a-cache key: ${{ github.job }}-${{ matrix.job-type }}-ccache-${{ github.run_id }} + windows-cross: + name: 'Linux->Windows cross, no tests' + runs-on: ubuntu-24.04 + if: ${{ vars.SKIP_BRANCH_PUSH != 'true' || github.event_name == 'pull_request' }} + + env: + FILE_ENV: './ci/test/00_setup_env_win64.sh' + DANGER_CI_ON_HOST_FOLDERS: 1 + + steps: + - name: Checkout + uses: actions/checkout@v5 + with: + ref: ${{ github.event_name == 'pull_request' && github.ref || '' }} + + - name: Set CI directories + run: | + echo "CCACHE_DIR=${{ runner.temp }}/ccache_dir" >> "$GITHUB_ENV" + echo "BASE_ROOT_DIR=${{ runner.temp }}" >> "$GITHUB_ENV" + echo "DEPENDS_DIR=${{ runner.temp }}/depends" >> "$GITHUB_ENV" + echo "BASE_BUILD_DIR=${{ runner.temp }}/build" >> "$GITHUB_ENV" + + - name: Depends cache + uses: actions/cache@v4 + with: + path: ${{ env.DEPENDS_DIR }}/built + key: ${{ github.job }}-depends-${{ hashFiles('depends/**', 'ci/test/00_setup_env_win64.sh') }} + + - name: Restore Ccache cache + id: ccache-cache + uses: actions/cache/restore@v4 + with: + path: ${{ env.CCACHE_DIR }} + key: ${{ github.job }}-ccache-${{ github.run_id }} + restore-keys: ${{ github.job }}-ccache- + + - name: CI script + run: ./ci/test_run_all.sh + + - name: Save Ccache cache + uses: actions/cache/save@v4 + if: github.event_name != 'pull_request' && steps.ccache-cache.outputs.cache-hit != 'true' + with: + path: ${{ env.CCACHE_DIR }} + key: ${{ github.job }}-ccache-${{ github.run_id }} + + - name: Upload built executables + uses: actions/upload-artifact@v4 + with: + name: x86_64-w64-mingw32-executables-${{ github.run_id }} + path: | + ${{ env.BASE_BUILD_DIR }}/bin/*.exe + ${{ env.BASE_BUILD_DIR }}/test/config.ini + + windows-native-test: + name: 'Windows, test cross-built' + runs-on: windows-2022 + needs: windows-cross + + env: + PYTHONUTF8: 1 + + steps: + - name: Checkout + uses: actions/checkout@v5 + with: + ref: ${{ github.event_name == 'pull_request' && github.ref || '' }} + + - name: Download built executables + uses: actions/download-artifact@v4 + with: + name: x86_64-w64-mingw32-executables-${{ github.run_id }} + + - name: Run sv2-tp.exe + run: ./bin/sv2-tp.exe -version + + - name: Find mt.exe tool + shell: pwsh + run: | + $sdk_dir = (Get-ItemProperty 'HKLM:\SOFTWARE\Wow6432Node\Microsoft\Windows Kits\Installed Roots' -Name KitsRoot10).KitsRoot10 + $sdk_latest = (Get-ChildItem "$sdk_dir\bin" -Directory | Where-Object { $_.Name -match '^\d+\.\d+\.\d+\.\d+$' } | Sort-Object Name -Descending | Select-Object -First 1).Name + "MT_EXE=${sdk_dir}bin\${sdk_latest}\x64\mt.exe" >> $env:GITHUB_ENV + + - name: Validate sv2-tp manifest + shell: pwsh + run: | + & $env:MT_EXE -nologo -inputresource:bin\sv2-tp.exe -out:sv2-tp.manifest + Get-Content sv2-tp.manifest + & $env:MT_EXE -nologo -inputresource:bin\sv2-tp.exe -validate_manifest + + - name: Run unit tests + run: | + ./bin/test_sv2.exe -l test_suite + ci-matrix: name: ${{ matrix.name }} needs: runners diff --git a/ci/test/00_setup_env_win64.sh b/ci/test/00_setup_env_win64.sh index 80044e99..9dcfe252 100755 --- a/ci/test/00_setup_env_win64.sh +++ b/ci/test/00_setup_env_win64.sh @@ -12,6 +12,6 @@ export CI_IMAGE_PLATFORM="linux/amd64" export HOST=x86_64-w64-mingw32 export PACKAGES="g++-mingw-w64-x86-64-posix nsis" export RUN_UNIT_TESTS=false -export GOAL="deploy" +export GOAL="install" export BITCOIN_CONFIG="-DREDUCE_EXPORTS=ON \ -DCMAKE_CXX_FLAGS='-Wno-error=maybe-uninitialized'" diff --git a/cmake/module/Maintenance.cmake b/cmake/module/Maintenance.cmake index 9c3d2d34..6f30ca97 100644 --- a/cmake/module/Maintenance.cmake +++ b/cmake/module/Maintenance.cmake @@ -43,7 +43,7 @@ function(add_maintenance_targets) endfunction() function(add_windows_deploy_target) - if(MINGW AND TARGET sv2-tp) + if(MINGW AND TARGET bitcoin) find_program(MAKENSIS_EXECUTABLE makensis) if(NOT MAKENSIS_EXECUTABLE) add_custom_target(deploy diff --git a/contrib/guix/guix-build b/contrib/guix/guix-build index 866453fd..ee285bf3 100755 --- a/contrib/guix/guix-build +++ b/contrib/guix/guix-build @@ -81,8 +81,8 @@ check_source_date_epoch # Default to building for all supported HOSTs (overridable by environment) # powerpc64le-linux-gnu currently disabled due non-determinism issues across build arches. -# x86_64-w64-mingw32 current disabled pending Windows support export HOSTS="${HOSTS:-x86_64-linux-gnu arm-linux-gnueabihf aarch64-linux-gnu riscv64-linux-gnu powerpc64-linux-gnu + x86_64-w64-mingw32 x86_64-apple-darwin arm64-apple-darwin}" # Usage: distsrc_for_host HOST diff --git a/depends/Makefile b/depends/Makefile index e4df3e89..0276b544 100644 --- a/depends/Makefile +++ b/depends/Makefile @@ -34,8 +34,7 @@ BASE_CACHE ?= $(BASEDIR)/built SDK_PATH ?= $(BASEDIR)/SDKs NO_BOOST ?= NO_WALLET ?= -# Default NO_IPC value is 1 on Windows -NO_IPC ?= $(if $(findstring mingw32,$(HOST)),1,) +NO_IPC ?= LTO ?= FALLBACK_DOWNLOAD_PATH ?= https://bitcoincore.org/depends-sources diff --git a/depends/README.md b/depends/README.md index 7b092111..1670044a 100644 --- a/depends/README.md +++ b/depends/README.md @@ -78,7 +78,7 @@ The following can be set when running make: `make FOO=bar` - `C_STANDARD`: Set the C standard version used. Defaults to `c11`. - `CXX_STANDARD`: Set the C++ standard version used. Defaults to `c++20`. - `NO_BOOST`: Don't download/build/cache Boost -- `NO_IPC`: Don't build Cap’n Proto and libmultiprocess packages. Default on Windows. +- `NO_IPC`: Don't build Cap’n Proto and libmultiprocess packages. - `DEBUG`: Disable some optimizations and enable more runtime checking - `HOST_ID_SALT`: Optional salt to use when generating host package ids - `BUILD_ID_SALT`: Optional salt to use when generating build package ids diff --git a/depends/packages/capnp.mk b/depends/packages/capnp.mk index 542bf126..981ab3b9 100644 --- a/depends/packages/capnp.mk +++ b/depends/packages/capnp.mk @@ -10,6 +10,7 @@ define $(package)_set_vars $(package)_config_opts += -DWITH_OPENSSL=OFF $(package)_config_opts += -DWITH_ZLIB=OFF $(package)_cxxflags += -fdebug-prefix-map=$($(package)_extract_dir)=/usr -fmacro-prefix-map=$($(package)_extract_dir)=/usr + $(package)_cppflags += -D_WIN32_WINNT=0x0602 endef define $(package)_config_cmds diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b0e3f521..2909bc8f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -102,7 +102,7 @@ if(BUILD_MINE) sv2-tp.cpp init/basic.cpp ) - add_windows_resources(sv2-tp sv2-tp-res.rc) + add_windows_resources(sv2-tp sv2-tp.rc) add_windows_application_manifest(sv2-tp) target_link_libraries(sv2-tp core_interface diff --git a/src/ipc/capnp/protocol.cpp b/src/ipc/capnp/protocol.cpp index 27ef73e8..41457938 100644 --- a/src/ipc/capnp/protocol.cpp +++ b/src/ipc/capnp/protocol.cpp @@ -23,9 +23,11 @@ #include #include #include -#include #include #include +#ifndef WIN32 +#include +#endif namespace ipc { namespace capnp { @@ -65,36 +67,36 @@ void IpcLogFn(mp::LogMessage message) class CapnpProtocol : public Protocol { public: + CapnpProtocol(const char* exe_name) : m_exe_name{exe_name} {} ~CapnpProtocol() noexcept(true) { m_loop_ref.reset(); if (m_loop_thread.joinable()) m_loop_thread.join(); assert(!m_loop); }; - std::unique_ptr connect(int fd, const char* exe_name) override + std::unique_ptr connect(mp::Stream stream) override { - startLoop(exe_name); - return mp::ConnectStream(*m_loop, fd); + startLoop(); + return mp::ConnectStream(*m_loop, std::move(stream)); } - void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override + void listen(mp::SocketId listen_fd, interfaces::Init& init) override { - startLoop(exe_name); + startLoop(); if (::listen(listen_fd, /*backlog=*/5) != 0) { throw std::system_error(errno, std::system_category()); } mp::ListenConnections(*m_loop, listen_fd, init); } - void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function& ready_fn = {}) override + void serve(interfaces::Init& init, const std::function& make_stream) override { assert(!m_loop); - mp::g_thread_context.thread_name = mp::ThreadName(exe_name); + mp::g_thread_context.thread_name = mp::ThreadName(m_exe_name); mp::LogOptions opts = { .log_fn = IpcLogFn, .log_level = GetRequestedIPCLogLevel() }; - m_loop.emplace(exe_name, std::move(opts), &m_context); - if (ready_fn) ready_fn(); - mp::ServeStream(*m_loop, fd, init); + m_loop.emplace(m_exe_name, std::move(opts), &m_context); + mp::ServeStream(*m_loop, make_stream(), init); m_parent_connection = &m_loop->m_incoming_connections.back(); m_loop->loop(); m_loop.reset(); @@ -109,12 +111,21 @@ class CapnpProtocol : public Protocol m_loop->m_incoming_connections.remove_if([this](mp::Connection& c) { return &c != m_parent_connection; }); }); } + mp::Stream makeStream(mp::SocketId socket) override + { + startLoop(); +#if MP_MAJOR_VERSION < 12 + return socket; +#else + return m_loop->m_io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); +#endif + } void addCleanup(std::type_index type, void* iface, std::function cleanup) override { mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup)); } Context& context() override { return m_context; } - void startLoop(const char* exe_name) + void startLoop() { if (m_loop) return; std::promise promise; @@ -124,7 +135,7 @@ class CapnpProtocol : public Protocol .log_fn = IpcLogFn, .log_level = GetRequestedIPCLogLevel() }; - m_loop.emplace(exe_name, std::move(opts), &m_context); + m_loop.emplace(m_exe_name, std::move(opts), &m_context); m_loop_ref.emplace(*m_loop); promise.set_value(); m_loop->loop(); @@ -132,8 +143,8 @@ class CapnpProtocol : public Protocol }); promise.get_future().wait(); } + const char* m_exe_name; Context m_context; - std::thread m_loop_thread; //! EventLoop object which manages I/O events for all connections. std::optional m_loop; //! Reference to the same EventLoop. Increments the loop’s refcount on @@ -142,9 +153,10 @@ class CapnpProtocol : public Protocol std::optional m_loop_ref; //! Connection to parent, if this is a child process spawned by a parent process. mp::Connection* m_parent_connection{nullptr}; + std::thread m_loop_thread; }; } // namespace -std::unique_ptr MakeCapnpProtocol() { return std::make_unique(); } +std::unique_ptr MakeCapnpProtocol(const char* exe_name) { return std::make_unique(exe_name); } } // namespace capnp } // namespace ipc diff --git a/src/ipc/capnp/protocol.h b/src/ipc/capnp/protocol.h index eb057949..b12a0284 100644 --- a/src/ipc/capnp/protocol.h +++ b/src/ipc/capnp/protocol.h @@ -10,7 +10,7 @@ namespace ipc { class Protocol; namespace capnp { -std::unique_ptr MakeCapnpProtocol(); +std::unique_ptr MakeCapnpProtocol(const char* exe_name); } // namespace capnp } // namespace ipc diff --git a/src/ipc/interfaces.cpp b/src/ipc/interfaces.cpp index d6b078e6..efd78efb 100644 --- a/src/ipc/interfaces.cpp +++ b/src/ipc/interfaces.cpp @@ -54,15 +54,14 @@ class IpcImpl : public interfaces::Ipc public: IpcImpl(const char* exe_name, const char* process_argv0, interfaces::Init& init) : m_exe_name(exe_name), m_process_argv0(process_argv0), m_init(init), - m_protocol(ipc::capnp::MakeCapnpProtocol()), m_process(ipc::MakeProcess()) + m_protocol(ipc::capnp::MakeCapnpProtocol(exe_name)), m_process(ipc::MakeProcess()) { } std::unique_ptr spawnProcess(const char* new_exe_name) override { - int pid; - int fd = m_process->spawn(new_exe_name, m_process_argv0, pid); + const auto [pid, socket] = m_process->spawn(new_exe_name, m_process_argv0); LogDebug(::BCLog::IPC, "Process %s pid %i launched\n", new_exe_name, pid); - auto init = m_protocol->connect(fd, m_exe_name); + auto init = m_protocol->connect(m_protocol->makeStream(socket)); Ipc::addCleanup(*init, [this, new_exe_name, pid] { int status = m_process->waitSpawned(pid); LogDebug(::BCLog::IPC, "Process %s pid %i exited with status %i\n", new_exe_name, pid, status); @@ -72,19 +71,19 @@ class IpcImpl : public interfaces::Ipc bool startSpawnedProcess(int argc, char* argv[], int& exit_status) override { exit_status = EXIT_FAILURE; - int32_t fd = -1; - if (!m_process->checkSpawned(argc, argv, fd)) { + mp::SocketId socket{mp::SocketError}; + if (!m_process->checkSpawned(argc, argv, socket)) { return false; } IgnoreCtrlC(strprintf("[%s] SIGINT received — waiting for parent to shut down.\n", m_exe_name)); - m_protocol->serve(fd, m_exe_name, m_init); + m_protocol->serve(m_init, [&] { return m_protocol->makeStream(socket); } ); exit_status = EXIT_SUCCESS; return true; } std::unique_ptr connectAddress(std::string& address) override { if (address.empty() || address == "0") return nullptr; - int fd; + mp::SocketId fd; if (address == "auto") { // Treat "auto" the same as "unix" except don't treat it an as error // if the connection is not accepted. Just return null so the caller @@ -103,12 +102,12 @@ class IpcImpl : public interfaces::Ipc } else { fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address); } - return m_protocol->connect(fd, m_exe_name); + return m_protocol->connect(m_protocol->makeStream(fd)); } void listenAddress(std::string& address) override { - int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address); - m_protocol->listen(fd, m_exe_name, m_init); + mp::SocketId fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address); + m_protocol->listen(fd, m_init); } void disconnectIncoming() override { diff --git a/src/ipc/libmultiprocess/.github/workflows/bitcoin-core-ci.yml b/src/ipc/libmultiprocess/.github/workflows/bitcoin-core-ci.yml index e6ac83f0..89380ac4 100644 --- a/src/ipc/libmultiprocess/.github/workflows/bitcoin-core-ci.yml +++ b/src/ipc/libmultiprocess/.github/workflows/bitcoin-core-ci.yml @@ -18,6 +18,8 @@ concurrency: env: BITCOIN_REPO: bitcoin/bitcoin + # Temporary: use PR #35084 until it merges; revert to refs/heads/master after + BITCOIN_CORE_REF: refs/pull/35084/merge LLVM_VERSION: 22 LIBCXX_DIR: /tmp/libcxx-build/ @@ -79,6 +81,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess @@ -195,6 +198,7 @@ jobs: uses: actions/checkout@v4 with: repository: ${{ env.BITCOIN_REPO }} + ref: ${{ env.BITCOIN_CORE_REF }} fetch-depth: 1 - name: Checkout libmultiprocess diff --git a/src/ipc/libmultiprocess/CMakeLists.txt b/src/ipc/libmultiprocess/CMakeLists.txt index a36023b1..56f77b62 100644 --- a/src/ipc/libmultiprocess/CMakeLists.txt +++ b/src/ipc/libmultiprocess/CMakeLists.txt @@ -13,7 +13,7 @@ endif() include("cmake/compat_find.cmake") find_package(Threads REQUIRED) -find_package(CapnProto 0.7 QUIET NO_MODULE) +find_package(CapnProto 0.9 QUIET NO_MODULE) if(NOT CapnProto_FOUND) message(FATAL_ERROR "Cap'n Proto is required but was not found.\n" @@ -203,6 +203,10 @@ target_link_libraries(mpgen PRIVATE CapnProto::capnp-rpc) target_link_libraries(mpgen PRIVATE CapnProto::capnpc) target_link_libraries(mpgen PRIVATE CapnProto::kj) target_link_libraries(mpgen PRIVATE Threads::Threads) +target_compile_definitions(mpgen PRIVATE + "CAPNP_EXECUTABLE=\"$\"" + "CAPNPC_CXX_EXECUTABLE=\"$\"" + "CAPNP_INCLUDE_DIRS=\"${CAPNP_INCLUDE_DIRS}\"") set_target_properties(mpgen PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE) set_target_properties(mpgen PROPERTIES diff --git a/src/ipc/libmultiprocess/ci/configs/olddeps.bash b/src/ipc/libmultiprocess/ci/configs/olddeps.bash index 95f44128..1a363b1b 100644 --- a/src/ipc/libmultiprocess/ci/configs/olddeps.bash +++ b/src/ipc/libmultiprocess/ci/configs/olddeps.bash @@ -1,5 +1,5 @@ CI_DESC="CI job using old Cap'n Proto and cmake versions" CI_DIR=build-olddeps export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter -Wno-error=array-bounds" -NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4") +NIX_ARGS=(--argstr capnprotoVersion "0.9.2" --argstr cmakeVersion "3.12.4") BUILD_ARGS=(-k) diff --git a/src/ipc/libmultiprocess/cmake/compat_config.cmake b/src/ipc/libmultiprocess/cmake/compat_config.cmake index f9d3004f..51bda36b 100644 --- a/src/ipc/libmultiprocess/cmake/compat_config.cmake +++ b/src/ipc/libmultiprocess/cmake/compat_config.cmake @@ -12,6 +12,75 @@ if (NOT DEFINED capnp_PREFIX AND DEFINED CAPNP_INCLUDE_DIRS) get_filename_component(capnp_PREFIX "${CAPNP_INCLUDE_DIRS}" DIRECTORY) endif() +if (NOT DEFINED CAPNP_INCLUDE_DIRS AND DEFINED capnp_PREFIX) + set(CAPNP_INCLUDE_DIRS "${capnp_PREFIX}/include") +endif() + +if (NOT TARGET CapnProto::capnp_tool) + if (DEFINED CAPNP_EXECUTABLE) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${CAPNP_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnp_tool IMPORTED GLOBAL) + set_target_properties(CapnProto::capnp_tool PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnp") + endif() +endif() + +if (NOT TARGET CapnProto::capnpc_cpp) + if (DEFINED CAPNPC_CXX_EXECUTABLE) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${CAPNPC_CXX_EXECUTABLE}") + elseif (DEFINED capnp_PREFIX) + add_executable(CapnProto::capnpc_cpp IMPORTED GLOBAL) + set_target_properties(CapnProto::capnpc_cpp PROPERTIES IMPORTED_LOCATION "${capnp_PREFIX}/bin/capnpc-c++") + endif() +endif() + +# Validate CapnProto tool target locations and fix if broken. +# Some packaged capnproto versions (e.g., Ubuntu Noble libcapnp-dev 1.0.1) +# have incorrect IMPORTED_LOCATION paths due to a packaging bug where the cmake +# config file is installed under /usr/lib/.../cmake/ but the _IMPORT_PREFIX +# calculation goes up too few directory levels, yielding /usr/lib/bin/capnp +# instead of the correct /usr/bin/capnp. +foreach(_mp_tool IN ITEMS capnp_tool capnpc_cpp) + if (TARGET "CapnProto::${_mp_tool}") + get_target_property(_mp_configs "CapnProto::${_mp_tool}" IMPORTED_CONFIGURATIONS) + set(_mp_valid FALSE) + foreach(_mp_cfg IN LISTS _mp_configs) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" "IMPORTED_LOCATION_${_mp_cfg}") + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + break() + endif() + endforeach() + if (NOT _mp_valid) + get_target_property(_mp_loc "CapnProto::${_mp_tool}" IMPORTED_LOCATION) + if (EXISTS "${_mp_loc}") + set(_mp_valid TRUE) + endif() + endif() + if (NOT _mp_valid) + if ("${_mp_tool}" STREQUAL "capnp_tool") + find_program(_mp_fixed capnp HINTS "${capnp_PREFIX}/bin") + else() + find_program(_mp_fixed capnpc-c++ HINTS "${capnp_PREFIX}/bin") + endif() + if (_mp_fixed) + foreach(_mp_cfg IN LISTS _mp_configs) + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES "IMPORTED_LOCATION_${_mp_cfg}" "${_mp_fixed}") + endforeach() + set_target_properties("CapnProto::${_mp_tool}" PROPERTIES IMPORTED_LOCATION "${_mp_fixed}") + endif() + unset(_mp_fixed CACHE) + endif() + endif() +endforeach() +unset(_mp_tool) +unset(_mp_configs) +unset(_mp_valid) +unset(_mp_cfg) +unset(_mp_loc) + if (NOT DEFINED CAPNPC_OUTPUT_DIR) set(CAPNPC_OUTPUT_DIR "${CMAKE_CURRENT_BINARY_DIR}") endif() diff --git a/src/ipc/libmultiprocess/doc/design.md b/src/ipc/libmultiprocess/doc/design.md index 113cafc4..094602e9 100644 --- a/src/ipc/libmultiprocess/doc/design.md +++ b/src/ipc/libmultiprocess/doc/design.md @@ -120,7 +120,7 @@ sequenceDiagram participant PMT as ProxyMethodTraits participant Impl as Actual C++ Method - serverInvoke->>SF1: SF1::invoke + serverInvoke->>SF1: SF1::invoke SF1->>SF2: SF2::invoke SF2->>SR: SR::invoke SR->>SC: SC::invoke @@ -165,7 +165,7 @@ Thread mapping enables each client thread to have a dedicated server thread proc Thread mapping is initialized by defining an interface method with a `ThreadMap` parameter and/or response. The example below adds `ThreadMap` to the `construct` method because libmultiprocess calls the `construct` method automatically. ```capnp -interface InitInterface $Proxy.wrap("Init") { +interface InitInterface $Proxy.wrap("Init") { construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap :Proxy.ThreadMap); } ``` diff --git a/src/ipc/libmultiprocess/doc/versions.md b/src/ipc/libmultiprocess/doc/versions.md index 2c2ec50e..14bd8ad8 100644 --- a/src/ipc/libmultiprocess/doc/versions.md +++ b/src/ipc/libmultiprocess/doc/versions.md @@ -7,8 +7,17 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v10 +## v12 - Current unstable version. +- Adds support for nonunix platforms, making API changes that are not backwards compatible. + +## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) +- Improves debug output if EventLoop::post callback fails. + +## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) +- Increases spawn test timeout to avoid spurious failures. +- Uses `throwRecoverableException` instead of raw `throw` to improve runtime error messages in macOS builds. +- Used in Bitcoin Core master branch, pulled in by [#34977](https://github.com/bitcoin/bitcoin/pull/34977). Also pulled into Bitcoin Core 31.x stable branch by [#35028](https://github.com/bitcoin/bitcoin/pull/35028). ## [v9.0](https://github.com/bitcoin-core/libmultiprocess/commits/v9.0) - Fixes race conditions where worker thread could be used after destruction, where getParams() could be called after request cancel, and where m_on_cancel could be called after request finishes. diff --git a/src/ipc/libmultiprocess/example/calculator.cpp b/src/ipc/libmultiprocess/example/calculator.cpp index 86ce388b..6ed2df5f 100644 --- a/src/ipc/libmultiprocess/example/calculator.cpp +++ b/src/ipc/libmultiprocess/example/calculator.cpp @@ -6,8 +6,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include #include class CalculatorImpl : public Calculator @@ -51,14 +50,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpcalculator \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpcalculator", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff --git a/src/ipc/libmultiprocess/example/example.cpp b/src/ipc/libmultiprocess/example/example.cpp index 38313977..68bce888 100644 --- a/src/ipc/libmultiprocess/example/example.cpp +++ b/src/ipc/libmultiprocess/example/example.cpp @@ -19,20 +19,20 @@ #include #include #include +#include #include namespace fs = std::filesystem; static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const std::string& new_exe_name) { - int pid; - const int fd = mp::SpawnProcess(pid, [&](int fd) -> std::vector { + const auto [pid, socket] = mp::SpawnProcess([&](mp::ConnectInfo info) -> std::vector { fs::path path = process_argv0; path.remove_filename(); path.append(new_exe_name); - return {path.string(), std::to_string(fd)}; + return {path.string(), std::move(info)}; }); - return std::make_tuple(mp::ConnectStream(loop, fd), pid); + return std::make_tuple(mp::ConnectStream(loop, mp::MakeStream(loop.m_io_context, socket)), pid); } static void LogPrint(mp::LogMessage log_data) diff --git a/src/ipc/libmultiprocess/example/printer.cpp b/src/ipc/libmultiprocess/example/printer.cpp index 9150d59b..9b456d9c 100644 --- a/src/ipc/libmultiprocess/example/printer.cpp +++ b/src/ipc/libmultiprocess/example/printer.cpp @@ -7,8 +7,7 @@ #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include -#include +#include // IWYU pragma: keep #include #include #include @@ -16,9 +15,9 @@ #include #include #include +#include #include #include -#include class PrinterImpl : public Printer { @@ -44,14 +43,10 @@ int main(int argc, char** argv) std::cout << "Usage: mpprinter \n"; return 1; } - int fd; - if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) { - std::cerr << argv[1] << " is not a number or is larger than an int\n"; - return 1; - } + mp::SocketId socket{mp::StartSpawned(argv[1])}; mp::EventLoop loop("mpprinter", LogPrint); std::unique_ptr init = std::make_unique(); - mp::ServeStream(loop, fd, *init); + mp::ServeStream(loop, mp::MakeStream(loop.m_io_context, socket), *init); loop.loop(); return 0; } diff --git a/src/ipc/libmultiprocess/include/mp/config.h.in b/src/ipc/libmultiprocess/include/mp/config.h.in index 9d3c6240..4a8c9168 100644 --- a/src/ipc/libmultiprocess/include/mp/config.h.in +++ b/src/ipc/libmultiprocess/include/mp/config.h.in @@ -6,7 +6,6 @@ #define MP_CONFIG_H #cmakedefine CMAKE_INSTALL_PREFIX "@CMAKE_INSTALL_PREFIX@" -#cmakedefine capnp_PREFIX "@capnp_PREFIX@" #cmakedefine HAVE_KJ_FILESYSTEM #cmakedefine HAVE_PTHREAD_GETNAME_NP @HAVE_PTHREAD_GETNAME_NP@ diff --git a/src/ipc/libmultiprocess/include/mp/proxy-io.h b/src/ipc/libmultiprocess/include/mp/proxy-io.h index d7b9f0e5..4f629963 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy-io.h +++ b/src/ipc/libmultiprocess/include/mp/proxy-io.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -210,6 +211,21 @@ class Logger std::string LongThreadName(const char* exe_name); +inline SocketId StreamSocketId(const Stream& stream) +{ + if (stream) KJ_IF_MAYBE(socket, stream->getFd()) return *socket; +#ifdef WIN32 + if (stream) KJ_IF_MAYBE(handle, stream->getWin32Handle()) return reinterpret_cast(*handle); +#endif + throw std::logic_error("Stream socket unset"); +} + +//! Wrap a socket file descriptor as an async stream, taking ownership of the fd. +inline Stream MakeStream(kj::AsyncIoContext& io_context, SocketId socket) +{ + return io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); +} + //! Event loop implementation. //! //! Cap'n Proto threading model is very simple: all I/O operations are @@ -308,11 +324,12 @@ class EventLoop //! Callback functions to run on async thread. std::optional m_async_fns MP_GUARDED_BY(m_mutex); - //! Pipe read handle used to wake up the event loop thread. - int m_wait_fd = -1; + //! Socket pair used to post and wait for wakeups to the event loop thread. + kj::Own m_wait_stream; + kj::Own m_post_stream; - //! Pipe write handle used to wake up the event loop thread. - int m_post_fd = -1; + //! Synchronous writer used to write to m_post_stream. + kj::Own m_post_writer; //! Number of clients holding references to ProxyServerBase objects that //! reference this event loop. @@ -793,17 +810,15 @@ kj::Promise ProxyServer::post(Fn&& fn) return ret; } -//! Given stream file descriptor, make a new ProxyClient object to send requests -//! over the stream. Also create a new Connection object embedded in the -//! client that is freed when the client is closed. +//! Given a stream, make a new ProxyClient object to send requests over it. +//! Also create a new Connection object embedded in the client that is freed +//! when the client is closed. template -std::unique_ptr> ConnectStream(EventLoop& loop, int fd) +std::unique_ptr> ConnectStream(EventLoop& loop, kj::Own stream) { typename InitInterface::Client init_client(nullptr); std::unique_ptr connection; loop.sync([&] { - auto stream = - loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); connection = std::make_unique(loop, kj::mv(stream)); init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs(); Connection* connection_ptr = connection.get(); @@ -851,13 +866,12 @@ void _Listen(EventLoop& loop, kj::Own&& listener, InitIm })); } -//! Given stream file descriptor and an init object, handle requests on the -//! stream by calling methods on the Init object. +//! Given a stream and an init object, handle requests on the stream by calling +//! methods on the Init object. template -void ServeStream(EventLoop& loop, int fd, InitImpl& init) +void ServeStream(EventLoop& loop, kj::Own stream, InitImpl& init) { - _Serve( - loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); + _Serve(loop, kj::mv(stream), init); } //! Given listening socket file descriptor and an init object, handle incoming diff --git a/src/ipc/libmultiprocess/include/mp/proxy.h b/src/ipc/libmultiprocess/include/mp/proxy.h index c55380c1..b63eaa5b 100644 --- a/src/ipc/libmultiprocess/include/mp/proxy.h +++ b/src/ipc/libmultiprocess/include/mp/proxy.h @@ -314,11 +314,11 @@ static constexpr int FIELD_BOXED = 16; template struct Accessor : public Field { - static const bool in = flags & FIELD_IN; - static const bool out = flags & FIELD_OUT; - static const bool optional = flags & FIELD_OPTIONAL; - static const bool requested = flags & FIELD_REQUESTED; - static const bool boxed = flags & FIELD_BOXED; + static constexpr bool in = (flags & FIELD_IN) != 0; + static constexpr bool out = (flags & FIELD_OUT) != 0; + static constexpr bool optional = (flags & FIELD_OPTIONAL) != 0; + static constexpr bool requested = (flags & FIELD_REQUESTED) != 0; + static constexpr bool boxed = (flags & FIELD_BOXED) != 0; }; //! Wrapper around std::function for passing std::function objects between client and servers. diff --git a/src/ipc/libmultiprocess/include/mp/type-interface.h b/src/ipc/libmultiprocess/include/mp/type-interface.h index a32c53d2..f685a623 100644 --- a/src/ipc/libmultiprocess/include/mp/type-interface.h +++ b/src/ipc/libmultiprocess/include/mp/type-interface.h @@ -54,12 +54,12 @@ void CustomBuildField(TypeList, InvokeContext& invoke_context, Impl& value, Output&& output, - typename decltype(output.get())::Calls* enable = nullptr) + typename Decay::Calls* enable = nullptr) { // Disable deleter so proxy server object doesn't attempt to delete the // wrapped implementation when the proxy client is destroyed or // disconnected. - using Interface = typename decltype(output.get())::Calls; + using Interface = typename Decay::Calls; output.set(CustomMakeProxyServer(invoke_context, std::shared_ptr(&value, [](Impl*){}))); } diff --git a/src/ipc/libmultiprocess/include/mp/util.h b/src/ipc/libmultiprocess/include/mp/util.h index a3db1282..5b5daa8c 100644 --- a/src/ipc/libmultiprocess/include/mp/util.h +++ b/src/ipc/libmultiprocess/include/mp/util.h @@ -5,12 +5,15 @@ #ifndef MP_UTIL_H #define MP_UTIL_H +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -20,6 +23,10 @@ #include #include +#ifdef WIN32 +#include +#endif + namespace mp { //! Generic utility functions used by capnp code. @@ -136,7 +143,10 @@ struct PtrOrValue { std::variant data; template - PtrOrValue(T* ptr, Args&&... args) : data(ptr ? ptr : std::variant{std::in_place_type, std::forward(args)...}) {} + PtrOrValue(T* ptr, Args&&... args) : data(std::in_place_type, ptr) + { + if (!ptr) data.template emplace(std::forward(args)...); + } T& operator*() { return data.index() ? std::get(data) : *std::get(data); } T* operator->() { return &**this; } @@ -249,25 +259,51 @@ std::string ThreadName(const char* exe_name); //! errors in python unit tests. std::string LogEscape(const kj::StringTree& string, size_t max_size); +using Stream = kj::Own; + +#ifdef WIN32 +using ProcessId = uintptr_t; +using SocketId = uintptr_t; +constexpr SocketId SocketError{INVALID_SOCKET}; +#else +using ProcessId = int; +using SocketId = int; +constexpr SocketId SocketError{-1}; +#endif + +//! Information about parent process passed to child process as a command-line +//! argument. On unix this is the child socket fd number formatted as a string. +//! On windows, this is a path to a named pipe the parent process will write +//! WSADuplicateSocket info to. +using ConnectInfo = std::string; + //! Callback type used by SpawnProcess below. -using FdToArgsFn = std::function(int fd)>; +using ConnectInfoToArgsFn = std::function(const ConnectInfo&)>; //! Spawn a new process that communicates with the current process over a socket -//! pair. Returns pid through an output argument, and file descriptor for the -//! local side of the socket. -//! The fd_to_args callback is invoked in the parent process before fork(). -//! It must not rely on child pid/state, and must return the command line -//! arguments that should be used to execute the process. Embed the remote file -//! descriptor number in whatever format the child process expects. -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args); - -//! Call execvp with vector args. -//! Not safe to call in a post-fork child of a multi-threaded process. -//! Currently only used by mpgen at build time. -void ExecProcess(const std::vector& args); +//! pair. Calls connect_info_to_args callback with a connection string that +//! needs to be passed to the child process, and executes the argv command line +//! it returns. Returns child process id and socket id. +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args); + +//! Spawn a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId SpawnProcess(const std::vector& args); + +//! Initialize spawned child process using the ConnectInfo string passed to it, +//! returning a socket id for communicating with the parent process. +SocketId StartSpawned(const ConnectInfo& connect_info); + +//! Create a socket pair that can be used to communicate within a process or +//! between parent and child processes. +std::array SocketPair(); + +//! Start a process and return its process id. Caller should call WaitProcess +//! on the returned id. +ProcessId ExecProcess(const std::vector& args); //! Wait for a process to exit and return its exit code. -int WaitProcess(int pid); +int WaitProcess(ProcessId pid); inline char* CharCast(char* c) { return c; } inline char* CharCast(unsigned char* c) { return (char*)c; } diff --git a/src/ipc/libmultiprocess/include/mp/version.h b/src/ipc/libmultiprocess/include/mp/version.h index 964667a9..4587a288 100644 --- a/src/ipc/libmultiprocess/include/mp/version.h +++ b/src/ipc/libmultiprocess/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 10 +#define MP_MAJOR_VERSION 12 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/src/ipc/libmultiprocess/src/mp/gen.cpp b/src/ipc/libmultiprocess/src/mp/gen.cpp index 603f9ccb..07a41a1f 100644 --- a/src/ipc/libmultiprocess/src/mp/gen.cpp +++ b/src/ipc/libmultiprocess/src/mp/gen.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -20,14 +19,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include -#include #include #include @@ -170,7 +168,7 @@ static void Generate(kj::StringPtr src_prefix, if (p != std::string::npos) include_base.erase(p); std::vector args; - args.emplace_back(capnp_PREFIX "/bin/capnp"); + args.emplace_back(CAPNP_EXECUTABLE); args.emplace_back("compile"); args.emplace_back("--src-prefix="); args.back().append(src_prefix.cStr(), src_prefix.size()); @@ -178,18 +176,11 @@ static void Generate(kj::StringPtr src_prefix, args.emplace_back("--import-path="); args.back().append(import_path.cStr(), import_path.size()); } - args.emplace_back("--output=" capnp_PREFIX "/bin/capnpc-c++"); + args.emplace_back("--output=" CAPNPC_CXX_EXECUTABLE); args.emplace_back(src_file); - const int pid = fork(); - if (pid == -1) { - throw std::system_error(errno, std::system_category(), "fork"); - } - if (!pid) { - mp::ExecProcess(args); - } - const int status = mp::WaitProcess(pid); + const int status = mp::WaitProcess(mp::ExecProcess(args)); if (status) { - throw std::runtime_error("Invoking " capnp_PREFIX "/bin/capnp failed"); + throw std::runtime_error("Invoking " CAPNP_EXECUTABLE " failed"); } const capnp::SchemaParser parser; @@ -677,35 +668,41 @@ static void Generate(kj::StringPtr src_prefix, int main(int argc, char** argv) { - if (argc < 3) { - std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; - exit(1); - } - std::vector import_paths; - std::vector> import_dirs; - auto fs = kj::newDiskFilesystem(); - auto cwd = fs->getCurrentPath(); - kj::Own src_dir; - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { - src_dir = kj::mv(*dir); - } else { - throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); - } - for (int i = 4; i < argc; ++i) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { - import_paths.emplace_back(argv[i]); - import_dirs.emplace_back(kj::mv(*dir)); + int ret = 1; + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + if (argc < 3) { + std::cerr << "Usage: " << PROXY_BIN << " SRC_PREFIX INCLUDE_PREFIX SRC_FILE [IMPORT_PATH...]\n"; + exit(1); + } + std::vector import_paths; + std::vector> import_dirs; + auto fs = kj::newDiskFilesystem(); + auto cwd = fs->getCurrentPath(); + kj::Own src_dir; + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[1]))) { + src_dir = kj::mv(*dir); } else { - throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + throw std::runtime_error(std::string("Failed to open src_prefix prefix directory: ") + argv[1]); } - } - for (const char* path : {CMAKE_INSTALL_PREFIX "/include", capnp_PREFIX "/include"}) { - KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { - import_paths.emplace_back(path); - import_dirs.emplace_back(kj::mv(*dir)); + for (int i = 4; i < argc; ++i) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(argv[i]))) { + import_paths.emplace_back(argv[i]); + import_dirs.emplace_back(kj::mv(*dir)); + } else { + throw std::runtime_error(std::string("Failed to open import directory: ") + argv[i]); + } + } + for (const char* path : {CMAKE_INSTALL_PREFIX "/include", CAPNP_INCLUDE_DIRS}) { + KJ_IF_MAYBE(dir, fs->getRoot().tryOpenSubdir(cwd.evalNative(path))) { + import_paths.emplace_back(path); + import_dirs.emplace_back(kj::mv(*dir)); + } + // No exception thrown if _PREFIX directories do not exist } - // No exception thrown if _PREFIX directories do not exist + Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); + ret = 0; + })) { + std::cerr << "mpgen error: " << kj::str(*exception).cStr() << '\n'; } - Generate(argv[1], argv[2], argv[3], import_paths, *src_dir, import_dirs); - return 0; + return ret; } diff --git a/src/ipc/libmultiprocess/src/mp/proxy.cpp b/src/ipc/libmultiprocess/src/mp/proxy.cpp index d24208db..64f5693a 100644 --- a/src/ipc/libmultiprocess/src/mp/proxy.cpp +++ b/src/ipc/libmultiprocess/src/mp/proxy.cpp @@ -22,7 +22,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -30,10 +32,8 @@ #include #include #include -#include #include #include -#include #include namespace mp { @@ -66,10 +66,9 @@ void EventLoopRef::reset(bool relock) MP_NO_TSA loop->m_num_clients -= 1; if (loop->done()) { loop->m_cv.notify_all(); - int post_fd{loop->m_post_fd}; loop_lock->unlock(); char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon) + loop->m_post_writer->write(&buffer, 1); // By default, do not try to relock `loop_lock` after writing, // because the event loop could wake up and destroy itself and the // mutex might no longer exist. @@ -100,6 +99,35 @@ Connection::~Connection() // after the calls finish. m_rpc_system.reset(); + // shutdownWrite is needed on Windows so pending data in the m_stream socket + // will be sent instead of discarded when m_stream is destroyed. On unix, + // this doesn't seem to be needed because data is sent more reliably. + // + // Sending pending data is important if the connection is a socketpair + // because when one side of the socketpair is closed, the other side doesn't + // seem to receive any onDisconnect event. So it is important for the other + // side to instead receive Cap'n Proto "release" messages (see `struct + // Release` in capnp/rpc.capnp) from local Client objects being destroyed so + // the remote side can free resources and shut down cleanly. Without this, + // when one side of a socket pair is closed the other side may not receive + // these messages, preventing the remote side from freeing ProxyServer + // resources and shutting down cleanly. + // Use kj::runCatchingExceptions instead of try/catch because on macOS with + // dynamic libraries, kj::Exception typeinfo differs between libcapnp and + // the calling binary, so catch (const kj::Exception&) silently fails to + // match. kj::runCatchingExceptions uses KJ's own interception mechanism + // which works correctly across dynamic library boundaries. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { + m_stream->shutdownWrite(); + })) { + // Ignore ENOTCONN: on macOS/FreeBSD (unlike Linux), shutdown(SHUT_WR) + // returns ENOTCONN if the peer already closed the connection. This is + // expected when the destructor is triggered by a remote disconnect. + if (exception->getType() != kj::Exception::Type::DISCONNECTED) { + kj::throwRecoverableException(kj::mv(*exception)); + } + } + // ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup // handlers are in the async list. // @@ -173,6 +201,40 @@ void Connection::removeSyncCleanup(CleanupIt it) m_sync_cleanup_fns.erase(it); } +#ifdef WIN32 +//! Synchronous socket output stream. Cap'n Proto library only provides limited +//! support for synchronous IO. It provides `FdOutputStream` which wraps unix +//! file descriptors and calls write() internally, and `HandleOutStream` which +//! wraps windows HANDLE values and calls WriteFile() internally. This class +//! just provides analogous functionality wrapping SOCKET values and calls +//! send() internally. +class SocketOutputStream : public kj::OutputStream { +public: + explicit SocketOutputStream(SOCKET socket) : m_socket(socket) {} + + void write(const void* buffer, size_t size) override; + +private: + SOCKET m_socket; +}; + +static constexpr size_t WRITE_CLAMP_SIZE = 1u << 30; // 1GB clamp for Windows, like FdOutputStream + +void SocketOutputStream::write(const void* buffer, size_t size) { + const char* pos = reinterpret_cast(buffer); + + while (size > 0) { + int n = send(m_socket, pos, static_cast(kj::min(size, WRITE_CLAMP_SIZE)), 0); + + KJ_WIN32(n != SOCKET_ERROR, "send() failed"); + KJ_ASSERT(n > 0, "send() returned zero."); + + pos += n; + size -= n; + } +} +#endif + void EventLoop::addAsyncCleanup(std::function fn) { const Lock lock(m_mutex); @@ -203,10 +265,18 @@ EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context) m_log_opts(std::move(log_opts)), m_context(context) { - int fds[2]; - KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); - m_wait_fd = fds[0]; - m_post_fd = fds[1]; + auto pipe = m_io_context.provider->newTwoWayPipe(); + m_wait_stream = kj::mv(pipe.ends[0]); + m_post_stream = kj::mv(pipe.ends[1]); + KJ_IF_MAYBE(fd, m_post_stream->getFd()) { + m_post_writer = kj::heap(*fd); +#ifdef WIN32 + } else KJ_IF_MAYBE(handle, m_post_stream->getWin32Handle()) { + m_post_writer = kj::heap(reinterpret_cast(*handle)); +#endif + } else { + throw std::logic_error("Could not get file descriptor for new pipe."); + } } EventLoop::~EventLoop() @@ -215,8 +285,8 @@ EventLoop::~EventLoop() const Lock lock(m_mutex); KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(!m_async_fns); - KJ_ASSERT(m_wait_fd == -1); - KJ_ASSERT(m_post_fd == -1); + KJ_ASSERT(!m_wait_stream); + KJ_ASSERT(!m_post_stream); KJ_ASSERT(m_num_clients == 0); // Spin event loop. wait for any promises triggered by RPC shutdown. @@ -236,21 +306,24 @@ void EventLoop::loop() m_async_fns.emplace(); } - kj::Own wait_stream{ - m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; - int post_fd{m_post_fd}; + kj::Own& wait_stream{m_wait_stream}; char buffer = 0; for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); Lock lock(m_mutex); if (m_post_fn) { - Unlock(lock, *m_post_fn); + // m_post_fn throwing is never expected. If it does happen, the caller + // of EventLoop::post() will return without any indication of failure, + // which will likely cause other bugs. Log the error and continue. + KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() MP_REQUIRES(m_mutex) { Unlock(lock, *m_post_fn); })) { + MP_LOG(*this, Log::Error) << "EventLoop: m_post_fn threw: " << kj::str(*exception).cStr(); + } m_post_fn = nullptr; m_cv.notify_all(); } else if (done()) { // Intentionally do not break if m_post_fn was set, even if done() - // would return true, to ensure that the EventLoopRef write(post_fd) + // would return true, to ensure that the post() m_post_writer->write() // call always succeeds and the loop does not exit between the time // that the done condition is set and the write call is made. break; @@ -260,10 +333,9 @@ void EventLoop::loop() m_task_set.reset(); MP_LOG(*this, Log::Info) << "EventLoop::loop bye."; wait_stream = nullptr; - KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); - m_wait_fd = -1; - m_post_fd = -1; + m_wait_stream = nullptr; + m_post_stream = nullptr; m_async_fns.reset(); m_cv.notify_all(); } @@ -278,10 +350,9 @@ void EventLoop::post(kj::Function fn) EventLoopRef ref(*this, &lock); m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; }); m_post_fn = &fn; - int post_fd{m_post_fd}; Unlock(lock, [&] { char buffer = 0; - KJ_SYSCALL(write(post_fd, &buffer, 1)); + m_post_writer->write(&buffer, 1); }); m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; }); } diff --git a/src/ipc/libmultiprocess/src/mp/util.cpp b/src/ipc/libmultiprocess/src/mp/util.cpp index 463947b9..a1255e07 100644 --- a/src/ipc/libmultiprocess/src/mp/util.cpp +++ b/src/ipc/libmultiprocess/src/mp/util.cpp @@ -10,20 +10,31 @@ #include #include #include +#include #include -#include #include #include -#include -#include -#include -#include #include #include // NOLINT(misc-include-cleaner) // IWYU pragma: keep -#include #include #include +#ifdef WIN32 +#include +#include +#include +#include +#else +#include +#include +#include +#include +#include +#include +#include +#define _getpid getpid +#endif + #ifdef __linux__ #include #endif @@ -32,11 +43,17 @@ #include #endif // HAVE_PTHREAD_GETTHREADID_NP +#ifdef WIN32 +// Forward-declare internal capnp function. +namespace kj { namespace _ { int win32Socketpair(SOCKET socks[2]); } } +#endif + namespace fs = std::filesystem; namespace mp { namespace { +#ifndef WIN32 std::vector MakeArgv(const std::vector& args) { std::vector argv; @@ -58,18 +75,19 @@ size_t MaxFd() return 1023; } } +#endif } // namespace std::string ThreadName(const char* exe_name) { char thread_name[16] = {0}; -#ifdef HAVE_PTHREAD_GETNAME_NP +#if defined(HAVE_PTHREAD_GETNAME_NP) && !defined(WIN32) pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name)); #endif // HAVE_PTHREAD_GETNAME_NP std::ostringstream buffer; - buffer << (exe_name ? exe_name : "") << "-" << getpid() << "/"; + buffer << (exe_name ? exe_name : "") << "-" << _getpid() << "/"; if (thread_name[0] != '\0') { buffer << thread_name << "-"; @@ -79,6 +97,8 @@ std::string ThreadName(const char* exe_name) // the former are shorter and are the same as what gdb prints "LWP ...". #ifdef __linux__ buffer << syscall(SYS_gettid); +#elif defined(WIN32) + buffer << GetCurrentThreadId(); #elif defined(HAVE_PTHREAD_THREADID_NP) uint64_t tid = 0; pthread_threadid_np(nullptr, &tid); @@ -116,23 +136,66 @@ std::string LogEscape(const kj::StringTree& string, size_t max_size) return result; } -int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) +//! Generate command line that the executable being invoked will split up using +//! the CommandLineToArgvW function, which expects arguments with spaces to be +//! quoted, quote characters to be backslash-escaped, and backslashes to also be +//! backslash-escaped, but only if they precede a quote character. +std::string CommandLineFromArgv(const std::vector& argv) { - int fds[2]; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) { - throw std::system_error(errno, std::system_category(), "socketpair"); + std::string out; + for (const auto& arg : argv) { + if (!out.empty()) out += " "; + if (!arg.empty() && arg.find_first_of(" \t\"") == std::string::npos) { + // Argument has no quotes or spaces so escaping not necessary. + out += arg; + } else { + out += '"'; // Start with a quote + for (size_t i = 0; i < arg.size(); ++i) { + if (arg[i] == '\\') { + // Count consecutive backslashes + size_t backslash_count = 0; + while (i < arg.size() && arg[i] == '\\') { + ++backslash_count; + ++i; + } + if (i < arg.size() && arg[i] == '"') { + // Backslashes before a quote need to be doubled + out.append(backslash_count * 2 + 1, '\\'); + out.push_back('"'); + } else { + // Otherwise, backslashes remain as-is + out.append(backslash_count, '\\'); + --i; // Compensate for the outer loop's increment + } + } else if (arg[i] == '"') { + // Escape double quotes with a backslash + out.push_back('\\'); + out.push_back('"'); + } else { + out.push_back(arg[i]); + } + } + out += '"'; // End with a quote + } } + return out; +} + +std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args) +{ + auto fds{SocketPair()}; +#ifndef WIN32 // Evaluate the callback and build the argv array before forking. // // The parent process may be multi-threaded and holding internal library // locks at fork time. In that case, running code that allocates memory or // takes locks in the child between fork() and exec() can deadlock // indefinitely. Precomputing arguments in the parent avoids this. - const std::vector args{fd_to_args(fds[0])}; + const std::vector args{connect_info_to_args(std::to_string(fds[0]))}; const std::vector argv{MakeArgv(args)}; - pid = fork(); + ProcessId pid = fork(); if (pid == -1) { throw std::system_error(errno, std::system_category(), "fork"); } @@ -160,6 +223,16 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) } } + // Explicitly clear FD_CLOEXEC on the child's socket before calling + // exec, so the fd survives into the spawned process regardless of how + // the socket was created. + int flags = fcntl(fds[0], F_GETFD); + if (flags == -1) throw std::system_error(errno, std::system_category(), "fcntl F_GETFD"); + if (flags & FD_CLOEXEC) { + flags &= ~FD_CLOEXEC; + if (fcntl(fds[0], F_SETFD, flags) == -1) throw std::system_error(errno, std::system_category(), "fcntl F_SETFD"); + } + execvp(argv[0], argv.data()); // NOTE: perror() is not async-signal-safe; calling it here in a // post-fork child may deadlock in multithreaded parents. @@ -168,12 +241,77 @@ int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) perror("execvp failed"); _exit(127); } - return fds[1]; + return {pid, fds[1]}; +#else + // Create windows pipe to send socket over to child process. + static std::atomic counter{1}; + ConnectInfo pipe_path{"\\\\.\\pipe\\mp-" + std::to_string(GetCurrentProcessId()) + "-" + std::to_string(counter.fetch_add(1))}; + HANDLE pipe{CreateNamedPipeA(pipe_path.c_str(), PIPE_ACCESS_OUTBOUND, PIPE_TYPE_MESSAGE | PIPE_WAIT, /*nMaxInstances=*/1, /*nOutBufferSize=*/0, /*nInBufferSize=*/0, /*nDefaultTimeOut=*/0, /*lpSecurityAttributes=*/nullptr)}; + KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateNamedPipe failed"); + + // Start child process + std::string cmd{CommandLineFromArgv(connect_info_to_args(pipe_path))}; + STARTUPINFOA si{}; + si.cb = sizeof(si); + PROCESS_INFORMATION pi{}; + KJ_WIN32(CreateProcessA(/*lpApplicationName=*/nullptr, const_cast(cmd.c_str()), /*lpProcessAttributes=*/nullptr, /*lpThreadAttributes=*/nullptr, /*bInheritHandles=*/TRUE, /*dwCreationFlags=*/0, /*lpEnvironment=*/nullptr, /*lpCurrentDirectory=*/nullptr, &si, &pi), "CreateProcess failed"); + KJ_WIN32(CloseHandle(pi.hThread), "CloseHandle(hThread)"); + + // Duplicate socket for the child (now that we know its PID) + WSAPROTOCOL_INFO info{}; + KJ_WINSOCK(WSADuplicateSocket(fds[0], pi.dwProcessId, &info), "WSADuplicateSocket failed"); + + // Send socket to the child via the pipe + KJ_WIN32(ConnectNamedPipe(pipe, nullptr) || GetLastError() == ERROR_PIPE_CONNECTED, "ConnectNamedPipe failed"); + DWORD wr; + KJ_WIN32(WriteFile(pipe, &info, sizeof(info), &wr, nullptr) && wr == sizeof(info), "WriteFile(pipe) failed"); + KJ_WIN32(CloseHandle(pipe), "CloseHandle(pipe)"); + + return {reinterpret_cast(pi.hProcess), fds[1]}; +#endif +} + +SocketId StartSpawned(const ConnectInfo& connect_info) +{ +#ifndef WIN32 + return std::stoi(connect_info); +#else + HANDLE pipe = CreateFileA(connect_info.c_str(), /*dwDesiredAccess=*/GENERIC_READ, /*dwShareMode=*/0, /*lpSecurityAttributes=*/nullptr, /*dwCreationDisposition=*/OPEN_EXISTING, /*dwFlagsAndAttributes=*/0, /*hTemplateFile=*/nullptr); + KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateFile(pipe) failed"); + + WSAPROTOCOL_INFO info{}; + DWORD rd; + KJ_WIN32(ReadFile(pipe, &info, sizeof(info), &rd, nullptr) && rd == sizeof(info), "ReadFile(pipe) failed"); + KJ_WIN32(CloseHandle(pipe), "CloseHandle(pipe)"); + + WSADATA dontcare; + if (int wsaErr = WSAStartup(MAKEWORD(2, 2), &dontcare)) KJ_FAIL_WIN32("WSAStartup()", wsaErr); + + SOCKET socket{WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &info, 0, WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT)}; + KJ_WINSOCK(socket, "WSASocket(FROM_PROTOCOL_INFO) failed"); + return socket; +#endif +} + +std::array SocketPair() +{ +#ifdef WIN32 + SOCKET pair[2]; + KJ_WINSOCK(kj::_::win32Socketpair(pair)); +#else + int pair[2]; + KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, pair)); +#endif + return {pair[0], pair[1]}; } -void ExecProcess(const std::vector& args) +ProcessId ExecProcess(const std::vector& args) { +#ifndef WIN32 const std::vector argv{MakeArgv(args)}; + ProcessId pid; + KJ_SYSCALL(pid = fork()); + if (pid) return pid; if (execvp(argv[0], argv.data()) != 0) { perror("execvp failed"); if (errno == ENOENT && !args.empty()) { @@ -181,15 +319,34 @@ void ExecProcess(const std::vector& args) } _exit(1); } + KJ_UNREACHABLE; +#else + std::string cmd{CommandLineFromArgv(args)}; + STARTUPINFOA si{}; + si.cb = sizeof(si); + PROCESS_INFORMATION pi{}; + KJ_WIN32(CreateProcessA(/*lpApplicationName=*/nullptr, const_cast(cmd.c_str()), /*lpProcessAttributes=*/nullptr, /*lpThreadAttributes=*/nullptr, /*bInheritHandles=*/FALSE, /*dwCreationFlags=*/0, /*lpEnvironment=*/nullptr, /*lpCurrentDirectory=*/nullptr, &si, &pi), "CreateProcess"); + KJ_WIN32(CloseHandle(pi.hThread), "CloseHandle(hThread)"); + return reinterpret_cast(pi.hProcess); +#endif } -int WaitProcess(int pid) +int WaitProcess(ProcessId pid) { +#ifndef WIN32 int status; if (::waitpid(pid, &status, /*options=*/0) != pid) { throw std::system_error(errno, std::system_category(), "waitpid"); } return status; +#else + HANDLE handle{reinterpret_cast(pid)}; + DWORD result{WaitForSingleObject(handle, /*dwMilliseconds=*/INFINITE)}; + if (result != WAIT_OBJECT_0) KJ_FAIL_WIN32("WaitForSingleObject(child)", GetLastError()); + KJ_WIN32(GetExitCodeProcess(handle, &result), "GetExitCodeProcess"); + KJ_WIN32(CloseHandle(handle), "CloseHandle(process)"); + return result; +#endif } } // namespace mp diff --git a/src/ipc/libmultiprocess/test/mp/test/spawn_tests.cpp b/src/ipc/libmultiprocess/test/mp/test/spawn_tests.cpp index a14e50e2..5184667b 100644 --- a/src/ipc/libmultiprocess/test/mp/test/spawn_tests.cpp +++ b/src/ipc/libmultiprocess/test/mp/test/spawn_tests.cpp @@ -9,23 +9,31 @@ #include #include #include -#include #include #include #include -#include #include -#include +#include +#include #include +#ifndef WIN32 +#include +#include +#include +#endif + +namespace mp { +namespace test { namespace { +#ifndef WIN32 constexpr auto FAILURE_TIMEOUT = std::chrono::seconds{30}; // Poll for child process exit using waitpid(..., WNOHANG) until the child exits // or timeout expires. Returns true if the child exited and status_out was set. // Returns false on timeout or error. -static bool WaitPidWithTimeout(int pid, std::chrono::milliseconds timeout, int& status_out) +static bool WaitPidWithTimeout(ProcessId pid, std::chrono::milliseconds timeout, int& status_out) { const auto deadline = std::chrono::steady_clock::now() + timeout; while (std::chrono::steady_clock::now() < deadline) { @@ -40,14 +48,19 @@ static bool WaitPidWithTimeout(int pid, std::chrono::milliseconds timeout, int& } return false; } +#endif // !WIN32 } // namespace +#ifndef WIN32 KJ_TEST("SpawnProcess does not run callback in child") { // This test is designed to fail deterministically if fd_to_args is invoked // in the post-fork child: a mutex held by another parent thread at fork // time appears locked forever in the child. + // + // This test is Unix-only: Windows uses CreateProcess (not fork), so the + // inherited-locked-mutex hazard does not apply there. std::mutex target_mutex; std::mutex control_mutex; std::condition_variable control_cv; @@ -86,14 +99,13 @@ KJ_TEST("SpawnProcess does not run callback in child") control_cv.notify_one(); }); - int pid{-1}; - const int fd{mp::SpawnProcess(pid, [&](int child_fd) -> std::vector { + const auto [pid, socket]{SpawnProcess([&](ConnectInfo connect_info) -> std::vector { // If this callback runs in the post-fork child, target_mutex appears // locked forever (the owning thread does not exist), so this deadlocks. std::lock_guard g(target_mutex); - return {"true", std::to_string(child_fd)}; + return {"true", std::move(connect_info)}; })}; - ::close(fd); + ::close(socket); int status{0}; // Give the child some time to exit. If it does not, terminate it and @@ -110,3 +122,6 @@ KJ_TEST("SpawnProcess does not run callback in child") KJ_EXPECT(exited, "Timeout waiting for child process to exit"); KJ_EXPECT(WIFEXITED(status) && WEXITSTATUS(status) == 0); } +#endif // !WIN32 +} // namespace test +} // namespace mp diff --git a/src/ipc/process.cpp b/src/ipc/process.cpp index 6c9ec216..7cbe2064 100644 --- a/src/ipc/process.cpp +++ b/src/ipc/process.cpp @@ -18,11 +18,17 @@ #include #include #include +#include +#include + +#ifdef WIN32 +#include +#else #include #include #include -#include -#include +#define closesocket close +#endif using util::RemovePrefixView; @@ -31,17 +37,17 @@ namespace { class ProcessImpl : public Process { public: - int spawn(const std::string& new_exe_name, const fs::path& argv0_path, int& pid) override + std::tuple spawn(const std::string& new_exe_name, const fs::path& argv0_path) override { - return mp::SpawnProcess(pid, [&](int fd) { + return mp::SpawnProcess([&](mp::ConnectInfo info) { fs::path path = argv0_path; path.remove_filename(); path /= fs::PathFromString(new_exe_name); - return std::vector{fs::PathToString(path), "-ipcfd", strprintf("%i", fd)}; + return std::vector{fs::PathToString(path), "-ipcfd", std::move(info)}; }); } - int waitSpawned(int pid) override { return mp::WaitProcess(pid); } - bool checkSpawned(int argc, char* argv[], int& fd) override + int waitSpawned(mp::ProcessId pid) override { return mp::WaitProcess(pid); } + bool checkSpawned(int argc, char* argv[], mp::SocketId& socket) override { // If this process was not started with a single -ipcfd argument, it is // not a process spawned by the spawn() call above, so return false and @@ -55,17 +61,17 @@ class ProcessImpl : public Process // in combination with other arguments because the parent process // should be able to control the child process through the IPC protocol // without passing information out of band. - const auto maybe_fd{ToIntegral(argv[2])}; - if (!maybe_fd) { - throw std::runtime_error(strprintf("Invalid -ipcfd number '%s'", argv[2])); + try { + socket = mp::StartSpawned(argv[2]); + } catch (const std::exception& e) { + throw std::runtime_error(strprintf("Invalid -ipcfd number '%s' (%s)", argv[2], e.what())); } - fd = *maybe_fd; return true; } - int connect(const fs::path& data_dir, + mp::SocketId connect(const fs::path& data_dir, const std::string& dest_exe_name, std::string& address) override; - int bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) override; + mp::SocketId bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) override; }; static bool ParseAddress(std::string& address, @@ -97,7 +103,7 @@ static bool ParseAddress(std::string& address, return false; } -int ProcessImpl::connect(const fs::path& data_dir, +mp::SocketId ProcessImpl::connect(const fs::path& data_dir, const std::string& dest_exe_name, std::string& address) { @@ -107,21 +113,21 @@ int ProcessImpl::connect(const fs::path& data_dir, throw std::invalid_argument(error); } - int fd; - if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) { + mp::SocketId fd; + if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == mp::SocketError) { throw std::system_error(errno, std::system_category()); } if (::connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) { return fd; } int connect_error = errno; - if (::close(fd) != 0) { + if (::closesocket(fd) != 0) { LogPrintf("Error closing file descriptor %i '%s': %s\n", fd, address, SysErrorString(errno)); } throw std::system_error(connect_error, std::system_category()); } -int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) +mp::SocketId ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) { struct sockaddr_un addr; std::string error; @@ -132,13 +138,18 @@ int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std if (addr.sun_family == AF_UNIX) { fs::path path = addr.sun_path; if (path.has_parent_path()) fs::create_directories(path.parent_path()); - if (fs::symlink_status(path).type() == fs::file_type::socket) { + if (fs::symlink_status(path).type() == fs::file_type::socket +#ifdef WIN32 + // On windows, sockets show up as regular files with size 0 + || (fs::is_regular_file(path) && fs::file_size(path) == 0) +#endif + ) { fs::remove(path); } } - int fd; - if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) { + mp::SocketId fd; + if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == mp::SocketError) { throw std::system_error(errno, std::system_category()); } @@ -146,7 +157,7 @@ int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std return fd; } int bind_error = errno; - if (::close(fd) != 0) { + if (::closesocket(fd) != 0) { LogPrintf("Error closing file descriptor %i: %s\n", fd, SysErrorString(errno)); } throw std::system_error(bind_error, std::system_category()); diff --git a/src/ipc/process.h b/src/ipc/process.h index 2ed8b73f..f42aeec7 100644 --- a/src/ipc/process.h +++ b/src/ipc/process.h @@ -8,6 +8,7 @@ #include #include +#include #include namespace ipc { @@ -15,9 +16,6 @@ class Protocol; //! IPC process interface for spawning bitcoin processes and serving requests //! in processes that have been spawned. -//! -//! There will be different implementations of this interface depending on the -//! platform (e.g. unix, windows). class Process { public: @@ -25,23 +23,23 @@ class Process //! Spawn process and return socket file descriptor for communicating with //! it. - virtual int spawn(const std::string& new_exe_name, const fs::path& argv0_path, int& pid) = 0; + virtual std::tuple spawn(const std::string& new_exe_name, const fs::path& argv0_path) = 0; //! Wait for spawned process to exit and return its exit code. - virtual int waitSpawned(int pid) = 0; + virtual int waitSpawned(mp::ProcessId pid) = 0; //! Parse command line and determine if current process is a spawned child //! process. If so, return true and a file descriptor for communicating //! with the parent process. - virtual bool checkSpawned(int argc, char* argv[], int& fd) = 0; + virtual bool checkSpawned(int argc, char* argv[], mp::SocketId& socket) = 0; //! Canonicalize and connect to address, returning socket descriptor. - virtual int connect(const fs::path& data_dir, + virtual mp::SocketId connect(const fs::path& data_dir, const std::string& dest_exe_name, std::string& address) = 0; //! Create listening socket, bind and canonicalize address, and return socket descriptor. - virtual int bind(const fs::path& data_dir, + virtual mp::SocketId bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) = 0; }; diff --git a/src/ipc/protocol.h b/src/ipc/protocol.h index 335ffddc..49263b5f 100644 --- a/src/ipc/protocol.h +++ b/src/ipc/protocol.h @@ -6,6 +6,7 @@ #define BITCOIN_IPC_PROTOCOL_H #include +#include #include #include @@ -32,31 +33,31 @@ class Protocol //! up its own state (calling ProxyServer destructors, etc) on disconnect, //! and any client calls will just throw ipc::Exception errors after a //! disconnect. - virtual std::unique_ptr connect(int fd, const char* exe_name) = 0; + virtual std::unique_ptr connect(mp::Stream stream) = 0; //! Listen for connections on provided socket descriptor, accept them, and //! handle requests on accepted connections. This method doesn't block, and //! performs I/O on a background thread. - virtual void listen(int listen_fd, const char* exe_name, interfaces::Init& init) = 0; + virtual void listen(mp::SocketId listen_fd, interfaces::Init& init) = 0; - //! Handle requests on provided socket descriptor, forwarding them to the - //! provided Init interface. Socket communication is handled on the - //! current thread, and this call blocks until the socket is closed. + //! Handle requests from a stream provided by the make_stream callback, + //! forwarding them to the provided Init interface. Socket communication is + //! handled on the current thread, and this call blocks until the socket is + //! closed. A callback is used to specify the stream because this method + //! initializes the event loop and it may not be possible to create the + //! stream before the event loop is initialized. //! - //! @note: If this method is called, it needs be called before connect() or - //! listen() methods, because for ease of implementation it's inflexible and - //! always runs the event loop in the foreground thread. It can share its - //! event loop with the other methods but can't share an event loop that was - //! created by them. This isn't really a problem because serve() is only - //! called by spawned child processes that call it immediately to + //! @note: If this method is called, it needs to be called before connect() + //! or listen() methods, because for ease of implementation this method is + //! inflexible and always runs the event loop in the foreground thread. It + //! can share its event loop with the other methods but can't share an event + //! loop that was created by them. This isn't a problem because serve() is + //! only called by spawned child processes that call it immediately to //! communicate back with parent processes. - // - //! The optional `ready_fn` callback will be called after the event loop is - //! created but before it is started. This can be useful in tests to trigger - //! client connections from another thread as soon as the event loop is - //! available, but should not be necessary in normal code which starts - //! clients and servers independently. - virtual void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function& ready_fn = {}) = 0; + virtual void serve(interfaces::Init& init, const std::function& make_stream) = 0; + + //! Make stream object from socket id. + virtual mp::Stream makeStream(mp::SocketId socket) = 0; //! Disconnect any incoming connections that are still connected. virtual void disconnectIncoming() = 0; diff --git a/src/ipc/util.h b/src/ipc/util.h new file mode 100644 index 00000000..7b823495 --- /dev/null +++ b/src/ipc/util.h @@ -0,0 +1,57 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_IPC_UTIL_H +#define BITCOIN_IPC_UTIL_H + +#include +#include + +#include +#include +#include +#include +#include +#include +#if MP_MAJOR_VERSION < 12 && !defined(WIN32) +#include +#endif + +namespace mp { +// Definitions that can be deleted when libmultiprocess subtree is updated to +// v12. Having these allows Bitcoin Core changes to be decoupled from +// libmultiprocess changes so they don't have to be reviewed in a single PR. +#if MP_MAJOR_VERSION < 12 +using ProcessId = int; +using SocketId = int; +constexpr SocketId SocketError{-1}; + +inline std::array SocketPair() +{ + int pair[2]; + KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, pair)); + return {pair[0], pair[1]}; +} + +using Stream = SocketId; + +using ConnectInfo = std::string; +inline SocketId StartSpawned(const ConnectInfo& connect_info) +{ + auto socket = ToIntegral(connect_info); + if (!socket) throw std::invalid_argument(strprintf("Invalid socket descriptor '%s'", connect_info)); + return *socket; +} + +using ConnectInfoToArgsFn = std::function(const ConnectInfo&)>; +inline std::tuple SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args) +{ + ProcessId pid; + SocketId socket = SpawnProcess(pid, [&](int fd) { return connect_info_to_args(strprintf("%d", fd)); }); + return {pid, socket}; +} +#endif +} // namespace mp + +#endif // BITCOIN_IPC_UTIL_H diff --git a/src/sv2/template_provider.cpp b/src/sv2/template_provider.cpp index a445bd46..5c6d6da4 100644 --- a/src/sv2/template_provider.cpp +++ b/src/sv2/template_provider.cpp @@ -128,6 +128,10 @@ void Sv2TemplateProvider::Interrupt() { AssertLockNotHeld(m_tp_mutex); + // Flip the atomic first so the handler thread exits its main loop as soon + // as the current waitNext() returns, before we issue any IPC calls below. + m_flag_interrupt_sv2 = true; + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Interrupt pending mining waits..."); { LOCK(m_tp_mutex); @@ -136,7 +140,6 @@ void Sv2TemplateProvider::Interrupt() } } - m_flag_interrupt_sv2 = true; m_mining.interrupt(); // Also interrupt network threads so client handlers can wind down quickly. if (m_connman) m_connman->Interrupt(); @@ -218,10 +221,12 @@ void Sv2TemplateProvider::ThreadSv2Handler() if (client_threads.contains(client.m_id)) return; - client_threads.emplace(client.m_id, + const size_t client_id{client.m_id}; + + client_threads.emplace(client_id, std::thread(&util::TraceThread, - strprintf("sv2-%zu", client.m_id), - [this, &client] { ThreadSv2ClientHandler(client.m_id); })); + strprintf("sv2-%zu", client_id), + [this, client_id] { ThreadSv2ClientHandler(client_id); })); }); // Take a break (handling new connections is not urgent) diff --git a/src/sv2/template_provider.h b/src/sv2/template_provider.h index 5b53bcb6..1033494d 100644 --- a/src/sv2/template_provider.h +++ b/src/sv2/template_provider.h @@ -158,6 +158,13 @@ class Sv2TemplateProvider : public Sv2EventsInterface */ void ProcessSv2Message(const node::Sv2NetMsg& sv2_header, Sv2Client& client) EXCLUSIVE_LOCKS_REQUIRED(!m_tp_mutex); + /** + * Set the interrupt flag without performing any IPC. Useful for tests that + * need to wind the handler thread down before the IPC event loop is free to + * service further calls (see TPTester teardown). + */ + void RequestInterrupt() noexcept { m_flag_interrupt_sv2 = true; } + // Only used for tests XOnlyPubKey m_authority_pubkey; diff --git a/src/test/main.cpp b/src/test/main.cpp index 5d149bd7..5773416b 100644 --- a/src/test/main.cpp +++ b/src/test/main.cpp @@ -17,6 +17,29 @@ const TranslateFn G_TRANSLATION_FUN{nullptr}; #include #include +#ifdef WIN32 +#include +#include +#include + +// Some tests intentionally leak a TPTester on Windows because libmultiprocess +// teardown deadlocks during thread-local cleanup (libmultiprocess#231, +// bitcoin#32387). Those leaked threads can also fault during static destruction +// at process exit. Once Boost.Test has reported its results, bypass static +// destructors entirely so the process exits cleanly with the right code. +struct WinExitFixture { + ~WinExitFixture() + { + std::fflush(stdout); + std::fflush(stderr); + const auto& results = boost::unit_test::results_collector.results( + boost::unit_test::framework::master_test_suite().p_id); + _exit(results.passed() ? 0 : 1); + } +}; +BOOST_GLOBAL_FIXTURE(WinExitFixture); +#endif + /** Redirect debug log to unit_test.log files */ std::function G_TEST_LOG_FUN = [](const std::string& s) { static const bool should_log{std::any_of( diff --git a/src/test/sv2_mock_mining.cpp b/src/test/sv2_mock_mining.cpp index a34feba3..c22447fb 100644 --- a/src/test/sv2_mock_mining.cpp +++ b/src/test/sv2_mock_mining.cpp @@ -104,9 +104,12 @@ std::unique_ptr MockBlockTemplate::waitNext(node::Blo } } - void MockBlockTemplate::interruptWait() +void MockBlockTemplate::interruptWait() { - LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "mock interruptWait()"); + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "mock interruptWait()"); + LOCK(state->m); + state->shutdown = true; + state->cv.notify_all(); } MockMining::MockMining(std::shared_ptr st) : state(std::move(st)) {} @@ -120,7 +123,13 @@ std::unique_ptr MockMining::createNewBlock(const node uint64_t seq = ++state->chain.template_seq; return std::make_unique(state, state->chain.prev_hash, state->txs, seq); } -void MockMining::interrupt() { LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "mock interrupt()"); } +void MockMining::interrupt() +{ + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "mock interrupt()"); + LOCK(state->m); + state->shutdown = true; + state->cv.notify_all(); +} bool MockMining::checkBlock(const CBlock&, const node::BlockCheckOptions&, std::string&, std::string&) { return true; } uint64_t MockMining::GetTemplateSeq() diff --git a/src/test/sv2_template_provider_tests.cpp b/src/test/sv2_template_provider_tests.cpp index 29c039c3..ad3dfae5 100644 --- a/src/test/sv2_template_provider_tests.cpp +++ b/src/test/sv2_template_provider_tests.cpp @@ -40,7 +40,8 @@ BOOST_AUTO_TEST_CASE(block_reserved_weight_floor) BOOST_AUTO_TEST_CASE(client_tests) { - TPTester tester{}; + TPTesterHandle tester_handle{}; + TPTester& tester = *tester_handle; tester.handshake(); @@ -218,7 +219,8 @@ BOOST_AUTO_TEST_CASE(fee_timer_blocking_test) Sv2TemplateProviderOptions opts; opts.is_test = false; opts.template_interval = std::chrono::seconds{2}; - TPTester tester{opts}; + TPTesterHandle tester_handle{opts}; + TPTester& tester = *tester_handle; tester.handshake(); tester.SendSetupConnection(); @@ -261,7 +263,8 @@ BOOST_AUTO_TEST_CASE(new_tip_bypasses_fee_timer_test) Sv2TemplateProviderOptions opts; opts.is_test = false; opts.template_interval = std::chrono::seconds{10}; - TPTester tester{opts}; + TPTesterHandle tester_handle{opts}; + TPTester& tester = *tester_handle; tester.handshake(); tester.SendSetupConnection(); diff --git a/src/test/sv2_tester_lifecycle_tests.cpp b/src/test/sv2_tester_lifecycle_tests.cpp index 3c83c850..8647de21 100644 --- a/src/test/sv2_tester_lifecycle_tests.cpp +++ b/src/test/sv2_tester_lifecycle_tests.cpp @@ -15,6 +15,7 @@ */ BOOST_FIXTURE_TEST_SUITE(sv2_tester_lifecycle_tests, Sv2BasicTestingSetup) +#ifndef WIN32 BOOST_AUTO_TEST_CASE(tp_tester_repeated_construction) { // Run a few iterations; keep count modest to stay fast in CI while @@ -49,5 +50,12 @@ BOOST_AUTO_TEST_CASE(tp_tester_repeated_construction) // test hangs or use-after-frees under sanitizers / valgrind. } } +#else +// TODO: Re-enable on Windows once the libmultiprocess shutdown hang is fixed +// upstream. Tearing down the IPC EventLoop / per-thread state at process +// exit deadlocks std::thread::join on mingw winpthreads. Tracked in +// libmultiprocess#231 (rewrites the EventLoop wakeup primitive and adds +// shutdownWrite() in ~Connection) and bitcoin#32387. +#endif BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/sv2_tp_tester.cpp b/src/test/sv2_tp_tester.cpp index 777bdf74..9ed2fc5f 100644 --- a/src/test/sv2_tp_tester.cpp +++ b/src/test/sv2_tp_tester.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -22,10 +23,7 @@ extern std::function G_TEST_LOG_FUN; #include #include -#include -#include -namespace { struct MockInit : public interfaces::Init { std::shared_ptr state; explicit MockInit(std::shared_ptr s) : state(std::move(s)) {} @@ -34,7 +32,6 @@ struct MockInit : public interfaces::Init { return std::make_unique(state); } }; -} // namespace TPTester::TPTester() : TPTester(Sv2TemplateProviderOptions{.is_test = true}) {} @@ -55,21 +52,28 @@ TPTester::TPTester(Sv2TemplateProviderOptions opts) loop_ready.get_future().wait(); // Create socketpair for in-process IPC stream - int fds[2]; - int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds); - BOOST_REQUIRE_EQUAL(rc, 0); - m_ipc_fds[0] = fds[0]; - m_ipc_fds[1] = fds[1]; + m_ipc_fds = mp::SocketPair(); // Create server Init exposing MockMining via shared state m_server_init = std::make_unique(m_state); + MockInit& server_init = *m_server_init; // Register server side on the event loop thread m_loop->sync([&] { - mp::ServeStream(*m_loop, m_ipc_fds[0], *static_cast(m_server_init.get())); + mp::Stream server_stream{m_loop->m_io_context.lowLevelProvider->wrapSocketFd(m_ipc_fds[0], kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; + m_server_connection = std::make_unique( + *m_loop, + std::move(server_stream), + [&server_init](mp::Connection& connection) { + auto server_proxy = kj::heap>( + std::shared_ptr(&server_init, [](MockInit*) {}), connection); + return capnp::Capability::Client(kj::mv(server_proxy)); + }); + m_server_connection->onDisconnect([this] { m_server_connection.reset(); }); }); // Connect client side and fetch Mining proxy - m_client_init = mp::ConnectStream(*m_loop, m_ipc_fds[1]); + mp::Stream client_stream{m_loop->m_io_context.lowLevelProvider->wrapSocketFd(m_ipc_fds[1], kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; + m_client_init = mp::ConnectStream(*m_loop, std::move(client_stream)); BOOST_REQUIRE(m_client_init != nullptr); m_mining_proxy = m_client_init->makeMining(); BOOST_REQUIRE(m_mining_proxy != nullptr); @@ -88,22 +92,52 @@ TPTester::TPTester(Sv2TemplateProviderOptions opts) TPTester::~TPTester() { + // Set the interrupt flag directly (atomic, no IPC needed) so the + // client thread exits its main loop after the current waitNext() + // returns. This MUST happen before Shutdown() below, otherwise + // the client thread spins in a tight IPC waitNext() loop that + // starves the destructor's own IPC calls on Windows. + if (m_tp) { + m_tp->RequestInterrupt(); + } + + // Signal the mock state directly (bypasses IPC) so that any + // MockBlockTemplate::waitNext() blocked on the event-loop thread + // returns immediately. + m_mining_control->Shutdown(); + + // Now that the flag is set and waitNext() will return, the client + // thread will see m_flag_interrupt_sv2 and exit. Connman interrupt + // + stop ensures the socket handler thread also winds down. + if (m_tp) { + m_tp->Interrupt(); + m_tp->StopThreads(); + } + // Hold a loop ref while tearing down dependent objects to keep loop alive. if (m_loop) { mp::EventLoopRef loop_ref{*m_loop}; - // Destroy objects that may post work to the loop while the loop is guaranteed alive. m_tp.reset(); m_mining_proxy.reset(); m_client_init.reset(); - // Server init can go after clients; it only owns exported capabilities. + + // Pump the loop once so pending release/disconnect messages are + // processed, then explicitly tear down the server-side Connection. + // This avoids depending on Windows socketpair disconnect behavior to + // release the final EventLoopRef and let the loop thread exit. + m_loop->sync([] {}); + if (m_server_connection) { + m_loop->sync([this] { m_server_connection.reset(); }); + } + m_server_init.reset(); } else { + m_server_connection.reset(); m_tp.reset(); m_mining_proxy.reset(); m_client_init.reset(); m_server_init.reset(); } - // Join loop thread (loop exits automatically when refs & connections reach zero). if (m_loop_thread.joinable()) m_loop_thread.join(); } diff --git a/src/test/sv2_tp_tester.h b/src/test/sv2_tp_tester.h index 5082ff99..78a777d9 100644 --- a/src/test/sv2_tp_tester.h +++ b/src/test/sv2_tp_tester.h @@ -10,16 +10,20 @@ #include #include +#include #include +#include #include // Forward declarations class Sv2Transport; namespace mp { class EventLoop; } +namespace mp { class Connection; } namespace interfaces { class Init; class Mining; } struct MockState; class MockMining; +struct MockInit; class TPTester { private: @@ -30,9 +34,10 @@ class TPTester { // IPC loopback components std::thread m_loop_thread; mp::EventLoop* m_loop{nullptr}; - std::unique_ptr m_server_init; + std::unique_ptr m_server_connection; + std::unique_ptr m_server_init; std::unique_ptr m_client_init; - int m_ipc_fds[2]{-1, -1}; + std::array m_ipc_fds{mp::SocketError, mp::SocketError}; public: std::unique_ptr m_tp; //!< Sv2TemplateProvider being tested @@ -80,4 +85,34 @@ class TPTester { 1; // merkle_path count (CompactSize(0)) }; +/** + * RAII handle around a TPTester. On non-Windows platforms it owns the tester + * by value and tears it down at scope exit. On Windows it heap-allocates and + * intentionally leaks the tester: tearing down the IPC EventLoop / per-thread + * state at process exit deadlocks std::thread::join during libmultiprocess + * thread-local cleanup. See https://github.com/bitcoin-core/libmultiprocess/pull/231 + * and https://github.com/bitcoin/bitcoin/pull/32387. The OS reclaims the + * remaining loop thread and IPC state at process exit. This only disables + * test cleanup, not the tests themselves. + */ +class TPTesterHandle { +public: + TPTesterHandle() : TPTesterHandle(Sv2TemplateProviderOptions{.is_test = true}) {} + explicit TPTesterHandle(Sv2TemplateProviderOptions opts) +#ifdef WIN32 + : m_tester(*new TPTester(opts)) {} +#else + : m_owned(opts), m_tester(m_owned) {} +#endif + + TPTester* operator->() noexcept { return &m_tester; } + TPTester& operator*() noexcept { return m_tester; } + +private: +#ifndef WIN32 + TPTester m_owned; +#endif + TPTester& m_tester; +}; + #endif // BITCOIN_TEST_SV2_TP_TESTER_H diff --git a/test/lint/lint-includes.py b/test/lint/lint-includes.py index 0d15ac54..953f7d58 100755 --- a/test/lint/lint-includes.py +++ b/test/lint/lint-includes.py @@ -22,6 +22,7 @@ EXPECTED_BOOST_INCLUDES = [ "boost/test/included/unit_test.hpp", + "boost/test/results_collector.hpp", "boost/test/unit_test.hpp", ]