AKTUALIZOVANÁ ÚPRAVA NA KONCI:Zobrazuje funkční kód. Hlavní modul nezměněn kromě ladícího kódu. Poznámka:Setkal jsem se s problémem, který jsem již zaznamenal ohledně potřeby odhlásit se před ukončením.
Kód vypadá správně. Chtěl bych vidět, jak to vytváříte.
V config/application.rb pravděpodobně máte alespoň něco jako:
require 'ws_communication'
config.middleware.use WsCommunication
Pak byste ve svém JavaScriptovém klientovi měli mít něco takového:
var ws = new WebSocket(uri);
Vytvoříte instanci další instance WsCommunication? To by nastavilo @clients na prázdné pole a mohlo by vykazovat vaše příznaky. Něco takového by bylo nesprávné:
var ws = new WsCommunication;
Pomohlo by nám, kdybyste ukázali klientovi a možná i config/application.rb, pokud tento příspěvek nepomůže.
Mimochodem souhlasím s komentářem, že @clients by měli být chráněni mutexem při každé aktualizaci, pokud ne čte také. Je to dynamická struktura, která se může v systému řízeném událostmi kdykoli změnit. redis-mutex je dobrá volba. (Doufám, že ten odkaz je správný, protože se zdá, že Github v tuto chvíli hází 500 chyb na všechno.)
Můžete si také všimnout, že $redis.publish vrací celočíselnou hodnotu počtu klientů, kteří zprávu obdrželi.
Nakonec možná zjistíte, že před ukončením musíte zajistit, aby byl váš kanál odhlášen. Měl jsem situace, kdy jsem každou zprávu poslal vícekrát, dokonce mnohokrát, kvůli dřívějším odběrům stejného kanálu, které nebyly vyčištěny. Vzhledem k tomu, že se přihlásíte k odběru kanálu v rámci vlákna, budete se muset v rámci stejného vlákna odhlásit z odběru, jinak se proces prostě „zasekne“ a čeká, až se jako kouzlem objeví správné vlákno. Tuto situaci řeším tak, že nastavím příznak „odhlásit se“ a poté odešlem zprávu. Poté v rámci bloku on.message otestuji příznak odhlášení a vystavím tam odhlášení.
Modul, který jste poskytli, pouze s malými úpravami ladění:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Kód testovacího předplatitele, který jsem poskytl:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Testovací kód vydavatele, který jsem poskytl. Vydavatel a odběratel lze snadno kombinovat, protože se jedná pouze o testy:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Ukázka config.ru, která toto vše spouští na vrstvě middlewaru racku:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Toto je hlavní. Odstranil jsem to ze své běžící verze, takže možná bude potřeba ho upravit, pokud ho používáte:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end