Skip to content

Commit 9e60092

Browse files
committed
WatsonWebsocket
1 parent 423f882 commit 9e60092

2 files changed

Lines changed: 116 additions & 111 deletions

File tree

src/HuaJiBot.NET/HuaJiBot.NET.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<PackageReference Include="SixLabors.Fonts" Version="2.1.3" />
1616
<PackageReference Include="SixLabors.ImageSharp" Version="3.1.11" />
1717
<PackageReference Include="SixLabors.ImageSharp.Drawing" Version="2.1.7" />
18-
<PackageReference Include="WebsocketClientLite.PCL" Version="8.2.0" />
18+
<PackageReference Include="WatsonWebsocket" Version="4.1.6" />
1919
</ItemGroup>
2020

2121
<ItemGroup>

src/HuaJiBot.NET/Websocket/WebsocketClient.cs

Lines changed: 115 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
1-
using System.Net.Sockets;
2-
using System.Reactive.Disposables;
3-
using System.Reactive.Linq;
4-
using System.Security.Authentication;
1+
using System.Net.WebSockets;
52
using System.Text;
6-
using IWebsocketClientLite;
73
using Microsoft.Extensions.Logging;
84
using Newtonsoft.Json.Linq;
9-
using WebsocketClientLite;
5+
using WatsonWebsocket;
106
using ILogger = Microsoft.Extensions.Logging.ILogger;
117

128
namespace HuaJiBot.NET.Websocket;
139

1410
public class WebsocketClient : IWebsocketClient
1511
{
16-
private readonly ClientWebSocketRx _client;
12+
private readonly WatsonWsClient _client;
1713
private readonly ILogger? _logger;
18-
private readonly CancellationTokenSource _outerCancellationTokenSource = new();
19-
private readonly CompositeDisposable _disposables = new();
14+
private readonly CancellationTokenSource _cancellationTokenSource = new();
2015
private bool _hasConnectedBefore;
16+
private readonly Uri _url;
17+
private readonly Dictionary<string, string> _headers;
2118

2219
public WebsocketClient(
2320
string url,
@@ -35,116 +32,107 @@ public WebsocketClient(
3532
)
3633
{
3734
_logger = logger;
38-
39-
headers ??= new();
35+
_url = url;
36+
_headers = headers ?? new Dictionary<string, string>();
4037

4138
if (!string.IsNullOrEmpty(token))
4239
{
43-
headers = headers.Append(new("Authorization", $"Bearer {token}")).ToDictionary();
40+
_headers["Authorization"] = $"Bearer {token}";
4441
}
4542

46-
_client = new() { Headers = headers, TlsProtocolType = SslProtocols.Tls13 };
47-
IDisposable isConnectedDisposable = _client.IsConnectedObservable.Subscribe(isConnected =>
43+
// Create Watson WebSocket client with Uri
44+
_client = new WatsonWsClient(url);
45+
46+
// Configure client options with custom headers
47+
_client.ConfigureOptions(options =>
4848
{
49-
_logger?.LogDebug($"Is connected: {isConnected}");
50-
if (isConnected)
49+
// Set custom headers on the underlying ClientWebSocket
50+
foreach (var header in _headers)
5151
{
52-
var connectionInfo = new ConnectionInfo
52+
try
5353
{
54-
IsReconnect = _hasConnectedBefore,
55-
Timestamp = DateTimeOffset.Now,
56-
};
57-
_hasConnectedBefore = true;
58-
OnConnected?.Invoke(connectionInfo);
59-
}
60-
else
61-
{
62-
var disconnectionInfo = new DisconnectionInfo
54+
options.SetRequestHeader(header.Key, header.Value);
55+
}
56+
catch (Exception ex)
6357
{
64-
Type = DisconnectionType.Lost,
65-
Timestamp = DateTimeOffset.Now,
66-
Reason = "Connection lost or closed",
67-
};
68-
OnClosed?.Invoke(disconnectionInfo);
58+
_logger?.LogWarning(ex, $"Failed to set header {header.Key}");
59+
}
6960
}
7061
});
7162

72-
_disposables.Add(isConnectedDisposable);
73-
74-
Func<IObservable<(IDataframe dataframe, ConnectionStatus state)>> connect = () =>
75-
_client.WebsocketConnectWithStatusObservable(
76-
uri: url,
77-
hasClientPing: true,
78-
clientPingInterval: TimeSpan.FromSeconds(10),
79-
clientPingMessage: "ping message",
80-
cancellationToken: _outerCancellationTokenSource.Token
81-
)!;
82-
83-
IDisposable disposableConnectionStatus = Observable
84-
.Defer(connect)
85-
.Retry()
86-
.Repeat()
87-
.DelaySubscription(TimeSpan.FromSeconds(10))
88-
.Do(async tuple =>
63+
// Subscribe to events
64+
_client.ServerConnected += OnServerConnected;
65+
_client.ServerDisconnected += OnServerDisconnected;
66+
_client.MessageReceived += OnMessageReceived;
67+
68+
// Start connection
69+
_ = StartConnectionAsync();
70+
}
71+
72+
private async Task StartConnectionAsync()
73+
{
74+
try
75+
{
76+
_logger?.LogInformation($"WebSocket connecting to {_url}");
77+
78+
await Task.Run(() => _client.Start(), _cancellationTokenSource.Token);
79+
}
80+
catch (Exception e)
81+
{
82+
_logger?.LogError(e, "Error starting WebSocket connection");
83+
OnClosed?.Invoke(new DisconnectionInfo
8984
{
90-
_logger?.LogDebug($"Connection status: {tuple.state}");
85+
Type = DisconnectionType.Error,
86+
Reason = e.Message,
87+
Exception = e
88+
});
89+
}
90+
}
9191

92-
if (
93-
tuple.state == ConnectionStatus.DataframeReceived
94-
&& tuple.dataframe?.Message is { } message
95-
)
96-
{
97-
_logger?.LogDebug($"Received message: {message}");
98-
try
99-
{
100-
await ProcessMessageAsync(message);
101-
}
102-
catch (Exception e)
103-
{
104-
_logger?.LogError(e, "Error processing message");
105-
}
106-
}
107-
})
108-
.Subscribe(
109-
_ => { },
110-
ex =>
111-
{
112-
_logger?.LogError(ex, "Connection status error");
113-
114-
// 根据异常类型推断断开原因
115-
var disconnectionInfo = new DisconnectionInfo
116-
{
117-
Timestamp = DateTimeOffset.Now,
118-
Exception = ex,
119-
};
120-
121-
if (ex is OperationCanceledException)
122-
{
123-
disconnectionInfo.Type = DisconnectionType.ByUser;
124-
disconnectionInfo.Reason = "Operation cancelled by user";
125-
}
126-
else if (ex is SocketException socketEx)
127-
{
128-
disconnectionInfo.Type = DisconnectionType.Error;
129-
disconnectionInfo.Reason = $"Socket error: {socketEx.Message}";
130-
}
131-
else if (ex is TimeoutException)
132-
{
133-
disconnectionInfo.Type = DisconnectionType.Timeout;
134-
disconnectionInfo.Reason = "Connection timeout";
135-
}
136-
else
137-
{
138-
disconnectionInfo.Type = DisconnectionType.Error;
139-
disconnectionInfo.Reason = $"Connection error: {ex.Message}";
140-
}
141-
142-
OnClosed?.Invoke(disconnectionInfo);
143-
},
144-
() => _logger?.LogDebug("Connection status completed")
145-
);
146-
147-
_disposables.Add(disposableConnectionStatus);
92+
private void OnServerConnected(object? sender, EventArgs e)
93+
{
94+
_logger?.LogInformation("WebSocket connected");
95+
96+
var connectionInfo = new ConnectionInfo
97+
{
98+
IsReconnect = _hasConnectedBefore,
99+
Timestamp = DateTimeOffset.Now
100+
};
101+
102+
_hasConnectedBefore = true;
103+
OnConnected?.Invoke(connectionInfo);
104+
}
105+
106+
private void OnServerDisconnected(object? sender, EventArgs e)
107+
{
108+
_logger?.LogInformation("WebSocket disconnected");
109+
110+
OnClosed?.Invoke(new DisconnectionInfo
111+
{
112+
Type = DisconnectionType.ByServer,
113+
Reason = "Server disconnected",
114+
Timestamp = DateTimeOffset.Now
115+
});
116+
}
117+
118+
private void OnMessageReceived(object? sender, MessageReceivedEventArgs e)
119+
{
120+
// Fire and forget - don't block the event handler
121+
_ = ProcessMessageInternalAsync(e);
122+
}
123+
124+
private async Task ProcessMessageInternalAsync(MessageReceivedEventArgs e)
125+
{
126+
try
127+
{
128+
var msg = Encoding.UTF8.GetString(e.Data.ToArray());
129+
_logger?.LogDebug($"Received message: {msg}");
130+
await ProcessMessageAsync(msg);
131+
}
132+
catch (Exception ex)
133+
{
134+
_logger?.LogError(ex, "Error processing received message");
135+
}
148136
}
149137

150138
private async ValueTask ProcessMessageAsync(string msg)
@@ -158,11 +146,22 @@ private async ValueTask ProcessMessageAsync(string msg)
158146
public event Action<DisconnectionInfo>? OnClosed;
159147

160148
public void Send(string msg)
149+
{
150+
// Fire and forget - don't block
151+
_ = SendAsync(msg);
152+
}
153+
154+
private async Task SendAsync(string msg)
161155
{
162156
try
163157
{
164-
var bytes = Encoding.UTF8.GetBytes(msg);
165-
_ = _client.Sender?.SendBinary(bytes, OpcodeKind.Text);
158+
if (!_client.Connected)
159+
{
160+
_logger?.LogWarning("Cannot send message: WebSocket is not connected");
161+
return;
162+
}
163+
164+
await _client.SendAsync(msg, WebSocketMessageType.Text);
166165
_logger?.LogDebug($"Sent message: {msg}");
167166
}
168167
catch (Exception e)
@@ -182,9 +181,15 @@ public void Dispose()
182181

183182
try
184183
{
185-
_outerCancellationTokenSource.Cancel();
186-
_disposables.Dispose();
187-
_outerCancellationTokenSource.Dispose();
184+
_cancellationTokenSource.Cancel();
185+
186+
if (_client.Connected)
187+
{
188+
_client.Stop();
189+
}
190+
191+
_client.Dispose();
192+
_cancellationTokenSource.Dispose();
188193
}
189194
catch (Exception e)
190195
{

0 commit comments

Comments
 (0)