ferrero-opentext/Python-Version/scripts/shared/database.py
nickviljoen 90f326aecb Enhancement: Treat empty A1 folders as expected workflow
Campaign managers often create the campaign in DAM before assets are
uploaded, so an empty Master Assets folder is the normal pre-asset state
rather than a failure. Stop marking these as permanently failed and stop
emailing on every poll.

- increment_a1_retry() gains mark_failed_at_max param; empty-folder path
  passes False so the campaign keeps polling indefinitely until assets
  appear (or the DAM status changes).
- Empty-folder branch now skips silently on every poll and sends a single
  warning email at poll 20 (~1 hour at the 3-min cadence) so genuinely
  stuck campaigns still surface.
- New a1_to_a2_no_assets_warning email template — one-time soft warning,
  no permanent-failure language.
- Existing reset_a1_retry() on successful A1→A2 still clears the counter
  when assets eventually appear.
- Other folder-error paths (folder not found, etc.) keep the original
  3-retry-then-fail behavior.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 15:20:41 +02:00

1076 lines
39 KiB
Python

"""
Database Client - PostgreSQL Operations
Handles tracking IDs, master assets, and all-done checks
Compatible with Python 3.6+
"""
import json
import logging
import os
import random
import re
import string
from datetime import datetime, timezone

import psycopg2
import psycopg2.pool
logger = logging.getLogger('Database')


class Database:
    """
    PostgreSQL client for the asset-tracking workflow.

    Wraps a psycopg2 ThreadedConnectionPool and provides operations on the
    master_assets, derivative_assets, campaign_status and creativex_scores
    tables. Every method checks a connection out of the pool and hands it back
    in a ``finally`` block; cursors are closed by their ``with`` block. Write
    methods commit on success, roll back on failure and report the outcome as
    ``{'success': bool, ...}`` instead of raising.
    """

    # A1 retries allowed before a campaign is marked permanently failed
    # (only enforced when increment_a1_retry is called with mark_failed_at_max=True).
    A1_MAX_RETRIES = 3

    # Random candidates tried by generate_unique_tracking_id before giving up.
    TRACKING_ID_ATTEMPTS = 100

    # Tracking IDs are case-sensitive alphanumeric strings.
    _TRACKING_ID_ALPHABET = string.ascii_letters + string.digits
    # Regular (non-master) IDs may start with anything except uppercase 'M',
    # which is reserved as the prefix of B1→B2 master IDs.
    _REGULAR_FIRST_CHARS = string.ascii_letters.replace('M', '') + string.digits

    # Timestamp suffix that re-scored CreativeX files carry before the extension,
    # e.g. "banner_2026-03-13-05-53-36.mp4".
    _CREATIVEX_TIMESTAMP_SUFFIX = r'_\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}'

    # Extensions tried, in this order, when a CreativeX score is not stored
    # under the exact filename.
    _CREATIVEX_ALT_EXTENSIONS = ('.mp4', '.jpg', '.jpeg', '.png', '.mov', '.avi', '.gif', '.webp')

    # Shared lookup for the newest active CreativeX row; {op} is '=' or 'LIKE'.
    _CREATIVEX_LOOKUP_SQL = """
        SELECT filename, creativex_id, creativex_url, quality_score,
               box_file_id, full_extraction_data, extracted_at
        FROM creativex_scores
        WHERE filename {op} %s AND status = 'active'
        ORDER BY extracted_at DESC
        LIMIT 1
    """

    def __init__(self, config):
        """
        Create the connection pool.

        Args:
            config: Application config dict; config['database'] must provide
                'host', 'port', 'database', 'user' and 'password'.

        Raises:
            Exception: Whatever psycopg2 raised when the pool could not be
                created (logged, then re-raised).
        """
        self.config = config['database']
        try:
            self.pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['user'],
                password=self.config['password']
            )
            logger.info("Database connection pool created")
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            raise

    def get_connection(self):
        """Check a connection out of the pool; return it with put_connection()."""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return a connection obtained from get_connection() to the pool."""
        self.pool.putconn(conn)

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------

    @staticmethod
    def _split_filename(name):
        """
        Split a filename at its last dot.

        Returns:
            tuple (stem, extension); extension keeps its leading dot and is ''
            when the name has no dot. E.g. 'a.b.mp4' -> ('a.b', '.mp4').
        """
        stem, dot, ext = name.rpartition('.')
        if not dot:
            return name, ''
        return stem, dot + ext

    @staticmethod
    def _load_json(value):
        """
        Decode a JSON column value.

        str/bytes are parsed with json.loads; anything else (an already-decoded
        dict or list, or None for SQL NULL) is returned unchanged.
        """
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    @staticmethod
    def _escape_like(text):
        """Escape LIKE wildcards ('%', '_') and the default escape char '\\' so text matches literally."""
        return text.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    @staticmethod
    def _metadata_value(metadata_element):
        """
        Return metadata_element['value']['value']['value'].

        Returns None when any level is missing or is not a dict (e.g. JSON null),
        instead of raising AttributeError.
        """
        node = metadata_element
        for _ in range(3):
            if not isinstance(node, dict):
                return None
            node = node.get('value')
        return node

    @classmethod
    def _creativex_base(cls, filename):
        """Return (stem, ext) of filename with a trailing CreativeX timestamp removed from the stem."""
        stem, ext = cls._split_filename(filename)
        return re.sub(cls._CREATIVEX_TIMESTAMP_SUFFIX + '$', '', stem), ext

    @classmethod
    def _creativex_version_regex(cls, base_stem, ext):
        """Compile a regex whose fullmatch accepts base_stem + optional timestamp suffix + ext."""
        return re.compile(
            re.escape(base_stem) + '(?:' + cls._CREATIVEX_TIMESTAMP_SUFFIX + ')?' + re.escape(ext)
        )

    # ------------------------------------------------------------------
    # Tracking IDs and master assets
    # ------------------------------------------------------------------

    def generate_unique_tracking_id(self, is_master=False):
        """
        Generate a 6-character tracking ID not yet present in master_assets.

        Args:
            is_master: If True, the ID starts with 'M' (for B1→B2 master files).
                If False, the ID never starts with uppercase 'M' (for A1→A2 regular files).

        Returns:
            str: 6-character alphanumeric ID.

        Raises:
            Exception: If no unused ID was found within TRACKING_ID_ATTEMPTS tries.
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                for _ in range(self.TRACKING_ID_ATTEMPTS):
                    if is_master:
                        first_char = 'M'  # e.g. M4x7Qp
                    else:
                        first_char = random.choice(self._REGULAR_FIRST_CHARS)
                    tracking_id = first_char + ''.join(
                        random.choices(self._TRACKING_ID_ALPHABET, k=5)
                    )
                    # Both ID kinds are checked against master_assets. NOTE(review): this
                    # is check-then-insert, so a concurrent caller could draw the same ID
                    # before it is stored.
                    cursor.execute(
                        "SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
                        (tracking_id,)
                    )
                    if cursor.fetchone()[0] == 0:
                        logger.info("Generated tracking ID: {} (master={})".format(tracking_id, is_master))
                        return tracking_id
            raise Exception("Failed to generate unique tracking ID after {} attempts".format(
                self.TRACKING_ID_ATTEMPTS
            ))
        finally:
            self.put_connection(conn)

    def find_or_create_tracking_id(self, opentext_id, local_campaign_id):
        """
        Find the active tracking ID for opentext_id + local_campaign_id, or generate a new one.

        Used by the A5→A6 rework workflow, where the asset may already exist from
        A1→A2. Matching on both IDs finds the exact asset of this campaign.

        Args:
            opentext_id: DAM asset ID
            local_campaign_id: Local campaign ID (e.g., C000000078)

        Returns:
            dict with tracking_id (str) and is_existing (bool)
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT tracking_id
                    FROM master_assets
                    WHERE opentext_id = %s
                      AND local_campaign_id = %s
                      AND status = 'active'
                    LIMIT 1
                """, (opentext_id, local_campaign_id))
                row = cursor.fetchone()
        finally:
            # Release before generating: generate_unique_tracking_id checks out its
            # own connection, and holding two per caller can exhaust the pool.
            self.put_connection(conn)

        if row:
            tracking_id = row[0]
            logger.info("Found existing asset: opentext_id={}, local_campaign_id={}, tracking_id={}".format(
                opentext_id, local_campaign_id, tracking_id
            ))
            return {'tracking_id': tracking_id, 'is_existing': True}

        # A5→A6 rework assets are NOT masters, so is_master=False.
        logger.info("No existing asset found for opentext_id={}, local_campaign_id={}, generating new tracking ID".format(
            opentext_id, local_campaign_id
        ))
        return {
            'tracking_id': self.generate_unique_tracking_id(is_master=False),
            'is_existing': False
        }

    def find_global_master_by_opentext_id(self, opentext_id):
        """
        Look up an active B1→B2 global master asset by opentext_id.

        Args:
            opentext_id: DAM asset ID to search for

        Returns:
            str: M-prefixed tracking ID if found, None otherwise
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                # 'M%%' is a literal 'M%' once psycopg2 applies the parameters.
                cursor.execute("""
                    SELECT tracking_id FROM master_assets
                    WHERE opentext_id = %s
                      AND tracking_id LIKE 'M%%'
                      AND status = 'active'
                    LIMIT 1
                """, (opentext_id,))
                row = cursor.fetchone()
        finally:
            self.put_connection(conn)

        if row:
            logger.info("Found global master tracking ID {} for opentext_id {}".format(
                row[0], opentext_id
            ))
            return row[0]
        logger.debug("No global master found for opentext_id {}".format(opentext_id))
        return None

    def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id, global_master_campaign_id=None, global_master_folder_id=None, local_campaign_id=None, global_master_tracking_id=None):
        """
        Insert (or update on tracking_id conflict) a master asset with its full DAM metadata.

        Args:
            tracking_id: 6-char tracking ID
            opentext_id: DAM asset ID
            asset_data: Complete DAM asset JSON (stored in the full_metadata column)
            box_file_id: Box file ID
            box_url: Box URL
            upload_folder_id: Final Assets folder ID for upload
            global_master_campaign_id: Global master campaign ID (from GLOBAL CAMPAIGN REFERENCE)
            global_master_folder_id: Global master folder ID
            local_campaign_id: Local campaign ID (immediate campaign this asset belongs to)
            global_master_tracking_id: M-prefixed tracking ID from B1→B2 global master (if found)

        Returns:
            dict with success boolean (plus tracking_id on success, error on failure)
        """
        conn = self.get_connection()
        try:
            # A missing or null name is stored as 'unknown'.
            name = asset_data.get('name') or 'unknown'
            filename, extension = self._split_filename(name)
            full_metadata_json = json.dumps(asset_data)
            # Box info is kept in the description; parse_box_info_from_description reads it back.
            description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
                box_file_id, box_url, opentext_id
            )
            with conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO master_assets (
                        tracking_id, opentext_id, original_filename, file_extension,
                        file_size_bytes, mime_type, upload_directory,
                        description, full_metadata, status,
                        global_master_campaign_id, global_master_folder_id, local_campaign_id,
                        global_master_tracking_id
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s, %s
                    )
                    ON CONFLICT (tracking_id) DO UPDATE SET
                        upload_directory = EXCLUDED.upload_directory,
                        description = EXCLUDED.description,
                        full_metadata = EXCLUDED.full_metadata,
                        global_master_campaign_id = EXCLUDED.global_master_campaign_id,
                        global_master_folder_id = EXCLUDED.global_master_folder_id,
                        local_campaign_id = EXCLUDED.local_campaign_id,
                        global_master_tracking_id = EXCLUDED.global_master_tracking_id,
                        updated_at = CURRENT_TIMESTAMP
                """, (
                    tracking_id,
                    opentext_id,
                    filename,
                    extension,
                    asset_data.get('file_size'),
                    asset_data.get('mime_type'),
                    upload_folder_id,
                    description,
                    full_metadata_json,
                    global_master_campaign_id,
                    global_master_folder_id,
                    local_campaign_id,
                    global_master_tracking_id
                ))
            conn.commit()
            logger.info("Stored master asset: {}".format(tracking_id))
            return {'success': True, 'tracking_id': tracking_id}
        except Exception as e:
            conn.rollback()
            logger.error("Failed to store master asset: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    def get_master_asset(self, tracking_id):
        """
        Get an active master asset by tracking ID.

        Returns:
            dict with tracking_id, opentext_id, upload_directory, full_metadata,
            description, box_file_id and box_url; None if not found.
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT tracking_id, opentext_id, upload_directory, full_metadata, description
                    FROM master_assets
                    WHERE tracking_id = %s AND status = 'active'
                """, (tracking_id,))
                row = cursor.fetchone()
        finally:
            self.put_connection(conn)

        if not row:
            return None
        box_info = self.parse_box_info_from_description(row[4])
        return {
            'tracking_id': row[0],
            'opentext_id': row[1],
            'upload_directory': row[2],
            'full_metadata': self._load_json(row[3]),
            'description': row[4],
            'box_file_id': box_info.get('box_file_id'),
            'box_url': box_info.get('box_url')
        }

    @staticmethod
    def parse_box_info_from_description(description):
        """
        Parse Box file ID and URL from a master asset description.

        Expected format (as written by store_master_asset):
            Box File ID: {id}
            Box URL: {url}
            DAM Asset ID: {opentext_id}

        Returns:
            dict with box_file_id and box_url (None when absent)
        """
        result = {'box_file_id': None, 'box_url': None}
        if not description:
            return result
        for line in description.split('\n'):
            line = line.strip()
            # Split on the first ':' only, so the 'https:' inside the URL survives.
            if line.startswith('Box File ID:'):
                result['box_file_id'] = line.split(':', 1)[1].strip()
            elif line.startswith('Box URL:'):
                result['box_url'] = line.split(':', 1)[1].strip()
        return result

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check whether ALL active master assets of a campaign have a completed derivative upload.

        NOTE(review): this filters master_assets on `campaign_id` and
        derivative_assets on `upload_status`, but store_master_asset only writes
        local_campaign_id / global_master_campaign_id and store_derivative_asset
        only writes `status`. Confirm those columns are populated elsewhere,
        otherwise this always returns False.

        Args:
            campaign_id: Campaign ID

        Returns:
            bool: True if all assets uploaded; False otherwise (including no assets)
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT COUNT(DISTINCT tracking_id)
                    FROM master_assets
                    WHERE campaign_id = %s AND status = 'active'
                """, (campaign_id,))
                total_masters = cursor.fetchone()[0]
                if total_masters == 0:
                    return False

                # Masters with at least one completed derivative upload.
                cursor.execute("""
                    SELECT COUNT(DISTINCT ma.tracking_id)
                    FROM master_assets ma
                    INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
                    WHERE ma.campaign_id = %s AND ma.status = 'active'
                      AND da.upload_status = 'completed'
                """, (campaign_id,))
                uploaded_count = cursor.fetchone()[0]
        finally:
            self.put_connection(conn)

        all_done = uploaded_count == total_masters
        logger.info("Campaign {} upload status: {}/{} assets uploaded{}".format(
            campaign_id, uploaded_count, total_masters,
            " - ALL DONE" if all_done else ""
        ))
        return all_done

    def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
        """
        Store a derivative asset record after upload.

        Args:
            tracking_id: Master asset tracking ID (links the derivative to its master)
            master_asset_id: Master asset DB ID (accepted for compatibility; not used)
            dam_asset_id: DAM asset ID of the uploaded derivative (stored in dam_asset_id)
            filename: Clean filename of the derivative; stored split into name and extension

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        try:
            name_without_ext, extension = self._split_filename(filename)
            with conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO derivative_assets (
                        tracking_id, dam_asset_id, derivative_filename, file_extension, status
                    ) VALUES (%s, %s, %s, %s, 'active')
                """, (tracking_id, dam_asset_id, name_without_ext, extension))
            conn.commit()
            logger.info("Stored derivative asset: {}{} (DAM ID: {})".format(
                tracking_id, filename, dam_asset_id
            ))
            return {'success': True}
        except Exception as e:
            conn.rollback()
            logger.error("Failed to store derivative: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    def get_campaign_asset_count(self, campaign_id):
        """
        Return the number of active master assets for a campaign.

        NOTE(review): filters on master_assets.campaign_id, which
        store_master_asset does not write — confirm it is populated elsewhere.
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT COUNT(*) FROM master_assets
                    WHERE campaign_id = %s AND status = 'active'
                """, (campaign_id,))
                return cursor.fetchone()[0]
        finally:
            self.put_connection(conn)

    def test_connection(self):
        """
        Run 'SELECT 1' to verify connectivity.

        Returns:
            bool: True on success, False (after logging) on any error. The pooled
            connection is always returned, including on failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
            logger.info("Database connection OK")
            return True
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            return False
        finally:
            if conn is not None:
                self.put_connection(conn)

    def extract_global_campaign_reference(self, asset_data, campaign_id=None):
        """
        Extract the Global Campaign Reference and Local Campaign ID from asset metadata.

        Only 'L7+ - CAMPAIGN' containers in inherited_metadata_collections are
        inspected. A container that has both a CAMPAIGN ID and a GLOBAL CAMPAIGN
        REFERENCE is treated as the local campaign. Missing or null metadata values
        are skipped rather than raising.

        Args:
            asset_data: Complete DAM asset JSON
            campaign_id: Current campaign ID, used as fallback for local_campaign_id

        Returns:
            dict with global_master_campaign_id, global_master_folder_id, local_campaign_id
        """
        global_master_campaign_id = None
        global_master_folder_id = None
        local_campaign_id = None

        for collection in asset_data.get('inherited_metadata_collections') or []:
            if collection.get('container_type_name') != 'L7+ - CAMPAIGN':
                continue

            campaign_id_in_collection = None
            has_global_reference = False
            for inherited in collection.get('inherited_metadata_values') or []:
                metadata_element = inherited.get('metadata_element') or {}
                element_id = metadata_element.get('id')

                if element_id == 'FERRERO.FIELD.CAMPAIGN ID':
                    campaign_id_value = self._metadata_value(metadata_element)
                    if campaign_id_value:
                        campaign_id_in_collection = campaign_id_value

                if element_id == 'FERRERO.FIELD.GLOBAL CAMPAIGN REFERENCE':
                    value = self._metadata_value(metadata_element)
                    if value:
                        global_master_campaign_id = value
                        has_global_reference = True
                        logger.info("Found Global Campaign Reference: {}".format(value))

            # A campaign container carrying a global reference is the local campaign.
            if has_global_reference and campaign_id_in_collection:
                local_campaign_id = campaign_id_in_collection
                logger.info("Local Campaign ID: {}".format(local_campaign_id))
                # NOTE(review): the source this was restored from had lost its
                # indentation; the container-ID capture is assumed to belong to
                # this branch — confirm against version control.
                container_id = collection.get('container_id')
                if container_id:
                    global_master_folder_id = container_id
                    logger.info("Global master folder ID: {}".format(container_id))

        # Always fall back to the current campaign so every asset gets a campaign ID,
        # even without a Global Campaign Reference.
        if not local_campaign_id and campaign_id:
            local_campaign_id = campaign_id
            logger.info("No Global Campaign Reference found, using current campaign ID: {}".format(campaign_id))

        return {
            'global_master_campaign_id': global_master_campaign_id,
            'global_master_folder_id': global_master_folder_id,
            'local_campaign_id': local_campaign_id
        }

    # ------------------------------------------------------------------
    # Campaign status / A1 retry tracking
    # ------------------------------------------------------------------

    def record_campaign_status(self, campaign_id, campaign_number, campaign_name, live_campaign, status, webhook_sent=False):
        """
        Insert or update (on campaign_id conflict) a campaign_status row for webhook tracking.

        webhook_sent_at is set to the current UTC time when webhook_sent is True
        and to NULL otherwise. On update, both webhook columns are overwritten.

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            live_campaign: 'YES' or 'NO'
            status: Current status (A2, A4, etc.)
            webhook_sent: Whether webhook was sent

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO campaign_status (
                        campaign_id, campaign_number, campaign_name,
                        live_campaign, status, webhook_sent, webhook_sent_at
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (campaign_id) DO UPDATE SET
                        campaign_number = EXCLUDED.campaign_number,
                        campaign_name = EXCLUDED.campaign_name,
                        live_campaign = EXCLUDED.live_campaign,
                        status = EXCLUDED.status,
                        webhook_sent = EXCLUDED.webhook_sent,
                        webhook_sent_at = EXCLUDED.webhook_sent_at,
                        updated_at = CURRENT_TIMESTAMP
                """, (
                    campaign_id,
                    campaign_number,
                    campaign_name,
                    live_campaign,
                    status,
                    webhook_sent,
                    datetime.now(timezone.utc) if webhook_sent else None
                ))
            conn.commit()
            logger.info("Recorded campaign status: {} - {} (Live: {})".format(
                campaign_number, status, live_campaign
            ))
            return {'success': True}
        except Exception as e:
            conn.rollback()
            logger.error("Failed to record campaign status: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    def get_a1_retry_status(self, campaign_id):
        """
        Get the A1 retry status of a campaign.

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with retry_count, last_retry_at, permanently_failed, failure_reason
            (NULL count/flag normalised to 0/False); None if the campaign has no row.
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT a1_retry_count, a1_last_retry_at,
                           a1_permanently_failed, a1_failure_reason
                    FROM campaign_status
                    WHERE campaign_id = %s
                """, (campaign_id,))
                row = cursor.fetchone()
        finally:
            self.put_connection(conn)

        if not row:
            return None
        return {
            'retry_count': row[0] or 0,
            'last_retry_at': row[1],
            'permanently_failed': row[2] or False,
            'failure_reason': row[3]
        }

    def increment_a1_retry(self, campaign_id, campaign_number, campaign_name, reason, mark_failed_at_max=True):
        """
        Increment the A1 retry counter; mark permanently failed once A1_MAX_RETRIES is reached.

        A missing campaign_status row is created (live_campaign='NO', status='A1').
        On every call a1_permanently_failed is rewritten: it is True only when
        mark_failed_at_max is set and the new count reaches A1_MAX_RETRIES; in every
        other case it is set FALSE and a1_failure_reason is cleared.

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            reason: Description of failure (stored only when permanently failed)
            mark_failed_at_max: If True (default), set a1_permanently_failed=True at the max.
                Pass False for empty-folder polling, where the campaign is expected to
                receive assets eventually and should keep retrying silently.

        Returns:
            dict with success, retry_count, permanently_failed (or success/error on failure)
        """
        max_retries = self.A1_MAX_RETRIES
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT a1_retry_count FROM campaign_status
                    WHERE campaign_id = %s
                """, (campaign_id,))
                row = cursor.fetchone()
                current_count = (row[0] or 0) if row else 0
                new_count = current_count + 1
                # bool() so a falsy non-bool flag never reaches the boolean column.
                is_permanently_failed = bool(mark_failed_at_max) and new_count >= max_retries

                cursor.execute("""
                    INSERT INTO campaign_status (
                        campaign_id, campaign_number, campaign_name,
                        live_campaign, status, webhook_sent,
                        a1_retry_count, a1_last_retry_at,
                        a1_permanently_failed, a1_failure_reason
                    ) VALUES (%s, %s, %s, 'NO', 'A1', FALSE, %s, CURRENT_TIMESTAMP, %s, %s)
                    ON CONFLICT (campaign_id) DO UPDATE SET
                        a1_retry_count = EXCLUDED.a1_retry_count,
                        a1_last_retry_at = EXCLUDED.a1_last_retry_at,
                        a1_permanently_failed = EXCLUDED.a1_permanently_failed,
                        a1_failure_reason = EXCLUDED.a1_failure_reason,
                        updated_at = CURRENT_TIMESTAMP
                """, (
                    campaign_id,
                    campaign_number,
                    campaign_name,
                    new_count,
                    is_permanently_failed,
                    reason if is_permanently_failed else None
                ))
            conn.commit()
            logger.info("A1 retry tracking: Campaign {} - Attempt {}/{} (Permanently Failed: {})".format(
                campaign_number, new_count, max_retries, is_permanently_failed
            ))
            return {
                'success': True,
                'retry_count': new_count,
                'permanently_failed': is_permanently_failed
            }
        except Exception as e:
            conn.rollback()
            logger.error("Failed to increment A1 retry: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    def reset_a1_retry(self, campaign_id):
        """
        Clear A1 retry tracking for a campaign.

        Resets the count, timestamp, permanent-failure flag and failure reason.
        Intended for e.g. a manual fix or a successful A1→A2 run.

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    UPDATE campaign_status
                    SET a1_retry_count = 0,
                        a1_last_retry_at = NULL,
                        a1_permanently_failed = FALSE,
                        a1_failure_reason = NULL,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE campaign_id = %s
                """, (campaign_id,))
            conn.commit()
            logger.info("Reset A1 retry tracking for campaign: {}".format(campaign_id))
            return {'success': True}
        except Exception as e:
            conn.rollback()
            logger.error("Failed to reset A1 retry: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    def check_campaign_processed(self, campaign_id):
        """
        Look up a campaign's recorded status (e.g. to see whether its webhook was sent).

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with exists (bool) and, when it exists, the campaign_status columns
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT campaign_id, campaign_number, campaign_name,
                           live_campaign, status, webhook_sent, webhook_sent_at
                    FROM campaign_status
                    WHERE campaign_id = %s
                """, (campaign_id,))
                row = cursor.fetchone()
        finally:
            self.put_connection(conn)

        if not row:
            return {'exists': False}
        return {
            'exists': True,
            'campaign_id': row[0],
            'campaign_number': row[1],
            'campaign_name': row[2],
            'live_campaign': row[3],
            'status': row[4],
            'webhook_sent': row[5],
            'webhook_sent_at': row[6]
        }

    # ------------------------------------------------------------------
    # CreativeX scores
    # ------------------------------------------------------------------

    @staticmethod
    def _insert_creativex_row(cursor, filename, creativex_id, creativex_url, quality_score,
                              box_file_id, full_json, tracking_id, status):
        """Insert one creativex_scores row using cursor (the caller commits)."""
        cursor.execute("""
            INSERT INTO creativex_scores (
                filename, creativex_id, creativex_url, quality_score,
                box_file_id, full_extraction_data, tracking_id, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            filename,
            creativex_id,
            creativex_url,
            quality_score,
            box_file_id,
            full_json,
            tracking_id,
            status
        ))

    def store_creativex_score(self, filename, creativex_id, creativex_url, quality_score, box_file_id, full_extraction_data, tracking_id=None, status='active'):
        """
        Store CreativeX score data extracted from a PDF or from DAM metadata.

        For status 'master-cx-score' the row is inserted as a plain reference, with
        no versioning. For any other status, soft-delete versioning applies:
        existing 'active' rows for the same base asset are marked 'superseded'
        before the new row is inserted. The same base asset is the same filename
        stem, with or without a trailing "_YYYY-MM-DD-HH-MM-SS" timestamp, and the
        same extension. Other names that merely share the prefix (e.g.
        'banner_v2.jpg' vs 'banner.jpg') are left alone.

        Args:
            filename: Original filename from extraction
            creativex_id: CreativeX ID from extraction
            creativex_url: CreativeX URL from extraction
            quality_score: Quality score percentage
            box_file_id: Box file ID for tracking
            full_extraction_data: Complete LlamaExtract JSON response or DAM metadata
                (dicts are JSON-encoded; other values are stored as given)
            tracking_id: Optional tracking ID (used for master-cx-score status)
            status: 'active' (default), 'master-cx-score', or custom status

        Returns:
            dict with success, is_update and version_number (or success/error on failure)
        """
        conn = self.get_connection()
        try:
            full_json = (json.dumps(full_extraction_data)
                         if isinstance(full_extraction_data, dict) else full_extraction_data)

            if status == 'master-cx-score':
                with conn.cursor() as cursor:
                    self._insert_creativex_row(cursor, filename, creativex_id, creativex_url,
                                               quality_score, box_file_id, full_json,
                                               tracking_id, 'master-cx-score')
                conn.commit()
                logger.info("Stored master CreativeX score: {} (Tracking: {}, Score: {})".format(
                    filename, tracking_id, quality_score
                ))
                return {'success': True, 'is_update': False, 'version_number': 1}

            base_stem, ext = self._creativex_base(filename)
            base_filename = base_stem + ext
            version_re = self._creativex_version_regex(base_stem, ext)

            with conn.cursor() as cursor:
                # An escaped LIKE narrows candidates server-side; the regex then keeps
                # only true versions of this base asset (any status) for counting.
                cursor.execute("""
                    SELECT id, quality_score, filename, status FROM creativex_scores
                    WHERE filename LIKE %s
                """, (self._escape_like(base_stem) + '%' + self._escape_like(ext),))
                versions = [row for row in cursor.fetchall() if version_re.fullmatch(row[2])]
                existing = [row for row in versions if row[3] == 'active']
                total_versions = len(versions)

                if existing:
                    cursor.execute("""
                        UPDATE creativex_scores
                        SET status = 'superseded'
                        WHERE id IN %s
                    """, (tuple(row[0] for row in existing),))
                    logger.info("Superseded {} previous CreativeX score(s) for base asset: {} (old filenames: {})".format(
                        len(existing), base_filename, [row[2] for row in existing]
                    ))

                self._insert_creativex_row(cursor, filename, creativex_id, creativex_url,
                                           quality_score, box_file_id, full_json,
                                           tracking_id, status)
            conn.commit()

            version_number = total_versions + 1
            if existing:
                logger.info("Updated CreativeX score: {} (Old scores: {} -> {}, Version: {})".format(
                    filename, [row[1] for row in existing], quality_score, version_number
                ))
            else:
                logger.info("Stored new CreativeX score: {} (Score: {}, Version: {})".format(
                    filename, quality_score, version_number
                ))
            return {
                'success': True,
                'is_update': bool(existing),
                'version_number': version_number
            }
        except Exception as e:
            conn.rollback()
            logger.error("Failed to store CreativeX score: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self.put_connection(conn)

    @classmethod
    def _fetch_active_creativex(cls, cursor, operator, value):
        """Return the newest active creativex_scores row matching filename <operator> value, or None."""
        cursor.execute(cls._CREATIVEX_LOOKUP_SQL.format(op=operator), (value,))
        return cursor.fetchone()

    def get_creativex_score_by_filename(self, filename, tracking_id=None):
        """
        Get the newest active CreativeX score for a filename.

        Lookup order:
          1. exact filename;
          2. same stem with each of _CREATIVEX_ALT_EXTENSIONS (skipping only the exact
             extension already tried, so '.JPG' still tries '.jpg');
          3. if tracking_id is given, any filename containing the tracking ID, to
             cope with CreativeX PDF names that have extra text or stripped hyphens.

        Args:
            filename: Filename to search for
            tracking_id: Optional tracking ID for the fallback lookup

        Returns:
            dict with the creativex data, or None if not found
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                row = self._fetch_active_creativex(cursor, '=', filename)

                if not row:
                    base_name, ext = os.path.splitext(filename)
                    for alt_ext in self._CREATIVEX_ALT_EXTENSIONS:
                        if alt_ext == ext:
                            continue
                        row = self._fetch_active_creativex(cursor, '=', base_name + alt_ext)
                        if row:
                            break

                if not row and tracking_id:
                    row = self._fetch_active_creativex(
                        cursor, 'LIKE', '%' + self._escape_like(tracking_id) + '%'
                    )
                    if row:
                        logger.info("CreativeX: Found score via tracking ID fallback '{}' -> {}".format(
                            tracking_id, row[0]))
        finally:
            self.put_connection(conn)

        if not row:
            return None
        return {
            'filename': row[0],
            'creativex_id': row[1],
            'creativex_url': row[2],
            'quality_score': row[3],
            'box_file_id': row[4],
            'full_extraction_data': self._load_json(row[5]),
            'extracted_at': row[6]
        }

    def get_all_live_campaigns(self):
        """
        Get all live campaigns (live_campaign = 'YES') for the CSV report.

        Returns:
            list of dicts with campaign_number and campaign_name, ordered by
            campaign_number descending
        """
        conn = self.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT campaign_number, campaign_name
                    FROM campaign_status
                    WHERE live_campaign = 'YES'
                    ORDER BY campaign_number DESC
                """)
                return [
                    {'campaign_number': number, 'campaign_name': name}
                    for number, name in cursor.fetchall()
                ]
        finally:
            self.put_connection(conn)

    def close(self):
        """Close all connections in the pool; further calls are no-ops."""
        pool = getattr(self, 'pool', None)
        if pool and not getattr(pool, 'closed', False):
            pool.closeall()