#!/usr/bin/env python3
"""
Production Environment Smoke Test

Tests mTLS V2 authentication against the production APIM token endpoint.
Uses .env-prod configuration.
"""

import sys
import os
import logging
import time
import requests
from pathlib import Path
from tempfile import NamedTemporaryFile
from datetime import datetime, timezone

# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from shared.config_loader import load_config

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ProdSmokeTest')

# OAuth token endpoint exercised by this smoke test.
PROD_TOKEN_URL = "https://prod-auth.app-api.ferrero.com/00003/mm/token"

# Seconds to wait for the token endpoint before giving up.
REQUEST_TIMEOUT_SECONDS = 30


def pfx_to_pem(pfx_path, pfx_password):
    """
    Convert a PFX certificate to a temporary PEM file for the requests library.

    The PEM file contains the UNENCRYPTED private key followed by the main
    certificate and any additional chain certificates. The caller owns the
    returned file and must delete it when done.

    Args:
        pfx_path: Path to PFX/P12 certificate file
        pfx_password: Certificate password (empty/None means no password)

    Returns:
        str: Path to temporary PEM file

    Raises:
        ValueError: If the PFX bundle lacks a private key or a certificate.
        Exception: Any read/parse/write error is logged and re-raised.
    """
    try:
        # Imported lazily so a missing 'cryptography' package is reported
        # through the same logged error path as any other conversion failure.
        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
        )
        from cryptography.hazmat.primitives.serialization.pkcs12 import (
            load_key_and_certificates,
        )

        # Load PFX file
        pfx_bytes = Path(pfx_path).read_bytes()
        private_key, main_cert, add_certs = load_key_and_certificates(
            pfx_bytes,
            pfx_password.encode('utf-8') if pfx_password else None,
            None
        )
        if private_key is None or main_cert is None:
            raise ValueError(
                "PFX file does not contain both a private key and a certificate"
            )

        # Serialise everything BEFORE creating the temp file so that most
        # failures happen while no key material is on disk yet.
        pem_chunks = [
            private_key.private_bytes(
                Encoding.PEM,
                PrivateFormat.PKCS8,
                NoEncryption()
            ),
            main_cert.public_bytes(Encoding.PEM),
        ]
        # add_certs may be None/empty when the bundle carries no chain.
        pem_chunks.extend(
            cert.public_bytes(Encoding.PEM) for cert in (add_certs or ())
        )

        pem_path = None
        try:
            # delete=False: the file must outlive this handle so that
            # requests can open it by name; the context manager still
            # guarantees the handle itself is closed.
            with NamedTemporaryFile(suffix='.pem', delete=False) as pem_file:
                pem_path = pem_file.name
                pem_file.write(b"".join(pem_chunks))
        except Exception:
            # Do not leave a partially written private key behind.
            if pem_path is not None:
                try:
                    os.unlink(pem_path)
                except OSError:
                    pass
            raise

        return pem_path

    except Exception as e:
        logger.error("Failed to convert PFX to PEM: %s", e)
        raise


def test_mtls_token_endpoint(token_url, cert_path, cert_password):
    """
    Test the mTLS token endpoint.

    Converts the PFX certificate to a temporary PEM file, POSTs to the token
    endpoint with it as the client certificate, and records the outcome.
    Exceptions are caught and reported in the result; the temporary PEM file
    is always removed.

    Args:
        token_url: OAuth token endpoint URL
        cert_path: Path to PFX certificate
        cert_password: Certificate password

    Returns:
        dict: Test results with keys 'success', 'http_status',
        'access_token' (truncated preview, never the full token),
        'expires_in', 'error' and 'response_time_ms'.
    """
    results = {
        'success': False,
        'http_status': None,
        'access_token': None,
        'expires_in': None,
        'error': None,
        'response_time_ms': None
    }

    banner = "=" * 70
    pem_path = None

    try:
        logger.info(banner)
        logger.info("PRODUCTION SMOKE TEST - mTLS V2 Authentication")
        logger.info(banner)
        logger.info("")
        logger.info("Test Configuration:")
        logger.info("  Token Endpoint: %s", token_url)
        logger.info("  Certificate: %s", cert_path)
        logger.info(
            "  Timestamp: %s",
            datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        )
        logger.info("")

        # Check certificate exists
        if not os.path.exists(cert_path):
            raise FileNotFoundError(
                "Certificate file not found: {}".format(cert_path)
            )

        logger.info("✓ Certificate file found")

        # Convert PFX to PEM
        logger.info("Converting PFX to PEM...")
        pem_path = pfx_to_pem(cert_path, cert_password)
        logger.info("✓ Certificate converted successfully")

        # Make request
        logger.info("")
        logger.info("Requesting OAuth token via mTLS...")
        logger.info("  URL: %s", token_url)

        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments during the request.
        started = time.perf_counter()
        response = requests.post(
            token_url,
            cert=pem_path,
            verify=True,
            timeout=REQUEST_TIMEOUT_SECONDS
        )
        elapsed_ms = (time.perf_counter() - started) * 1000

        results['response_time_ms'] = round(elapsed_ms, 2)
        results['http_status'] = response.status_code

        logger.info("")
        logger.info("Response received:")
        logger.info("  HTTP Status: %s", response.status_code)
        logger.info("  Response Time: %s ms", results['response_time_ms'])

        if response.status_code == 200:
            data = response.json()

            token = data.get('access_token')
            # Keep only a short preview; the full token must never be
            # logged or returned from a smoke test.
            results['access_token'] = token[:50] + '...' if token else None
            results['expires_in'] = data.get('expires_in')
            results['success'] = True

            if not token:
                logger.warning("  Response did not contain an access_token")

            logger.info("  Token Type: %s", data.get('token_type', 'N/A'))
            logger.info("  Expires In: %s seconds", results['expires_in'])
            logger.info("  Access Token (preview): %s", results['access_token'])
            logger.info("")
            logger.info(banner)
            logger.info("✓ SMOKE TEST PASSED - Authentication Successful")
            logger.info(banner)
        else:
            results['error'] = "HTTP {} - {}".format(
                response.status_code, response.text[:200]
            )
            logger.error("")
            logger.error("Response Body:")
            logger.error("%s", response.text[:500])
            logger.error("")
            logger.error(banner)
            logger.error("✗ SMOKE TEST FAILED - Authentication Failed")
            logger.error(banner)

    except Exception as e:
        # Top-level boundary of the test: report every failure in the
        # result dict instead of propagating it.
        results['error'] = str(e)
        logger.error("")
        logger.error(banner)
        logger.error("✗ SMOKE TEST FAILED - Exception Occurred")
        logger.error(banner)
        logger.error("Error: %s", e)

    finally:
        # Cleanup temporary PEM file (it holds the unencrypted private key).
        if pem_path:
            try:
                os.unlink(pem_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove temporary PEM file %s: %s",
                    pem_path, cleanup_error
                )

    return results


def main():
    """
    Main smoke test execution.

    Loads .env-prod and config/config.yaml, runs the mTLS token test and
    exits with status 0 on success or 1 on any failure or misconfiguration.
    """
    try:
        # Load production configuration
        logger.info("Loading production configuration from .env-prod...")

        # Explicitly load .env-prod file
        from dotenv import load_dotenv
        load_dotenv('.env-prod', override=True)

        config = load_config('config/config.yaml')

        # Get mTLS configuration
        token_url = PROD_TOKEN_URL
        dam_config = config['dam']
        cert_path = dam_config.get('mtls_cert_path')
        cert_password = dam_config.get('mtls_cert_password')

        if not cert_path:
            logger.error(
                "Certificate path not configured in .env-prod (DAM_MTLS_CERT_PATH)"
            )
            sys.exit(1)

        if not cert_password:
            logger.error(
                "Certificate password not configured in .env-prod (DAM_MTLS_CERT_PASSWORD)"
            )
            sys.exit(1)

        # Run smoke test
        results = test_mtls_token_endpoint(token_url, cert_path, cert_password)

        # Exit with appropriate code
        if results['success']:
            logger.info("")
            logger.info("Production environment is ready for mTLS V2 authentication")
            sys.exit(0)

        logger.error("")
        logger.error("Production environment smoke test failed")
        logger.error("Error: %s", results['error'])
        sys.exit(1)

    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the exit codes chosen above pass through this handler untouched.
        logger.critical("Smoke test script error: %s", e)
        sys.exit(1)


if __name__ == '__main__':
    main()