Feat/celery integration#527
Draft
parinporecha wants to merge 5 commits intoPostHog:mainfrom
Draft
Conversation
Co-authored-by: Dustin Byrne <dustinsbyrne@gmail.com>
- make fork safety complete in the client - add shutdown mechanism to the integration - better test coverage - better docs on usage
9f4fb89 to
dd71537
Compare
Member
|
follow up from #464 |
Member
can you also add a changeset entry, please |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR:
PosthogCeleryIntegrationto automatically capture Celery task lifecycle events and exceptions.distinct_id,session_id, tags) from the task producer to the worker so Celery tasks can be correlated with the originating user/session.Clientsafer across process forks by reinitializing fork-unsafe client state in child processes.Context
I saw users asking for advice on how to use PostHog with Celery for error tracking in community questions and realized that there's currently no first-class way to instrument Celery workloads with PostHog.
That leaves a few gaps:
This PR addresses those gaps by adding a Celery integration that helps users observe task execution end-to-end out of the box.
The integration takes inspiration from OpenTelemetry's Celery instrumentor and PostHog context propagation is achieved through task headers mirroring Sentry and DataDog's Celery integrations.
While testing this, I found a separate SDK issue: when a
Clientconfigured in async mode is inherited across a process fork, the child process inherits a client whose consumer threads no longer exist. In practice, that means worker-side events don't get delivered. This would also be a problem when using the SDK in some Django deployments.So this PR also adds fork handling to
Clientby reinitializing its queue, consumers, and other state in the child process viaos.register_at_fork.Changes
New: Celery Integration (
posthog/integrations/celery.py)task_prerun,task_success,task_failure, etc.) to capture events likecelery task started,celery task successetc. Check the docstring in the integration module code for complete list of supported events._on_before_task_publish: Injects current PostHog context (distinct_id, session_id, tags) into task headers._on_task_prerun: Extracts headers in the worker and restores the PostHog context for the duration of the task. This context is exited upon task completion.Refactored: Client Fork Safety
posthog/client.py_reinit_after_forkmethod to reset the internal queue and spin up new consumers in a child process.os.register_at_fork(on supported platforms) to automatically call this method, so that the SDK does not drop captured events when used in child processes.Examples
examples/celery_integration.pyTests
posthog/test/integrations/test_celery_integration.pycovering:posthog/test/test_client_fork.pycovering:Screenshots (created through example script)
Celery task lifecycle events and captured Exception -

Celery task success event emitted from worker carrying correct distinct ID, session ID set in parent and context tags -

Captured exception -
