ferrero-opentext/Python-Version/scripts/test-prod-smoke-test.py

226 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
Production Environment Smoke Test
Tests mTLS V2 authentication against the test APIM environment
Uses .env-prod configuration
"""
import sys
import os
import logging
import requests
from pathlib import Path
from tempfile import NamedTemporaryFile
from datetime import datetime
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ProdSmokeTest')
def pfx_to_pem(pfx_path, pfx_password):
"""
Convert PFX certificate to temporary PEM file for requests library
Args:
pfx_path: Path to PFX/P12 certificate file
pfx_password: Certificate password
Returns:
str: Path to temporary PEM file
"""
try:
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
# Load PFX file
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(
pfx,
pfx_password.encode('utf-8') if pfx_password else None,
None
)
# Create temporary PEM file
t_pem = NamedTemporaryFile(suffix='.pem', delete=False)
with open(t_pem.name, 'wb') as pem_file:
# Write private key
pem_file.write(private_key.private_bytes(
Encoding.PEM,
PrivateFormat.PKCS8,
NoEncryption()
))
# Write main certificate
pem_file.write(main_cert.public_bytes(Encoding.PEM))
# Write additional certificates in chain
for cert in add_certs:
pem_file.write(cert.public_bytes(Encoding.PEM))
return t_pem.name
except Exception as e:
logger.error("Failed to convert PFX to PEM: {}".format(str(e)))
raise
def test_mtls_token_endpoint(token_url, cert_path, cert_password):
"""
Test the mTLS token endpoint
Args:
token_url: OAuth token endpoint URL
cert_path: Path to PFX certificate
cert_password: Certificate password
Returns:
dict: Test results
"""
results = {
'success': False,
'http_status': None,
'access_token': None,
'expires_in': None,
'error': None,
'response_time_ms': None
}
pem_path = None
try:
logger.info("=" * 70)
logger.info("PRODUCTION SMOKE TEST - mTLS V2 Authentication")
logger.info("=" * 70)
logger.info("")
logger.info("Test Configuration:")
logger.info(" Token Endpoint: {}".format(token_url))
logger.info(" Certificate: {}".format(cert_path))
logger.info(" Timestamp: {}".format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')))
logger.info("")
# Check certificate exists
if not os.path.exists(cert_path):
raise Exception("Certificate file not found: {}".format(cert_path))
logger.info("✓ Certificate file found")
# Convert PFX to PEM
logger.info("Converting PFX to PEM...")
pem_path = pfx_to_pem(cert_path, cert_password)
logger.info("✓ Certificate converted successfully")
# Make request
logger.info("")
logger.info("Requesting OAuth token via mTLS...")
logger.info(" URL: {}".format(token_url))
start_time = datetime.now()
response = requests.post(
token_url,
cert=pem_path,
verify=True,
timeout=30
)
end_time = datetime.now()
response_time = (end_time - start_time).total_seconds() * 1000
results['response_time_ms'] = round(response_time, 2)
results['http_status'] = response.status_code
logger.info("")
logger.info("Response received:")
logger.info(" HTTP Status: {}".format(response.status_code))
logger.info(" Response Time: {} ms".format(results['response_time_ms']))
if response.status_code == 200:
data = response.json()
results['access_token'] = data.get('access_token', '')[:50] + '...' if data.get('access_token') else None
results['expires_in'] = data.get('expires_in')
results['success'] = True
logger.info(" Token Type: {}".format(data.get('token_type', 'N/A')))
logger.info(" Expires In: {} seconds".format(results['expires_in']))
logger.info(" Access Token (preview): {}".format(results['access_token']))
logger.info("")
logger.info("=" * 70)
logger.info("✓ SMOKE TEST PASSED - Authentication Successful")
logger.info("=" * 70)
else:
results['error'] = "HTTP {} - {}".format(response.status_code, response.text[:200])
logger.error("")
logger.error("Response Body:")
logger.error(response.text[:500])
logger.error("")
logger.error("=" * 70)
logger.error("✗ SMOKE TEST FAILED - Authentication Failed")
logger.error("=" * 70)
except Exception as e:
results['error'] = str(e)
logger.error("")
logger.error("=" * 70)
logger.error("✗ SMOKE TEST FAILED - Exception Occurred")
logger.error("=" * 70)
logger.error("Error: {}".format(str(e)))
finally:
# Cleanup temporary PEM file
if pem_path and os.path.exists(pem_path):
try:
os.unlink(pem_path)
except Exception:
pass
return results
def main():
"""Main smoke test execution"""
try:
# Load production configuration
logger.info("Loading production configuration from .env-prod...")
# Explicitly load .env-prod file
from dotenv import load_dotenv
load_dotenv('.env-prod', override=True)
config = load_config('config/config.yaml')
# Get mTLS configuration
token_url = "https://prod-auth.app-api.ferrero.com/00003/mm/token"
cert_path = config['dam'].get('mtls_cert_path')
cert_password = config['dam'].get('mtls_cert_password')
if not cert_path:
logger.error("Certificate path not configured in .env-prod (DAM_MTLS_CERT_PATH)")
sys.exit(1)
if not cert_password:
logger.error("Certificate password not configured in .env-prod (DAM_MTLS_CERT_PASSWORD)")
sys.exit(1)
# Run smoke test
results = test_mtls_token_endpoint(token_url, cert_path, cert_password)
# Exit with appropriate code
if results['success']:
logger.info("")
logger.info("Production environment is ready for mTLS V2 authentication")
sys.exit(0)
else:
logger.error("")
logger.error("Production environment smoke test failed")
logger.error("Error: {}".format(results['error']))
sys.exit(1)
except Exception as e:
logger.critical("Smoke test script error: {}".format(str(e)))
sys.exit(1)
if __name__ == '__main__':
main()