bunny
Bunny is a popular, easy to use, well-maintained Ruby client for RabbitMQ (3.3+)
Bunny, a dead easy to use RabbitMQ Ruby client
I'm integrating Bunny gem for RabbitMQ with Rails, should I start Bunny thread in an initializer that Rails starts with application start or do it in a separate rake task so I can start it in a separate process ?
I think if I'm producing messages only then I need to do it in Rails initialzer so it can be used allover the app, but if I'm consuming I should do it in a separate rake task, is this correct ?
Source: (StackOverflow)
I have developed an algorithm to compute the volume of point set.And now I plan to use Stanford bunny model for testing my algorithm,but I haven't found the true value of the volume yet,so I don't know whether the value I computed is accurate.Is there anybody who knows the true volume of Stanford bunny model?
Source: (StackOverflow)
I have an application that I wrote in .NET that can monitor multiple RabbitMq queues using a single consumer.
For example:
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
var _consumer = new QueueingBasicConsumer(channel);
string[] list = new string[] { "Queue1", "Queue2", "Queue3", "Queue4" };
_consumer = new QueueingBasicConsumer(channel);
foreach (string currQueueName in list)
{
channel.QueueDeclare(currQueueName, false, false, false, null);
channel.BasicConsume(currQueueName, true, _consumer);
}
while (true)
{
var ea = (BasicDeliverEventArgs)_consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
ProcessMessage(message);
}
}
}
Basically, I just want to be able to distribute work across multiple queues, but only have a single application consuming them all (or multiple applications can be deployed and perform the same function).
I'm trying to spread work out across queues so that consumers are taking jobs equally across the queues.
Is this possible using Bunny, or the native Ruby driver?
Source: (StackOverflow)
I have this code of a rabbitmq consumer using bunny that should listen to messages published to a rabbitmq queue and display a flash notice on the view whenever a message is consumed. The consumer is running in a different session from the producer though they are in the same application. The application uses a direct exchange which uses the message receiver's email as the routing_key. I would like when a message with a routing_key
similar to the current_user's email is published, a flash message is displayed for that user indicating that he has a new message without refreshing the page. I want behavior similar to Facebook notifications.
The producer code looks like this:
class MessagesController < ApplicationController
def create
@message = Message.new(message_params)
@message.creator_id = current_user.id
@message.receiver_id = params[:message][:receiver_id]
if @message.save
email = @message.receiver.email
$message_exchange.publish(@message.content, :routing_key => email)
redirect_to user_path(current_user)
end
end
The consumer code: looks like this:
email = current_user.email
$message_queue.bind($message_exchange, :routing_key => email)
logger.info " [*] Waiting for messages. To exit press CTRL+C"
$message_queue.subscribe(:manual_ack => true) do |delivery_info, properties, body|
logger.info " [x] #{delivery_info.routing_key}:#{body}"
if delivery_info.routing_key == current_user.email
flash[:notice] = "you have new message: #{body}"
end
end
The problem is that i don't know where to put the consumer code. I have tried putting the code as a method in the application controller but this does not seem to work. Any suggestion on how to do this better is highly appreciated.
Source: (StackOverflow)
I want to start some consumers simultaneously in different threads. Here is rake task that starts consumers:
namespace :messaging do
desc 'Start consumers'
task start: :environment do
queues = { 'app.queue1' => 4, 'app.queue2' => 10, 'app.queue3' => 50 } # etc
puts "Starts #{queues.size} consumers"
threads = queues.map do |name, threads_number|
Thread.new do
Messaging::Consumer.new(MESSAGING_CONFIG, threads_number).subscribe name
end
end
threads.each(&:join)
end
end
And Messaging::Consumer
class (slightly simplified version):
class Messaging::Consumer
TIMEOUT = 120 # 2 minutes
def initialize options = {}, threads_number = 1
@options = options || {}
@threads_number = (threads_number || 1).to_i
end
def subscribe queue_name
queue = queue queue_name
puts "#{queue_name} connected"
queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
begin
message = JSON.parse(body).with_indifferent_access
ActiveRecord::Base.connection_pool.with_connection do
Timeout::timeout(TIMEOUT) do
# somehow process message
end
end
rescue Exception => e
# retry message manually through retry exchange here
ensure
channel.ack delivery_info.delivery_tag
end
end
end
private
def connection
@connection ||= begin
conn = Bunny.new @options.merge(automatically_recover: true,
recover_from_connection_close: true)
at_exit do
self.close_connection
end
conn.start
conn
end
end
def close_connection
@channel.try(:close) unless @channel.try(:closed?)
@connection.try(:close) unless @connection.try(:closed?)
puts "closed connection"
ensure
@connection, @channel, @queues = nil, nil, {}
end
def channel
@channel ||= begin
ch = connection.create_channel nil, @threads_number
ch.prefetch @threads_number
ch
end
end
def queue name, exchange = nil, arguments = {}
@queues ||= {}
@queues[name] ||= begin
q = channel.queue name, arguments.merge(durable: true, auto_delete: false)
q.bind (exchange || default_exchange), routing_key: name
q
end
end
def default_exchange
@default_exchange ||= create_exchange MAIN_EXCHANGE
end
def create_exchange name
channel.direct name, durable: true
end
end
First problem:: when messaging:start
is executed it doesn't necessarily start consumers for each queue (e.g. 2 of 9; it means doesn't puts "#{queue_name} connected"
9 times). It happens sometimes.
Second problem: In some time (it may be 10 minutes or 10 hours) unacked messages appears blocking message processing. How can it be possible? Messages are acked in ensure block which is always executed. Any ideas?
I use rails 4.1.5, bunny 1.3.1, ruby 2.1.2, rabbitmq 3.4.2.
Solution: the problem really was not in the code above: i used the same channel to retry (publish) messages. So after fixing it everything goes fine.
Source: (StackOverflow)
I am using bunny gem to pop a string off from a queue. The string is a url that I'd like to access. So here's the code I've come up with:
# set up channel and queue ...
queue.subscribe(manual_ack: true, block: false) do |delivery_info, _, payload|
response = Net::HTTP.get(URI(payload))
# do something with the response ... (for now leave blank)
channel.acknowledge(delivery_info.delivery_tag, true) # ack
end
However, the ack never came back. Here is what I have tried:
- Using
pry
to debug this. During the debug session, calling Net::HTTP.get(URI(payload))
directly from the debug console works, and response came back. Stepping through the debugger ran into bunny raising timeout errors. But even if there was no timeout errors, the ack
never happened.
- switch to use other http clients including
open-uri
, and httparty
. The same symptom happens. This leads me to think there might be a naming conflict as a result of interactions between bunny
and some low level call all these http clients share.
Has anyone come across this before? Suggestions on how to troubleshoot this further?
Source: (StackOverflow)
When my Hutch consumer loses connection to a database I would like to requeue all the messages I got and try to handle (and save to the DB) them later.
I found I can use requeue! method like this in my consumer:
def process(message)
handle_message(message)
rescue ActiveRecord::ConnectionNotEstablished => error
Rails.logger.warn("Connection to database is broken: #{error}")
requeue!
ensure
::ActiveRecord::Base.clear_active_connections!
end
end
But then I will get that message instantly back from Rabbit, and so, my consumer stuck in trying to process this message when it's obviously can't be saved to the DB.
Is it possible to put a timeout on either Hutch or RabbitMQ site in this situation?
Source: (StackOverflow)
I have a piece of code that uses bunny to grab an message from a source_queue
, transform it into multiple messages, and publish those messages onto destination_queue
.
def run
source_queue.subscribe_one do |message|
destination_queue.push_all(transform(message))
end
end
def subscribe_one
source_queue.subscribe(manual_ack: true, block: true) do |delivery_info, _, payload|
yield payload if block_given?
source_channel.acknowledge(delivery_info.delivery_tag, false)
delivery_info.consumer.cancel
end
end
def push_all(payloads)
destination_channel.tx_select
payloads.each do |payload|
destination_queue.publish(payload, persistent: true)
end
destination_channel.tx_commit
end
What I noticed is this code works when the count of transformed messages is about 20 or so. But if the resulting transformed message count is around 3000, I came across what appears to be a timeout error:
/<path_to_bunny_gem>/gems/bunny-1.7.0/lib/bunny/consumer_work_pool.rb:55:in `join': Connection-level error: CHANNEL_ERROR - unexpected command while processing 'tx.commit' (Bunny::ChannelError)
from /<path_to_bunny_gem>/gems/bunny-1.7.0/lib/bunny/consumer_work_pool.rb:55:in `block in join'
from /<path_to_bunny_gem>/gems/bunny-1.7.0/lib/bunny/consumer_work_pool.rb:55:in `each'
from /<path_to_bunny_gem>/gems/bunny-1.7.0/lib/bunny/consumer_work_pool.rb:55:in `join'
from /<path_to_bunny_gem>/gems/bunny-1.7.0/lib/bunny/queue.rb:188:in `subscribe'
from /<my_project>/<file_that_contains_the_code_snippet>.rb:64:in `subscribe_one'
E, [2015-08-13T05:09:25.851870 #20135] ERROR --
#<Bunny::Session:70205855873340 ...>: Uncaught exception from consumer
#<Bunny::Consumer:70205864294760 @channel_id=2 @queue=files>
@consumer_tag=bunny-1439442530000-287206794908>: #<Bunny::ClientTimeout:
execution expired> @ /<path_to_bunny_gem/gems/bunny-1.7.0/lib/bunny
/concurrent/continuation_queue.rb:25:in `pop'
I was not able to fix this problem by changing the default continuation_timeout
of bunny from 4 seconds to 100 seconds. So in the bunny initialization code:
Bunny.new(<connection string>, continuation_timeout: 100000)
I don't quite understand the nature of this issue, and the meaning of this Connection-level error. Any suggestion or explanation to troubleshoot this further?
Source: (StackOverflow)
Stackers
I have a lot of messages in a RabbitMQ queue (running on localhost in my dev environment). The payload of the messages is a JSON string that I want to load directly into Elastic Search (also running on localhost for now). I wrote a quick ruby script to pull the messages from the queue and load them into ES, which is as follows :
#! /usr/bin/ruby
require 'bunny'
require 'json'
require 'elasticsearch'
# Connect to RabbitMQ to collect data
mq_conn = Bunny.new
mq_conn.start
mq_ch = mq_conn.create_channel
mq_q = mq_ch.queue("test.data")
# Connect to ElasticSearch to post the data
es = Elasticsearch::Client.new log: true
# Main loop - collect the message and stuff it into the db.
mq_q.subscribe do |delivery_info, metadata, payload|
begin
es.index index: "indexname",
type: "relationship",
body: payload
rescue
puts "Received #{payload} - #{delivery_info} - #{metadata}"
puts "Exception raised"
exit
end
end
mq_conn.close
There are around 4,000,000 messages in the queue.
When I run the script, I see a bunch of messages, say 30, being loaded into Elastic Search just fine. However, I see around 500 messages leaving the queue.
root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data 4333080
...done.
root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data 4332580
...done.
The script then silently exits without telling me an exception. The begin/rescue block never triggers an exception so I don't know why the script is finishing early or losing so many messages. Any clues how I should debug this next.
A
Source: (StackOverflow)
we are checking RabbitMQ for some Workflow use case.
So far, we created a test environment with ruby which fits our needs and seems to work fine.
The question I have, in case of being Rabbit newbies, is about best / good practise.
Lets define 3 QUEUES (just for the example)
- Q_DECISION
- Q_RIGHT
- Q_LEFT
every producer will post message inside Q_DECISION
There is a worker running on that queue which check some content of the body. In case of decision, the message / task has to be moved to Q_LEFT or Q_RIGHT.
We are storing message specific information properties.headers, so we repeat them as well as the body.
So far no problem, question is now about republishing:
q_decision.subscribe(:block => true) do |delivery_info, properties, body|
# ... more code here
if (decision_left)
q_left.publish(body, :headers => properties.headers, :persistent => true)
end
end
If we do re-publishing like above, do we loose something from the previous message?
There are a lot of attributes defined / stored in delivery_info
and properties
as well.
Do we have to re-post them or only the self created headers and body?
Source: (StackOverflow)
So I need to be able to create a single RabbitMQ connection and channel per Sidekiq thread because I run out of RabbitMQ connections if I don't and because the docs suggest it. The docs show how to do it with Unicorn:
before_fork do |server, worker|
$rabbitmq_connection.close if $rabbitmq_connection
end
after_fork do |server, worker|
# the following is *required* for Rails + "preload_app true",
defined?(ActiveRecord::Base) and
ActiveRecord::Base.establish_connection
$rabbitmq_connection = Bunny.new
$rabbitmq_connection.start
$rabbitmq_channel = $rabbitmq_connection.create_channel
end
end
Is it possible to do something similar for Sidekiq threads? Is there something I can do in Sidekiq.server_configure
? It looks like this is where Sidekiq starts a thread but I don't see anyway to hook into the start/stop?
Source: (StackOverflow)
Two queues are bound to a topic exchange with the following routing keys:
Queue A, bound with routing key pattern match *.foo
Queue B, bound with routing key pattern match *.bar
I'd like to add a third queue to this exchange that receives messages that are neither foo
messages nor bar
messages. If I bind this queue with a #
routing key, I naturally get all messages I need, but including foo
's and bar
's which I don't want.
Any way to route messages patching a pattern NOT *.foo
AND NOT *.bar
?
Source: (StackOverflow)
I am using RabbitMq for communication, and I would like to consume just one message and unsubscribe. How to do it in ruby bunny? My subscribe block is pretty easy:
queue.subscribe(block: true) do |delivery_info, properties, payload|
puts "[consumer] #{q.name} received a message: #{payload}"
end
Source: (StackOverflow)
I'm playing with Bunny, and trying to publish message to existing queue.
Unfortunately inside Bunny documentation are snipent for consumer creation but not for produser.
So for example when I try to bind to some exchange, it throws an error
PRECONDITION_FAILED - cannot redeclare exchange 'test' in vhost '/' with different type, durable, internal or autodelete value
Code:
conn = Bunny.new()
conn.start
ch = conn.create_channel
x = ch.direct("test")
Do you know why it's trying to redeclare.
Maybe I need first bind to a queue?
Thanks for any help.
Source: (StackOverflow)
I'm trying out the Bunny amqp client gem version 2.0.0. And while I was trying I found the following error :
W, [2015-08-03T07:36:21.913706 #9100] WARN -- #: Could not establish TCP connection to 10.223.19.94:5672:
/usr/local/rvm/gems/ruby-1.9.2-p320/gems/bunny-2.0.0/lib/bunny/session.rb:298:in rescue in start': Could not establish TCP connection to any of the configured hosts (Bunny::TCPConnectionFailedForAllHosts)
from /usr/local/rvm/gems/ruby-1.9.2-p320/gems/bunny-2.0.0/lib/bunny/session.rb:264:in
start'
from rabbit-client-test.rb:12:in <main>
Also, I noticed that the same error does not occur in the previous version of bunny 1.7.0. Is it something related to gem?
Source: (StackOverflow)