Created comprehensive FieldMapper module (400+ lines):
- Fuzzy field matching with SequenceMatcher (60% similarity threshold)
- 10+ aliases per standard field (title, subject, keywords, description)
- Auto-mapping with confidence scores (0.0 to 1.0)
- Mapping suggestions with alternatives (top 2 per field)
- Exact match detection (score 1.0) and substring bonuses (0.85)
- Preset save/load/delete for reusable mappings
- Mapping validation (duplicate targets, coverage stats)
- Unmapped field detection and coverage percentage

FieldMapper features:
- auto_map(): Generate mapping from source fields
- suggest_mapping(): Get best match + alternatives for each field
- validate_mapping(): Check for conflicts and warnings
- apply_mapping(): Transform data using field mapping
- get_mapping_coverage(): Calculate mapping completeness
- Preset management: save, load, list, delete

MetadataImporter enhancements:
- preview_file_structure(): Preview columns and suggest mappings
- import_with_mapping(): Import with custom field mapping
- Integration with FieldMapper for smart detection
- Sample row preview (5 rows) before import

Web API additions:
- /preview-import endpoint: Preview file structure and field suggestions
- Returns: columns, sample rows, mapping suggestions with confidence
- Supports CSV, Excel, JSON format detection

Field mapping workflow:
1. User uploads import file for preview
2. System analyzes columns and suggests mappings
3. User reviews/adjusts mappings (confidence scores shown)
4. User confirms and imports with mapping
5. Optional: Save mapping as preset for reuse

Technical highlights:
- SequenceMatcher from difflib for fuzzy string matching
- Normalize field names (lowercase, underscores)
- Multiple alias sets per target field
- Confidence-based ranking of matches
- Preset persistence via JSON file

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
427 lines
14 KiB
Python
427 lines
14 KiB
Python
"""Metadata importer for external files (CSV, Excel, JSON)."""
|
|
|
|
import pandas as pd
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, List, Tuple
|
|
from .utils import get_logger
|
|
from .field_mapper import FieldMapper
|
|
|
|
# Module-level logger, created by the project's get_logger helper with this
# module's name; the MetadataImporter methods below log through it.
logger = get_logger(__name__)
|
|
|
|
|
|
class MetadataImporter:
    """Import metadata from various file formats (CSV, Excel, JSON).

    Every ``import_*`` method returns a dictionary keyed by the lowercase
    filename stem (file name without directory or extension).  Each value is
    a dict with the string fields ``title``, ``subject`` and ``keywords``.
    """

    # Accepted column / key aliases, listed in priority order.
    _FILENAME_KEYS = ('filename', 'file', 'name', 'file_name')
    _TITLE_KEYS = ('title', 'heading', 'name', 'document_title')
    _SUBJECT_KEYS = ('subject', 'description', 'summary', 'desc',
                     'external_description', 'alt_text')
    _KEYWORD_KEYS = ('keywords', 'tags', 'categories', 'labels')

    # Encodings tried after UTF-8 fails.  cp1252 must come before latin1:
    # latin1 maps every possible byte to a character and therefore never
    # fails, so anything listed after it would be unreachable.
    _CSV_FALLBACK_ENCODINGS = ('cp1252', 'latin1')

    # File extension -> file type used by the "auto" detection.
    _EXTENSION_TYPES = {
        '.csv': 'csv',
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.json': 'json',
    }

    _PREVIEW_LOAD_ROWS = 10    # rows read from the file for a preview
    _PREVIEW_SAMPLE_ROWS = 5   # rows returned to the caller as samples

    # ------------------------------------------------------------------
    # Public import entry points
    # ------------------------------------------------------------------

    def import_from_csv(self, csv_path: str) -> Dict[str, Dict]:
        """
        Import metadata from CSV file.
        Expected columns: filename, title, subject/description, keywords

        The file is read as UTF-8 first; if it cannot be decoded, the
        fallback encodings in ``_CSV_FALLBACK_ENCODINGS`` are tried in order.

        Args:
            csv_path: Path to CSV file

        Returns:
            Dictionary mapping filename stems to metadata dicts

        Raises:
            ValueError: If no filename column is found, or the file cannot be
                decoded with any supported encoding.
        """
        try:
            df, encoding = self._read_csv_with_fallback(csv_path)
            if encoding == 'utf-8':
                logger.info(f"Loaded CSV with {len(df)} rows from {csv_path}")
            else:
                logger.info(f"Loaded CSV with {len(df)} rows using {encoding} encoding")
            # Parsing happens outside the encoding loop so that a parse error
            # (e.g. missing filename column) is reported as such instead of
            # being mistaken for an encoding problem.
            return self._parse_dataframe(df)

        except Exception as e:
            logger.error(f"Error importing from CSV: {e}")
            raise

    def import_from_excel(self, excel_path: str, sheet_name: Optional[str] = None) -> Dict[str, Dict]:
        """
        Import metadata from Excel file.

        Args:
            excel_path: Path to Excel file (.xlsx, .xls)
            sheet_name: Name of sheet to read (None = first sheet)

        Returns:
            Dictionary mapping filename stems to metadata dicts

        Raises:
            ValueError: If no filename column is found in the sheet.
        """
        try:
            if sheet_name:
                df = pd.read_excel(excel_path, sheet_name=sheet_name)
                logger.info(f"Loaded Excel sheet '{sheet_name}' with {len(df)} rows")
            else:
                # The argument is omitted on purpose: passing sheet_name=None
                # to read_excel returns a dict of all sheets, not a DataFrame.
                df = pd.read_excel(excel_path)
                logger.info(f"Loaded Excel with {len(df)} rows from first sheet")

            return self._parse_dataframe(df)

        except Exception as e:
            logger.error(f"Error importing from Excel: {e}")
            raise

    def import_from_json(self, json_path: str) -> Dict[str, Dict]:
        """
        Import metadata from JSON file.

        Expected format:
        {
            "filename.pdf": {"title": "...", "subject": "...", "keywords": "..."},
            "image.jpg": {"title": "...", "subject": "...", "keywords": "..."}
        }

        Or array format:
        [
            {"filename": "file.pdf", "title": "...", "subject": "...", "keywords": "..."},
            {"filename": "image.jpg", "title": "...", "subject": "...", "keywords": "..."}
        ]

        Entries without a usable filename are skipped with a warning.  In the
        object format, a value that is not itself an object yields a record
        with all fields empty.

        Args:
            json_path: Path to JSON file

        Returns:
            Dictionary mapping filename stems to metadata dicts

        Raises:
            ValueError: If the top-level JSON value is neither object nor array.
        """
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            metadata_map = {}

            if isinstance(data, dict):
                # Object format: {"filename": {metadata}}
                for filename, metadata in data.items():
                    filename_stem = self._filename_stem(filename)
                    if filename_stem is None:
                        logger.warning(f"Skipping entry with blank filename: {filename!r}")
                        continue
                    if not isinstance(metadata, dict):
                        logger.warning(
                            f"Metadata for '{filename}' is not an object; storing an empty record"
                        )
                    metadata_map[filename_stem] = self._normalize_metadata(metadata)

            elif isinstance(data, list):
                # Array format: [{filename, metadata}]
                for item in data:
                    if not isinstance(item, dict):
                        continue

                    # The first alias present in the item wins, even if its
                    # value turns out to be blank.
                    filename = next(
                        (item[key] for key in self._FILENAME_KEYS if key in item),
                        None,
                    )
                    filename_stem = self._filename_stem(filename)
                    if filename_stem is None:
                        logger.warning(f"Skipping item without filename: {item}")
                        continue

                    metadata_map[filename_stem] = self._normalize_metadata(item)

            else:
                raise ValueError("JSON must be an object or array")

            logger.info(f"Loaded {len(metadata_map)} metadata records from JSON")
            return metadata_map

        except Exception as e:
            logger.error(f"Error importing from JSON: {e}")
            raise

    # ------------------------------------------------------------------
    # Value helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_missing(value: object) -> bool:
        """Return True for None and for scalar NaN/NaT/NA values."""
        # pd.isna() on a list/array returns an array, so only ask it about
        # scalars; containers are never considered "missing" here.
        return value is None or (pd.api.types.is_scalar(value) and bool(pd.isna(value)))

    @classmethod
    def _clean_text(cls, value: object) -> str:
        """Convert a cell/field value to a stripped string.

        Missing values (None, NaN) become '' instead of 'None'/'nan'; lists
        and tuples are joined with ', ' instead of being rendered as a repr.
        """
        if cls._is_missing(value):
            return ''
        if isinstance(value, (list, tuple)):
            return ', '.join(str(item) for item in value)
        return str(value).strip()

    @classmethod
    def _filename_stem(cls, value: object) -> Optional[str]:
        """Return the lowercase stem of a filename value.

        Returns None when the value is missing (None/NaN) or blank, so that
        callers can skip the row instead of storing it under a bogus key
        such as 'nan'.
        """
        text = '' if cls._is_missing(value) else str(value).strip()
        if not text:
            return None
        return Path(text).stem.lower()

    # ------------------------------------------------------------------
    # File loading helpers
    # ------------------------------------------------------------------

    @classmethod
    def _read_csv_with_fallback(cls, csv_path: str, nrows: Optional[int] = None) -> Tuple[pd.DataFrame, str]:
        """Read a CSV as UTF-8, falling back to legacy encodings.

        Only decoding failures trigger a fallback; any other error from
        pandas propagates unchanged.

        Args:
            csv_path: Path to CSV file
            nrows: Maximum number of rows to read (None = all rows)

        Returns:
            Tuple of (DataFrame, name of the encoding that succeeded)

        Raises:
            ValueError: If no encoding could decode the file.
        """
        for encoding in ('utf-8',) + cls._CSV_FALLBACK_ENCODINGS:
            try:
                return pd.read_csv(csv_path, encoding=encoding, nrows=nrows), encoding
            except UnicodeDecodeError:
                continue
        raise ValueError("Could not read CSV file with any supported encoding")

    @classmethod
    def _resolve_file_type(cls, file_path: str, file_type: str) -> str:
        """Turn 'auto' into a concrete file type and validate explicit ones.

        Args:
            file_path: Path whose extension is inspected when file_type is 'auto'
            file_type: 'csv', 'excel', 'json', or 'auto'

        Returns:
            One of 'csv', 'excel', 'json'

        Raises:
            ValueError: If the extension or the explicit type is unsupported.
        """
        if file_type == 'auto':
            ext = Path(file_path).suffix.lower()
            detected = cls._EXTENSION_TYPES.get(ext)
            if detected is None:
                raise ValueError(f"Unsupported file type: {ext}")
            return detected

        if file_type not in ('csv', 'excel', 'json'):
            raise ValueError(f"Unsupported file type: {file_type}")
        return file_type

    def _load_frame(self, file_path: str, file_type: str, preview: bool = False) -> pd.DataFrame:
        """Load a CSV, Excel or JSON file into a DataFrame.

        JSON objects of the form {"filename": {fields}} are flattened into
        one row per entry with an added 'filename' column; an entry whose
        value is not an object contributes only its filename.

        Args:
            file_path: Path to file
            file_type: Concrete file type ('csv', 'excel' or 'json')
            preview: When True, read at most ``_PREVIEW_LOAD_ROWS`` rows

        Returns:
            DataFrame with the file contents

        Raises:
            ValueError: If file_type is unsupported or the JSON top-level
                value cannot be turned into rows.
        """
        nrows = self._PREVIEW_LOAD_ROWS if preview else None

        if file_type == 'csv':
            df, _ = self._read_csv_with_fallback(file_path, nrows=nrows)
            return df

        if file_type == 'excel':
            return pd.read_excel(file_path, nrows=nrows)

        if file_type != 'json':
            raise ValueError(f"Unsupported file type: {file_type}")

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if isinstance(data, dict):
            records = [
                {'filename': name, **fields} if isinstance(fields, dict) else {'filename': name}
                for name, fields in list(data.items())[:nrows]
            ]
        elif isinstance(data, list) and (data or not preview):
            # An empty array has nothing to preview, but is a valid (empty)
            # import source.
            records = data[:nrows]
        else:
            raise ValueError(
                "JSON format not supported for preview" if preview
                else "JSON must be an object or array"
            )

        return pd.DataFrame(records)

    # ------------------------------------------------------------------
    # DataFrame parsing
    # ------------------------------------------------------------------

    def _parse_dataframe(self, df: pd.DataFrame) -> Dict[str, Dict]:
        """
        Parse pandas DataFrame into metadata map.

        Rows whose filename cell is missing or blank are skipped.  When two
        rows share a filename stem, the later row wins.

        Args:
            df: DataFrame with metadata

        Returns:
            Dictionary mapping filename stems to metadata dicts

        Raises:
            ValueError: If no filename column can be detected.
        """
        # Detect filename column (try common names)
        filename_col = self._detect_column(df, [*self._FILENAME_KEYS, 'path'])
        if filename_col is None:
            raise ValueError("Could not find filename column in data. Tried: filename, file, name, file_name, path")

        # Detect metadata columns
        title_col = self._detect_column(df, list(self._TITLE_KEYS))
        subject_col = self._detect_column(df, list(self._SUBJECT_KEYS))
        keywords_col = self._detect_column(df, list(self._KEYWORD_KEYS))

        logger.info(f"Detected columns - filename: {filename_col}, title: {title_col}, subject: {subject_col}, keywords: {keywords_col}")

        metadata_map = {}
        for _, row in df.iterrows():
            # Check for a missing cell BEFORE stringifying it; str(NaN) is
            # the non-empty text 'nan' and would otherwise become a key.
            filename_stem = self._filename_stem(row.get(filename_col))
            if filename_stem is None:
                continue

            metadata_map[filename_stem] = {
                'title': self._get_value(row, title_col),
                'subject': self._get_value(row, subject_col),
                'keywords': self._get_value(row, keywords_col),
            }

        logger.info(f"Parsed {len(metadata_map)} metadata records from DataFrame")
        return metadata_map

    def _detect_column(self, df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
        """
        Detect column name from a list of candidates (case-insensitive).

        Column labels are compared after conversion to text and removal of
        surrounding whitespace, so numeric labels and headers such as
        " Title " do not break or defeat the lookup.

        Args:
            df: DataFrame to search
            candidates: List of possible column names, in priority order

        Returns:
            Actual column name if found, None otherwise
        """
        # Normalised label -> actual label as it appears in the DataFrame
        col_map = {str(col).strip().lower(): col for col in df.columns}

        for candidate in candidates:
            key = candidate.lower()
            if key in col_map:
                return col_map[key]

        return None

    def _get_value(self, row: pd.Series, column: Optional[str]) -> str:
        """
        Get value from row, handling None column and NaN values.

        Args:
            row: DataFrame row
            column: Column name (can be None)

        Returns:
            String value or empty string
        """
        if column is None:
            return ''
        return self._clean_text(row.get(column, ''))

    def _normalize_metadata(self, metadata: Dict) -> Dict[str, str]:
        """
        Normalize metadata dictionary to standard format.

        For each target field the first alias holding a truthy value is used.
        A keyword list is joined with ', '.

        Args:
            metadata: Raw metadata dict; any non-dict input produces a record
                with all fields empty

        Returns:
            Normalized metadata with title, subject, keywords keys
        """
        normalized = {'title': '', 'subject': '', 'keywords': ''}
        if not isinstance(metadata, dict):
            return normalized

        alias_table = (
            ('title', self._TITLE_KEYS),
            ('subject', self._SUBJECT_KEYS),
            ('keywords', self._KEYWORD_KEYS),
        )
        for field, aliases in alias_table:
            for key in aliases:
                value = metadata.get(key)
                if not value:
                    continue
                if field == 'keywords' and isinstance(value, list):
                    normalized[field] = ', '.join(str(v) for v in value)
                else:
                    normalized[field] = str(value).strip()
                break

        return normalized

    # ------------------------------------------------------------------
    # Lookup and validation
    # ------------------------------------------------------------------

    def get_metadata_for_file(self, metadata_map: Dict[str, Dict], filename: str) -> Optional[Dict[str, str]]:
        """
        Get metadata for a specific file from imported map.

        Args:
            metadata_map: Dictionary returned by import_* methods
            filename: Filename to look up (with or without extension)

        Returns:
            Metadata dict if found, None otherwise
        """
        lookup_key = Path(filename).stem.lower()
        return metadata_map.get(lookup_key)

    def validate_import(self, metadata_map: Dict[str, Dict]) -> Dict:
        """
        Validate imported metadata and return statistics.

        Args:
            metadata_map: Dictionary returned by import_* methods

        Returns:
            Dict with the counters total_records, with_title, with_subject,
            with_keywords and empty_records (records with no field filled)
        """
        stats = {
            'total_records': len(metadata_map),
            'with_title': 0,
            'with_subject': 0,
            'with_keywords': 0,
            'empty_records': 0,
        }

        for metadata in metadata_map.values():
            filled = [
                field for field in ('title', 'subject', 'keywords')
                if metadata.get(field)
            ]
            for field in filled:
                stats[f'with_{field}'] += 1
            if not filled:
                stats['empty_records'] += 1

        return stats

    # ------------------------------------------------------------------
    # Mapping-based workflow
    # ------------------------------------------------------------------

    def preview_file_structure(self, file_path: str, file_type: str = 'auto') -> Tuple[List[str], List[Dict], Dict]:
        """
        Preview file structure and suggest field mappings without importing.

        Args:
            file_path: Path to file (CSV, Excel, JSON)
            file_type: File type ('csv', 'excel', 'json', or 'auto')

        Returns:
            Tuple of (column_names, sample_rows, suggested_mapping)

        Raises:
            ValueError: If the file type is unsupported or the JSON content
                cannot be previewed.
        """
        file_type = self._resolve_file_type(file_path, file_type)
        df = self._load_frame(file_path, file_type, preview=True)

        columns = df.columns.tolist()

        # NOTE(review): empty cells stay NaN in these records; confirm the
        # consumer of the preview can serialise them.
        sample_rows = df.head(self._PREVIEW_SAMPLE_ROWS).to_dict('records')

        mapper = FieldMapper()
        suggestions = mapper.suggest_mapping(columns)

        return (columns, sample_rows, suggestions)

    def import_with_mapping(self, file_path: str, mapping: Dict[str, str], file_type: str = 'auto') -> Dict[str, Dict]:
        """
        Import file with custom field mapping.

        The filename column is the first column (in file order) whose name
        is one of: filename, file, name, file_name.  Rows whose filename
        cell is missing or blank are skipped.

        Args:
            file_path: Path to file
            mapping: Field mapping {source_field: target_field}
            file_type: File type ('csv', 'excel', 'json', or 'auto')

        Returns:
            Dictionary mapping filename stems to metadata dicts

        Raises:
            ValueError: If the file type is unsupported, the JSON content is
                neither object nor array, or no filename column is found.
        """
        file_type = self._resolve_file_type(file_path, file_type)
        df = self._load_frame(file_path, file_type)

        filename_col = next(
            (col for col in df.columns
             if str(col).strip().lower() in self._FILENAME_KEYS),
            None,
        )
        if filename_col is None:
            raise ValueError("Could not find filename column")

        mapper = FieldMapper()
        metadata_map = {}

        for _, row in df.iterrows():
            filename_stem = self._filename_stem(row.get(filename_col))
            if filename_stem is None:
                continue

            # Apply mapping to transform row data
            metadata = mapper.apply_mapping(row.to_dict(), mapping)

            # _clean_text keeps empty cells empty instead of storing the
            # text 'nan' / 'None' as metadata.
            metadata_map[filename_stem] = {
                field: self._clean_text(metadata.get(field, ''))
                for field in ('title', 'subject', 'keywords')
            }

        logger.info(f"Imported {len(metadata_map)} records with custom mapping")
        return metadata_map
|