# ferrero-opentext/Python-Version/tests/test-a4-box-upload.py
#
# 94 lines
# 3.1 KiB
# Python

import unittest
from unittest.mock import MagicMock, patch
import os
import sys
# Put the parent of this tests directory at the front of sys.path so the
# `scripts` package imported below can be resolved when the tests are run
# from any working directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
# Also put the scripts directory itself at the front of sys.path so the
# shared module living there can be found.
# NOTE(review): presumably a4_box_uploader imports that shared module by
# bare name - confirm against scripts/.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../scripts'))
# Must come after the sys.path changes above, which make `scripts` importable.
from scripts.a4_box_uploader import process_campaign
class TestA4BoxUpload(unittest.TestCase):
    """Tests for ``process_campaign`` using mocked DAM, Box, DB and notifier objects."""

    def setUp(self):
        """Create fresh mocks, a minimal config dict and a sample campaign."""
        # One independent MagicMock per collaborator handed to process_campaign.
        (
            self.mock_dam,
            self.mock_box,
            self.mock_db,
            self.mock_notifier,
        ) = (MagicMock() for _ in range(4))

        self.mock_config = {
            'box': {'live_campaigns_folder_id': '123456789'},
            'notifications': {'recipients': {'success': ['test@example.com']}},
        }
        self.campaign = {
            'asset_id': '123',
            'campaign_name': 'Test Campaign',
            'campaign_id': 'C001',
        }

    def _process(self):
        """Run ``process_campaign`` on the fixture campaign with the fixture mocks."""
        return process_campaign(
            self.campaign,
            self.mock_dam,
            self.mock_box,
            self.mock_db,
            self.mock_notifier,
            self.mock_config,
        )

    def test_process_campaign_success(self):
        """An unprocessed campaign is recorded in the DB and a Box upload happens."""
        db, box = self.mock_db, self.mock_box

        # The DB reports that this campaign has not been processed yet.
        db.check_campaign_processed.return_value = {'exists': False}
        # Live campaigns the DB hands back when queried (expected after the update).
        db.get_all_live_campaigns.return_value = [
            {'campaign_number': 'C002', 'campaign_name': 'Other Campaign'},
        ]
        # Result of the mocked Box upload.
        box.upload_file.return_value = {'file_id': '999', 'url': 'url'}

        outcome = self._process()

        self.assertTrue(outcome['success'])
        self.assertTrue(outcome['processed'])

        # The status record must mark the campaign as no longer live.
        # The fixture's asset_id is expected as campaign_id and its
        # campaign_id as campaign_number.
        expected_record = {
            'campaign_id': '123',
            'campaign_number': 'C001',
            'campaign_name': 'Test Campaign',
            'live_campaign': 'NO',
            'status': 'A4',
            'webhook_sent': True,
        }
        db.record_campaign_status.assert_called_with(**expected_record)

        # CSV generation is observed through the live-campaigns query.
        db.get_all_live_campaigns.assert_called_once()
        # Exactly one upload to Box.
        box.upload_file.assert_called_once()

    def test_already_processed(self):
        """A campaign already present in the DB is skipped without side effects."""
        # The DB reports an existing, fully processed record.
        self.mock_db.check_campaign_processed.return_value = {
            'exists': True,
            'webhook_sent': True,
            'webhook_sent_at': '2025-01-01',
            'status': 'A4',
            'live_campaign': 'NO',
        }

        outcome = self._process()

        self.assertTrue(outcome['success'])
        self.assertFalse(outcome['processed'])
        self.assertTrue(outcome['already_processed'])

        # Neither the DB status write nor the Box upload may happen.
        self.mock_db.record_campaign_status.assert_not_called()
        self.mock_box.upload_file.assert_not_called()
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()