diff --git a/Cargo.lock b/Cargo.lock index f7ab227a..38647547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -763,6 +763,7 @@ version = "0.25.0" dependencies = [ "clipcat-base", "clipcat-proto", + "futures", "http", "hyper-util", "mime", @@ -933,8 +934,10 @@ dependencies = [ "clipcat-client", "clipcat-external-editor", "directories", + "futures", "http", "http-serde", + "lru", "mime", "serde", "shadow-rs", @@ -4934,6 +4937,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1248cce3..03910175 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ tokio = { version = "1", features = [ "sync", "process", ] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { version = "0.1", features = ["net", "sync"] } arboard = { version = "3", default-features = false, features = [ "image-data", @@ -90,6 +90,7 @@ humansize = "2" image = "0.25" libc = "0.2" linicon = "2" +lru = "0.16" mime = "0.3" mio = { version = "1.2", features = ["os-ext"] } notify = "8" diff --git a/README.md b/README.md index 113ae8fc..f5412855 100644 --- a/README.md +++ b/README.md @@ -165,12 +165,13 @@ clipcatd --no-daemon 3. You can run the following commands with `clipcatctl` or `clipcat-menu`: -| Command | Comment | -| ------------------------- | ----------------------------------------------------- | -| `clipcatctl list` | List cached clipboard history | -| `clipcatctl promote ` | Insert cached clip with `` into the X11 clipboard | -| `clipcatctl remove [ids]` | Remove cached clips with `[ids]` from the server | -| `clipcatctl clear` | Clear cached clipboard history | +| Command | Comment | +| ------------------------- | ------------------------------------------------------------------ | +| `clipcatctl list` | List cached clipboard history | +| `clipcatctl tail [-f]` | Print the most recent entries (default 10); `-f` streams new clips | +| `clipcatctl promote ` | Insert cached clip with `` into the X11 clipboard | +| `clipcatctl remove [ids]` | Remove cached clips with `[ids]` from the server | +| `clipcatctl clear` | Clear cached clipboard history | | Command | Comment | | --------------------- | ------------------------------------------- | @@ -584,6 +585,53 @@ pkill clipcatd +
+ Reacting to new clips with clipcatctl tail + +`clipcatctl tail -f` streams each new clip's id and preview as it is +added, in the same format as `clipcatctl list`. This lets you write +small shell hooks that mutate clips out-of-band. + +Run **one** hook script for every transform you want, even unrelated +ones. Independent `tail -f` listeners each see the other listeners' +outputs as new events, so two separate hooks can cascade-trigger each +other indefinitely. A single script applies all transforms in one pass +and records the result in a shared state file: + +```bash +#!/usr/bin/env bash +# `-n 0` → react only to clips added after the hook starts. +SEEN=~/.cache/clipcat-hooks/produced-ids +mkdir -p "$(dirname "$SEEN")" && touch "$SEEN" + +clipcatctl tail -f -n 0 | while IFS= read -r line; do + id=${line%%:*} + grep -qxF "$id" "$SEEN" && continue + raw=$(clipcatctl get "$id") || continue + + # Apply your transforms here (compose as many as you need). + out=${raw#"${raw%%[![:space:]]*}"} # trim leading whitespace + out=${out%"${out##*[![:space:]]}"} # trim trailing whitespace + # out="[$(date -I)] $out" # uncomment to date-stamp + + [ "$raw" = "$out" ] && continue + new_id=$(clipcatctl update "$id" "$out") || continue + printf '%s\n' "$new_id" >>"$SEEN" + + # Cap the state file so it does not grow without bound. + if [ "$(wc -l <"$SEEN")" -gt 1024 ]; then + tail -n 512 "$SEEN" >"$SEEN.tmp" && mv "$SEEN.tmp" "$SEEN" + fi +done +``` + +`clipcatctl update` prints the new clip's id, which the hook records in +`$SEEN` so it's skipped when `tail -f` re-emits it. `clipcatctl tail -f` +also reconnects automatically on `clipcatd` restarts and resumes +streaming new clips, so the script can run as a long-lived service. + +
+
Starting clipcatd with systemd diff --git a/clipcatctl/Cargo.toml b/clipcatctl/Cargo.toml index 6d08b915..cbba1eca 100644 --- a/clipcatctl/Cargo.toml +++ b/clipcatctl/Cargo.toml @@ -27,7 +27,9 @@ bytes = { workspace = true } clap = { workspace = true } clap_complete = { workspace = true } directories = { workspace = true } +futures = { workspace = true } http = { workspace = true } +lru = { workspace = true } mime = { workspace = true } shadow-rs = { workspace = true } shellexpand = { workspace = true } diff --git a/clipcatctl/src/cli.rs b/clipcatctl/src/cli.rs index 79f81b61..9cf74b6b 100644 --- a/clipcatctl/src/cli.rs +++ b/clipcatctl/src/cli.rs @@ -1,9 +1,16 @@ -use std::{io::Write, num::ParseIntError, path::PathBuf}; +use std::{ + io::Write, + num::{NonZeroUsize, ParseIntError}, + path::PathBuf, + time::Duration, +}; use clap::{CommandFactory, Parser, Subcommand}; use clipcat_base::{ClipEntryMetadata, ClipboardKind, ClipboardWatcherState}; use clipcat_client::{Client, History, Manager as _, System, Watcher as _}; use clipcat_external_editor::ExternalEditor; +use futures::StreamExt; +use lru::LruCache; use snafu::ResultExt; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -126,6 +133,27 @@ pub enum Commands { no_id: bool, }, + #[clap(about = "Print most recent clipboard entries, oldest first; `-f` to follow")] + Tail { + #[clap(long)] + no_id: bool, + + #[clap( + long = "lines", + short = 'n', + default_value = "10", + help = "Print the most recent N entries (oldest of those first)" + )] + lines: u64, + + #[clap( + long = "follow", + short = 'f', + help = "Follow new clipboard entries; reconnects on clipcatd restart" + )] + follow: bool, + }, + #[clap(about = "Update clip with ")] Update { #[clap(value_parser = parse_hex)] @@ -214,6 +242,13 @@ impl Cli { pub fn run(self) -> Result { let client_version = Self::command().get_version().unwrap_or_default().to_string(); match self.commands { + Some(Commands::Tail { no_id, lines, follow }) => { + let config = self.load_config(); + config.log.registry(); + return Runtime::new() + .context(error::InitializeTokioRuntimeSnafu)? + .block_on(run_tail(config, no_id, lines, follow)); + } Some(Commands::Version { client }) if client => { std::io::stdout() .write_all(Self::command().render_long_version().as_bytes()) @@ -466,6 +501,16 @@ fn print_watcher_state(state: ClipboardWatcherState) { println!("{msg}"); } +fn format_metadata_line( + metadata: &ClipEntryMetadata, + no_id: bool, + show_source_prefix: bool, +) -> String { + let ClipEntryMetadata { id, preview, kind, .. } = metadata; + let prefix = if show_source_prefix { format!("{} ", kind.prefix()) } else { String::new() }; + if no_id { format!("{prefix}{preview}\n") } else { format!("{id:016x}: {prefix}{preview}\n") } +} + async fn print_list( client: &Client, preview_length: usize, @@ -474,17 +519,149 @@ async fn print_list( ) -> Result<(), Error> { let metadata_list = client.list(preview_length).await?; for metadata in metadata_list { - let ClipEntryMetadata { id, preview, kind, .. } = metadata; - let prefix = if show_source_prefix { format!("{} ", kind.prefix()) } else { String::new() }; - let output = if no_id { - format!("{prefix}{preview}\n") - } else { - format!("{id:016x}: {prefix}{preview}\n") - }; - tokio::io::stdout().write_all(output.as_bytes()).await.context(error::WriteStdoutSnafu)?; + let line = format_metadata_line(&metadata, no_id, show_source_prefix); + tokio::io::stdout().write_all(line.as_bytes()).await.context(error::WriteStdoutSnafu)?; } Ok(()) } #[inline] const fn parse_hex(src: &str) -> Result { u64::from_str_radix(src, 16) } + +const TAIL_BACKOFF_INITIAL: Duration = Duration::from_millis(100); +const TAIL_BACKOFF_MAX: Duration = Duration::from_secs(5); +const TAIL_PRINTED_IDS_CAP: NonZeroUsize = NonZeroUsize::new(1024).unwrap(); + +struct TailState { + printed_ids: LruCache, + is_first_session: bool, + backoff: Duration, +} + +impl TailState { + fn new() -> Self { + Self { + printed_ids: LruCache::new(TAIL_PRINTED_IDS_CAP), + is_first_session: true, + backoff: TAIL_BACKOFF_INITIAL, + } + } + + fn next_backoff(&mut self) -> Duration { + let current = self.backoff; + self.backoff = std::cmp::min(self.backoff.saturating_mul(2), TAIL_BACKOFF_MAX); + current + } + + const fn reset_backoff(&mut self) { self.backoff = TAIL_BACKOFF_INITIAL; } +} + +async fn write_metadata_line( + metadata: &ClipEntryMetadata, + no_id: bool, + show_source_prefix: bool, +) -> Result<(), Error> { + let line = format_metadata_line(metadata, no_id, show_source_prefix); + tokio::io::stdout().write_all(line.as_bytes()).await.context(error::WriteStdoutSnafu)?; + Ok(()) +} + +async fn run_tail(config: Config, no_id: bool, lines: u64, follow: bool) -> Result { + let preview_length = config.preview_length; + let show_source_prefix = config.show_source_prefix; + let mut state = TailState::new(); + if !follow { + // Single-shot: fail fast if the daemon is unreachable. + return run_tail_session( + &config, + no_id, + lines, + preview_length, + show_source_prefix, + &mut state, + false, + ) + .await + .map(|()| 0); + } + loop { + match run_tail_session( + &config, + no_id, + lines, + preview_length, + show_source_prefix, + &mut state, + true, + ) + .await + { + Ok(()) => return Ok(0), + Err(err) => { + let delay = state.next_backoff(); + tracing::debug!( + "tail session ended ({err}); reconnecting in {} ms", + delay.as_millis() + ); + tokio::time::sleep(delay).await; + } + } + } +} + +async fn run_tail_session( + config: &Config, + no_id: bool, + lines: u64, + preview_length: usize, + show_source_prefix: bool, + state: &mut TailState, + follow: bool, +) -> Result<(), Error> { + let client = Client::builder() + .grpc_endpoint(config.server_endpoint.clone()) + .access_token(config.access_token()) + .max_decoding_message_size(config.grpc_max_message_size) + .build() + .await?; + + // Subscribe before listing so events from the gap between the two are + // buffered in the receiver and deduped via `printed_ids`. On reconnect we + // skip the list entirely; only entries arriving on the new stream are + // emitted. + let stream = if follow { Some(client.subscribe(preview_length).await?) } else { None }; + + if state.is_first_session { + let take = usize::try_from(lines).unwrap_or(0); + if take > 0 { + let snapshot = client.list(preview_length).await?; + // `client.list` returns newest first; take the most recent `take` + // entries, then iterate in reverse so output reads oldest -> newest, + // matching the chronological order of streamed events under `-f`. + let recent: Vec<_> = snapshot.into_iter().take(take).collect(); + for metadata in recent.iter().rev() { + if state.printed_ids.contains(&metadata.id) { + continue; + } + write_metadata_line(metadata, no_id, show_source_prefix).await?; + let _ = state.printed_ids.put(metadata.id, ()); + } + } + state.is_first_session = false; + } + + let Some(mut stream) = stream else { + return Ok(()); + }; + while let Some(item) = stream.next().await { + let metadata = item?; + if state.printed_ids.contains(&metadata.id) { + continue; + } + write_metadata_line(&metadata, no_id, show_source_prefix).await?; + let _ = state.printed_ids.put(metadata.id, ()); + state.reset_backoff(); + } + + Err(Error::Operation { error: "subscription stream ended".to_owned() }) +} diff --git a/clipcatctl/src/error.rs b/clipcatctl/src/error.rs index 54d5c330..5a6cb1b8 100644 --- a/clipcatctl/src/error.rs +++ b/clipcatctl/src/error.rs @@ -102,6 +102,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: clipcat_client::error::SubscribeClipError) -> Self { + Self::Operation { error: err.to_string() } + } +} + impl From for Error { fn from(err: clipcat_client::error::EnableWatcherError) -> Self { Self::Operation { error: err.to_string() } diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 96d3fff7..afb63d7b 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -15,6 +15,7 @@ keywords.workspace = true [dependencies] tracing = { workspace = true } +futures = { workspace = true } hyper-util = { workspace = true } tokio = { workspace = true } diff --git a/crates/client/src/error.rs b/crates/client/src/error.rs index e2b777b2..6eba8760 100644 --- a/crates/client/src/error.rs +++ b/crates/client/src/error.rs @@ -166,6 +166,21 @@ impl fmt::Display for ListClipError { } } +#[derive(Debug)] +pub enum SubscribeClipError { + Status { source: tonic::Status }, +} + +impl fmt::Display for SubscribeClipError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Status { source } => source.fmt(f), + } + } +} + +impl std::error::Error for SubscribeClipError {} + #[derive(Debug)] pub enum EnableWatcherError { Status { source: tonic::Status }, diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 63517bd8..2d34e3b2 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -14,7 +14,7 @@ use self::interceptor::Interceptor; pub use self::{ error::{Error, Result}, history::History, - manager::Manager, + manager::{Manager, SubscribeStream}, system::System, watcher::Watcher, }; diff --git a/crates/client/src/manager.rs b/crates/client/src/manager.rs index 4bee2901..f3db9228 100644 --- a/crates/client/src/manager.rs +++ b/crates/client/src/manager.rs @@ -1,15 +1,22 @@ +use std::pin::Pin; + use clipcat_base::{ClipEntry, ClipEntryMetadata, ClipboardKind}; use clipcat_proto as proto; +use futures::{Stream, StreamExt}; use tonic::Request; use crate::{ Client, error::{ BatchRemoveClipError, ClearClipError, GetClipError, GetCurrentClipError, GetLengthError, - InsertClipError, ListClipError, MarkClipError, RemoveClipError, UpdateClipError, + InsertClipError, ListClipError, MarkClipError, RemoveClipError, SubscribeClipError, + UpdateClipError, }, }; +pub type SubscribeStream = + Pin> + Send>>; + pub trait Manager { async fn get(&self, id: u64) -> Result; @@ -48,6 +55,9 @@ pub trait Manager { async fn list(&self, preview_length: usize) -> Result, ListClipError>; + async fn subscribe(&self, preview_length: usize) + -> Result; + async fn remove(&self, id: u64) -> Result; async fn batch_remove(&self, ids: &[u64]) -> Result, BatchRemoveClipError>; @@ -161,6 +171,29 @@ impl Manager for Client { Ok(list) } + async fn subscribe( + &self, + preview_length: usize, + ) -> Result { + let stream = + proto::ManagerClient::with_interceptor(self.channel.clone(), self.interceptor.clone()) + .max_decoding_message_size(self.max_decoding_message_size) + .subscribe(Request::new(proto::SubscribeRequest { + preview_length: u64::try_from(preview_length).unwrap_or(30), + })) + .await + .map_err(|source| SubscribeClipError::Status { source })? + .into_inner(); + let mapped = stream.map(|item| match item { + Ok(proto::SubscribeEvent { metadata: Some(m) }) => Ok(ClipEntryMetadata::from(m)), + Ok(proto::SubscribeEvent { metadata: None }) => Err(SubscribeClipError::Status { + source: tonic::Status::data_loss("SubscribeEvent missing metadata"), + }), + Err(source) => Err(SubscribeClipError::Status { source }), + }); + Ok(Box::pin(mapped)) + } + async fn remove(&self, id: u64) -> Result { let proto::RemoveResponse { ok } = proto::ManagerClient::with_interceptor(self.channel.clone(), self.interceptor.clone()) diff --git a/crates/proto/proto/manager.proto b/crates/proto/proto/manager.proto index 85c54c3f..0b552860 100644 --- a/crates/proto/proto/manager.proto +++ b/crates/proto/proto/manager.proto @@ -21,6 +21,8 @@ service Manager { rpc Mark(MarkRequest) returns (MarkResponse); rpc Length(google.protobuf.Empty) returns (LengthResponse); + + rpc Subscribe(SubscribeRequest) returns (stream SubscribeEvent); } enum ClipboardKind { @@ -110,3 +112,10 @@ message BatchRemoveRequest { message BatchRemoveResponse { repeated uint64 ids = 1; } + +message SubscribeRequest { + uint64 preview_length = 1; +} +message SubscribeEvent { + ClipEntryMetadata metadata = 1; +} diff --git a/crates/proto/src/lib.rs b/crates/proto/src/lib.rs index 50beeda5..41a29447 100644 --- a/crates/proto/src/lib.rs +++ b/crates/proto/src/lib.rs @@ -30,8 +30,8 @@ pub use self::proto::{ BatchRemoveRequest, BatchRemoveResponse, ClipEntry, ClipEntryMetadata, ClipboardKind, GetCurrentClipRequest, GetCurrentClipResponse, GetRequest, GetResponse, GetSystemVersionResponse, InsertRequest, InsertResponse, LengthResponse, ListRequest, - ListResponse, MarkRequest, MarkResponse, RemoveRequest, RemoveResponse, UpdateRequest, - UpdateResponse, WatcherState, WatcherStateReply, + ListResponse, MarkRequest, MarkResponse, RemoveRequest, RemoveResponse, SubscribeEvent, + SubscribeRequest, UpdateRequest, UpdateResponse, WatcherState, WatcherStateReply, history_client::HistoryClient, history_server::{History, HistoryServer}, manager_client::ManagerClient, diff --git a/crates/server/src/grpc/manager.rs b/crates/server/src/grpc/manager.rs index 8abb5a59..80384d0c 100644 --- a/crates/server/src/grpc/manager.rs +++ b/crates/server/src/grpc/manager.rs @@ -1,7 +1,12 @@ -use std::{str::FromStr, sync::Arc}; +use std::{pin::Pin, str::FromStr, sync::Arc}; use clipcat_proto as proto; +use futures::Stream; use tokio::sync::Mutex; +use tokio_stream::{ + StreamExt, + wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, +}; use tonic::{Request, Response, Status}; use crate::{ClipboardManager, notification}; @@ -21,6 +26,9 @@ impl proto::Manager for ManagerService where Notification: notification::Notification + 'static, { + type SubscribeStream = + Pin> + Send + 'static>>; + async fn insert( &self, request: Request, @@ -146,4 +154,28 @@ where }; Ok(Response::new(proto::LengthResponse { length })) } + + async fn subscribe( + &self, + request: Request, + ) -> Result, Status> { + let proto::SubscribeRequest { preview_length } = request.into_inner(); + let preview_length = usize::try_from(preview_length).unwrap_or(30); + let receiver = { + let manager = self.manager.lock().await; + manager.subscribe() + }; + let stream = BroadcastStream::new(receiver).filter_map(move |result| match result { + Ok(entry) => { + let metadata = proto::ClipEntryMetadata::from(entry.metadata(Some(preview_length))); + Some(Ok(proto::SubscribeEvent { metadata: Some(metadata) })) + } + Err(BroadcastStreamRecvError::Lagged(skipped)) => { + tracing::warn!("Subscriber lagged, skipped {skipped} clip events"); + None + } + }); + let stream: Self::SubscribeStream = Box::pin(stream); + Ok(Response::new(stream)) + } } diff --git a/crates/server/src/manager/mod.rs b/crates/server/src/manager/mod.rs index 600dfde6..5c563f78 100644 --- a/crates/server/src/manager/mod.rs +++ b/crates/server/src/manager/mod.rs @@ -8,10 +8,13 @@ use std::{ use clipcat_base::{ClipEntry, ClipEntryMetadata, ClipboardContent, ClipboardKind}; use snafu::ResultExt; use time::OffsetDateTime; +use tokio::sync::broadcast; pub use self::error::Error; use crate::{backend::ClipboardBackend, notification}; +const CLIP_EVENT_CHANNEL_CAPACITY: usize = 64; + #[cfg(test)] const DEFAULT_CAPACITY: usize = 40; @@ -34,6 +37,8 @@ pub struct ClipboardManager { snippet_ids: HashSet, notification: Notification, + + clip_sender: broadcast::Sender, } impl ClipboardManager @@ -46,6 +51,7 @@ where primary_threshold: time::Duration, notification: Notification, ) -> Self { + let (clip_sender, _) = broadcast::channel(CLIP_EVENT_CHANNEL_CAPACITY); Self { backend, primary_threshold, @@ -55,9 +61,13 @@ where timestamp_to_id: BTreeMap::new(), snippet_ids: HashSet::new(), notification, + clip_sender, } } + #[inline] + pub fn subscribe(&self) -> broadcast::Receiver { self.clip_sender.subscribe() } + #[cfg(test)] #[inline] pub fn new(backend: Arc, notification: Notification) -> Self { @@ -157,9 +167,10 @@ where let (id, timestamp) = (entry.id(), entry.timestamp()); self.current_clips[usize::from(entry.kind())] = Some(id); - drop(self.clips.insert(id, entry)); + drop(self.clips.insert(id, entry.clone())); let _unused = self.timestamp_to_id.insert(timestamp, id); self.remove_oldest(); + let _unused = self.clip_sender.send(entry); id } @@ -256,10 +267,11 @@ mod tests { use std::{collections::HashSet, sync::Arc, time::Duration}; use clipcat_base::{ClipEntry, ClipboardKind}; + use tokio::sync::broadcast::error::TryRecvError; use crate::{ backend::LocalClipboardBackend, - manager::{ClipboardManager, DEFAULT_CAPACITY}, + manager::{CLIP_EVENT_CHANNEL_CAPACITY, ClipboardManager, DEFAULT_CAPACITY}, notification::DummyNotification, }; @@ -472,4 +484,78 @@ mod tests { assert_eq!(mgr.len(), 0, "With capacity 0, clips should be immediately evicted"); assert!(mgr.export(false).is_empty()); } + + #[test] + fn test_subscribe_receives_inserted_clips() { + let backend = Arc::new(LocalClipboardBackend::new()); + let notification = DummyNotification::default(); + let mut mgr = ClipboardManager::new(backend, notification); + let mut rx = mgr.subscribe(); + + let clips = create_clips(3); + let ids: Vec = clips.iter().map(ClipEntry::id).collect(); + for clip in clips { + let _ = mgr.insert(clip); + } + + for expected_id in ids { + let received = rx.try_recv().expect("subscriber should receive event"); + assert_eq!(received.id(), expected_id); + } + assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + } + + #[test] + fn test_subscribe_no_subscriber_does_not_panic() { + let backend = Arc::new(LocalClipboardBackend::new()); + let notification = DummyNotification::default(); + let mut mgr = ClipboardManager::new(backend, notification); + + for clip in create_clips(5) { + let _ = mgr.insert(clip); + } + assert_eq!(mgr.len(), 5); + } + + #[test] + fn test_subscribe_receives_replaced_clip() { + const MIME: mime::Mime = mime::TEXT_PLAIN_UTF_8; + + let backend = Arc::new(LocalClipboardBackend::new()); + let notification = DummyNotification::default(); + let mut mgr = ClipboardManager::new(backend, notification); + let old_id = + mgr.insert(ClipEntry::new(b"original", &MIME, ClipboardKind::Clipboard, None).unwrap()); + + let mut rx = mgr.subscribe(); + let (ok, new_id) = mgr.replace(old_id, b"transformed", &MIME); + assert!(ok); + + let received = rx.try_recv().expect("subscriber should receive replace event"); + assert_eq!(received.id(), new_id); + assert_eq!(received.as_bytes(), b"transformed"); + assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty); + } + + #[test] + fn test_subscribe_lagged_when_overrun() { + let backend = Arc::new(LocalClipboardBackend::new()); + let notification = DummyNotification::default(); + let mut mgr = ClipboardManager::with_capacity( + backend, + CLIP_EVENT_CHANNEL_CAPACITY * 2, + time::Duration::milliseconds(0), + notification, + ); + let mut rx = mgr.subscribe(); + + for clip in create_clips(CLIP_EVENT_CHANNEL_CAPACITY + 5) { + let _ = mgr.insert(clip); + } + + match rx.try_recv() { + Err(TryRecvError::Lagged(_)) => {} + other => panic!("expected Lagged error, got {other:?}"), + } + } }