kombu
Messaging library for Python.
Kombu Documentation — Kombu 3.0.26 documentation
I'm looking for articles and references that give an overview of 'queueing' (I'm probably not even using the right term here). I'm hoping for an introductory-style guide through the world of Redis, RabbitMQ, Celery, Kombu, and whatever other components exist that I haven't read about yet, and how they fit together.
My problem is that I need to queue up background tasks issued by my Django website, and every blog post and article I read recommends a different solution.
Source: (StackOverflow)
I want to use Amazon SQS as the broker backend of Celery. There’s an SQS transport implementation for Kombu, which Celery depends on. However, there is not enough documentation for using it, so I cannot find out how to configure SQS with Celery. Has anybody succeeded in configuring SQS with Celery?
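For reference, a minimal settings sketch, assuming Celery 3.x-style setting names (BROKER_URL, BROKER_TRANSPORT_OPTIONS) and assuming the credentials live in the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables. The region and visibility timeout below are placeholder values, not recommendations.

```python
import os
from urllib.parse import quote

# Credentials embedded in the broker URL must be URL-quoted, because AWS
# secret keys may contain characters such as "/" or "+".
aws_access_key = quote(os.environ.get('AWS_ACCESS_KEY_ID', ''), safe='')
aws_secret_key = quote(os.environ.get('AWS_SECRET_ACCESS_KEY', ''), safe='')

# The SQS transport takes the credentials as userid:password, with no host.
BROKER_URL = 'sqs://{0}:{1}@'.format(aws_access_key, aws_secret_key)

# Placeholder values: pick the region your queues live in, and a visibility
# timeout longer than your longest-running task.
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'visibility_timeout': 3600,
}
```

The SQS transport also needs the boto library installed alongside Kombu.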
Source: (StackOverflow)
When I run syncdb, I notice a lot of tables created like:
- djcelery_crontabschedule
- ...
- djcelery_taskstate
django-kombu is providing the transport, so it can't be related to the actual queue. Even when I run tasks, I still see nothing populated in these tables. What are these tables used for? Monitoring purposes only -- if I enable it?
If so, when I look up a result with AsyncResult(), is the task result actually read from the django-kombu tables rather than the djcelery ones?
Thanks.
Source: (StackOverflow)
After upgrading from Celery 2.4.5, I have started seeing Celery shut down at random.
I am using Celery 3.0.12, boto 2.6, Amazon SQS and Django 1.2.7, all on a CentOS machine (pip freeze dump at the bottom).
I am running:
service celerybeat start
service celeryd start
A few seconds after I start Celery it shuts down, and when I look into one of the Celery logs I always see this:
[2012-12-31 10:13:40,275: INFO/MainProcess] Task patrol.tasks.test[270f1558-bcc2-441b-8961-e1f21a2dbd27] succeeded in 0.318082094193s: None
[2012-12-31 10:13:40,424: INFO/MainProcess] child process calling self.run()
[2012-12-31 10:13:40,428: INFO/MainProcess] Got task from broker: patrol.tasks.myTask[d9a5ab26-71ca-448b-a4da-40315570f219]
[2012-12-31 10:13:40,666: INFO/MainProcess] Got task from broker: tasks.test[99edb7e2-caff-4892-a95b-c18a9d7f5c51]
[2012-12-31 10:13:41,114: WARNING/MainProcess] Restoring 2 unacknowledged message(s).
[2012-12-31 10:13:41,115: WARNING/MainProcess] UNABLE TO RESTORE 2 MESSAGES: (TypeError('<boto.sqs.message.Message instance at 0x3269758> is not JSON serializable',), TypeError('<boto.sqs.message.Message instance at 0x32697e8> is not JSON serializable',))
[2012-12-31 10:13:41,116: WARNING/MainProcess] EMERGENCY DUMP STATE TO FILE -> /tmp/tmppO4Bbp <-
[2012-12-31 10:13:41,116: WARNING/MainProcess] Cannot pickle state: TypeError('a class that defines __slots__ without defining __getstate__ cannot be pickled',). Fallback to pformat.
I use a low value for MAXTASKPERCHILD to reproduce the shutdown quickly. If I give a higher value, it takes longer before the shutdown occurs.
EDIT
While trying to isolate the problem I removed all periodic tasks. Now I have only one periodic task and one task, which basically do nothing, and I can still reproduce the bug every time.
@task
def myTask():
    print 1
    return

class test(PeriodicTask):
    run_every = datetime.timedelta(seconds=3)

    def run(self, **kwargs):
        myTask.delay()
        print '2'
my /init.d/celeryd
celeryd
my /default/celeryd
# Name of nodes to start, here we have a single node
# or we could have three nodes:
CELERYD_NODES="w1 w2"
CELERYD_LOG_LEVEL="INFO"
# Where to chdir at start.
CELERYD_CHDIR="/var/myproject"
# How to call "manage.py celeryd_multi"
CELERYD_MULTI="python $CELERYD_CHDIR/manage.py celeryd_multi"
# How to call "manage.py celeryctl"
CELERYCTL="python $CELERYD_CHDIR/manage.py celeryctl"
MAXTASKPERCHILD=2 # this is low on purpose to recreate the shutdown fast
CELERY_CONC=5
EXPRESS_CONC=2
# Extra arguments to celeryd
CELERYD_OPTS="-Q:w1 celery,backup -c:w1 $CELERY_CONC -Q:w2 express -c:w2 $EXPRESS_CONC --time-limit=3600 --maxtasksperchild=$MAXTASKPERCHILD -E"
# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"
# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celeryd/%n.log"
CELERYD_PID_FILE="/var/run/celeryd/%n.pid"
# Name of the projects settings module.
export DJANGO_SETTINGS_MODULE="settings"
# Path to celerybeat
CELERYBEAT="python $CELERYD_CHDIR/manage.py celerybeat"
# Extra arguments to celerybeat. This is a file that will get
# created for scheduled tasks. It's generated automatically
# when Celerybeat starts.
CELERYBEAT_OPTS="--schedule=/var/run/celerybeat-schedule"
# Log level. Can be one of DEBUG, INFO, WARNING, ERROR or CRITICAL.
CELERYBEAT_LOG_LEVEL="INFO"
# Log file locations
CELERYBEAT_LOGFILE="/var/log/celeryd/celerybeat.log"
CELERYBEAT_PIDFILE="/var/run/celeryd/celerybeat.pid"
my pip freeze
Django==1.2.7
M2Crypto==0.20.2
MySQL-python==1.2.3c1
amqp==1.0.6
amqplib==1.0.2
anyjson==0.3.3
billiard==2.7.3.19
boto==2.1.1
celery==3.0.12
certifi==0.0.6
distribute==0.6.10
django-celery==3.0.11
django-kombu==0.9.4
django-picklefield==0.3.0
ghettoq==0.4.5
importlib==1.0.2
iniparse==0.3.1
ipython==0.12
kombu==2.5.4
lxml==2.3.4
mixpanel-celery==0.5.0
netaddr==0.7.6
numpy==1.6.2
odict==1.4.4
ordereddict==1.1
pycrypto==2.6
pycurl==7.19.0
pygooglechart==0.3.0
pygpgme==0.1
python-dateutil==1.5
python-memcached==1.48
pytz==2012h
requests==0.9.0
six==1.2.0
urlgrabber==3.9.1
yum-metadata-parser==1.1.2
Source: (StackOverflow)
I have a REST API written in Django, with an endpoint that queues a Celery task when posting to it. The response contains the task id, which I'd like to use to test that the task is created and to get the result. So, I'd like to do something like:
def test_async_job(self):
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEquals(result, ...)
I obviously don't want to have to run a Celery worker to run the unit tests; I expect to mock it somehow. I can't use CELERY_ALWAYS_EAGER because that seems to bypass the broker altogether, preventing me from using AsyncResult to get the task by its id (as stated here).
Going through the Celery and Kombu docs, I found that there is an in-memory transport for unit tests that would do what I'm looking for. I tried overriding the BROKER_URL setting to use it in the tests:
@override_settings(BROKER_URL='memory://')
def test_async_job(self):
But the behavior is the same as with the AMQP broker: it blocks the test waiting for the result. Any idea how I am supposed to configure this broker to get it working in the tests?
Source: (StackOverflow)
Is it possible for a kombu producer to queue a message on rabbitmq to be processed by celery workers? It seems the celery workers do not understand the message put by the kombu producer.
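One likely reason is the message format: Celery workers expect the body to follow Celery's task message protocol rather than an arbitrary payload. Below is a minimal sketch of such a body, assuming the workers accept message protocol version 1 (the Celery 3.x format); build_task_body is a hypothetical helper, not part of Celery or Kombu.

```python
import json
import uuid


def build_task_body(task_name, args=(), kwargs=None):
    """Build a dict shaped like a Celery protocol version 1 task message."""
    return {
        'id': str(uuid.uuid4()),   # unique task id
        'task': task_name,         # name the task is registered under
        'args': list(args),
        'kwargs': dict(kwargs or {}),
        'retries': 0,
    }


body = build_task_body('tasks.add', args=(2, 3))
encoded = json.dumps(body)  # the body has to be serializable
```

Such a body would then be published with a serializer the worker accepts (for example JSON) to a queue the worker consumes from; by default that queue is named celery. The task name must also match a task registered in the worker.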
Source: (StackOverflow)
I run an online game with Django 1.6 and Celery 3.1.11. Kombu is 3.0.15.
Recently I had problems with Celery and DST, so I decided to run the whole site on UTC and save myself the bother of worrying about timezones.
The relevant parts of my settings.py:
TIME_ZONE = 'UTC'
USE_TZ = True
....
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'
Now, whenever I send any delayed task, I see a bunch of RuntimeWarnings that complain about naive datetimes. I went into my settings.py and turned this into an exception, and this is the traceback that resulted:
[2014-04-18 15:03:49,748: INFO/MainProcess] Received task: sometask[sometaskid] eta:[2014-04-19 00:33:32.410032+00:00]
[2014-04-18 15:03:50,130: ERROR/MainProcess] Error in timer: RuntimeWarning(u'DateTimeField TaskState.eta received a naive datetime (2014-04-18 17:50:19.547046) while time zone support is active.',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/async/timer.py", line 171, in apply_entry
entry()
File "/usr/local/lib/python2.7/dist-packages/kombu/async/timer.py", line 64, in __call__
return self.fun(*self.args, **self.kwargs)
File "/usr/local/lib/python2.7/dist-packages/kombu/async/timer.py", line 132, in _reschedules
return fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/events/snapshot.py", line 73, in capture
self.state.freeze_while(self.shutter, clear_after=self.clear_after)
File "/usr/local/lib/python2.7/dist-packages/celery/events/state.py", line 421, in freeze_while
return fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/events/snapshot.py", line 70, in shutter
self.on_shutter(self.state)
File "/usr/local/lib/python2.7/dist-packages/djcelery/snapshot.py", line 145, in on_shutter
_handle_tasks()
File "/usr/local/lib/python2.7/dist-packages/djcelery/snapshot.py", line 139, in _handle_tasks
self.handle_task(task)
File "/usr/local/lib/python2.7/dist-packages/djcelery/snapshot.py", line 105, in handle_task
task_id=uuid, defaults=defaults)
File "/usr/local/lib/python2.7/dist-packages/djcelery/snapshot.py", line 128, in update_task
obj.save()
File "/usr/local/lib/python2.7/dist-packages/djcelery/models.py", line 358, in save
super(TaskState, self).save(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/base.py", line 545, in save
force_update=force_update, update_fields=update_fields)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/base.py", line 573, in save_base
updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/base.py", line 635, in _save_table
forced_update)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/base.py", line 679, in _do_update
return filtered._update(values) > 0
File "/usr/local/lib/python2.7/dist-packages/django/db/models/query.py", line 507, in _update
return query.get_compiler(self.db).execute_sql(None)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/sql/compiler.py", line 975, in execute_sql
cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/sql/compiler.py", line 771, in execute_sql
sql, params = self.as_sql()
File "/usr/local/lib/python2.7/dist-packages/django/db/models/sql/compiler.py", line 940, in as_sql
val = field.get_db_prep_save(val, connection=self.connection)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/fields/__init__.py", line 353, in get_db_prep_save
prepared=False)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/fields/__init__.py", line 914, in get_db_prep_value
value = self.get_prep_value(value)
File "/usr/local/lib/python2.7/dist-packages/django/db/models/fields/__init__.py", line 906, in get_prep_value
RuntimeWarning)
RuntimeWarning: DateTimeField TaskState.eta received a naive datetime (2014-04-18 17:50:19.547046) while time zone support is active.
As you can see, none of the traceback is due to my code, so I do not know how to proceed. I could merely leave the warnings in (if I recall correctly, Celery defaults time offsets to the CELERY_TIMEZONE in settings.py, and this is what I want anyway) but my OCD is screaming out at me to get this fixed. Plus, I can't ctrl-F to find any warnings that are due to my code.
Any advice?
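For context, the warning fires whenever a datetime without tzinfo reaches a DateTimeField while USE_TZ is on. The sketch below only illustrates the naive/aware distinction with the standard library (Python 3 syntax); ensure_aware_utc is a hypothetical helper and it assumes the naive value is already expressed in UTC. In the traceback above the naive value is produced inside djcelery, so this shows the condition rather than a fix for that code path.

```python
from datetime import datetime, timezone


def ensure_aware_utc(value):
    """Return value as a timezone-aware datetime in UTC.

    Assumption: a naive input is already expressed in UTC.
    """
    if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


naive = datetime(2014, 4, 18, 17, 50, 19, 547046)
aware = ensure_aware_utc(naive)
```

Inside a Django project, django.utils.timezone.make_aware does the same job using the project's configured time zone.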
Source: (StackOverflow)
I am following along this tutorial to get celery and django running on heroku.
However, I get this error in my logs when I put in the specified code for the worker:
2011-12-22T05:31:56+00:00 heroku[web.1]: Starting process with command `python canada/manage.py run_gunicorn -b "0.0.0.0:47336" -w 3`
2011-12-22T05:31:56+00:00 app[web.1]: Unexpected error: (<type 'exceptions.NameError'>, NameError("name 'DATABASES' is not defined",), <traceback object at 0x11a9560>)
2011-12-22T05:31:56+00:00 app[web.1]: Traceback (most recent call last):
2011-12-22T05:31:56+00:00 app[web.1]: File "canada/manage.py", line 11, in <module>
2011-12-22T05:31:56+00:00 app[web.1]: import settings
2011-12-22T05:31:56+00:00 app[web.1]: File "/app/canada/settings.py", line 51, in <module>
2011-12-22T05:31:56+00:00 app[web.1]: CELERY_RESULT_DBURI = DATABASES['default']
2011-12-22T05:31:56+00:00 app[web.1]: NameError: name 'DATABASES' is not defined
2011-12-22T05:31:57+00:00 heroku[slugc]: Slug compilation finished
2011-12-22T05:31:57+00:00 heroku[web.1]: State changed from starting to crashed
2011-12-22T05:31:58+00:00 heroku[web.1]: Process exited
My settings.py looks like
import djcelery
djcelery.setup_loader()
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
CELERY_RESULT_DBURI = DATABASES['default']
...
When I synced before adding the line CELERY_RESULT_DBURI = DATABASES['default'], it ran fine. According to the documentation:
When you deploy a Django application, the compile process appends the following code to your settings.py to use the DATABASE_URL environment variable:
Source: (StackOverflow)
I'm testing how Kombu works, as I'm planning to replace pika in several projects. Kombu has a lot of documentation, but when I use what I found there, some messages are lost. Here is the code:
import logging
import time

from kombu import Connection, Producer

logger = logging.getLogger(__name__)
conn = Connection('amqp://localhost:5672')

def errback(exc, interval):
    logger.error('Error: %r', exc, exc_info=1)
    logger.info('Retry in %s seconds.', interval)

producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)

for i in range(1, 200000):
    publish({'hello': 'world'}, routing_key='test_queue')
    time.sleep(0.001)
While it is publishing I close the connection several times. It keeps publishing, but the queue ends up with only around 60000 messages, so a lot of messages are lost.
I've tried different alternatives, e.g.:
publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')
Thanks!
Source: (StackOverflow)
I can't seem to find any documentation on using AMQP transactions through the Kombu API.
This page talks about appending a message to the transactional state but it does not seem related.
I know the pika backend supports them and I am quite sure the amqplib backend (which I'm currently using) does too, but I don't yet see how this is exposed in Kombu.
Edit: to clarify, I'm looking for channel.commit(), channel.select(), ... type methods
Source: (StackOverflow)
I'm in the process of making a consumer for RabbitMQ. I'm using Python and after some research I decided to use Kombu. With Kombu I have already connected to a queue in RabbitMQ and read the messages. The code is:
from kombu import Connection, Queue

queue = Queue('someQueue')

def process(body, message):
    # Something
    message.ack()

# connections
with Connection(hostname="localhost", userid="****", password="****", port=****, virtual_host="/") as conn:
    # consume
    with conn.Consumer(queue, callbacks=[process]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
It seems to work, but I often see that Celery and Kombu are used together. I only need to consume the messages from the queue. Is Kombu enough, or should I integrate Celery as well? If so, does anyone have a good example? I found examples but they aren't clear to me.
Also, I want to make my queue durable=False, but the consumer seems to have durable=True by default. How can I change this?
Thanks for any help!
Source: (StackOverflow)
The documentation suggests extra AMQP properties are passed as keyword arguments to publish, as per the AMQP spec, but correlationId="foo" does not seem to have the desired effect.
Source: (StackOverflow)
I have a strange problem with my Kombu-created queues. After some time they just stop getting messages. There is no error, but the messages aren't received any more. When I unbind the queue (from the RabbitMQ web interface) and rebind it to the same exchange and routing key, everything works again for some time, and then it stops again.
What am I missing here? I'd be glad for help with this.
I thought maybe I should add a heartbeat? Something else?
This is the class the listener uses to create the queue and listen to it:
class UpdaterMixin(object):
    # binding management commands to event names
    # override in subclass
    event_handlers = {}
    app_name = ''  # override in subclass

    def __init__(self):
        if not self.app_name or len(self.event_handlers) == 0:
            print('app_name or event_handlers arent implemented')
            raise NotImplementedError()
        else:
            self.connection_url = settings.BROKER_URL
            self.exchange_name = settings.BUS_SETTINGS['exchange_name']
            self.exchange_type = settings.BUS_SETTINGS['exchange_type']
            self.routing_key = settings.ROUTING_KEYS[self.app_name]

    def start_listener(self):
        logger.info('started %s updater listener' % self.app_name)
        with Connection(self.connection_url) as connection:
            exchange = Exchange(self.exchange_name, self.exchange_type, durable=True)
            queue = Queue('%s_updater' % self.app_name, exchange=exchange, routing_key=self.routing_key)
            with connection.Consumer(queue, callbacks=[self.process_message]) as consumer:
                while True:
                    logger.info('Consuming events')
                    connection.drain_events()

    def process_message(self, body, message):
        logger.info('data received: %s' % body)
        handler = self.event_handlers[body['event']]
        logger.info('Executing management command: %s' % str(handler))
        handler(body)
        message.ack()
Rabbitmq version: 3.2
Source: (StackOverflow)
The documentation for the Django ORM transport for Celery says that using more than a few workers can cause some tasks to be executed more than once.
http://docs.celeryproject.org/en/latest/getting-started/brokers/django.html
Can anyone quantify what "more than a few" means? Concretely speaking, I have two servers and would like to run two celery workers. I am wondering if anyone can tell me if having two workers exposes me to the risk that some tasks will execute more than once.
Source: (StackOverflow)
I'm working in Django and have installed django-celery. The celery daemon is running on my local server and accepting/executing tasks.
The last piece for me is to create a task that sends a message to an AMQP broker on another server. The broker configuration is in my settings.py file, but I'm not clear on how to make the connection to the AMQP server and construct the message (with a header and a JSON-encoded payload).
And now I've come to wonder whether I even need to be running celery just to send a message to an external AMQP broker.
UPDATE:
I'm using Kombu to publish to the AMQP broker, and it appears I can successfully establish the Publisher connection using the correct exchange, routing_key and exchange_type. My message must consist of a header with three key:value pairs and a json-encoded payload. I'm unclear how to construct the message.
Source: (StackOverflow)