- Ferrero filename parser with NEW format support - CreativeX API client with retry logic - State persistence with resume capability - Upload and status checking scripts - Comprehensive documentation - Virtual environment support
129 lines
4.2 KiB
Python
129 lines
4.2 KiB
Python
"""
|
|
Pre-upload validation for files and metadata
|
|
"""
|
|
|
|
from typing import Tuple, List
|
|
from core.data_loader import DataLoader
|
|
from utils.file_handler import FileHandler
|
|
|
|
|
|
class UploadValidator:
    """Pre-upload validation to ensure data quality.

    Offers two independent checks: one for the file on disk (existence,
    readability, format, size) and one for the parsed metadata dict
    (required fields and reference codes checked through the data loader).
    """

    # Metadata keys that must be present with a truthy value.
    _REQUIRED_FIELDS = (
        'brand_code', 'brand_name',
        'subject',
        'asset_type',
        'aspect_ratio',
        'country_code', 'country_name',
        'language_code',
        'channel',
    )

    def __init__(self, data_loader: "DataLoader", max_file_size_mb: int = 500):
        """
        Initialize validator

        Args:
            data_loader: DataLoader instance used to validate brand, country,
                language, asset type and social media codes
            max_file_size_mb: Maximum allowed file size in MB
        """
        self.data_loader = data_loader
        self.max_file_size_mb = max_file_size_mb

    def validate_file(self, file_path: str) -> Tuple[bool, List[str]]:
        """
        Validate file exists, is readable, has a supported format and does
        not exceed the configured size limit

        The checks run in that order and stop at the first failure, so the
        returned error list holds at most one message.

        Args:
            file_path: Path to file

        Returns:
            tuple: (is_valid: bool, errors: List[str])
        """
        # Check file exists
        if not FileHandler.file_exists(file_path):
            return False, [f"File not found: {file_path}"]

        # Check readable
        if not FileHandler.is_readable(file_path):
            return False, [f"File not readable (permission denied): {file_path}"]

        # Check format
        if not FileHandler.is_supported_format(file_path):
            ext = FileHandler.get_file_extension(file_path)
            supported = (FileHandler.SUPPORTED_VIDEO_FORMATS +
                         FileHandler.SUPPORTED_IMAGE_FORMATS)
            return False, [
                f"Unsupported file format: {ext}. "
                f"Supported formats: {', '.join(supported)}"
            ]

        # Check file size
        size_mb = FileHandler.get_file_size_mb(file_path)
        if size_mb > self.max_file_size_mb:
            return False, [
                f"File too large: {size_mb:.2f}MB "
                f"(max: {self.max_file_size_mb}MB)"
            ]

        return True, []

    def validate_parsed_metadata(self, metadata: dict) -> Tuple[bool, List[str]]:
        """
        Validate all required fields are present and valid

        Unlike validate_file, this collects every problem found instead of
        stopping at the first one. A field counts as missing when its value
        is absent or falsy. Code fields are only checked against the data
        loader when they are present, so a missing code is reported once.

        Args:
            metadata: Parsed metadata dict; None is handled like an empty
                dict (every required field is reported as missing)

        Returns:
            tuple: (is_valid: bool, errors: List[str])
        """
        # Treat a missing metadata object as empty so the caller gets the
        # regular "missing required field" errors instead of an
        # AttributeError from calling .get() on None.
        if metadata is None:
            metadata = {}

        # Check required fields
        errors = [
            f"Missing required field: {field}"
            for field in self._REQUIRED_FIELDS
            if not metadata.get(field)
        ]

        # Validate brand
        brand_code = metadata.get('brand_code')
        if brand_code and not self.data_loader.validate_brand_code(brand_code):
            errors.append(f"Invalid brand code: {brand_code}")

        # Validate country
        country_code = metadata.get('country_code')
        if country_code and not self.data_loader.validate_country_code(country_code):
            errors.append(f"Invalid country code: {country_code}")

        # Validate language
        language_code = metadata.get('language_code')
        if language_code and not self.data_loader.validate_language_code(language_code):
            errors.append(f"Invalid language code: {language_code}")

        # Validate asset type
        asset_type = metadata.get('asset_type')
        if asset_type and not self.data_loader.validate_asset_type(asset_type):
            errors.append(f"Invalid asset type: {asset_type}")

        # Validate social media (optional field, only checked if present)
        social_media = metadata.get('social_media')
        if social_media and not self.data_loader.validate_social_code(social_media):
            errors.append(f"Invalid social media code: {social_media}")

        return not errors, errors

    def get_content_type(self, file_path: str) -> str:
        """
        Get MIME content type for file

        Thin wrapper that delegates to FileHandler.get_content_type.

        Args:
            file_path: Path to file

        Returns:
            str: MIME type
        """
        return FileHandler.get_content_type(file_path)
|