Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Libraries/Microsoft.Teams.AI/Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class Stream(OnStreamChunk onChunk) : IStream
[Obsolete("Use EmitAsync instead to avoid sync-over-async blocking.")]
public void Emit(string text)
{
EmitAsync(text).ConfigureAwait(false).GetAwaiter().GetResult();
onChunk(text).GetAwaiter().GetResult();
}

public Task EmitAsync(string text)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Teams.Api.Activities;
using Microsoft.Teams.Api.Entities;
using Microsoft.Teams.Apps.Plugins;
using Microsoft.Teams.Common.Logging;

using static Microsoft.Teams.Common.Extensions.TaskExtensions;

Expand All @@ -21,6 +22,7 @@ public class Stream : IStreamer
public int Sequence => _index;

public required Func<IActivity, Task<IActivity>> Send { get; set; }
public ILogger? Logger { get; set; }
public event IStreamer.OnChunkHandler OnChunk = (_) => { };

protected int _index = 1;
Expand Down Expand Up @@ -215,8 +217,9 @@ private async Task FlushSafe()
{
await Flush().ConfigureAwait(false);
}
catch (Exception)
catch (Exception ex)
{
Logger?.Warn("Stream flush failed; will retry if there is pending state.", ex);
// Reschedule a retry so Close() doesn't spin forever
// waiting for _id to be set after a transient send failure.
if (_queue.Count > 0 || _id is null && _count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ public IStreamer CreateStream(Api.ConversationReference reference, CancellationT
{
var res = await Send(activity, reference, cancellationToken).ConfigureAwait(false);
return res;
}
},
Logger = Logger.Child("stream")
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public McpClientPlugin UseMcpServer(string url, McpClientPluginParams? pluginPar

public override async Task<FunctionCollection> OnBuildFunctions<TOptions>(IChatPrompt<TOptions> prompt, FunctionCollection functions, CancellationToken cancellationToken = default)
{
await FetchToolsIfNeeded().ConfigureAwait(false);
await FetchToolsIfNeeded(cancellationToken).ConfigureAwait(false);

foreach (var entry in _mcpServerParams)
{
Expand Down Expand Up @@ -100,7 +100,7 @@ public override async Task<FunctionCollection> OnBuildFunctions<TOptions>(IChatP
///
/// Checks if cached values have expired or if tools have never been fetched. Performs parallel fetching for efficiency.
/// </summary>
internal async Task FetchToolsIfNeeded()
internal async Task FetchToolsIfNeeded(CancellationToken cancellationToken = default)
{
var fetchNeeded = new List<KeyValuePair<string, McpClientPluginParams>>();

Expand Down Expand Up @@ -132,7 +132,7 @@ internal async Task FetchToolsIfNeeded()
{
string url = entry.Key;
McpClientPluginParams pluginParams = entry.Value;
tasks.Add(FetchToolsFromServer(new Uri(url), pluginParams));
tasks.Add(FetchToolsFromServer(new Uri(url), pluginParams, cancellationToken));
}
try
{
Expand Down Expand Up @@ -179,11 +179,11 @@ internal async Task FetchToolsIfNeeded()
}
}

internal async Task<List<McpToolDetails>> FetchToolsFromServer(Uri url, McpClientPluginParams pluginParams)
internal async Task<List<McpToolDetails>> FetchToolsFromServer(Uri url, McpClientPluginParams pluginParams, CancellationToken cancellationToken = default)
{
IClientTransport transport = CreateTransport(url, pluginParams.Transport, pluginParams.HeadersFactory());
var client = await McpClientFactory.CreateAsync(transport).ConfigureAwait(false);
var tools = await client.ListToolsAsync().ConfigureAwait(false);
var client = await McpClientFactory.CreateAsync(transport, cancellationToken: cancellationToken).ConfigureAwait(false);
var tools = await client.ListToolsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

// Convert MCP tools to our format
var mappedTools = tools.Select(t => new McpToolDetails()
Expand Down