Background Processing in Odoo
Odoo does not have a built-in task queue like Celery. Instead, background processing relies on scheduled actions (cron jobs) and creative patterns around them. This guide covers battle-tested approaches for reliable background task execution.
Pattern 1: Simple Cron Job
The simplest pattern — a method that runs on a schedule:
```python
from odoo import models


class DataSync(models.Model):
    _name = 'data.sync'
    _description = 'Data Synchronization'
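
    def _fetch_external_products(self):
        # Hypothetical helper: the cron below assumes an external catalog
        # API returning [{'sku': ..., 'name': ..., 'price': ...}, ...].
        # A minimal sketch assuming a JSON endpoint; adapt URL and auth.
        import requests  # bundled with standard Odoo installs
        response = requests.get('https://example.com/api/products', timeout=30)
        response.raise_for_status()
        return response.json()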
    def _cron_sync_products(self):
        """Sync products from external catalog — runs every hour."""
        products = self._fetch_external_products()
        for product_data in products:
            existing = self.env['product.template'].search([
                ('default_code', '=', product_data['sku']),
            ], limit=1)
            if existing:
                existing.write({'list_price': product_data['price']})
            else:
                self.env['product.template'].create({
                    'name': product_data['name'],
                    'default_code': product_data['sku'],
                    'list_price': product_data['price'],
                })
```

XML definition:
```xml
<record id="cron_sync_products" model="ir.cron">
    <field name="name">Product Sync</field>
    <field name="model_id" ref="model_data_sync"/>
    <field name="state">code</field>
    <field name="code">model._cron_sync_products()</field>
    <field name="interval_number">1</field>
    <field name="interval_type">hours</field>
    <!-- -1 = repeat indefinitely; note this field was removed in Odoo 17+ -->
    <field name="numbercall">-1</field>
</record>
```

Pattern 2: Work Queue
For tasks that need reliable processing with retry and tracking:
```python
import json

from odoo import fields, models


class BackgroundJob(models.Model):
    _name = 'background.job'
    _description = 'Background Job Queue'
    _order = 'priority desc, create_date asc'

    name = fields.Char(required=True)
    job_type = fields.Selection([
        ('email_send', 'Send Email'),
        ('report_generate', 'Generate Report'),
        ('data_export', 'Export Data'),
    ], required=True)
    state = fields.Selection([
        ('pending', 'Pending'),
        ('running', 'Running'),
        ('done', 'Done'),
        ('error', 'Error'),
    ], default='pending', index=True)
    priority = fields.Integer(default=10)
    payload = fields.Text()  # JSON data
    result = fields.Text()
    error_message = fields.Text()
    retry_count = fields.Integer(default=0)
    max_retries = fields.Integer(default=3)
    started_at = fields.Datetime()
    completed_at = fields.Datetime()
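
    # The dispatcher below resolves handlers by name (_handle_<job_type>).
    # A minimal sketch of one handler; the payload keys are illustrative,
    # and message_post assumes the mail module is installed.
    def _handle_email_send(self, payload):
        partner = self.env['res.partner'].browse(payload['partner_id'])
        partner.message_post(body='Your background job has completed.')
        return {'partner_id': partner.id}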
    def _cron_process_jobs(self):
        """Process pending background jobs."""
        # Advisory lock to prevent duplicate processing across workers
        self.env.cr.execute("SELECT pg_try_advisory_lock(999001)")
        if not self.env.cr.fetchone()[0]:
            return
        try:
            # Domains cannot compare two fields, so the retry ceiling is
            # hardcoded here; keep it in sync with the max_retries default.
            jobs = self.search([
                ('state', '=', 'pending'),
                ('retry_count', '<', 3),
            ], limit=20, order='priority desc, create_date asc')
            for job in jobs:
                job._process_single_job()
        finally:
            self.env.cr.execute("SELECT pg_advisory_unlock(999001)")
    def _process_single_job(self):
        self.write({
            'state': 'running',
            'started_at': fields.Datetime.now(),
        })
        self.env.cr.commit()
        try:
            handler = getattr(self, f'_handle_{self.job_type}', None)
            if not handler:
                raise ValueError(f'Unknown job type: {self.job_type}')
            result = handler(json.loads(self.payload or '{}'))
            self.write({
                'state': 'done',
                'result': json.dumps(result) if result else '',
                'completed_at': fields.Datetime.now(),
            })
            self.env.cr.commit()
        except Exception as e:
            self.env.cr.rollback()
            self.write({
                'state': 'error' if self.retry_count >= self.max_retries - 1 else 'pending',
                'error_message': str(e),
                'retry_count': self.retry_count + 1,
            })
            self.env.cr.commit()
```
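The queue processor needs its own scheduled action, registered the same way as in Pattern 1. A minimal sketch; the XML ID and five-minute interval are assumptions to adapt:
```xml
<record id="cron_process_jobs" model="ir.cron">
    <field name="name">Process Background Jobs</field>
    <field name="model_id" ref="model_background_job"/>
    <field name="state">code</field>
    <field name="code">model._cron_process_jobs()</field>
    <field name="interval_number">5</field>
    <field name="interval_type">minutes</field>
</record>
```

Pattern 3: Batch Processing with Progress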
Process large datasets in chunks with progress tracking:
```python
import logging

from odoo import api, fields, models

_logger = logging.getLogger(__name__)


class BatchProcessor(models.Model):
    _name = 'batch.processor'
    _description = 'Batch Processing'

    name = fields.Char()
    total_records = fields.Integer()
    processed_records = fields.Integer(default=0)
    state = fields.Selection([
        ('draft', 'Draft'),
        ('running', 'Running'),
        ('done', 'Done'),
        ('error', 'Error'),
    ], default='draft')
    progress = fields.Float(compute='_compute_progress')

    @api.depends('total_records', 'processed_records')
    def _compute_progress(self):
        for rec in self:
            if rec.total_records:
                rec.progress = (rec.processed_records / rec.total_records) * 100
            else:
                rec.progress = 0
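
    # Hypothetical helpers referenced below; sketches only, since the
    # domain and per-record work depend entirely on your use case.
    def _get_processing_domain(self):
        # Example: all confirmed orders. Processing below does not change
        # domain membership, which keeps offset-based paging correct.
        return [('state', '=', 'sale')]

    def _process_record(self, record):
        # Example: log a note on each order (message_post assumes mail)
        record.message_post(body='Processed by batch job')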
    def action_start(self):
        self.ensure_one()
        domain = self._get_processing_domain()
        total = self.env['sale.order'].search_count(domain)
        self.write({
            'state': 'running',
            'total_records': total,
            'processed_records': 0,
        })
    def _cron_process_batch(self):
        batches = self.search([('state', '=', 'running')])
        for batch in batches:
            batch._process_next_chunk()

    def _process_next_chunk(self, chunk_size=100):
        domain = self._get_processing_domain()
        # Offset paging assumes processing does not remove records from
        # the domain; if it does, drop the offset and always take the
        # first chunk_size matches instead.
        records = self.env['sale.order'].search(
            domain, limit=chunk_size,
            offset=self.processed_records,
        )
        if not records:
            self.write({'state': 'done'})
            return
        for record in records:
            try:
                self._process_record(record)
            except Exception:
                _logger.exception('Failed to process %s', record.name)
        self.write({
            'processed_records': self.processed_records + len(records),
        })
        self.env.cr.commit()
```

Pattern 4: Deferred Action
Trigger a background task from a user action:
```python
import json

from odoo import models


class SaleOrder(models.Model):
    _inherit = 'sale.order'

    def action_generate_report_async(self):
        """Queue report generation instead of blocking the UI."""
        self.ensure_one()
        self.env['background.job'].create({
            'name': f'Generate report for {self.name}',
            'job_type': 'report_generate',
            'payload': json.dumps({'order_id': self.id}),
        })
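        # Optional: nudge the queue cron so the job runs on the next cron
        # wakeup instead of waiting a full interval. ir.cron._trigger()
        # exists in Odoo 15+; the XML ID is a placeholder for your module.
        self.env.ref('my_module.cron_process_jobs').sudo()._trigger()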
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': 'Report Queued',
                'message': 'Your report will be generated in the background.',
                'type': 'info',
            },
        }
```

Error Recovery Strategies
| Strategy | When To Use |
|---|---|
| Retry with backoff | Transient errors (API timeouts, network); see the sketch after this table |
| Dead letter queue | After max retries exceeded |
| Commit after each item | Long-running batches (prevents full rollback) |
| Skip and log | Non-critical processing where partial success is OK |
| Circuit breaker | External service integration (stop calling if consistently failing) |
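As written, Pattern 2 retries a failed job on the very next cron pass. A minimal backoff sketch, assuming a new scheduled_at field is added to background.job (the field name and delay curve are illustrative, not part of the patterns above):
```python
from datetime import timedelta

from odoo import fields, models


class BackgroundJob(models.Model):
    _inherit = 'background.job'

    # Assumed new field: earliest time the next attempt may run
    scheduled_at = fields.Datetime()

    def _schedule_retry(self):
        """Re-queue a failed job with exponential backoff."""
        delay = min(2 ** self.retry_count, 60)  # 1, 2, 4, ... capped at 60 min
        self.write({
            'state': 'pending',
            'scheduled_at': fields.Datetime.now() + timedelta(minutes=delay),
        })
```
The job picker in _cron_process_jobs then needs an extra domain leaf, such as '|', ('scheduled_at', '=', False), ('scheduled_at', '<=', fields.Datetime.now()), so fresh jobs still run immediately.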
Commit Patterns
```python
# Process each record in its own transaction
for record in records:
    try:
        record._process()
        self.env.cr.commit()  # success: commit this record
    except Exception:
        self.env.cr.rollback()  # failure: roll back this record only
        self.env.invalidate_all()  # rollback leaves the cache stale
        _logger.exception('Failed to process %s', record.id)
    # Continue to the next record
```

Warning: After cr.rollback(), the environment cache is invalid. Call self.env.invalidate_all() (as above) and re-browse records if needed.
Monitoring Background Jobs
```python
import logging
from datetime import timedelta

from odoo import fields

_logger = logging.getLogger(__name__)


# Lives on background.job, scheduled via its own ir.cron record
def _cron_monitor_jobs(self):
    """Alert on stuck or failed jobs."""
    stuck = self.env['background.job'].search([
        ('state', '=', 'running'),
        ('started_at', '<', fields.Datetime.now() - timedelta(hours=1)),
    ])
    if stuck:
        stuck.write({
            'state': 'error',
            'error_message': 'Job stuck for over 1 hour',
        })
    failed = self.env['background.job'].search_count([
        ('state', '=', 'error'),
        ('create_date', '>=', fields.Date.today()),
    ])
    if failed > 10:
        _logger.warning('High job failure rate: %d failed jobs today', failed)
```

Best Practices
- Use advisory locks to prevent duplicate cron execution across workers
- Commit after each record in long-running batches
- Implement retry with max attempts for transient failures
- Track progress for user-visible batch operations
- Log all job starts, completions, and failures
- Set reasonable cron intervals — too frequent wastes resources, too infrequent delays processing
- Monitor for stuck jobs and high failure rates
- Use LIMIT in queries to prevent processing too many records at once