164 lines
5.4 KiB
Python
164 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Extract project margin data from email files.
|
|
"""
|
|
|
|
import email
|
|
import base64
|
|
import re
|
|
import csv
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
|
|
def decode_base64_content(encoded_content: str) -> str:
    """Decode base64 encoded email content.

    Args:
        encoded_content: Base64 text, e.g. the raw payload of a MIME part.

    Returns:
        The decoded payload as text (byte sequences that are not valid
        UTF-8 are dropped), or an empty string when the input cannot be
        base64-decoded.
    """
    try:
        decoded_bytes = base64.b64decode(encoded_content)
    except (ValueError, TypeError) as e:
        # binascii.Error (bad padding / bad length) is a ValueError subclass;
        # TypeError covers non-string input such as None.
        print(f"Error decoding base64: {e}")
        return ""
    return decoded_bytes.decode('utf-8', errors='ignore')
|
|
|
|
|
|
def extract_project_data(text_content: str) -> Optional[Dict[str, str]]:
    """
    Extract project data from the decoded email text.

    Expected format after "Hello [Name],":
        PROJECT MARGIN
        TARGET MARGIN
        TO DATE MARGIN AT
        COMPLETION %
        COMPLETE
        OMG2141622 - Hellmann's Ranch Poland Adapt<link> 36.02% -77.59% 25.07% 25%

    Args:
        text_content: Decoded plain-text body of an alert email.

    Returns:
        A dict with the keys ``recipient_name``, ``alert_type``,
        ``project_id``, ``project_name``, ``margin_target``,
        ``margin_to_date``, ``margin_at_completion`` and
        ``percent_complete``; or ``None`` when no project row is found.
    """
    # Pattern: OMG[numbers] - [Project Name]<optional link> percentages
    project_pattern = r'(OMG\d+)\s+-\s+([^<\n]+?)(?:<[^>]+>)?\s+([\d.-]+%)\s+([\d.-]+%)\s+([\d.-]+%)\s+([\d.-]+%)'
    project_match = re.search(project_pattern, text_content)
    if project_match is None:
        # Without a project row nothing else is worth extracting.
        return None

    # Recipient name from "Hello [Name],". The name is kept to a single
    # line so a greeting without a comma cannot swallow following paragraphs.
    hello_match = re.search(r'Hello\s+([^,\n]+),', text_content)
    recipient_name = hello_match.group(1).strip() if hello_match else 'Unknown'

    # Email type (Overburning or Tracking Behind)
    if 'Overburn' in text_content:
        alert_type = 'Overburning'
    elif 'Tracking Behind' in text_content or 'not on track' in text_content:
        alert_type = 'Tracking Behind'
    else:
        alert_type = 'Unknown'

    project_id, project_name, target, to_date, at_completion, complete = (
        value.strip() for value in project_match.groups()
    )
    return {
        'recipient_name': recipient_name,
        'alert_type': alert_type,
        'project_id': project_id,
        'project_name': project_name,
        'margin_target': target,
        'margin_to_date': to_date,
        'margin_at_completion': at_completion,
        'percent_complete': complete,
    }
|
|
|
|
|
|
def parse_eml_file(file_path: Path) -> Optional[Dict[str, str]]:
    """Parse an .eml file and extract project data.

    Every ``text/plain`` part of the message is decoded according to its own
    Content-Transfer-Encoding and charset, then searched for a project row.
    The first part that yields data wins.

    Args:
        file_path: Path of the .eml file to read.

    Returns:
        The dict produced by ``extract_project_data`` with ``email_date``
        (the ``Date`` header, or ``'Unknown'``) and ``source_file`` (the
        file name) added; ``None`` when no part contains project data or
        the file cannot be read or parsed.
    """
    try:
        # Binary mode keeps the raw bytes intact so the email package can
        # apply each part's transfer encoding and charset itself.
        with open(file_path, 'rb') as f:
            msg = email.message_from_binary_file(f)

        # Extract date from email headers
        email_date = str(msg.get('Date', 'Unknown'))

        # walk() also yields the message itself when it is not multipart,
        # so single-part emails go through the same loop.
        for part in msg.walk():
            if part.get_content_type() != 'text/plain':
                continue

            project_data = extract_project_data(_decode_text_part(part))
            if project_data:
                project_data['email_date'] = email_date
                project_data['source_file'] = file_path.name
                return project_data

        return None

    except Exception as e:
        # Per-file boundary: report the problem and let the caller carry on
        # with the remaining files.
        print(f"Error parsing {file_path.name}: {e}")
        return None


def _decode_text_part(part) -> str:
    """Return the text of a leaf MIME part, honouring its encoding headers."""
    # decode=True undoes base64 / quoted-printable as declared by the part's
    # Content-Transfer-Encoding and returns bytes (None for containers).
    raw = part.get_payload(decode=True)
    if raw is None:
        return ""
    charset = part.get_content_charset() or 'utf-8'
    try:
        return raw.decode(charset, errors='ignore')
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8.
        return raw.decode('utf-8', errors='ignore')
|
|
|
|
|
|
def main():
    """Process every .eml file in the working directory and export a CSV.

    Prints per-file progress, writes ``project_margin_data.csv`` next to the
    emails when at least one record was extracted, and prints a summary
    with counts per alert type.
    """
    work_dir = Path.cwd()

    # Collect the .eml files in a stable (sorted) processing order.
    candidates = sorted(work_dir.glob('*.eml'))
    print(f"Found {len(candidates)} email files to process\n")

    records = []
    for path in candidates:
        print(f"Processing: {path.name}")
        row = parse_eml_file(path)
        if not row:
            print(" ✗ No data extracted")
            continue
        records.append(row)
        print(f" ✓ Extracted: {row['project_id']} - {row['project_name']}")

    rule = '=' * 60
    print(f"\n{rule}")
    print(f"Successfully extracted data from {len(records)} emails")
    print(f"{rule}\n")

    if not records:
        print("No data extracted from any emails")
        return

    # Export to CSV
    csv_path = work_dir / 'project_margin_data.csv'
    columns = [
        'source_file',
        'email_date',
        'recipient_name',
        'alert_type',
        'project_id',
        'project_name',
        'margin_target',
        'margin_to_date',
        'margin_at_completion',
        'percent_complete',
    ]
    with open(csv_path, 'w', newline='', encoding='utf-8') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        out.writerows(records)

    print(f"Data exported to: {csv_path}")
    print("\nSummary:")
    print(f" Total records: {len(records)}")

    # Count by alert type
    alert_types = [row['alert_type'] for row in records]
    print(f" Overburning alerts: {alert_types.count('Overburning')}")
    print(f" Tracking Behind alerts: {alert_types.count('Tracking Behind')}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the extraction only when executed as a script, not on import.
    main()
|