CareerCompass

Automated job market intelligence pipeline for the Ethiopian tech sector. Scrapes job listings from multiple sources, preprocesses them through a 10-step NLP pipeline, stores structured data and vector embeddings in Supabase, and trains ML models monthly via Kaggle — all orchestrated by Apache Airflow.

Architecture

Every 12 hours (06:00 & 18:00 UTC):

[fetch_clean_afriwork]  ──┐
[fetch_clean_hahu]      ──┼──▶ [merge_and_load] ──▶ [generate_embeddings] ──▶ [log_run_status]
[fetch_clean_source_c]  ──┘    preprocess +           updates jobs.embedding
                               DO Spaces backup +      (sentence-transformers,
                               Supabase upsert         non-fatal)

1st of each month (02:00 UTC):

[read_training_data] ──▶ [push_to_kaggle] ──▶ [trigger_kaggle_run] ──▶ [poll_kaggle]
                                                                               │
                              ┌────────────────────────────────────────────────┘
                              ▼                                ▼
                  [fetch_artifact_prophet]        [fetch_artifact_lstm]
                  [upload_to_staging_prophet]     [upload_to_staging_lstm]
                  [log_training_run_prophet]      [log_training_run_lstm]
                              └────────────────────┬───────────┘
                                                   ▼
                                            [notify_admin]

Both DAGs write to Supabase (PostgreSQL + pgvector):

┌───────────────────────────────────────────┐
│           Supabase (PostgreSQL)           │
│                                           │
│  jobs              – structured data +   │
│                      vector(384) embed.  │
│  raw_job_postings  – raw scraped data    │
│  ml_training_runs  – model training log  │
│  match_jobs()      – semantic search fn  │
└───────────────────────────────────────────┘

Project Structure

CareerCompass/
├── dags/
│   ├── job_data_pipeline.py          # ETL DAG — runs every 12h
│   └── monthly_training_pipeline.py  # ML training DAG — runs 1st of month
├── Job_pipeline/
│   ├── scrape_jobs.py                # GraphQL scrapers (Afriwork, Hahu)
│   ├── run_preprocessing_pipeline.py # Batch preprocessing runner
│   ├── load_to_postgres.py           # CSV → PostgreSQL loader
│   ├── run_pipeline.py               # End-to-end CLI runner
│   ├── preprocessing/                # NLP pipeline modules (steps 0–10)
│   ├── taxonomy/
│   │   ├── roles.json                # 21 canonical job roles
│   │   └── skills.json               # ~130+ tech skills
│   ├── db/
│   │   └── schema.sql                # Supabase schema (run once)
│   └── tests/                        # Unit tests for each pipeline step
├── data/
│   ├── raw/                          # Scraped CSVs (mounted into container)
│   └── processed/                    # Preprocessed CSVs + merged Parquet
├── Dockerfile                        # apache/airflow:2.10.4-python3.11
├── docker-compose.yml                # LocalExecutor setup
├── requirements.txt
├── .env.example                      # Template — copy to .env
└── .gitignore

Prerequisites

Before you run anything, you need:

  • Docker Desktop (includes Docker Compose)
  • A Supabase project with the pgvector extension enabled
    • Enable it in the Supabase dashboard: Database → Extensions → vector
  • psql client (to run the schema script)
    • On macOS: brew install libpq
    • On Ubuntu: sudo apt install postgresql-client
    • On Windows: install PostgreSQL and use the bundled psql

Optional services (pipeline degrades gracefully without them):

  • DigitalOcean Spaces bucket (Parquet backups + training data source)
  • Groq API key (LLM fallback for skills/title extraction)
  • Gemini API key (secondary LLM fallback)
  • Kaggle account (monthly ML training pipeline)
  • FastAPI service (admin notifications from notify_admin task)
  • Afriwork bearer token (authenticated scraping; public endpoints work without it)

Setup

Step 1 — Clone and configure

git clone https://github.com/VentureScope/CareerCompass.git
cd CareerCompass
cp .env.example .env

Edit .env and fill in your values. The minimum required set:

# Supabase connection string (from Project Settings → Database → Connection string → URI)
DATABASE_URL=postgresql://postgres.<project-ref>:<password>@aws-1-<region>.pooler.supabase.com:5432/postgres

# Generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
AIRFLOW_FERNET_KEY=

# Generate with: python -c "import secrets; print(secrets.token_hex(32))"
AIRFLOW_SECRET_KEY=

# Any strong password for the internal Airflow metadata Postgres container
AIRFLOW_DB_PASSWORD=

Everything else in .env is optional — see the Environment Variables Reference table below.


Step 2 — Initialize the Supabase schema

This is a one-time step. Run the schema script against your Supabase database to create all tables, indexes, functions, and views:

psql "$DATABASE_URL" -f Job_pipeline/db/schema.sql

On Windows (PowerShell), set the variable inline:

$env:DATABASE_URL="postgresql://postgres.<ref>:<pass>@aws-1-<region>.pooler.supabase.com:5432/postgres"
psql $env:DATABASE_URL -f Job_pipeline/db/schema.sql

This creates:

Object              Type      Purpose
jobs                Table     Structured job data + vector(384) embeddings
raw_job_postings    Table     Raw scraped data before preprocessing
ml_training_runs    Table     Monthly ML training run log
match_jobs()        Function  Semantic similarity search over jobs.embedding
top_trending_jobs   View      Top roles posted in the last 30 days
monthly_job_counts  View      Posting counts per role per month

The schema is idempotent (CREATE TABLE IF NOT EXISTS, CREATE INDEX IF NOT EXISTS, CREATE OR REPLACE). Safe to re-run.
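
To sanity-check the result, you can list the expected tables with any Postgres client. A minimal sketch with psycopg2 and the DATABASE_URL from your .env (the query itself is just an illustration, not part of the repo):

# Minimal sanity check that the schema objects exist (assumes psycopg2 is installed).
import os
import psycopg2

conn = psycopg2.connect(os.environ["DATABASE_URL"])
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'
          AND table_name IN ('jobs', 'raw_job_postings', 'ml_training_runs')
        ORDER BY table_name
    """)
    print([row[0] for row in cur.fetchall()])   # expect all three table names
conn.close()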


Step 3 — Prepare directories

Docker mounts ./data, ./logs, ./dags, and ./plugins into the containers. The Airflow process runs as UID 50000 inside the container.

Linux / macOS:

mkdir -p data logs plugins
sudo chown 50000:0 data logs dags

Windows (Docker Desktop with WSL2):

mkdir -p data logs plugins
# No chown needed — Docker Desktop handles permissions transparently

Step 4 — Build the Docker image

docker compose build

This installs PyTorch (CPU-only) and everything in requirements.txt, then downloads the en_core_web_sm spaCy model. The first build takes several minutes.


Step 5 — Initialize Airflow (one-time)

docker compose up airflow-init

This runs airflow db migrate and creates the admin user defined in your .env:

_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=changeme-on-first-login

Wait for the container to exit cleanly (exit code 0) before proceeding.


Step 6 — Start Airflow

docker compose up -d airflow-webserver airflow-scheduler

Check that both containers are healthy:

docker compose ps

Both airflow-webserver and airflow-scheduler should show healthy after ~30–60 seconds.


Step 7 — Access the UI

Open http://localhost:8080 and log in with the credentials from your .env.

You will see two DAGs, both paused by default:

DAG                        Schedule                       Purpose
job_data_pipeline          Every 12h (0 6,18 * * * UTC)   Scrape → preprocess → embed → load to Supabase
monthly_training_pipeline  1st of month (0 2 1 * * UTC)   Export training data → Kaggle → poll → staging → notify

Unpause a DAG or trigger a manual run via the Play button.


DAG Reference

job_data_pipeline — ETL (every 12h)

[fetch_clean_afriwork]  ──┐
[fetch_clean_hahu]      ──┼──▶ [merge_and_load] ──▶ [generate_embeddings] ──▶ [log_run_status]
[fetch_clean_source_c]  ──┘
Task What it does
fetch_clean_afriwork Scrapes Afriwork GraphQL API; runs 10-step NLP preprocessing; saves to data/raw/ and data/processed/
fetch_clean_hahu Same for Hahu GraphQL API
fetch_clean_source_c Placeholder for a third source
merge_and_load Merges processed CSVs; writes Parquet to DO Spaces; upserts structured data into jobs table
generate_embeddings Encodes description with all-MiniLM-L6-v2; writes 384-dim vectors to jobs.embedding
log_run_status Always runs last; logs a summary of all task outcomes

Fan-out behavior: the three fetch_clean_* tasks run in parallel, and a failed fetch doesn't block the downstream merge (trigger_rule=ALL_DONE on merge_and_load).
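
A minimal illustrative sketch of that wiring (placeholder task bodies and dag_id; this mirrors the description above rather than the repo's actual DAG code):

# Illustrative sketch of the fan-out/fan-in wiring, not the repo's actual DAG.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def _noop(**_):
    pass  # the real tasks scrape, preprocess, merge, embed, and log

with DAG(
    dag_id="job_data_pipeline_sketch",
    schedule="0 6,18 * * *",              # every 12h at 06:00 and 18:00 UTC
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    fetches = [
        PythonOperator(task_id=t, python_callable=_noop)
        for t in ("fetch_clean_afriwork", "fetch_clean_hahu", "fetch_clean_source_c")
    ]
    merge = PythonOperator(
        task_id="merge_and_load",
        python_callable=_noop,
        trigger_rule=TriggerRule.ALL_DONE,   # run even if one fetch task failed
    )
    embed = PythonOperator(task_id="generate_embeddings", python_callable=_noop)
    log = PythonOperator(
        task_id="log_run_status",
        python_callable=_noop,
        trigger_rule=TriggerRule.ALL_DONE,   # assumed ALL_DONE so it always runs last
    )
    fetches >> merge >> embed >> log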


monthly_training_pipeline — ML training (1st of month)

[read_training_data] ──▶ [push_to_kaggle] ──▶ [trigger_kaggle_run] ──▶ [poll_kaggle]
                                                                               │
                              ┌────────────────────────────────────────────────┘
                              ▼                                ▼
                  [fetch_artifact_prophet]        [fetch_artifact_lstm]
                  [upload_to_staging_prophet]     [upload_to_staging_lstm]
                  [log_training_run_prophet]      [log_training_run_lstm]
                              └────────────────────┬───────────┘
                                                   ▼
                                            [notify_admin]
Task What it does
read_training_data Lists and fetches all Parquet files under data/cleaned/ in DO Spaces (full history, paginated). Deduplicates on job_id. Writes merged CSV to model_staging/.
push_to_kaggle Zips the full-history CSV and uploads it as a new Kaggle dataset version (KAGGLE_DATASET_SLUG). Each monthly run adds a new version.
trigger_kaggle_run Triggers the Kaggle notebook (KAGGLE_NOTEBOOK_SLUG) via the Kaggle API. Auto-run must be OFF in notebook settings.
poll_kaggle Polls the notebook every 5 min until complete or error. retries=0 — if the notebook fails, Airflow surfaces it to the admin rather than re-triggering. Timeout: 4 hours.
fetch_artifact_prophet Downloads the Kaggle output zip; extracts prophet_model.pkl and reads metrics.json["prophet"].
fetch_artifact_lstm Same — extracts lstm_model.pkl and reads metrics.json["lstm"].
upload_to_staging_prophet Uploads prophet/model.pkl + prophet/metadata.json to models/staging/YYYY-MM/prophet/ in DO Spaces.
upload_to_staging_lstm Same for models/staging/YYYY-MM/lstm/.
log_training_run_prophet Inserts a row into ml_training_runs with model_type=prophet, status=awaiting_review. run_id is {airflow_run_id}_prophet.
log_training_run_lstm Same with model_type=lstm. run_id is {airflow_run_id}_lstm.
notify_admin POSTs to FASTAPI_INTERNAL_URL/admin/notifications with both models' metrics combined. trigger_rule=ALL_DONE — always fires.

Note: the pipeline does not auto-deploy. An admin reviews both models' metrics in the dashboard and manually deploys via the FastAPI service.
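
For orientation, the poll step roughly corresponds to the sketch below, using the official kaggle package. The slug is a placeholder and the status response shape varies between client versions, so treat this as illustrative rather than the task's actual code:

# Illustrative poll loop for poll_kaggle using the official kaggle client.
# A run is typically started by re-pushing the notebook (api.kernels_push on a pulled copy).
import time
from kaggle.api.kaggle_api_extended import KaggleApi

api = KaggleApi()
api.authenticate()                                  # reads ~/.kaggle/kaggle.json or KAGGLE_* env vars

notebook = "your-org/your-training-notebook"        # KAGGLE_NOTEBOOK_SLUG (placeholder)

deadline = time.time() + 4 * 60 * 60                # 4-hour timeout, matching poll_kaggle
while time.time() < deadline:
    result = api.kernels_status(notebook)
    # Older clients return a dict, newer ones an object; normalise defensively.
    status = str(result["status"] if isinstance(result, dict) else getattr(result, "status", "")).lower()
    if "complete" in status:
        break
    if "error" in status or "cancel" in status:
        raise RuntimeError(f"Kaggle notebook ended with status: {status}")
    time.sleep(300)                                  # poll every 5 minutes
else:
    raise TimeoutError("Kaggle notebook did not finish within 4 hours")

api.kernels_output(notebook, path="model_staging/kaggle_output")   # download the artifact zip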

Kaggle notebook output contract

The notebook must write these files to its working directory before completing:

File               Required     Description
prophet_model.pkl  Yes          Serialized Prophet model (pickle.dump)
lstm_model.pkl     Yes          Serialized LSTM model (pickle.dump or torch.save)
metrics.json       Recommended  Nested metrics for both models (see structure below)

metrics.json structure:

{
  "prophet": {
    "accuracy": 0.87,
    "f1_score": 0.84,
    "auc_roc":  0.91,
    "class_balance": { "Software Engineer": 312, "Data Scientist": 198 }
  },
  "lstm": {
    "accuracy": 0.89,
    "f1_score": 0.86,
    "auc_roc":  0.93,
    "class_balance": { "Software Engineer": 298, "Data Scientist": 201 }
  }
}

The notebook receives the full historical dataset at:

/kaggle/input/<dataset-slug>/training_data_YYYY-MM.csv

Columns: job_id, normalized_title, year_month, month, posted_date, skills, city, country, is_remote, job_type, education_level, source, ...
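
For orientation, the notebook's read/write contract can be sketched like this (the filename glob and metric values are placeholders, and the training step itself is elided):

# Sketch of the notebook's I/O contract: read the mounted CSV, train, write artifacts.
import glob, json, pickle
import pandas as pd

# Input: the latest dataset version is mounted read-only under /kaggle/input/<dataset-slug>/.
csv_path = sorted(glob.glob("/kaggle/input/*/training_data_*.csv"))[-1]
df = pd.read_csv(csv_path, parse_dates=["posted_date"])

# ... train the Prophet and LSTM models on df here ...
prophet_model = lstm_model = None                    # placeholders for the trained models

# Output: the three files the pipeline expects in the working directory (/kaggle/working).
with open("prophet_model.pkl", "wb") as f:
    pickle.dump(prophet_model, f)
with open("lstm_model.pkl", "wb") as f:
    pickle.dump(lstm_model, f)                       # torch.save(...) is also acceptable

metrics = {
    "prophet": {"accuracy": 0.87, "f1_score": 0.84, "auc_roc": 0.91,
                "class_balance": {"Software Engineer": 312}},
    "lstm":    {"accuracy": 0.89, "f1_score": 0.86, "auc_roc": 0.93,
                "class_balance": {"Software Engineer": 298}},
}
with open("metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)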

DO Spaces staging layout (after each run)

models/staging/YYYY-MM/
  prophet/
    model.pkl
    metadata.json
  lstm/
    model.pkl
    metadata.json
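
The upload_to_staging_* tasks write to that layout through the S3-compatible Spaces API. A sketch with boto3, assuming the DO_SPACES_* variables listed in the environment reference below (the month prefix is a placeholder):

# Sketch of an upload to the DO Spaces staging layout via the S3-compatible API.
import os
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.environ["DO_SPACES_ENDPOINT"],        # e.g. https://ams3.digitaloceanspaces.com
    aws_access_key_id=os.environ["DO_SPACES_KEY"],
    aws_secret_access_key=os.environ["DO_SPACES_SECRET"],
)

bucket = os.environ["DO_SPACES_BUCKET"]
prefix = "models/staging/2026-06/prophet/"                # YYYY-MM of the training run

s3.upload_file("prophet_model.pkl", bucket, prefix + "model.pkl")
s3.upload_file("metadata.json", bucket, prefix + "metadata.json")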

Preprocessing Pipeline

Each job listing passes through these steps in order:

Step  Module                 What it does
0     tech_job_validation    Filters out non-tech postings using taxonomy keyword matching
1     clean_text             Strips HTML, normalizes Unicode, removes boilerplate
2     job_id                 Generates deterministic 16-char SHA256 ID from title + company + date + source
3     date_features          Extracts year_month, month, week, quarter, holiday_flag (Ethiopia holidays)
4     title_normalization    Maps titles to canonical forms via all-MiniLM-L6-v2 cosine similarity + taxonomy
5     description_embedding  Generates 384-dim DescriptionVec with all-MiniLM-L6-v2
6     location_extraction    Rule/regex → spaCy NER (en_core_web_sm) → Gemini fallback → city/region/country
7     remote_detection       Structured hint → keyword scoring → Gemini → is_remote bool + remote_mode
8     job_type_extraction    Rule/regex → Gemini → full_time/part_time/internship/contractual/...
9     education_extraction   Rule/regex → Gemini → PhD/Masters/Bachelors/Diploma/Not specified
10    skills_extraction      Taxonomy embedding match + mention boost → up to 12 skills; Gemini fallback

LLM fallback (Groq primary → Gemini secondary) is disabled by default. Enable with ENABLE_LLM_FALLBACK=true in .env.
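
Steps 2 and 5 are the most self-contained. Below is a minimal illustrative sketch of both (deterministic ID hashing and the 384-dim description embedding) using the standard hashlib and sentence-transformers APIs; the field order and separator in the hash are assumptions, not the exact code in preprocessing/:

# Illustrative versions of step 2 (job_id) and step 5 (description_embedding).
import hashlib
from sentence_transformers import SentenceTransformer

def make_job_id(title: str, company: str, posted_date: str, source: str) -> str:
    # Deterministic 16-char ID: the same listing always hashes to the same value.
    # Separator and field order here are assumptions; see the preprocessing module for the real logic.
    raw = f"{title}|{company}|{posted_date}|{source}".lower()
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]

model = SentenceTransformer("all-MiniLM-L6-v2")            # same model used for title matching

def embed_description(text: str) -> list[float]:
    # 384-dimensional vector, later stored in jobs.embedding (pgvector).
    return model.encode(text, normalize_embeddings=True).tolist()

print(make_job_id("Data Scientist", "Acme", "2026-05-01", "afriwork"))
print(len(embed_description("Build ML pipelines in Python...")))   # 384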


Database Schema

The schema lives in Job_pipeline/db/schema.sql and must be applied once before the first run (see Step 2).

jobs table (primary store)

Column            Type                 Notes
id                VARCHAR(36) PK       UUID
job_id            VARCHAR(255) UNIQUE  SHA256-derived deterministic ID
company_name      VARCHAR(500)
job_title         VARCHAR(500)         Raw title from source
normalized_title  VARCHAR(500)         Canonical form from taxonomy
description       TEXT                 Cleaned text
embedding         vector(384)          sentence-transformers output; HNSW index
city              VARCHAR(255)
region            VARCHAR(255)
country           VARCHAR(100)
is_remote         BOOLEAN
job_type          VARCHAR(100)         full_time / part_time / etc.
education_level   VARCHAR(255)
skills            JSONB                Array of skill names; GIN-indexed
source            VARCHAR(50)          afriwork / hahu / etc.
posted_date       TIMESTAMPTZ
year_month        TEXT                 e.g. 2026-05
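
For reference, the upsert that merge_and_load performs against this table is conceptually similar to the sketch below (a column subset, using psycopg2; see Job_pipeline/load_to_postgres.py for the real loader):

# Conceptual upsert into jobs, keyed on the deterministic job_id (column subset only).
import os
import uuid
import psycopg2
from psycopg2.extras import Json

row = {
    "id": str(uuid.uuid4()),
    "job_id": "9f2c1a7b3e5d8406",           # 16-char deterministic hash (see step 2)
    "company_name": "Acme Tech",
    "job_title": "Senior Data Scientist",
    "normalized_title": "Data Scientist",
    "skills": Json(["python", "sql", "airflow"]),
    "source": "afriwork",
}

SQL = """
INSERT INTO jobs (id, job_id, company_name, job_title, normalized_title, skills, source)
VALUES (%(id)s, %(job_id)s, %(company_name)s, %(job_title)s, %(normalized_title)s,
        %(skills)s, %(source)s)
ON CONFLICT (job_id) DO UPDATE SET
    company_name     = EXCLUDED.company_name,
    job_title        = EXCLUDED.job_title,
    normalized_title = EXCLUDED.normalized_title,
    skills           = EXCLUDED.skills,
    source           = EXCLUDED.source
"""

with psycopg2.connect(os.environ["DATABASE_URL"]) as conn, conn.cursor() as cur:
    cur.execute(SQL, row)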

ml_training_runs table

One row per model per monthly run, i.e. two rows per run: one for prophet, one for lstm.

Column                         Type                Notes
run_id                         TEXT PK             {airflow_run_id}_{model_type} — unique per model per run
model_type                     TEXT                prophet or lstm
run_yearmonth                  TEXT                e.g. 2026-06
status                         TEXT                awaiting_review / deployed / superseded
record_count                   INTEGER             Total rows in the training dataset
months_covered                 TEXT                JSON array of year_month values in the training data
accuracy / f1_score / auc_roc  FLOAT               Model metrics from metrics.json
staging_pkl_key                TEXT                DO Spaces path, e.g. models/staging/2026-06/prophet/model.pkl
deployed_at / deployed_by      TIMESTAMPTZ / TEXT  Set on manual deploy

Semantic search

SELECT * FROM match_jobs(
    query_embedding => '<384-dim vector>',
    match_count     => 10,
    match_threshold => 0.7
);
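
The same call works from Python once you have a query embedding. A sketch assuming sentence-transformers and psycopg2, with the vector passed in pgvector's text format and the argument order taken from the signature shown above:

# Semantic search from Python: embed the query text, then call match_jobs() in Supabase.
import os
import psycopg2
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
query_vec = model.encode("remote machine learning engineer").tolist()

with psycopg2.connect(os.environ["DATABASE_URL"]) as conn, conn.cursor() as cur:
    cur.execute(
        "SELECT * FROM match_jobs(%s::vector(384), %s, %s)",
        (str(query_vec), 10, 0.7),        # pgvector accepts the '[x, y, ...]' text form
    )
    for match in cur.fetchall():
        print(match)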

Environment Variables Reference

Required

Variable             Description                                      How to generate
DATABASE_URL         Supabase PostgreSQL connection string            Supabase dashboard → Project Settings → Database → URI
AIRFLOW_FERNET_KEY   Encryption key for Airflow connections           python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
AIRFLOW_SECRET_KEY   Flask secret key for Airflow webserver           python -c "import secrets; print(secrets.token_hex(32))"
AIRFLOW_DB_PASSWORD  Password for internal Airflow metadata Postgres  Any strong password

Airflow UI

Variable                    Default                Description
AIRFLOW_PORT                8080                   Host port for the Airflow webserver
_AIRFLOW_WWW_USER_USERNAME  admin                  Admin UI username
_AIRFLOW_WWW_USER_PASSWORD  changeme               Admin UI password — change this
AIRFLOW_ADMIN_EMAIL         admin@venturesope.com  Admin email
AIRFLOW_UID                 50000                  Host UID for volume permissions; set to $(id -u) on Linux

ETL pipeline (optional)

Variable Description
AFRIWORK_BEARER_TOKEN Bearer token for authenticated Afriwork API access
GROQ_API_KEY Groq LLM for skills/title fallback extraction
GEMINI_API_KEY Gemini LLM fallback (secondary to Groq)
ENABLE_LLM_FALLBACK true to enable LLM extraction (default: false)
DO_SPACES_BUCKET DO Spaces bucket for Parquet backups
DO_SPACES_ENDPOINT DO Spaces endpoint URL (e.g. https://ams3.digitaloceanspaces.com)
DO_SPACES_KEY DO Spaces access key
DO_SPACES_SECRET DO Spaces secret key

Monthly training pipeline (optional)

Variable Description
KAGGLE_USERNAME Kaggle account username
KAGGLE_API_KEY Kaggle API key (from Kaggle account settings)
KAGGLE_DATASET_SLUG Kaggle dataset identifier, e.g. your-org/venturesope-training-data
KAGGLE_NOTEBOOK_SLUG Kaggle notebook identifier, e.g. your-org/venturesope-model-training
FASTAPI_INTERNAL_URL Internal URL for admin notifications, e.g. http://your-service:8000

Running Tests

# Install dependencies locally
pip install torch --index-url https://download.pytorch.org/whl/cpu
pip install -r requirements.txt
python -m spacy download en_core_web_sm

# Run all tests
python -m pytest Job_pipeline/tests/ -v

# Or with unittest
python -m unittest discover -s Job_pipeline/tests -v

Running the Pipeline Without Airflow

You can run the preprocessing pipeline standalone (useful for testing or backfill):

python Job_pipeline/run_preprocessing_pipeline.py

# Options:
python Job_pipeline/run_preprocessing_pipeline.py --raw-dir Job_pipeline/data/raw --processed-dir Job_pipeline/data/processed
python Job_pipeline/run_preprocessing_pipeline.py --max-rows 100
python Job_pipeline/run_preprocessing_pipeline.py --enable-llm-fallback

Or the full scrape → preprocess → load sequence:

python Job_pipeline/run_pipeline.py
python Job_pipeline/run_pipeline.py --skip-scrape     # preprocess + load only
python Job_pipeline/run_pipeline.py --analytics       # run analytics queries after load

Deployment (DigitalOcean Droplet)

git clone https://github.com/VentureScope/CareerCompass.git
cd CareerCompass
cp .env.example .env
nano .env   # fill in all values

# Set UID to your user
echo "AIRFLOW_UID=$(id -u)" >> .env

# Prepare directories
mkdir -p data logs plugins
sudo chown $(id -u):0 data logs dags

# Initialize schema (one-time)
psql "$DATABASE_URL" -f Job_pipeline/db/schema.sql

# Build and start
docker compose build
docker compose up airflow-init
docker compose up -d

Access the UI at http://<droplet-ip>:8080.


Adding a New Scraping Source

  1. Add a scrape function to Job_pipeline/scrape_jobs.py following the scrape_hahu_tech() pattern (see the sketch after this list)
  2. Fill in the fetch_clean_source_c() task in dags/job_data_pipeline.py (or add a new task following the same pattern)
  3. Add the new task to the fan-in list before merge_and_load

No DAG restructuring needed — the ALL_DONE trigger rule handles partial failures gracefully.
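
As a starting point, step 1 could look roughly like the sketch below. The endpoint, query, and field names are placeholders; mirror the real scrape_hahu_tech() for the actual GraphQL shapes:

# Hypothetical new scraper for Job_pipeline/scrape_jobs.py; endpoint and fields are placeholders.
import requests
import pandas as pd

SOURCE_C_URL = "https://example-jobs.et/graphql"          # placeholder endpoint

QUERY = """
query RecentJobs($limit: Int!) {
  jobs(limit: $limit) { title company description postedAt }
}
"""

def scrape_source_c_tech(limit: int = 200) -> pd.DataFrame:
    resp = requests.post(SOURCE_C_URL, json={"query": QUERY, "variables": {"limit": limit}}, timeout=30)
    resp.raise_for_status()
    rows = resp.json()["data"]["jobs"]
    df = pd.DataFrame(rows).rename(columns={"title": "job_title",
                                            "company": "company_name",
                                            "postedAt": "posted_date"})
    df["source"] = "source_c"
    df.to_csv("data/raw/source_c_jobs.csv", index=False)   # same landing spot as the other scrapers
    return df

The corresponding DAG task then feeds this CSV into the preprocessing runner, exactly like the existing fetch_clean_* tasks.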


License

Proprietary — VentureScope.
