sql >> Databáze >  >> NoSQL >> Redis

Přístup k proměnné v rámci vlákna rails

AKTUALIZOVANÁ ÚPRAVA NA KONCI:Zobrazuje funkční kód. Hlavní modul nezměněn kromě ladícího kódu. Poznámka:Setkal jsem se s problémem, který jsem již zaznamenal ohledně potřeby odhlásit se před ukončením.

Kód vypadá správně. Chtěl bych vidět, jak to vytváříte.

V config/application.rb pravděpodobně máte alespoň něco jako:

require 'ws_communication'
config.middleware.use WsCommunication

Pak byste ve svém JavaScriptovém klientovi měli mít něco takového:

var ws = new WebSocket(uri);

Vytvoříte instanci další instance WsCommunication? To by nastavilo @clients na prázdné pole a mohlo by vykazovat vaše příznaky. Něco takového by bylo nesprávné:

var ws = new WsCommunication;

Pomohlo by nám, kdybyste ukázali klientovi a možná i config/application.rb, pokud tento příspěvek nepomůže.

Mimochodem souhlasím s komentářem, že @clients by měli být chráněni mutexem při každé aktualizaci, pokud ne čte také. Je to dynamická struktura, která se může v systému řízeném událostmi kdykoli změnit. redis-mutex je dobrá volba. (Doufám, že ten odkaz je správný, protože se zdá, že Github v tuto chvíli hází 500 chyb na všechno.)

Můžete si také všimnout, že $redis.publish vrací celočíselnou hodnotu počtu klientů, kteří zprávu obdrželi.

Nakonec možná zjistíte, že před ukončením musíte zajistit, aby byl váš kanál odhlášen. Měl jsem situace, kdy jsem každou zprávu poslal vícekrát, dokonce mnohokrát, kvůli dřívějším odběrům stejného kanálu, které nebyly vyčištěny. Vzhledem k tomu, že se přihlásíte k odběru kanálu v rámci vlákna, budete se muset v rámci stejného vlákna odhlásit z odběru, jinak se proces prostě „zasekne“ a čeká, až se jako kouzlem objeví správné vlákno. Tuto situaci řeším tak, že nastavím příznak „odhlásit se“ a poté odešlem zprávu. Poté v rámci bloku on.message otestuji příznak odhlášení a vystavím tam odhlášení.

Modul, který jste poskytli, pouze s malými úpravami ladění:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Kód testovacího předplatitele, který jsem poskytl:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

Testovací kód vydavatele, který jsem poskytl. Vydavatel a odběratel lze snadno kombinovat, protože se jedná pouze o testy:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Ukázka config.ru, která toto vše spouští na vrstvě middlewaru racku:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

Toto je hlavní. Odstranil jsem to ze své běžící verze, takže možná bude potřeba ho upravit, pokud ho používáte:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end


  1. Nastavení dynamického pole v Ohm / Redis

  2. Java/MongoDB dotaz podle data

  3. Autentizace během připojení k instanci serveru MongoDB pomocí Java

  4. Základní kurz MongoDB