A Framework Agnostic Client for Google Cloud Tasks.
Move from "Pull" (Workers) to "Push" (Serverless) architecture effortlessly.
py-cloud-task is a lightweight, async-first library that abstracts the complexity of Google Cloud Tasks. It provides a developer experience similar to Celery or TaskIQ but is designed specifically for Serverless environments (Cloud Run, App Engine, Cloud Functions, FastAPI).
It handles serialization, authentication (OIDC), scheduling, and—crucially—FastAPI Dependency Injection automatically.
| Feature | Celery / Redis (Pull) | py-cloud-task (Push) |
|---|---|---|
| Architecture | Workers poll Redis 24/7 ("Are there tasks?") | Google calls your API via HTTP ("Here is a task") |
| Cost | You pay for idle workers & Redis instances | Pay-per-use (Scale to Zero supported) |
| Infra | Requires Redis/RabbitMQ management | Zero Ops (Managed by Google) |
| Retries | Managed by worker code | Native (Exponential backoff managed by GCP) |
| DX | Heavy setup | Decorator-based (Just like FastAPI) |
Currently, the package is available via GitHub. You can install it using uv or pip.
# Instalação Core
uv add "py-cloud-task @ git+https://github.com/uhmiller/py-cloud-task.git"
# Com suporte a FastAPI
uv add "py-cloud-task[fastapi] @ git+https://github.com/uhmiller/py-cloud-task.git"
# Para simulação local (testes)
uv add "py-cloud-task[test] @ git+https://github.com/uhmiller/py-cloud-task.git"The CloudTaskClient is the entry point. It holds the configuration for your Google Cloud project and queue.
from cloudtask import CloudTaskClient
client = CloudTaskClient(
project="my-gcp-project",
location="europe-west1",
queue="default",
url="[https://api.myapp.com/tasks/run](https://api.myapp.com/tasks/run)", # The public URL of your worker
sae="my-service-account@my-gcp-project.iam.gserviceaccount.com", # Service Account email for OIDC auth
secret="super-secret-token", # Optional: Header secret for extra security
force_to_queue=None, # Optional: Force all tasks to a specific queue (useful for Staging)
eager=None, # None = Production (Sends to Google Cloud)
)Use the @client.task decorator. You can define tasks anywhere in your code.
@client.task(queue='high-priority', name='unique-task-name')
async def send_welcome_email(user_id: str, email: str):
print(f"Sending email to {email}...")
# ... logic to send email ...
return "sent"You can trigger tasks asynchronously. This will serialize the arguments and send them to Google Cloud Tasks.
# Simple trigger
await send_welcome_email(user_id="123", email="user@example.com").push()Schedule a task to run in the future using the at parameter.
from datetime import datetime, timedelta
# Run 1 hour from now
eta = datetime.now() + timedelta(hours=1)
await send_welcome_email("123", "user@example.com").push(at=eta)Google Cloud Tasks ensures that tasks with the same name are executed only once. You can set a custom name to prevent duplicate execution.
# Instantiate the task wrapper first
task = send_welcome_email("123", "user@example.com")
# Set a deterministic name (e.g., specific to the user and action)
task.name = "welcome-email-user-123"
# Push to cloud
await task.push()When developing locally, you often don't want to send tasks to Google Cloud. The eager parameter supports three modes
to help you develop and test safely.
Runs the function directly in the current process. Fastest option for Unit Tests.
client = CloudTaskClient(..., eager="immediate")
await send_welcome_email("123", "user@example.com").push()
# Result: Function runs instantly. No HTTP. No Serialization.Simulates a full HTTP request to your local worker using httpx. This is perfect for Integration Tests because it
validates serialization, headers, and dependency injection without needing Google infrastructure.
client = CloudTaskClient(..., eager="remote")
await send_welcome_email("123", "user@example.com").push()
# Result: Sends POST http://localhost:8000/tasks/run.The default behavior. Serializes the task and sends it to Google Cloud Tasks.
py-cloud-task has first-class support for FastAPI. It leverages FastAPI's native Dependency Injection system.
from fastapi import FastAPI
from cloudtask.fastapi import CloudTaskRouter
from app.core.tasks import client
app = FastAPI()
# Register the route that receives tasks from Google
app.include_router(CloudTaskRouter(client), prefix="/tasks")You can inject database sessions, services, or any other dependency directly into your tasks, just like in API endpoints.
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.db import get_db
@client.task()
async def process_order(
order_id: int,
db: AsyncSession = Depends(get_db) # <--- Magic happens here
):
# The 'db' session is created, injected, and closed automatically!
order = await db.get(Order, order_id)
order.status = "processed"
await db.commit()Note: When triggering the task, you only pass the data arguments. The dependencies are resolved automatically by the worker.
# Correct usage (Dependency is ignored during push)
await process_order(order_id=500).push()To ensure that only Google Cloud Tasks can call your worker endpoint, the library supports two mechanisms:
- OIDC Token (Recommended): The library automatically attaches an OIDC token identifying the Service Account. Your Cloud Run/Functions service should validate this token (Google handles this automatically for Cloud Run if you don't allow unauthenticated invocations).
- Secret Header: You can configure a shared secret.
client = CloudTaskClient(..., secret="my-secret-key")The router will automatically validate the X-PYCT-SECRET header and reject unauthorized requests (403 Forbidden).
Contributions are welcome! If you find a bug or want to add a feature (e.g., Flask or Django adapters), please open an issue or submit a PR.
Built with ❤️ by the engineering team at Ziett