From 6807c7097b3809f7a15206390c51f7e82de8c61a Mon Sep 17 00:00:00 2001 From: kasemir Date: Wed, 11 Jun 2025 10:58:02 -0400 Subject: [PATCH 01/15] PVA: Start version 3, encode/decode search flag `reply src port` --- core/pva/TLS.md | 2 +- .../epics/pva/client/ClientUDPHandler.java | 18 ++++----- .../java/org/epics/pva/common/PVAHeader.java | 10 +++-- .../org/epics/pva/common/SearchRequest.java | 38 +++++++++++++++---- 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/core/pva/TLS.md b/core/pva/TLS.md index 20ac1b1f77..474ad14cb4 100644 --- a/core/pva/TLS.md +++ b/core/pva/TLS.md @@ -39,7 +39,7 @@ This is an example recipe for getting started. Note its "Certificate identifier": ``` - $ authnstd --name ioc --cert-usage hybrid + $ authnstd --name ioc --cert-usage ioc Keychain file created : /home/user/.config/pva/1.3/server.p12 Certificate identifier : e53ed409:15273288300286014953 ``` diff --git a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java index e5180128d1..4f9dfff518 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java @@ -87,7 +87,6 @@ public interface SearchResponseHandler // with the understanding that it will only receive broadcasts; // since they are often blocked by firewall, may receive nothing, ever. private final DatagramChannel udp_beacon; - private final ByteBuffer beacon_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_PACKET); private volatile Thread search_thread4, search_thread6, beacon_thread; @@ -107,13 +106,15 @@ public ClientUDPHandler(final BeaconHandler beacon_handler, // IPv6 sockets // Beacon socket only receives, does not send broadcasts - if (PVASettings.EPICS_PVA_ENABLE_IPV6) { + if (PVASettings.EPICS_PVA_ENABLE_IPV6) + { udp_search6 = Network.createUDP(StandardProtocolFamily.INET6, null, 0); udp_localaddr6 = (InetSocketAddress) udp_search6.getLocalAddress(); ipV6Msg = String.format(" and %s", udp_localaddr6); udp_beacon = Network.createUDP(StandardProtocolFamily.INET6, null, PVASettings.EPICS_PVA_BROADCAST_PORT); } - else { + else + { udp_search6 = null; udp_beacon = Network.createUDP(StandardProtocolFamily.INET, null, PVASettings.EPICS_PVA_BROADCAST_PORT); udp_localaddr6 = null; @@ -150,11 +151,8 @@ public void send(final ByteBuffer buffer, final AddressInfo info) throws Excepti } else { - if (!PVASettings.EPICS_PVA_ENABLE_IPV6) { - throw new Exception( - "EPICS_PVA_ENABLE_IPV6 must be enabled to use IPv6 address!" - ); - } + if (!PVASettings.EPICS_PVA_ENABLE_IPV6) + throw new Exception("EPICS_PVA_ENABLE_IPV6 must be enabled to use IPv6 address!"); synchronized (udp_search6) { @@ -177,13 +175,15 @@ public void start() search_thread4.setDaemon(true); search_thread4.start(); - if (PVASettings.EPICS_PVA_ENABLE_IPV6) { + if (PVASettings.EPICS_PVA_ENABLE_IPV6) + { final ByteBuffer receive_buffer6 = ByteBuffer.allocate(PVASettings.MAX_UDP_PACKET); search_thread6 = new Thread(() -> listen(udp_search6, receive_buffer6), "UDP6-receiver " + Network.getLocalAddress(udp_search6)); search_thread6.setDaemon(true); search_thread6.start(); } + final ByteBuffer beacon_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_PACKET); beacon_thread = new Thread(() -> listen(udp_beacon, beacon_buffer), "UDP-beacon-receiver " + Network.getLocalAddress(udp_beacon)); beacon_thread.setDaemon(true); beacon_thread.start(); diff --git a/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java b/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java index b6f247e0cf..5f22fd04fe 100644 --- a/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java +++ b/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2022 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -26,8 +26,12 @@ public class PVAHeader /** PVA protocol magic */ public static final byte PVA_MAGIC = (byte)0xCA; - /** PVA protocol revision (implemented by this library) */ - public static final byte PVA_PROTOCOL_REVISION = 2; + /** PVA protocol revision (implemented by this library) + * + *
v2: Server's Echo reply includes the request payload + *
v3: SearchRequest FLAG_REPLY_SRC_PORT + */ + public static final byte PVA_PROTOCOL_REVISION = 3; /** Oldest PVA protocol revision handled by this library */ public static final byte REQUIRED_PVA_PROTOCOL_REVISION = 1; diff --git a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java index f95019b54b..9d15fbbef8 100644 --- a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java +++ b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -63,12 +63,29 @@ public String toString() } }; + /** Server should reply with its GUID and empty CID list + * even if it does not host any of the searched channels + */ + public static final byte FLAG_SEARCH_MUST_REPLY = 0x01; + + /** Client should ignore the 'port' in the reply and + * simply use the port of the 'source', that is the peer port + * of the UDP message or TCP connection + * @since Version 3 + */ + public static final byte FLAG_REPLY_SRC_PORT = 0x02; + + /** Indicates that search message was unicast */ + public static final byte FLAG_SEARCH_UNICAST = (byte)0x80; + /** Sequence number */ public int seq; /** Is it a unicast? */ public boolean unicast; /** Is reply required? */ public boolean reply_required; + /** Reply to source port instead of port listed in the search request? */ + public boolean reply_to_src_port; /** Address of client */ public InetSocketAddress client; /** Use TLS, or plain TCP? */ @@ -106,10 +123,10 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers // Search Sequence ID search.seq = buffer.getInt(); - // 0-bit for replyRequired, 7-th bit for "sent as unicast" (1)/"sent as broadcast/multicast" (0) final byte flags = buffer.get(); - search.unicast = (flags & 0x80) == 0x80; - search.reply_required = (flags & 0x01) == 0x01; + search.unicast = (flags & FLAG_SEARCH_UNICAST) == FLAG_SEARCH_UNICAST; + search.reply_required = (flags & FLAG_SEARCH_MUST_REPLY) == FLAG_SEARCH_MUST_REPLY; + search.reply_to_src_port = (flags & FLAG_REPLY_SRC_PORT) == FLAG_REPLY_SRC_PORT; // reserved buffer.get(); @@ -127,7 +144,13 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers logger.log(Level.WARNING, "PVA Client " + from + " sent search #" + search.seq + " with invalid address"); return null; } - final int port = Short.toUnsignedInt(buffer.getShort()); + int port = Short.toUnsignedInt(buffer.getShort()); + // Since version 3, flag can ask us to ignore the reply port in the message + // and instead use the peer's port. + // This should help with NAT where we get the message from an intermediate + // and need to reply via that same intermediate + if (version >= 3 && search.reply_to_src_port) + port = from.getPort(); // Use address from message unless it's a generic local address if (addr.isAnyLocalAddress() || port <= 0) @@ -201,8 +224,9 @@ public static void encode(final boolean unicast, final int seq, final Collection // only the one started last will see the unicast. // Mark search message as unicast so that receiver will forward // it via local broadcast to other local listeners. - // 0-bit for replyRequired, 7-th bit for "sent as unicast" (1)/"sent as broadcast/multicast" (0) - buffer.put((byte) ((unicast ? 0x80 : 0x00) | (channels == null ? 0x01 : 0x00))); + buffer.put((byte) ((unicast ? FLAG_SEARCH_UNICAST : 0x00) | + ((channels == null || channels.isEmpty()) ? FLAG_SEARCH_MUST_REPLY : 0x00) | + FLAG_REPLY_SRC_PORT)); // reserved buffer.put((byte) 0); From b275a50b64dc205ceb7566aff122dca0331a0cb9 Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 12 Jun 2025 11:31:53 -0400 Subject: [PATCH 02/15] MulticastDemo: -p option, use 224.0.0.128 as done by PVA --- .../org/epics/pva/common/MulticastDemo.java | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java b/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java index 6f049a8b3e..3b6113f260 100644 --- a/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java +++ b/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021 Oak Ridge National Laboratory. + * Copyright (c) 2021-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -64,9 +64,11 @@ @SuppressWarnings("nls") public class MulticastDemo { - private static final String MCAST_GROUP4 = "224.0.0.129"; + // Same as defaults in PVASettings.EPICS_PVAS_INTF_ADDR_LIST + private static final String MCAST_GROUP4 = "224.0.0.128"; private static final String MCAST_GROUP6 = "ff02::42:1"; - private static final int PORT = 9876; + // Non-default port + private static int PORT = 9876; private static DatagramChannel createUDPChannel(final ProtocolFamily family, final int port) throws Exception { @@ -101,7 +103,7 @@ private static void receive(final NetworkInterface iface) final byte[] data = new byte[buf.limit()]; buf.get(data); final String received = new String(data); - System.out.println("Received: " + received + " from " + client); + System.out.println("Received: '" + received + "' from " + client); if ("end".equals(received)) --awaiting; } @@ -117,7 +119,7 @@ private static void receive(final NetworkInterface iface) private static void send(final DatagramChannel udp, final InetAddress target, final String text) throws Exception { final InetSocketAddress addr = new InetSocketAddress(target, PORT); - System.out.println("Sending to " + addr); + System.out.println("Sending '" + text + "' to " + addr); final ByteBuffer buf = ByteBuffer.allocate(500); buf.put(text.getBytes()); buf.flip(); @@ -129,22 +131,43 @@ public static void main(String[] args) throws Exception NetworkInterface iface = null; boolean do_send = false, do_receive = false; boolean help = false; - for (String arg : args) + for (int i=0; i Date: Thu, 12 Jun 2025 11:32:39 -0400 Subject: [PATCH 03/15] BoolDemo: More logging, optional PV name prefix --- .../java/org/epics/pva/server/BoolDemo.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java b/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java index c95bec41a6..0830f0b247 100644 --- a/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java +++ b/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.LogManager; +import java.util.logging.Logger; import org.epics.pva.PVASettings; import org.epics.pva.common.TCPHandler; @@ -29,8 +30,23 @@ public static void main(String[] args) throws Exception { // Log everything LogManager.getLogManager().readConfiguration(PVASettings.class.getResourceAsStream("/pva_logging.properties")); - PVASettings.logger.setLevel(Level.FINE); + PVASettings.logger.setLevel(Level.ALL); + Logger.getLogger("jdk.event.security").setLevel(PVASettings.logger.getLevel()); + + String prefix = ""; + for (String arg : args) + { + if (arg.startsWith("-h")) + { + System.out.println("Usage: BoolDemo [-h] [Prefix]"); + System.out.println(); + System.out.println("Prefix is added to PV names"); + return; + } + else + prefix = arg; + } try ( // Create PVA Server (auto-closed) @@ -53,8 +69,8 @@ public static void main(String[] args) throws Exception // Create PVs - final ServerPV pv1 = server.createPV("bool", data); - final ServerPV pv2 = server.createPV("struct", struct, (TCPHandler tcp, ServerPV pv, BitSet changes, PVAStructure written) -> + final ServerPV pv1 = server.createPV(prefix + "bool", data); + final ServerPV pv2 = server.createPV(prefix + "struct", struct, (TCPHandler tcp, ServerPV pv, BitSet changes, PVAStructure written) -> { System.out.println("Somebody wrote this to '" + pv.getName() + "':\n" + written); pv.update(written); From 666a442168e1377dc6df49e7d4fca6d73318b5bb Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 12 Jun 2025 11:51:01 -0400 Subject: [PATCH 04/15] PVA search forwards handling of `replySrcPort` Client emits searches with new `replySrcPort` flag. Forwarded search messages have updated client address and `replySrcPort` option is thus cleared because client address can be used without further adjustments. Both server and client default to 224.0.0.128 for local multicast. Both client and server forward to local multicast but only server needs to listen to MC. Log more SearchRequest detail. --- .../main/java/org/epics/pva/PVASettings.java | 10 ++++----- .../org/epics/pva/client/ChannelSearch.java | 6 ++--- .../epics/pva/client/ClientUDPHandler.java | 6 ++--- .../java/org/epics/pva/common/Network.java | 13 ++++------- .../org/epics/pva/common/SearchRequest.java | 22 +++++++++++++++---- .../epics/pva/server/ServerUDPHandler.java | 10 ++++++--- 6 files changed, 40 insertions(+), 27 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/PVASettings.java b/core/pva/src/main/java/org/epics/pva/PVASettings.java index 0d60a3fa2e..749f50d6c5 100644 --- a/core/pva/src/main/java/org/epics/pva/PVASettings.java +++ b/core/pva/src/main/java/org/epics/pva/PVASettings.java @@ -122,12 +122,12 @@ public class PVASettings *

Next, multicast groups may be added. * Each multicast group must include an interface. *

-     *  224.0.1.1,1@127.0.0.1     - Listen to local IPv4 multicasts
+     *  224.0.0.128,1@127.0.0.1   - Listen to local IPv4 multicasts
      *  [ff02::42:1],1@::1        - Listen to local IPv6 multicasts
      *  [ff02::42:1],1@en1        - Listen to IPv6 multicasts on network interface en1
      *  
*/ - public static String EPICS_PVAS_INTF_ADDR_LIST = "0.0.0.0 [::] 224.0.1.1,1@127.0.0.1 [ff02::42:1],1@::1"; + public static String EPICS_PVAS_INTF_ADDR_LIST = "0.0.0.0 [::] 224.0.0.128,1@127.0.0.1 [ff02::42:1],1@::1"; /** PVA server port for name searches and beacons */ public static int EPICS_PVAS_BROADCAST_PORT = EPICS_PVA_BROADCAST_PORT; @@ -250,11 +250,11 @@ public class PVASettings - /** Whether to allow PVA to use IPv6 + /** Whether to allow PVA to use IPv6 * - *

If this is false then PVA will not attempt to + *

If this is false then PVA will not attempt to * use any IPv6 capability at all. This is useful if your - * system does not have any IPv6 support. + * system does not have any IPv6 support. */ public static boolean EPICS_PVA_ENABLE_IPV6 = true; diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 28ab70d8da..c43535149a 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -470,7 +470,7 @@ private void search(final Collection channels) // Use 'any' reply address since reply will be via this TCP socket final InetSocketAddress response_address = new InetSocketAddress(0); - SearchRequest.encode(true, seq, channels, response_address, tls , buffer); + SearchRequest.encode(true, true, seq, channels, response_address, tls , buffer); }; tcp.submit(search_request); } @@ -505,7 +505,7 @@ private void sendSearch(final int seq, final Collection c { send_buffer.clear(); final InetSocketAddress response = udp.getResponseAddress(addr); - SearchRequest.encode(true, seq, channels, response, tls, send_buffer); + SearchRequest.encode(true, true, seq, channels, response, tls, send_buffer); send_buffer.flip(); try { @@ -523,7 +523,7 @@ private void sendSearch(final int seq, final Collection c { send_buffer.clear(); final InetSocketAddress response = udp.getResponseAddress(addr); - SearchRequest.encode(false, seq, channels, response, tls, send_buffer); + SearchRequest.encode(false, true, seq, channels, response, tls, send_buffer); send_buffer.flip(); try { diff --git a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java index 4f9dfff518..de61ac5d7c 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java @@ -99,7 +99,7 @@ public ClientUDPHandler(final BeaconHandler beacon_handler, // IPv4 socket, also used to send broadcasts and for the local re-sending udp_search4 = Network.createUDP(StandardProtocolFamily.INET, null, 0); udp_search4.socket().setBroadcast(true); - local_multicast = Network.configureLocalIPv4Multicast(udp_search4, PVASettings.EPICS_PVA_BROADCAST_PORT); + local_multicast = Network.getLocalMulticastGroup(udp_search4, PVASettings.EPICS_PVA_BROADCAST_PORT); udp_localaddr4 = (InetSocketAddress) udp_search4.getLocalAddress(); String ipV6Msg; @@ -300,7 +300,7 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver if (search.reply_required) { forward_buffer.clear(); - SearchRequest.encode(false, 0, null, search.client, search.tls, forward_buffer); + SearchRequest.encode(false, false, 0, null, search.client, search.tls, forward_buffer); forward_buffer.flip(); logger.log(Level.FINER, () -> "Forward search to list servers to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer)); send(forward_buffer, local_multicast); @@ -309,7 +309,7 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver else { forward_buffer.clear(); - SearchRequest.encode(false, search.seq, search.channels, search.client, search.tls, forward_buffer); + SearchRequest.encode(false, false, search.seq, search.channels, search.client, search.tls, forward_buffer); forward_buffer.flip(); logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer)); send(forward_buffer, local_multicast); diff --git a/core/pva/src/main/java/org/epics/pva/common/Network.java b/core/pva/src/main/java/org/epics/pva/common/Network.java index bbed69c232..5394f393ad 100644 --- a/core/pva/src/main/java/org/epics/pva/common/Network.java +++ b/core/pva/src/main/java/org/epics/pva/common/Network.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -16,7 +16,6 @@ import java.net.InterfaceAddress; import java.net.NetworkInterface; import java.net.StandardProtocolFamily; -import java.net.StandardSocketOptions; import java.nio.channels.DatagramChannel; import java.util.ArrayList; import java.util.Enumeration; @@ -278,16 +277,16 @@ else if (family == StandardProtocolFamily.INET) return udp; } - /** Configure IPv4 socket to receive local multicast messages + /** Get a local multicast group address for IPv4 socket * * IPv4 unicasts are re-sent as local multicast, * and this configures a socket to receive them * - * @param udp UDP channel that should listen to multicast messages + * @param udp UDP channel from which we plan to send multicast messages * @param port Port to use * @return Local multicast address, or null if no multicast support */ - public static AddressInfo configureLocalIPv4Multicast(final DatagramChannel udp, final int port) + public static AddressInfo getLocalMulticastGroup(final DatagramChannel udp, final int port) { try { @@ -301,11 +300,7 @@ public static AddressInfo configureLocalIPv4Multicast(final DatagramChannel udp, { final InetAddress group = InetAddress.getByName(PVASettings.EPICS_PVA_MULTICAST_GROUP); final InetSocketAddress local_multicast = new InetSocketAddress(group, port); - udp.join(group, loopback); - logger.log(Level.CONFIG, "Local multicast of IPv4 unicast using group " + local_multicast + " using network interface " + loopback.getDisplayName()); - udp.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true); - udp.setOption(StandardSocketOptions.IP_MULTICAST_IF, loopback); return new AddressInfo(false, local_multicast, 1, loopback); } diff --git a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java index 9d15fbbef8..e43fcb3c1f 100644 --- a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java +++ b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java @@ -145,6 +145,7 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers return null; } int port = Short.toUnsignedInt(buffer.getShort()); + final InetSocketAddress orig_response_addr = new InetSocketAddress(addr, port); // Since version 3, flag can ask us to ignore the reply port in the message // and instead use the peer's port. // This should help with NAT where we get the message from an intermediate @@ -193,7 +194,13 @@ else if ("tcp".equals(protocol)) { final int cid = buffer.getInt(); final String name = PVAString.decodeString(buffer); - logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " for " + name + " [cid " + cid + "]"); + logger.log(Level.FINER, () -> "PVA Client " + from + " sent search #" + search.seq + " for " + name + " [cid " + cid + "]" + + ", reply addr " + orig_response_addr + + (orig_response_addr.equals(search.client) ? "" : ", using " + search.client) + + (search.tls ? " (TLS)" : "") + + (search.unicast ? " (unicast)" : "") + + (search.reply_required ? " (reply required)" : "") + + (search.reply_to_src_port ? " (reply to source port)" : "")); search.channels.add(new Channel(cid, name)); } } @@ -202,13 +209,17 @@ else if ("tcp".equals(protocol)) } /** @param unicast Unicast? + * @param use_src_port Reply to 'peer port' of message (which will be our port) instead of port in 'address'? * @param seq Sequence number * @param channels Channels to search, null for 'list' * @param address client's address * @param tls Use TLS? * @param buffer Buffer into which to encode */ - public static void encode(final boolean unicast, final int seq, final Collection channels, final InetSocketAddress address, final boolean tls, final ByteBuffer buffer) + public static void encode(final boolean unicast, final boolean use_src_port, + final int seq, final Collection channels, + final InetSocketAddress address, final boolean tls, + final ByteBuffer buffer) { // Create with zero payload size, to be patched later PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_NONE, PVAHeader.CMD_SEARCH, 0); @@ -222,11 +233,14 @@ public static void encode(final boolean unicast, final int seq, final Collection // If a host has multiple listeners on the UDP search port, // only the one started last will see the unicast. - // Mark search message as unicast so that receiver will forward + // Identify unicast search message so that receiver will forward // it via local broadcast to other local listeners. + // If there are no channels, force a "list all servers" reply. + // Typically use src port unless this is a forwarded message + // where the original source port is in the 'address'. buffer.put((byte) ((unicast ? FLAG_SEARCH_UNICAST : 0x00) | ((channels == null || channels.isEmpty()) ? FLAG_SEARCH_MUST_REPLY : 0x00) | - FLAG_REPLY_SRC_PORT)); + (use_src_port ? FLAG_REPLY_SRC_PORT : 0x00))); // reserved buffer.put((byte) 0); diff --git a/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java b/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java index 08b887ce4c..39b6cfd1aa 100644 --- a/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -13,6 +13,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.StandardProtocolFamily; +import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.ArrayList; @@ -89,13 +90,16 @@ public ServerUDPHandler(final PVAServer server) throws Exception } else { - // Have channel (which must already exist) join multicast group + // Have socket channel (which must already exist) join multicast group if (info.getInterface() == null) throw new Exception("EPICS_PVAS_INTF_ADDR_LIST contains multicast group without interface"); if (info.isIPv4()) { if (udp4 == null) throw new Exception("EPICS_PVAS_INTF_ADDR_LIST lacks IPv4 address, cannot add multicast"); + // Configure interface to send multicasts out via this interface + udp4.setOption(StandardSocketOptions.IP_MULTICAST_IF, info.getInterface()); + // Configure socket channel to receive from the multicast group udp4.join(info.getAddress().getAddress(), info.getInterface()); logger.log(Level.FINE, "Listening to UDP multicast " + info); local_multicast = info; @@ -235,7 +239,7 @@ private void forwardSearchRequest(final int seq, final Collection "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(send_buffer)); try From 8054a20e119eecdaab1c3508a9c5b9d2211b422e Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 12 Jun 2025 13:47:04 -0400 Subject: [PATCH 05/15] PVA MulticastDemo -6 option --- .../src/test/java/org/epics/pva/common/MulticastDemo.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java b/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java index 3b6113f260..cd9efca3fa 100644 --- a/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java +++ b/core/pva/src/test/java/org/epics/pva/common/MulticastDemo.java @@ -69,6 +69,7 @@ public class MulticastDemo private static final String MCAST_GROUP6 = "ff02::42:1"; // Non-default port private static int PORT = 9876; + private static boolean use_v6 = false; private static DatagramChannel createUDPChannel(final ProtocolFamily family, final int port) throws Exception { @@ -138,6 +139,8 @@ public static void main(String[] args) throws Exception do_send = true; else if ("-r".equals(arg)) do_receive = true; + else if ("-6".equals(arg)) + use_v6 = true; else if ("-p".equals(arg)) { if (i < args.length - 1) @@ -164,6 +167,7 @@ else if ("-h".equals(arg)) System.out.println("Options:"); System.out.println(" -s Run sender"); System.out.println(" -r Run receiver (may run both sender and receiver)"); + System.out.println(" -6 Send also via IPv6"); System.out.println(" -p " + PORT + " Set port"); System.out.println(" interface: 'lo' or 'lo0' depending on OS"); System.out.println(); @@ -202,7 +206,8 @@ else if ("-h".equals(arg)) if (receiver.isDone()) break; send(udp4, group4, text); - send(udp6, group6, text); + if (use_v6) + send(udp6, group6, text); } udp6.close(); From 1a6d15515738f2f3f365843aef5c17bcb22b856c Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 12 Jun 2025 13:47:57 -0400 Subject: [PATCH 06/15] PVA: Comments on ServerTCPListener.checkForIPv4Server --- .../java/org/epics/pva/server/ServerTCPListener.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/server/ServerTCPListener.java b/core/pva/src/main/java/org/epics/pva/server/ServerTCPListener.java index aeee63d1ff..5ee135542c 100644 --- a/core/pva/src/main/java/org/epics/pva/server/ServerTCPListener.java +++ b/core/pva/src/main/java/org/epics/pva/server/ServerTCPListener.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -107,7 +107,7 @@ public InetSocketAddress getResponseAddress(final boolean tls) // will each again create a 'tcp46' type of socket that might miss an already bound tcp4 channel. // // Workaround: Try to connect to 127.0.0.1 at desired port. - // Seems a wasteful, but servers tend to run for a long time, so the initial overhead hardly matters. + // Seems wasteful, but servers tend to run for a long time, so the initial overhead hardly matters. // It's noted that we can still miss an existing tcp4 server simply because the connection // takes too long and we time out, or the tcp4 server starts up just after we checked. /** Attempt to check for a tcp4 server on port @@ -127,6 +127,11 @@ private static boolean checkForIPv4Server(final int desired_port) // Check for 1 second. Local connection is supposed to be much faster. check.connect(existing_server, 1000); // Managed to connect? Suggests existing IPv4 server on that port + // If it is indeed a PVA server, it will send validation request. + // But might also be other type of server, so best send nothing + // and close connection + check.shutdownInput(); + check.shutdownOutput(); return true; } catch (Exception ex) From a9c3a3aff7e969ce8873b95a128ad4fd5e40f236 Mon Sep 17 00:00:00 2001 From: kasemir Date: Mon, 16 Jun 2025 14:14:25 -0400 Subject: [PATCH 07/15] PVA: Handle CMD_ORIGIN_TAG When forwarding UDP unicast search requests, prefix them with origin tag. When server receives search request, check for optional preceding origin tag to identify forwarded message. On forward, 'reply_to_src_port' flag is copied, but server ignores it in forwarded messages. --- .../epics/pva/client/ClientUDPHandler.java | 13 +- .../java/org/epics/pva/common/OriginTag.java | 128 ++++++++++++++++++ .../java/org/epics/pva/common/PVAHeader.java | 11 +- .../org/epics/pva/common/SearchRequest.java | 27 ++-- .../java/org/epics/pva/common/TCPHandler.java | 10 +- .../pva/server/SearchCommandHandler.java | 2 +- .../epics/pva/server/ServerUDPHandler.java | 41 ++---- .../java/org/epics/pva/server/BoolDemo.java | 2 + 8 files changed, 187 insertions(+), 47 deletions(-) create mode 100644 core/pva/src/main/java/org/epics/pva/common/OriginTag.java diff --git a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java index de61ac5d7c..66bf3cf3d0 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ClientUDPHandler.java @@ -22,6 +22,7 @@ import org.epics.pva.PVASettings; import org.epics.pva.common.AddressInfo; import org.epics.pva.common.Network; +import org.epics.pva.common.OriginTag; import org.epics.pva.common.PVAHeader; import org.epics.pva.common.SearchRequest; import org.epics.pva.common.SearchResponse; @@ -197,6 +198,9 @@ protected boolean handleMessage(final InetSocketAddress from, final byte version { case PVAHeader.CMD_BEACON: return handleBeacon(from, version, payload, buffer); + case PVAHeader.CMD_ORIGIN_TAG: + // Will be decoded with CMD_SEARCH + break; case PVAHeader.CMD_SEARCH: return handleSearchRequest(from, version, payload, buffer); case PVAHeader.CMD_SEARCH_RESPONSE: @@ -290,7 +294,8 @@ private boolean handleBeacon(final InetSocketAddress from, final byte version, private boolean handleSearchRequest(final InetSocketAddress from, final byte version, final int payload, final ByteBuffer buffer) { - final SearchRequest search = SearchRequest.decode(from, version, payload, buffer); + final OriginTag origin = OriginTag.testForOriginOfSearch(from, buffer); + final SearchRequest search = SearchRequest.decode(origin, from, version, payload, buffer); try { if (local_multicast != null && search != null && search.unicast) @@ -300,7 +305,8 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver if (search.reply_required) { forward_buffer.clear(); - SearchRequest.encode(false, false, 0, null, search.client, search.tls, forward_buffer); + OriginTag.encode(udp_search4, forward_buffer); + SearchRequest.encode(false, search.reply_to_src_port, 0, null, search.client, search.tls, forward_buffer); forward_buffer.flip(); logger.log(Level.FINER, () -> "Forward search to list servers to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer)); send(forward_buffer, local_multicast); @@ -309,7 +315,8 @@ private boolean handleSearchRequest(final InetSocketAddress from, final byte ver else { forward_buffer.clear(); - SearchRequest.encode(false, false, search.seq, search.channels, search.client, search.tls, forward_buffer); + OriginTag.encode(udp_search4, forward_buffer); + SearchRequest.encode(false, search.reply_to_src_port, search.seq, search.channels, search.client, search.tls, forward_buffer); forward_buffer.flip(); logger.log(Level.FINER, () -> "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(forward_buffer)); send(forward_buffer, local_multicast); diff --git a/core/pva/src/main/java/org/epics/pva/common/OriginTag.java b/core/pva/src/main/java/org/epics/pva/common/OriginTag.java new file mode 100644 index 0000000000..66cca3d82a --- /dev/null +++ b/core/pva/src/main/java/org/epics/pva/common/OriginTag.java @@ -0,0 +1,128 @@ +/******************************************************************************* + * Copyright (c) 2025 Oak Ridge National Laboratory. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + ******************************************************************************/ +package org.epics.pva.common; + +import static org.epics.pva.PVASettings.logger; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.logging.Level; + +import org.epics.pva.data.PVAAddress; + +/** Helper for CMD_ORIGIN_TAG + * + *

UDP search messages that are forwarded via + * the local 224.0.0.128 multicast are prefixed + * with CMD_ORIGIN_TAG. + * + * @author Kay Kasemir + */ +@SuppressWarnings("nls") +public class OriginTag +{ + /** Size of CMD_ORIGIN_TAG payload (address) */ + public static final byte PAYLOAD_SIZE = 16; + + /** Size of CMD_ORIGIN_TAG message */ + public static final byte TOTAL_SIZE = PVAHeader.HEADER_SIZE + PAYLOAD_SIZE; + + /** "address to which the receiving socket was bound ..may be 0.0.0.0" */ + public InetAddress address; + + /** Check for optional CMD_ORIGIN_TAG before CMD_SEARCH + * @param from Peer address + * @param buffer UDP message buffer ready to decode CMD_SEARCH + * @return {@link OriginTag} or null + */ + public static OriginTag testForOriginOfSearch(final InetSocketAddress from, final ByteBuffer buffer) + { + // Check for optional origin tag. + // For a normal search packet, the buffer is positioned at PVAHeader.HEADER_SIZE (8). + // For a forwarded packet, it's at CMD_ORIGIN_TAG message (8+16) + HEADER_SIZE (8), i.e., 32 + // For now assuming no or exactly one CMD_ORIGIN_TAG before CMD_SEARCH. + // Not supporting multiple CMD_ORIGIN_TAGs nor anything else before CMD_SEARCH. + + // pos is just after the CMD_SEARCH message header + final int pos = buffer.position(); + if (// Is there room for CMD_ORIGIN_TAG before this message header? + pos == OriginTag.TOTAL_SIZE + PVAHeader.HEADER_SIZE && + // Is command of previous message indeed CMD_ORIGIN_TAG? + buffer.get(PVAHeader.HEADER_OFFSET_COMMAND) == PVAHeader.CMD_ORIGIN_TAG) + { + // Move back to origin message, decode it, restore buffer position + buffer.position(PVAHeader.HEADER_SIZE); + OriginTag origin = OriginTag.decode(from, OriginTag.PAYLOAD_SIZE, buffer); + buffer.position(pos); + return origin; + } + + return null; + } + + /** Decode origin tag + * + * @param from Peer address + * @param payload Payload size + * @param buffer Buffer positioned on payload + * @return Decoded origin tag or null if not a valid + */ + public static OriginTag decode(final InetSocketAddress from, + final int payload, final ByteBuffer buffer) + { + if (payload < PAYLOAD_SIZE) + { + logger.log(Level.WARNING, "PVA client " + from + " sent only " + payload + " bytes for origin tag"); + return null; + } + final OriginTag origin = new OriginTag(); + + // responseAddress, IPv6 address in case of IP based transport, UDP + try + { + origin.address = PVAAddress.decode(buffer); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "PVA Client " + from + " sent origin tag with invalid address"); + return null; + } + logger.log(Level.FINER, () -> "PVA client " + from + " sent " + origin); + return origin; + } + + /** Encode origin tag + * @param udp UDP socket from which to pick the forwarder's address + * @param buffer Buffer into which to encode + * @return Encoded forwarder's address + */ + public static InetAddress encode(final DatagramChannel udp, final ByteBuffer buffer) + { + PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_NONE, PVAHeader.CMD_ORIGIN_TAG, PAYLOAD_SIZE); + InetAddress this_end; + try + { + this_end = ((InetSocketAddress) udp.getLocalAddress()).getAddress(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Invalid address for CMD_ORIGIN_TAG", ex); + this_end = new InetSocketAddress(0).getAddress(); + } + PVAAddress.encode(this_end, buffer); + return this_end; + } + + @Override + public String toString() + { + return "Origin tag with address " + address; + } +} diff --git a/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java b/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java index 5f22fd04fe..5f11e90375 100644 --- a/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java +++ b/core/pva/src/main/java/org/epics/pva/common/PVAHeader.java @@ -27,7 +27,7 @@ public class PVAHeader public static final byte PVA_MAGIC = (byte)0xCA; /** PVA protocol revision (implemented by this library) - * + * *
v2: Server's Echo reply includes the request payload *
v3: SearchRequest FLAG_REPLY_SRC_PORT */ @@ -137,12 +137,18 @@ public class PVAHeader /** Offset from start of common PVA message header to byte version */ public static final int HEADER_OFFSET_VERSION = 1; + /** Offset from start of common PVA message header to byte flags */ + public static final int HEADER_OFFSET_FLAGS = 2; + + /** Offset from start of common PVA message header to byte command */ + public static final int HEADER_OFFSET_COMMAND = 3; + /** Offset from start of common PVA message header to int payload_size */ public static final int HEADER_OFFSET_PAYLOAD_SIZE = 4; /** Encode common PVA message header - * @param buffer Buffer into which to encode + * @param buffer Buffer into which to encode, must be positioned at desired address (usually 'clear()'ed) * @param flags Combination of FLAG_ * @param command Command * @param payload_size Size of payload that follows @@ -154,7 +160,6 @@ public static void encodeMessageHeader(final ByteBuffer buffer, byte flags, fina flags |= FLAG_BIG_ENDIAN; else flags &= ~FLAG_BIG_ENDIAN; - buffer.clear(); buffer.put(PVA_MAGIC); buffer.put(PVA_PROTOCOL_REVISION); buffer.put(flags); diff --git a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java index e43fcb3c1f..970a449a3b 100644 --- a/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java +++ b/core/pva/src/main/java/org/epics/pva/common/SearchRequest.java @@ -70,7 +70,17 @@ public String toString() /** Client should ignore the 'port' in the reply and * simply use the port of the 'source', that is the peer port - * of the UDP message or TCP connection + * of the UDP message or TCP connection. + * + *

In a forwarded message, the 'client' (reply-to) + * address has been updated to reflect the original client. + * The 'reply_to_src_port' flag is copied, but server + * must NOT reply to the source port because that would + * be the port of the forwarder, not the real client. + * The presence of an {@link OriginTag} will indicate + * that the 'reply_to_src_port' flag is copied and needs + * to be ignored. + * * @since Version 3 */ public static final byte FLAG_REPLY_SRC_PORT = 0x02; @@ -94,14 +104,14 @@ public String toString() public List channels; /** Check search request - * + * @param origin Optional CMD_ORIGIN_TAG that preceded the search message * @param from Peer address * @param version Message version * @param payload Payload size * @param buffer Buffer positioned on payload * @return Decoded search request or null if not a valid search request */ - public static SearchRequest decode(final InetSocketAddress from, final byte version, + public static SearchRequest decode(final OriginTag origin, final InetSocketAddress from, final byte version, final int payload, final ByteBuffer buffer) { // pvinfo sends 0x1D=29 bytes: @@ -150,12 +160,12 @@ public static SearchRequest decode(final InetSocketAddress from, final byte vers // and instead use the peer's port. // This should help with NAT where we get the message from an intermediate // and need to reply via that same intermediate - if (version >= 3 && search.reply_to_src_port) + if (version >= 3 && search.reply_to_src_port && origin == null) port = from.getPort(); // Use address from message unless it's a generic local address if (addr.isAnyLocalAddress() || port <= 0) - search.client = from; + search.client = new InetSocketAddress(from.getAddress(), port); else search.client = new InetSocketAddress(addr, port); @@ -200,7 +210,7 @@ else if ("tcp".equals(protocol)) + (search.tls ? " (TLS)" : "") + (search.unicast ? " (unicast)" : "") + (search.reply_required ? " (reply required)" : "") - + (search.reply_to_src_port ? " (reply to source port)" : "")); + + (search.reply_to_src_port ? (origin == null ? " (reply to source port)" : " (reply to source port ignored because of origin tag)") : "")); search.channels.add(new Channel(cid, name)); } } @@ -221,10 +231,11 @@ public static void encode(final boolean unicast, final boolean use_src_port, final InetSocketAddress address, final boolean tls, final ByteBuffer buffer) { + final int start = buffer.position(); // Create with zero payload size, to be patched later PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_NONE, PVAHeader.CMD_SEARCH, 0); - final int payload_start = buffer.position(); + final int payload_start = start + PVAHeader.HEADER_SIZE; // SEARCH message sequence // PVXS sends "find".getBytes() instead @@ -282,6 +293,6 @@ public static void encode(final boolean unicast, final boolean use_src_port, } // Update payload size - buffer.putInt(PVAHeader.HEADER_OFFSET_PAYLOAD_SIZE, buffer.position() - payload_start); + buffer.putInt(start + PVAHeader.HEADER_OFFSET_PAYLOAD_SIZE, buffer.position() - payload_start); } } diff --git a/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java b/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java index 7c7aab5bbf..1ea1a2aaa9 100644 --- a/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java @@ -394,14 +394,14 @@ private ByteBuffer assertBufferSize(final ByteBuffer buffer, final int size) */ private void handleMessage(final ByteBuffer buffer) throws Exception { - final byte flags = buffer.get(2); + final byte flags = buffer.get(PVAHeader.HEADER_OFFSET_FLAGS); final byte segemented = (byte) (flags & PVAHeader.FLAG_SEGMENT_MASK); if (segemented != 0) handleSegmentedMessage(segemented, buffer); else { final boolean control = (flags & PVAHeader.FLAG_CONTROL) != 0; - final byte command = buffer.get(3); + final byte command = buffer.get(PVAHeader.HEADER_OFFSET_COMMAND); // Move to start of potential payload if (buffer.limit() >= 8) buffer.position(8); @@ -462,11 +462,11 @@ else if (segments.position() > 0) if (segments == null || segments.position() <= 0) throw new Exception("Received " + (last ? "last" : "middle") + " message segment without first segment"); // Check if command matches the one in first segment - final byte seg_command = segments.get(3); - if (seg_command != buffer.get(3)) + final byte seg_command = segments.get(PVAHeader.HEADER_OFFSET_COMMAND); + if (seg_command != buffer.get(PVAHeader.HEADER_OFFSET_COMMAND)) throw new Exception(String.format("Received " + (last ? "last" : "middle") + " message segment for command 0x%02X after first segment for command 0x%02X", - buffer.get(3), seg_command)); + buffer.get(PVAHeader.HEADER_OFFSET_COMMAND), seg_command)); // Size of segments accumulated so far.. final int seg_size = segments.getInt(PVAHeader.HEADER_OFFSET_PAYLOAD_SIZE); diff --git a/core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java b/core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java index 070c095c8d..b41b777169 100644 --- a/core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java +++ b/core/pva/src/main/java/org/epics/pva/server/SearchCommandHandler.java @@ -30,7 +30,7 @@ public void handleCommand(final ServerTCPHandler tcp, final ByteBuffer buffer) t final byte version = buffer.get(PVAHeader.HEADER_OFFSET_VERSION); final int payload_size = buffer.getInt(PVAHeader.HEADER_OFFSET_PAYLOAD_SIZE); - final SearchRequest search = SearchRequest.decode(tcp.getRemoteAddress(), version, payload_size, buffer); + final SearchRequest search = SearchRequest.decode(null, tcp.getRemoteAddress(), version, payload_size, buffer); if (search.channels != null) for (SearchRequest.Channel channel : search.channels) diff --git a/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java b/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java index 39b6cfd1aa..34d5729a46 100644 --- a/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/server/ServerUDPHandler.java @@ -24,12 +24,12 @@ import org.epics.pva.PVASettings; import org.epics.pva.common.AddressInfo; import org.epics.pva.common.Network; +import org.epics.pva.common.OriginTag; import org.epics.pva.common.PVAHeader; import org.epics.pva.common.SearchRequest; import org.epics.pva.common.SearchResponse; import org.epics.pva.common.UDPHandler; import org.epics.pva.data.Hexdump; -import org.epics.pva.data.PVAAddress; /** Listen to search requests, send beacons * @author Kay Kasemir @@ -142,7 +142,8 @@ protected boolean handleMessage(final InetSocketAddress from, final byte version switch (command) { case PVAHeader.CMD_ORIGIN_TAG: - return handleOriginTag(from, version, payload, buffer); + // Will be decoded with CMD_SEARCH + break; case PVAHeader.CMD_SEARCH: return handleSearch(from, version, payload, buffer); case PVAHeader.CMD_BEACON: @@ -154,25 +155,7 @@ protected boolean handleMessage(final InetSocketAddress from, final byte version return true; } - private boolean handleOriginTag(final InetSocketAddress from, final byte version, - final int payload, final ByteBuffer buffer) - { - final InetAddress addr; - try - { - addr = PVAAddress.decode(buffer); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "PVA Client " + from + " sent origin tag with invalid address"); - return false; - } - logger.log(Level.FINER, () -> "PVA Client " + from + " sent origin tag " + addr); - - return true; - } - - /** @param from Origin of search request + /** @param from Sender of search request * @param version Client's version * @param payload Size of payload * @param buffer Buffer with search request @@ -181,7 +164,9 @@ private boolean handleOriginTag(final InetSocketAddress from, final byte version private boolean handleSearch(final InetSocketAddress from, final byte version, final int payload, final ByteBuffer buffer) { - final SearchRequest search = SearchRequest.decode(from, version, payload, buffer); + // Check for optional origin tag. + final OriginTag origin = OriginTag.testForOriginOfSearch(from, buffer); + final SearchRequest search = SearchRequest.decode(origin, from, version, payload, buffer); if (search == null) return false; @@ -191,7 +176,7 @@ private boolean handleSearch(final InetSocketAddress from, final byte version, { // pvlist request final boolean handled = server.handleSearchRequest(0, -1, null, search.client, search.tls, null); if (! handled && search.unicast) - PVAServer.POOL.submit(() -> forwardSearchRequest(0, null, search.client, search.tls)); + PVAServer.POOL.submit(() -> forwardSearchRequest(0, null, search.client, search.reply_to_src_port, search.tls)); } } else @@ -211,7 +196,7 @@ private boolean handleSearch(final InetSocketAddress from, final byte version, if (forward != null) { final List to_forward = forward; - PVAServer.POOL.submit(() -> forwardSearchRequest(search.seq, to_forward, search.client, search.tls)); + PVAServer.POOL.submit(() -> forwardSearchRequest(search.seq, to_forward, search.client, search.reply_to_src_port, search.tls)); } } @@ -229,9 +214,10 @@ private boolean handleSearch(final InetSocketAddress from, final byte version, * @param seq Search sequence or 0 * @param channels Channel CIDs and names or null for 'list' * @param address Client's address and port + * @param reply_to_src_port Set flag to use the 'peer' port, ignoring the reply port? * @param tls Use TLS or plain TCP? */ - private void forwardSearchRequest(final int seq, final Collection channels, final InetSocketAddress address, final boolean tls) + private void forwardSearchRequest(final int seq, final Collection channels, final InetSocketAddress address, final boolean reply_to_src_port, final boolean tls) { // TODO Remove the local IPv4 multicast re-send from the protocol, just use multicast from the start as with IPv6 if (local_multicast == null) @@ -239,9 +225,10 @@ private void forwardSearchRequest(final int seq, final Collection "Forward search to " + local_multicast + "\n" + Hexdump.toHexdump(send_buffer)); + logger.log(Level.FINER, () -> "Forward search from " + origin + " to " + local_multicast + "\n" + Hexdump.toHexdump(send_buffer)); try { udp4.send(send_buffer, local_multicast.getAddress()); diff --git a/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java b/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java index 0830f0b247..22455b83ad 100644 --- a/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java +++ b/core/pva/src/test/java/org/epics/pva/server/BoolDemo.java @@ -23,6 +23,8 @@ /** PVA Server Demo for "bool" PV * @author Kay Kasemir */ +// mvn test-compile +// java -cp target/classes:target/test-classes org.epics.pva.server.BoolDemo @SuppressWarnings("nls") public class BoolDemo { From 19dd3b230f65bf8295405a6f9a29ef9a0a579ce2 Mon Sep 17 00:00:00 2001 From: kasemir Date: Mon, 16 Jun 2025 14:54:59 -0400 Subject: [PATCH 08/15] EPICS_PVAS_INTF_ADDR_LIST and CMD_ORIGIN_TAG --- .../src/main/java/org/epics/pva/common/OriginTag.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/pva/src/main/java/org/epics/pva/common/OriginTag.java b/core/pva/src/main/java/org/epics/pva/common/OriginTag.java index 66cca3d82a..460ead0ffe 100644 --- a/core/pva/src/main/java/org/epics/pva/common/OriginTag.java +++ b/core/pva/src/main/java/org/epics/pva/common/OriginTag.java @@ -23,6 +23,15 @@ * the local 224.0.0.128 multicast are prefixed * with CMD_ORIGIN_TAG. * + *

According to + * https://github.com/epics-docs/epics-docs/blob/master/pv-access/Protocol-Messages.md, + * the address listed in the origin tag should be + * "the address to which the receiving socket was bound. This may be 0.0.0.0". + * + *

By default, we have EPICS_PVAS_INTF_ADDR_LIST = "0.0.0.0 [::] 224.0.0.128,1@127.0.0.1 [ff02::42:1],1@::1". + * The IPv4 UDP socket will be bound to 0.0.0.0, and the origin tags will show 0.0.0.0. + * Replacing "0.0.0.0" with the IP address of a network interface will cause it to show in the origin tags. + * * @author Kay Kasemir */ @SuppressWarnings("nls") From 5611be2441e1b476e1876a6d1d4077d863919b26 Mon Sep 17 00:00:00 2001 From: kasemir Date: Wed, 16 Jul 2025 13:23:41 -0400 Subject: [PATCH 09/15] Replace ClientTCPHandler.getX509Name with getServer/ClientX509Name --- .../epics/pva/client/ClientTCPHandler.java | 36 ++++++++++++------- .../org/epics/pva/client/PVAClientMain.java | 10 +++++- .../epics/pva/client/ValidationHandler.java | 4 +-- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java b/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java index c8aa81e352..b94a9e29ce 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java @@ -64,13 +64,6 @@ class ClientTCPHandler extends TCPHandler /** Client context */ private final PVAClient client; - /** When using TLS, the socket may come with a local certificate - * that TLS uses to authenticate to the server, - * and this is the name from that certificate. - * Otherwise null - */ - private String x509_name; - /** Channels that use this connection */ private final CopyOnWriteArrayList channels = new CopyOnWriteArrayList<>(); @@ -145,9 +138,6 @@ protected boolean initializeSocket() return false; } - // For TLS, check if the socket has a name that's used to authenticate - x509_name = tls ? SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getLocalPrincipal()) : null; - // For default EPICS_CA_CONN_TMO: 30 sec, send echo at ~15 sec: // Check every ~3 seconds last_life_sign = last_message_sent = System.currentTimeMillis(); @@ -170,10 +160,30 @@ PVAClient getClient() return client; } - /** @return Name used by TLS socket's certificate, or null */ - String getX509Name() + /** When using TLS, the socket has a peer (server, IOC) certificate + * @return Name from server's certificate, or null + */ + String getServerX509Name() + { + try + { + if (tls) + return SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getPeerPrincipal()); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot get server principal", ex); + } + return null; + } + + /** When using TLS, the socket may come with a local (client) certificate + * that TLS uses to authenticate to the server. + * @return Name from client's certificate, or null */ + String getClientX509Name() { - return x509_name; + return tls ? SecureSockets.getPrincipalCN(((SSLSocket) socket).getSession().getLocalPrincipal()) + : null; } /** @param channel Channel that uses this TCP connection */ diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClientMain.java b/core/pva/src/main/java/org/epics/pva/client/PVAClientMain.java index 93d9b14f7a..f2a5fab4bc 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClientMain.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClientMain.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -85,6 +85,9 @@ private static void info(final List names) throws Exception final PVAChannel pv = iter.next(); if (pv.getState() == ClientChannelState.CONNECTED) { + PVASettings.logger.log(Level.INFO, "Server: " + pv.getTCP().getServerX509Name()); + PVASettings.logger.log(Level.INFO, "Client: " + pv.getTCP().getClientX509Name()); + final PVAData data = pv.info(request).get(timeout_ms, TimeUnit.MILLISECONDS); System.out.println(pv.getName() + " = " + data.formatType()); pv.close(); @@ -127,6 +130,9 @@ private static void get(final List names) throws Exception final PVAChannel pv = iter.next(); if (pv.getState() == ClientChannelState.CONNECTED) { + PVASettings.logger.log(Level.INFO, "Server: " + pv.getTCP().getServerX509Name()); + PVASettings.logger.log(Level.INFO, "Client: " + pv.getTCP().getClientX509Name()); + final PVAData data = pv.read(request).get(timeout_ms, TimeUnit.MILLISECONDS); System.out.println(pv.getName() + " = " + data); pv.close(); @@ -170,6 +176,8 @@ private static void monitor(final List names) throws Exception { try { + PVASettings.logger.log(Level.INFO, "Server: " + ch.getTCP().getServerX509Name()); + PVASettings.logger.log(Level.INFO, "Client: " + ch.getTCP().getClientX509Name()); ch.subscribe(request, listener); } catch (Exception ex) diff --git a/core/pva/src/main/java/org/epics/pva/client/ValidationHandler.java b/core/pva/src/main/java/org/epics/pva/client/ValidationHandler.java index 488e3eec3e..f57bc2afab 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ValidationHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ValidationHandler.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -52,7 +52,7 @@ public void handleCommand(final ClientTCPHandler tcp, final ByteBuffer buffer) t // Support "x509" or "ca" authorization, fall back to any-no-mouse final ClientAuthentication authentication; // Even if server suggests x509, check that we have a certificate with name - if (tcp.getX509Name() != null && auth.contains(PVAAuth.X509)) + if (tcp.getClientX509Name() != null && auth.contains(PVAAuth.X509)) authentication = ClientAuthentication.X509; else if (auth.contains(PVAAuth.CA)) authentication = ClientAuthentication.CA; From c721f6f109e922d5c13009fdeb62ef749c446f4c Mon Sep 17 00:00:00 2001 From: kasemir Date: Fri, 18 Jul 2025 09:51:52 -0400 Subject: [PATCH 10/15] Clear subscriptions when client destroys channel .. used to require explicit unsubscribe or closedown of TCP connection --- .../org/epics/pva/server/MonitorSubscription.java | 7 ++++++- .../main/java/org/epics/pva/server/ServerPV.java | 15 ++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java b/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java index 1d84cba727..38a137328e 100644 --- a/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java +++ b/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2020 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -77,8 +77,13 @@ class MonitorSubscription tcp.submit(this::encodeMonitor); } + /** @param tcp TCP connection + * @param req Client's monitor request ID, -1 for any client request ID + * @return Is this subscription for that TCP connection and client's request ID? + */ boolean isFor(final ServerTCPHandler tcp, final int req) { + // Check for identity, not equal IP address. return this.tcp == tcp && (req == -1 || this.req == req); } diff --git a/core/pva/src/main/java/org/epics/pva/server/ServerPV.java b/core/pva/src/main/java/org/epics/pva/server/ServerPV.java index 1b690eab16..954fa85a0d 100644 --- a/core/pva/src/main/java/org/epics/pva/server/ServerPV.java +++ b/core/pva/src/main/java/org/epics/pva/server/ServerPV.java @@ -147,6 +147,7 @@ void addClient(final ServerTCPHandler tcp, final int cid) */ void removeClient(final ServerTCPHandler tcp, final int cid) { + // Stop associating PV with that TCP connection final Integer other = cid_by_client.remove(tcp); if (cid == -1) logger.log(Level.FINE, "Client " + tcp + " released " + this + " [CID was " + other + "]"); @@ -154,6 +155,11 @@ else if (other != null && other.intValue() == cid) logger.log(Level.FINE, "Client " + tcp + " released " + this + " [CID " + cid + "]"); else logger.log(Level.WARNING, "Client " + tcp + " released " + this + " as CID " + cid + " instead of " + other); + + // Delete all subscriptions to this PV from that TCP connection + // A perfect client would separately clear the subscription, + // but this asserts they're all gone for sure + unregisterSubscription(tcp, -1); } /** @param subscription Subscription that needs to receive value updates */ @@ -169,13 +175,16 @@ void registerSubscription(final MonitorSubscription subscription) */ void unregisterSubscription(final ServerTCPHandler tcp, final int req) { - for (MonitorSubscription subscription : subscriptions) + subscriptions.removeIf(subscription -> + { if (subscription.isFor(tcp, req)) { logger.log(Level.FINER, () -> "Remove " + subscription); - subscriptions.remove(subscription); - break; + return true; } + return false; + }); + logger.log(Level.FINEST, () -> "There are " + subscriptions.size() + " remaining subscriptions"); } /** @return Does the PV have client subscriptions? */ From ab17c0e15f978f9f56c28512b1b1f3c343d60dab Mon Sep 17 00:00:00 2001 From: kasemir Date: Tue, 22 Jul 2025 15:47:05 -0400 Subject: [PATCH 11/15] PVAEnum: Allow setting by index or label --- .../java/org/epics/pva/data/nt/PVAEnum.java | 41 +++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/data/nt/PVAEnum.java b/core/pva/src/main/java/org/epics/pva/data/nt/PVAEnum.java index 44b5362de8..792ed26fb9 100644 --- a/core/pva/src/main/java/org/epics/pva/data/nt/PVAEnum.java +++ b/core/pva/src/main/java/org/epics/pva/data/nt/PVAEnum.java @@ -24,12 +24,12 @@ /** * Normative enum type - * + * * An enum_t describes an enumeration. The field is a structure describing a * value drawn from a given set of valid values also given. - * + * * enum_t := - * + * *

    *
  • structure *
      @@ -45,7 +45,7 @@ public class PVAEnum extends PVAStructure { private PVAStringArray choices; /** - * Constructor + * Constructor * @param name Name of the enum * @param index The index of the current value of the enumeration in the array choices below. * @param choices An array of strings specifying the set of labels for the valid values of the enumeration. @@ -55,7 +55,7 @@ public PVAEnum(String name, int index, String[] choices) { } /** - * Constructor + * Constructor * @param name Name of the enum * @param index The index of the current value of the enumeration in the array choices below. * @param choices An array of strings specifying the set of labels for the valid values of the enumeration. @@ -68,7 +68,7 @@ public PVAEnum(String name, PVAInt index, PVAStringArray choices) { /** * String of the enum output - * + * * @return The resulting string of the enum_t */ public String enumString() { @@ -81,9 +81,36 @@ public String enumString() { return null; } + /** @param new_value Number for index or String to select option + * @throws Exception on error + */ + @Override + public void setValue(Object new_value) throws Exception + { + // Set numeric index + if (new_value instanceof Number num) + index.set(num.intValue()); + // or find string in options + else if (new_value instanceof String str) + { + int i=0; + for (String choice : choices.get()) + { + if (choice.equals(str)) + { + index.set(i); + break; + } + ++i; + } + } + else + super.setValue(new_value); + } + /** * Converts from a generic PVAStruture to PVAEnum - * + * * @param structure Input structure * @return Representative Enum */ From 1b680327e89828a1f22134382de9a88c50d082ad Mon Sep 17 00:00:00 2001 From: kasemir Date: Wed, 23 Jul 2025 14:59:02 -0400 Subject: [PATCH 12/15] pva.nt: Provider proper epics:nt/NTEnum:1.0 for enum --- .../java/org/epics/pva/data/nt/PVAScalar.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/data/nt/PVAScalar.java b/core/pva/src/main/java/org/epics/pva/data/nt/PVAScalar.java index 2ead803d14..d6d8d148ff 100644 --- a/core/pva/src/main/java/org/epics/pva/data/nt/PVAScalar.java +++ b/core/pva/src/main/java/org/epics/pva/data/nt/PVAScalar.java @@ -43,7 +43,7 @@ /** * Normative scalar and scaler[] type - * + * * NTScalar := *
        *
      • structure @@ -56,7 +56,7 @@ *
      • control_t control :opt *
      *
    - * + * * @param can be from scalar_t or scalar_t[] as specified by: * where scalar_t can be: *
      @@ -69,7 +69,7 @@ *
    • {@link PVADouble} *
    • {@link PVAString} *
    - * + * * and scalar_t[] can be: *
      *
    • {@link PVABoolArray} @@ -84,15 +84,16 @@ */ public class PVAScalar extends PVAStructure { public static final String SCALAR_STRUCT_NAME_STRING = "epics:nt/NTScalar:1.0"; + public static final String ENUM_STRUCT_NAME_STRING = "epics:nt/NTEnum:1.0"; public static final String ARRAY_STRUCT_NAME_STRING = "epics:nt/NTScalarArray:1.0"; public static final String VALUE_NAME_STRING = "value"; public static final String DESCRIPTION_NAME_STRING = "description"; /** * Builder for the PVAScalar class - * + * * @param is in the {@link PVAScalar} list - * + * * The description element must be named description * otherwise a PVAScalarDescriptionNameException will be thrown * The value element must be named value @@ -113,9 +114,10 @@ public Builder() { } String determineStructName(S value) { - if (value instanceof PVAArray) { + if (value instanceof PVAArray) return ARRAY_STRUCT_NAME_STRING; - } + else if (value instanceof PVAEnum) + return ENUM_STRUCT_NAME_STRING; return SCALAR_STRUCT_NAME_STRING; } @@ -260,7 +262,7 @@ public static Builder stringArrayScalarBuilder(String... value) /** * Converts from generic PVAStructure to PVAScalar - * + * * @param structure * @return * @throws PVAScalarValueNameException From a437c64d8766231da3757e67efdf4a24810ce56f Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 24 Jul 2025 09:57:49 -0400 Subject: [PATCH 13/15] PVA search handler can use reply_sender with `null` ... ... to indicate that this server now has the PV. Useful for poxies to indicate ASAP that a PV was just created, not waiting until the next client search tick --- .../java/org/epics/pva/server/PVAServer.java | 32 +++++++++---------- .../org/epics/pva/server/SearchHandler.java | 10 ++++-- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/server/PVAServer.java b/core/pva/src/main/java/org/epics/pva/server/PVAServer.java index cd10f3bedf..0b4c847b2b 100644 --- a/core/pva/src/main/java/org/epics/pva/server/PVAServer.java +++ b/core/pva/src/main/java/org/epics/pva/server/PVAServer.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -184,14 +184,22 @@ boolean handleSearchRequest(final int seq, final int cid, final String name, if (tls_requested && !tls) logger.log(Level.WARNING, "PVA Client " + client + " searches for '" + name + "' with TLS, but EPICS_PVAS_TLS_KEYCHAIN is not configured"); + // Send search reply from either custom_search_handler or later in here final Consumer send_search_reply = server_address -> { + // Use 'server_address', or fall back to what we would normally use + final InetSocketAddress address; + if (server_address != null) + address = server_address; + else if (tcp_connection != null) + address = USE_THIS_TCP_CONNECTION; + else + address = getTCPAddress(tls); // If received via TCP, reply via same connection. if (tcp_connection != null) - tcp_connection.submitSearchReply(guid, seq, cid, server_address, tls); - else - // Otherwise reply via UDP to the given address. - POOL.execute(() -> udp.sendSearchReply(guid, seq, cid, server_address, tls, client)); + tcp_connection.submitSearchReply(guid, seq, cid, address, tls); + else // Otherwise reply via UDP to the given address. + POOL.execute(() -> udp.sendSearchReply(guid, seq, cid, address, tls, client)); }; // Does custom handler consume the search request? @@ -208,20 +216,12 @@ boolean handleSearchRequest(final int seq, final int cid, final String name, return true; } else - { - // Known channel? + { // Known channel? final ServerPV pv = getPV(name); if (pv != null) - { - // Reply with TCP connection info + { // Reply with TCP connection info logger.log(Level.FINE, () -> "Received Search for known PV " + pv); - - // If received via TCP, ask client to continue on same connection. - // Otherwise provide the TCP address for the UDP request. - if (tcp_connection != null) - send_search_reply.accept(USE_THIS_TCP_CONNECTION); - else - send_search_reply.accept(getTCPAddress(tls)); + send_search_reply.accept(null); return true; } else diff --git a/core/pva/src/main/java/org/epics/pva/server/SearchHandler.java b/core/pva/src/main/java/org/epics/pva/server/SearchHandler.java index f6bd31c22f..87c518e00a 100644 --- a/core/pva/src/main/java/org/epics/pva/server/SearchHandler.java +++ b/core/pva/src/main/java/org/epics/pva/server/SearchHandler.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020-2022 Oak Ridge National Laboratory. + * Copyright (c) 2020-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -31,7 +31,13 @@ public interface SearchHandler * @param cid Client channel ID or -1 * @param name Channel name or null * @param client Client's address - * @param reply_sender Callback for TCP address of server + * @param reply_sender Callback for TCP address of server. + * Name servers that can resolve the name + * will return the address of the PV's server. + * If null, this server will + * reply with its own address, + * for usage by a gateway that wants so + * indicate that it can now proxy that PV. * @return true if the search request was handled, * i.e. the name was recognized and the request does not need * to be forwarded or passed to anybody else From 67c33c93b9adf97de8fd2eea86bcf72cd7fc674d Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 24 Jul 2025 14:44:53 -0400 Subject: [PATCH 14/15] PVA server: Fix subscription update race --- .../epics/pva/server/MonitorSubscription.java | 61 ++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java b/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java index 38a137328e..b263d87b8b 100644 --- a/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java +++ b/core/pva/src/main/java/org/epics/pva/server/MonitorSubscription.java @@ -11,7 +11,6 @@ import java.nio.ByteBuffer; import java.util.BitSet; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.epics.pva.common.PVAHeader; @@ -26,7 +25,6 @@ * * @author Kay Kasemir */ -@SuppressWarnings("nls") class MonitorSubscription { /** ID of monitor request sent by client */ @@ -42,8 +40,11 @@ class MonitorSubscription // and their TCP connection might be able to handle updates // at different rates, so each subscription maintains // the per-client state of the data, changes and overruns. + // + // /------------------------------------\ + // /--- SYNC on data for the following ---\ - /** Most recent value, to be sent to clients. + /** Most recent value, to be sent to clients * SYNC on data */ private final PVAStructure data; @@ -60,10 +61,15 @@ class MonitorSubscription /** Is an update pending to be sent out? * - *

      Used to prevent scheduling more updates that TCP connection can handle. - * Changes from multiple updates are combined, potentially triggering overrun. + *

      Used to prevent scheduling more updates than TCP connection can handle. + * Changes from multiple updates are combined, potentially indicating overrun. + * SYNC on data */ - private final AtomicBoolean pending = new AtomicBoolean(true); + private volatile boolean pending = true; + + // \--- SYNC on data for the above ---/ + // \------------------------------------/ + MonitorSubscription(final int req, final ServerPV pv, final ServerTCPHandler tcp) { @@ -72,7 +78,9 @@ class MonitorSubscription this.tcp = tcp; data = pv.getData(); - // Initial update: Send all the data + // Initial update: Send all the data (bit zero) + // Later we typically send changes to "value" etc. + // as determined in `update` changes.set(0); tcp.submit(this::encodeMonitor); } @@ -89,6 +97,23 @@ boolean isFor(final ServerTCPHandler tcp, final int req) void update(final PVAStructure new_data) throws Exception { + // We update `data`, then submit an `encodeMonitor` run. + // As updates arrive, one could occur before or right when + // `encodeMonitor` runs. + // Since we all sync on data, there are several scenarios + // a) Previously submitted `encodeMonitor` is pending, + // another `update` adds changes and maybe accumulates overruns. + // It will not submit another `encodeMonitor` run + // because one is already pending which will eventually + // transmit the combined update. + // c) Previously submitted `encodeMonitor` runs but + // we are able to lock `data` before `encodeMonitor`. + // Plays out just like case a), add changes, detect + // a pending `encodeMonitor` run. + // b) Previously submitted `encodeMonitor` runs and + // syncs on data, we are blocked. + // `encodeMonitor` transmits the update, clears `pending`, + // and then we can update data and submit another `encodeMonitor` run. synchronized (data) { final BitSet old_changes = changes; @@ -100,20 +125,20 @@ void update(final PVAStructure new_data) throws Exception // See what had changed before, and now changed again old_changes.and(changes); overrun.or(old_changes); - } - // Only submit when there's not already one pending, waiting to be sent out - if (pending.compareAndSet(false, true)) - tcp.submit(this::encodeMonitor); - else - logger.log(Level.WARNING, "Skipping already submitted " + this); + // Only submit when there's not already one pending, waiting to be sent out + if (pending) + logger.log(Level.WARNING, "Skipping already submitted " + this + ", changes " + changes + ", overrun " + overrun); + else + { + tcp.submit(this::encodeMonitor); + pending = true; + } + } } private void encodeMonitor(final byte version, final ByteBuffer buffer) throws Exception { - pending.set(false); - - logger.log(Level.FINE, () -> "Sending MONITOR value for " + pv + ": changes " + changes + ", overrun " + overrun); PVAHeader.encodeMessageHeader(buffer, PVAHeader.FLAG_SERVER, PVAHeader.CMD_MONITOR, 0); final int payload_start = buffer.position(); @@ -124,6 +149,8 @@ private void encodeMonitor(final byte version, final ByteBuffer buffer) throws E synchronized (data) { + logger.log(Level.FINE, () -> "Sending MONITOR value for " + pv + ": changes " + changes + ", overrun " + overrun); + // Encode what changed PVABitSet.encodeBitSet(changes, buffer); // Encode the changed data @@ -146,6 +173,8 @@ private void encodeMonitor(final byte version, final ByteBuffer buffer) throws E PVABitSet.encodeBitSet(overrun, buffer); overrun.clear(); + + pending = false; } final int payload_end = buffer.position(); From 30c1b229edc7d17571e4dc9501ee897494cbbbe8 Mon Sep 17 00:00:00 2001 From: kasemir Date: Tue, 29 Jul 2025 10:41:06 -0400 Subject: [PATCH 15/15] PVA local mcast: Restore previously used options and 'join' again --- core/pva/src/main/java/org/epics/pva/common/Network.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/pva/src/main/java/org/epics/pva/common/Network.java b/core/pva/src/main/java/org/epics/pva/common/Network.java index 5394f393ad..fafe0bf774 100644 --- a/core/pva/src/main/java/org/epics/pva/common/Network.java +++ b/core/pva/src/main/java/org/epics/pva/common/Network.java @@ -16,6 +16,7 @@ import java.net.InterfaceAddress; import java.net.NetworkInterface; import java.net.StandardProtocolFamily; +import java.net.StandardSocketOptions; import java.nio.channels.DatagramChannel; import java.util.ArrayList; import java.util.Enumeration; @@ -302,6 +303,10 @@ public static AddressInfo getLocalMulticastGroup(final DatagramChannel udp, fina final InetSocketAddress local_multicast = new InetSocketAddress(group, port); logger.log(Level.CONFIG, "Local multicast of IPv4 unicast using group " + local_multicast + " using network interface " + loopback.getDisplayName()); + udp.join(group, loopback); + // Default is TRUE anyway? + udp.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true); + udp.setOption(StandardSocketOptions.IP_MULTICAST_IF, loopback); return new AddressInfo(false, local_multicast, 1, loopback); } }