Email Template Fix:
- Fixed subject line syntax error in a2_to_a3_batch_complete template
- Removed Jinja2 control flow ({% if %}) from subject line
- Changed to simple expression-only format
- Fixes 'Failed to send email' error
Database Logging Fix:
- Updated get_master_asset() to return database primary key 'id'
- Updated store_derivative_asset() to actually store master_asset_id and dam_asset_id
- Updated a2_to_a3_upload_polling.py to pass master_asset['id'] instead of None
- Added migration script to add dam_asset_id column to derivative_assets table
- Fixes issue where derivatives weren't being linked to masters in database
- Enables proper lookups and tracking of uploaded derivatives
Impact:
- Email notifications will now send successfully
- Derivatives will be properly logged and linked to master assets
- Other tools can now find uploaded derivatives in database
772 lines
28 KiB
Python
772 lines
28 KiB
Python
"""
|
|
Database Client - PostgreSQL Operations
|
|
Handles tracking IDs, master assets, and all-done checks
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import psycopg2
|
|
import psycopg2.pool
|
|
import json
|
|
import random
|
|
import string
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
logger = logging.getLogger('Database')
|
|
|
|
class Database:
|
|
def __init__(self, config):
|
|
self.config = config['database']
|
|
|
|
# Create connection pool
|
|
try:
|
|
self.pool = psycopg2.pool.ThreadedConnectionPool(
|
|
minconn=1,
|
|
maxconn=10,
|
|
host=self.config['host'],
|
|
port=self.config['port'],
|
|
database=self.config['database'],
|
|
user=self.config['user'],
|
|
password=self.config['password']
|
|
)
|
|
logger.info("Database connection pool created")
|
|
except Exception as e:
|
|
logger.error("Database connection failed: {}".format(str(e)))
|
|
raise
|
|
|
|
def get_connection(self):
|
|
"""Get connection from pool"""
|
|
return self.pool.getconn()
|
|
|
|
def put_connection(self, conn):
|
|
"""Return connection to pool"""
|
|
self.pool.putconn(conn)
|
|
|
|
def generate_unique_tracking_id(self, is_master=False):
|
|
"""
|
|
Generate unique 6-character tracking ID
|
|
|
|
Args:
|
|
is_master: If True, generates ID starting with 'M' (for B1→B2 master files)
|
|
If False, generates ID that never starts with 'M' (for A1→A2 regular files)
|
|
|
|
Returns:
|
|
str: 6-character alphanumeric ID
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
for attempt in range(100):
|
|
if is_master:
|
|
# Master files: M + 5 random chars (e.g., M4x7Qp)
|
|
tracking_id = 'M' + ''.join(random.choices(
|
|
string.ascii_letters + string.digits, k=5
|
|
))
|
|
else:
|
|
# Regular files: 6 random chars, but first char cannot be 'M'
|
|
# Remove uppercase 'M' from possible first characters
|
|
first_char_pool = string.ascii_letters.replace('M', '') + string.digits
|
|
first_char = random.choice(first_char_pool)
|
|
remaining = ''.join(random.choices(
|
|
string.ascii_letters + string.digits, k=5
|
|
))
|
|
tracking_id = first_char + remaining
|
|
|
|
# Check uniqueness (same query for both types)
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
|
|
(tracking_id,)
|
|
)
|
|
|
|
count = cursor.fetchone()[0]
|
|
|
|
if count == 0:
|
|
logger.info("Generated tracking ID: {} (master={})".format(tracking_id, is_master))
|
|
return tracking_id
|
|
|
|
raise Exception("Failed to generate unique tracking ID after 100 attempts")
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def find_or_create_tracking_id(self, opentext_id, local_campaign_id):
|
|
"""
|
|
Find existing tracking ID by opentext_id + local_campaign_id, or generate new one
|
|
|
|
This is used for A5→A6 rework workflow where assets may already exist from A1→A2.
|
|
We search by both opentext_id AND local_campaign_id to ensure we find the exact
|
|
asset from the original workflow for this specific campaign.
|
|
|
|
Args:
|
|
opentext_id: DAM asset ID
|
|
local_campaign_id: Local campaign ID (e.g., C000000078)
|
|
|
|
Returns:
|
|
dict with tracking_id (str) and is_existing (bool)
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Search for existing asset by opentext_id AND local_campaign_id
|
|
cursor.execute("""
|
|
SELECT tracking_id
|
|
FROM master_assets
|
|
WHERE opentext_id = %s
|
|
AND local_campaign_id = %s
|
|
AND status = 'active'
|
|
LIMIT 1
|
|
""", (opentext_id, local_campaign_id))
|
|
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
# Found existing asset
|
|
tracking_id = row[0]
|
|
logger.info("Found existing asset: opentext_id={}, local_campaign_id={}, tracking_id={}".format(
|
|
opentext_id, local_campaign_id, tracking_id
|
|
))
|
|
return {
|
|
'tracking_id': tracking_id,
|
|
'is_existing': True
|
|
}
|
|
else:
|
|
# No existing asset found, generate new tracking ID
|
|
# A5→A6 rework assets are NOT masters, so is_master=False
|
|
logger.info("No existing asset found for opentext_id={}, local_campaign_id={}, generating new tracking ID".format(
|
|
opentext_id, local_campaign_id
|
|
))
|
|
tracking_id = self.generate_unique_tracking_id(is_master=False)
|
|
return {
|
|
'tracking_id': tracking_id,
|
|
'is_existing': False
|
|
}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id, global_master_campaign_id=None, global_master_folder_id=None, local_campaign_id=None):
|
|
"""
|
|
Store master asset with FULL metadata in JSONB column
|
|
|
|
Args:
|
|
tracking_id: 6-char tracking ID
|
|
opentext_id: DAM asset ID
|
|
asset_data: Complete DAM asset JSON
|
|
box_file_id: Box file ID
|
|
box_url: Box URL
|
|
upload_folder_id: Final Assets folder ID for upload
|
|
global_master_campaign_id: Global master campaign ID (from GLOBAL CAMPAIGN REFERENCE)
|
|
global_master_folder_id: Global master folder ID
|
|
local_campaign_id: Local campaign ID (immediate campaign this asset belongs to)
|
|
|
|
Returns:
|
|
dict with success boolean
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Extract basic info
|
|
name = asset_data.get('name', 'unknown')
|
|
name_parts = name.rsplit('.', 1)
|
|
filename = name_parts[0]
|
|
extension = '.' + name_parts[1] if len(name_parts) > 1 else ''
|
|
|
|
# Store complete metadata as JSONB
|
|
full_metadata_json = json.dumps(asset_data)
|
|
|
|
# Description with Box info
|
|
description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
|
|
box_file_id, box_url, opentext_id
|
|
)
|
|
|
|
# Insert or update
|
|
cursor.execute("""
|
|
INSERT INTO master_assets (
|
|
tracking_id, opentext_id, original_filename, file_extension,
|
|
file_size_bytes, mime_type, upload_directory,
|
|
description, full_metadata, status,
|
|
global_master_campaign_id, global_master_folder_id, local_campaign_id
|
|
) VALUES (
|
|
%s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s
|
|
)
|
|
ON CONFLICT (tracking_id) DO UPDATE SET
|
|
upload_directory = EXCLUDED.upload_directory,
|
|
description = EXCLUDED.description,
|
|
full_metadata = EXCLUDED.full_metadata,
|
|
global_master_campaign_id = EXCLUDED.global_master_campaign_id,
|
|
global_master_folder_id = EXCLUDED.global_master_folder_id,
|
|
local_campaign_id = EXCLUDED.local_campaign_id,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""", (
|
|
tracking_id,
|
|
opentext_id,
|
|
filename,
|
|
extension,
|
|
asset_data.get('file_size'),
|
|
asset_data.get('mime_type'),
|
|
upload_folder_id,
|
|
description,
|
|
full_metadata_json,
|
|
global_master_campaign_id,
|
|
global_master_folder_id,
|
|
local_campaign_id
|
|
))
|
|
|
|
conn.commit()
|
|
logger.info("Stored master asset: {}".format(tracking_id))
|
|
|
|
return {'success': True, 'tracking_id': tracking_id}
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error("Failed to store master asset: {}".format(str(e)))
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def get_master_asset(self, tracking_id):
|
|
"""
|
|
Get master asset by tracking ID
|
|
|
|
Returns:
|
|
dict with tracking_id, opentext_id, upload_directory, full_metadata
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT id, tracking_id, opentext_id, upload_directory, full_metadata, description
|
|
FROM master_assets
|
|
WHERE tracking_id = %s AND status = 'active'
|
|
""", (tracking_id,))
|
|
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
# Parse JSONB as dict
|
|
full_metadata = row[4] if isinstance(row[4], dict) else json.loads(row[4])
|
|
|
|
return {
|
|
'id': row[0],
|
|
'tracking_id': row[1],
|
|
'opentext_id': row[2],
|
|
'upload_directory': row[3],
|
|
'full_metadata': full_metadata,
|
|
'description': row[5]
|
|
}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def check_campaign_upload_complete(self, campaign_id):
|
|
"""
|
|
Check if ALL master assets for a campaign have been uploaded
|
|
|
|
Args:
|
|
campaign_id: Campaign ID
|
|
|
|
Returns:
|
|
bool: True if all assets uploaded, False otherwise
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Count total master assets for this campaign
|
|
cursor.execute("""
|
|
SELECT COUNT(DISTINCT tracking_id)
|
|
FROM master_assets
|
|
WHERE campaign_id = %s AND status = 'active'
|
|
""", (campaign_id,))
|
|
|
|
total_masters = cursor.fetchone()[0]
|
|
|
|
if total_masters == 0:
|
|
return False
|
|
|
|
# Count how many have been uploaded (exist in derivative_assets)
|
|
cursor.execute("""
|
|
SELECT COUNT(DISTINCT ma.tracking_id)
|
|
FROM master_assets ma
|
|
INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
|
|
WHERE ma.campaign_id = %s AND ma.status = 'active'
|
|
AND da.upload_status = 'completed'
|
|
""", (campaign_id,))
|
|
|
|
uploaded_count = cursor.fetchone()[0]
|
|
|
|
all_done = uploaded_count == total_masters
|
|
|
|
logger.info("Campaign {} upload status: {}/{} assets uploaded{}".format(
|
|
campaign_id, uploaded_count, total_masters,
|
|
" - ALL DONE" if all_done else ""
|
|
))
|
|
|
|
return all_done
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
|
|
"""
|
|
Store derivative asset record after upload
|
|
Uses existing schema columns only
|
|
|
|
Args:
|
|
tracking_id: Master asset tracking ID
|
|
master_asset_id: Master asset DB ID (optional, can be None)
|
|
dam_asset_id: DAM asset ID of uploaded derivative (logged but not stored)
|
|
filename: Clean filename of derivative
|
|
|
|
Returns:
|
|
dict with success boolean
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Extract filename parts
|
|
name_without_ext = filename.rsplit('.', 1)[0] if '.' in filename else filename
|
|
extension = '.' + filename.rsplit('.', 1)[1] if '.' in filename else ''
|
|
|
|
# Store derivative with master link and DAM asset ID
|
|
cursor.execute("""
|
|
INSERT INTO derivative_assets (
|
|
tracking_id, master_asset_id, dam_asset_id, derivative_filename, file_extension, status
|
|
) VALUES (%s, %s, %s, %s, %s, 'active')
|
|
""", (tracking_id, master_asset_id, dam_asset_id, name_without_ext, extension))
|
|
|
|
conn.commit()
|
|
logger.info("Stored derivative asset: {} → {} (Master ID: {}, DAM ID: {})".format(
|
|
tracking_id, filename, master_asset_id, dam_asset_id
|
|
))
|
|
|
|
return {'success': True}
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error("Failed to store derivative: {}".format(str(e)))
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def get_campaign_asset_count(self, campaign_id):
|
|
"""Get total master asset count for campaign"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM master_assets
|
|
WHERE campaign_id = %s AND status = 'active'
|
|
""", (campaign_id,))
|
|
|
|
return cursor.fetchone()[0]
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def test_connection(self):
|
|
"""Test database connection"""
|
|
try:
|
|
conn = self.get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT 1")
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
logger.info("Database connection OK")
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Database connection failed: {}".format(str(e)))
|
|
return False
|
|
|
|
def extract_global_campaign_reference(self, asset_data):
|
|
"""
|
|
Extract Global Campaign Reference and Local Campaign ID from asset metadata
|
|
|
|
Args:
|
|
asset_data: Complete DAM asset JSON
|
|
|
|
Returns:
|
|
dict with global_master_campaign_id, global_master_folder_id, local_campaign_id
|
|
"""
|
|
global_master_campaign_id = None
|
|
global_master_folder_id = None
|
|
local_campaign_id = None
|
|
|
|
# Look in inherited_metadata_collections
|
|
collections = asset_data.get('inherited_metadata_collections', [])
|
|
|
|
for collection in collections:
|
|
# Only check campaign containers
|
|
if collection.get('container_type_name') == 'L7+ - CAMPAIGN':
|
|
inherited_metadata = collection.get('inherited_metadata_values', [])
|
|
|
|
campaign_id_in_collection = None
|
|
has_global_reference = False
|
|
|
|
for inherited in inherited_metadata:
|
|
metadata_element = inherited.get('metadata_element', {})
|
|
|
|
# Extract Campaign ID
|
|
if metadata_element.get('id') == 'FERRERO.FIELD.CAMPAIGN ID':
|
|
campaign_id_value = metadata_element.get('value', {}).get('value', {}).get('value')
|
|
if campaign_id_value:
|
|
campaign_id_in_collection = campaign_id_value
|
|
|
|
# Look for GLOBAL CAMPAIGN REFERENCE field
|
|
if metadata_element.get('id') == 'FERRERO.FIELD.GLOBAL CAMPAIGN REFERENCE':
|
|
value = metadata_element.get('value', {}).get('value', {}).get('value')
|
|
if value:
|
|
global_master_campaign_id = value
|
|
has_global_reference = True
|
|
logger.info("Found Global Campaign Reference: {}".format(value))
|
|
|
|
# If this campaign has a global reference, it's the local campaign
|
|
if has_global_reference and campaign_id_in_collection:
|
|
local_campaign_id = campaign_id_in_collection
|
|
logger.info("Local Campaign ID: {}".format(local_campaign_id))
|
|
|
|
# Get the container ID (global master folder)
|
|
container_id = collection.get('container_id')
|
|
if container_id:
|
|
global_master_folder_id = container_id
|
|
logger.info("Global master folder ID: {}".format(container_id))
|
|
|
|
return {
|
|
'global_master_campaign_id': global_master_campaign_id,
|
|
'global_master_folder_id': global_master_folder_id,
|
|
'local_campaign_id': local_campaign_id
|
|
}
|
|
|
|
def record_campaign_status(self, campaign_id, campaign_number, campaign_name, live_campaign, status, webhook_sent=False):
|
|
"""
|
|
Record or update campaign status for webhook tracking
|
|
|
|
Args:
|
|
campaign_id: DAM campaign folder ID
|
|
campaign_number: Campaign number (e.g., C000000078)
|
|
campaign_name: Campaign name
|
|
live_campaign: 'YES' or 'NO'
|
|
status: Current status (A2, A4, etc.)
|
|
webhook_sent: Whether webhook was sent
|
|
|
|
Returns:
|
|
dict with success boolean
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Insert or update campaign status
|
|
cursor.execute("""
|
|
INSERT INTO campaign_status (
|
|
campaign_id, campaign_number, campaign_name,
|
|
live_campaign, status, webhook_sent, webhook_sent_at
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (campaign_id) DO UPDATE SET
|
|
campaign_number = EXCLUDED.campaign_number,
|
|
campaign_name = EXCLUDED.campaign_name,
|
|
live_campaign = EXCLUDED.live_campaign,
|
|
status = EXCLUDED.status,
|
|
webhook_sent = EXCLUDED.webhook_sent,
|
|
webhook_sent_at = EXCLUDED.webhook_sent_at,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""", (
|
|
campaign_id,
|
|
campaign_number,
|
|
campaign_name,
|
|
live_campaign,
|
|
status,
|
|
webhook_sent,
|
|
datetime.now(timezone.utc) if webhook_sent else None
|
|
))
|
|
|
|
conn.commit()
|
|
logger.info("Recorded campaign status: {} - {} (Live: {})".format(
|
|
campaign_number, status, live_campaign
|
|
))
|
|
|
|
return {'success': True}
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error("Failed to record campaign status: {}".format(str(e)))
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def check_campaign_processed(self, campaign_id):
|
|
"""
|
|
Check if campaign has already been processed (webhook sent)
|
|
|
|
Args:
|
|
campaign_id: DAM campaign folder ID
|
|
|
|
Returns:
|
|
dict with exists (bool) and campaign_status data if exists
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT campaign_id, campaign_number, campaign_name,
|
|
live_campaign, status, webhook_sent, webhook_sent_at
|
|
FROM campaign_status
|
|
WHERE campaign_id = %s
|
|
""", (campaign_id,))
|
|
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
return {
|
|
'exists': True,
|
|
'campaign_id': row[0],
|
|
'campaign_number': row[1],
|
|
'campaign_name': row[2],
|
|
'live_campaign': row[3],
|
|
'status': row[4],
|
|
'webhook_sent': row[5],
|
|
'webhook_sent_at': row[6]
|
|
}
|
|
else:
|
|
return {'exists': False}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def store_creativex_score(self, filename, creativex_id, creativex_url, quality_score, box_file_id, full_extraction_data, tracking_id=None, status='active'):
|
|
"""
|
|
Store CreativeX score data extracted from PDF or DAM metadata
|
|
Uses soft delete strategy for 'active' status - marks old records as 'superseded'
|
|
For 'master-cx-score' status - no versioning, just stores reference
|
|
|
|
Args:
|
|
filename: Original filename from extraction
|
|
creativex_id: CreativeX ID from extraction
|
|
creativex_url: CreativeX URL from extraction
|
|
quality_score: Quality score percentage
|
|
box_file_id: Box file ID for tracking
|
|
full_extraction_data: Complete LlamaExtract JSON response or DAM metadata
|
|
tracking_id: Optional tracking ID (used for master-cx-score status)
|
|
status: 'active' (default), 'master-cx-score', or custom status
|
|
|
|
Returns:
|
|
dict with success boolean and is_update flag
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Convert full_extraction_data to JSON string if it's a dict
|
|
import json
|
|
full_json = json.dumps(full_extraction_data) if isinstance(full_extraction_data, dict) else full_extraction_data
|
|
|
|
# Handle master-cx-score differently (no versioning, just reference storage)
|
|
if status == 'master-cx-score':
|
|
# Simple insert for master score reference (no versioning)
|
|
cursor.execute("""
|
|
INSERT INTO creativex_scores (
|
|
filename, creativex_id, creativex_url, quality_score,
|
|
box_file_id, full_extraction_data, tracking_id, status
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (
|
|
filename,
|
|
creativex_id,
|
|
creativex_url,
|
|
quality_score,
|
|
box_file_id,
|
|
full_json,
|
|
tracking_id,
|
|
'master-cx-score'
|
|
))
|
|
|
|
conn.commit()
|
|
logger.info("Stored master CreativeX score: {} (Tracking: {}, Score: {})".format(
|
|
filename, tracking_id, quality_score
|
|
))
|
|
|
|
return {
|
|
'success': True,
|
|
'is_update': False,
|
|
'version_number': 1
|
|
}
|
|
|
|
# For 'active' status - use soft delete versioning
|
|
# Step 1: Check if filename already exists with status='active'
|
|
# Also count total versions for this filename
|
|
cursor.execute("""
|
|
SELECT id, quality_score FROM creativex_scores
|
|
WHERE filename = %s AND status = 'active'
|
|
""", (filename,))
|
|
|
|
existing = cursor.fetchone()
|
|
|
|
# Count total versions (including superseded)
|
|
cursor.execute("""
|
|
SELECT COUNT(*) FROM creativex_scores
|
|
WHERE filename = %s
|
|
""", (filename,))
|
|
|
|
total_versions = cursor.fetchone()[0]
|
|
|
|
if existing:
|
|
# Step 2: Mark existing record(s) as 'superseded'
|
|
cursor.execute("""
|
|
UPDATE creativex_scores
|
|
SET status = 'superseded'
|
|
WHERE filename = %s AND status = 'active'
|
|
""", (filename,))
|
|
|
|
logger.info("Superseded previous CreativeX score for: {} (old score: {})".format(
|
|
filename, existing[1]
|
|
))
|
|
|
|
# Step 3: Insert new 'active' record
|
|
cursor.execute("""
|
|
INSERT INTO creativex_scores (
|
|
filename, creativex_id, creativex_url, quality_score,
|
|
box_file_id, full_extraction_data, tracking_id, status
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
""", (
|
|
filename,
|
|
creativex_id,
|
|
creativex_url,
|
|
quality_score,
|
|
box_file_id,
|
|
full_json,
|
|
tracking_id,
|
|
status
|
|
))
|
|
|
|
conn.commit()
|
|
|
|
# New version number is total_versions + 1
|
|
version_number = total_versions + 1
|
|
|
|
if existing:
|
|
logger.info("Updated CreativeX score: {} (Score: {} -> {}, Version: {})".format(
|
|
filename, existing[1], quality_score, version_number
|
|
))
|
|
else:
|
|
logger.info("Stored new CreativeX score: {} (Score: {}, Version: {})".format(
|
|
filename, quality_score, version_number
|
|
))
|
|
|
|
return {
|
|
'success': True,
|
|
'is_update': bool(existing),
|
|
'version_number': version_number
|
|
}
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
logger.error("Failed to store CreativeX score: {}".format(str(e)))
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def get_creativex_score_by_filename(self, filename):
|
|
"""
|
|
Get CreativeX score data by filename
|
|
|
|
Args:
|
|
filename: Filename to search for
|
|
|
|
Returns:
|
|
dict with creativex data or None if not found
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT filename, creativex_id, creativex_url, quality_score,
|
|
box_file_id, full_extraction_data, extracted_at
|
|
FROM creativex_scores
|
|
WHERE filename = %s AND status = 'active'
|
|
ORDER BY extracted_at DESC
|
|
LIMIT 1
|
|
""", (filename,))
|
|
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
# Parse JSONB as dict
|
|
import json
|
|
full_data = row[5] if isinstance(row[5], dict) else json.loads(row[5])
|
|
|
|
return {
|
|
'filename': row[0],
|
|
'creativex_id': row[1],
|
|
'creativex_url': row[2],
|
|
'quality_score': row[3],
|
|
'box_file_id': row[4],
|
|
'full_extraction_data': full_data,
|
|
'extracted_at': row[6]
|
|
}
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def get_all_live_campaigns(self):
|
|
"""
|
|
Get all live campaigns for CSV report
|
|
|
|
Returns:
|
|
list of dicts with campaign_number, campaign_name
|
|
"""
|
|
conn = self.get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT campaign_number, campaign_name
|
|
FROM campaign_status
|
|
WHERE live_campaign = 'YES'
|
|
ORDER BY campaign_number DESC
|
|
""")
|
|
|
|
rows = cursor.fetchall()
|
|
|
|
campaigns = []
|
|
for row in rows:
|
|
campaigns.append({
|
|
'campaign_number': row[0],
|
|
'campaign_name': row[1]
|
|
})
|
|
|
|
return campaigns
|
|
|
|
finally:
|
|
cursor.close()
|
|
self.put_connection(conn)
|
|
|
|
def close(self):
|
|
"""Close all connections in pool"""
|
|
if self.pool:
|
|
self.pool.closeall()
|