""" Box.com video processing classes. BoxProcessor and BoxPoller are imported and used directly by app.py. This module no longer runs as a standalone service — all Box endpoints live in app.py on port 5000. To run the Box automation: python app.py # starts web UI + Box in one process To test: python test_box_processor.py # targets http://localhost:5000 python box_setup.py # verifies Box credentials and folders """ import os import json import time import hmac import hashlib import tempfile import shutil import threading from datetime import datetime from typing import Optional, Dict, Tuple, Set from box_client import BoxClient from video_processor import VideoProcessor from conversion_logger import ConversionLogger from platform_specs import ( detect_platform_from_filename, detect_aspect_ratio_from_filename, get_platform_info ) def verify_box_signature(secret: str, body: bytes, primary: str, secondary: str = '') -> bool: """ Verify Box webhook HMAC-SHA256 signature. Box sends both Box-Signature-Primary and Box-Signature-Secondary headers. A valid signature on either is sufficient. Returns True (skip verification) if secret is empty. """ if not secret: return True expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() if primary and hmac.compare_digest(expected, primary): return True if secondary and hmac.compare_digest(expected, secondary): return True return False class BoxProcessor: """Main Box video processor — 8-step pipeline per file""" def __init__(self, box_config_path: str, conversion_logs_folder: str, as_user_id: str = ''): self.box_client = BoxClient(box_config_path, as_user_id) self.conversion_logger = ConversionLogger(conversion_logs_folder) self.folders = {} def initialize(self, video_optimizer_folder_id: str) -> bool: """Authenticate with Box and discover IN / OUT_SUCCESS / OUT_FAILED folders""" print("=" * 70) print("BOX AUTOMATION - INITIALIZING") print("=" * 70) if not video_optimizer_folder_id: print("✗ BOX_VIDEO_OPTIMIZER_FOLDER_ID not set in .env") return False if not self.box_client.authenticate(): return False self.folders = self.box_client.discover_folders(video_optimizer_folder_id) required = ['IN', 'OUT_SUCCESS', 'OUT_FAILED'] missing = [f for f in required if f not in self.folders] if missing: print(f"✗ Missing required Box folders: {missing}") return False print("=" * 70) print("✓ Box automation ready") print(f" IN folder: {self.folders['IN']}") print(f" OUT_SUCCESS folder: {self.folders['OUT_SUCCESS']}") print(f" OUT_FAILED folder: {self.folders['OUT_FAILED']}") print("=" * 70) return True def validate_filename(self, filename: str) -> Tuple[bool, Optional[str], Optional[str], Optional[str]]: """ Validate filename contains detectable platform and aspect ratio. Returns (valid, platform, aspect_ratio, error_message) """ platform = detect_platform_from_filename(filename) aspect_ratio = detect_aspect_ratio_from_filename(filename) if platform is None: return (False, None, None, "No platform detected. Expected patterns like _tiktok_, _meta_, _yt_, etc.") if aspect_ratio is None: return (False, platform, None, "No aspect ratio detected. Expected patterns like _9x16_, _16x9_, _1x1_, etc.") platform_info = get_platform_info(platform) if not platform_info: return (False, platform, aspect_ratio, f"Invalid platform: {platform}") supported_ratios = [fmt['ratio'] for fmt in platform_info['formats']] if aspect_ratio not in supported_ratios: return (False, platform, aspect_ratio, f"Platform '{platform}' does not support {aspect_ratio}. " f"Supported: {', '.join(supported_ratios)}") return (True, platform, aspect_ratio, None) def process_file(self, file_id: str, filename: str) -> Dict: """ Main 8-step processing pipeline: 1. Validate filename 2. Download from Box IN 3. Convert with FFmpeg 4. Generate JSON report 5. Upload video to OUT_SUCCESS 6. Upload report to OUT_SUCCESS 7. Delete original from IN 8. Log + cleanup temp files """ job_id = f"{int(time.time())}_{file_id}" start_time = time.time() temp_dir = os.path.join(tempfile.gettempdir(), 'box_processor', job_id) os.makedirs(temp_dir, exist_ok=True) result = { 'job_id': job_id, 'file_id': file_id, 'filename': filename, 'status': 'pending', 'timestamp': datetime.now().isoformat() } print(f"\n{'=' * 70}") print(f"BOX: Processing {filename}") print(f"Job: {job_id}") print(f"{'=' * 70}") try: # 1. Validate filename print("\n[1/8] Validating filename...") valid, platform, aspect_ratio, error = self.validate_filename(filename) if not valid: print(f"✗ Validation failed: {error}") result.update({'status': 'skipped', 'error': error, 'platform': platform, 'aspect_ratio': aspect_ratio}) report = self.generate_error_report(filename, error, platform, aspect_ratio) self.upload_error_report(filename, report) return result print(f"✓ Platform: {platform} | Aspect ratio: {aspect_ratio}") result['platform'] = platform result['aspect_ratio'] = aspect_ratio # 2. Download from Box print("\n[2/8] Downloading from Box IN folder...") input_path = os.path.join(temp_dir, 'input_' + filename) if not self.box_client.download_with_retry(file_id, input_path): raise Exception("Failed to download file from Box") input_size = os.path.getsize(input_path) result['input_size'] = input_size print(f"✓ Downloaded ({input_size / (1024 * 1024):.2f} MB)") # Probe original video for report metadata original_info = {} try: original_info = VideoProcessor(input_path).get_video_info() or {} except Exception as e: print(f"⚠ Could not probe original metadata: {e}") # 3. Convert print("\n[3/8] Converting with FFmpeg...") platform_info = get_platform_info(platform) container = platform_info['container'] output_filename = f"{os.path.splitext(filename)[0]}_optimized.{container}" output_path = os.path.join(temp_dir, output_filename) conversion_result = VideoProcessor(input_path).convert_video( platform=platform, aspect_ratio=aspect_ratio, output_path=output_path, custom_bitrate=None ) output_size = os.path.getsize(output_path) size_reduction = ((input_size - output_size) / input_size) * 100 result['output_size'] = output_size result['size_reduction_percent'] = round(size_reduction, 2) result['conversion_details'] = conversion_result print(f"✓ Done ({output_size / (1024 * 1024):.2f} MB, {size_reduction:.1f}% reduction)") # 4. Generate report print("\n[4/8] Generating JSON report...") report = self.generate_success_report( filename=filename, output_filename=output_filename, platform=platform, aspect_ratio=aspect_ratio, input_size=input_size, output_size=output_size, original_info=original_info, conversion_result=conversion_result, duration=time.time() - start_time ) print("✓ Report ready") # 5 & 6. Upload video + report to OUT_SUCCESS print("\n[5/8] Uploading video to OUT_SUCCESS...") success_folder_id = self.folders['OUT_SUCCESS'] video_file_id = self.box_client.upload_with_retry( success_folder_id, output_path, output_filename) if not video_file_id: raise Exception("Failed to upload optimised video to OUT_SUCCESS") print("\n[6/8] Uploading report to OUT_SUCCESS...") report_filename = f"{os.path.splitext(filename)[0]}_report.json" report_path = os.path.join(temp_dir, report_filename) with open(report_path, 'w') as f: json.dump(report, f, indent=2) report_file_id = self.box_client.upload_with_retry( success_folder_id, report_path, report_filename) result['status'] = 'success' result['uploaded_video_id'] = video_file_id result['uploaded_report_id'] = report_file_id print("✓ Uploads complete") # 7. Delete original from IN print("\n[7/8] Deleting original from IN folder...") try: self.box_client.client.file(file_id).delete() print(f"✓ Deleted original (ID: {file_id})") except Exception as del_err: print(f"⚠ Could not delete original from IN: {del_err}") # 8. Log print("\n[8/8] Logging conversion...") self.conversion_logger.log_conversion( user_email='box_automation@system', platform=platform, aspect_ratio=aspect_ratio, input_file_size=input_size, output_file_size=output_size, conversion_duration=time.time() - start_time, status='success', file_id=file_id, error_message=None ) print("✓ Logged") except Exception as e: print(f"\n✗ ERROR: {e}") result['status'] = 'failed' result['error'] = str(e) self.upload_error_report(filename, self.generate_error_report( filename, str(e), result.get('platform'), result.get('aspect_ratio') ), failed=True) self.conversion_logger.log_conversion( user_email='box_automation@system', platform=result.get('platform', 'unknown'), aspect_ratio=result.get('aspect_ratio', 'unknown'), input_file_size=result.get('input_size', 0), output_file_size=0, conversion_duration=time.time() - start_time, status='failure', file_id=file_id, error_message=str(e) ) finally: try: shutil.rmtree(temp_dir) except Exception: pass print(f"\n{'=' * 70}") print(f"BOX: {result['status'].upper()} — {filename} ({time.time() - start_time:.1f}s)") print(f"{'=' * 70}\n") return result def generate_success_report(self, filename: str, output_filename: str, platform: str, aspect_ratio: str, input_size: int, output_size: int, original_info: Dict, conversion_result: Dict, duration: float) -> Dict: size_reduction = ((input_size - output_size) / input_size) * 100 return { 'status': 'success', 'timestamp': datetime.now().isoformat(), 'processing_time_seconds': round(duration, 2), 'original_file': { 'filename': filename, 'size_bytes': input_size, 'size_mb': round(input_size / (1024 * 1024), 2), 'codec': original_info.get('codec', 'unknown'), 'resolution': original_info.get('resolution', 'unknown'), 'bitrate': original_info.get('bitrate', 'unknown'), 'duration_seconds': original_info.get('duration', 0), 'aspect_ratio': original_info.get('aspect_ratio', 'unknown') }, 'optimised_file': { 'filename': output_filename, 'size_bytes': output_size, 'size_mb': round(output_size / (1024 * 1024), 2), 'size_reduction_percent': round(size_reduction, 2), 'savings_mb': round((input_size - output_size) / (1024 * 1024), 2) }, 'conversion_details': { 'platform': platform, 'aspect_ratio': aspect_ratio, 'resolution': conversion_result.get('resolution', 'N/A'), 'codec': conversion_result.get('codec', 'N/A'), 'bitrate': conversion_result.get('bitrate', 'N/A'), 'duration_seconds': conversion_result.get('duration', 0) } } def generate_error_report(self, filename: str, error: str, platform: Optional[str], aspect_ratio: Optional[str]) -> Dict: return { 'status': 'error', 'timestamp': datetime.now().isoformat(), 'original_file': {'filename': filename}, 'error': { 'message': error, 'detected_platform': platform, 'detected_aspect_ratio': aspect_ratio, 'reason': ( 'Filename must include both a platform pattern (e.g. _tiktok_, _meta_) ' 'and an aspect ratio pattern (e.g. _9x16_, _16x9_, _1x1_)' ) } } def upload_error_report(self, filename: str, report: Dict, failed: bool = False): folder_id = self.folders.get('OUT_FAILED') if not folder_id: return temp_path = os.path.join(tempfile.gettempdir(), f"error_{int(time.time())}.json") try: with open(temp_path, 'w') as f: json.dump(report, f, indent=2) report_name = f"{os.path.splitext(filename)[0]}_error_report.json" self.box_client.upload_with_retry(folder_id, temp_path, report_name) print("✓ Error report uploaded to OUT_FAILED") except Exception as e: print(f"⚠ Failed to upload error report: {e}") finally: if os.path.exists(temp_path): os.remove(temp_path) class BoxPoller: """ Polls the Box IN folder on a configurable interval. Alternative to webhooks — no public URL required. Activated by BOX_USE_POLLING=true in .env. Interval controlled by BOX_POLL_INTERVAL_SECONDS (default 60). """ def __init__(self, processor: BoxProcessor, interval_seconds: int = 60): self.processor = processor self.interval = interval_seconds self._processed: Set[str] = set() self._running = False self._thread: Optional[threading.Thread] = None def start(self): self._running = True self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() print(f"✓ Box polling active — checking IN folder every {self.interval}s") def stop(self): self._running = False if self._thread: self._thread.join(timeout=5) def _poll_loop(self): print(f"[POLLER] Started (interval: {self.interval}s)") while self._running: try: self._check_in_folder() except Exception as e: print(f"[POLLER] Error: {e}") time.sleep(self.interval) def _check_in_folder(self): in_folder_id = self.processor.folders.get('IN') if not in_folder_id: return items = self.processor.box_client.client.folder(in_folder_id).get_items() new_files = [item for item in items if item.type == 'file' and item.id not in self._processed] if not new_files: print(f"[POLLER] No new files ({datetime.now().strftime('%H:%M:%S')})") return print(f"[POLLER] Found {len(new_files)} new file(s)") for item in new_files: self._processed.add(item.id) self.processor.process_file(item.id, item.name)