diff --git a/Libraries/Microsoft.Teams.AI/Stream.cs b/Libraries/Microsoft.Teams.AI/Stream.cs index 1064c123..d758069d 100644 --- a/Libraries/Microsoft.Teams.AI/Stream.cs +++ b/Libraries/Microsoft.Teams.AI/Stream.cs @@ -39,7 +39,7 @@ public class Stream(OnStreamChunk onChunk) : IStream [Obsolete("Use EmitAsync instead to avoid sync-over-async blocking.")] public void Emit(string text) { - EmitAsync(text).ConfigureAwait(false).GetAwaiter().GetResult(); + onChunk(text).GetAwaiter().GetResult(); } public Task EmitAsync(string text) diff --git a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs index 7b06d5b0..4c19075e 100644 --- a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs +++ b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.Stream.cs @@ -7,6 +7,7 @@ using Microsoft.Teams.Api.Activities; using Microsoft.Teams.Api.Entities; using Microsoft.Teams.Apps.Plugins; +using Microsoft.Teams.Common.Logging; using static Microsoft.Teams.Common.Extensions.TaskExtensions; @@ -21,6 +22,7 @@ public class Stream : IStreamer public int Sequence => _index; public required Func> Send { get; set; } + public ILogger? Logger { get; set; } public event IStreamer.OnChunkHandler OnChunk = (_) => { }; protected int _index = 1; @@ -215,8 +217,9 @@ private async Task FlushSafe() { await Flush().ConfigureAwait(false); } - catch (Exception) + catch (Exception ex) { + Logger?.Warn("Stream flush failed; will retry if there is pending state.", ex); // Reschedule a retry so Close() doesn't spin forever // waiting for _id to be set after a transient send failure. if (_queue.Count > 0 || _id is null && _count > 0) diff --git a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.cs b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.cs index 6a80912f..33c41c0e 100644 --- a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.cs +++ b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.AspNetCore/AspNetCorePlugin.cs @@ -138,7 +138,8 @@ public IStreamer CreateStream(Api.ConversationReference reference, CancellationT { var res = await Send(activity, reference, cancellationToken).ConfigureAwait(false); return res; - } + }, + Logger = Logger.Child("stream") }; } diff --git a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.External/Microsoft.Teams.Plugins.External.McpClient/McpClientPlugin.cs b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.External/Microsoft.Teams.Plugins.External.McpClient/McpClientPlugin.cs index e6ae8be6..18ff865d 100644 --- a/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.External/Microsoft.Teams.Plugins.External.McpClient/McpClientPlugin.cs +++ b/Libraries/Microsoft.Teams.Plugins/Microsoft.Teams.Plugins.External/Microsoft.Teams.Plugins.External.McpClient/McpClientPlugin.cs @@ -70,7 +70,7 @@ public McpClientPlugin UseMcpServer(string url, McpClientPluginParams? pluginPar public override async Task OnBuildFunctions(IChatPrompt prompt, FunctionCollection functions, CancellationToken cancellationToken = default) { - await FetchToolsIfNeeded().ConfigureAwait(false); + await FetchToolsIfNeeded(cancellationToken).ConfigureAwait(false); foreach (var entry in _mcpServerParams) { @@ -100,7 +100,7 @@ public override async Task OnBuildFunctions(IChatP /// /// Checks if cached values have expired or if tools have never been fetched. Performs parallel fetching for efficiency. /// - internal async Task FetchToolsIfNeeded() + internal async Task FetchToolsIfNeeded(CancellationToken cancellationToken = default) { var fetchNeeded = new List>(); @@ -132,7 +132,7 @@ internal async Task FetchToolsIfNeeded() { string url = entry.Key; McpClientPluginParams pluginParams = entry.Value; - tasks.Add(FetchToolsFromServer(new Uri(url), pluginParams)); + tasks.Add(FetchToolsFromServer(new Uri(url), pluginParams, cancellationToken)); } try { @@ -179,11 +179,11 @@ internal async Task FetchToolsIfNeeded() } } - internal async Task> FetchToolsFromServer(Uri url, McpClientPluginParams pluginParams) + internal async Task> FetchToolsFromServer(Uri url, McpClientPluginParams pluginParams, CancellationToken cancellationToken = default) { IClientTransport transport = CreateTransport(url, pluginParams.Transport, pluginParams.HeadersFactory()); - var client = await McpClientFactory.CreateAsync(transport).ConfigureAwait(false); - var tools = await client.ListToolsAsync().ConfigureAwait(false); + var client = await McpClientFactory.CreateAsync(transport, cancellationToken: cancellationToken).ConfigureAwait(false); + var tools = await client.ListToolsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); // Convert MCP tools to our format var mappedTools = tools.Select(t => new McpToolDetails()