-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function2.py
More file actions
250 lines (221 loc) · 12.4 KB
/
lambda_function2.py
File metadata and controls
250 lines (221 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import os, json, base64, logging, pathlib, boto3, numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from supabase import create_client
from typing import List
from dotenv import load_dotenv
load_dotenv()
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 1. CONFIGURATION ║
# ╚══════════════════════════════════════════════════════════════════╝
S3_BUCKET_FRAMES = os.getenv("S3_BUCKET_FRAMES", "oriane-contents")
MODEL_ID = os.getenv("MODEL_ID", "amazon.titan-embed-image-v1")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
EMB_DIM = int(os.getenv("EMB_DIM", "1024"))
MAX_BATCH = int(os.getenv("MAX_FRAMES_PER_BATCH", "20"))
CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "4"))
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
OS_ENDPOINT = os.getenv("OS_ENDPOINT")
OS_PASS = os.getenv("OS_PASS", "")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 2. CLIENTS & LOGGING ║
# ╚══════════════════════════════════════════════════════════════════╝
# 2‑a root logger for libraries (no %(code)s)
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
for h in logging.getLogger().handlers:
if "%(code)" in h.formatter._fmt:
h.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
# 2‑b dedicated logger for your video jobs (adds [code])
_embed_log = logging.getLogger("embed")
_code_fmt = logging.Formatter("%(asctime)s %(levelname)s [%(code)s] %(message)s")
_code_hdlr = logging.StreamHandler()
_code_hdlr.setFormatter(_code_fmt)
_embed_log.addHandler(_code_hdlr)
_embed_log.propagate = False # prevents library logs from using this fmt
def get_log(code="-"):
return logging.LoggerAdapter(_embed_log, {"code": code})
# AWS / DB clients
s3 = boto3.client("s3")
bedrock = boto3.client("bedrock-runtime", region_name=AWS_REGION)
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
def _host_only(url: str) -> str:
if url and url.startswith("http"):
return url.split("://", 1)[1].rstrip("/") # drop scheme + trailing /
return url
host_only = _host_only(OS_ENDPOINT)
if not host_only:
raise RuntimeError("OS_ENDPOINT env var is missing or invalid")
credentials = boto3.Session().get_credentials()
signer = AWSV4SignerAuth(credentials, AWS_REGION, service="aoss")
os_client = OpenSearch(
hosts=[{"host": host_only, "port": 443}],
http_auth=signer,
use_ssl=True,
connection_class=RequestsHttpConnection,
timeout=40,
)
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 3. SUPABASE HELPERS ║
# ╚══════════════════════════════════════════════════════════════════╝
def mark_video(code: str, **fields):
supabase.table("insta_content").update(fields).eq("code", code).execute()
def log_frame_error(code: str, frame: int, msg: str):
supabase.table("embedding_errors").insert(
{"code": code, "frame": frame, "error": msg}
).execute()
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 4. S3 & BEDROCK HELPERS ║
# ╚══════════════════════════════════════════════════════════════════╝
def list_frame_keys(platform, code, extension = '.jpg'):
pref = f"{platform}/{code}/frames/"
for p in s3.get_paginator("list_objects_v2").paginate(Bucket=S3_BUCKET_FRAMES, Prefix=pref):
for o in p.get("Contents", []):
if o["Key"].endswith(extension):
yield o["Key"]
def titan_embed(img_b64s: List[str]) -> List[List[float]]:
"""
One Bedrock call per frame (the model is single‑request only).
"""
vecs: List[List[float]] = []
for b64 in img_b64s:
body = json.dumps({
"inputImage": b64,
"embeddingConfig": {
"outputEmbeddingLength": EMB_DIM
}
})
rsp = bedrock.invoke_model(
modelId=MODEL_ID,
body=body,
accept="application/json",
contentType="application/json",
)
vecs.append(json.loads(rsp["body"].read())["embedding"])
return vecs
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 5. OPENSEARCH HELPERS ║
# ╚══════════════════════════════════════════════════════════════════╝
def bulk_gen(index: str, docs: list[dict]):
"""
Yields bulk actions. If a doc contains an `_id` field we pop it out
so the action line can specify it; afterwards we re‑add it so the
caller can still use the dict.
"""
for d in docs:
yield {"index": {"_index": index, "_id": d["_id"]}}
yield d
def write_bulk(index: str, docs: list[dict], log):
"""
Executes bulk with `refresh=wait_for` so the docs are visible right
after the call, and raises if any individual item failed.
"""
if not docs:
return
resp = os_client.bulk(bulk_gen(index, docs), refresh=False)
if resp.get("errors"):
# show the first few item errors to the logs
for item in resp["items"][:5]:
err = item["index"].get("error")
if err:
log.error("bulk‑item error: %s", err)
raise RuntimeError(f"bulk to {index} had errors")
def delete_by_video_id(index: str, video_id: str):
os_client.delete_by_query(
index=index,
body={"query": {"term": {"video_id": video_id}}},
wait_for_completion=True,
ignore=[404] # nothing to delete the first time
)
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 6. PER‑VIDEO PIPELINE ║
# ╚══════════════════════════════════════════════════════════════════╝
def embed_video(platform: str, code: str, log):
"""
• Embeds every frame in the S3 prefix → writes to `video_frames`
• Builds a per‑video summary → writes to `videos`
• Re‑runs are idempotent: previous docs with the same logical key
(`video_id`) are removed first.
"""
# ── locate all frame files ──────────────────────────────────────
keys = list(list_frame_keys(platform, code))
if not keys:
raise RuntimeError(f"No frames found for {code}")
video_id = f"{platform}.{code}" # logical key we control
frame_docs = []
vecs = []
# ── embed frames in chunks ──────────────────────────────────────
for i in range(0, len(keys), MAX_BATCH):
chunk = keys[i : i + MAX_BATCH]
try:
embeds = titan_embed([
base64.b64encode(
s3.get_object(Bucket=S3_BUCKET_FRAMES, Key=k)["Body"].read()
).decode()
for k in chunk
])
except Exception as e:
for k in chunk:
log_frame_error(code, int(pathlib.Path(k).stem), str(e))
raise
# build docs for this chunk
for k, v in zip(chunk, embeds):
fno = int(pathlib.Path(k).stem)
frame_docs.append({
"_id": f"{video_id}#{fno}",
"video_id" : video_id,
"vector" : v,
"platform" : platform,
"code" : code,
"frame" : fno,
})
vecs.append(v)
# ── make the operation idempotent ──────────────────────────────
# (remove any previous docs that belong to this video)
# delete_by_video_id("video_frames", video_id)
# delete_by_video_id("videos", video_id)
# ── write frame docs ────────────────────────────────────────────
write_bulk("video_frames", frame_docs, log)
# ── write summary doc ───────────────────────────────────────────
summary_doc = {
"_id": video_id,
"video_id": video_id,
"vector" : np.mean(vecs, axis=0).tolist(),
"platform": platform,
"code" : code,
"frames" : len(vecs),
}
write_bulk("videos", [summary_doc], log)
# ── flag completion in Supabase ─────────────────────────────────
mark_video(code, is_embedded=True)
log.info("✓ indexed %s frames", len(vecs))
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 7. PER‑RECORD WRAPPER ║
# ╚══════════════════════════════════════════════════════════════════╝
def already_embedded(code: str) -> bool:
resp = supabase.table("insta_content") \
.select("is_embedded") \
.eq("code", code).single().execute()
return bool(resp.data and resp.data["is_embedded"])
def process_record(rec):
body = json.loads(rec["body"])
platform, code = body["platform"], body["code"]
if already_embedded(code):
return {"code": code, "status": "skipped (already embedded)"}
log = get_log(code)
log.info("⏩ start")
embed_video(platform, code, log)
return {"code": code, "status": "done"}
# ╔══════════════════════════════════════════════════════════════════╗
# ║ 8. LAMBDA HANDLER ║
# ╚══════════════════════════════════════════════════════════════════╝
def lambda_handler(event, _ctx):
with ThreadPoolExecutor(max_workers=CONCURRENCY_LIMIT) as pool:
futs = [pool.submit(process_record, r) for r in event.get("Records", [])]
return {"status": "completed",
"results": [f.result() for f in as_completed(futs)]}
os_client.indices.refresh(index)