-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
217 lines (194 loc) · 8.39 KB
/
lambda_function.py
File metadata and controls
217 lines (194 loc) · 8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import os
import json
import time
import logging
import boto3
import hashlib
import uuid
from supabase import create_client
# Configure logging according to DEBUG flag from .env
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes")
logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO)
# Model identifier
MODEL = "FCM"
# Load environment variables for Supabase and S3
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
S3_BUCKET = os.getenv("S3_BUCKET", "oriane-contents")
# Initialize Supabase and boto3 S3 clients
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
s3 = boto3.client("s3")
def get_frames_from_s3(platform, shortcode, extension):
"""
Lists and downloads video frames from the S3 bucket.
Expected S3 path: <platform>/<shortcode>/frames/0.<extension> ... n.<extension>
"""
prefix = f"{platform}/{shortcode}/frames/"
logging.debug(f"Listing objects in bucket {S3_BUCKET} with prefix {prefix}")
response = s3.list_objects_v2(Bucket=S3_BUCKET, Prefix=prefix)
frames = []
if 'Contents' in response:
objects = response['Contents']
def extract_frame_number(obj):
key = obj["Key"]
filename = key.split("/")[-1]
try:
return int(filename.split(".")[0])
except Exception:
return -1
objects = sorted(objects, key=extract_frame_number)
for obj in objects:
key = obj["Key"]
try:
file_obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
frame_data = file_obj["Body"].read()
frames.append(frame_data)
except Exception as e:
logging.error(f"Error fetching frame {key}: {e}")
else:
logging.warning(f"No frames found in S3 for prefix {prefix}")
return frames
def compare_frames(frame1, frame2):
"""
Dummy similarity function that computes a similarity value based on
the SHA-256 hash differences between the two frames.
This function computes the Hamming distance between the two hash digests
(i.e. the number of differing bits) and then normalizes it to a [0, 1] range.
A result closer to 1 indicates higher similarity.
"""
hash1 = hashlib.sha256(frame1).digest()
hash2 = hashlib.sha256(frame2).digest()
# Calculate the Hamming distance between the two hash digests.
hamming_distance = sum(bin(b1 ^ b2).count("1") for b1, b2 in zip(hash1, hash2))
similarity = 1 - (hamming_distance / 256)
return similarity
def process_message(data):
"""
Processes a single payload dictionary.
"""
start_time = time.time()
job_id = data.get("job_id")
if not job_id:
job_id = str(uuid.uuid4())
job_insert_response = supabase.table("ai_jobs").insert({"job_id": job_id}).execute()
if job_insert_response.error:
raise Exception(f"Error inserting job: {job_insert_response.error}")
logging.info(f"Generated new job_id: {job_id}")
monitored_shortcode = data.get("monitored_shortcode")
if not monitored_shortcode:
raise Exception("monitored_shortcode is required")
watched_shortcodes = data.get("watched_shortcodes", [])
platform = data.get("platform", "instagram")
extension = data.get("extension", "jpg")
logging.info(f"Job {job_id}: Processing monitored shortcode {monitored_shortcode}.")
# Verify that the monitored shortcode exists and is flagged as monitored.
supabase_response = supabase.table("insta_content") \
.select("*") \
.eq("code", monitored_shortcode) \
.eq("is_monitored", True) \
.execute()
if not supabase_response.data:
logging.error(f"Monitored shortcode {monitored_shortcode} not found or not monitored.")
return {"error": "Monitored shortcode not found", "job_id": job_id}
logging.info(f"Monitored shortcode {monitored_shortcode} verified.")
# Fetch frames for the monitored video.
monitored_frames = get_frames_from_s3(platform, monitored_shortcode, extension)
if not monitored_frames:
logging.error("No frames found for monitored video.")
return {"error": "Frames for monitored video not found", "job_id": job_id}
logging.info(f"Fetched {len(monitored_frames)} frames for monitored video {monitored_shortcode}.")
results_summary = []
for watched_shortcode in watched_shortcodes:
logging.info(f"Processing watched shortcode {watched_shortcode}.")
# Skip if already processed.
check_response = supabase.table("ai_results") \
.select("id") \
.eq("monitored_video", monitored_shortcode) \
.eq("watched_video", watched_shortcode) \
.eq("model", MODEL) \
.execute()
if check_response.data:
logging.info(f"Comparison between {monitored_shortcode} and {watched_shortcode} exists. Skipping.")
continue
watched_frames = get_frames_from_s3(platform, watched_shortcode, extension)
if not watched_frames:
logging.error(f"No frames found for watched video {watched_shortcode}. Skipping.")
continue
logging.info(f"Fetched {len(watched_frames)} frames for watched video {watched_shortcode}.")
frame_results = []
similarity_sum = 0.0
comparison_count = 0
max_similarity = 0.0
start_time_video = time.time()
for i in range(len(monitored_frames)):
for j in range(len(watched_frames)):
sim = compare_frames(monitored_frames[i], watched_frames[j])
frame_results.append({
"monitored_frame": i,
"watched_frame": j,
"similarity": sim
})
similarity_sum += sim
comparison_count += 1
if sim > max_similarity:
max_similarity = sim
avg_similarity = similarity_sum / comparison_count if comparison_count > 0 else 0
processed_in_secs = time.time() - start_time_video
record = {
"job_id": job_id,
"model": MODEL,
"monitored_video": monitored_shortcode,
"watched_video": watched_shortcode,
"avg_similarity": avg_similarity,
"processed_in_secs": processed_in_secs,
"frame_results": frame_results,
"max_similarity": max_similarity
}
logging.info(f"Inserting result for watched video {watched_shortcode}.")
insert_response = supabase.table("ai_results").insert(record).execute()
logging.debug(f"Insert response: {insert_response}")
results_summary.append(record)
total_time = time.time() - start_time
logging.info(f"Job {job_id} completed in {total_time:.2f} seconds.")
return {
"job_id": job_id,
"message": "Processing completed",
"results": results_summary,
"total_time": total_time
}
def lambda_handler(event, context):
start_time = time.time()
try:
results = []
# Check if the event is from SQS (contains "Records")
if "Records" in event:
for record in event["Records"]:
body = record.get("body")
if not body:
logging.error("Record without body found, skipping")
continue
# Parse the JSON message from SQS
data = json.loads(body) if isinstance(body, str) else body
result = process_message(data)
results.append(result)
else:
# Assume direct invocation
data = event.get("body", event)
if isinstance(data, str):
data = json.loads(data)
results.append(process_message(data))
total_time = time.time() - start_time
return {
"statusCode": 200,
"body": json.dumps({
"message": "Processing completed",
"results": results,
"total_time": total_time
})
}
except Exception as e:
logging.exception("An error occurred during processing:")
return {
"statusCode": 500,
"body": json.dumps({"error": str(e)})
}