From 64e3c31c4826d94be03f99d19127861eb9a7de90 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Tue, 15 Jun 2021 19:33:16 -0600 Subject: [PATCH 01/48] redis fix? --- src/amber/websockets/adapters/redis.cr | 2 +- src/amber/websockets/channel.cr | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 4a2c044a9..e0e597594 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -22,7 +22,7 @@ module Amber::WebSockets::Adapters # Add a redis subscriber with topic *topic_path* def on_message(topic_path, listener) spawn do - @subscriber.subscribe(topic_path) do |on| + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| on.message do |_, m| msg = JSON.parse(m) sender_id = msg["sender"].as_s diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 0e3092763..79b164225 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -21,6 +21,9 @@ module Amber # end # end # ``` + CHANNEL_TOPIC_PATHS = [] of String + + abstract class Channel @@adapter : WebSockets::Adapters::RedisAdapter? | WebSockets::Adapters::MemoryAdapter? @topic_path : String @@ -32,7 +35,11 @@ module Amber def handle_leave(client_socket); end - def initialize(@topic_path); end + def initialize(@topic_path) + + CHANNEL_TOPIC_PATHS << @topic_path + + end # Called from proc when message is returned from the pubsub service def on_message(client_socket_id, message) From fcfc9fc66472a237da1953cb72e11c1e22cd370e Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Tue, 15 Jun 2021 19:36:50 -0600 Subject: [PATCH 02/48] redis fix 2? redis fix 3? redis fix 4? try this try this next try hacking redis adapter again try hacking redis adapter again will this fix it? better amber redis websocket patch remove extraneous puts change incorrect comment, explain how patch works in comment fiber.yields for better execution subscribe once and on the fly oopsies signed jsilver avoid namespace concerns weird code but will this work please sir, infer my type keep subscribe channel open forever take out fiber yields and subscribe in on message... will be in loop i think trying this oopsie 2 test choose function to activate better if i understand crystal-redis right this should work fix try this try this try this happy wise thinking programmer zen choice: register callback FIRST then subscribe will this wakeup websocket beat thread try something better tests for redis adapter might need this might need this 2 show heartbeat remove dead sockets fix lengthen deadbeat socket interval debug use Fiber.yield bug fix hang fix recover if the subscriber breaks dont loop in loop a good patch wit some consideration... dont get stuck anywhere better? remove race condition reset to 2am last night this might have been the only issue keep forgetting to do this autoselect adapter, subscribe early this is beginning to become hard please help i am trapped inside a computer program setup adapter early initialize and subscribe early dont load pubsub adapter yet cleanup get tests passing... ready to merge make tests fast attempt subscribe when registering message handlers remake subscription block fiber if subscribing no longer works without block make logs faster, remove unneeded puts, use logging mechanism for faster logging add logging to find out why sockets are left closed and undeleted nicer client socket we probably do need to yield in these websocket loops so pings can occur remove spam logging i added sleep the beat interval when waiting for pongs, this allows fibers to catch up and all pongs to get received, correct me if i'm wrong apparently, HTTP::WebSocket itself from Crystal core lib is doing a loop do... for its block/event loop and since im now using fibers we have to yield for the pings to come and go properly... also testing this --- .../websockets/adapters/redis_adapter.cr | 73 +++++++++++++++++++ spec/spec_helper.cr | 2 +- spec/support/fixtures/websockets_fixtures.cr | 17 +++++ src/amber/server/server.cr | 5 ++ src/amber/websockets/adapters/redis.cr | 63 ++++++++++++++-- src/amber/websockets/channel.cr | 2 + src/amber/websockets/client_socket.cr | 7 +- src/amber/websockets/server.cr | 1 + 8 files changed, 160 insertions(+), 10 deletions(-) create mode 100644 spec/amber/websockets/adapters/redis_adapter.cr diff --git a/spec/amber/websockets/adapters/redis_adapter.cr b/spec/amber/websockets/adapters/redis_adapter.cr new file mode 100644 index 000000000..724290a7e --- /dev/null +++ b/spec/amber/websockets/adapters/redis_adapter.cr @@ -0,0 +1,73 @@ + + +require "../../../spec_helper" + +module Amber + describe Amber::WebSockets::Adapters::RedisAdapter do + + describe "#initialize" do + it "should subscribe to CHANNEL_TOPIC_PATHS" do + + + _, client_socket = create_user_socket + _, client_socket2 = create_user_socket + + + + channel = UserSocket.channels[0][:channel] + channel2 = UserSocket.channels[1][:channel] + + channel.subscribe_to_channel(client_socket, "{}") + channel.subscribe_to_channel(client_socket2, "{}") + + channel2.subscribe_to_channel(client_socket2, "{}") + channel2.subscribe_to_channel(client_socket, "{}") + + # channel.test_field.last.should eq "handle joined #{client_socket.id}" + # Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"] + + Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter + + redis_adapter = Amber::Server.instance.pubsub_adapter.instance + + sleep 1.second + + redis_adapter.as(Amber::WebSockets::Adapters::RedisAdapter).subscribed.should eq true + end + end + + describe "#publish" do + it "should publish the message to the channel" do + _, client_socket = create_user_socket + _, client_socket2 = create_user_socket + + Amber::Server.instance.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter + + channel = UserSocket.channels[0][:channel] + channel2 = UserSocket.channels[1][:channel] + + channel.subscribe_to_channel(client_socket, "{}") + channel.subscribe_to_channel(client_socket2, "{}") + + channel2.subscribe_to_channel(client_socket2, "{}") + channel2.subscribe_to_channel(client_socket, "{}") + + # channel.test_field.last.should eq "handle joined #{client_socket.id}" + Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"] + + redis_adapter = Amber::WebSockets::Adapters::RedisAdapter.new + + sleep 1.second + + redis_adapter.subscribed.should eq true + + channel = UserSocket.channels[0][:channel] + message = JSON.parse({"event" => "message", "topic" => "user_room:123", "subject" => "msg:new", "payload" => {"message" => "hey guys"}}.to_json) + channel.on_message("123", message) + channel.test_field.last.should eq "hey guys" + end + end + + + end +end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 810d1108c..09245584e 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -21,4 +21,4 @@ require "../src/amber/cli/commands" require "./support/fixtures" require "./support/helpers" -include Helpers +include Helpers \ No newline at end of file diff --git a/spec/support/fixtures/websockets_fixtures.cr b/spec/support/fixtures/websockets_fixtures.cr index 0b473f09b..9ceb60b02 100644 --- a/spec/support/fixtures/websockets_fixtures.cr +++ b/spec/support/fixtures/websockets_fixtures.cr @@ -2,6 +2,7 @@ struct UserSocket < Amber::WebSockets::ClientSocket property test_field = Array(String).new channel "user_room:*", UserChannel + channel "secondary_room:*", SecondaryChannel def on_disconnect(**args) test_field.push("on close #{self.id}") @@ -23,3 +24,19 @@ class UserChannel < Amber::WebSockets::Channel test_field.push(msg["payload"]["message"].as_s) end end + +class SecondaryChannel < Amber::WebSockets::Channel + property test_field = Array(String).new + + def handle_leave(client_socket) + test_field.push("secondary channel handle leave #{client_socket.id}") + end + + def handle_joined(client_socket, msg) + test_field.push("secondary channel handle joined #{client_socket.id}") + end + + def handle_message(client_socket, msg) + test_field.push("secondary channel #{msg["payload"]["message"].as_s}") + end +end diff --git a/src/amber/server/server.cr b/src/amber/server/server.cr index f27f5c258..f8ef5cbc3 100644 --- a/src/amber/server/server.cr +++ b/src/amber/server/server.cr @@ -27,6 +27,11 @@ module Amber instance.pubsub_adapter.instance end + def self.pubsub_adapter=(adapter) + instance.pubsub_adapter = adapter + puts "using #{instance.pubsub_adapter.instance}" + end + def self.router instance.router end diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index e0e597594..bc556e6e8 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -3,15 +3,44 @@ module Amber::WebSockets::Adapters class RedisAdapter @subscriber : Redis @publisher : Redis + @subscribed : Bool = false + @listeners : Hash(String,Proc(String, JSON::Any, Nil)) = Hash(String, Proc(String, JSON::Any, Nil)).new def self.instance @@instance ||= new end + def subscribed # test helper + @subscribed + end + # Establish subscribe and publish connections to Redis def initialize @subscriber = Redis.new(url: Amber.settings.redis_url) @publisher = Redis.new(url: Amber.settings.redis_url) + + spawn do + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| + on.message do |_, m| + Fiber.yield + msg = JSON.parse(m) + sender_id = msg["sender"].as_s + message = msg["msg"] + channel_name = message["topic"].to_s.split(":").first + @listeners[channel_name].call(sender_id, message) + end + on.subscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Subscribed to Redis channel #{channel}" } + @subscribed = true + end + on.unsubscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Unsubscribed from Redis channel #{channel}" } + @subscribed = false + end + end + end end # Publish the *message* to the redis publisher with topic *topic_path* @@ -21,16 +50,34 @@ module Amber::WebSockets::Adapters # Add a redis subscriber with topic *topic_path* def on_message(topic_path, listener) - spawn do - @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| - on.message do |_, m| - msg = JSON.parse(m) - sender_id = msg["sender"].as_s - message = msg["msg"] - listener.call(sender_id, message) + Log.info { "Setting websocket adapter listener for #{topic_path}"} + @listeners[topic_path] = listener + begin + @subscriber.subscribe(topic_path) + rescue # if we can't do it we're not in a subscribe loop, just resubscribe to all channels + spawn do + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| + on.message do |_, m| + Fiber.yield + msg = JSON.parse(m) + sender_id = msg["sender"].as_s + message = msg["msg"] + channel_name = message["topic"].to_s.split(":").first + @listeners[channel_name].call(sender_id, message) + end + on.subscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Subscribed to Redis channel #{channel}" } + @subscribed = true + end + on.unsubscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Unsubscribed from Redis channel #{channel}" } + @subscribed = false # just in case we do get unsubscribed some how + end end end end end end -end +end \ No newline at end of file diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 79b164225..1ad9fd4b1 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -23,6 +23,8 @@ module Amber # ``` CHANNEL_TOPIC_PATHS = [] of String + SUBSCRIBE_CHANNEL = ::Channel(String).new + abstract class Channel @@adapter : WebSockets::Adapters::RedisAdapter? | WebSockets::Adapters::MemoryAdapter? diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index 17fb0d818..972a1f468 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -21,7 +21,7 @@ module Amber @@channels = [] of NamedTuple(path: String, channel: Channel) - MAX_SOCKET_IDLE_TIME = 100.seconds + MAX_SOCKET_IDLE_TIME = 1.minute BEAT_INTERVAL = 30.seconds protected getter id : String @@ -72,6 +72,7 @@ module Amber @socket.on_pong do @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 + Fiber.yield end end @@ -97,6 +98,7 @@ module Amber @socket.ping @pings.push(Time.utc) @pings.delete_at(0) if @pings.size > 3 + Fiber.yield check_alive! rescue ex : IO::Error disconnect! @@ -133,7 +135,10 @@ module Amber # disconnect if no pongs have been received # or no pongs have been received beyond the threshold time + # puts "pongs empty #{@pongs.empty?}" + # puts "socket over max idle time #{(@pings.last - @pongs.first) > MAX_SOCKET_IDLE_TIME}" if @pongs.empty? || (@pings.last - @pongs.first) > MAX_SOCKET_IDLE_TIME + Log.info { "Disconnected websocket because pings empty or socket idle too long" } disconnect! end end diff --git a/src/amber/websockets/server.cr b/src/amber/websockets/server.cr index d074fae52..d02b2cea6 100644 --- a/src/amber/websockets/server.cr +++ b/src/amber/websockets/server.cr @@ -17,6 +17,7 @@ module Amber end socket.on_close do + Log.info { "Client socket closed" } instance.on_disconnect ClientSockets.remove_client_socket(instance) end From 4933dd37c08d1c347aaa2309683fe79a8990062b Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 19:31:40 -0600 Subject: [PATCH 03/48] i think this will catch up. --- src/amber/websockets/client_socket.cr | 1 - src/amber/websockets/client_sockets.cr | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index 972a1f468..eb4c5417a 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -98,7 +98,6 @@ module Amber @socket.ping @pings.push(Time.utc) @pings.delete_at(0) if @pings.size > 3 - Fiber.yield check_alive! rescue ex : IO::Error disconnect! diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 2a0af1057..5682a5792 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -15,6 +15,7 @@ module Amber while client_socket && !client_socket.socket.closed? sleep ClientSocket::BEAT_INTERVAL client_socket.beat + Fiber.yield end end end From 468403d9ba9da6a985667c8a715a1a88bf7b3bbe Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 19:48:49 -0600 Subject: [PATCH 04/48] factor out fiber --- src/amber/websockets/client_socket.cr | 6 +++++- src/amber/websockets/client_sockets.cr | 8 +------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index eb4c5417a..aaf0ae494 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -22,7 +22,7 @@ module Amber @@channels = [] of NamedTuple(path: String, channel: Channel) MAX_SOCKET_IDLE_TIME = 1.minute - BEAT_INTERVAL = 30.seconds + BEAT_INTERVAL = 5.seconds protected getter id : String getter socket : HTTP::WebSocket @@ -72,6 +72,10 @@ module Amber @socket.on_pong do @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 + if client_socket && !client_socket.socket.closed? + sleep ClientSocket::BEAT_INTERVAL + client_socket.beat + end Fiber.yield end end diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 5682a5792..daa80143d 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -11,13 +11,7 @@ module Amber @@client_sockets[client_socket.id] = client_socket # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 - spawn do - while client_socket && !client_socket.socket.closed? - sleep ClientSocket::BEAT_INTERVAL - client_socket.beat - Fiber.yield - end - end + client_socket.beat end def remove_client_socket(client_socket) From 3dc5aff1d0aa8c0d3ad193de053909ddeb2a7bc9 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 19:50:26 -0600 Subject: [PATCH 05/48] finish refactor --- src/amber/websockets/client_socket.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index aaf0ae494..a55f667aa 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -72,9 +72,9 @@ module Amber @socket.on_pong do @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 - if client_socket && !client_socket.socket.closed? + if @socket && @socket.closed? sleep ClientSocket::BEAT_INTERVAL - client_socket.beat + @socket.beat end Fiber.yield end From 8a94cf9b3bbda6123db2a12b523824930b10c111 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 19:55:43 -0600 Subject: [PATCH 06/48] different pinging mechanism --- src/amber/websockets/client_socket.cr | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index a55f667aa..fce929e35 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -70,13 +70,16 @@ module Amber @raw_params = @context.params @params = Amber::Validators::Params.new(@raw_params) @socket.on_pong do + @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 + Fiber.yield if @socket && @socket.closed? - sleep ClientSocket::BEAT_INTERVAL - @socket.beat + spawn do + sleep ClientSocket::BEAT_INTERVAL + beat + end end - Fiber.yield end end From 6902ce82b1a2e0e86f02b152e3c155bf677416a9 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 19:58:43 -0600 Subject: [PATCH 07/48] different ping mechanism --- src/amber/websockets/client_socket.cr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index fce929e35..ed213a367 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -73,13 +73,13 @@ module Amber @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 - Fiber.yield + if @socket && @socket.closed? - spawn do - sleep ClientSocket::BEAT_INTERVAL - beat - end + sleep ClientSocket::BEAT_INTERVAL + beat end + + Fiber.yield end end From 2a07483f83ccebf1aa6fc0fc5236efe655174b70 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:30:39 -0600 Subject: [PATCH 08/48] fixing ws ping --- src/amber/websockets/client_socket.cr | 5 ----- src/amber/websockets/client_sockets.cr | 10 +++++++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index ed213a367..c7462dc0c 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -74,11 +74,6 @@ module Amber @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 - if @socket && @socket.closed? - sleep ClientSocket::BEAT_INTERVAL - beat - end - Fiber.yield end end diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index daa80143d..675301105 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -11,7 +11,15 @@ module Amber @@client_sockets[client_socket.id] = client_socket # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 - client_socket.beat + spawn do + while true + if @socket && @socket.closed? + beat + sleep ClientSocket::BEAT_INTERVAL + end + end + end + end def remove_client_socket(client_socket) From a93e5701eb503cc3636f57c278fc0b6df9beca0f Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:33:10 -0600 Subject: [PATCH 09/48] try this, sorry guys ill squash later if this gets insane again --- src/amber/websockets/client_sockets.cr | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 675301105..9490299f8 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -12,11 +12,9 @@ module Amber # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 spawn do - while true - if @socket && @socket.closed? - beat - sleep ClientSocket::BEAT_INTERVAL - end + while client_socket && !client_socket.socket.closed? + client_socket.beat + sleep ClientSocket::BEAT_INTERVAL end end From 04e6fc679b31f72f4ec4b5ccb5551df099603884 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:43:27 -0600 Subject: [PATCH 10/48] i hope that this will delete a race condition in pinging. --- src/amber/websockets/client_socket.cr | 6 ++---- src/amber/websockets/client_sockets.cr | 3 ++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index c7462dc0c..b3a5027bb 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -70,11 +70,10 @@ module Amber @raw_params = @context.params @params = Amber::Validators::Params.new(@raw_params) @socket.on_pong do - + Log.debug("Pong received") @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 - - Fiber.yield + check_alive! end end @@ -100,7 +99,6 @@ module Amber @socket.ping @pings.push(Time.utc) @pings.delete_at(0) if @pings.size > 3 - check_alive! rescue ex : IO::Error disconnect! end diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 9490299f8..4411f4ec4 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -13,8 +13,9 @@ module Amber # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 spawn do while client_socket && !client_socket.socket.closed? - client_socket.beat + sleep ClientSocket::BEAT_INTERVAL + client_socket.beat end end From 5781f786af455f23cceb5ae34ecf6faf12cdafe3 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:44:31 -0600 Subject: [PATCH 11/48] github copilot crashed the car again --- src/amber/websockets/client_socket.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index b3a5027bb..f4862a48c 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -70,7 +70,7 @@ module Amber @raw_params = @context.params @params = Amber::Validators::Params.new(@raw_params) @socket.on_pong do - Log.debug("Pong received") + Log.info { "Pong received" } @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 check_alive! From 4f0f74c5ae3f9660ddd21da2b4356d3704c8bf0c Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:48:59 -0600 Subject: [PATCH 12/48] this should better and final, but ill probably remove the logs if this is great. --- src/amber/websockets/client_socket.cr | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index f4862a48c..dbcdc12f5 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -73,6 +73,7 @@ module Amber Log.info { "Pong received" } @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 + Fiber.yield check_alive! end end @@ -95,6 +96,7 @@ module Amber # Sends ping opcode to client : https://tools.ietf.org/html/rfc6455#section-5.5.2 protected def beat + Log.info { "Sending WS ping" } @socket.send("ping") @socket.ping @pings.push(Time.utc) From 6197b558a61a0e35f31510c4efbc0aac29d3f8bb Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 7 Jul 2021 20:53:28 -0600 Subject: [PATCH 13/48] logic seems to work great.. use the original server ping constants --- src/amber/websockets/client_socket.cr | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index dbcdc12f5..17aa2b2a8 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -21,8 +21,8 @@ module Amber @@channels = [] of NamedTuple(path: String, channel: Channel) - MAX_SOCKET_IDLE_TIME = 1.minute - BEAT_INTERVAL = 5.seconds + MAX_SOCKET_IDLE_TIME = 100.seconds + BEAT_INTERVAL = 30.seconds protected getter id : String getter socket : HTTP::WebSocket @@ -70,7 +70,7 @@ module Amber @raw_params = @context.params @params = Amber::Validators::Params.new(@raw_params) @socket.on_pong do - Log.info { "Pong received" } + # Log.info { "Pong received" } @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 Fiber.yield @@ -96,7 +96,7 @@ module Amber # Sends ping opcode to client : https://tools.ietf.org/html/rfc6455#section-5.5.2 protected def beat - Log.info { "Sending WS ping" } + # Log.info { "Sending WS ping" } @socket.send("ping") @socket.ping @pings.push(Time.utc) From 9798b8eb0d024163895f0f000ab14fb7e5289672 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Thu, 8 Jul 2021 11:05:51 -0600 Subject: [PATCH 14/48] prevent memory error by only deleting socket if it exists --- src/amber/websockets/client_sockets.cr | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 4411f4ec4..d327e9f3b 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -22,7 +22,9 @@ module Amber end def remove_client_socket(client_socket) - @@client_sockets.delete(client_socket.id) + if @@client_sockets.has_key?(client_socket.id) + @@client_sockets.delete(client_socket.id) + end end def client_sockets From 5f4212a7c684c2e475bf76dfbad03130acabc9ea Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Wed, 4 Aug 2021 20:22:16 -0600 Subject: [PATCH 15/48] cleanup --- spec/amber/websockets/adapters/redis_adapter.cr | 3 --- spec/spec_helper.cr | 2 +- src/amber/server/server.cr | 2 +- src/amber/websockets/client_socket.cr | 1 - 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/spec/amber/websockets/adapters/redis_adapter.cr b/spec/amber/websockets/adapters/redis_adapter.cr index 724290a7e..05371c51b 100644 --- a/spec/amber/websockets/adapters/redis_adapter.cr +++ b/spec/amber/websockets/adapters/redis_adapter.cr @@ -23,9 +23,6 @@ module Amber channel2.subscribe_to_channel(client_socket2, "{}") channel2.subscribe_to_channel(client_socket, "{}") - # channel.test_field.last.should eq "handle joined #{client_socket.id}" - # Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"] - Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter redis_adapter = Amber::Server.instance.pubsub_adapter.instance diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 09245584e..810d1108c 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -21,4 +21,4 @@ require "../src/amber/cli/commands" require "./support/fixtures" require "./support/helpers" -include Helpers \ No newline at end of file +include Helpers diff --git a/src/amber/server/server.cr b/src/amber/server/server.cr index f8ef5cbc3..600d9a7b5 100644 --- a/src/amber/server/server.cr +++ b/src/amber/server/server.cr @@ -29,7 +29,7 @@ module Amber def self.pubsub_adapter=(adapter) instance.pubsub_adapter = adapter - puts "using #{instance.pubsub_adapter.instance}" + Log.info { "using #{instance.pubsub_adapter.instance}" } end def self.router diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index 17aa2b2a8..42df76b50 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -96,7 +96,6 @@ module Amber # Sends ping opcode to client : https://tools.ietf.org/html/rfc6455#section-5.5.2 protected def beat - # Log.info { "Sending WS ping" } @socket.send("ping") @socket.ping @pings.push(Time.utc) From 50da51e55810a7a5193358a74d55e6b4af70943d Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 22 Aug 2021 17:13:13 -0600 Subject: [PATCH 16/48] fix redis command overflows by being rude in ruby/crystal style --- shard.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shard.yml b/shard.yml index fa95f4a8b..3d97cef00 100644 --- a/shard.yml +++ b/shard.yml @@ -51,8 +51,8 @@ dependencies: version: ~> 0.18.0 redis: - github: stefanwille/crystal-redis - version: ~> 2.7.0 + github: mixflame/crystal-redis + version: ~> 2.8.0 shell-table: github: luckyframework/shell-table.cr From 7eeb9bfe210623318ef4fe440ee66112a66ef971 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 22 Aug 2021 17:15:03 -0600 Subject: [PATCH 17/48] update shard.yml --- shard.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/shard.yml b/shard.yml index 3d97cef00..78089c23e 100644 --- a/shard.yml +++ b/shard.yml @@ -52,7 +52,6 @@ dependencies: redis: github: mixflame/crystal-redis - version: ~> 2.8.0 shell-table: github: luckyframework/shell-table.cr From 377407405008109b27cf450fafb102dba99869b7 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 22 Aug 2021 18:51:05 -0600 Subject: [PATCH 18/48] use branch master --- shard.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/shard.yml b/shard.yml index 78089c23e..6d2eee18e 100644 --- a/shard.yml +++ b/shard.yml @@ -52,6 +52,7 @@ dependencies: redis: github: mixflame/crystal-redis + branch: master shell-table: github: luckyframework/shell-table.cr From 6ee7ab8989cced4564add951cb7b5525451e5f86 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Thu, 9 Sep 2021 19:17:57 -0600 Subject: [PATCH 19/48] back to using default crystal redis --- shard.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shard.yml b/shard.yml index 6d2eee18e..fedf4d07f 100644 --- a/shard.yml +++ b/shard.yml @@ -51,7 +51,7 @@ dependencies: version: ~> 0.18.0 redis: - github: mixflame/crystal-redis + github: stefanwille/crystal-redis branch: master shell-table: From 9013f0c9c36d4e2189f5b2a8e105ba9eb077c128 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 12 Sep 2021 19:24:50 -0600 Subject: [PATCH 20/48] require password for redis adapter --- src/amber/websockets/adapters/redis.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index bc556e6e8..7153ec7b4 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,8 +16,8 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(url: Amber.settings.redis_url) - @publisher = Redis.new(url: Amber.settings.redis_url) + @subscriber = Redis.new(url: Amber.settings.redis_url, password: Amber.settings.secrets["REDIS_PASSWORD"]) + @publisher = Redis.new(url: Amber.settings.redis_url, password: Amber.settings.secrets["REDIS_PASSWORD"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From fb549c09d74991b8034ce07a1992b7e6e1214c2d Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 12 Sep 2021 19:28:16 -0600 Subject: [PATCH 21/48] better auth --- src/amber/websockets/adapters/redis.cr | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 7153ec7b4..a8c1701e1 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,8 +16,11 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(url: Amber.settings.redis_url, password: Amber.settings.secrets["REDIS_PASSWORD"]) - @publisher = Redis.new(url: Amber.settings.redis_url, password: Amber.settings.secrets["REDIS_PASSWORD"]) + @subscriber = Redis.new(url: Amber.settings.redis_url) + @publisher = Redis.new(url: Amber.settings.redis_url) + + @subscriber.auth(Amber.settings.secrets["REDIS_PASSWORD"]) + @publisher.auth(Amber.settings.secrets["REDIS_PASSWORD"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From 1f9ef862cb1268b5a2cebe723a320ac2b2854b5f Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sat, 30 Apr 2022 23:08:12 -0600 Subject: [PATCH 22/48] Update shard.yml --- shard.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/shard.yml b/shard.yml index fedf4d07f..38a04eb83 100644 --- a/shard.yml +++ b/shard.yml @@ -46,10 +46,6 @@ dependencies: github: crystal-lang/crystal-mysql version: ~> 0.13.0 - sqlite3: - github: crystal-lang/crystal-sqlite3 - version: ~> 0.18.0 - redis: github: stefanwille/crystal-redis branch: master From 579915485bdc8feef79ec4de06a871cff3088f3b Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 1 May 2022 02:07:04 -0600 Subject: [PATCH 23/48] Update redis.cr --- src/amber/websockets/adapters/redis.cr | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index a8c1701e1..5402806d5 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,11 +16,8 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(url: Amber.settings.redis_url) - @publisher = Redis.new(url: Amber.settings.redis_url) - - @subscriber.auth(Amber.settings.secrets["REDIS_PASSWORD"]) - @publisher.auth(Amber.settings.secrets["REDIS_PASSWORD"]) + @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"], password: Amber.settings.secrets["REDIS_PASSWORD"]) + @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"], password: Amber.settings.secrets["REDIS_PASSWORD"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| @@ -83,4 +80,4 @@ module Amber::WebSockets::Adapters end end end -end \ No newline at end of file +end From 7b69925aa2d9a2338cdb3b29c6ef56f0bb3fcb7e Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 1 May 2022 02:09:49 -0600 Subject: [PATCH 24/48] Update redis.cr --- src/amber/websockets/adapters/redis.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 5402806d5..2f3d3d1a2 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,8 +16,8 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"], password: Amber.settings.secrets["REDIS_PASSWORD"]) - @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"], password: Amber.settings.secrets["REDIS_PASSWORD"]) + @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i, password: Amber.settings.secrets["REDIS_PASSWORD"]) + @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i, password: Amber.settings.secrets["REDIS_PASSWORD"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From 37833a94602384fe29f80f810e6443b1c2db466b Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Sun, 1 May 2022 02:40:19 -0600 Subject: [PATCH 25/48] Update redis.cr --- src/amber/websockets/adapters/redis.cr | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 2f3d3d1a2..1efdafb5e 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,8 +16,11 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i, password: Amber.settings.secrets["REDIS_PASSWORD"]) - @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i, password: Amber.settings.secrets["REDIS_PASSWORD"]) + @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i) + @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i) + + @subscriber.auth(Amber.settings.secrets["REDIS_PASSWORD"]) + @publisher.auth(Amber.settings.secrets["REDIS_PASSWORD"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From 99224ae96a22094aee65f27f2d5335fa9969d737 Mon Sep 17 00:00:00 2001 From: Jonathan Silverman Date: Mon, 2 May 2022 02:05:02 -0600 Subject: [PATCH 26/48] Update redis.cr --- src/amber/websockets/adapters/redis.cr | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 1efdafb5e..04761895a 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -16,11 +16,14 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i) - @publisher = Redis.new(host: Amber.settings.secrets["REDIS_HOST"], port: Amber.settings.secrets["REDIS_PORT"].to_i) - @subscriber.auth(Amber.settings.secrets["REDIS_PASSWORD"]) - @publisher.auth(Amber.settings.secrets["REDIS_PASSWORD"]) + uri = URI.parse(ENV["REDIS_URL"]) + + @subscriber = Redis.new(host: uri.host, port: uri.port.to_i) + @publisher = Redis.new(host: uri.host, port: uri.port.to_i) + + @subscriber.auth(uri.password) + @publisher.auth(uri.password) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From 766e26de8fb780488a305a1dc0765940e79e2aad Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Fri, 20 Feb 2026 20:58:59 -0700 Subject: [PATCH 27/48] Refactor Redis connection initialization speed fix --- src/amber/websockets/adapters/redis.cr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 04761895a..f1228b624 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -17,13 +17,13 @@ module Amber::WebSockets::Adapters # Establish subscribe and publish connections to Redis def initialize - uri = URI.parse(ENV["REDIS_URL"]) + uri = URI.parse(Amber.settings.redis_url.to_s) - @subscriber = Redis.new(host: uri.host, port: uri.port.to_i) - @publisher = Redis.new(host: uri.host, port: uri.port.to_i) + @subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) + @publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) - @subscriber.auth(uri.password) - @publisher.auth(uri.password) + @subscriber.auth(Amber.settings.secrets["redis_password"]) + @publisher.auth(Amber.settings.secrets["redis_password"]) spawn do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| From ff66bf6b17613bfd5e7569156b36e1469dd8bac3 Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Fri, 20 Feb 2026 22:51:44 -0700 Subject: [PATCH 28/48] Remove Redis subscriber implementation Removed subscriber logic for Redis channel handling. --- src/amber/websockets/adapters/redis.cr | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index f1228b624..b2ed54fb8 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -24,29 +24,7 @@ module Amber::WebSockets::Adapters @subscriber.auth(Amber.settings.secrets["redis_password"]) @publisher.auth(Amber.settings.secrets["redis_password"]) - - spawn do - @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| - on.message do |_, m| - Fiber.yield - msg = JSON.parse(m) - sender_id = msg["sender"].as_s - message = msg["msg"] - channel_name = message["topic"].to_s.split(":").first - @listeners[channel_name].call(sender_id, message) - end - on.subscribe do |channel, subscriptions| - Fiber.yield - Log.info { "Subscribed to Redis channel #{channel}" } - @subscribed = true - end - on.unsubscribe do |channel, subscriptions| - Fiber.yield - Log.info { "Unsubscribed from Redis channel #{channel}" } - @subscribed = false - end - end - end + end # Publish the *message* to the redis publisher with topic *topic_path* From 1094e310544d80d6e44b6494b44e0bfd0a958c82 Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Sat, 21 Feb 2026 08:53:07 -0700 Subject: [PATCH 29/48] Implement conditional Redis auth based on password presence Add conditional authentication for Redis connections --- src/amber/websockets/adapters/redis.cr | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index b2ed54fb8..2e151a15b 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -22,8 +22,10 @@ module Amber::WebSockets::Adapters @subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) @publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) - @subscriber.auth(Amber.settings.secrets["redis_password"]) - @publisher.auth(Amber.settings.secrets["redis_password"]) + if Amber.settings.secrets["redis_password"] + @subscriber.auth(Amber.settings.secrets["redis_password"]) + @publisher.auth(Amber.settings.secrets["redis_password"]) + end end From a710c192582d0a76db56367b6b356116c4f35712 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 09:01:56 -0700 Subject: [PATCH 30/48] fix --- src/amber/websockets/adapters/redis.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 2e151a15b..3639dae47 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -22,7 +22,7 @@ module Amber::WebSockets::Adapters @subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) @publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) - if Amber.settings.secrets["redis_password"] + if Amber.settings.secrets.has_key?("redis_password") @subscriber.auth(Amber.settings.secrets["redis_password"]) @publisher.auth(Amber.settings.secrets["redis_password"]) end From da67f801f882653bcaa10278ac69bcb333bbefb8 Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Sat, 21 Feb 2026 09:02:41 -0700 Subject: [PATCH 31/48] Check for redis_password key before authentication --- src/amber/websockets/adapters/redis.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 2e151a15b..3639dae47 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -22,7 +22,7 @@ module Amber::WebSockets::Adapters @subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) @publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) - if Amber.settings.secrets["redis_password"] + if Amber.settings.secrets.has_key?("redis_password") @subscriber.auth(Amber.settings.secrets["redis_password"]) @publisher.auth(Amber.settings.secrets["redis_password"]) end From 33088cb7c5387cf29e1a3c3920fc405ecf1325fa Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Sat, 21 Feb 2026 09:17:14 -0700 Subject: [PATCH 32/48] Check for channel_name in listeners before calling --- src/amber/websockets/adapters/redis.cr | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 3639dae47..f7d89dfa9 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -49,7 +49,9 @@ module Amber::WebSockets::Adapters sender_id = msg["sender"].as_s message = msg["msg"] channel_name = message["topic"].to_s.split(":").first - @listeners[channel_name].call(sender_id, message) + if @listeners.has_key?(channel_name) + @listeners[channel_name].call(sender_id, message) + end end on.subscribe do |channel, subscriptions| Fiber.yield From a77e631847c845ea5959e9beae0ed9749383061b Mon Sep 17 00:00:00 2001 From: jonsilverman50-star Date: Sat, 21 Feb 2026 09:32:04 -0700 Subject: [PATCH 33/48] Handle nil client_socket in on_message method Fix nil check for client_socket and delete from client_sockets. --- src/amber/websockets/channel.cr | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 1ad9fd4b1..5517d6f28 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -46,6 +46,10 @@ module Amber # Called from proc when message is returned from the pubsub service def on_message(client_socket_id, message) client_socket = ClientSockets.client_sockets[client_socket_id]? + if client_socket.nil? + ClientSockets.client_sockets.delete(client_socket) + return + end handle_message(client_socket, message) end From 54ada16c09b98f2d44e7aaa7e76c3d522ea3afd7 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 12:39:55 -0700 Subject: [PATCH 34/48] stability fixes.. testing --- src/amber/websockets/channel.cr | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 5517d6f28..a160dad2c 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -47,7 +47,8 @@ module Amber def on_message(client_socket_id, message) client_socket = ClientSockets.client_sockets[client_socket_id]? if client_socket.nil? - ClientSockets.client_sockets.delete(client_socket) + ClientSockets.remove_client_socket(client_socket_id) + # ClientSockets.client_sockets.delete(client_socket) return end handle_message(client_socket, message) @@ -64,11 +65,21 @@ module Amber # Called when a socket subscribes to a channel protected def subscribe_to_channel(client_socket, message) + if client_socket.nil? + ClientSockets.remove_client_socket(client_socket.id) + # ClientSockets.client_sockets.delete(client_socket) + return + end handle_joined(client_socket, message) end # Called when a socket unsubscribes from a channel protected def unsubscribe_from_channel(client_socket) + if client_socket.nil? + ClientSockets.remove_client_socket(client_socket.id) + # ClientSockets.client_sockets.delete(client_socket) + return + end handle_leave(client_socket) end From 3c57263aadd59a20a318fcd8be9450d03cf76bca Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 12:44:01 -0700 Subject: [PATCH 35/48] oops --- src/amber/websockets/channel.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index a160dad2c..f78eb860c 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -66,7 +66,7 @@ module Amber # Called when a socket subscribes to a channel protected def subscribe_to_channel(client_socket, message) if client_socket.nil? - ClientSockets.remove_client_socket(client_socket.id) + ClientSockets.remove_client_socket(client_socket) # ClientSockets.client_sockets.delete(client_socket) return end @@ -76,7 +76,7 @@ module Amber # Called when a socket unsubscribes from a channel protected def unsubscribe_from_channel(client_socket) if client_socket.nil? - ClientSockets.remove_client_socket(client_socket.id) + ClientSockets.remove_client_socket(client_socket) # ClientSockets.client_sockets.delete(client_socket) return end From 275375b360852c69e3529aa20599407ee6fa35b3 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 12:51:27 -0700 Subject: [PATCH 36/48] dead socket fix --- src/amber/websockets/channel.cr | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index f78eb860c..161ae72f5 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -47,8 +47,7 @@ module Amber def on_message(client_socket_id, message) client_socket = ClientSockets.client_sockets[client_socket_id]? if client_socket.nil? - ClientSockets.remove_client_socket(client_socket_id) - # ClientSockets.client_sockets.delete(client_socket) + ClientSockets.client_sockets.delete(client_socket) return end handle_message(client_socket, message) @@ -66,8 +65,7 @@ module Amber # Called when a socket subscribes to a channel protected def subscribe_to_channel(client_socket, message) if client_socket.nil? - ClientSockets.remove_client_socket(client_socket) - # ClientSockets.client_sockets.delete(client_socket) + ClientSockets.client_sockets.delete(client_socket) return end handle_joined(client_socket, message) @@ -76,8 +74,7 @@ module Amber # Called when a socket unsubscribes from a channel protected def unsubscribe_from_channel(client_socket) if client_socket.nil? - ClientSockets.remove_client_socket(client_socket) - # ClientSockets.client_sockets.delete(client_socket) + ClientSockets.client_sockets.delete(client_socket) return end handle_leave(client_socket) From bec194d0e0983eb0aef3b405db519ce4fdde2671 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 13:43:29 -0700 Subject: [PATCH 37/48] dont crash if rebroadcasting to dead sockets --- src/amber/websockets/channel.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 161ae72f5..e0ee88850 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -89,7 +89,7 @@ module Amber # example message: {"event" => "message", "topic" => "rooms:123", "subject" => "msg:new", "payload" => {"message" => "hello"}} protected def rebroadcast!(message) subscribers = ClientSockets.get_subscribers_for_topic(message["topic"]) - subscribers.each_value(&.socket.send(message.to_json)) + subscribers.each_value(&.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(socket)) end # Ensure the pubsub adapter instance exists, and set up the on_message proc callback From b399fd2ead7b9b0f04932e57419de460a1ad0f89 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 13:45:27 -0700 Subject: [PATCH 38/48] dont crash if rebroadcasting to dead sockets --- src/amber/websockets/channel.cr | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index e0ee88850..d66ff016e 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -89,7 +89,9 @@ module Amber # example message: {"event" => "message", "topic" => "rooms:123", "subject" => "msg:new", "payload" => {"message" => "hello"}} protected def rebroadcast!(message) subscribers = ClientSockets.get_subscribers_for_topic(message["topic"]) - subscribers.each_value(&.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(socket)) + subscribers.each_value do |subscriber| + subscriber.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(socket) + end end # Ensure the pubsub adapter instance exists, and set up the on_message proc callback From 5ea6b14f3f5e8eb23edf1fefde95bf317a3dab4b Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 13:46:37 -0700 Subject: [PATCH 39/48] dont crash if rebroadcasting to dead sockets --- src/amber/websockets/channel.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index d66ff016e..00749c895 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -90,7 +90,7 @@ module Amber protected def rebroadcast!(message) subscribers = ClientSockets.get_subscribers_for_topic(message["topic"]) subscribers.each_value do |subscriber| - subscriber.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(socket) + subscriber.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(subscriber.socket) end end From 6351b2f327109912871e62b508e2514af5e80560 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sat, 21 Feb 2026 14:11:38 -0700 Subject: [PATCH 40/48] oops put this back --- src/amber/websockets/adapters/redis.cr | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index f7d89dfa9..c7f6855bd 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -26,6 +26,31 @@ module Amber::WebSockets::Adapters @subscriber.auth(Amber.settings.secrets["redis_password"]) @publisher.auth(Amber.settings.secrets["redis_password"]) end + + spawn do + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| + on.message do |_, m| + Fiber.yield + msg = JSON.parse(m) + sender_id = msg["sender"].as_s + message = msg["msg"] + channel_name = message["topic"].to_s.split(":").first + if @listeners.has_key?(channel_name) + @listeners[channel_name].call(sender_id, message) + end + end + on.subscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Subscribed to Redis channel #{channel}" } + @subscribed = true + end + on.unsubscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Unsubscribed from Redis channel #{channel}" } + @subscribed = false + end + end + end end From 2f6513d8e06dbb7bc852f086bffaca1139be7346 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sun, 22 Feb 2026 13:57:14 -0700 Subject: [PATCH 41/48] crystal redis update --- shard.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/shard.yml b/shard.yml index 157c7a7aa..58a38f358 100644 --- a/shard.yml +++ b/shard.yml @@ -51,8 +51,7 @@ dependencies: version: ~> 0.14.0 redis: - github: stefanwille/crystal-redis - version: ~> 2.8.0 + github: jonsilverman50-star/crystal-redis shell-table: github: luckyframework/shell-table.cr From b15b184a3e97525aa023533e9a07d7018086a28d Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sun, 22 Feb 2026 23:27:12 -0700 Subject: [PATCH 42/48] reconnect on failed push --- src/amber/cli/templates/app/public/js/amber.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/amber/cli/templates/app/public/js/amber.js b/src/amber/cli/templates/app/public/js/amber.js index 1abbd755f..7c9cdfb4f 100644 --- a/src/amber/cli/templates/app/public/js/amber.js +++ b/src/amber/cli/templates/app/public/js/amber.js @@ -73,7 +73,11 @@ export class Channel { * @param {Object} payload - payload object: `{message: 'hello'}` */ push(subject, payload) { - this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload })) + } catch { + this._reconnect() + } } } From 027a3d3c9df6242d42d3a6ccc6f640cac8f48af1 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Sun, 22 Feb 2026 23:36:30 -0700 Subject: [PATCH 43/48] reconnect on all failed sends --- src/amber/cli/templates/app/public/js/amber.js | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/amber/cli/templates/app/public/js/amber.js b/src/amber/cli/templates/app/public/js/amber.js index 7c9cdfb4f..d66fdbbb5 100644 --- a/src/amber/cli/templates/app/public/js/amber.js +++ b/src/amber/cli/templates/app/public/js/amber.js @@ -39,14 +39,22 @@ export class Channel { * Join a channel, subscribe to all channels messages */ join() { - this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic })) + } catch { + this._reconnect() + } } /** * Leave a channel, stop subscribing to channel messages */ leave() { - this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic })) + } catch { + this._reconnect() + } } /** From 719d6c7b339ad1ecb9eca01bf9aebeab182355ff Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Mon, 23 Feb 2026 21:30:10 -0700 Subject: [PATCH 44/48] broaden rescue to handle other errors than io. such as ssl errors. --- src/amber/websockets/client_socket.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index 753be9d66..5d05fe6d0 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -100,7 +100,7 @@ module Amber @socket.ping @pings.push(Time.utc) @pings.delete_at(0) if @pings.size > 3 - rescue ex : IO::Error + rescue disconnect! end From 13f301d52d3af6dc8ea3184ccccde043c05f5264 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Mon, 23 Feb 2026 21:39:20 -0700 Subject: [PATCH 45/48] refactor --- src/amber/websockets/client_socket.cr | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index 5d05fe6d0..c2fbd32c2 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -96,12 +96,16 @@ module Amber # Sends ping opcode to client : https://tools.ietf.org/html/rfc6455#section-5.5.2 protected def beat - @socket.send("ping") - @socket.ping - @pings.push(Time.utc) - @pings.delete_at(0) if @pings.size > 3 - rescue - disconnect! + begin + @socket.send("ping") + @socket.ping + @pings.push(Time.utc) + @pings.delete_at(0) if @pings.size > 3 + rescue ex : IO::Error + disconnect! + rescue ex : OpenSSL::SSL::Error + disconnect! + end end protected def subscribed_to_topic?(topic) From 01f88966d6149ee2a3b2b13c433e6c5313acc3e8 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Tue, 24 Feb 2026 13:15:40 -0700 Subject: [PATCH 46/48] add spawn-bang --- shard.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/shard.yml b/shard.yml index 58a38f358..48d808cbe 100644 --- a/shard.yml +++ b/shard.yml @@ -18,6 +18,9 @@ dependencies: github: amberframework/amber-router version: ~> 0.4.4 + spawn-bang: + github: compumike/spawn-bang + cli: github: amberframework/cli version: ~> 0.11.3 From f478de604e395f33d0ba36251762ae2f789f2e80 Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Tue, 24 Feb 2026 13:17:06 -0700 Subject: [PATCH 47/48] spawn-bangify --- spec/support/helpers/websockets_helper.cr | 4 ++-- src/amber/websockets/adapters/memory.cr | 2 +- src/amber/websockets/adapters/redis.cr | 4 ++-- src/amber/websockets/client_sockets.cr | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spec/support/helpers/websockets_helper.cr b/spec/support/helpers/websockets_helper.cr index 1cb27d639..fcb70a841 100644 --- a/spec/support/helpers/websockets_helper.cr +++ b/spec/support/helpers/websockets_helper.cr @@ -10,7 +10,7 @@ module WebsocketsHelper channel = Channel(Int32).new http_server = nil - spawn do + spawn! do handler = Amber::WebSockets::Server.create_endpoint("/", UserSocket) http_server = server = HTTP::Server.new(handler) address = server.bind_unused_port @@ -20,7 +20,7 @@ module WebsocketsHelper listen_port = channel.receive ws = HTTP::WebSocket.new("ws://127.0.0.1:#{listen_port}") - spawn { ws.run } + spawn! { ws.run } return http_server.not_nil!, ws end end diff --git a/src/amber/websockets/adapters/memory.cr b/src/amber/websockets/adapters/memory.cr index f41bc9bfa..6ee2a6cb3 100644 --- a/src/amber/websockets/adapters/memory.cr +++ b/src/amber/websockets/adapters/memory.cr @@ -9,7 +9,7 @@ module Amber::WebSockets::Adapters # On *message* publish, just call all listeners procs def publish(topic_path, client_socket, message) - spawn do + spawn! do @listeners.select { |l| l[:path] == topic_path }.each { |l| l[:listener].call(client_socket.id, message) } end end diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index c7f6855bd..e1eb2be6d 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -27,7 +27,7 @@ module Amber::WebSockets::Adapters @publisher.auth(Amber.settings.secrets["redis_password"]) end - spawn do + spawn! do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| on.message do |_, m| Fiber.yield @@ -66,7 +66,7 @@ module Amber::WebSockets::Adapters begin @subscriber.subscribe(topic_path) rescue # if we can't do it we're not in a subscribe loop, just resubscribe to all channels - spawn do + spawn! do @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| on.message do |_, m| Fiber.yield diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index d327e9f3b..83c6d4534 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -11,7 +11,7 @@ module Amber @@client_sockets[client_socket.id] = client_socket # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 - spawn do + spawn! do while client_socket && !client_socket.socket.closed? sleep ClientSocket::BEAT_INTERVAL From db472f80f5eb3251a929245109e4d95f78f5e3df Mon Sep 17 00:00:00 2001 From: Jon Silverman Date: Tue, 24 Feb 2026 13:18:53 -0700 Subject: [PATCH 48/48] spawn-bang require --- src/amber.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/amber.cr b/src/amber.cr index 3c5c3051a..9f050e364 100644 --- a/src/amber.cr +++ b/src/amber.cr @@ -7,6 +7,7 @@ require "kilt" require "kilt/slang" require "redis" require "compiled_license" +require "spawn-bang" require "./amber/version" require "./amber/controller/**"