tom-l-pm-dashboard/extract_email_data.py
DJP a6db845054 Initial commit - PM dashboard with CSV and email versions
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 12:01:21 -05:00

164 lines
5.4 KiB
Python

#!/usr/bin/env python3
"""
Extract project margin data from email files.
"""
import email
import base64
import re
import csv
from pathlib import Path
from typing import Dict, List, Optional
def decode_base64_content(encoded_content: str) -> str:
"""Decode base64 encoded email content."""
try:
decoded_bytes = base64.b64decode(encoded_content)
return decoded_bytes.decode('utf-8', errors='ignore')
except Exception as e:
print(f"Error decoding base64: {e}")
return ""
def extract_project_data(text_content: str) -> Optional[Dict[str, str]]:
"""
Extract project data from the decoded email text.
Expected format after "Hello [Name],":
PROJECT MARGIN
TARGET MARGIN
TO DATE MARGIN AT
COMPLETION %
COMPLETE
OMG2141622 - Hellmann's Ranch Poland Adapt<link> 36.02% -77.59% 25.07% 25%
"""
data = {}
# Extract recipient name from "Hello [Name],"
hello_match = re.search(r'Hello\s+([^,]+),', text_content)
if hello_match:
data['recipient_name'] = hello_match.group(1).strip()
else:
data['recipient_name'] = 'Unknown'
# Extract email type (Overburning or Tracking Behind)
if 'Overburn' in text_content:
data['alert_type'] = 'Overburning'
elif 'Tracking Behind' in text_content or 'not on track' in text_content:
data['alert_type'] = 'Tracking Behind'
else:
data['alert_type'] = 'Unknown'
# Extract project information
# Pattern: OMG[numbers] - [Project Name]<optional link> percentages
project_pattern = r'(OMG\d+)\s+-\s+([^<\n]+?)(?:<[^>]+>)?\s+([\d.-]+%)\s+([\d.-]+%)\s+([\d.-]+%)\s+([\d.-]+%)'
project_match = re.search(project_pattern, text_content)
if project_match:
data['project_id'] = project_match.group(1).strip()
data['project_name'] = project_match.group(2).strip()
data['margin_target'] = project_match.group(3).strip()
data['margin_to_date'] = project_match.group(4).strip()
data['margin_at_completion'] = project_match.group(5).strip()
data['percent_complete'] = project_match.group(6).strip()
return data
return None
def parse_eml_file(file_path: Path) -> Optional[Dict[str, str]]:
"""Parse an .eml file and extract project data."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
msg = email.message_from_file(f)
# Extract date from email headers
email_date = msg.get('Date', 'Unknown')
# Process multipart message
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
# Look for text/plain part with base64 encoding
if content_type == 'text/plain':
payload = part.get_payload()
if payload:
decoded_content = decode_base64_content(payload)
project_data = extract_project_data(decoded_content)
if project_data:
project_data['email_date'] = email_date
project_data['source_file'] = file_path.name
return project_data
return None
except Exception as e:
print(f"Error parsing {file_path.name}: {e}")
return None
def main():
"""Main function to process all email files."""
# Get current directory
current_dir = Path.cwd()
# Find all .eml files
eml_files = list(current_dir.glob('*.eml'))
print(f"Found {len(eml_files)} email files to process\n")
# Extract data from all emails
all_data = []
for eml_file in sorted(eml_files):
print(f"Processing: {eml_file.name}")
project_data = parse_eml_file(eml_file)
if project_data:
all_data.append(project_data)
print(f" ✓ Extracted: {project_data['project_id']} - {project_data['project_name']}")
else:
print(f" ✗ No data extracted")
print(f"\n{'='*60}")
print(f"Successfully extracted data from {len(all_data)} emails")
print(f"{'='*60}\n")
# Export to CSV
if all_data:
output_file = current_dir / 'project_margin_data.csv'
fieldnames = [
'source_file',
'email_date',
'recipient_name',
'alert_type',
'project_id',
'project_name',
'margin_target',
'margin_to_date',
'margin_at_completion',
'percent_complete'
]
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(all_data)
print(f"Data exported to: {output_file}")
print(f"\nSummary:")
print(f" Total records: {len(all_data)}")
# Count by alert type
overburning = sum(1 for d in all_data if d['alert_type'] == 'Overburning')
tracking_behind = sum(1 for d in all_data if d['alert_type'] == 'Tracking Behind')
print(f" Overburning alerts: {overburning}")
print(f" Tracking Behind alerts: {tracking_behind}")
else:
print("No data extracted from any emails")
if __name__ == '__main__':
main()