Automated job market intelligence pipeline for the Ethiopian tech sector. Scrapes job listings from multiple sources, preprocesses them through a 10-step NLP pipeline, stores structured data and vector embeddings in Supabase, and trains ML models monthly via Kaggle — all orchestrated by Apache Airflow.
Every 12 hours (06:00 & 18:00 UTC):
```
[fetch_clean_afriwork] ──┐
[fetch_clean_hahu]     ──┼──▶ [merge_and_load] ──▶ [generate_embeddings] ──▶ [log_run_status]
[fetch_clean_source_c] ──┘           │                        │
      preprocess             DO Spaces backup +       updates jobs.embedding
                             Supabase upsert          (sentence-transformers, non-fatal)
```
1st of each month (02:00 UTC):
```
[read_training_data] ──▶ [push_to_kaggle] ──▶ [trigger_kaggle_run] ──▶ [poll_kaggle]
                                                                            │
                           ┌────────────────────────────┬──────────────────┘
                           ▼                            ▼
              [fetch_artifact_prophet]       [fetch_artifact_lstm]
                           │                            │
              [upload_to_staging_prophet]    [upload_to_staging_lstm]
                           │                            │
              [log_training_run_prophet]     [log_training_run_lstm]
                           └─────────────┬──────────────┘
                                         ▼
                                  [notify_admin]
```
Both DAGs write to Supabase (PostgreSQL + pgvector):
```
┌─────────────────────────────────────────────┐
│           Supabase (PostgreSQL)             │
│                                             │
│  jobs             – structured data +       │
│                     vector(384) embeddings  │
│  raw_job_postings – raw scraped data        │
│  ml_training_runs – model training log      │
│  match_jobs()     – semantic search fn      │
└─────────────────────────────────────────────┘
```
```
CareerCompass/
├── dags/
│   ├── job_data_pipeline.py           # ETL DAG — runs every 12h
│   └── monthly_training_pipeline.py   # ML training DAG — runs 1st of month
├── Job_pipeline/
│   ├── scrape_jobs.py                 # GraphQL scrapers (Afriwork, Hahu)
│   ├── run_preprocessing_pipeline.py  # Batch preprocessing runner
│   ├── load_to_postgres.py            # CSV → PostgreSQL loader
│   ├── run_pipeline.py                # End-to-end CLI runner
│   ├── preprocessing/                 # NLP pipeline modules (steps 0–10)
│   ├── taxonomy/
│   │   ├── roles.json                 # 21 canonical job roles
│   │   └── skills.json                # ~130+ tech skills
│   ├── db/
│   │   └── schema.sql                 # Supabase schema (run once)
│   └── tests/                         # Unit tests for each pipeline step
├── data/
│   ├── raw/                           # Scraped CSVs (mounted into container)
│   └── processed/                     # Preprocessed CSVs + merged Parquet
├── Dockerfile                         # apache/airflow:2.10.4-python3.11
├── docker-compose.yml                 # LocalExecutor setup
├── requirements.txt
├── .env.example                       # Template — copy to .env
└── .gitignore
```
Before you run anything, you need:
- Docker Desktop (includes Docker Compose)
- A Supabase project with the `pgvector` extension enabled
  - Enable it in the Supabase dashboard: Database → Extensions → vector
- `psql` client (to run the schema script)
  - On macOS: `brew install libpq`
  - On Ubuntu: `sudo apt install postgresql-client`
  - On Windows: install PostgreSQL and use the bundled `psql`
Optional services (pipeline degrades gracefully without them):
- DigitalOcean Spaces bucket (Parquet backups + training data source)
- Groq API key (LLM fallback for skills/title extraction)
- Gemini API key (secondary LLM fallback)
- Kaggle account (monthly ML training pipeline)
- FastAPI service (admin notifications from the `notify_admin` task)
- Afriwork bearer token (authenticated scraping; public endpoints work without it)
```bash
git clone https://github.com/VentureScope/CareerCompass.git
cd CareerCompass
cp .env.example .env
```
Edit `.env` and fill in your values. The minimum required set:
```env
# Supabase connection string (from Project Settings → Database → Connection string → URI)
DATABASE_URL=postgresql://postgres.<project-ref>:<password>@aws-1-<region>.pooler.supabase.com:5432/postgres

# Generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
AIRFLOW_FERNET_KEY=

# Generate with: python -c "import secrets; print(secrets.token_hex(32))"
AIRFLOW_SECRET_KEY=

# Any strong password for the internal Airflow metadata Postgres container
AIRFLOW_DB_PASSWORD=
```
Everything else in `.env` is optional — see the Environment Variables Reference tables below.
This is a one-time step. Run the schema script against your Supabase database to create all tables, indexes, functions, and views:
psql "$DATABASE_URL" -f Job_pipeline/db/schema.sqlOn Windows (PowerShell), set the variable inline:
$env:DATABASE_URL="postgresql://postgres.<ref>:<pass>@aws-1-<region>.pooler.supabase.com:5432/postgres"
psql $env:DATABASE_URL -f Job_pipeline/db/schema.sqlThis creates:
| Object | Type | Purpose |
|---|---|---|
| `jobs` | Table | Structured job data + `vector(384)` embeddings |
| `raw_job_postings` | Table | Raw scraped data before preprocessing |
| `ml_training_runs` | Table | Monthly ML training run log |
| `match_jobs()` | Function | Semantic similarity search over `jobs.embedding` |
| `top_trending_jobs` | View | Top roles posted in the last 30 days |
| `monthly_job_counts` | View | Posting counts per role per month |
The schema is idempotent (`CREATE TABLE IF NOT EXISTS`, `CREATE INDEX IF NOT EXISTS`, `CREATE OR REPLACE`). Safe to re-run.
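To confirm the schema applied correctly, a quick check along these lines works (a sketch only; it assumes psycopg 3 is installed locally and `DATABASE_URL` is exported as above):

```python
# Quick post-schema sanity check (sketch). Assumes: pip install "psycopg[binary]"
# and DATABASE_URL set in the environment.
import os
import psycopg

EXPECTED_TABLES = {"jobs", "raw_job_postings", "ml_training_runs"}

with psycopg.connect(os.environ["DATABASE_URL"]) as conn, conn.cursor() as cur:
    # pgvector must be enabled for the vector(384) column and its HNSW index
    cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'vector'")
    assert cur.fetchone(), "pgvector extension is not enabled"

    cur.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
    missing = EXPECTED_TABLES - {row[0] for row in cur.fetchall()}
    assert not missing, f"missing tables: {missing}"

print("Schema objects present; pgvector enabled.")
```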
Docker mounts ./data, ./logs, ./dags, and ./plugins into the containers. The Airflow process runs as UID 50000 inside the container.
Linux / macOS:
```bash
mkdir -p data logs plugins
sudo chown 50000:0 data logs dags
```
Windows (Docker Desktop with WSL2):
```powershell
mkdir -p data logs plugins
# No chown needed — Docker Desktop handles permissions transparently
```
Then build the image:
```bash
docker compose build
```
This installs PyTorch (CPU-only), all `requirements.txt` packages, and downloads the `en_core_web_sm` spaCy model. The first build takes several minutes.
```bash
docker compose up airflow-init
```
This runs `airflow db migrate` and creates the admin user defined in your `.env`:
```env
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=changeme-on-first-login
```
Wait for the container to exit cleanly (exit code 0) before proceeding. Then start the webserver and scheduler:
```bash
docker compose up -d airflow-webserver airflow-scheduler
```
Check that both containers are healthy:
```bash
docker compose ps
```
Both `airflow-webserver` and `airflow-scheduler` should show healthy after ~30–60 seconds.
Open http://localhost:8080 and log in with the credentials from your .env.
You will see two DAGs, both paused by default:
| DAG | Schedule | Purpose |
|---|---|---|
| `job_data_pipeline` | Every 12h (`0 6,18 * * *` UTC) | Scrape → preprocess → embed → load to Supabase |
| `monthly_training_pipeline` | 1st of month (`0 2 1 * *` UTC) | Export training data → Kaggle → poll → staging → notify |
Unpause a DAG or trigger a manual run via the Play button.
```
[fetch_clean_afriwork] ──┐
[fetch_clean_hahu]     ──┼──▶ [merge_and_load] ──▶ [generate_embeddings] ──▶ [log_run_status]
[fetch_clean_source_c] ──┘
```
| Task | What it does |
|---|---|
| `fetch_clean_afriwork` | Scrapes the Afriwork GraphQL API; runs 10-step NLP preprocessing; saves to `data/raw/` and `data/processed/` |
| `fetch_clean_hahu` | Same for the Hahu GraphQL API |
| `fetch_clean_source_c` | Placeholder for a third source |
| `merge_and_load` | Merges processed CSVs; writes Parquet to DO Spaces; upserts structured data into the `jobs` table |
| `generate_embeddings` | Encodes `description` with all-MiniLM-L6-v2; writes 384-dim vectors to `jobs.embedding` |
| `log_run_status` | Always runs last; logs a summary of all task outcomes |
Fan-out behavior: the three fetch_clean_* tasks run in parallel. One failure doesn't block the others (trigger_rule=ALL_DONE on merge_and_load).
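For orientation, the fan-out/fan-in wiring corresponds roughly to the skeleton below. This is illustrative only: the real definitions live in `dags/job_data_pipeline.py`, the task bodies are placeholders, and the trigger rule on `log_run_status` is an assumption based on the "always runs last" behavior described above.

```python
# Skeleton of the ETL DAG's fan-out/fan-in wiring (illustrative only).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="job_data_pipeline",
    schedule="0 6,18 * * *",            # 06:00 & 18:00 UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    fetch_tasks = [
        PythonOperator(task_id=f"fetch_clean_{src}", python_callable=lambda: None)
        for src in ("afriwork", "hahu", "source_c")
    ]

    merge_and_load = PythonOperator(
        task_id="merge_and_load",
        python_callable=lambda: None,
        trigger_rule=TriggerRule.ALL_DONE,   # run even if a source task fails
    )
    generate_embeddings = PythonOperator(
        task_id="generate_embeddings", python_callable=lambda: None
    )
    log_run_status = PythonOperator(
        task_id="log_run_status",
        python_callable=lambda: None,
        trigger_rule=TriggerRule.ALL_DONE,   # assumed, so it always runs last
    )

    fetch_tasks >> merge_and_load >> generate_embeddings >> log_run_status
```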
```
[read_training_data] ──▶ [push_to_kaggle] ──▶ [trigger_kaggle_run] ──▶ [poll_kaggle]
                                                                            │
                           ┌────────────────────────────┬──────────────────┘
                           ▼                            ▼
              [fetch_artifact_prophet]       [fetch_artifact_lstm]
                           │                            │
              [upload_to_staging_prophet]    [upload_to_staging_lstm]
                           │                            │
              [log_training_run_prophet]     [log_training_run_lstm]
                           └─────────────┬──────────────┘
                                         ▼
                                  [notify_admin]
```
| Task | What it does |
|---|---|
| `read_training_data` | Lists and fetches all Parquet files under `data/cleaned/` in DO Spaces (full history, paginated). Deduplicates on `job_id`. Writes merged CSV to `model_staging/`. |
| `push_to_kaggle` | Zips the full-history CSV and uploads it as a new Kaggle dataset version (`KAGGLE_DATASET_SLUG`). Each monthly run adds a new version. |
| `trigger_kaggle_run` | Triggers the Kaggle notebook (`KAGGLE_NOTEBOOK_SLUG`) via the Kaggle API. Auto-run must be OFF in notebook settings. |
| `poll_kaggle` | Polls the notebook every 5 min until complete or error. `retries=0` — if the notebook fails, Airflow surfaces it to the admin rather than re-triggering. Timeout: 4 hours. |
| `fetch_artifact_prophet` | Downloads the Kaggle output zip; extracts `prophet_model.pkl` and reads `metrics.json["prophet"]`. |
| `fetch_artifact_lstm` | Same — extracts `lstm_model.pkl` and reads `metrics.json["lstm"]`. |
| `upload_to_staging_prophet` | Uploads `prophet/model.pkl` + `prophet/metadata.json` to `models/staging/YYYY-MM/prophet/` in DO Spaces. |
| `upload_to_staging_lstm` | Same for `models/staging/YYYY-MM/lstm/`. |
| `log_training_run_prophet` | Inserts a row into `ml_training_runs` with `model_type=prophet`, `status=awaiting_review`. `run_id` is `{airflow_run_id}_prophet`. |
| `log_training_run_lstm` | Same with `model_type=lstm`. `run_id` is `{airflow_run_id}_lstm`. |
| `notify_admin` | POSTs to `FASTAPI_INTERNAL_URL/admin/notifications` with both models' metrics combined. `trigger_rule=ALL_DONE` — always fires. |
Note: the pipeline does not auto-deploy. An admin reviews both models' metrics in the dashboard and manually deploys via the FastAPI service.
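For orientation, the merge/dedup step inside `read_training_data` might look roughly like the sketch below. This is not the actual implementation; it assumes boto3, pandas, and pyarrow, and reuses the `DO_SPACES_*` variables from `.env`.

```python
# Sketch of the read_training_data merge step: list every Parquet under
# data/cleaned/ in DO Spaces, concatenate, and deduplicate on job_id.
import io
import os

import boto3
import pandas as pd

s3 = boto3.client(
    "s3",
    endpoint_url=os.environ["DO_SPACES_ENDPOINT"],
    aws_access_key_id=os.environ["DO_SPACES_KEY"],
    aws_secret_access_key=os.environ["DO_SPACES_SECRET"],
)
bucket = os.environ["DO_SPACES_BUCKET"]

frames = []
paginator = s3.get_paginator("list_objects_v2")          # full history, paginated
for page in paginator.paginate(Bucket=bucket, Prefix="data/cleaned/"):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".parquet"):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            frames.append(pd.read_parquet(io.BytesIO(body)))

merged = pd.concat(frames, ignore_index=True).drop_duplicates(subset="job_id")
merged.to_csv("training_data.csv", index=False)           # staged for the Kaggle upload
```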
The notebook must write these files to its working directory before completing:
| File | Required | Description |
|---|---|---|
| `prophet_model.pkl` | Yes | Serialized Prophet model (`pickle.dump`) |
| `lstm_model.pkl` | Yes | Serialized LSTM model (`pickle.dump` or `torch.save`) |
| `metrics.json` | Recommended | Nested metrics for both models (see structure below) |
metrics.json structure:
```json
{
  "prophet": {
    "accuracy": 0.87,
    "f1_score": 0.84,
    "auc_roc": 0.91,
    "class_balance": { "Software Engineer": 312, "Data Scientist": 198 }
  },
  "lstm": {
    "accuracy": 0.89,
    "f1_score": 0.86,
    "auc_roc": 0.93,
    "class_balance": { "Software Engineer": 298, "Data Scientist": 201 }
  }
}
```
The notebook receives the full historical dataset at:
```
/kaggle/input/<dataset-slug>/training_data_YYYY-MM.csv
```
Columns: `job_id`, `normalized_title`, `year_month`, `month`, `posted_date`, `skills`, `city`, `country`, `is_remote`, `job_type`, `education_level`, `source`, ...
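Put together, the final cell of the notebook only needs to persist the artifacts listed above. A minimal sketch (training code omitted; the model objects and metric values are placeholders):

```python
# Final notebook cell (sketch): persist both models plus the nested metrics.json
# that the fetch_artifact_* tasks expect. Training itself is omitted.
import json
import pickle

prophet_model = {"placeholder": "fitted Prophet model"}   # stand-in for the real model
lstm_model = {"placeholder": "fitted LSTM model"}         # stand-in for the real model

with open("prophet_model.pkl", "wb") as f:
    pickle.dump(prophet_model, f)
with open("lstm_model.pkl", "wb") as f:
    pickle.dump(lstm_model, f)        # torch.save(...) would also satisfy the contract

metrics = {
    # placeholder numbers; class_balance omitted for brevity
    "prophet": {"accuracy": 0.87, "f1_score": 0.84, "auc_roc": 0.91},
    "lstm":    {"accuracy": 0.89, "f1_score": 0.86, "auc_roc": 0.93},
}
with open("metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)
```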
Staging layout in DO Spaces:
```
models/staging/YYYY-MM/
  prophet/
    model.pkl
    metadata.json
  lstm/
    model.pkl
    metadata.json
```
Each job listing passes through these steps in order:
| Step | Module | What it does |
|---|---|---|
| 0 | `tech_job_validation` | Filters out non-tech postings using taxonomy keyword matching |
| 1 | `clean_text` | Strips HTML, normalizes Unicode, removes boilerplate |
| 2 | `job_id` | Generates deterministic 16-char SHA256 ID from title + company + date + source |
| 3 | `date_features` | Extracts year_month, month, week, quarter, holiday_flag (Ethiopia holidays) |
| 4 | `title_normalization` | Maps titles to canonical forms via all-MiniLM-L6-v2 cosine similarity + taxonomy |
| 5 | `description_embedding` | Generates 384-dim DescriptionVec with all-MiniLM-L6-v2 |
| 6 | `location_extraction` | Rule/regex → spaCy NER (en_core_web_sm) → Gemini fallback → city/region/country |
| 7 | `remote_detection` | Structured hint → keyword scoring → Gemini → `is_remote` bool + `remote_mode` |
| 8 | `job_type_extraction` | Rule/regex → Gemini → full_time/part_time/internship/contractual/... |
| 9 | `education_extraction` | Rule/regex → Gemini → PhD/Masters/Bachelors/Diploma/Not specified |
| 10 | `skills_extraction` | Taxonomy embedding match + mention boost → up to 12 skills; Gemini fallback |
LLM fallback (Groq primary → Gemini secondary) is disabled by default. Enable with ENABLE_LLM_FALLBACK=true in .env.
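As an illustration of step 2, a deterministic ID can be derived from the four posting fields along these lines (a sketch; the exact normalization in `Job_pipeline/preprocessing` may differ):

```python
# Sketch of the step-2 idea: a deterministic 16-char ID derived from
# title + company + date + source via SHA-256.
import hashlib

def make_job_id(title: str, company: str, posted_date: str, source: str) -> str:
    key = "|".join(part.strip().lower() for part in (title, company, posted_date, source))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]

print(make_job_id("Backend Developer", "Acme PLC", "2026-05-01", "afriwork"))
# The same inputs always yield the same ID, so re-scrapes upsert instead of duplicating.
```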
The schema lives in `Job_pipeline/db/schema.sql` and must be applied once before the first run (see Step 2). The `jobs` table stores one row per posting:
| Column | Type | Notes |
|---|---|---|
| `id` | VARCHAR(36) PK | UUID |
| `job_id` | VARCHAR(255) UNIQUE | SHA256-derived deterministic ID |
| `company_name` | VARCHAR(500) | |
| `job_title` | VARCHAR(500) | Raw title from source |
| `normalized_title` | VARCHAR(500) | Canonical form from taxonomy |
| `description` | TEXT | Cleaned text |
| `embedding` | vector(384) | sentence-transformers output; HNSW index |
| `city` | VARCHAR(255) | |
| `region` | VARCHAR(255) | |
| `country` | VARCHAR(100) | |
| `is_remote` | BOOLEAN | |
| `job_type` | VARCHAR(100) | full_time / part_time / etc. |
| `education_level` | VARCHAR(255) | |
| `skills` | JSONB | Array of skill names; GIN-indexed |
| `source` | VARCHAR(50) | afriwork / hahu / etc. |
| `posted_date` | TIMESTAMPTZ | |
| `year_month` | TEXT | e.g. 2026-05 |
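To show how the `embedding` column is consumed, the sketch below encodes a free-text query with the same all-MiniLM-L6-v2 model and calls the `match_jobs()` function shown later in this section. It assumes sentence-transformers and psycopg 3 are installed and `DATABASE_URL` is set; it is not part of the pipeline itself.

```python
# Sketch: semantic search against jobs.embedding via match_jobs().
import os

import psycopg
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
query_vec = model.encode("remote python backend developer").tolist()   # 384 floats
vec_literal = "[" + ",".join(f"{x:.6f}" for x in query_vec) + "]"      # pgvector text format

with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
    rows = conn.execute(
        "SELECT * FROM match_jobs(query_embedding => %s::vector, "
        "match_count => %s, match_threshold => %s)",
        (vec_literal, 10, 0.7),
    ).fetchall()

for row in rows:
    print(row)
```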
The `ml_training_runs` table holds one row per model per monthly run (i.e. two rows per run: one for prophet, one for lstm).
| Column | Type | Notes |
|---|---|---|
| `run_id` | TEXT PK | `{airflow_run_id}_{model_type}` — unique per model per run |
| `model_type` | TEXT | `prophet` or `lstm` |
| `run_yearmonth` | TEXT | e.g. 2026-06 |
| `status` | TEXT | awaiting_review / deployed / superseded |
| `record_count` | INTEGER | Total rows in the training dataset |
| `months_covered` | TEXT | JSON array of year_month values in the training data |
| `accuracy` / `f1_score` / `auc_roc` | FLOAT | Model metrics from metrics.json |
| `staging_pkl_key` | TEXT | DO Spaces path, e.g. models/staging/2026-06/prophet/model.pkl |
| `deployed_at` / `deployed_by` | TIMESTAMPTZ / TEXT | Set on manual deploy |
Semantic search over `jobs.embedding` goes through the `match_jobs()` function:
```sql
SELECT * FROM match_jobs(
  query_embedding => '<384-dim vector>',
  match_count     => 10,
  match_threshold => 0.7
);
```
Required environment variables:
| Variable | Description | How to generate |
|---|---|---|
| `DATABASE_URL` | Supabase PostgreSQL connection string | Supabase dashboard → Project Settings → Database → URI |
| `AIRFLOW_FERNET_KEY` | Encryption key for Airflow connections | `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"` |
| `AIRFLOW_SECRET_KEY` | Flask secret key for Airflow webserver | `python -c "import secrets; print(secrets.token_hex(32))"` |
| `AIRFLOW_DB_PASSWORD` | Password for internal Airflow metadata Postgres | Any strong password |
Airflow settings (optional; defaults shown):
| Variable | Default | Description |
|---|---|---|
| `AIRFLOW_PORT` | 8080 | Host port for the Airflow webserver |
| `_AIRFLOW_WWW_USER_USERNAME` | admin | Admin UI username |
| `_AIRFLOW_WWW_USER_PASSWORD` | changeme | Admin UI password — change this |
| `AIRFLOW_ADMIN_EMAIL` | admin@venturesope.com | Admin email |
| `AIRFLOW_UID` | 50000 | Host UID for volume permissions; set to `$(id -u)` on Linux |
Optional integrations (scraping auth, LLM fallback, DO Spaces):
| Variable | Description |
|---|---|
| `AFRIWORK_BEARER_TOKEN` | Bearer token for authenticated Afriwork API access |
| `GROQ_API_KEY` | Groq LLM for skills/title fallback extraction |
| `GEMINI_API_KEY` | Gemini LLM fallback (secondary to Groq) |
| `ENABLE_LLM_FALLBACK` | `true` to enable LLM extraction (default: `false`) |
| `DO_SPACES_BUCKET` | DO Spaces bucket for Parquet backups |
| `DO_SPACES_ENDPOINT` | DO Spaces endpoint URL (e.g. https://ams3.digitaloceanspaces.com) |
| `DO_SPACES_KEY` | DO Spaces access key |
| `DO_SPACES_SECRET` | DO Spaces secret key |
Kaggle and notification settings (used by the monthly training DAG):
| Variable | Description |
|---|---|
| `KAGGLE_USERNAME` | Kaggle account username |
| `KAGGLE_API_KEY` | Kaggle API key (from Kaggle account settings) |
| `KAGGLE_DATASET_SLUG` | Kaggle dataset identifier, e.g. your-org/venturesope-training-data |
| `KAGGLE_NOTEBOOK_SLUG` | Kaggle notebook identifier, e.g. your-org/venturesope-model-training |
| `FASTAPI_INTERNAL_URL` | Internal URL for admin notifications, e.g. http://your-service:8000 |
```bash
# Install dependencies locally
pip install torch --index-url https://download.pytorch.org/whl/cpu
pip install -r requirements.txt
python -m spacy download en_core_web_sm

# Run all tests
python -m pytest Job_pipeline/tests/ -v

# Or with unittest
python -m unittest discover -s Job_pipeline/tests -v
```
You can run the preprocessing pipeline standalone (useful for testing or backfill):
```bash
python Job_pipeline/run_preprocessing_pipeline.py

# Options:
python Job_pipeline/run_preprocessing_pipeline.py --raw-dir Job_pipeline/data/raw --processed-dir Job_pipeline/data/processed
python Job_pipeline/run_preprocessing_pipeline.py --max-rows 100
python Job_pipeline/run_preprocessing_pipeline.py --enable-llm-fallback
```
Or the full scrape → preprocess → load sequence:
```bash
python Job_pipeline/run_pipeline.py
python Job_pipeline/run_pipeline.py --skip-scrape   # preprocess + load only
python Job_pipeline/run_pipeline.py --analytics     # run analytics queries after load
```
To deploy on a DigitalOcean droplet (or any Docker host):
```bash
git clone https://github.com/VentureScope/CareerCompass.git
cd CareerCompass
cp .env.example .env
nano .env   # fill in all values

# Set UID to your user
echo "AIRFLOW_UID=$(id -u)" >> .env

# Prepare directories
mkdir -p data logs plugins
sudo chown $(id -u):0 data logs dags

# Initialize schema (one-time)
psql "$DATABASE_URL" -f Job_pipeline/db/schema.sql

# Build and start
docker compose build
docker compose up airflow-init
docker compose up -d
```
Access the UI at http://<droplet-ip>:8080.
To add a new job source:
- Add a scrape function to `Job_pipeline/scrape_jobs.py` following the `scrape_hahu_tech()` pattern (a rough skeleton is sketched below)
- Fill in the `fetch_clean_source_c()` task in `dags/job_data_pipeline.py` (or add a new task following the same pattern)
- Add the new task to the fan-in list before `merge_and_load`
No DAG restructuring needed — the ALL_DONE trigger rule handles partial failures gracefully.
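As a starting point, a hypothetical skeleton for such a scrape function (the endpoint, GraphQL query, and field names are placeholders, not a real source):

```python
# Hypothetical skeleton for a third source in Job_pipeline/scrape_jobs.py.
# Adapt the endpoint, query, and fields to the real API, mirroring scrape_hahu_tech().
import pandas as pd
import requests

SOURCE_C_ENDPOINT = "https://example.com/graphql"   # placeholder URL

def scrape_source_c_tech(limit: int = 200) -> pd.DataFrame:
    """Fetch raw postings from source C and return them as a DataFrame."""
    query = """
    query Jobs($limit: Int!) {
      jobs(limit: $limit) { title company description postedDate url }
    }
    """
    resp = requests.post(
        SOURCE_C_ENDPOINT,
        json={"query": query, "variables": {"limit": limit}},
        timeout=30,
    )
    resp.raise_for_status()
    jobs = resp.json()["data"]["jobs"]
    df = pd.DataFrame(jobs)
    df["source"] = "source_c"          # downstream preprocessing keys on this column
    return df
```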
Proprietary — VentureScope.