ferrero-opentext/Python-Version/scripts/shared/database.py
nickviljoen 26363f772d Enhancement: Campaign re-opening support and PPR master asset ID registration
A1→A2 now handles re-processing when campaign is reset to A1 after adding new
master assets. Existing assets reuse tracking IDs and skip Box upload, new assets
are processed normally. Also includes PPR domain registration for multiple master
asset IDs in a2_to_a3 and dam_client.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 21:06:14 +02:00

839 lines
30 KiB
Python

"""
Database Client - PostgreSQL Operations
Handles tracking IDs, master assets, and all-done checks
Compatible with Python 3.6+
"""
import psycopg2
import psycopg2.pool
import json
import random
import string
import logging
from datetime import datetime, timezone
# Module-level logger used by every Database method below.
# It is registered under the fixed name 'Database' (not __name__), so logging
# configuration elsewhere must target that exact name.
logger = logging.getLogger('Database')
class Database:
    """
    PostgreSQL client built on a psycopg2 ThreadedConnectionPool.

    Provides tracking-ID generation, master/derivative asset storage,
    campaign status bookkeeping and CreativeX score storage.

    Every method that talks to the database checks a connection out of the
    pool and always returns it through _release(), even when creating the
    cursor or running the query fails.
    """

    def __init__(self, config):
        """
        Create the connection pool.

        Args:
            config: dict with a 'database' section containing
                host, port, database, user and password

        Raises:
            Exception: re-raised (after logging) when the pool cannot be created
        """
        self.config = config['database']
        # Pre-set so close() is safe even if pool creation fails below
        self.pool = None
        # Create connection pool
        try:
            self.pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['user'],
                password=self.config['password']
            )
            logger.info("Database connection pool created")
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            raise

    def get_connection(self):
        """Get connection from pool"""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return connection to pool"""
        self.pool.putconn(conn)

    def _release(self, conn, cursor=None):
        """
        Close the cursor (if one was opened) and return conn to the pool.

        The connection is returned even when closing the cursor raises, so a
        failing cursor can never leak a pooled connection.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            self.put_connection(conn)

    @staticmethod
    def _rollback_quietly(conn):
        """
        Roll back conn, logging (not raising) if the rollback itself fails.

        Used in except-handlers whose contract is to return a failure dict;
        a broken connection must not turn that into an unhandled exception.
        """
        try:
            conn.rollback()
        except Exception as e:
            logger.error("Rollback failed: {}".format(str(e)))

    @staticmethod
    def _nested_value(metadata_element):
        """
        Return metadata_element['value']['value']['value'], or None.

        Tolerates missing keys and JSON nulls at any level instead of raising.
        """
        node = metadata_element
        for _ in range(3):
            if not isinstance(node, dict):
                return None
            node = node.get('value')
        return node

    def generate_unique_tracking_id(self, is_master=False):
        """
        Generate unique 6-character tracking ID

        Args:
            is_master: If True, generates ID starting with 'M' (for B1→B2 master files)
                If False, generates ID that never starts with 'M' (for A1→A2 regular files)

        Returns:
            str: 6-character alphanumeric ID

        Raises:
            RuntimeError: if no unused ID is found within 100 attempts
        """
        alphabet = string.ascii_letters + string.digits
        # Regular IDs must never start with uppercase 'M' (reserved for masters)
        first_char_pool = alphabet.replace('M', '')

        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            for _ in range(100):
                if is_master:
                    # Master files: M + 5 random chars (e.g., M4x7Qp)
                    tracking_id = 'M' + ''.join(random.choices(alphabet, k=5))
                else:
                    # Regular files: 6 random chars, first char cannot be 'M'
                    first_char = random.choice(first_char_pool)
                    tracking_id = first_char + ''.join(random.choices(alphabet, k=5))

                # Check uniqueness (same query for both types).
                # NOTE: this is check-then-use; the ID is only reserved once a
                # row is actually inserted by the caller.
                cursor.execute(
                    "SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
                    (tracking_id,)
                )
                if cursor.fetchone()[0] == 0:
                    logger.info("Generated tracking ID: {} (master={})".format(tracking_id, is_master))
                    return tracking_id
            raise RuntimeError("Failed to generate unique tracking ID after 100 attempts")
        finally:
            self._release(conn, cursor)

    def find_or_create_tracking_id(self, opentext_id, local_campaign_id):
        """
        Find existing tracking ID by opentext_id + local_campaign_id, or generate new one

        This is used for A5→A6 rework workflow where assets may already exist from A1→A2.
        We search by both opentext_id AND local_campaign_id to ensure we find the exact
        asset from the original workflow for this specific campaign.

        Args:
            opentext_id: DAM asset ID
            local_campaign_id: Local campaign ID (e.g., C000000078)

        Returns:
            dict with tracking_id (str) and is_existing (bool)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # Search for existing asset by opentext_id AND local_campaign_id
            cursor.execute("""
                SELECT tracking_id
                FROM master_assets
                WHERE opentext_id = %s
                AND local_campaign_id = %s
                AND status = 'active'
                LIMIT 1
            """, (opentext_id, local_campaign_id))
            row = cursor.fetchone()
        finally:
            # Release BEFORE generating a new ID: generate_unique_tracking_id
            # checks out its own connection, and holding two per call can
            # exhaust the pool under concurrent use.
            self._release(conn, cursor)

        if row:
            # Found existing asset
            tracking_id = row[0]
            logger.info("Found existing asset: opentext_id={}, local_campaign_id={}, tracking_id={}".format(
                opentext_id, local_campaign_id, tracking_id
            ))
            return {
                'tracking_id': tracking_id,
                'is_existing': True
            }

        # No existing asset found, generate new tracking ID
        # A5→A6 rework assets are NOT masters, so is_master=False
        logger.info("No existing asset found for opentext_id={}, local_campaign_id={}, generating new tracking ID".format(
            opentext_id, local_campaign_id
        ))
        tracking_id = self.generate_unique_tracking_id(is_master=False)
        return {
            'tracking_id': tracking_id,
            'is_existing': False
        }

    def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id, global_master_campaign_id=None, global_master_folder_id=None, local_campaign_id=None):
        """
        Store master asset with FULL metadata in JSONB column

        On a tracking_id conflict only upload_directory, description,
        full_metadata and the three campaign/folder IDs are refreshed.

        Args:
            tracking_id: 6-char tracking ID
            opentext_id: DAM asset ID
            asset_data: Complete DAM asset JSON
            box_file_id: Box file ID
            box_url: Box URL
            upload_folder_id: Final Assets folder ID for upload
            global_master_campaign_id: Global master campaign ID (from GLOBAL CAMPAIGN REFERENCE)
            global_master_folder_id: Global master folder ID
            local_campaign_id: Local campaign ID (immediate campaign this asset belongs to)

        Returns:
            dict with success boolean (plus tracking_id on success, error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # Extract basic info: split "name.ext" on the last dot
            name = asset_data.get('name', 'unknown')
            name_parts = name.rsplit('.', 1)
            filename = name_parts[0]
            extension = '.' + name_parts[1] if len(name_parts) > 1 else ''
            # Store complete metadata as JSONB
            full_metadata_json = json.dumps(asset_data)
            # Description with Box info (parsed back by parse_box_info_from_description)
            description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
                box_file_id, box_url, opentext_id
            )
            # Insert or update
            cursor.execute("""
                INSERT INTO master_assets (
                    tracking_id, opentext_id, original_filename, file_extension,
                    file_size_bytes, mime_type, upload_directory,
                    description, full_metadata, status,
                    global_master_campaign_id, global_master_folder_id, local_campaign_id
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s
                )
                ON CONFLICT (tracking_id) DO UPDATE SET
                    upload_directory = EXCLUDED.upload_directory,
                    description = EXCLUDED.description,
                    full_metadata = EXCLUDED.full_metadata,
                    global_master_campaign_id = EXCLUDED.global_master_campaign_id,
                    global_master_folder_id = EXCLUDED.global_master_folder_id,
                    local_campaign_id = EXCLUDED.local_campaign_id,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                tracking_id,
                opentext_id,
                filename,
                extension,
                asset_data.get('file_size'),
                asset_data.get('mime_type'),
                upload_folder_id,
                description,
                full_metadata_json,
                global_master_campaign_id,
                global_master_folder_id,
                local_campaign_id
            ))
            conn.commit()
            logger.info("Stored master asset: {}".format(tracking_id))
            return {'success': True, 'tracking_id': tracking_id}
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error("Failed to store master asset: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_master_asset(self, tracking_id):
        """
        Get master asset by tracking ID

        Returns:
            dict with tracking_id, opentext_id, upload_directory, full_metadata,
            description, box_file_id and box_url; None when no active row exists
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT tracking_id, opentext_id, upload_directory, full_metadata, description
                FROM master_assets
                WHERE tracking_id = %s AND status = 'active'
            """, (tracking_id,))
            row = cursor.fetchone()
            if not row:
                return None
            # The driver may hand the JSONB column back as a dict or as text
            full_metadata = row[3] if isinstance(row[3], dict) else json.loads(row[3])
            # Parse Box info from description
            box_info = self.parse_box_info_from_description(row[4])
            return {
                'tracking_id': row[0],
                'opentext_id': row[1],
                'upload_directory': row[2],
                'full_metadata': full_metadata,
                'description': row[4],
                'box_file_id': box_info.get('box_file_id'),
                'box_url': box_info.get('box_url')
            }
        finally:
            self._release(conn, cursor)

    @staticmethod
    def parse_box_info_from_description(description):
        """
        Parse Box file ID and URL from master asset description field.

        Description format:
            Box File ID: {id}
            Box URL: {url}
            DAM Asset ID: {opentext_id}

        Returns:
            dict with box_file_id and box_url (None if not found)
        """
        result = {'box_file_id': None, 'box_url': None}
        if not description:
            return result
        for line in description.split('\n'):
            line = line.strip()
            # Split on the FIRST colon only so the colons inside URLs survive
            if line.startswith('Box File ID:'):
                result['box_file_id'] = line.split(':', 1)[1].strip()
            elif line.startswith('Box URL:'):
                result['box_url'] = line.split(':', 1)[1].strip()
        return result

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if ALL master assets for a campaign have been uploaded

        Args:
            campaign_id: Campaign ID

        Returns:
            bool: True if all assets uploaded, False otherwise
            (also False when the campaign has no active master assets)
        """
        # NOTE(review): these queries filter master_assets on a campaign_id
        # column and derivative_assets on upload_status, whereas the inserts in
        # this class only write local_campaign_id / global_master_campaign_id
        # and status - confirm both columns exist and are populated.
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # Count total master assets for this campaign
            cursor.execute("""
                SELECT COUNT(DISTINCT tracking_id)
                FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))
            total_masters = cursor.fetchone()[0]
            if total_masters == 0:
                return False
            # Count how many have been uploaded (exist in derivative_assets)
            cursor.execute("""
                SELECT COUNT(DISTINCT ma.tracking_id)
                FROM master_assets ma
                INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
                WHERE ma.campaign_id = %s AND ma.status = 'active'
                AND da.upload_status = 'completed'
            """, (campaign_id,))
            uploaded_count = cursor.fetchone()[0]
            all_done = uploaded_count == total_masters
            logger.info("Campaign {} upload status: {}/{} assets uploaded{}".format(
                campaign_id, uploaded_count, total_masters,
                " - ALL DONE" if all_done else ""
            ))
            return all_done
        finally:
            self._release(conn, cursor)

    def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
        """
        Store derivative asset record after upload

        Args:
            tracking_id: Master asset tracking ID
            master_asset_id: Master asset DB ID (accepted for interface
                compatibility; not used by this method)
            dam_asset_id: DAM asset ID of uploaded derivative (stored and logged)
            filename: Clean filename of derivative

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # Extract filename parts: split on the last dot, if any
            if '.' in filename:
                name_without_ext, ext = filename.rsplit('.', 1)
                extension = '.' + ext
            else:
                name_without_ext, extension = filename, ''
            # Store derivative with DAM asset ID (tracking_id already links to master)
            cursor.execute("""
                INSERT INTO derivative_assets (
                    tracking_id, dam_asset_id, derivative_filename, file_extension, status
                ) VALUES (%s, %s, %s, %s, 'active')
            """, (tracking_id, dam_asset_id, name_without_ext, extension))
            conn.commit()
            logger.info("Stored derivative asset: {}{} (DAM ID: {})".format(
                tracking_id, filename, dam_asset_id
            ))
            return {'success': True}
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error("Failed to store derivative: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_campaign_asset_count(self, campaign_id):
        """Get total master asset count for campaign"""
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))
            return cursor.fetchone()[0]
        finally:
            self._release(conn, cursor)

    def test_connection(self):
        """
        Test database connection by running "SELECT 1"

        Returns:
            bool: True when the query succeeds, False otherwise (never raises)
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            return False
        finally:
            # Always hand the connection back, including on failure
            if conn is not None:
                try:
                    self._release(conn, cursor)
                except Exception as e:
                    logger.error("Failed to release connection: {}".format(str(e)))
        logger.info("Database connection OK")
        return True

    def extract_global_campaign_reference(self, asset_data, campaign_id=None):
        """
        Extract Global Campaign Reference and Local Campaign ID from asset metadata

        Missing keys and JSON nulls anywhere in the metadata are treated as
        "not present" rather than raising.

        Args:
            asset_data: Complete DAM asset JSON
            campaign_id: Current campaign ID (used as fallback for local_campaign_id)

        Returns:
            dict with global_master_campaign_id, global_master_folder_id, local_campaign_id
        """
        global_master_campaign_id = None
        global_master_folder_id = None
        local_campaign_id = None

        # Look in inherited_metadata_collections ("or []" also covers an explicit null)
        collections = asset_data.get('inherited_metadata_collections') or []
        for collection in collections:
            # Only check campaign containers
            if collection.get('container_type_name') != 'L7+ - CAMPAIGN':
                continue

            campaign_id_in_collection = None
            has_global_reference = False
            for inherited in collection.get('inherited_metadata_values') or []:
                metadata_element = inherited.get('metadata_element') or {}
                element_id = metadata_element.get('id')

                # Extract Campaign ID
                if element_id == 'FERRERO.FIELD.CAMPAIGN ID':
                    campaign_id_value = self._nested_value(metadata_element)
                    if campaign_id_value:
                        campaign_id_in_collection = campaign_id_value

                # Look for GLOBAL CAMPAIGN REFERENCE field
                if element_id == 'FERRERO.FIELD.GLOBAL CAMPAIGN REFERENCE':
                    value = self._nested_value(metadata_element)
                    if value:
                        global_master_campaign_id = value
                        has_global_reference = True
                        logger.info("Found Global Campaign Reference: {}".format(value))

            # If this campaign has a global reference, it's the local campaign
            if has_global_reference and campaign_id_in_collection:
                local_campaign_id = campaign_id_in_collection
                logger.info("Local Campaign ID: {}".format(local_campaign_id))
                # Get the container ID (global master folder)
                container_id = collection.get('container_id')
                if container_id:
                    global_master_folder_id = container_id
                    logger.info("Global master folder ID: {}".format(container_id))

        # CRITICAL FIX: Always set local_campaign_id to current campaign if not found
        # This ensures every asset has a campaign ID, even without Global Campaign Reference
        if not local_campaign_id and campaign_id:
            local_campaign_id = campaign_id
            logger.info("No Global Campaign Reference found, using current campaign ID: {}".format(campaign_id))

        return {
            'global_master_campaign_id': global_master_campaign_id,
            'global_master_folder_id': global_master_folder_id,
            'local_campaign_id': local_campaign_id
        }

    def record_campaign_status(self, campaign_id, campaign_number, campaign_name, live_campaign, status, webhook_sent=False):
        """
        Record or update campaign status for webhook tracking

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            live_campaign: 'YES' or 'NO'
            status: Current status (A2, A4, etc.)
            webhook_sent: Whether webhook was sent

        Returns:
            dict with success boolean (plus error on failure)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # webhook_sent_at is only stamped (in UTC) when the webhook was sent
            webhook_sent_at = datetime.now(timezone.utc) if webhook_sent else None
            # Insert or update campaign status
            cursor.execute("""
                INSERT INTO campaign_status (
                    campaign_id, campaign_number, campaign_name,
                    live_campaign, status, webhook_sent, webhook_sent_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (campaign_id) DO UPDATE SET
                    campaign_number = EXCLUDED.campaign_number,
                    campaign_name = EXCLUDED.campaign_name,
                    live_campaign = EXCLUDED.live_campaign,
                    status = EXCLUDED.status,
                    webhook_sent = EXCLUDED.webhook_sent,
                    webhook_sent_at = EXCLUDED.webhook_sent_at,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                campaign_id,
                campaign_number,
                campaign_name,
                live_campaign,
                status,
                webhook_sent,
                webhook_sent_at
            ))
            conn.commit()
            logger.info("Recorded campaign status: {} - {} (Live: {})".format(
                campaign_number, status, live_campaign
            ))
            return {'success': True}
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error("Failed to record campaign status: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def check_campaign_processed(self, campaign_id):
        """
        Look up the recorded status row for a campaign

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict with exists (bool) and campaign_status data if exists
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT campaign_id, campaign_number, campaign_name,
                       live_campaign, status, webhook_sent, webhook_sent_at
                FROM campaign_status
                WHERE campaign_id = %s
            """, (campaign_id,))
            row = cursor.fetchone()
            if not row:
                return {'exists': False}
            return {
                'exists': True,
                'campaign_id': row[0],
                'campaign_number': row[1],
                'campaign_name': row[2],
                'live_campaign': row[3],
                'status': row[4],
                'webhook_sent': row[5],
                'webhook_sent_at': row[6]
            }
        finally:
            self._release(conn, cursor)

    def store_creativex_score(self, filename, creativex_id, creativex_url, quality_score, box_file_id, full_extraction_data, tracking_id=None, status='active'):
        """
        Store CreativeX score data extracted from PDF or DAM metadata

        Uses soft delete strategy for 'active' status - marks old records as 'superseded'
        For 'master-cx-score' status - no versioning, just stores reference

        Args:
            filename: Original filename from extraction
            creativex_id: CreativeX ID from extraction
            creativex_url: CreativeX URL from extraction
            quality_score: Quality score percentage
            box_file_id: Box file ID for tracking
            full_extraction_data: Complete LlamaExtract JSON response or DAM metadata
            tracking_id: Optional tracking ID (used for master-cx-score status)
            status: 'active' (default), 'master-cx-score', or custom status

        Returns:
            dict with success boolean, is_update flag and version_number
            (or success=False and error on failure)
        """
        insert_sql = """
            INSERT INTO creativex_scores (
                filename, creativex_id, creativex_url, quality_score,
                box_file_id, full_extraction_data, tracking_id, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            # Convert full_extraction_data to JSON string if it's a dict
            if isinstance(full_extraction_data, dict):
                full_json = json.dumps(full_extraction_data)
            else:
                full_json = full_extraction_data
            insert_params = (
                filename,
                creativex_id,
                creativex_url,
                quality_score,
                box_file_id,
                full_json,
                tracking_id,
                status
            )

            # Handle master-cx-score differently (no versioning, just reference storage)
            if status == 'master-cx-score':
                cursor.execute(insert_sql, insert_params)
                conn.commit()
                logger.info("Stored master CreativeX score: {} (Tracking: {}, Score: {})".format(
                    filename, tracking_id, quality_score
                ))
                return {
                    'success': True,
                    'is_update': False,
                    'version_number': 1
                }

            # For 'active' status - use soft delete versioning
            # Step 1: Check if filename already exists with status='active'
            cursor.execute("""
                SELECT id, quality_score FROM creativex_scores
                WHERE filename = %s AND status = 'active'
            """, (filename,))
            existing = cursor.fetchone()
            # Count total versions (including superseded)
            cursor.execute("""
                SELECT COUNT(*) FROM creativex_scores
                WHERE filename = %s
            """, (filename,))
            total_versions = cursor.fetchone()[0]

            if existing:
                # Step 2: Mark existing record(s) as 'superseded'
                cursor.execute("""
                    UPDATE creativex_scores
                    SET status = 'superseded'
                    WHERE filename = %s AND status = 'active'
                """, (filename,))
                logger.info("Superseded previous CreativeX score for: {} (old score: {})".format(
                    filename, existing[1]
                ))

            # Step 3: Insert new record; the supersede + insert commit together
            cursor.execute(insert_sql, insert_params)
            conn.commit()

            # New version number is total_versions + 1
            version_number = total_versions + 1
            if existing:
                logger.info("Updated CreativeX score: {} (Score: {} -> {}, Version: {})".format(
                    filename, existing[1], quality_score, version_number
                ))
            else:
                logger.info("Stored new CreativeX score: {} (Score: {}, Version: {})".format(
                    filename, quality_score, version_number
                ))
            return {
                'success': True,
                'is_update': bool(existing),
                'version_number': version_number
            }
        except Exception as e:
            self._rollback_quietly(conn)
            logger.error("Failed to store CreativeX score: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_creativex_score_by_filename(self, filename):
        """
        Get CreativeX score data by filename

        Performs extension-agnostic lookup: if exact filename not found,
        tries common video/image extensions (.mp4, .jpg, .png, .mov, etc.)

        Args:
            filename: Filename to search for

        Returns:
            dict with creativex data or None if not found
        """
        import os

        # Exact filename first, then the same base name with each alternative
        # extension (skipping the one the caller already supplied).
        base_name, ext = os.path.splitext(filename)
        alt_extensions = ['.mp4', '.jpg', '.jpeg', '.png', '.mov', '.avi', '.gif', '.webp']
        candidates = [filename] + [
            base_name + alt_ext
            for alt_ext in alt_extensions
            if alt_ext.lower() != ext.lower()
        ]

        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            row = None
            for candidate in candidates:
                cursor.execute("""
                    SELECT filename, creativex_id, creativex_url, quality_score,
                           box_file_id, full_extraction_data, extracted_at
                    FROM creativex_scores
                    WHERE filename = %s AND status = 'active'
                    ORDER BY extracted_at DESC
                    LIMIT 1
                """, (candidate,))
                row = cursor.fetchone()
                if row:
                    break  # Stop at the first candidate that has a score
            if not row:
                return None
            # The driver may hand the JSONB column back as a dict or as text
            full_data = row[5] if isinstance(row[5], dict) else json.loads(row[5])
            return {
                'filename': row[0],
                'creativex_id': row[1],
                'creativex_url': row[2],
                'quality_score': row[3],
                'box_file_id': row[4],
                'full_extraction_data': full_data,
                'extracted_at': row[6]
            }
        finally:
            self._release(conn, cursor)

    def get_all_live_campaigns(self):
        """
        Get all live campaigns for CSV report

        Returns:
            list of dicts with campaign_number, campaign_name
            (ordered by campaign_number descending)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT campaign_number, campaign_name
                FROM campaign_status
                WHERE live_campaign = 'YES'
                ORDER BY campaign_number DESC
            """)
            return [
                {'campaign_number': row[0], 'campaign_name': row[1]}
                for row in cursor.fetchall()
            ]
        finally:
            self._release(conn, cursor)

    def close(self):
        """Close all connections in pool (no-op if the pool was never created)"""
        pool = getattr(self, 'pool', None)
        if pool:
            pool.closeall()