pika
Pure Python RabbitMQ/AMQP 0-9-1 client library
Introduction to Pika — pika 0.9.13 documentation
All the examples in the pika tutorial end with the client invoking start_consuming(), which starts an infinite loop. These examples work for me.
However, I do not want my client to run forever. Instead, I need my client to consume messages for some time, such as 15 minutes, then stop.
How do I accomplish that?
Source: (StackOverflow)
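One way to bound the consuming time, sketched under the assumption of a pika 1.x BlockingChannel (newer than the 0.9.13 release mentioned at the top): iterate over channel.consume() with an inactivity_timeout instead of calling start_consuming(), and leave the loop once a deadline has passed. The helper name consume_for and the handle callback are illustrative and not part of pika.

```python
import time


def consume_for(channel, queue, seconds, handle, clock=time.monotonic):
    """Consume from `queue` for roughly `seconds`, then cancel the consumer.

    `channel` is assumed to behave like a pika 1.x BlockingChannel.
    Returns the number of messages handled.
    """
    deadline = clock() + seconds
    handled = 0
    # With inactivity_timeout set, consume() yields (None, None, None)
    # whenever no message arrives within that many seconds, so the loop
    # can check the deadline even when the queue is idle.
    for method, properties, body in channel.consume(queue, inactivity_timeout=1):
        if method is not None:
            handle(body)
            channel.basic_ack(method.delivery_tag)
            handled += 1
        if clock() >= deadline:
            break
    # Cancel the consumer created by consume(); pending unacknowledged
    # deliveries go back to the queue.
    channel.cancel()
    return handled
```

With a real connection this could be called as consume_for(channel, 'messages', 15 * 60, handler). The deadline is only checked between deliveries, so a slow handler can overrun it.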
What is the easiest way to create a delay (or parking) queue with Python, Pika and RabbitMQ? I have seen similar questions, but none for Python.
I find this a useful idea when designing applications, as it allows us to throttle messages that need to be re-queued.
There is always the possibility that you will receive more messages than you can handle; maybe the HTTP server is slow, or the database is under too much stress.
I have also found it very useful when something goes wrong in scenarios with zero tolerance for losing messages. Re-queuing messages that could not be handled may solve that, but it can also cause the same message to be queued over and over again, potentially causing performance issues and log spam.
Source: (StackOverflow)
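A common way to get a delay queue is to combine RabbitMQ's per-queue message TTL with dead-lettering: messages sit in a queue that nobody consumes, expire after the TTL, and are then routed to the real work queue. The sketch below only builds the queue arguments and declares the queue. The function names and the choice of the default exchange as the dead-letter target are assumptions made for illustration.

```python
def delay_queue_arguments(delay_ms, target_exchange, target_routing_key):
    """Queue arguments that turn a queue without consumers into a delay queue."""
    return {
        # Messages expire after this many milliseconds...
        "x-message-ttl": int(delay_ms),
        # ...and expired messages are republished to this exchange and key.
        "x-dead-letter-exchange": target_exchange,
        "x-dead-letter-routing-key": target_routing_key,
    }


def declare_delay_queue(channel, name, delay_ms, work_queue):
    """Declare `name` so that expired messages end up in `work_queue`."""
    # "" is the default exchange, which routes by queue name (assumed target).
    args = delay_queue_arguments(delay_ms, "", work_queue)
    channel.queue_declare(queue=name, durable=True, arguments=args)
    return args
```

A message that could not be handled would then be published to the delay queue instead of being re-queued directly, which spaces out the retries.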
Similar questions all seem to be based around using a custom logger; I'm happy to just use the default, or none at all. My pika Python app runs and receives messages, but after a few seconds it crashes with No handlers could be found for logger "pika.adapters.blocking_connection". Any ideas?
import pika
credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare('messages')
def message_received(channel, method, properties, body):
    print "[x] Received %r" % (body)
channel.basic_consume(message_received, queue='messages', no_ack=True)
channel.start_consuming()
Fixed by adding:
import logging
logging.basicConfig()
Source: (StackOverflow)
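logging.basicConfig() makes the message go away by sending all log records to stderr. If no pika output is wanted at all, a possible alternative is to attach a NullHandler to the parent "pika" logger, which child loggers such as pika.adapters.blocking_connection propagate to. This is a sketch using only the standard library. The function name is made up for illustration.

```python
import logging


def silence_pika_logging():
    """Give the 'pika' logger a do-nothing handler.

    This stops the "no handlers could be found" complaint without
    printing pika's log records. Records still propagate to the root
    logger, so they show up again if root handlers are configured later.
    """
    logger = logging.getLogger("pika")
    logger.addHandler(logging.NullHandler())
    return logger
```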
It seems the longer I keep my rabbitmq server running, the more trouble I have with unacknowledged messages. I would love to requeue them. In fact there seems to be an amqp command to do this, but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but I am either missing something or it cannot be done this way (how about with rabbitmqctl?)
import pika
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost', port=5672,
                                       credentials=credentials, virtual_host='***')
def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)
try:
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_connected)
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
Source: (StackOverflow)
I'm using Pika to process data from RabbitMQ.
As I seemed to run into different kinds of problems, I decided to write a small test application to see how I can handle disconnects.
I wrote this test app, which does the following:
- Connect to Broker, retry until successful
- When connected create a queue.
- Consume this queue and put result into a python Queue.Queue(0)
- Get item from Queue.Queue(0) and produce it back into the broker queue.
What I noticed were 2 issues:
- When I run my script from one host connecting to RabbitMQ on another host (inside a VM), the script exits at random moments without producing an error.
- When I run my script on the same host on which RabbitMQ is installed, it runs fine and keeps running.
This might be explained by network issues such as dropped packets, although I find the connection not really robust.
When the script runs locally on the RabbitMQ server and I kill RabbitMQ, the script exits with the error: "ERROR pika SelectConnection: Socket Error on 3: 104"
So it looks like I can't get the reconnection strategy working as it should. Could someone have a look at the code to see what I'm doing wrong?
Thanks,
Jay
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)
        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)
        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
            pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                                   routing_key='sandbox',
                                   body=data,
                                   properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err
Source: (StackOverflow)
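If the reconnection strategy classes are unavailable or not behaving as expected, one fallback is to handle reconnection in application code: wrap "connect, then serve" in a loop and start over whenever a connection error surfaces. The sketch below is library-agnostic. The names run_with_reconnect, connect and serve are hypothetical, and the caller decides which exception types count as retryable.

```python
import time


def run_with_reconnect(connect, serve, retry_on, delay=1.0, max_attempts=None,
                       sleep=time.sleep):
    """Call connect(), then serve(connection); start over when either raises
    one of the exception types in `retry_on`.

    Returns whatever serve() returns. Once `max_attempts` is exhausted the
    last error is re-raised (None means retry forever).
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            connection = connect()
            return serve(connection)
        except retry_on:
            if max_attempts is not None and attempts >= max_attempts:
                raise
            # Wait before the next attempt so a dead broker is not hammered.
            sleep(delay)
```

With pika, retry_on could be (pika.exceptions.AMQPConnectionError,), connect could build the connection, and serve could declare the queue and start consuming. Channels and consumers have to be set up again inside serve, since they do not survive a reconnect.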
I've been trying to figure out which form of connection I should use when using pika. As far as I understand, I've got two alternatives: either the BlockingConnection or the SelectConnection. However, I'm not really sure about the differences between these two (i.e., what is the BlockingConnection blocking, and so on).
The documentation for pika says that SelectConnection is the preferred way to connect to RabbitMQ, since it provides "multiple event notification methods including select, epoll, kqueue and poll."
So I'm wondering what are the implications of these two different kinds of connections?
PS: I know I shouldn't put a tag in the title but in this case I think it does help to clarify the question.
Source: (StackOverflow)
I've got a Python worker client that spins up 10 workers, each of which hooks onto a RabbitMQ queue. A bit like this:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()
def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()
The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the RabbitMQ interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to be ready.
Why isn't this working?
Source: (StackOverflow)
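A likely cause in the snippet above is the order of the calls: basic_consume is issued before basic_qos, so the broker may already be pushing the whole backlog to the consumer by the time the prefetch limit is set. The sketch below sets the limit first. It assumes the pika 1.x keyword names (on_message_callback, auto_ack), which differ from the 0.9.x signature used in the question, and start_worker is a made-up helper name.

```python
def start_worker(channel, queue, on_message):
    """Register a consumer that holds at most one unacknowledged message."""
    channel.queue_declare(queue=queue, durable=True)
    # Set the prefetch limit first, so it is already in force when the
    # consumer is registered and deliveries begin.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue, on_message_callback=on_message,
                          auto_ack=False)
```

After this, channel.start_consuming() would be called as before. Each worker should then receive its next message only after acknowledging the previous one.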
The producer module of my application is run by users who want to submit work to be done on a small cluster. It sends the subscriptions in JSON form through the RabbitMQ message broker.
I have tried several strategies, and the best so far is the following, which is still not fully working:
Each cluster machine runs a consumer module, which subscribes itself to the AMQP queue and issues a prefetch_count to tell the broker how many tasks it can run at once.
I was able to make it work using SelectConnection from the Pika AMQP library. Both consumer and producer start two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses on channel [B], and the consumer waits for requests on channel [A] and sends responses on channel [B]. It seems, however, that when the consumer runs the callback that calculates the response, it blocks, so only one task is executed at a time on each consumer.
What I need in the end:
- the consumer [A] subscribes his tasks (around 5k each time) to the cluster
- the broker dispatches N messages/requests for each consumer, where N is the number of concurrent tasks it can handle
- when a single task is finished, the consumer replies to the broker/producer with the result
- the producer receives the replies, updates the computation status and, in the end, prints some reports
Restrictions:
- If another user submits work, all of his tasks will be queued after the previous user (I guess this is automatically true from the queue system, but I haven't thought about the implications on a threaded environment)
- Tasks have an order to be submitted, but the order they are replied is not important
UPDATE
I have studied this a bit further, and my actual problem seems to be that I use a simple function as the callback for pika's SelectConnection.channel.basic_consume() function. My last (unimplemented) idea is to pass a threading function instead of a regular one, so the callback would not block and the consumer could keep listening.
Source: (StackOverflow)
I know that we can do this to list the queues in RabbitMQ:
rabbitmqctl list_queues
but how can I do this via pika?
Source: (StackOverflow)
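AMQP 0-9-1 itself has no "list queues" operation, so pika cannot do this over an AMQP channel. One option is the HTTP API of the RabbitMQ management plugin, whose /api/queues endpoint returns a JSON array of queue objects. The sketch below uses only the standard library and assumes the plugin is enabled on port 15672 with the default guest credentials. All function names are illustrative.

```python
import base64
import json
import urllib.request


def build_queues_request(base_url="http://localhost:15672",
                         user="guest", password="guest"):
    """Build an authenticated GET request for the management API (assumed URL)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(base_url.rstrip("/") + "/api/queues")
    request.add_header("Authorization", "Basic " + token)
    return request


def parse_queue_names(payload):
    """Extract the queue names from the JSON array returned by /api/queues."""
    return [queue["name"] for queue in json.loads(payload)]


def list_queues(**kwargs):
    """Fetch and return the queue names; needs a reachable management plugin."""
    with urllib.request.urlopen(build_queues_request(**kwargs)) as response:
        return parse_queue_names(response.read().decode("utf-8"))
```

Calling list_queues() against a running broker should return the same names that rabbitmqctl list_queues prints.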
I've been working on getting some distributed tasks working via RabbitMQ.
I spent some time trying to get Celery to do what I wanted and couldn't make it work.
Then I tried using Pika and things just worked, flawlessly, and within minutes.
Is there anything I'm missing out on by using Pika instead of Celery?
Source: (StackOverflow)
I am looking to implement a priority queue with RabbitMQ. The mailing list recommends using multiple queues, with each queue representing a different priority level.
My question is, how do you poll multiple queues in some prioritized order using pika (or possibly some other python library)?
Source: (StackOverflow)
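With one queue per priority level, a simple polling approach is to try basic_get on each queue from highest to lowest priority and take the first message found. The sketch assumes a pika 1.x BlockingChannel, where basic_get returns (None, None, None) for an empty queue. The helper name and the queue names in the usage note are hypothetical.

```python
def get_highest_priority(channel, queues):
    """Return (queue, method, properties, body) from the first non-empty
    queue, checking `queues` in the given order; None if all are empty."""
    for queue in queues:
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is not None:
            return queue, method, properties, body
    return None
```

A caller would loop over get_highest_priority(channel, ['high', 'normal', 'low']), acknowledge the returned delivery with channel.basic_ack(method.delivery_tag) after processing, and sleep briefly when None comes back. Polling costs a broker round trip per queue, so this suits low message rates better than high ones.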
We're trying to set up a basic directed queue system where a producer will generate several tasks and one or more consumers will grab a task at a time, process it, and acknowledge the message.
The problem is, the processing can take 10-20 minutes, and we're not responding to messages at that time, causing the server to disconnect us.
Here's some pseudo code for our consumer:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()
After the first task completes, an exception is thrown somewhere deep inside of BlockingConnection, complaining that the socket was reset. In addition, the RabbitMQ logs show that the consumer was disconnected for not responding in time (why it resets the connection rather than sending a FIN is strange, but we won't worry about that).
We searched around a lot because we believed this was the normal use case for RabbitMQ (having a lot of long running tasks that should be split up among many consumers), but it seems like nobody else really had this issue. Finally we stumbled upon a thread where it was recommended to use heartbeats and to spawn the long_running_task() in a separate thread.
So the code has become:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()
And this seems to work, but it's very messy. Are we sure that the ch object is thread safe? In addition, imagine that long_running_task() is using that connection parameter to add a task to a new queue (i.e. the first part of this long process is done, let's send the task on to the second part). So, the thread is using the connection object. Is that thread safe?
More to the point, what's the preferred way of doing this? I feel like this is very messy and possibly not thread safe, so maybe we're not doing it right. Thanks!
Source: (StackOverflow)
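pika connections and channels are not meant to be shared across threads, so calling ch.basic_ack from the worker thread as above is risky. Newer pika releases offer BlockingConnection.add_callback_threadsafe, which lets a worker thread hand the acknowledgement back to the thread that owns the connection. The sketch below assumes that method is available. make_threaded_callback and work are names invented for this example.

```python
import functools
import threading


def make_threaded_callback(connection, work):
    """Build a consumer callback that runs `work(body)` in a thread and
    acknowledges the message from the connection's own thread."""
    threads = []

    def run(ch, delivery_tag, body):
        work(body)
        # Do not touch the channel from this thread; ask the connection to
        # run the ack the next time it processes events.
        ack = functools.partial(ch.basic_ack, delivery_tag=delivery_tag)
        connection.add_callback_threadsafe(ack)

    def on_message(ch, method, properties, body):
        thread = threading.Thread(target=run, args=(ch, method.delivery_tag, body))
        thread.start()
        threads.append(thread)

    on_message.threads = threads  # exposed so callers can join() on shutdown
    return on_message
```

The connection keeps servicing heartbeats while the task runs, because the consuming thread is never blocked. The same rule applies to publishing follow-up tasks from the worker: either schedule the publish through add_callback_threadsafe as well, or give the worker thread its own connection.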
I want to process messages in a few threads, but I'm getting an error when executing this code:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback
def doWork(body, args, channel):
    r = random.random()
    time.sleep(r * 10)
    try:
        channel.basic_ack(delivery_tag=args.delivery_tag)
    except :
        traceback.print_exc()
auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()
while True:
    time.sleep(0.03)
    try:
        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue
        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()
    except Exception, e:
        traceback.print_exc()
        continue
Error description:
Traceback (most recent call last):
File "C:\work\projects\mq\start.py", line 43, in
method_frame, header_frame, body = channel.basic_get(queue="test_queue")
File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
no_ack=no_ack))
File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
self.connection.process_data_events()
File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
self._handle_read()
File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
self._on_data_available(data)
File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
frame) # Args
File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
callback(*args, **keywords)
File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')
Versions: pika 0.9.5, rabbitMQ 2.6.1
Source: (StackOverflow)
I've been trying to make use of RabbitMQ from within my gevent program by using the Pika library (monkey patched by gevent), but gevent keeps randomly throwing a timeout error.
What should I do? Is there another library I could use?
WARNING:root:Document not found, retrying primary.
Traceback (most recent call last):
...
File "/usr/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 32, in __init__
BaseConnection.__init__(self, parameters, None, reconnection_strategy)
File "/usr/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 50, in __init__
reconnection_strategy)
File "/usr/lib/python2.7/dist-packages/pika/connection.py", line 170, in __init__
self._connect()
File "/usr/lib/python2.7/dist-packages/pika/connection.py", line 228, in _connect
self.parameters.port or spec.PORT)
File "/usr/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 44, in _adapter_connect
self._handle_read()
File "/usr/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 151, in _handle_read
data = self.socket.recv(self._suggested_buffer_size)
File "/usr/lib/python2.7/dist-packages/gevent/socket.py", line 427, in recv
wait_read(sock.fileno(), timeout=self.timeout, event=self._read_event)
File "/usr/lib/python2.7/dist-packages/gevent/socket.py", line 169, in wait_read
switch_result = get_hub().switch()
File "/usr/lib/python2.7/dist-packages/gevent/hub.py", line 164, in switch
return greenlet.switch(self)
timeout: timed out
Source: (StackOverflow)
UPDATE Aug, 2015: For people wanting to use messaging, I currently would recommend zeromq. Could be used in addition to, or as a complete replacement of, pykka.
How can I listen to a RabbitMQ queue for messages and then forward them to an actor within Pykka?
Currently, when I try to do so, I get weird behavior and the system grinds to a halt.
Here is how I have my actor implemented:
class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()
        self.target = target
    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)
And here is my method inside the ApplicationService class that is supposed to check the queue for new messages:
@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)
    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()
It seems like start_consuming is blocking indefinitely. Is there a way I can "poll" the queue periodically myself?
Source: (StackOverflow)
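start_consuming() does block until consuming is stopped. One way to poll instead, assuming a pika BlockingConnection that provides process_data_events(time_limit=...), is to register the consumer with basic_consume as before and then pump the connection in short slices from your own loop. The helper name and its keep_going and between_polls parameters are hypothetical.

```python
def poll_consumer(connection, keep_going, between_polls=None, time_limit=0.1):
    """Drive a blocking connection in short slices instead of start_consuming().

    Consumer callbacks registered with basic_consume fire from inside
    process_data_events(); control returns to this loop after at most
    about `time_limit` seconds per call.
    """
    while keep_going():
        connection.process_data_events(time_limit=time_limit)
        if between_polls is not None:
            # Hook for the caller's own periodic work, e.g. yielding to
            # other actors or checking a shutdown flag.
            between_polls()
```

Inside an actor, between_polls is where control could be yielded to the rest of the system, and keep_going could check whether the actor has been stopped.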