#!/usr/bin/env python3 """ mTLS Authentication Debug Script Performs detailed testing with full request/response logging """ import sys import os import json sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from shared.config_loader import load_config from shared.dam_client import pfx_to_pem import requests # Enable detailed logging import logging import http.client as http_client http_client.HTTPConnection.debuglevel = 1 logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True print("=" * 80) print("mTLS AUTHENTICATION DEBUG TEST") print("=" * 80) print("") # Load config config = load_config('config/config.yaml') cert_path = config['dam']['mtls_cert_path'] cert_password = config['dam']['mtls_cert_password'] base_url = config['dam']['base_url'] print("Configuration:") print(" Base URL: {}".format(base_url)) print(" Cert Path: {}".format(cert_path)) print(" Cert Password: {}".format('*' * len(cert_password))) print("") # Test 1: Simple base endpoint print("=" * 80) print("TEST 1: GET /v6 (Base API Endpoint)") print("=" * 80) try: with pfx_to_pem(cert_path, cert_password) as cert: print("\nCertificate converted to PEM: {}".format(cert)) print("\nSending request...") print(" URL: {}/v6".format(base_url)) print(" Method: GET") print(" Certificate: {}".format(cert)) print(" Verify SSL: True") print("") response = requests.get( "{}/v6".format(base_url), cert=cert, verify=True, timeout=30 ) print("\nRESPONSE RECEIVED:") print(" Status Code: {}".format(response.status_code)) print(" Reason: {}".format(response.reason)) print("") print("Response Headers:") for key, value in response.headers.items(): print(" {}: {}".format(key, value)) print("") print("Response Cookies:") for key, value in response.cookies.items(): print(" {}: {}".format(key, value)) print("") print("Response Body:") try: body_json = response.json() print(json.dumps(body_json, indent=2)[:1000]) except: print(response.text[:1000]) except Exception as e: print("\n✗ ERROR: {}".format(str(e))) import traceback traceback.print_exc() # Test 2: Login endpoint print("\n" + "=" * 80) print("TEST 2: GET /otdsws/login (Session Login)") print("=" * 80) try: with pfx_to_pem(cert_path, cert_password) as cert: login_url = base_url.replace('/otmmapi', '/otdsws/login') print("\nSending request...") print(" URL: {}".format(login_url)) print(" Method: GET") print(" Certificate: {}".format(cert)) print("") response = requests.get( login_url, cert=cert, verify=True, timeout=30 ) print("\nRESPONSE RECEIVED:") print(" Status Code: {}".format(response.status_code)) print(" Reason: {}".format(response.reason)) print("") print("Response Headers:") for key, value in response.headers.items(): print(" {}: {}".format(key, value)) print("") print("Response Cookies:") for key, value in response.cookies.items(): print(" {}: {}".format(key, value)) print("") print("Response Body:") print(response.text[:1000]) except Exception as e: print("\n✗ ERROR: {}".format(str(e))) import traceback traceback.print_exc() # Test 3: Search campaigns with session print("\n" + "=" * 80) print("TEST 3: POST /v6/search/text (Campaign Search)") print("=" * 80) try: with pfx_to_pem(cert_path, cert_password) as cert: import urllib.parse search_condition = { "search_condition_list": { "search_condition": [ { "type": "com.artesia.search.SearchScalarCondition", "metadata_field_id": "ARTESIA.FIELD.CONTAINER TYPE NAME", "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS", "value": "GLOBALCAMPAING", "left_paren": "(", "right_paren": ")" }, { "type": "com.artesia.search.SearchScalarCondition", "metadata_field_id": "FERRERO.FIELD.CAMPAIGN TYPE", "relational_operator_id": "ARTESIA.OPERATOR.CHAR.CONTAINS", "value": "Local Adaptation", "relational_operator": "and" } ] } } search_condition_str = json.dumps(search_condition) search_condition_encoded = urllib.parse.quote(search_condition_str) url = "{}/v6/search/text?load_type=metadata&search_config_id=18&search_condition_list={}".format( base_url, search_condition_encoded ) print("\nSending request...") print(" URL: {}...".format(url[:100])) print(" Method: GET") print(" Certificate: {}".format(cert)) print("") # Create session to maintain cookies session = requests.Session() session.cert = cert session.verify = True # Try to get login first login_url = base_url.replace('/otmmapi', '/otdsws/login') print("First, attempting login...") login_response = session.get(login_url, timeout=30) print(" Login Status: {}".format(login_response.status_code)) if login_response.cookies: print(" Cookies from login:") for key, value in login_response.cookies.items(): print(" {}: {}".format(key, value[:50] if len(value) > 50 else value)) print("\nNow searching campaigns with session...") response = session.get(url, headers={'Accept': 'application/json'}, timeout=30) print("\nRESPONSE RECEIVED:") print(" Status Code: {}".format(response.status_code)) print(" Reason: {}".format(response.reason)) print("") print("Response Headers:") for key, value in response.headers.items(): print(" {}: {}".format(key, value)) print("") print("Response Cookies:") for key, value in response.cookies.items(): print(" {}: {}".format(key, value)) print("") print("Response Body:") try: body_json = response.json() print(json.dumps(body_json, indent=2)[:2000]) except: print(response.text[:2000]) except Exception as e: print("\n✗ ERROR: {}".format(str(e))) import traceback traceback.print_exc() print("\n" + "=" * 80) print("DEBUG TEST COMPLETE") print("=" * 80)