"""
Database Client - PostgreSQL Operations

Handles tracking IDs, master assets, and all-done checks.
Compatible with Python 3.6+
"""

import json
import logging
import random
import string
from datetime import datetime, timezone

import psycopg2
import psycopg2.pool

logger = logging.getLogger('Database')


class Database:
    """
    Thin data-access layer over a psycopg2 ThreadedConnectionPool.

    Every public method checks one connection out of the pool, runs its
    statements, and always hands the connection back (see ``_release``).
    Write methods commit on success and roll back + return
    ``{'success': False, 'error': ...}`` on failure; read methods let
    database errors propagate to the caller.
    """

    # Tracking-ID generation parameters.
    TRACKING_ID_ALPHABET = string.ascii_letters + string.digits
    TRACKING_ID_LENGTH = 6
    TRACKING_ID_MAX_ATTEMPTS = 100

    def __init__(self, config):
        """
        Create the connection pool.

        Args:
            config: Mapping with a 'database' entry providing 'host', 'port',
                'database', 'user' and 'password'.

        Raises:
            Exception: Whatever the lookup or pool construction raised; the
                error is logged and re-raised unchanged.
        """
        self.config = config['database']

        # Create connection pool
        try:
            self.pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['user'],
                password=self.config['password']
            )
            logger.info("Database connection pool created")
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            raise

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def get_connection(self):
        """Get connection from pool"""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return connection to pool"""
        self.pool.putconn(conn)

    def _release(self, conn, cursor=None):
        """
        Close ``cursor`` (if one was opened) and return ``conn`` to the pool.

        The connection is returned even when closing the cursor fails, so a
        broken cursor can never leak a pooled connection.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            self.put_connection(conn)

    @staticmethod
    def _parse_json(value):
        """
        Decode a JSON/JSONB column value.

        Text (str/bytes) is parsed with ``json.loads``; anything else
        (an already-decoded dict/list, or None for SQL NULL) is returned
        unchanged.
        """
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    # ------------------------------------------------------------------
    # Tracking IDs
    # ------------------------------------------------------------------

    def generate_unique_tracking_id(self):
        """
        Generate unique 6-character tracking ID

        Random candidates are checked against master_assets.tracking_id until
        an unused one is found.

        Returns:
            str: 6-character alphanumeric ID

        Raises:
            RuntimeError: If no unused ID was found within
                TRACKING_ID_MAX_ATTEMPTS tries.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            for _ in range(self.TRACKING_ID_MAX_ATTEMPTS):
                # Generate random candidate ID
                tracking_id = ''.join(random.choices(
                    self.TRACKING_ID_ALPHABET,
                    k=self.TRACKING_ID_LENGTH
                ))

                # Check uniqueness
                cursor.execute(
                    "SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
                    (tracking_id,)
                )
                if cursor.fetchone()[0] == 0:
                    logger.info("Generated tracking ID: {}".format(tracking_id))
                    return tracking_id

            raise RuntimeError(
                "Failed to generate unique tracking ID after {} attempts".format(
                    self.TRACKING_ID_MAX_ATTEMPTS
                )
            )
        finally:
            self._release(conn, cursor)

    def find_or_create_tracking_id(self, opentext_id, local_campaign_id):
        """
        Find existing tracking ID by opentext_id + local_campaign_id, or generate new one

        This is used for A5→A6 rework workflow where assets may already exist from A1→A2.
        We search by both opentext_id AND local_campaign_id to ensure we find the exact
        asset from the original workflow for this specific campaign.

        Args:
            opentext_id: DAM asset ID
            local_campaign_id: Local campaign ID (e.g., C000000078)

        Returns:
            dict with tracking_id (str) and is_existing (bool)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Search for existing asset by opentext_id AND local_campaign_id
            cursor.execute("""
                SELECT tracking_id
                FROM master_assets
                WHERE opentext_id = %s
                  AND local_campaign_id = %s
                  AND status = 'active'
                LIMIT 1
            """, (opentext_id, local_campaign_id))
            row = cursor.fetchone()
        finally:
            # Hand the connection back BEFORE possibly generating a new ID:
            # generate_unique_tracking_id() checks out its own connection, and
            # holding two per caller can exhaust the pool under concurrency.
            self._release(conn, cursor)

        if row:
            # Found existing asset
            tracking_id = row[0]
            logger.info("Found existing asset: opentext_id={}, local_campaign_id={}, tracking_id={}".format(
                opentext_id, local_campaign_id, tracking_id
            ))
            return {
                'tracking_id': tracking_id,
                'is_existing': True
            }

        # No existing asset found, generate new tracking ID
        logger.info("No existing asset found for opentext_id={}, local_campaign_id={}, generating new tracking ID".format(
            opentext_id, local_campaign_id
        ))
        return {
            'tracking_id': self.generate_unique_tracking_id(),
            'is_existing': False
        }

    # ------------------------------------------------------------------
    # Master / derivative assets
    # ------------------------------------------------------------------

    def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id,
                           global_master_campaign_id=None, global_master_folder_id=None, local_campaign_id=None):
        """
        Store master asset with FULL metadata in JSONB column

        Inserts a new row, or on a tracking_id conflict updates the upload
        directory, description, metadata and campaign references.

        Args:
            tracking_id: 6-char tracking ID
            opentext_id: DAM asset ID
            asset_data: Complete DAM asset JSON
            box_file_id: Box file ID
            box_url: Box URL
            upload_folder_id: Final Assets folder ID for upload
            global_master_campaign_id: Global master campaign ID (from GLOBAL CAMPAIGN REFERENCE)
            global_master_folder_id: Global master folder ID
            local_campaign_id: Local campaign ID (immediate campaign this asset belongs to)

        Returns:
            dict: {'success': True, 'tracking_id': ...} on success,
                {'success': False, 'error': str} on failure (after rollback)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Extract basic info: split "name.ext" on the last dot
            name = asset_data.get('name', 'unknown')
            name_parts = name.rsplit('.', 1)
            filename = name_parts[0]
            extension = '.' + name_parts[1] if len(name_parts) > 1 else ''

            # Store complete metadata as JSONB
            full_metadata_json = json.dumps(asset_data)

            # Description with Box info
            description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
                box_file_id, box_url, opentext_id
            )

            # Insert or update
            cursor.execute("""
                INSERT INTO master_assets (
                    tracking_id, opentext_id, original_filename, file_extension,
                    file_size_bytes, mime_type, upload_directory, description,
                    full_metadata, status,
                    global_master_campaign_id, global_master_folder_id, local_campaign_id
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active', %s, %s, %s
                )
                ON CONFLICT (tracking_id) DO UPDATE SET
                    upload_directory = EXCLUDED.upload_directory,
                    description = EXCLUDED.description,
                    full_metadata = EXCLUDED.full_metadata,
                    global_master_campaign_id = EXCLUDED.global_master_campaign_id,
                    global_master_folder_id = EXCLUDED.global_master_folder_id,
                    local_campaign_id = EXCLUDED.local_campaign_id,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                tracking_id,
                opentext_id,
                filename,
                extension,
                asset_data.get('file_size'),
                asset_data.get('mime_type'),
                upload_folder_id,
                description,
                full_metadata_json,
                global_master_campaign_id,
                global_master_folder_id,
                local_campaign_id
            ))

            conn.commit()
            logger.info("Stored master asset: {}".format(tracking_id))
            return {'success': True, 'tracking_id': tracking_id}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store master asset: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_master_asset(self, tracking_id):
        """
        Get master asset by tracking ID

        Only rows with status = 'active' are considered.

        Returns:
            dict with tracking_id, opentext_id, upload_directory,
            full_metadata (decoded JSON, or None if the column is NULL) and
            description; None when no active row matches.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT tracking_id, opentext_id, upload_directory, full_metadata, description
                FROM master_assets
                WHERE tracking_id = %s AND status = 'active'
            """, (tracking_id,))

            row = cursor.fetchone()
            if not row:
                return None

            return {
                'tracking_id': row[0],
                'opentext_id': row[1],
                'upload_directory': row[2],
                'full_metadata': self._parse_json(row[3]),
                'description': row[4]
            }
        finally:
            self._release(conn, cursor)

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if ALL master assets for a campaign have been uploaded

        Args:
            campaign_id: Campaign ID

        Returns:
            bool: True if all assets uploaded, False otherwise (including
                when the campaign has no active master assets)
        """
        # NOTE(review): these queries filter on master_assets.campaign_id and
        # derivative_assets.upload_status, but store_master_asset() only
        # writes local_campaign_id / global_master_campaign_id and
        # store_derivative_asset() never sets upload_status. Confirm those
        # columns exist and are populated elsewhere.
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Count total master assets for this campaign
            cursor.execute("""
                SELECT COUNT(DISTINCT tracking_id)
                FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))
            total_masters = cursor.fetchone()[0]

            if total_masters == 0:
                return False

            # Count how many have been uploaded (exist in derivative_assets)
            cursor.execute("""
                SELECT COUNT(DISTINCT ma.tracking_id)
                FROM master_assets ma
                INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
                WHERE ma.campaign_id = %s
                  AND ma.status = 'active'
                  AND da.upload_status = 'completed'
            """, (campaign_id,))
            uploaded_count = cursor.fetchone()[0]

            all_done = uploaded_count == total_masters

            logger.info("Campaign {} upload status: {}/{} assets uploaded{}".format(
                campaign_id,
                uploaded_count,
                total_masters,
                " - ALL DONE" if all_done else ""
            ))

            return all_done
        finally:
            self._release(conn, cursor)

    def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
        """
        Store derivative asset record after upload

        Uses existing schema columns only: tracking_id, derivative_filename,
        file_extension and status ('active').

        Args:
            tracking_id: Master asset tracking ID
            master_asset_id: Master asset DB ID (optional, can be None);
                accepted for interface compatibility, not stored
            dam_asset_id: DAM asset ID of uploaded derivative; accepted for
                interface compatibility, not stored
            filename: Clean filename of derivative

        Returns:
            dict: {'success': True} on success,
                {'success': False, 'error': str} on failure (after rollback)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Extract filename parts (split on the last dot, if any)
            if '.' in filename:
                name_without_ext, suffix = filename.rsplit('.', 1)
                extension = '.' + suffix
            else:
                name_without_ext, extension = filename, ''

            # Simple insert - just track that this tracking_id has a derivative
            cursor.execute("""
                INSERT INTO derivative_assets (
                    tracking_id, derivative_filename, file_extension, status
                ) VALUES (%s, %s, %s, 'active')
            """, (tracking_id, name_without_ext, extension))

            conn.commit()
            logger.info("Stored derivative asset: {} → {}".format(tracking_id, filename))
            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store derivative: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_campaign_asset_count(self, campaign_id):
        """Get total master asset count for campaign"""
        # NOTE(review): filters on master_assets.campaign_id, which
        # store_master_asset() does not write - confirm against the schema.
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))
            return cursor.fetchone()[0]
        finally:
            self._release(conn, cursor)

    def test_connection(self):
        """
        Test database connection

        Returns:
            bool: True if "SELECT 1" ran successfully, False on any error
                (the error is logged, never raised)
        """
        try:
            conn = self.get_connection()
            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
            finally:
                # Always give the connection back, even when the probe fails.
                self._release(conn, cursor)
            logger.info("Database connection OK")
            return True
        except Exception as e:
            logger.error("Database connection failed: {}".format(str(e)))
            return False

    # ------------------------------------------------------------------
    # Campaign metadata
    # ------------------------------------------------------------------

    @staticmethod
    def _metadata_value(metadata_element):
        """
        Return metadata_element['value']['value']['value'], or None when any
        level is missing or is not a mapping.
        """
        node = metadata_element
        for _ in range(3):
            if not isinstance(node, dict):
                return None
            node = node.get('value')
        return node

    def extract_global_campaign_reference(self, asset_data):
        """
        Extract Global Campaign Reference and Local Campaign ID from asset metadata

        Scans asset_data['inherited_metadata_collections'] for containers of
        type 'L7+ - CAMPAIGN'. A container carrying both a campaign ID and a
        global campaign reference is treated as the local campaign. When
        several containers match, values from later ones overwrite earlier
        ones.

        Args:
            asset_data: Complete DAM asset JSON

        Returns:
            dict with global_master_campaign_id, global_master_folder_id,
            local_campaign_id (each None when not found)
        """
        global_master_campaign_id = None
        global_master_folder_id = None
        local_campaign_id = None

        # Look in inherited_metadata_collections (tolerate a missing/null list)
        for collection in asset_data.get('inherited_metadata_collections') or []:
            # Only check campaign containers
            if collection.get('container_type_name') != 'L7+ - CAMPAIGN':
                continue

            campaign_id_in_collection = None
            has_global_reference = False

            for inherited in collection.get('inherited_metadata_values') or []:
                metadata_element = inherited.get('metadata_element') or {}
                element_id = metadata_element.get('id')

                if element_id == 'FERRERO.FIELD.CAMPAIGN ID':
                    # Extract Campaign ID
                    campaign_id_value = Database._metadata_value(metadata_element)
                    if campaign_id_value:
                        campaign_id_in_collection = campaign_id_value

                elif element_id == 'FERRERO.FIELD.GLOBAL CAMPAIGN REFERENCE':
                    # Look for GLOBAL CAMPAIGN REFERENCE field
                    value = Database._metadata_value(metadata_element)
                    if value:
                        global_master_campaign_id = value
                        has_global_reference = True
                        logger.info("Found Global Campaign Reference: {}".format(value))

            # If this campaign has a global reference, it's the local campaign
            if has_global_reference and campaign_id_in_collection:
                local_campaign_id = campaign_id_in_collection
                logger.info("Local Campaign ID: {}".format(local_campaign_id))

                # Get the container ID (global master folder)
                # NOTE(review): this is the container_id of the collection
                # identified above as the LOCAL campaign - confirm it really
                # is the global master folder.
                container_id = collection.get('container_id')
                if container_id:
                    global_master_folder_id = container_id
                    logger.info("Global master folder ID: {}".format(container_id))

        return {
            'global_master_campaign_id': global_master_campaign_id,
            'global_master_folder_id': global_master_folder_id,
            'local_campaign_id': local_campaign_id
        }

    def record_campaign_status(self, campaign_id, campaign_number, campaign_name,
                               live_campaign, status, webhook_sent=False):
        """
        Record or update campaign status for webhook tracking

        Upserts on campaign_id. webhook_sent_at is set to the current UTC
        time when webhook_sent is truthy, otherwise to NULL.

        Args:
            campaign_id: DAM campaign folder ID
            campaign_number: Campaign number (e.g., C000000078)
            campaign_name: Campaign name
            live_campaign: 'YES' or 'NO'
            status: Current status (A2, A4, etc.)
            webhook_sent: Whether webhook was sent

        Returns:
            dict: {'success': True} on success,
                {'success': False, 'error': str} on failure (after rollback)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Insert or update campaign status
            cursor.execute("""
                INSERT INTO campaign_status (
                    campaign_id, campaign_number, campaign_name,
                    live_campaign, status, webhook_sent, webhook_sent_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (campaign_id) DO UPDATE SET
                    campaign_number = EXCLUDED.campaign_number,
                    campaign_name = EXCLUDED.campaign_name,
                    live_campaign = EXCLUDED.live_campaign,
                    status = EXCLUDED.status,
                    webhook_sent = EXCLUDED.webhook_sent,
                    webhook_sent_at = EXCLUDED.webhook_sent_at,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                campaign_id,
                campaign_number,
                campaign_name,
                live_campaign,
                status,
                webhook_sent,
                datetime.now(timezone.utc) if webhook_sent else None
            ))

            conn.commit()
            logger.info("Recorded campaign status: {} - {} (Live: {})".format(
                campaign_number, status, live_campaign
            ))
            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to record campaign status: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def check_campaign_processed(self, campaign_id):
        """
        Check if campaign has already been processed (webhook sent)

        Args:
            campaign_id: DAM campaign folder ID

        Returns:
            dict: {'exists': False} when there is no campaign_status row;
                otherwise 'exists': True plus campaign_id, campaign_number,
                campaign_name, live_campaign, status, webhook_sent and
                webhook_sent_at
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT campaign_id, campaign_number, campaign_name,
                       live_campaign, status, webhook_sent, webhook_sent_at
                FROM campaign_status
                WHERE campaign_id = %s
            """, (campaign_id,))

            row = cursor.fetchone()
            if not row:
                return {'exists': False}

            return {
                'exists': True,
                'campaign_id': row[0],
                'campaign_number': row[1],
                'campaign_name': row[2],
                'live_campaign': row[3],
                'status': row[4],
                'webhook_sent': row[5],
                'webhook_sent_at': row[6]
            }
        finally:
            self._release(conn, cursor)

    # ------------------------------------------------------------------
    # CreativeX scores
    # ------------------------------------------------------------------

    @staticmethod
    def _insert_creativex_score(cursor, values):
        """
        Insert one creativex_scores row.

        ``values`` is the 8-tuple (filename, creativex_id, creativex_url,
        quality_score, box_file_id, full_extraction_data, tracking_id, status).
        """
        cursor.execute("""
            INSERT INTO creativex_scores (
                filename, creativex_id, creativex_url, quality_score,
                box_file_id, full_extraction_data, tracking_id, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, values)

    def store_creativex_score(self, filename, creativex_id, creativex_url, quality_score,
                              box_file_id, full_extraction_data, tracking_id=None, status='active'):
        """
        Store CreativeX score data extracted from PDF or DAM metadata

        Uses soft delete strategy for 'active' status - marks old records as 'superseded'
        For 'master-cx-score' status - no versioning, just stores reference

        Args:
            filename: Original filename from extraction
            creativex_id: CreativeX ID from extraction
            creativex_url: CreativeX URL from extraction
            quality_score: Quality score percentage
            box_file_id: Box file ID for tracking
            full_extraction_data: Complete LlamaExtract JSON response or DAM metadata
                (a dict is serialized to JSON; any other value is passed through as-is)
            tracking_id: Optional tracking ID (used for master-cx-score status)
            status: 'active' (default), 'master-cx-score', or custom status

        Returns:
            dict: {'success': True, 'is_update': bool, 'version_number': int}
                on success, {'success': False, 'error': str} on failure
                (after rollback)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Convert full_extraction_data to JSON string if it's a dict
            if isinstance(full_extraction_data, dict):
                full_json = json.dumps(full_extraction_data)
            else:
                full_json = full_extraction_data

            # Handle master-cx-score differently (no versioning, just reference storage)
            if status == 'master-cx-score':
                self._insert_creativex_score(cursor, (
                    filename, creativex_id, creativex_url, quality_score,
                    box_file_id, full_json, tracking_id, 'master-cx-score'
                ))
                conn.commit()

                logger.info("Stored master CreativeX score: {} (Tracking: {}, Score: {})".format(
                    filename, tracking_id, quality_score
                ))
                return {
                    'success': True,
                    'is_update': False,
                    'version_number': 1
                }

            # For 'active' status - use soft delete versioning
            # Step 1: Check if filename already exists with status='active'
            cursor.execute("""
                SELECT id, quality_score FROM creativex_scores
                WHERE filename = %s AND status = 'active'
            """, (filename,))
            existing = cursor.fetchone()

            # Count total versions for this filename (every status included)
            cursor.execute("""
                SELECT COUNT(*) FROM creativex_scores
                WHERE filename = %s
            """, (filename,))
            total_versions = cursor.fetchone()[0]

            if existing:
                # Step 2: Mark existing record(s) as 'superseded'
                cursor.execute("""
                    UPDATE creativex_scores
                    SET status = 'superseded'
                    WHERE filename = %s AND status = 'active'
                """, (filename,))

                logger.info("Superseded previous CreativeX score for: {} (old score: {})".format(
                    filename, existing[1]
                ))

            # Step 3: Insert new record with the requested status
            self._insert_creativex_score(cursor, (
                filename, creativex_id, creativex_url, quality_score,
                box_file_id, full_json, tracking_id, status
            ))
            conn.commit()

            # New version number is total_versions + 1
            version_number = total_versions + 1

            if existing:
                logger.info("Updated CreativeX score: {} (Score: {} -> {}, Version: {})".format(
                    filename, existing[1], quality_score, version_number
                ))
            else:
                logger.info("Stored new CreativeX score: {} (Score: {}, Version: {})".format(
                    filename, quality_score, version_number
                ))

            return {
                'success': True,
                'is_update': bool(existing),
                'version_number': version_number
            }

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store CreativeX score: {}".format(str(e)))
            return {'success': False, 'error': str(e)}
        finally:
            self._release(conn, cursor)

    def get_creativex_score_by_filename(self, filename):
        """
        Get CreativeX score data by filename

        Returns the most recently extracted row with status = 'active'.

        Args:
            filename: Filename to search for

        Returns:
            dict with filename, creativex_id, creativex_url, quality_score,
            box_file_id, full_extraction_data (decoded JSON, or None if the
            column is NULL) and extracted_at; None if not found
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT filename, creativex_id, creativex_url, quality_score,
                       box_file_id, full_extraction_data, extracted_at
                FROM creativex_scores
                WHERE filename = %s AND status = 'active'
                ORDER BY extracted_at DESC
                LIMIT 1
            """, (filename,))

            row = cursor.fetchone()
            if not row:
                return None

            return {
                'filename': row[0],
                'creativex_id': row[1],
                'creativex_url': row[2],
                'quality_score': row[3],
                'box_file_id': row[4],
                'full_extraction_data': self._parse_json(row[5]),
                'extracted_at': row[6]
            }
        finally:
            self._release(conn, cursor)

    # ------------------------------------------------------------------
    # Reporting / lifecycle
    # ------------------------------------------------------------------

    def get_all_live_campaigns(self):
        """
        Get all live campaigns for CSV report

        Returns:
            list of dicts with campaign_number, campaign_name, ordered by
            campaign_number descending
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT campaign_number, campaign_name
                FROM campaign_status
                WHERE live_campaign = 'YES'
                ORDER BY campaign_number DESC
            """)

            return [
                {'campaign_number': row[0], 'campaign_name': row[1]}
                for row in cursor.fetchall()
            ]
        finally:
            self._release(conn, cursor)

    def close(self):
        """Close all connections in pool"""
        if self.pool:
            self.pool.closeall()