MAJOR MILESTONE: Complete Python automation system created! Components Implemented: ✅ Box Client (box_client.py) - JWT authentication via boxsdk - Upload with tracking ID suffix - Download files - Campaign folder creation - Connection testing ✅ Database Client (database.py) - PostgreSQL connection pooling - generate_unique_tracking_id() - store_master_asset() with full_metadata JSONB - get_master_asset(tracking_id) - check_campaign_upload_complete() - ALL-DONE CHECK! - store_derivative_asset() - Connection testing ✅ Filename Parser (filename_parser.py) - V2 naming convention parser (ported from PHP) - parse_filename() - 10 components - strip_upload_components() - Remove Job# and Tracking ID - Strict validation with detailed errors ✅ Metadata Extractor MVP (metadata_extractor_mvp.py) - Extract 28 MVP fields from master - Update fields from V2 filename (Description, Language, State) - Add missing fields with defaults - Build asset representation for upload ✅ Notifier (notifier.py) - Mailgun email integration - Outgoing webhook sender - Email templates (success, error, partial, critical) - Configurable recipients Main Scripts: ✅ A1→A2 Download (a1_to_a2_download.py) - Poll DAM every 5 minutes for A1 campaigns - Download all master assets - Upload to Box with tracking IDs - Store in DB with full metadata - ALL-DONE CHECK before status update - Update A1→A2 only if all assets successful - Send webhook with campaign ID/number - Email notifications ✅ A2→A3 Upload (a2_to_a3_upload.py) - Flask webhook receiver for Box uploads - Signature validation - Async task queue processing - Parse V2 filenames - Load master metadata - Extract MVP fields - Upload to DAM - ALL-DONE CHECK for campaign - Update A2→A3 when all assets uploaded - Send webhook notifications ✅ Test Connection Script (test_connection.py) - Verify DAM, Box, Database connectivity - Quick health check ✅ README.md - Complete setup guide - Usage instructions - Configuration examples - Troubleshooting Key Features: - 
Python 3.6+ compatible (server requirement) - Virtual environment isolated - Configuration-driven (YAML files) - Easy field updates (no code changes) - Environment switching (staging/production) - Comprehensive error handling - Email + webhook notifications - Retry logic - All-done checks before status updates - Campaign webhook notifications Ready for testing locally with Python 3.10! 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
315 lines
9.9 KiB
Python
315 lines
9.9 KiB
Python
"""
Database Client - PostgreSQL Operations
Handles tracking IDs, master assets, and all-done checks
Compatible with Python 3.6+
"""
|
|
|
|
import psycopg2
|
|
import psycopg2.pool
|
|
import json
|
|
import random
|
|
import string
|
|
import logging
|
|
|
|
# Module-level logger shared by every Database method below.
logger = logging.getLogger('Database')


class Database:
    """PostgreSQL access layer for tracking IDs, master/derivative assets
    and campaign all-done checks.

    Connections are borrowed from a psycopg2 ``ThreadedConnectionPool`` for
    the duration of a single method call and always handed back, including
    on failure.
    """

    # Character set and size used when drawing candidate tracking IDs.
    _TRACKING_ID_ALPHABET = string.ascii_letters + string.digits
    _TRACKING_ID_LENGTH = 6
    _TRACKING_ID_MAX_ATTEMPTS = 100

    def __init__(self, config):
        """Create the connection pool.

        Args:
            config: Mapping with a ``'database'`` entry providing ``host``,
                ``port``, ``database``, ``user`` and ``password``.

        Raises:
            Exception: Whatever the lookup of a setting or the pool
                construction raised; it is logged and re-raised unchanged.
        """
        self.config = config['database']
        # Defined up front so the attribute exists even if pool creation fails.
        self.pool = None

        try:
            self.pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=1,
                maxconn=10,
                host=self.config['host'],
                port=self.config['port'],
                database=self.config['database'],
                user=self.config['user'],
                password=self.config['password']
            )
            logger.info("Database connection pool created")
        except Exception as e:
            logger.error("Database connection failed: %s", e)
            raise

    def get_connection(self):
        """Get connection from pool"""
        return self.pool.getconn()

    def put_connection(self, conn):
        """Return connection to pool"""
        self.pool.putconn(conn)

    def _release(self, conn, cursor):
        """Close *cursor* if one was opened, then return *conn* to the pool.

        ``cursor`` is ``None`` when ``conn.cursor()`` itself failed; the
        connection must still go back to the pool in that case.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            self.put_connection(conn)

    def generate_unique_tracking_id(self):
        """
        Generate unique 6-character tracking ID

        Draws random alphanumeric candidates and returns the first one that
        has no row in ``master_assets``.

        Returns:
            str: 6-character alphanumeric ID

        Raises:
            RuntimeError: If 100 consecutive candidates were already taken.
        """
        # NOTE(review): the uniqueness check is not atomic with the later
        # insert, and store_master_asset() upserts on tracking_id, so two
        # concurrent callers drawing the same ID would not be detected here.
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            for _ in range(self._TRACKING_ID_MAX_ATTEMPTS):
                tracking_id = ''.join(random.choices(
                    self._TRACKING_ID_ALPHABET, k=self._TRACKING_ID_LENGTH
                ))

                cursor.execute(
                    "SELECT COUNT(*) FROM master_assets WHERE tracking_id = %s",
                    (tracking_id,)
                )

                if cursor.fetchone()[0] == 0:
                    logger.info("Generated tracking ID: %s", tracking_id)
                    return tracking_id

            raise RuntimeError("Failed to generate unique tracking ID after 100 attempts")

        finally:
            self._release(conn, cursor)

    def store_master_asset(self, tracking_id, opentext_id, asset_data, box_file_id, box_url, upload_folder_id):
        """
        Store master asset with FULL metadata in JSONB column

        Inserts a row into ``master_assets``; if the tracking ID already
        exists, its upload directory, description and metadata are updated.

        Args:
            tracking_id: 6-char tracking ID
            opentext_id: DAM asset ID
            asset_data: Complete DAM asset JSON (dict); ``name``,
                ``file_size`` and ``mime_type`` are read from it
            box_file_id: Box file ID
            box_url: Box URL
            upload_folder_id: Final Assets folder ID for upload (stored in
                the ``upload_directory`` column)

        Returns:
            dict: ``{'success': True, 'tracking_id': ...}`` on success,
            ``{'success': False, 'error': ...}`` on any failure.
        """
        # NOTE(review): this statement does not write campaign_id, yet
        # check_campaign_upload_complete() filters master_assets on it —
        # presumably populated elsewhere; confirm.
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Split "<stem>.<ext>" on the last dot; a missing or None name
            # falls back to 'unknown' instead of failing the whole store.
            name = asset_data.get('name') or 'unknown'
            name_parts = name.rsplit('.', 1)
            filename = name_parts[0]
            extension = '.' + name_parts[1] if len(name_parts) > 1 else ''

            # Store complete metadata as JSONB
            full_metadata_json = json.dumps(asset_data)

            # Description with Box info
            description = "Box File ID: {}\nBox URL: {}\nDAM Asset ID: {}".format(
                box_file_id, box_url, opentext_id
            )

            # Insert or update
            cursor.execute("""
                INSERT INTO master_assets (
                    tracking_id, opentext_id, original_filename, file_extension,
                    file_size_bytes, mime_type, upload_directory,
                    description, full_metadata, status
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, 'active'
                )
                ON CONFLICT (tracking_id) DO UPDATE SET
                    upload_directory = EXCLUDED.upload_directory,
                    description = EXCLUDED.description,
                    full_metadata = EXCLUDED.full_metadata,
                    updated_at = CURRENT_TIMESTAMP
            """, (
                tracking_id,
                opentext_id,
                filename,
                extension,
                asset_data.get('file_size'),
                asset_data.get('mime_type'),
                upload_folder_id,
                description,
                full_metadata_json
            ))

            conn.commit()
            logger.info("Stored master asset: %s", tracking_id)

            return {'success': True, 'tracking_id': tracking_id}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store master asset: %s", e)
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_master_asset(self, tracking_id):
        """
        Get master asset by tracking ID

        Only rows with ``status = 'active'`` are considered.

        Returns:
            dict with tracking_id, opentext_id, upload_directory,
            full_metadata and description, or None if no active row matches.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT tracking_id, opentext_id, upload_directory, full_metadata, description
                FROM master_assets
                WHERE tracking_id = %s AND status = 'active'
            """, (tracking_id,))

            row = cursor.fetchone()

            if not row:
                return None

            # The driver may hand JSONB back already decoded; only decode
            # when it is still text, and pass NULL through as None.
            raw_metadata = row[3]
            if isinstance(raw_metadata, (str, bytes)):
                full_metadata = json.loads(raw_metadata)
            else:
                full_metadata = raw_metadata

            return {
                'tracking_id': row[0],
                'opentext_id': row[1],
                'upload_directory': row[2],
                'full_metadata': full_metadata,
                'description': row[4]
            }

        finally:
            self._release(conn, cursor)

    def check_campaign_upload_complete(self, campaign_id):
        """
        Check if ALL master assets for a campaign have been uploaded

        A master counts as uploaded when at least one ``derivative_assets``
        row with ``upload_status = 'completed'`` shares its tracking ID.

        Args:
            campaign_id: Campaign ID

        Returns:
            bool: True if all assets uploaded, False otherwise (including
            when the campaign has no active master assets)
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            # Count total master assets for this campaign
            cursor.execute("""
                SELECT COUNT(DISTINCT tracking_id)
                FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))

            total_masters = cursor.fetchone()[0]

            # No masters recorded: never report the campaign as done.
            if total_masters == 0:
                return False

            # Count how many have been uploaded (exist in derivative_assets)
            cursor.execute("""
                SELECT COUNT(DISTINCT ma.tracking_id)
                FROM master_assets ma
                INNER JOIN derivative_assets da ON ma.tracking_id = da.tracking_id
                WHERE ma.campaign_id = %s AND ma.status = 'active'
                AND da.upload_status = 'completed'
            """, (campaign_id,))

            uploaded_count = cursor.fetchone()[0]

            all_done = uploaded_count == total_masters

            logger.info(
                "Campaign %s upload status: %s/%s assets uploaded%s",
                campaign_id, uploaded_count, total_masters,
                " - ALL DONE" if all_done else ""
            )

            return all_done

        finally:
            self._release(conn, cursor)

    def store_derivative_asset(self, tracking_id, master_asset_id, dam_asset_id, filename):
        """
        Store derivative asset record after upload

        Upserts on ``(tracking_id, derivative_filename)`` and marks the row
        ``'completed'``.

        Args:
            tracking_id: Master asset tracking ID
            master_asset_id: Master asset DB ID
            dam_asset_id: DAM asset ID of uploaded derivative
            filename: Clean filename of derivative

        Returns:
            dict: ``{'success': True}`` on success,
            ``{'success': False, 'error': ...}`` on any failure.
        """
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                INSERT INTO derivative_assets (
                    tracking_id, master_asset_id, dam_asset_id,
                    derivative_filename, upload_status
                ) VALUES (%s, %s, %s, %s, 'completed')
                ON CONFLICT (tracking_id, derivative_filename) DO UPDATE SET
                    dam_asset_id = EXCLUDED.dam_asset_id,
                    uploaded_at = CURRENT_TIMESTAMP,
                    upload_status = 'completed'
            """, (tracking_id, master_asset_id, dam_asset_id, filename))

            conn.commit()
            logger.info("Stored derivative asset: %s → %s", tracking_id, dam_asset_id)

            return {'success': True}

        except Exception as e:
            conn.rollback()
            logger.error("Failed to store derivative: %s", e)
            return {'success': False, 'error': str(e)}

        finally:
            self._release(conn, cursor)

    def get_campaign_asset_count(self, campaign_id):
        """Get total master asset count for campaign"""
        conn = self.get_connection()
        cursor = None
        try:
            cursor = conn.cursor()

            cursor.execute("""
                SELECT COUNT(*) FROM master_assets
                WHERE campaign_id = %s AND status = 'active'
            """, (campaign_id,))

            return cursor.fetchone()[0]

        finally:
            self._release(conn, cursor)

    def test_connection(self):
        """Test database connection

        Returns:
            bool: True if ``SELECT 1`` ran successfully, False otherwise.
            The borrowed connection is returned to the pool either way.
        """
        try:
            conn = self.get_connection()
            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
            finally:
                self._release(conn, cursor)
            logger.info("Database connection OK")
            return True
        except Exception as e:
            logger.error("Database connection failed: %s", e)
            return False

    def close(self):
        """Close all connections in pool

        Safe to call more than once and on an instance whose pool was never
        created.
        """
        pool = getattr(self, 'pool', None)
        # Skip an already closed pool so repeated close() calls are harmless.
        if pool is not None and not getattr(pool, 'closed', False):
            pool.closeall()
|