Skip to content

Feat/celery integration#527

Draft
parinporecha wants to merge 5 commits intoPostHog:mainfrom
parinporecha:feat/celery_integration
Draft

Feat/celery integration#527
parinporecha wants to merge 5 commits intoPostHog:mainfrom
parinporecha:feat/celery_integration

Conversation

@parinporecha
Copy link
Copy Markdown
Contributor

Summary

This PR:

  • Adds a new PosthogCeleryIntegration to automatically capture Celery task lifecycle events and exceptions.
  • Propagates PostHog context (distinct_id, session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.
  • Makes Client safer across process forks by reinitializing fork-unsafe client state in child processes.
from posthog.integrations.celery import PosthogCeleryIntegration
integration = PosthogCeleryIntegration(
    capture_exceptions=True,
    capture_task_lifecycle_events=True,
    propagate_context=True
)
integration.instrument()

Context

I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.

That leaves a few gaps:

  • background task execution is hard to observe without manual instrumentation.
  • worker-side events are difficult to correlate back to the originating user or request.

This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.

The integration takes inspiration from OpenTelemetry's Celery instrumentor and PostHog context propagation is achieved through task headers mirroring Sentry and DataDog's Celery integrations.

While testing this, I found a separate SDK issue: when a Client configured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered. This would also be a problem when using the SDK in some Django deployments.

So this PR also adds fork handling to Client by reinitializing its queue, consumers, and other state in the child process via os.register_at_fork.

Changes

New: Celery Integration (posthog/integrations/celery.py)

  • Lifecycle Events: Hooks into Celery signals (task_prerun, task_success, task_failure, etc.) to capture events like celery task started, celery task success etc. Check the docstring in the integration module code for complete list of supported events.
    • Lifecycle events include Celery-specific properties such as task ID, task name, queue, retry count, duration, Celery version etc. Check the docstring for complete set of event properties.
  • Context Propagation:
    • _on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers.
    • _on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.
    • Any custom events captured inside a task inherit the same propagated PostHog context and Celery task tags.
  • Exception Capture: Automatically captures exceptions from failed tasks.

Refactored: Client Fork Safety posthog/client.py

  • Added _reinit_after_fork method to reset the internal queue and spin up new consumers in a child process.
  • Uses os.register_at_fork (on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.

Examples examples/celery_integration.py

  • Added a complete example showing how to configure the integration on both the producer and worker sides and all features in practice.

Tests

  • Added posthog/test/integrations/test_celery_integration.py covering:
    • Signal handlers for all task states.
    • Context propagation (header injection and extraction).
    • Task filtering logic.
    • Exception capture.
  • Added posthog/test/test_client_fork.py covering:
    • Unit tests for client's fork safety
    • End to end tests forking Client and verifying behaviour
  • Manually tested the example against Celery 5.2.1 (2021), 5.3.1, 5.4.0, and 5.6.2 (2026).

Screenshots (created through example script)

  • Celery task lifecycle events and captured Exception -
    image

  • Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -
    image

  • Captured exception -
    image

parinporecha and others added 5 commits April 21, 2026 15:21
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
- make fork safety complete in the client
- add shutdown mechanism to the integration
- better test coverage
- better docs on usage
@parinporecha parinporecha force-pushed the feat/celery_integration branch from 9f4fb89 to dd71537 Compare April 21, 2026 13:21
@marandaneto
Copy link
Copy Markdown
Member

follow up from #464

@marandaneto
Copy link
Copy Markdown
Member

@parinporecha

Run ruff format --check .
Would reformat: examples/celery_integration.py
Would reformat: posthog/client.py
Would reformat: posthog/integrations/celery.py
Would reformat: posthog/test/integrations/test_celery_integration.py
Would reformat: posthog/test/test_client_fork.py
5 files would be reformatted, 152 files already formatted

can you also add a changeset entry, please
https://github.com/PostHog/posthog-python/blob/main/RELEASING.md

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants