Skip to content

Odoo Background Task Patterns: Cron Jobs, Queues, and Async Processing

DeployMonkey Team · March 23, 2026 12 min read

Background Processing in Odoo

Odoo does not have a built-in task queue like Celery. Instead, background processing relies on scheduled actions (cron jobs) and creative patterns around them. This guide covers battle-tested approaches for reliable background task execution.

Pattern 1: Simple Cron Job

The simplest pattern — a method that runs on a schedule:

class DataSync(models.Model):
    _name = 'data.sync'
    _description = 'Data Synchronization'

    def _cron_sync_products(self):
        """Sync products from external catalog — runs every hour."""
        products = self._fetch_external_products()
        for product_data in products:
            existing = self.env['product.template'].search([
                ('default_code', '=', product_data['sku'])
            ], limit=1)
            if existing:
                existing.write({'list_price': product_data['price']})
            else:
                self.env['product.template'].create({
                    'name': product_data['name'],
                    'default_code': product_data['sku'],
                    'list_price': product_data['price'],
                })

XML definition:

<record id="cron_sync_products" model="ir.cron">
    <field name="name">Product Sync</field>
    <field name="model_id" ref="model_data_sync"/>
    <field name="state">code</field>
    <field name="code">model._cron_sync_products()</field>
    <field name="interval_number">1</field>
    <field name="interval_type">hours</field>
    <field name="numbercall">-1</field>
</record>

Pattern 2: Work Queue

For tasks that need reliable processing with retry and tracking:

class BackgroundJob(models.Model):
    _name = 'background.job'
    _description = 'Background Job Queue'
    _order = 'priority desc, create_date asc'

    name = fields.Char(required=True)
    job_type = fields.Selection([
        ('email_send', 'Send Email'),
        ('report_generate', 'Generate Report'),
        ('data_export', 'Export Data'),
    ], required=True)
    state = fields.Selection([
        ('pending', 'Pending'),
        ('running', 'Running'),
        ('done', 'Done'),
        ('error', 'Error'),
    ], default='pending', index=True)
    priority = fields.Integer(default=10)
    payload = fields.Text()  # JSON data
    result = fields.Text()
    error_message = fields.Text()
    retry_count = fields.Integer(default=0)
    max_retries = fields.Integer(default=3)
    started_at = fields.Datetime()
    completed_at = fields.Datetime()

    def _cron_process_jobs(self):
        """Process pending background jobs."""
        # Advisory lock to prevent duplicate processing
        self.env.cr.execute("SELECT pg_try_advisory_lock(999001)")
        if not self.env.cr.fetchone()[0]:
            return

        try:
            jobs = self.search([
                ('state', '=', 'pending'),
                ('retry_count', '<', 3),
            ], limit=20, order='priority desc, create_date asc')

            for job in jobs:
                job._process_single_job()
        finally:
            self.env.cr.execute("SELECT pg_advisory_unlock(999001)")

    def _process_single_job(self):
        self.write({
            'state': 'running',
            'started_at': fields.Datetime.now(),
        })
        self.env.cr.commit()

        try:
            handler = getattr(self, f'_handle_{self.job_type}', None)
            if not handler:
                raise ValueError(f'Unknown job type: {self.job_type}')

            result = handler(json.loads(self.payload or '{}'))
            self.write({
                'state': 'done',
                'result': json.dumps(result) if result else '',
                'completed_at': fields.Datetime.now(),
            })
            self.env.cr.commit()
        except Exception as e:
            self.env.cr.rollback()
            self.write({
                'state': 'error' if self.retry_count >= self.max_retries - 1 else 'pending',
                'error_message': str(e),
                'retry_count': self.retry_count + 1,
            })
            self.env.cr.commit()

Pattern 3: Batch Processing with Progress

Process large datasets in chunks with progress tracking:

class BatchProcessor(models.Model):
    _name = 'batch.processor'
    _description = 'Batch Processing'

    name = fields.Char()
    total_records = fields.Integer()
    processed_records = fields.Integer(default=0)
    state = fields.Selection([
        ('draft', 'Draft'),
        ('running', 'Running'),
        ('done', 'Done'),
        ('error', 'Error'),
    ], default='draft')
    progress = fields.Float(compute='_compute_progress')

    @api.depends('total_records', 'processed_records')
    def _compute_progress(self):
        for rec in self:
            if rec.total_records:
                rec.progress = (rec.processed_records / rec.total_records) * 100
            else:
                rec.progress = 0

    def action_start(self):
        domain = self._get_processing_domain()
        total = self.env['sale.order'].search_count(domain)
        self.write({
            'state': 'running',
            'total_records': total,
            'processed_records': 0,
        })

    def _cron_process_batch(self):
        batches = self.search([('state', '=', 'running')])
        for batch in batches:
            batch._process_next_chunk()

    def _process_next_chunk(self, chunk_size=100):
        domain = self._get_processing_domain()
        records = self.env['sale.order'].search(
            domain, limit=chunk_size,
            offset=self.processed_records
        )

        if not records:
            self.write({'state': 'done'})
            return

        for record in records:
            try:
                self._process_record(record)
            except Exception:
                _logger.exception('Failed to process %s', record.name)

        self.write({
            'processed_records': self.processed_records + len(records)
        })
        self.env.cr.commit()

Pattern 4: Deferred Action

Trigger a background task from a user action:

class SaleOrder(models.Model):
    _inherit = 'sale.order'

    def action_generate_report_async(self):
        """Queue report generation instead of blocking the UI."""
        self.env['background.job'].create({
            'name': f'Generate report for {self.name}',
            'job_type': 'report_generate',
            'payload': json.dumps({'order_id': self.id}),
        })
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': 'Report Queued',
                'message': 'Your report will be generated in the background.',
                'type': 'info',
            }
        }

Error Recovery Strategies

StrategyWhen To Use
Retry with backoffTransient errors (API timeouts, network)
Dead letter queueAfter max retries exceeded
Commit after each itemLong-running batches (prevents full rollback)
Skip and logNon-critical processing where partial success is OK
Circuit breakerExternal service integration (stop calling if consistently failing)

Commit Patterns

# Process each record in its own transaction
for record in records:
    try:
        record._process()
        self.env.cr.commit()  # success: commit this record
    except Exception:
        self.env.cr.rollback()  # failure: rollback this record only
        _logger.exception('Failed to process %s', record.id)
        # Continue to next record

Warning: After cr.rollback(), the environment cache is invalid. Call self.env.invalidate_all() and re-browse records if needed.

Monitoring Background Jobs

def _cron_monitor_jobs(self):
    """Alert on stuck or failed jobs."""
    stuck = self.env['background.job'].search([
        ('state', '=', 'running'),
        ('started_at', '<', fields.Datetime.now() - timedelta(hours=1)),
    ])
    if stuck:
        stuck.write({'state': 'error', 'error_message': 'Job stuck for over 1 hour'})

    failed = self.env['background.job'].search_count([
        ('state', '=', 'error'),
        ('create_date', '>=', fields.Date.today()),
    ])
    if failed > 10:
        _logger.warning('High job failure rate: %d failed jobs today', failed)

Best Practices

  • Use advisory locks to prevent duplicate cron execution across workers
  • Commit after each record in long-running batches
  • Implement retry with max attempts for transient failures
  • Track progress for user-visible batch operations
  • Log all job starts, completions, and failures
  • Set reasonable cron intervals — too frequent wastes resources, too sparse delays processing
  • Monitor for stuck jobs and high failure rates
  • Use LIMIT in queries to prevent processing too many records at once