Complete Flask → FastAPI migration with:
- FastAPI app with session auth, Azure AD SSO, rate limiting
- SQLite-backed session store (survives restarts)
- Bulk AI metadata generation with SSE progress
- Admin panel (user management, audit log, AI usage)
- Subpath deployment support (ROOT_PATH config)
- Docker + deploy.sh for production deployment
- Test suite (auth, upload, templates, imports, admin, sessions)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
293 lines
9.6 KiB
Python
293 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Main CLI application for metadata automation."""
|
|
|
|
import sys
|
|
import argparse
|
|
from pathlib import Path
|
|
from typing import List, Dict
|
|
from tqdm import tqdm
|
|
import csv
|
|
from datetime import datetime
|
|
|
|
# Import project modules
|
|
from .config import Config
|
|
from .file_detector import FileDetector, FileType
|
|
from .metadata_analyzer import MetadataAnalyzer
|
|
from .utils import (
|
|
create_backup, get_logger, format_metadata_comparison,
|
|
validate_file_path, create_report_entry
|
|
)
|
|
|
|
# Import extractors
|
|
from .extractors.pdf_extractor import PDFExtractor
|
|
from .extractors.image_extractor import ImageExtractor
|
|
from .extractors.office_extractor import OfficeExtractor
|
|
from .extractors.video_extractor import VideoExtractor
|
|
|
|
# Import updaters
|
|
from .updaters.pdf_updater import PDFUpdater
|
|
from .updaters.image_updater import ImageUpdater
|
|
from .updaters.office_updater import OfficeUpdater
|
|
from .updaters.video_updater import VideoUpdater
|
|
|
|
# Module-level logger used by MetadataProcessor and main() below.
logger = get_logger(__name__)
|
|
|
|
class MetadataProcessor:
    """Main processor for metadata automation.

    Holds one extractor and one updater per supported ``FileType`` and a
    ``MetadataAnalyzer`` that produces the new metadata. Every file that
    reaches the analysis step adds one row to ``report_data``, which
    ``save_report`` can write out as CSV.
    """

    def __init__(self, preview_mode: bool = False):
        """
        Initialize the processor.

        Args:
            preview_mode: If True, show changes without applying them
        """
        self.preview_mode = preview_mode
        self.analyzer = MetadataAnalyzer()

        # Extractor lookup keyed by detected file type. The three Office
        # types each get their own OfficeExtractor instance.
        self.extractors = {
            FileType.PDF: PDFExtractor(),
            FileType.IMAGE: ImageExtractor(),
            FileType.OFFICE_DOC: OfficeExtractor(),
            FileType.OFFICE_SHEET: OfficeExtractor(),
            FileType.OFFICE_PRESENTATION: OfficeExtractor(),
            FileType.VIDEO: VideoExtractor(),
        }

        # Updater lookup, parallel to the extractor table above.
        self.updaters = {
            FileType.PDF: PDFUpdater(),
            FileType.IMAGE: ImageUpdater(),
            FileType.OFFICE_DOC: OfficeUpdater(),
            FileType.OFFICE_SHEET: OfficeUpdater(),
            FileType.OFFICE_PRESENTATION: OfficeUpdater(),
            FileType.VIDEO: VideoUpdater(),
        }

        # One row per analyzed file; consumed by save_report().
        self.report_data = []

    def process_file(self, file_path: str) -> bool:
        """
        Process a single file.

        Validates the path, detects the file type, extracts content and the
        current metadata, asks the analyzer for new metadata, prints an
        old/new comparison, records a report row and, unless preview mode is
        active, writes the new metadata to the file (requesting a backup)
        and verifies it.

        Args:
            file_path: Path to the file

        Returns:
            True if the file was processed successfully (in preview mode:
            analyzed without being modified). False if the path is invalid,
            the type is unsupported, no extractor/updater is registered, the
            update fails, or any exception is raised along the way.
        """
        # Report row whose status is still "pending"; cleared as soon as a
        # final status has been written so the except-handler below only
        # touches rows that would otherwise stay unresolved.
        pending_entry = None

        try:
            logger.info(f"\nProcessing: {file_path}")

            # Validate file
            if not validate_file_path(file_path):
                logger.error(f"Invalid file path: {file_path}")
                return False

            # Detect file type
            file_type = FileDetector.detect_file_type(file_path)

            if file_type == FileType.UNSUPPORTED:
                logger.warning(f"Unsupported file type: {file_path}")
                return False

            logger.info(f"File type: {FileDetector.get_file_type_name(file_type)}")

            # Get appropriate extractor
            extractor = self.extractors.get(file_type)
            if not extractor:
                logger.error(f"No extractor found for {file_type}")
                return False

            # Extract content and current metadata
            logger.info("Extracting content...")
            content = extractor.extract_content(file_path)

            # Fewer than 10 non-blank characters is treated as "nothing
            # useful extracted"; fall back to the file name (sans extension).
            if not content or len(content.strip()) < 10:
                logger.warning("Insufficient content extracted, using filename only")
                content = Path(file_path).stem

            logger.info(f"Extracted {len(content)} characters")

            logger.info("Reading current metadata...")
            old_metadata = extractor.read_metadata(file_path)

            # Analyze content and generate new metadata
            logger.info("Analyzing content with AI...")
            filename = Path(file_path).name
            new_metadata = self.analyzer.analyze_content(content, filename, file_type)

            # Display comparison
            print(format_metadata_comparison(old_metadata, new_metadata))

            # Store report data
            entry = create_report_entry(
                file_path, file_type.value, old_metadata, new_metadata,
                "preview" if self.preview_mode else "pending"
            )
            self.report_data.append(entry)

            if self.preview_mode:
                logger.info("[PREVIEW MODE] Changes not applied")
                return True

            pending_entry = entry

            # Update metadata
            updater = self.updaters.get(file_type)
            if not updater:
                logger.error(f"No updater found for {file_type}")
                # Do not leave the row at "pending" when nothing was applied.
                entry['status'] = 'failed'
                pending_entry = None
                return False

            logger.info("Updating metadata...")
            success = updater.update_metadata(file_path, new_metadata, backup=True)

            if not success:
                logger.error("✗ Failed to update metadata")
                entry['status'] = 'failed'
                pending_entry = None
                return False

            logger.info("✓ Metadata updated successfully!")
            entry['status'] = 'success'
            pending_entry = None

            # Verify metadata; a failed verification is only a warning and
            # does not change the recorded status or the return value.
            if updater.verify_metadata(file_path, new_metadata):
                logger.info("✓ Metadata verified!")
            else:
                logger.warning("⚠ Metadata verification failed")

            return True

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}", exc_info=True)
            # An exception after the row was recorded (e.g. inside the
            # updater) must not leave the report claiming "pending".
            if pending_entry is not None:
                pending_entry['status'] = 'failed'
            return False

    def process_directory(self, directory: str, recursive: bool = False) -> Dict[str, int]:
        """
        Process all supported files in a directory.

        Args:
            directory: Path to directory
            recursive: Process subdirectories

        Returns:
            Dictionary with the keys 'success', 'failed' and 'total', or an
            empty dictionary if ``directory`` is not an existing directory.
        """
        dir_path = Path(directory)

        # is_dir() is already False for a path that does not exist.
        if not dir_path.is_dir():
            logger.error(f"Invalid directory: {directory}")
            return {}

        # Find all supported files. glob() yields entries in arbitrary
        # order, so sort to keep processing and report order reproducible.
        pattern = '**/*' if recursive else '*'
        supported_files = sorted(
            f for f in dir_path.glob(pattern)
            if f.is_file() and FileDetector.is_supported(str(f))
        )

        logger.info(f"Found {len(supported_files)} supported files")

        # Process files with progress bar
        stats = {'success': 0, 'failed': 0, 'total': len(supported_files)}

        for file_path in tqdm(supported_files, desc="Processing files"):
            outcome = 'success' if self.process_file(str(file_path)) else 'failed'
            stats[outcome] += 1

        return stats

    def save_report(self, output_path: str = None):
        """
        Save processing report to CSV.

        Does nothing (apart from logging) when no report rows were collected.

        Args:
            output_path: Destination CSV path. When omitted, a timestamped
                ``metadata_report_<YYYYmmdd_HHMMSS>.csv`` is created under
                ``Config.REPORTS_DIR``. Missing parent directories are
                created.
        """
        if not self.report_data:
            logger.info("No report data to save")
            return

        if not output_path:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = Config.REPORTS_DIR / f"metadata_report_{timestamp}.csv"

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Column set is the union of keys over all rows (first-seen order),
        # so a row carrying an extra key cannot make DictWriter raise;
        # rows lacking a key get an empty cell.
        fieldnames = list(dict.fromkeys(
            key for row in self.report_data for key in row
        ))

        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.report_data)

        logger.info(f"Report saved to: {output_path}")
|
|
|
|
def main():
    """Main CLI entry point.

    Parses the command line, runs a ``MetadataProcessor`` over either a
    directory (``--directory``, which takes precedence when both are given)
    or a single input file, prints batch statistics for directory runs, and
    saves the CSV report (to ``--report`` if given, otherwise to the
    processor's default location when any report rows exist).

    Exit behaviour:
        * no input and no ``--directory``: prints help, exits with status 1
        * single-file mode: exits 0 on success, 1 on failure, after the
          report has been saved
        * Ctrl-C or an unexpected exception: exits with status 1
        * directory mode: returns normally
    """
    parser = argparse.ArgumentParser(
        description='Universal Metadata Automation Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process single file
  python -m src.main file.pdf

  # Preview changes without applying
  python -m src.main --preview file.pdf

  # Process entire directory
  python -m src.main --directory ./files

  # Process directory recursively
  python -m src.main --directory ./files --recursive

  # Save report
  python -m src.main file.pdf --report report.csv
"""
    )

    parser.add_argument('input', nargs='?', help='Input file or directory')
    parser.add_argument('--directory', '-d', help='Process entire directory')
    parser.add_argument('--recursive', '-r', action='store_true', help='Process subdirectories')
    parser.add_argument('--preview', '-p', action='store_true', help='Preview mode (no changes)')
    parser.add_argument('--report', help='Save report to CSV file')

    args = parser.parse_args()

    # Validate input
    if not args.input and not args.directory:
        parser.print_help()
        sys.exit(1)

    # Initialize processor
    processor = MetadataProcessor(preview_mode=args.preview)

    # Result of single-file processing; stays None in directory mode so the
    # exit status is only forced for single-file runs.
    file_success = None

    try:
        # Process input
        if args.directory:
            stats = processor.process_directory(args.directory, args.recursive)
            separator = '=' * 60
            print(f"\n{separator}")
            print("BATCH PROCESSING RESULTS")
            print(separator)
            print(f"Total files: {stats.get('total', 0)}")
            print(f"Successful: {stats.get('success', 0)}")
            print(f"Failed: {stats.get('failed', 0)}")
            print(f"{separator}\n")
        elif args.input:
            # Do not exit here: exiting before the save step below would
            # silently drop the report for single-file runs.
            file_success = processor.process_file(args.input)

        # Save report
        if args.report:
            processor.save_report(args.report)
        elif processor.report_data:
            processor.save_report()

    except KeyboardInterrupt:
        print("\n\nOperation cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)

    if file_success is not None:
        sys.exit(0 if file_success else 1)
|
|
|
|
# Run the CLI only when executed as a script/module, not when imported.
if __name__ == "__main__":
    main()
|