multithreading - catch undefined method exception from a thread and restart it in ruby -
i have activemq topic subscriber in ruby uses stomp protocol failover connect broker , if somehow activemq gets restarted exception :
undefined method `command' nil:nilclass /usr/lib64/ruby/gems/1.8/gems/stomp-1.1.8/lib/stomp/client.rb:295:in `start_listeners' /usr/lib64/ruby/gems/1.8/gems/stomp-1.1.8/lib/stomp/client.rb:108:in `join' /usr/lib64/ruby/gems/1.8/gems/stomp-1.1.8/lib/stomp/client.rb:108:in `join' ./lib/active_mq_topic_reader.rb:31:in `active_mq_topic_reader' main.rb:164 main.rb:163:in `initialize' main.rb:163:in `new'
but exception when use join() method on broker thread, otherwise no exception appears , subscriber unsubscribed topic. problem facing have different mechanism of shutting down process sending shutdown signal, , till process waits, if use join() process stuck on line , not able close method shutdown signal.so should catch exception , restart listener thread?
active_mq_topic_reader.rb :
require 'rubygems' require 'ffi-rzmq' require 'msgpack' require 'zmq_helper' require 'stomp' include zmqhelper def active_mq_topic_reader(context, notice_agg_fe_url, signal_pub_url, monitor_url, active_mq_broker, topic) begin sender = create_connect_socket(context, zmq::push, notice_agg_fe_url) monitor = create_connect_socket(context, zmq::push, monitor_url) active_mq_broker.subscribe(topic, {}) |msg| notice = {} ["entity_id","entity_type","system_name","change_type"].each |key| notice[key] = msg.headers[key].to_s end monitor.send_string("qreader-#{topic.slice(7..-1)}") sender.send_string(notice.to_msgpack) end active_mq_broker.join() #cannot use signal_subscriber.recv_string() #here code waits shutdown signal in case of process shutdown sender.close() monitor.close() signal_subscriber.close() active_mq_broker.unsubscribe(topic) return rescue exception => e puts "#{topic}: #{e}" puts e.backtrace $stdout.flush end end
main.rb :
context = zmq::context.new(1) active_mq_broker_audit = stomp::client.new("failover:(stomp://localhost:61613,stomp://localhost:61613)") new_thread = thread.new active_mq_topic_reader(context, "inproc://notice_agg_fe", "inproc://signal_pub", "tcp://localhost:xxxx", active_mq_broker_audit, "/topic/mytopic") end new_thread.join()
Comments
Post a Comment