226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Production Environment Smoke Test
|
|
Tests mTLS V2 authentication against the test APIM environment
|
|
Uses .env-prod configuration
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import logging
|
|
import requests
|
|
from pathlib import Path
|
|
from tempfile import NamedTemporaryFile
|
|
from datetime import datetime
|
|
|
|
# Add shared library to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from shared.config_loader import load_config
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger('ProdSmokeTest')
|
|
|
|
def pfx_to_pem(pfx_path, pfx_password):
|
|
"""
|
|
Convert PFX certificate to temporary PEM file for requests library
|
|
|
|
Args:
|
|
pfx_path: Path to PFX/P12 certificate file
|
|
pfx_password: Certificate password
|
|
|
|
Returns:
|
|
str: Path to temporary PEM file
|
|
"""
|
|
try:
|
|
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
|
|
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
|
|
|
|
# Load PFX file
|
|
pfx = Path(pfx_path).read_bytes()
|
|
private_key, main_cert, add_certs = load_key_and_certificates(
|
|
pfx,
|
|
pfx_password.encode('utf-8') if pfx_password else None,
|
|
None
|
|
)
|
|
|
|
# Create temporary PEM file
|
|
t_pem = NamedTemporaryFile(suffix='.pem', delete=False)
|
|
with open(t_pem.name, 'wb') as pem_file:
|
|
# Write private key
|
|
pem_file.write(private_key.private_bytes(
|
|
Encoding.PEM,
|
|
PrivateFormat.PKCS8,
|
|
NoEncryption()
|
|
))
|
|
# Write main certificate
|
|
pem_file.write(main_cert.public_bytes(Encoding.PEM))
|
|
# Write additional certificates in chain
|
|
for cert in add_certs:
|
|
pem_file.write(cert.public_bytes(Encoding.PEM))
|
|
|
|
return t_pem.name
|
|
|
|
except Exception as e:
|
|
logger.error("Failed to convert PFX to PEM: {}".format(str(e)))
|
|
raise
|
|
|
|
def test_mtls_token_endpoint(token_url, cert_path, cert_password):
|
|
"""
|
|
Test the mTLS token endpoint
|
|
|
|
Args:
|
|
token_url: OAuth token endpoint URL
|
|
cert_path: Path to PFX certificate
|
|
cert_password: Certificate password
|
|
|
|
Returns:
|
|
dict: Test results
|
|
"""
|
|
results = {
|
|
'success': False,
|
|
'http_status': None,
|
|
'access_token': None,
|
|
'expires_in': None,
|
|
'error': None,
|
|
'response_time_ms': None
|
|
}
|
|
|
|
pem_path = None
|
|
|
|
try:
|
|
logger.info("=" * 70)
|
|
logger.info("PRODUCTION SMOKE TEST - mTLS V2 Authentication")
|
|
logger.info("=" * 70)
|
|
logger.info("")
|
|
logger.info("Test Configuration:")
|
|
logger.info(" Token Endpoint: {}".format(token_url))
|
|
logger.info(" Certificate: {}".format(cert_path))
|
|
logger.info(" Timestamp: {}".format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')))
|
|
logger.info("")
|
|
|
|
# Check certificate exists
|
|
if not os.path.exists(cert_path):
|
|
raise Exception("Certificate file not found: {}".format(cert_path))
|
|
|
|
logger.info("✓ Certificate file found")
|
|
|
|
# Convert PFX to PEM
|
|
logger.info("Converting PFX to PEM...")
|
|
pem_path = pfx_to_pem(cert_path, cert_password)
|
|
logger.info("✓ Certificate converted successfully")
|
|
|
|
# Make request
|
|
logger.info("")
|
|
logger.info("Requesting OAuth token via mTLS...")
|
|
logger.info(" URL: {}".format(token_url))
|
|
|
|
start_time = datetime.now()
|
|
|
|
response = requests.post(
|
|
token_url,
|
|
cert=pem_path,
|
|
verify=True,
|
|
timeout=30
|
|
)
|
|
|
|
end_time = datetime.now()
|
|
response_time = (end_time - start_time).total_seconds() * 1000
|
|
results['response_time_ms'] = round(response_time, 2)
|
|
results['http_status'] = response.status_code
|
|
|
|
logger.info("")
|
|
logger.info("Response received:")
|
|
logger.info(" HTTP Status: {}".format(response.status_code))
|
|
logger.info(" Response Time: {} ms".format(results['response_time_ms']))
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
results['access_token'] = data.get('access_token', '')[:50] + '...' if data.get('access_token') else None
|
|
results['expires_in'] = data.get('expires_in')
|
|
results['success'] = True
|
|
|
|
logger.info(" Token Type: {}".format(data.get('token_type', 'N/A')))
|
|
logger.info(" Expires In: {} seconds".format(results['expires_in']))
|
|
logger.info(" Access Token (preview): {}".format(results['access_token']))
|
|
logger.info("")
|
|
logger.info("=" * 70)
|
|
logger.info("✓ SMOKE TEST PASSED - Authentication Successful")
|
|
logger.info("=" * 70)
|
|
else:
|
|
results['error'] = "HTTP {} - {}".format(response.status_code, response.text[:200])
|
|
logger.error("")
|
|
logger.error("Response Body:")
|
|
logger.error(response.text[:500])
|
|
logger.error("")
|
|
logger.error("=" * 70)
|
|
logger.error("✗ SMOKE TEST FAILED - Authentication Failed")
|
|
logger.error("=" * 70)
|
|
|
|
except Exception as e:
|
|
results['error'] = str(e)
|
|
logger.error("")
|
|
logger.error("=" * 70)
|
|
logger.error("✗ SMOKE TEST FAILED - Exception Occurred")
|
|
logger.error("=" * 70)
|
|
logger.error("Error: {}".format(str(e)))
|
|
|
|
finally:
|
|
# Cleanup temporary PEM file
|
|
if pem_path and os.path.exists(pem_path):
|
|
try:
|
|
os.unlink(pem_path)
|
|
except Exception:
|
|
pass
|
|
|
|
return results
|
|
|
|
def main():
|
|
"""Main smoke test execution"""
|
|
try:
|
|
# Load production configuration
|
|
logger.info("Loading production configuration from .env-prod...")
|
|
|
|
# Explicitly load .env-prod file
|
|
from dotenv import load_dotenv
|
|
load_dotenv('.env-prod', override=True)
|
|
|
|
config = load_config('config/config.yaml')
|
|
|
|
# Get mTLS configuration
|
|
token_url = "https://prod-auth.app-api.ferrero.com/00003/mm/token"
|
|
cert_path = config['dam'].get('mtls_cert_path')
|
|
cert_password = config['dam'].get('mtls_cert_password')
|
|
|
|
if not cert_path:
|
|
logger.error("Certificate path not configured in .env-prod (DAM_MTLS_CERT_PATH)")
|
|
sys.exit(1)
|
|
|
|
if not cert_password:
|
|
logger.error("Certificate password not configured in .env-prod (DAM_MTLS_CERT_PASSWORD)")
|
|
sys.exit(1)
|
|
|
|
# Run smoke test
|
|
results = test_mtls_token_endpoint(token_url, cert_path, cert_password)
|
|
|
|
# Exit with appropriate code
|
|
if results['success']:
|
|
logger.info("")
|
|
logger.info("Production environment is ready for mTLS V2 authentication")
|
|
sys.exit(0)
|
|
else:
|
|
logger.error("")
|
|
logger.error("Production environment smoke test failed")
|
|
logger.error("Error: {}".format(results['error']))
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.critical("Smoke test script error: {}".format(str(e)))
|
|
sys.exit(1)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|