EzDevInfo.com

php-amqplib

AMQP library for PHP

How to use channel.assertQueue function from amqplib library for node.JS?

I am developing a messaging app using RabbitMQ, and Node.JS. I am using amqplib for this purpose. I am new to Node.JS and finding few difficulties in understanding the syntax of amqplib.. For e.g. there is a function for declaring queue, that is

channel.assertQueue([queue, [options, [function(err, ok) {...}]]]);

I have been referring This from last 2-3 days but still I am not clear about these -> err and ok. How to use these parameters?

An example would be much much appreciated.


Source: (StackOverflow)

Rabbitmq restores acknowledged messages after crash/restart

I'm using rabbit with PHP on osx.

Simple example. I've one durable queue with 50K durable messages. I run the consumer script. There is example of that script:

...
while ($this->run) {
    if ($message = $channel->basic_get("testq.{$this->id}")) {
        $channel->basic_ack($message->delivery_info['delivery_tag']);
        echo "{$message->delivery_info['routing_key']} (".$message->get('priority')."): {$message->body}\n";
        $sleep = 15000; // usecs
    } else {
        usleep($sleep);
    }
    ...
}
...

After running consumer script i see in rabbit webadmin that messages count from that queue is lowering.

Seems everything is OK.

If i kill php consumer script with CTRL+C and script till stopping was consumed for example 10K messages i see in that 40K messages left in queue.

It's OK.

But if i kill rabbitmq with kill -9 or CRLT+C, while consumer script is running (i want to simulate crash), after restart messages count in that queue restores to 50K ..

I don't understand why? where may be the problem?


Source: (StackOverflow)

Advertisements

RabbitMQ : binding from a DLX

I've searched for that information (including the docs) and I can't find it.

I'm using the latest version of php-amqplib with RabbitMQ v. 2.7.1. I have three queues and three exchanges :

// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);

// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
    'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));

// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
    'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));

// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);

// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);

The behavior is pretty straightforward : messages are published into the EXCHANGE_TO_PROCESS. An external worker processes the message : if the processing goes A-OK, the message is simply ACK'ed and thus removed from the queue (this part works perfectly) ; if the processing goes wrong, the message is instead inserted into the EXCHANGE_WAITING where, after a TTL of 5 minutes, it is reinserted through DLX into the EXCHANGE_TO_PROCESS for re-processing. After the third failure, though, it is inserted into the EXCHANGE_TO_CLEAN where a cron job will come and clean up messages, log errors, etc.

The problem I've run into, however, is that the code clearly binds the QUEUE_WAITING to the EXCHANGE_WAITING (as expected), but when I look into the RabbitMQ management page, I notice that two queues are bound to that exchange, namely QUEUE_TO_PROCESS and QUEUE_WAITING, in that order. When the 5 minutes are over, the message then disappears. I'm not quite sure why.

All this to bring us to my questions : does the dead letter exchange implicitly bind the exchange in parameter to the queue? And : what could possibly be happening to my lost messages?

EDIT

I'm even more confused than I was. I've tried the following, very basic code :

    $this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
    $this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
    $this->channel->queue_declare('queueA', false, true, false, false, false, array(
        'x-dead-letter-exchange' => array('S', 'exchangeB'),
        'x-message-ttl' => array('I', 5000)
    ));
    $this->channel->queue_declare('queueB', false, true, false, false, false);
    $this->channel->queue_bind('queueA', 'exchangeA');
    $this->channel->queue_bind('queueB', 'exchangeB');

    $msg = new AMQPMessage('hello!');
    $this->channel->basic_publish($msg, 'exchangeA');

This creates two queues and two exchanges (I've seen them to fanout to avoid bothering with routing keys), binds queueA to exchangeA and queueB to exchangeB, setting a TTL on queueA and its DLX to exchangeB. Watching what happens in the management page, I see a message spending 5 seconds in queueA, as expected, and then the message disappears, just like in my more complex code above.


Source: (StackOverflow)

RabbitMQ (PHP) Is it possible to check if there is listener to a channel before posting?

using php-amqplib, is it possible to check if there is a listener to a channel before sending a message. The idea behind it is only to publish a message when there is some audience only.

I looked at the code and was not able to find something explaining it.

Thank you for your help.


Source: (StackOverflow)

AMQPRuntimeException with WordPress

I have WordPress 4.1.1 and when I try to create or update a post I get the following error:

PhpAmqpLib\Exception\AMQPRuntimeException thrown Error Connecting to server(111): Connection refused

It happens on my prod server, but not on local.

Someone knows why this is happening?


Source: (StackOverflow)

How to stop Rabbitmq Consumer after processing 10 messages?

I was running consumer from crontab and it processing all the messages one by one, Is there any way to consume only 10 or 20 messages and then stop to consumer.

So next time cron will call to consumer and same process will happen again.


Source: (StackOverflow)

RabbitMQ PHP AMQP Library - Get message headers

I have got a simple queueing system which, obviously, takes messages and publishes them.

However, due to a new development in the system, we now need to check for the x-death headers from the exchange, however, I can't seem any documentation on how to retrieve that through the PHP AMQP Library.

Anyone have any ideas on how to achieve this?


Source: (StackOverflow)

Re-consuming nack'ed messages when all the other work is done

I'm implementing RabbitMQ to perform some image editing operations on another server. Though, from time to time the request may arrive on that server before the source image is synced to it - in which case I would like to pop the message back in the the queue and process it after all other operations have completed.

However, calling basic.nack with the resubmit bit set makes my queue re-receive that message immediately - ahead of any operations that operations that can actually complete.

Currently I feel like I'm forced to implement some logic that just re-submits the original message to the exchange, but I'd like to avoid that. Both because the same message may have been successfully processed on another server (with it's own queue), and because I expect this to be so much of a common pattern that there must be better way.

(oh, I'm using php-amqplib in both consumer and server code)

Thanks!

Update: I solved my problem using Dead Letter Exchange, as suggested by zaq178miami

My current solution:

  • Declares a dead letter exchange $dead_letter_exchange on the original queue $worker
  • Declares a recovery exchange $recovery_exchange
  • Declares a queue $dead_queue, with a x-message-ttl of 5 seconds and x-dead-letter-exchange set to $recovery_exchange
  • Binds $dead_letter_queue to $dead_letter_exchange
  • And binds $worker to $recovery_exchange
  • $dead_letter_exchange and $recovery_exchange are generated names, based on the exchange I'm consuming from and the value of $worker

Making every message that gets nack'ed return to the worker only on that specific queue (server) after five seconds for a retry. I may still want to apply some logic that throws the message away after $n retries.

I'm still open to better ideas ;-)


Source: (StackOverflow)

Topic exchange ambiguity with RabbitMQ

I'm a little confused. I'm trying to implement topic exchanges and am not sure what is needed.

I want to have several routing keys and 1 topic exchange (the default amq.topic). My keys would be like:

  • customer.appA.created
  • customer.appB.created
  • customer.*.created

I want my queue(s) to be durable, but do I need 1 'customer' queue or 2 queues for appA and appB? I have my publisher figured out; connect, exchange declare, basic publish.

But I'm struggling with the consumers. Let's say I want to open 3 consoles, one for each of the aforementioned routing keys.
My current consumer has: connect, exchange declare, queue bind, basic consume. These are connected to a durable 'customer' queue. However my messages are being round-robin'ed to each console/consumer and not using the routing keys.

So my questions;

  1. For a typical topic exchange set up; how many queues do you need?
  2. Can my consumers get away with just exchange binding, or does it have to include queue interaction?
  3. Is it possible for a single message to appear in 2 consumers with topic exchange (or do you need fanout for that)?

Source: (StackOverflow)

How do I use the RabbitMQ delayed message queue from PHP?

I'm trying to use the Delayed Message Queue for RabbitMQ from PHP, but my messages are simply disappearing.

I'm declaring the exchange with the following code:

$this->channel->exchange_declare(
    'delay',
    'x-delayed-message',
    false,  /* passive, create if exchange doesn't exist */
    true,   /* durable, persist through server reboots */
    false,  /* autodelete */
    false,  /* internal */
    false,  /* nowait */
    ['x-delayed-type' => ['S', 'direct']]);

I'm binding the queue with this code:

$this->channel->queue_declare(
    $queueName,
    false,  /* Passive */
    true,   /* Durable */
    false,  /* Exclusive */
    false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

And I'm publishing a message with this code:

$msg = new AMQPMessage(json_encode($msgData), [
    'delivery_mode' => 2,
    'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

But the message doesn't get delayed; it's still immediately delivered. What am I missing?


Source: (StackOverflow)

php-amqplib loop while there are messages only

There is a demo consumer:

It loops as long as the channel has callbacks registered

while (count($ch->callbacks)) {
  $ch->wait();
}

The thing is that I need to get not more than 100 messages from the queue for example. If there are only 80 for example it should return just 80 and exit loop.

Thanks


Source: (StackOverflow)

RabbitMQ missing channel reference in message delivery_info

I'm currently implementing some logic after getting message from rabbitMQ using basic_get without automatically sending ack for messages being received.

According to the tutorial here (Message acknowledgment section), I can't find the channel reference within the msg itself and send ack like mentioned in above link:

$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

That is because in my msg delivery info array there is no such thing channel.
I wonder how could it be that it is missing.

Edit: code snippet of basic get

  $msg = $this->channel->basic_get($this->queueName, false);  

Here is a var_dump of my message:(Yellow part)
ampq messagel


Source: (StackOverflow)

RabbitMq: Replace duplicated messages

I'm using RabbitMq for submitting data for registered web hooks.

base info: If a contact is created in the system, the message is put in the queue and the consumer is sending later the hook data to the registered url.

To my question: Its possible, that a contact is updated in 5 seconds twice and that both messages are still in the queue. But I'd like, if the second message is queued, that the first message will be removed.

I know that i can't delete the message manually. But is it somehow possible to set an id on the message and if two messages with the same id are in the same queue, that the first one is automatically removed/replaced? That only one request is sent to the url. I know you can set a message id on the message self. But I have nothing found to replace the old one.

My PHP Code (simplified):

    $connection = new AMQPConnection('localhost', 5672, 'test', 'test');
    $channel = $connection->channel();
    $channel->queue_declare(self::QUEUE_NAME, false, true, false, false);

    $data = array(
        'model' => get_class($subject),
        'id' => $subject->getId(),
        'event' => $event->getName()
    );
    $messageProperties = array(
        'message_id' => get_class($subject) . '-' . $subject->getId()
    );
    $channel->basic_publish(new AMQPMessage(json_encode($data), $messageProperties), '', self::QUEUE_NAME);

    $channel->close();
    $connection->close();

Btw i'm using the the php amqplib https://github.com/videlalvaro/php-amqplib.

Thanks for the help Flo


Source: (StackOverflow)

PHP Ampqlib Consumer Dies: Error reading data. Received 0 instead of expected 1 bytes

OK this is part of a Symfony2 Commandline script. While the script is waiting it dies with this Exception.

[PhpAmqpLib\Exception\AMQPRuntimeException]
    Error reading data. Received 0 instead of expected 1 bytes

I have searched google and found mention of heartbeat and set_time_out, but when I have tried to set in the constructor, but instead when I change it from the defaults it dies faster.

This is how my script is setup. for the Amqplib aspect of it.

    // Get the configuration options for RabbitMQ
        $queueConfig = $this->getApplication()->getKernel()->getContainer()->getParameter("rabbit_mq");

    /**
     * Callback function for RabbitMQ
     * Everything in the callback function must remain in the call back function.
     */
    $callback = function($msg)
    {
      $msgObj = json_decode($msg->body, true);
      print_r($msgObj);
    };

    $connection = new AMQPConnection(
        $queueConfig['response']['host'],
        $queueConfig['response']['port'],
        $queueConfig['response']['username'],
        $queueConfig['response']['password'],
        '/'
      );

    $channel = $connection->channel();

    $queueName = 'myQueueName';

    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

This is from AMQPStreamConnection.php which has the constructor. AMQPConnection extends AMQPStreamConnection

public function __construct($host, $port,
                            $user, $password,
                            $vhost="/",$insist=false,
                            $login_method="AMQPLAIN",
                            $login_response=null,
                            $locale="en_US",
                            $connection_timeout = 3,
                            $read_write_timeout = 3,
                            $context = null)

Thoughts on how to get rid of the error?


Source: (StackOverflow)

consumer connection is getting dropped everytime I queue something

I am new to RabbitMQ, and using RabbitMQ 3.2.4, Erlang R16B03 on a HA Cluster queuing. I am using phpamqplib downloaded from videlalvaro/php-amqplib.

I have evolved it with a pacemaker system that continuously sends an impulse to the connection to make it alive. It works!. The consumer connection is ok. Atleast it seems to be!

$exchange = $queue = populateQueueName($queue, $ha);
$ch->queue_declare($queue, false, true, false, false);
$ch->exchange_declare($exchange, 'fanout', false, true, false);

Clearly it shows that the auto-delete is True. Means the queue will be deleted as soon as last consumer consumes the queue.

I cannot understands now whats wrong I have made or this may be the default Rabbit feature, that the consumer connection has dropped exactly after I have queue them with two message.

Can anybody here explain Where may be the error?


Source: (StackOverflow)