A1→A2 now handles re-processing when campaign is reset to A1 after adding new master assets. Existing assets reuse tracking IDs and skip Box upload, new assets are processed normally. Also includes PPR domain registration for multiple master asset IDs in a2_to_a3 and dam_client. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
990 lines
35 KiB
Python
990 lines
35 KiB
Python
"""
|
|
Database Client - PostgreSQL Operations
|
|
Handles tracking IDs, master assets, and all-done checks
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import psycopg2
|
|
import psycopg2.pool
|
|
import json
|
|
import random
|
|
import string
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
# Module-level logger named 'Database'; used by the Database class below.
logger = logging.getLogger('Database')
|
|
|
|
class Database:
    """
    PostgreSQL access layer built on a psycopg2 ThreadedConnectionPool.

    Covers tracking-ID generation, master/derivative asset storage,
    campaign status and A1 retry bookkeeping, and CreativeX score storage.

    Every method borrows one connection from the pool and always hands it
    back through _release(), including when creating the cursor fails.
    """

    # Alphabets for tracking IDs. Regular (non-master) IDs must never start
    # with an uppercase 'M', which is reserved for master files.
    _ID_ALPHABET = string.ascii_letters + string.digits
    _ID_FIRST_ALPHABET = string.ascii_letters.replace('M', '') + string.digits

    # How many random candidates are tried before giving up on a unique ID.
    _MAX_ID_ATTEMPTS = 100

    # Maximum A1 retry attempts before a campaign is marked permanently failed.
    A1_MAX_RETRIES = 3

    # Shared INSERT for creativex_scores (used for every status value).
    _CREATIVEX_INSERT_SQL = """
        INSERT INTO creativex_scores (
            filename, creativex_id, creativex_url, quality_score,
            box_file_id, full_extraction_data, tracking_id, status
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    # Newest active CreativeX score row for one exact filename.
    _CREATIVEX_LOOKUP_SQL = """
        SELECT filename, creativex_id, creativex_url, quality_score,
               box_file_id, full_extraction_data, extracted_at
        FROM creativex_scores
        WHERE filename = %s AND status = 'active'
        ORDER BY extracted_at DESC
        LIMIT 1
    """

    def __init__(self, config):
        """
        Create the connection pool from the 'database' section of config.

        Args:
            config: Mapping with a 'database' entry that provides host,
                port, database, user and password.

        Raises:
            Exception: Whatever pool creation raises; it is logged and
                re-raised unchanged.
        """
        self.config = config['database']
        # Defined up front so close() is safe even if pool creation fails.
        self.pool = None

        # Create connection pool
        try:
            self.pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['user'],
                password=self.config['password']
            )
            logger.info("Database connection pool created")
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            raise

    def get_connection(self):
        """Get connection from pool"""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return connection to pool"""
        self.pool.putconn(conn)

    def _release(self, conn, cursor):
        """
        Close cursor (if one was opened) and always return conn to the pool.

        cursor may be None when conn.cursor() itself failed; the connection
        must still go back to the pool in that case.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            self.put_connection(conn)

    def generate_unique_tracking_id(self, is_master=False):
        """
        Generate unique 6-character tracking ID

        Args:
            is_master: If True, generates ID starting with 'M' (for B1→B2 master files)
                       If False, generates ID that never starts with 'M' (for A1→A2 regular files)

        Returns:
            str: 6-character alphanumeric ID

        Raises:
            RuntimeError: If no unused ID was found within _MAX_ID_ATTEMPTS tries.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            for _ in range(self._MAX_ID_ATTEMPTS):
                if is_master:
                    # Master files: M + 5 random chars (e.g., M4x7Qp)
                    tracking_id = 'M' + ''.join(
                        random.choices(self._ID_ALPHABET, k=5)
                    )
                else:
                    # Regular files: first char drawn from the pool without 'M'
                    first_char = random.choice(self._ID_FIRST_ALPHABET)
                    remaining = ''.join(random.choices(self._ID_ALPHABET, k=5))
                    tracking_id = first_char + remaining

                # Check uniqueness (same query for both types).
                # NOTE(review): the check and the later insert are not atomic;
                # store_master_asset upserts on tracking_id, so a concurrent
                # duplicate would be overwritten rather than rejected.
                cursor.execute(
                    "SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
                    (tracking_id,)
                )

                if cursor.fetchone()[0] == 0:
                    logger.info("Generated tracking ID: {} (master={})".format(tracking_id, is_master))
                    return tracking_id

            raise RuntimeError(
                "Failed to generate unique tracking ID after {} attempts".format(
                    self._MAX_ID_ATTEMPTS
                )
            )

        finally:
            self._release(conn, cursor)

    def find_or_create_tracking_id(self, opentext_id, local_campaign_id):
        """
        Find existing tracking ID by opentext_id + local_campaign_id, or generate new one

        This is used for A5→A6 rework workflow where assets may already exist from A1→A2.
        We search by both opentext_id AND local_campaign_id to ensure we find the exact
        asset from the original workflow for this specific campaign.

        Args:
            opentext_id: DAM asset ID
            local_campaign_id: Local campaign ID (e.g., C000000078)

        Returns:
            dict with tracking_id (str) and is_existing (bool)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Search for existing asset by opentext_id AND local_campaign_id
            cursor.execute("""
                SELECT tracking_id
                FROM master_assets
                WHERE opentext_id = %s
                  AND local_campaign_id = %s
                  AND status = 'active'
                LIMIT 1
            """, (opentext_id, local_campaign_id))

            row = cursor.fetchone()
        finally:
            # Release before possibly generating a new ID, so this call never
            # holds two pooled connections at once.
            self._release(conn, cursor)

        if row:
            # Found existing asset
            tracking_id = row[0]
            logger.info("Found existing asset: opentext_id={}, local_campaign_id={}, tracking_id={}".format(
                opentext_id, local_campaign_id, tracking_id
            ))
            return {
                'tracking_id': tracking_id,
                'is_existing': True
            }

        # No existing asset found, generate new tracking ID
        # A5→A6 rework assets are NOT masters, so is_master=False
        logger.info("No existing asset found for opentext_id={}, local_campaign_id={}, generating new tracking ID".format(
            opentext_id, local_campaign_id
        ))
        return {
            'tracking_id': self.generate_unique_tracking_id(is_master=False),
            'is_existing': False
        }

    def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id, global_master_campaign_id=None, global_master_folder_id=None, local_campaign_id=None):
        """
        Store master asset with FULL metadata in JSONB column

        Inserts a new row, or on a tracking_id conflict refreshes the upload
        directory, description, metadata and campaign references.

        Args:
            tracking_id: 6-char tracking ID
            opentext_id: DAM asset ID
            asset_data: Complete DAM asset JSON
            box_file_id: Box file ID
            box_url: Box URL
            upload_folder_id: Final Assets folder ID for upload
            global_master_campaign_id: Global master campaign ID (from GLOBAL CAMPAIGN REFERENCE)
            global_master_folder_id: Global master folder ID
            local_campaign_id: Local campaign ID (immediate campaign this asset belongs to)

        Returns:
            dict with success boolean (plus tracking_id on success, error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Extract basic info; a missing or null name falls back to 'unknown'
            name = asset_data.get('name') or 'unknown'
            name_parts = name.rsplit('.', 1)
            filename = name_parts[0]
            extension = '.' + name_parts[1] if len(name_parts) > 1 else ''

            # Store complete metadata as JSONB
            full_metadata_json = json.dumps(asset_data)

            # Description with Box info (parsed back by parse_box_info_from_description)
            description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
                box_file_id, box_url, opentext_id
            )

            # Insert or update
            cursor.execute("""
                INSERT INTO master_assets (
                    tracking_id, opentext_id, original_filename, file_extension,
                    file_size_bytes, mime_type, upload_directory,
                    description, full_metadata, status,
                    global_master_campaign_id, global_master_folder_id, local_campaign_id
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s
                )
                ON CONFLICT (tracking_id) DO UPDATE SET
                    upload_directory = EXCLUDED.upload_directory,
                    description = EXCLUDED.description,
                    full_metadata = EXCLUDED.full_metadata,
                    global_master_campaign_id = EXCLUDED.global_master_campaign_id,
                    global_master_folder_id = EXCLUDED.global_master_folder_id,
                    local_campaign_id = EXCLUDED.local_campaign_id,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                tracking_id,
                opentext_id,
                filename,
                extension,
                asset_data.get('file_size'),
                asset_data.get('mime_type'),
                upload_folder_id,
                description,
                full_metadata_json,
                global_master_campaign_id,
                global_master_folder_id,
                local_campaign_id
            ))

            conn.commit()
            logger.info("Stored master asset: {}".format(tracking_id))

            return {'success': True, 'tracking_id': tracking_id}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store master asset: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_master_asset(self, tracking_id):
        """
        Get master asset by tracking ID

        Returns:
            dict with tracking_id, opentext_id, upload_directory, full_metadata,
            description, box_file_id and box_url; None if no active row matches.
            full_metadata is {} when the stored column is NULL.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT tracking_id, opentext_id, upload_directory, full_metadata, description
                FROM master_assets
                WHERE tracking_id = %s AND status = 'active'
            """, (tracking_id,))

            row = cursor.fetchone()

            if not row:
                return None

            # Parse JSONB as dict; the driver may already have decoded it,
            # and a NULL column is treated as empty metadata.
            raw_metadata = row[3]
            if raw_metadata is None:
                full_metadata = {}
            elif isinstance(raw_metadata, dict):
                full_metadata = raw_metadata
            else:
                full_metadata = json.loads(raw_metadata)

            # Parse Box info from description
            box_info = self.parse_box_info_from_description(row[4])

            return {
                'tracking_id': row[0],
                'opentext_id': row[1],
                'upload_directory': row[2],
                'full_metadata': full_metadata,
                'description': row[4],
                'box_file_id': box_info.get('box_file_id'),
                'box_url': box_info.get('box_url')
            }

        finally:
            self._release(conn, cursor)

    @staticmethod
    def parse_box_info_from_description(description):
        """
        Parse Box file ID and URL from master asset description field.

        Description format:
            Box File ID: {id}
            Box URL: {url}
            DAM Asset ID: {opentext_id}

        Returns:
            dict with box_file_id and box_url (None if not found)
        """
        result = {'box_file_id': None, 'box_url': None}
        if not description:
            return result

        for line in description.split('\n'):
            line = line.strip()
            # Split on the first ':' only, so URLs keep their own colons.
            if line.startswith('Box File ID:'):
                result['box_file_id'] = line.split(':', 1)[1].strip()
            elif line.startswith('Box URL:'):
                result['box_url'] = line.split(':', 1)[1].strip()

        return result

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if ALL master assets for a campaign have been uploaded

        Args:
            campaign_id: Campaign ID

        Returns:
            bool: True if all assets uploaded, False otherwise
                  (also False when the campaign has no active master assets)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Count total master assets for this campaign.
            # NOTE(review): this filters on master_assets.campaign_id, while
            # store_master_asset in this class only writes local_campaign_id /
            # global_master_campaign_id - confirm the column exists and is populated.
            cursor.execute("""
                SELECT COUNT(DISTINCT tracking_id)
                FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))

            total_masters = cursor.fetchone()[0]

            if total_masters == 0:
                return False

            # Count how many have been uploaded (exist in derivative_assets).
            # NOTE(review): store_derivative_asset in this class never sets
            # upload_status - confirm where 'completed' is written.
            cursor.execute("""
                SELECT COUNT(DISTINCT ma.tracking_id)
                FROM master_assets ma
                INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
                WHERE ma.campaign_id = %s AND ma.status = 'active'
                  AND da.upload_status = 'completed'
            """, (campaign_id,))

            uploaded_count = cursor.fetchone()[0]

            all_done = uploaded_count == total_masters

            logger.info("Campaign {} upload status: {}/{} assets uploaded{}".format(
                campaign_id, uploaded_count, total_masters,
                " - ALL DONE" if all_done else ""
            ))

            return all_done

        finally:
            self._release(conn, cursor)

    def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
        """
        Store derivative asset record after upload
        Uses existing schema columns only

        Args:
            tracking_id: Master asset tracking ID
            master_asset_id: Master asset DB ID (optional, can be None).
                Accepted for interface compatibility; not written to the table.
            dam_asset_id: DAM asset ID of uploaded derivative (stored and logged)
            filename: Clean filename of derivative

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Extract filename parts: split on the last dot, if any
            stem, dot, suffix = filename.rpartition('.')
            if dot:
                name_without_ext = stem
                extension = '.' + suffix
            else:
                name_without_ext = filename
                extension = ''

            # Store derivative with DAM asset ID (tracking_id already links to master)
            cursor.execute("""
                INSERT INTO derivative_assets (
                    tracking_id, dam_asset_id, derivative_filename, file_extension, status
                ) VALUES (%s, %s, %s, %s, 'active')
            """, (tracking_id, dam_asset_id, name_without_ext, extension))

            conn.commit()
            logger.info("Stored derivative asset: {} → {} (DAM ID: {})".format(
                tracking_id, filename, dam_asset_id
            ))

            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store derivative: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_campaign_asset_count(self, campaign_id):
        """Get total master asset count for campaign"""
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # NOTE(review): filters on master_assets.campaign_id; see the
            # matching note in check_campaign_upload_complete.
            cursor.execute("""
                SELECT COUNT(*) FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))

            return cursor.fetchone()[0]

        finally:
            self._release(conn, cursor)

    def test_connection(self):
        """
        Test database connection

        Returns:
            bool: True if "SELECT 1" ran successfully, False on any error.
            Never raises; the borrowed connection is returned to the pool
            on both outcomes.
        """
        try:
            conn = self.get_connection()
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            return False

        cursor = None
        ok = False
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            ok = True
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
        finally:
            # Cleanup failures count as a failed check, never as an exception.
            try:
                self._release(conn, cursor)
            except Exception as e:
                logger.error("Database connection failed: {}".format(str(e)))
                ok = False

        if ok:
            logger.info("Database connection OK")
        return ok

    @staticmethod
    def _metadata_scalar(metadata_element):
        """
        Return metadata_element['value']['value']['value'].

        Yields None as soon as a level is missing, null, or not a dict.
        """
        node = metadata_element
        for _ in range(3):
            if not isinstance(node, dict):
                return None
            node = node.get('value')
        return node

    def extract_global_campaign_reference(self, asset_data, campaign_id=None):
        """
        Extract Global Campaign Reference and Local Campaign ID from asset metadata

        Only collections whose container_type_name is 'L7+ - CAMPAIGN' are
        inspected. When several collections carry a value, the last one wins.

        Args:
            asset_data: Complete DAM asset JSON
            campaign_id: Current campaign ID (used as fallback for local_campaign_id)

        Returns:
            dict with global_master_campaign_id, global_master_folder_id, local_campaign_id
        """
        global_master_campaign_id = None
        global_master_folder_id = None
        local_campaign_id = None

        # Look in inherited_metadata_collections (a null value counts as empty)
        collections = asset_data.get('inherited_metadata_collections') or []

        for collection in collections:
            # Only check campaign containers
            if collection.get('container_type_name') != 'L7+ - CAMPAIGN':
                continue

            campaign_id_in_collection = None
            has_global_reference = False

            for inherited in collection.get('inherited_metadata_values') or []:
                metadata_element = inherited.get('metadata_element') or {}
                element_id = metadata_element.get('id')

                if element_id == 'FERRERO.FIELD.CAMPAIGN ID':
                    # Extract Campaign ID
                    campaign_id_value = self._metadata_scalar(metadata_element)
                    if campaign_id_value:
                        campaign_id_in_collection = campaign_id_value

                elif element_id == 'FERRERO.FIELD.GLOBAL CAMPAIGN REFERENCE':
                    # Look for GLOBAL CAMPAIGN REFERENCE field
                    value = self._metadata_scalar(metadata_element)
                    if value:
                        global_master_campaign_id = value
                        has_global_reference = True
                        logger.info("Found Global Campaign Reference: {}".format(value))

            # If this campaign has a global reference, it's the local campaign
            if has_global_reference and campaign_id_in_collection:
                local_campaign_id = campaign_id_in_collection
                logger.info("Local Campaign ID: {}".format(local_campaign_id))

                # Get the container ID (global master folder)
                container_id = collection.get('container_id')
                if container_id:
                    global_master_folder_id = container_id
                    logger.info("Global master folder ID: {}".format(container_id))

        # CRITICAL FIX: Always set local_campaign_id to current campaign if not found
        # This ensures every asset has a campaign ID, even without Global Campaign Reference
        if not local_campaign_id and campaign_id:
            local_campaign_id = campaign_id
            logger.info("No Global Campaign Reference found, using current campaign ID: {}".format(campaign_id))

        return {
            'global_master_campaign_id': global_master_campaign_id,
            'global_master_folder_id': global_master_folder_id,
            'local_campaign_id': local_campaign_id
        }

    def record_campaign_status(self, campaign_id, campaign_number, campaign_name, live_campaign, status, webhook_sent=False):
        """
        Record or update campaign status for webhook tracking

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            live_campaign: 'YES' or 'NO'
            status: Current status (A2, A4, etc.)
            webhook_sent: Whether webhook was sent

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # webhook_sent_at is stamped (UTC) only when a webhook was sent;
            # otherwise it is written as NULL, also on update.
            webhook_sent_at = datetime.now(timezone.utc) if webhook_sent else None

            # Insert or update campaign status
            cursor.execute("""
                INSERT INTO campaign_status (
                    campaign_id, campaign_number, campaign_name,
                    live_campaign, status, webhook_sent, webhook_sent_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (campaign_id) DO UPDATE SET
                    campaign_number = EXCLUDED.campaign_number,
                    campaign_name = EXCLUDED.campaign_name,
                    live_campaign = EXCLUDED.live_campaign,
                    status = EXCLUDED.status,
                    webhook_sent = EXCLUDED.webhook_sent,
                    webhook_sent_at = EXCLUDED.webhook_sent_at,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                campaign_id,
                campaign_number,
                campaign_name,
                live_campaign,
                status,
                webhook_sent,
                webhook_sent_at
            ))

            conn.commit()
            logger.info("Recorded campaign status: {} - {} (Live: {})".format(
                campaign_number, status, live_campaign
            ))

            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to record campaign status: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_a1_retry_status(self, campaign_id):
        """
        Get A1 retry status for campaign

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with retry_count, last_retry_at, permanently_failed, failure_reason
            Returns None if campaign not found
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT a1_retry_count, a1_last_retry_at,
                       a1_permanently_failed, a1_failure_reason
                FROM campaign_status
                WHERE campaign_id = %s
            """, (campaign_id,))

            row = cursor.fetchone()

            if not row:
                return None

            # NULL counters/flags are normalised to 0 / False.
            return {
                'retry_count': row[0] or 0,
                'last_retry_at': row[1],
                'permanently_failed': row[2] or False,
                'failure_reason': row[3]
            }

        finally:
            self._release(conn, cursor)

    def increment_a1_retry(self, campaign_id, campaign_number, campaign_name, reason):
        """
        Increment A1 retry counter and mark as permanently failed if max attempts reached

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            reason: Description of failure (e.g., "No master assets found").
                Stored only once the campaign becomes permanently failed.

        Returns:
            dict with success, retry_count, permanently_failed
            (success False plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Get current retry count.
            # NOTE(review): read-then-write; two concurrent callers for the
            # same campaign could both compute the same new count.
            cursor.execute("""
                SELECT a1_retry_count FROM campaign_status
                WHERE campaign_id = %s
            """, (campaign_id,))

            row = cursor.fetchone()
            current_count = (row[0] or 0) if row else 0
            new_count = current_count + 1
            is_permanently_failed = new_count >= self.A1_MAX_RETRIES

            # Insert or update campaign status with retry tracking
            cursor.execute("""
                INSERT INTO campaign_status (
                    campaign_id, campaign_number, campaign_name,
                    live_campaign, status, webhook_sent,
                    a1_retry_count, a1_last_retry_at,
                    a1_permanently_failed, a1_failure_reason
                ) VALUES (%s, %s, %s, 'NO', 'A1', FALSE, %s, CURRENT_TIMESTAMP, %s, %s)
                ON CONFLICT (campaign_id) DO UPDATE SET
                    a1_retry_count = EXCLUDED.a1_retry_count,
                    a1_last_retry_at = EXCLUDED.a1_last_retry_at,
                    a1_permanently_failed = EXCLUDED.a1_permanently_failed,
                    a1_failure_reason = EXCLUDED.a1_failure_reason,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                campaign_id,
                campaign_number,
                campaign_name,
                new_count,
                is_permanently_failed,
                reason if is_permanently_failed else None
            ))

            conn.commit()

            logger.info("A1 retry tracking: Campaign {} - Attempt {}/{} (Permanently Failed: {})".format(
                campaign_number, new_count, self.A1_MAX_RETRIES, is_permanently_failed
            ))

            return {
                'success': True,
                'retry_count': new_count,
                'permanently_failed': is_permanently_failed
            }

        except Exception as e:
            conn.rollback()
            logger.error("Failed to increment A1 retry: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def reset_a1_retry(self, campaign_id):
        """
        Reset A1 retry tracking for campaign (used when campaign is fixed manually)

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                UPDATE campaign_status
                SET a1_retry_count = 0,
                    a1_last_retry_at = NULL,
                    a1_permanently_failed = FALSE,
                    a1_failure_reason = NULL,
                    updated_at = CURRENT_TIMESTAMP
                WHERE campaign_id = %s
            """, (campaign_id,))

            conn.commit()
            logger.info("Reset A1 retry tracking for campaign: {}".format(campaign_id))

            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to reset A1 retry: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def check_campaign_processed(self, campaign_id):
        """
        Check if campaign has already been processed (webhook sent)

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with exists (bool) and campaign_status data if exists
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT campaign_id, campaign_number, campaign_name,
                       live_campaign, status, webhook_sent, webhook_sent_at
                FROM campaign_status
                WHERE campaign_id = %s
            """, (campaign_id,))

            row = cursor.fetchone()

            if not row:
                return {'exists': False}

            return {
                'exists': True,
                'campaign_id': row[0],
                'campaign_number': row[1],
                'campaign_name': row[2],
                'live_campaign': row[3],
                'status': row[4],
                'webhook_sent': row[5],
                'webhook_sent_at': row[6]
            }

        finally:
            self._release(conn, cursor)

    def store_creativex_score(self, filename, creativex_id, creativex_url, quality_score, box_file_id, full_extraction_data, tracking_id=None, status='active'):
        """
        Store CreativeX score data extracted from PDF or DAM metadata

        For status 'master-cx-score' the row is simply inserted (no versioning).
        For every other status, any existing 'active' row for the same filename
        is first marked 'superseded' (soft delete), then the new row is inserted.

        Args:
            filename: Original filename from extraction
            creativex_id: CreativeX ID from extraction
            creativex_url: CreativeX URL from extraction
            quality_score: Quality score percentage
            box_file_id: Box file ID for tracking
            full_extraction_data: Complete LlamaExtract JSON response or DAM metadata.
                dict/list values are serialized to JSON; anything else is passed through.
            tracking_id: Optional tracking ID (used for master-cx-score status)
            status: 'active' (default), 'master-cx-score', or custom status

        Returns:
            dict with success boolean, is_update flag and version_number
            (success False plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Serialize structured payloads to a JSON string for the JSON column
            if isinstance(full_extraction_data, (dict, list)):
                full_json = json.dumps(full_extraction_data)
            else:
                full_json = full_extraction_data

            is_master_score = status == 'master-cx-score'
            existing = None
            total_versions = 0

            if not is_master_score:
                # Step 1: Check if filename already exists with status='active'
                cursor.execute("""
                    SELECT id, quality_score FROM creativex_scores
                    WHERE filename = %s AND status = 'active'
                """, (filename,))
                existing = cursor.fetchone()

                # Count total versions (including superseded)
                cursor.execute("""
                    SELECT COUNT(*) FROM creativex_scores
                    WHERE filename = %s
                """, (filename,))
                total_versions = cursor.fetchone()[0]

                if existing:
                    # Step 2: Mark existing record(s) as 'superseded'
                    cursor.execute("""
                        UPDATE creativex_scores
                        SET status = 'superseded'
                        WHERE filename = %s AND status = 'active'
                    """, (filename,))

                    logger.info("Superseded previous CreativeX score for: {} (old score: {})".format(
                        filename, existing[1]
                    ))

            # Step 3: Insert the new record (same statement for every status)
            cursor.execute(self._CREATIVEX_INSERT_SQL, (
                filename,
                creativex_id,
                creativex_url,
                quality_score,
                box_file_id,
                full_json,
                tracking_id,
                status
            ))

            conn.commit()

            if is_master_score:
                logger.info("Stored master CreativeX score: {} (Tracking: {}, Score: {})".format(
                    filename, tracking_id, quality_score
                ))
                return {
                    'success': True,
                    'is_update': False,
                    'version_number': 1
                }

            # New version number is total_versions + 1
            version_number = total_versions + 1

            if existing:
                logger.info("Updated CreativeX score: {} (Score: {} -> {}, Version: {})".format(
                    filename, existing[1], quality_score, version_number
                ))
            else:
                logger.info("Stored new CreativeX score: {} (Score: {}, Version: {})".format(
                    filename, quality_score, version_number
                ))

            return {
                'success': True,
                'is_update': bool(existing),
                'version_number': version_number
            }

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store CreativeX score: {}".format(str(e)))
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_creativex_score_by_filename(self, filename):
        """
        Get CreativeX score data by filename

        Performs extension-agnostic lookup: if exact filename not found,
        tries common video/image extensions (.mp4, .jpg, .png, .mov, etc.)
        in a fixed order and returns the first match.

        Args:
            filename: Filename to search for

        Returns:
            dict with creativex data or None if not found.
            full_extraction_data is {} when the stored column is NULL.
        """
        import os

        # Exact filename first, then the same base name with each alternative
        # extension (skipping the one the caller already supplied).
        base_name, ext = os.path.splitext(filename)
        alt_extensions = ['.mp4', '.jpg', '.jpeg', '.png', '.mov', '.avi', '.gif', '.webp']
        candidates = [filename] + [
            base_name + alt_ext
            for alt_ext in alt_extensions
            if alt_ext.lower() != ext.lower()
        ]

        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            row = None
            for candidate in candidates:
                cursor.execute(self._CREATIVEX_LOOKUP_SQL, (candidate,))
                row = cursor.fetchone()
                if row:
                    break  # First hit wins

            if not row:
                return None

            # Parse JSONB as dict; the driver may already have decoded it,
            # and a NULL column is treated as empty data.
            raw_data = row[5]
            if raw_data is None:
                full_data = {}
            elif isinstance(raw_data, dict):
                full_data = raw_data
            else:
                full_data = json.loads(raw_data)

            return {
                'filename': row[0],
                'creativex_id': row[1],
                'creativex_url': row[2],
                'quality_score': row[3],
                'box_file_id': row[4],
                'full_extraction_data': full_data,
                'extracted_at': row[6]
            }

        finally:
            self._release(conn, cursor)

    def get_all_live_campaigns(self):
        """
        Get all live campaigns for CSV report

        Returns:
            list of dicts with campaign_number, campaign_name
            (ordered by campaign_number, descending)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT campaign_number, campaign_name
                FROM campaign_status
                WHERE live_campaign = 'YES'
                ORDER BY campaign_number DESC
            """)

            return [
                {'campaign_number': row[0], 'campaign_name': row[1]}
                for row in cursor.fetchall()
            ]

        finally:
            self._release(conn, cursor)

    def close(self):
        """Close all connections in pool"""
        # getattr: the pool attribute may be missing if __init__ was bypassed.
        pool = getattr(self, 'pool', None)
        if pool:
            pool.closeall()
|