# loreal-video-optimizer/backend/box_processor.py
# 2026-02-24 15:04:26 +05:30
#
# 426 lines
# 16 KiB
# Python

"""
Box.com video processing classes.
BoxProcessor and BoxPoller are imported and used directly by app.py.
This module no longer runs as a standalone service — all Box endpoints
live in app.py on port 5000.
To run the Box automation:
    python app.py                  # starts web UI + Box in one process

To test:
    python test_box_processor.py   # targets http://localhost:5000
    python box_setup.py            # verifies Box credentials and folders
"""
import os
import json
import time
import hmac
import hashlib
import tempfile
import shutil
import threading
from datetime import datetime
from typing import Optional, Dict, Tuple, Set
from box_client import BoxClient
from video_processor import VideoProcessor
from conversion_logger import ConversionLogger
from platform_specs import (
detect_platform_from_filename,
detect_aspect_ratio_from_filename,
get_platform_info
)
def verify_box_signature(secret: str, body: bytes, primary: str, secondary: str = '') -> bool:
    """
    Verify Box webhook HMAC-SHA256 signature.

    Computes the hex-encoded HMAC-SHA256 of ``body`` keyed with ``secret``
    and compares it, in constant time, against the values of the
    Box-Signature-Primary and Box-Signature-Secondary headers.
    A valid signature on either is sufficient.

    Args:
        secret: Shared signing secret. When empty, verification is skipped.
        body: Raw request body bytes exactly as received.
        primary: Value of the Box-Signature-Primary header (may be empty).
        secondary: Value of the Box-Signature-Secondary header (may be empty).

    Returns:
        True if either header matches the expected digest, or if ``secret``
        is empty (verification disabled); False otherwise.
    """
    # NOTE(review): returning True for an empty secret is fail-open by design;
    # make sure the secret is always configured in production.
    if not secret:
        return True

    # NOTE(review): Box's webhook documentation appears to describe a
    # base64-encoded HMAC over body + BOX-DELIVERY-TIMESTAMP, with separate
    # primary/secondary keys. Confirm that the hex, body-only digest used here
    # matches what the caller actually receives.
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest().encode('ascii')

    for candidate in (primary, secondary):
        if not candidate:
            continue
        # Compare as bytes: compare_digest() raises TypeError for str values
        # containing non-ASCII characters, which would turn a malformed
        # header into an unhandled error instead of a rejected request.
        if isinstance(candidate, str):
            candidate = candidate.encode('utf-8')
        if hmac.compare_digest(expected, candidate):
            return True
    return False
class BoxProcessor:
    """Main Box video processor — 8-step pipeline per file"""

    def __init__(self, box_config_path: str, conversion_logs_folder: str, as_user_id: str = ''):
        """
        Create the Box client and conversion logger.

        Args:
            box_config_path: Passed through to BoxClient.
            conversion_logs_folder: Passed through to ConversionLogger.
            as_user_id: Passed through to BoxClient (empty by default).
        """
        self.box_client = BoxClient(box_config_path, as_user_id)
        self.conversion_logger = ConversionLogger(conversion_logs_folder)
        # Filled by initialize(); expected keys: IN, OUT_SUCCESS, OUT_FAILED.
        self.folders = {}

    def initialize(self, video_optimizer_folder_id: str) -> bool:
        """Authenticate with Box and discover IN / OUT_SUCCESS / OUT_FAILED folders"""
        print("=" * 70)
        print("BOX AUTOMATION - INITIALIZING")
        print("=" * 70)
        if not video_optimizer_folder_id:
            print("✗ BOX_VIDEO_OPTIMIZER_FOLDER_ID not set in .env")
            return False
        if not self.box_client.authenticate():
            return False
        self.folders = self.box_client.discover_folders(video_optimizer_folder_id)
        required = ['IN', 'OUT_SUCCESS', 'OUT_FAILED']
        missing = [f for f in required if f not in self.folders]
        if missing:
            print(f"✗ Missing required Box folders: {missing}")
            return False
        print("=" * 70)
        print("✓ Box automation ready")
        print(f" IN folder: {self.folders['IN']}")
        print(f" OUT_SUCCESS folder: {self.folders['OUT_SUCCESS']}")
        print(f" OUT_FAILED folder: {self.folders['OUT_FAILED']}")
        print("=" * 70)
        return True

    def validate_filename(self, filename: str) -> Tuple[bool, Optional[str], Optional[str], Optional[str]]:
        """
        Validate filename contains detectable platform and aspect ratio.
        Returns (valid, platform, aspect_ratio, error_message)
        """
        platform = detect_platform_from_filename(filename)
        aspect_ratio = detect_aspect_ratio_from_filename(filename)
        if platform is None:
            return (False, None, None,
                    "No platform detected. Expected patterns like _tiktok_, _meta_, _yt_, etc.")
        if aspect_ratio is None:
            return (False, platform, None,
                    "No aspect ratio detected. Expected patterns like _9x16_, _16x9_, _1x1_, etc.")
        platform_info = get_platform_info(platform)
        if not platform_info:
            return (False, platform, aspect_ratio, f"Invalid platform: {platform}")
        supported_ratios = [fmt['ratio'] for fmt in platform_info['formats']]
        if aspect_ratio not in supported_ratios:
            return (False, platform, aspect_ratio,
                    f"Platform '{platform}' does not support {aspect_ratio}. "
                    f"Supported: {', '.join(supported_ratios)}")
        return (True, platform, aspect_ratio, None)

    @staticmethod
    def _size_reduction_percent(input_size: int, output_size: int) -> float:
        """Return the size reduction as a percentage of input_size (0.0 for an empty input)."""
        if not input_size:
            # A 0-byte input has no meaningful reduction; avoid ZeroDivisionError.
            return 0.0
        return ((input_size - output_size) / input_size) * 100

    def process_file(self, file_id: str, filename: str) -> Dict:
        """
        Main 8-step processing pipeline:
        1. Validate filename
        2. Download from Box IN
        3. Convert with FFmpeg
        4. Generate JSON report
        5. Upload video to OUT_SUCCESS
        6. Upload report to OUT_SUCCESS
        7. Delete original from IN
        8. Log + cleanup temp files

        Args:
            file_id: Box ID of the file in the IN folder.
            filename: Name of that file; platform and aspect ratio are
                detected from it.

        Returns:
            A result dict with 'job_id', 'file_id', 'filename', 'timestamp'
            and 'status' ('success', 'skipped' or 'failed'), plus size,
            upload-ID and error details depending on how far the job got.
        """
        job_id = f"{int(time.time())}_{file_id}"
        start_time = time.time()
        temp_dir = os.path.join(tempfile.gettempdir(), 'box_processor', job_id)
        os.makedirs(temp_dir, exist_ok=True)
        result = {
            'job_id': job_id,
            'file_id': file_id,
            'filename': filename,
            'status': 'pending',
            'timestamp': datetime.now().isoformat()
        }
        print(f"\n{'=' * 70}")
        print(f"BOX: Processing {filename}")
        print(f"Job: {job_id}")
        print(f"{'=' * 70}")
        try:
            # 1. Validate filename
            print("\n[1/8] Validating filename...")
            valid, platform, aspect_ratio, error = self.validate_filename(filename)
            if not valid:
                print(f"✗ Validation failed: {error}")
                result.update({'status': 'skipped', 'error': error,
                               'platform': platform, 'aspect_ratio': aspect_ratio})
                report = self.generate_error_report(filename, error, platform, aspect_ratio)
                self.upload_error_report(filename, report)
                return result
            print(f"✓ Platform: {platform} | Aspect ratio: {aspect_ratio}")
            result['platform'] = platform
            result['aspect_ratio'] = aspect_ratio

            # 2. Download from Box
            print("\n[2/8] Downloading from Box IN folder...")
            input_path = os.path.join(temp_dir, 'input_' + filename)
            if not self.box_client.download_with_retry(file_id, input_path):
                raise Exception("Failed to download file from Box")
            input_size = os.path.getsize(input_path)
            result['input_size'] = input_size
            print(f"✓ Downloaded ({input_size / (1024 * 1024):.2f} MB)")

            # Probe original video for report metadata (best effort)
            original_info = {}
            try:
                original_info = VideoProcessor(input_path).get_video_info() or {}
            except Exception as e:
                print(f"⚠ Could not probe original metadata: {e}")

            # 3. Convert
            print("\n[3/8] Converting with FFmpeg...")
            platform_info = get_platform_info(platform)
            container = platform_info['container']
            output_filename = f"{os.path.splitext(filename)[0]}_optimized.{container}"
            output_path = os.path.join(temp_dir, output_filename)
            conversion_result = VideoProcessor(input_path).convert_video(
                platform=platform,
                aspect_ratio=aspect_ratio,
                output_path=output_path,
                custom_bitrate=None
            )
            output_size = os.path.getsize(output_path)
            size_reduction = self._size_reduction_percent(input_size, output_size)
            result['output_size'] = output_size
            result['size_reduction_percent'] = round(size_reduction, 2)
            result['conversion_details'] = conversion_result
            print(f"✓ Done ({output_size / (1024 * 1024):.2f} MB, {size_reduction:.1f}% reduction)")

            # 4. Generate report
            print("\n[4/8] Generating JSON report...")
            report = self.generate_success_report(
                filename=filename,
                output_filename=output_filename,
                platform=platform,
                aspect_ratio=aspect_ratio,
                input_size=input_size,
                output_size=output_size,
                original_info=original_info,
                conversion_result=conversion_result,
                duration=time.time() - start_time
            )
            print("✓ Report ready")

            # 5 & 6. Upload video + report to OUT_SUCCESS
            print("\n[5/8] Uploading video to OUT_SUCCESS...")
            success_folder_id = self.folders['OUT_SUCCESS']
            video_file_id = self.box_client.upload_with_retry(
                success_folder_id, output_path, output_filename)
            if not video_file_id:
                raise Exception("Failed to upload optimised video to OUT_SUCCESS")
            print("\n[6/8] Uploading report to OUT_SUCCESS...")
            report_filename = f"{os.path.splitext(filename)[0]}_report.json"
            report_path = os.path.join(temp_dir, report_filename)
            with open(report_path, 'w') as f:
                json.dump(report, f, indent=2)
            report_file_id = self.box_client.upload_with_retry(
                success_folder_id, report_path, report_filename)
            result['status'] = 'success'
            result['uploaded_video_id'] = video_file_id
            result['uploaded_report_id'] = report_file_id
            if report_file_id:
                print("✓ Uploads complete")
            else:
                # The video is already delivered, so a missing report is
                # surfaced as a warning rather than failing the job.
                print("⚠ Video uploaded, but report upload to OUT_SUCCESS failed")

            # 7. Delete original from IN
            print("\n[7/8] Deleting original from IN folder...")
            try:
                self.box_client.client.file(file_id).delete()
                print(f"✓ Deleted original (ID: {file_id})")
            except Exception as del_err:
                print(f"⚠ Could not delete original from IN: {del_err}")

            # 8. Log
            print("\n[8/8] Logging conversion...")
            try:
                self.conversion_logger.log_conversion(
                    user_email='box_automation@system',
                    platform=platform,
                    aspect_ratio=aspect_ratio,
                    input_file_size=input_size,
                    output_file_size=output_size,
                    conversion_duration=time.time() - start_time,
                    status='success',
                    file_id=file_id,
                    error_message=None
                )
                print("✓ Logged")
            except Exception as log_err:
                # The video is uploaded and the original removed at this point;
                # a logging problem must not turn the job into a failure.
                print(f"⚠ Could not log conversion: {log_err}")
        except Exception as e:
            print(f"\n✗ ERROR: {e}")
            result['status'] = 'failed'
            result['error'] = str(e)
            self.upload_error_report(filename, self.generate_error_report(
                filename, str(e), result.get('platform'), result.get('aspect_ratio')
            ), failed=True)
            try:
                self.conversion_logger.log_conversion(
                    user_email='box_automation@system',
                    platform=result.get('platform', 'unknown'),
                    aspect_ratio=result.get('aspect_ratio', 'unknown'),
                    input_file_size=result.get('input_size', 0),
                    output_file_size=0,
                    conversion_duration=time.time() - start_time,
                    status='failure',
                    file_id=file_id,
                    error_message=str(e)
                )
            except Exception as log_err:
                # Keep the failure result intact even if logging itself fails.
                print(f"⚠ Could not log failed conversion: {log_err}")
        finally:
            # Best-effort cleanup of the per-job temp directory.
            shutil.rmtree(temp_dir, ignore_errors=True)
            print(f"\n{'=' * 70}")
            print(f"BOX: {result['status'].upper()} — {filename} ({time.time() - start_time:.1f}s)")
            print(f"{'=' * 70}\n")
        return result

    def generate_success_report(self, filename: str, output_filename: str,
                                platform: str, aspect_ratio: str,
                                input_size: int, output_size: int,
                                original_info: Dict, conversion_result: Dict,
                                duration: float) -> Dict:
        """
        Build the JSON-serialisable success report for a converted file.

        Args:
            filename: Original file name.
            output_filename: Name of the optimised file.
            platform: Detected target platform.
            aspect_ratio: Detected aspect ratio.
            input_size: Original size in bytes.
            output_size: Optimised size in bytes.
            original_info: Probe metadata of the original; missing keys fall
                back to 'unknown' (or 0 for duration).
            conversion_result: Conversion details; missing keys fall back to
                'N/A' (or 0 for duration).
            duration: Processing time in seconds.

        Returns:
            Dict with 'original_file', 'optimised_file' and
            'conversion_details' sections.
        """
        size_reduction = self._size_reduction_percent(input_size, output_size)
        return {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'processing_time_seconds': round(duration, 2),
            'original_file': {
                'filename': filename,
                'size_bytes': input_size,
                'size_mb': round(input_size / (1024 * 1024), 2),
                'codec': original_info.get('codec', 'unknown'),
                'resolution': original_info.get('resolution', 'unknown'),
                'bitrate': original_info.get('bitrate', 'unknown'),
                'duration_seconds': original_info.get('duration', 0),
                'aspect_ratio': original_info.get('aspect_ratio', 'unknown')
            },
            'optimised_file': {
                'filename': output_filename,
                'size_bytes': output_size,
                'size_mb': round(output_size / (1024 * 1024), 2),
                'size_reduction_percent': round(size_reduction, 2),
                'savings_mb': round((input_size - output_size) / (1024 * 1024), 2)
            },
            'conversion_details': {
                'platform': platform,
                'aspect_ratio': aspect_ratio,
                'resolution': conversion_result.get('resolution', 'N/A'),
                'codec': conversion_result.get('codec', 'N/A'),
                'bitrate': conversion_result.get('bitrate', 'N/A'),
                'duration_seconds': conversion_result.get('duration', 0)
            }
        }

    def generate_error_report(self, filename: str, error: str,
                              platform: Optional[str], aspect_ratio: Optional[str]) -> Dict:
        """
        Build the JSON-serialisable error report for a skipped or failed file.

        NOTE(review): 'reason' always describes the filename convention, even
        when this is called for a non-filename failure (e.g. a download
        error) — consider making it conditional.
        """
        return {
            'status': 'error',
            'timestamp': datetime.now().isoformat(),
            'original_file': {'filename': filename},
            'error': {
                'message': error,
                'detected_platform': platform,
                'detected_aspect_ratio': aspect_ratio,
                'reason': (
                    'Filename must include both a platform pattern (e.g. _tiktok_, _meta_) '
                    'and an aspect ratio pattern (e.g. _9x16_, _16x9_, _1x1_)'
                )
            }
        }

    def upload_error_report(self, filename: str, report: Dict, failed: bool = False):
        """
        Write ``report`` to a temp JSON file and upload it to OUT_FAILED.

        Never raises: problems are printed as warnings. Does nothing when the
        OUT_FAILED folder is not known.

        Args:
            filename: Original file name; the report is uploaded as
                ``<stem>_error_report.json``.
            report: JSON-serialisable report dict.
            failed: Accepted for caller compatibility; currently unused.
        """
        folder_id = self.folders.get('OUT_FAILED')
        if not folder_id:
            return
        temp_path = None
        try:
            # mkstemp gives a unique path, so two reports produced within the
            # same second (e.g. concurrent jobs) cannot clobber each other.
            fd, temp_path = tempfile.mkstemp(prefix='error_', suffix='.json')
            with os.fdopen(fd, 'w') as f:
                json.dump(report, f, indent=2)
            report_name = f"{os.path.splitext(filename)[0]}_error_report.json"
            uploaded_id = self.box_client.upload_with_retry(folder_id, temp_path, report_name)
            if uploaded_id:
                print("✓ Error report uploaded to OUT_FAILED")
            else:
                # upload_with_retry signals failure with a falsy return value.
                print("⚠ Failed to upload error report: upload returned no file ID")
        except Exception as e:
            print(f"⚠ Failed to upload error report: {e}")
        finally:
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
class BoxPoller:
    """
    Polls the Box IN folder on a configurable interval.
    Alternative to webhooks — no public URL required.
    Activated by BOX_USE_POLLING=true in .env.
    Interval controlled by BOX_POLL_INTERVAL_SECONDS (default 60).
    """

    def __init__(self, processor: 'BoxProcessor', interval_seconds: int = 60):
        """
        Args:
            processor: Processor whose ``folders``, ``box_client`` and
                ``process_file`` are used for each poll.
            interval_seconds: Seconds to wait between polls.
        """
        self.processor = processor
        self.interval = interval_seconds
        # IDs of files already handed to the processor, so that a file which
        # stays in IN (skipped or failed) is not reprocessed on every poll.
        self._processed: Set[str] = set()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Set by stop() to wake the loop out of its inter-poll wait.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background polling thread (daemon)."""
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()
        print(f"✓ Box polling active — checking IN folder every {self.interval}s")

    def stop(self):
        """Signal the polling loop to exit and wait up to 5 seconds for it."""
        self._running = False
        # Wake the loop immediately instead of letting it sleep out the
        # remainder of the interval, so the join below can actually succeed.
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)

    def _poll_loop(self):
        """Poll the IN folder until stopped; errors are printed, not raised."""
        print(f"[POLLER] Started (interval: {self.interval}s)")
        while self._running:
            try:
                self._check_in_folder()
            except Exception as e:
                print(f"[POLLER] Error: {e}")
            # Interruptible wait: returns True as soon as stop() sets the event.
            if self._stop_event.wait(self.interval):
                break

    def _check_in_folder(self):
        """Process every file in the IN folder that has not been seen before."""
        in_folder_id = self.processor.folders.get('IN')
        if not in_folder_id:
            return
        items = self.processor.box_client.client.folder(in_folder_id).get_items()
        new_files = [item for item in items
                     if item.type == 'file' and item.id not in self._processed]
        if not new_files:
            print(f"[POLLER] No new files ({datetime.now().strftime('%H:%M:%S')})")
            return
        print(f"[POLLER] Found {len(new_files)} new file(s)")
        for item in new_files:
            # Mark before processing so a file left in IN is attempted only once.
            self._processed.add(item.id)
            self.processor.process_file(item.id, item.name)