diff --git a/shard.yml b/shard.yml index 56b86d73a..48d808cbe 100644 --- a/shard.yml +++ b/shard.yml @@ -18,6 +18,9 @@ dependencies: github: amberframework/amber-router version: ~> 0.4.4 + spawn-bang: + github: compumike/spawn-bang + cli: github: amberframework/cli version: ~> 0.11.3 @@ -50,13 +53,8 @@ dependencies: github: crystal-lang/crystal-mysql version: ~> 0.14.0 - sqlite3: - github: crystal-lang/crystal-sqlite3 - version: ~> 0.19.0 - redis: - github: stefanwille/crystal-redis - version: ~> 2.8.0 + github: jonsilverman50-star/crystal-redis shell-table: github: luckyframework/shell-table.cr diff --git a/spec/amber/websockets/adapters/redis_adapter.cr b/spec/amber/websockets/adapters/redis_adapter.cr new file mode 100644 index 000000000..05371c51b --- /dev/null +++ b/spec/amber/websockets/adapters/redis_adapter.cr @@ -0,0 +1,70 @@ + + +require "../../../spec_helper" + +module Amber + describe Amber::WebSockets::Adapters::RedisAdapter do + + describe "#initialize" do + it "should subscribe to CHANNEL_TOPIC_PATHS" do + + + _, client_socket = create_user_socket + _, client_socket2 = create_user_socket + + + + channel = UserSocket.channels[0][:channel] + channel2 = UserSocket.channels[1][:channel] + + channel.subscribe_to_channel(client_socket, "{}") + channel.subscribe_to_channel(client_socket2, "{}") + + channel2.subscribe_to_channel(client_socket2, "{}") + channel2.subscribe_to_channel(client_socket, "{}") + + Amber::Server.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter + + redis_adapter = Amber::Server.instance.pubsub_adapter.instance + + sleep 1.second + + redis_adapter.as(Amber::WebSockets::Adapters::RedisAdapter).subscribed.should eq true + end + end + + describe "#publish" do + it "should publish the message to the channel" do + _, client_socket = create_user_socket + _, client_socket2 = create_user_socket + + Amber::Server.instance.pubsub_adapter = Amber::WebSockets::Adapters::RedisAdapter + + channel = UserSocket.channels[0][:channel] + channel2 = UserSocket.channels[1][:channel] + + channel.subscribe_to_channel(client_socket, "{}") + channel.subscribe_to_channel(client_socket2, "{}") + + channel2.subscribe_to_channel(client_socket2, "{}") + channel2.subscribe_to_channel(client_socket, "{}") + + # channel.test_field.last.should eq "handle joined #{client_socket.id}" + Amber::WebSockets::CHANNEL_TOPIC_PATHS.should eq ["user_room", "secondary_room"] + + redis_adapter = Amber::WebSockets::Adapters::RedisAdapter.new + + sleep 1.second + + redis_adapter.subscribed.should eq true + + channel = UserSocket.channels[0][:channel] + message = JSON.parse({"event" => "message", "topic" => "user_room:123", "subject" => "msg:new", "payload" => {"message" => "hey guys"}}.to_json) + channel.on_message("123", message) + channel.test_field.last.should eq "hey guys" + end + end + + + end +end diff --git a/spec/support/fixtures/websockets_fixtures.cr b/spec/support/fixtures/websockets_fixtures.cr index 0b473f09b..9ceb60b02 100644 --- a/spec/support/fixtures/websockets_fixtures.cr +++ b/spec/support/fixtures/websockets_fixtures.cr @@ -2,6 +2,7 @@ struct UserSocket < Amber::WebSockets::ClientSocket property test_field = Array(String).new channel "user_room:*", UserChannel + channel "secondary_room:*", SecondaryChannel def on_disconnect(**args) test_field.push("on close #{self.id}") @@ -23,3 +24,19 @@ class UserChannel < Amber::WebSockets::Channel test_field.push(msg["payload"]["message"].as_s) end end + +class SecondaryChannel < Amber::WebSockets::Channel + property test_field = Array(String).new + + def handle_leave(client_socket) + test_field.push("secondary channel handle leave #{client_socket.id}") + end + + def handle_joined(client_socket, msg) + test_field.push("secondary channel handle joined #{client_socket.id}") + end + + def handle_message(client_socket, msg) + test_field.push("secondary channel #{msg["payload"]["message"].as_s}") + end +end diff --git a/spec/support/helpers/websockets_helper.cr b/spec/support/helpers/websockets_helper.cr index 1cb27d639..fcb70a841 100644 --- a/spec/support/helpers/websockets_helper.cr +++ b/spec/support/helpers/websockets_helper.cr @@ -10,7 +10,7 @@ module WebsocketsHelper channel = Channel(Int32).new http_server = nil - spawn do + spawn! do handler = Amber::WebSockets::Server.create_endpoint("/", UserSocket) http_server = server = HTTP::Server.new(handler) address = server.bind_unused_port @@ -20,7 +20,7 @@ module WebsocketsHelper listen_port = channel.receive ws = HTTP::WebSocket.new("ws://127.0.0.1:#{listen_port}") - spawn { ws.run } + spawn! { ws.run } return http_server.not_nil!, ws end end diff --git a/src/amber.cr b/src/amber.cr index 3c5c3051a..9f050e364 100644 --- a/src/amber.cr +++ b/src/amber.cr @@ -7,6 +7,7 @@ require "kilt" require "kilt/slang" require "redis" require "compiled_license" +require "spawn-bang" require "./amber/version" require "./amber/controller/**" diff --git a/src/amber/cli/templates/app/public/js/amber.js b/src/amber/cli/templates/app/public/js/amber.js index 1abbd755f..d66fdbbb5 100644 --- a/src/amber/cli/templates/app/public/js/amber.js +++ b/src/amber/cli/templates/app/public/js/amber.js @@ -39,14 +39,22 @@ export class Channel { * Join a channel, subscribe to all channels messages */ join() { - this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.join, topic: this.topic })) + } catch { + this._reconnect() + } } /** * Leave a channel, stop subscribing to channel messages */ leave() { - this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.leave, topic: this.topic })) + } catch { + this._reconnect() + } } /** @@ -73,7 +81,11 @@ export class Channel { * @param {Object} payload - payload object: `{message: 'hello'}` */ push(subject, payload) { - this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload })) + try { + this.socket.ws.send(JSON.stringify({ event: EVENTS.message, topic: this.topic, subject: subject, payload: payload })) + } catch { + this._reconnect() + } } } diff --git a/src/amber/server/server.cr b/src/amber/server/server.cr index 42abf79f7..8d0425f1e 100644 --- a/src/amber/server/server.cr +++ b/src/amber/server/server.cr @@ -27,6 +27,11 @@ module Amber instance.pubsub_adapter.instance end + def self.pubsub_adapter=(adapter) + instance.pubsub_adapter = adapter + Log.info { "using #{instance.pubsub_adapter.instance}" } + end + def self.router instance.router end diff --git a/src/amber/websockets/adapters/memory.cr b/src/amber/websockets/adapters/memory.cr index f41bc9bfa..6ee2a6cb3 100644 --- a/src/amber/websockets/adapters/memory.cr +++ b/src/amber/websockets/adapters/memory.cr @@ -9,7 +9,7 @@ module Amber::WebSockets::Adapters # On *message* publish, just call all listeners procs def publish(topic_path, client_socket, message) - spawn do + spawn! do @listeners.select { |l| l[:path] == topic_path }.each { |l| l[:listener].call(client_socket.id, message) } end end diff --git a/src/amber/websockets/adapters/redis.cr b/src/amber/websockets/adapters/redis.cr index 4a2c044a9..e1eb2be6d 100644 --- a/src/amber/websockets/adapters/redis.cr +++ b/src/amber/websockets/adapters/redis.cr @@ -3,15 +3,55 @@ module Amber::WebSockets::Adapters class RedisAdapter @subscriber : Redis @publisher : Redis + @subscribed : Bool = false + @listeners : Hash(String,Proc(String, JSON::Any, Nil)) = Hash(String, Proc(String, JSON::Any, Nil)).new def self.instance @@instance ||= new end + def subscribed # test helper + @subscribed + end + # Establish subscribe and publish connections to Redis def initialize - @subscriber = Redis.new(url: Amber.settings.redis_url) - @publisher = Redis.new(url: Amber.settings.redis_url) + + uri = URI.parse(Amber.settings.redis_url.to_s) + + @subscriber = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) + @publisher = Redis.new(host: uri.host.to_s, port: uri.port.to_s.to_i) + + if Amber.settings.secrets.has_key?("redis_password") + @subscriber.auth(Amber.settings.secrets["redis_password"]) + @publisher.auth(Amber.settings.secrets["redis_password"]) + end + + spawn! do + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| + on.message do |_, m| + Fiber.yield + msg = JSON.parse(m) + sender_id = msg["sender"].as_s + message = msg["msg"] + channel_name = message["topic"].to_s.split(":").first + if @listeners.has_key?(channel_name) + @listeners[channel_name].call(sender_id, message) + end + end + on.subscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Subscribed to Redis channel #{channel}" } + @subscribed = true + end + on.unsubscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Unsubscribed from Redis channel #{channel}" } + @subscribed = false + end + end + end + end # Publish the *message* to the redis publisher with topic *topic_path* @@ -21,13 +61,33 @@ module Amber::WebSockets::Adapters # Add a redis subscriber with topic *topic_path* def on_message(topic_path, listener) - spawn do - @subscriber.subscribe(topic_path) do |on| - on.message do |_, m| - msg = JSON.parse(m) - sender_id = msg["sender"].as_s - message = msg["msg"] - listener.call(sender_id, message) + Log.info { "Setting websocket adapter listener for #{topic_path}"} + @listeners[topic_path] = listener + begin + @subscriber.subscribe(topic_path) + rescue # if we can't do it we're not in a subscribe loop, just resubscribe to all channels + spawn! do + @subscriber.subscribe(CHANNEL_TOPIC_PATHS) do |on| + on.message do |_, m| + Fiber.yield + msg = JSON.parse(m) + sender_id = msg["sender"].as_s + message = msg["msg"] + channel_name = message["topic"].to_s.split(":").first + if @listeners.has_key?(channel_name) + @listeners[channel_name].call(sender_id, message) + end + end + on.subscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Subscribed to Redis channel #{channel}" } + @subscribed = true + end + on.unsubscribe do |channel, subscriptions| + Fiber.yield + Log.info { "Unsubscribed from Redis channel #{channel}" } + @subscribed = false # just in case we do get unsubscribed some how + end end end end diff --git a/src/amber/websockets/channel.cr b/src/amber/websockets/channel.cr index 0e3092763..00749c895 100644 --- a/src/amber/websockets/channel.cr +++ b/src/amber/websockets/channel.cr @@ -21,6 +21,11 @@ module Amber # end # end # ``` + CHANNEL_TOPIC_PATHS = [] of String + + SUBSCRIBE_CHANNEL = ::Channel(String).new + + abstract class Channel @@adapter : WebSockets::Adapters::RedisAdapter? | WebSockets::Adapters::MemoryAdapter? @topic_path : String @@ -32,11 +37,19 @@ module Amber def handle_leave(client_socket); end - def initialize(@topic_path); end + def initialize(@topic_path) + + CHANNEL_TOPIC_PATHS << @topic_path + + end # Called from proc when message is returned from the pubsub service def on_message(client_socket_id, message) client_socket = ClientSockets.client_sockets[client_socket_id]? + if client_socket.nil? + ClientSockets.client_sockets.delete(client_socket) + return + end handle_message(client_socket, message) end @@ -51,11 +64,19 @@ module Amber # Called when a socket subscribes to a channel protected def subscribe_to_channel(client_socket, message) + if client_socket.nil? + ClientSockets.client_sockets.delete(client_socket) + return + end handle_joined(client_socket, message) end # Called when a socket unsubscribes from a channel protected def unsubscribe_from_channel(client_socket) + if client_socket.nil? + ClientSockets.client_sockets.delete(client_socket) + return + end handle_leave(client_socket) end @@ -68,7 +89,9 @@ module Amber # example message: {"event" => "message", "topic" => "rooms:123", "subject" => "msg:new", "payload" => {"message" => "hello"}} protected def rebroadcast!(message) subscribers = ClientSockets.get_subscribers_for_topic(message["topic"]) - subscribers.each_value(&.socket.send(message.to_json)) + subscribers.each_value do |subscriber| + subscriber.socket.send(message.to_json) rescue ClientSockets.client_sockets.delete(subscriber.socket) + end end # Ensure the pubsub adapter instance exists, and set up the on_message proc callback diff --git a/src/amber/websockets/client_socket.cr b/src/amber/websockets/client_socket.cr index f7a77de47..c2fbd32c2 100644 --- a/src/amber/websockets/client_socket.cr +++ b/src/amber/websockets/client_socket.cr @@ -70,8 +70,11 @@ module Amber @raw_params = @context.params @params = Amber::Validators::Params.new(@raw_params) @socket.on_pong do + # Log.info { "Pong received" } @pongs.push(Time.utc) @pongs.delete_at(0) if @pongs.size > 3 + Fiber.yield + check_alive! end end @@ -93,13 +96,16 @@ module Amber # Sends ping opcode to client : https://tools.ietf.org/html/rfc6455#section-5.5.2 protected def beat - @socket.send("ping") - @socket.ping - @pings.push(Time.utc) - @pings.delete_at(0) if @pings.size > 3 - check_alive! - rescue ex : IO::Error - disconnect! + begin + @socket.send("ping") + @socket.ping + @pings.push(Time.utc) + @pings.delete_at(0) if @pings.size > 3 + rescue ex : IO::Error + disconnect! + rescue ex : OpenSSL::SSL::Error + disconnect! + end end protected def subscribed_to_topic?(topic) @@ -133,7 +139,10 @@ module Amber # disconnect if no pongs have been received # or no pongs have been received beyond the threshold time + # puts "pongs empty #{@pongs.empty?}" + # puts "socket over max idle time #{(@pings.last - @pongs.first) > MAX_SOCKET_IDLE_TIME}" if @pongs.empty? || (@pings.last - @pongs.first) > MAX_SOCKET_IDLE_TIME + Log.info { "Disconnected websocket because pings empty or socket idle too long" } disconnect! end end diff --git a/src/amber/websockets/client_sockets.cr b/src/amber/websockets/client_sockets.cr index 2a0af1057..83c6d4534 100644 --- a/src/amber/websockets/client_sockets.cr +++ b/src/amber/websockets/client_sockets.cr @@ -11,16 +11,20 @@ module Amber @@client_sockets[client_socket.id] = client_socket # send ping & receive pong control frames, to prevent stale connections : https://tools.ietf.org/html/rfc6455#section-5.5.2 - spawn do + spawn! do while client_socket && !client_socket.socket.closed? + sleep ClientSocket::BEAT_INTERVAL client_socket.beat end end + end def remove_client_socket(client_socket) - @@client_sockets.delete(client_socket.id) + if @@client_sockets.has_key?(client_socket.id) + @@client_sockets.delete(client_socket.id) + end end def client_sockets diff --git a/src/amber/websockets/server.cr b/src/amber/websockets/server.cr index d074fae52..d02b2cea6 100644 --- a/src/amber/websockets/server.cr +++ b/src/amber/websockets/server.cr @@ -17,6 +17,7 @@ module Amber end socket.on_close do + Log.info { "Client socket closed" } instance.on_disconnect ClientSockets.remove_client_socket(instance) end