EzDevInfo.com

pika

Pure Python RabbitMQ/AMQP 0-9-1 client library Introduction to Pika — pika 0.9.13 documentation

How to consume RabbitMQ messages via pika for some limited time?

All the examples in pika tutorial end with the client invoking start_consuming(), which starts an infinite loop. These examples work for me.

However, I do not want my client to run forever. Instead, I need my client to consume messages for some time, such as 15 minutes, then stop.

How do I accomplish that?


Source: (StackOverflow)

How to create a delayed queue in RabbitMQ?

What is the easiest way to create a delay (or parking) queue with Python, Pika and RabbitMQ? I have seen an similar questions, but none for Python.

I find this an useful idea when designing applications, as it allows us to throttle messages that needs to be re-queued again.

There are always the possibility that you will receive more messages than you can handle, maybe the HTTP server is slow, or the database is under too much stress.

I also found it very useful when something went wrong in scenarios where there is a zero tolerance to losing messages, and while re-queuing messages that could not be handled may solve that. It can also cause problems where the message will be queued over and over again. Potentially causing performance issues, and log spam.


Source: (StackOverflow)

Advertisements

No handlers could be found for logger "pika.adapters.blocking_connection"

Similar questions all seem to be based around using a custom logger, I'm happy to just use the default / none at all. My pika python app runs and receives messages but after a few seconds crashes with No handlers could be found for logger "pika.adapters.blocking_connection", any ideas?

import pika

credentials = pika.PlainCredentials('xxx_apphb.com', 'xxx')
parameters = pika.ConnectionParameters('bunny.cloudamqp.com', 5672, 'xxx_apphb.com', credentials)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare('messages')

def message_received(channel, method, properties, body):
    print "[x] Received %r" % (body)

channel.basic_consume(message_received, queue='messages', no_ack=True)

channel.start_consuming()

Fixed by adding:

import logging
logging.basicConfig()

Source: (StackOverflow)

How can I recover unacknowledged AMQP messages from other channels than my connection's own?

It seems the longer I keep my rabbitmq server running, the more trouble I have with unacknowledged messages. I would love to requeue them. In fact there seems to be an amqp command to do this, but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but I am either missing something or it cannot be done this way (how about with rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

Source: (StackOverflow)

RabbitMQ, Pika and reconnection strategy

I'm using Pika to process data from RabbitMQ. As I seemed to run into different kind of problems I decided to write a small test application to see how I can handle disconnects.

I wrote this test app which does following:

  1. Connect to Broker, retry until successful
  2. When connected create a queue.
  3. Consume this queue and put result into a python Queue.Queue(0)
  4. Get item from Queue.Queue(0) and produce it back into the broker queue.

What I noticed were 2 issues:

  1. When I run my script from one host connecting to rabbitmq on another host (inside a vm) then this scripts exits on random moments without producing an error.
  2. When I run my script on the same host on which RabbitMQ is installed it runs fine and keeps running.

This might be explained because of network issues, packets dropped although I find the connection not really robust.

When the script runs locally on the RabbitMQ server and I kill the RabbitMQ then the script exits with error: "ERROR pika SelectConnection: Socket Error on 3: 104"

So it looks like I can't get the reconnection strategy working as it should be. Could someone have a look at the code so see what I'm doing wrong?

Thanks,

Jay

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)

        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')    
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err

Source: (StackOverflow)

Which form of connection to use with pika

I've been trying to figure out which form of connection i should use when using pika, I've got two alternatives as far as I understand.

Either the BlockingConnection or the SelectConnection, however I'm not really sure about the differences between these two (i.e. what is the BlockingConnection blocking? and more)

The documentation for pika says that SelectConnection is the preferred way to connect to rabbit since it provides "multiple event notification methods including select, epoll, kqueue and poll."

So I'm wondering what are the implications of these two different kinds of connections?

PS: I know I shouldn't put a tag in the title but in this case I think it does help to clarify the question.


Source: (StackOverflow)

Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queue

I've got a python worker client that spins up a 10 workers which each hook onto a RabbitMQ queue. A bit like this:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the rabbitmq interface, that even when I set worker_count to be 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to be ready.

Why isn't this working?


Source: (StackOverflow)

What's the best pattern to design an asynchronous RPC application using Python, Pika and AMQP?

The producer module of my application is run by users who want to submit work to be done on a small cluster. It sends the subscriptions in JSON form through the RabbitMQ message broker.

I have tried several strategies, and the best so far is the following, which is still not fully working:

Each cluster machine runs a consumer module, which subscribes itself to the AMQP queue and issues a prefetch_count to tell the broker how many tasks it can run at once.

I was able to make it work using SelectConnection from the Pika AMQP library. Both consumer and producer start two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses in channel [B], and the consumer waits for requests on channel [A] and send responses on channel [B]. It seems, however, that when the consumer runs the callback that calculates the response, it blocks, so I have only one task executed at each consumer at each time.

What I need in the end:

  1. the consumer [A] subscribes his tasks (around 5k each time) to the cluster
  2. the broker dispatches N messages/requests for each consumer, where N is the number of concurrent tasks it can handle
  3. when a single task is finished, the consumer replies to the broker/producer with the result
  4. the producer receives the replies, update the computation status and, in the end, prints some reports

Restrictions:

  • If another user submits work, all of his tasks will be queued after the previous user (I guess this is automatically true from the queue system, but I haven't thought about the implications on a threaded environment)
  • Tasks have an order to be submitted, but the order they are replied is not important

UPDATE

I have studied a bit further and my actual problem seems to be that I use a simple function as callback to the pika's SelectConnection.channel.basic_consume() function. My last (unimplemented) idea is to pass a threading function, instead of a regular one, so the callback would not block and the consumer can keep listening.


Source: (StackOverflow)

Is there any way to list queues in a rabbitmq via pika?

I know that we can do this to list queue in a rabbitmq.

rabbitmqctl list_queues

but how can I do this via pika?


Source: (StackOverflow)

RabbitMQ: What Does Celery Offer That Pika Doesn't?

I've been working on getting some distributed tasks working via RabbitMQ.

I spent some time trying to get Celery to do what I wanted and couldn't make it work.

Then I tried using Pika and things just worked, flawlessly, and within minutes.

Is there anything I'm missing out on by using Pika instead of Celery?


Source: (StackOverflow)

How to Implement Priority Queues in RabbitMQ/pika

I am looking to implement a priority queue with RabbitMQ. The mailing list recommends to use multiple queues, each queue representing a different priority level.

My question is, how do you poll multiple queues in some prioritized order using pika (or possibly some other python library)?


Source: (StackOverflow)

Handling long running tasks in pika / RabbitMQ

We're trying to set up a basic directed queue system where a producer will generate several tasks and one or more consumers will grab a task at a time, process it, and acknowledge the message.

The problem is, the processing can take 10-20 minutes, and we're not responding to messages at that time, causing the server to disconnect us.

Here's some pseudo code for our consumer:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

After the first task completes, an exception is thrown somewhere deep inside of BlockingConnection, complaining that the socket was reset. In addition, the RabbitMQ logs show that the consumer was disconnected for not responding in time (why it resets the connection rather than sending a FIN is strange, but we won't worry about that).

We searched around a lot because we believed this was the normal use case for RabbitMQ (having a lot of long running tasks that should be split up among many consumers), but it seems like nobody else really had this issue. Finally we stumbled upon a thread where it was recommended to use heartbeats and to spawn the long_running_task() in a separate thread.

So the code has become:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

And this seems to work, but it's very messy. Are we sure that the ch object is thread safe? In addition, imagine that long_running_task() is using that connection parameter to add a task to a new queue (i.e. the first part of this long process is done, let's send the task on to the second part). So, the thread is using the connection object. Is that thread safe?

More to the point, what's the preferred way of doing this? I feel like this is very messy and possibly not thread safe, so maybe we're not doing it right. Thanks!


Source: (StackOverflow)

Error "unknown delivery tag" occurs when i try ack messages to RabbitMQ using pika (python)

I want process messages in few threads but i'm getting error during execute this code:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

Error desctiption:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

Versions: pika 0.9.5, rabbitMQ 2.6.1


Source: (StackOverflow)

Random timeout error with Pika and gevent

I've been trying to make use of RabbitMQ from within my gevent program by using the Pika library (monkey patched by gevent), gevent likes randomly throwing a timeout error.

What should I do? Is there another library I could use?

WARNING:root:Document not found, retrying primary.
Traceback (most recent call last):
  ...
  File "/usr/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 32, in __init__
    BaseConnection.__init__(self, parameters, None, reconnection_strategy)
  File "/usr/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 50, in __init__
    reconnection_strategy)
  File "/usr/lib/python2.7/dist-packages/pika/connection.py", line 170, in __init__
    self._connect()
  File "/usr/lib/python2.7/dist-packages/pika/connection.py", line 228, in _connect
    self.parameters.port or  spec.PORT)
  File "/usr/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 44, in _adapter_connect
    self._handle_read()
  File "/usr/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 151, in _handle_read
    data = self.socket.recv(self._suggested_buffer_size)
  File "/usr/lib/python2.7/dist-packages/gevent/socket.py", line 427, in recv
    wait_read(sock.fileno(), timeout=self.timeout, event=self._read_event)
  File "/usr/lib/python2.7/dist-packages/gevent/socket.py", line 169, in wait_read
    switch_result = get_hub().switch()
  File "/usr/lib/python2.7/dist-packages/gevent/hub.py", line 164, in switch
    return greenlet.switch(self)
timeout: timed out

Source: (StackOverflow)

How to send RabbitMQ messages to Pykka actor?

UPDATE Aug, 2015: For people wanting to use messaging, I currently would recommend zeromq. Could be used in addition to, or as a complete replacement of, pykka.

How can I listen to a RabbitMQ queue for messages and then forward them to an actor within Pykka?

Currently, when I try to do so, I get weird behavior and the system halts to a stop.

Here is how I have my actor implemented:

class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

And here is my method inside the ApplicationService class that is supposed to check the queue for new messages:

@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()            

It seems like start_consuming is blocking indefinitely. Is there a way I can "poll" the queue periodically myself?


Source: (StackOverflow)