261 lines
8.6 KiB
Python
261 lines
8.6 KiB
Python
#!/usr/bin/env python
r"""
Test email tracking functionality.

Run from the backend directory with the virtual environment activated:

    cd backend
    source venv/bin/activate  # or venv\Scripts\activate on Windows
    python test_tracking.py
"""
|
|
|
|
import sys
|
|
import os
|
|
from datetime import datetime
|
|
from pymongo import MongoClient
|
|
|
|
# Add backend directory to path
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from config import Config
|
|
from services.tracking_service import generate_tracking_id, create_newsletter_tracking
|
|
from database import newsletter_sends_collection, link_clicks_collection
|
|
from app import app
|
|
|
|
# Opening banner: a blank line, then the title framed by two 80-column rules.
print("\n" + "=" * 80, "Email Tracking System Tests", "=" * 80, sep="\n")

# Running totals, updated by test_result() through its ``global`` statement.
tests_passed = tests_failed = 0
|
|
|
|
def test_result(test_name, passed, message=""):
|
|
"""Print test result"""
|
|
global tests_passed, tests_failed
|
|
if passed:
|
|
tests_passed += 1
|
|
print(f"✓ {test_name}")
|
|
if message:
|
|
print(f" {message}")
|
|
else:
|
|
tests_failed += 1
|
|
print(f"❌ {test_name}")
|
|
if message:
|
|
print(f" {message}")
|
|
|
|
|
|
# Test 1: Tracking ID Generation
print("\n" + "-" * 80)
print("Test 1: Tracking ID Generation")
print("-" * 80)


def _is_canonical_uuid4(value):
    """Return True if ``value`` is the canonical text form of a version-4 UUID.

    A length/hyphen-count heuristic (36 characters, four hyphens) accepts any
    string of that shape, e.g. ``"----" + "x" * 32``. Parsing with
    ``uuid.UUID`` also validates the hex digits and the version field.

    Args:
        value: Candidate tracking ID.

    Returns:
        bool: True only for a 36-character hyphenated UUID whose version is 4.
    """
    import uuid  # local import keeps this script section self-contained

    try:
        parsed = uuid.UUID(value)
    except (AttributeError, TypeError, ValueError):
        # Non-string input, or text that does not parse as a UUID.
        return False
    # uuid.UUID also accepts braces, a "urn:uuid:" prefix and un-hyphenated
    # hex; comparing with str(parsed) restricts the match to the hyphenated
    # 36-character form.
    return parsed.version == 4 and str(parsed) == value.lower()


try:
    tracking_id = generate_tracking_id()

    # Check format (UUID4)
    is_valid_uuid = _is_canonical_uuid4(tracking_id)
    test_result("Generate tracking ID", is_valid_uuid, f"Generated ID: {tracking_id}")

    # Check uniqueness: two consecutive calls must not return the same ID.
    tracking_id2 = generate_tracking_id()
    is_unique = tracking_id != tracking_id2
    test_result("Tracking IDs are unique", is_unique, f"ID1: {tracking_id[:8]}... ID2: {tracking_id2[:8]}...")

except Exception as e:
    # Any unexpected error in this section is reported as a single failure.
    test_result("Generate tracking ID", False, f"Error: {str(e)}")
|
|
|
|
|
|
# Test 2: Create Newsletter Tracking
print("\n" + "-" * 80, "Test 2: Create Newsletter Tracking", "-" * 80, sep="\n")

try:
    # Remove leftovers from earlier runs so the record counts below are exact.
    for stale_collection in (newsletter_sends_collection, link_clicks_collection):
        stale_collection.delete_many({'newsletter_id': 'test-newsletter-001'})

    # Two article links, so two link-tracking entries are expected.
    article_links = [
        {'url': 'https://example.com/article1', 'title': 'Test Article 1'},
        {'url': 'https://example.com/article2', 'title': 'Test Article 2'},
    ]

    # NOTE: tracking_data is read again by the endpoint tests further down.
    tracking_data = create_newsletter_tracking(
        newsletter_id='test-newsletter-001',
        subscriber_email='test@example.com',
        article_links=article_links,
    )

    # Shape of the returned mapping.
    test_result("Returns pixel tracking ID", 'pixel_tracking_id' in tracking_data)
    test_result("Returns link tracking map", 'link_tracking_map' in tracking_data)

    tracked_link_count = len(tracking_data.get('link_tracking_map', {}))
    test_result(
        "Creates tracking for all links",
        tracked_link_count == 2,
        f"Created {tracked_link_count} link tracking records",
    )

    # Database side: one newsletter_sends document keyed by the pixel ID.
    newsletter_record = newsletter_sends_collection.find_one(
        {'tracking_id': tracking_data['pixel_tracking_id']}
    )
    test_result("Creates newsletter_sends record", newsletter_record is not None)

    if newsletter_record:
        # A freshly created record must be unopened with no open timestamp.
        correct_initial_state = (
            newsletter_record['opened'] == False
            and newsletter_record['open_count'] == 0
            and newsletter_record['first_opened_at'] is None
        )
        test_result("Newsletter record has correct initial state", correct_initial_state)

    # Database side: one link_clicks document per article link.
    link_records = list(
        link_clicks_collection.find({'newsletter_id': 'test-newsletter-001'})
    )
    test_result(
        "Creates link_clicks records",
        len(link_records) == 2,
        f"Created {len(link_records)} link click records",
    )

except Exception as exc:
    # Report the section as failed and show the traceback for diagnosis.
    test_result("Create newsletter tracking", False, f"Error: {str(exc)}")
    import traceback
    traceback.print_exc()
|
|
|
|
|
|
# Test 3: Tracking Pixel Endpoint
print("\n" + "-" * 80, "Test 3: Tracking Pixel Endpoint", "-" * 80, sep="\n")

try:
    with app.test_client() as client:
        # Valid ID: reuse the pixel ID produced by the tracking-creation test.
        pixel_tracking_id = tracking_data['pixel_tracking_id']
        pixel_url = f'/api/track/pixel/{pixel_tracking_id}'

        response = client.get(pixel_url)
        test_result(
            "Returns PNG for valid tracking_id",
            response.content_type == 'image/png',
            f"Content-Type: {response.content_type}",
        )
        test_result(
            "Returns 200 status",
            response.status_code == 200,
            f"Status: {response.status_code}",
        )

        # The first request should have flipped the record to "opened".
        updated_record = newsletter_sends_collection.find_one(
            {'tracking_id': pixel_tracking_id}
        )
        was_logged = (
            updated_record
            and updated_record['opened'] == True
            and updated_record['open_count'] == 1
            and updated_record['first_opened_at'] is not None
        )
        first_open_count = updated_record.get('open_count', 0) if updated_record else 0
        test_result("Logs email open event", was_logged, f"Open count: {first_open_count}")

        # A second request should bump the counter and move last_opened_at.
        response2 = client.get(pixel_url)
        updated_record2 = newsletter_sends_collection.find_one(
            {'tracking_id': pixel_tracking_id}
        )
        handles_multiple = (
            updated_record2
            and updated_record2['open_count'] == 2
            and updated_record2['last_opened_at'] != updated_record2['first_opened_at']
        )
        second_open_count = updated_record2.get('open_count', 0) if updated_record2 else 0
        test_result("Handles multiple opens", handles_multiple, f"Open count: {second_open_count}")

        # Unknown ID: the endpoint is still expected to answer with a PNG.
        response3 = client.get('/api/track/pixel/invalid-tracking-id-12345')
        fails_silently = (
            response3.status_code == 200 and response3.content_type == 'image/png'
        )
        test_result("Returns PNG for invalid tracking_id (fails silently)", fails_silently)

except Exception as exc:
    # Report the section as failed and show the traceback for diagnosis.
    test_result("Tracking pixel endpoint", False, f"Error: {str(exc)}")
    import traceback
    traceback.print_exc()
|
|
|
|
|
|
# Test 4: Link Redirect Endpoint
print("\n" + "-" * 80, "Test 4: Link Redirect Endpoint", "-" * 80, sep="\n")

try:
    with app.test_client() as client:
        # Valid ID: look up the tracking ID issued for the first article URL.
        article_url = 'https://example.com/article1'
        link_tracking_id = tracking_data['link_tracking_map'][article_url]

        # Do not follow the redirect; the 302 itself is what is under test.
        response = client.get(
            f'/api/track/click/{link_tracking_id}',
            follow_redirects=False,
        )
        test_result(
            "Returns 302 redirect",
            response.status_code == 302,
            f"Status: {response.status_code}",
        )
        test_result(
            "Redirects to correct URL",
            response.location == article_url,
            f"Location: {response.location}",
        )

        # The click must have been recorded against the tracking ID.
        click_record = link_clicks_collection.find_one(
            {'tracking_id': link_tracking_id}
        )
        was_logged = (
            click_record
            and click_record['clicked'] == True
            and click_record['clicked_at'] is not None
        )
        test_result("Logs click event", was_logged)

        # Unknown ID: the endpoint is still expected to redirect somewhere.
        response2 = client.get(
            '/api/track/click/invalid-tracking-id-12345',
            follow_redirects=False,
        )
        test_result(
            "Redirects on invalid tracking_id",
            response2.status_code == 302,
            f"Redirects to: {response2.location}",
        )

except Exception as exc:
    # Report the section as failed and show the traceback for diagnosis.
    test_result("Link redirect endpoint", False, f"Error: {str(exc)}")
    import traceback
    traceback.print_exc()
|
|
|
|
|
|
# Clean up test data
print("\n" + "-" * 80, "Cleaning up test data...", "-" * 80, sep="\n")

try:
    # Delete every document tagged with the fixture newsletter ID, from the
    # sends collection first and then from the clicks collection.
    for fixture_collection in (newsletter_sends_collection, link_clicks_collection):
        fixture_collection.delete_many({'newsletter_id': 'test-newsletter-001'})
except Exception as exc:
    # Best effort: a cleanup failure is reported but does not abort the run.
    print(f"⚠ Error cleaning up: {str(exc)}")
else:
    print("✓ Test data cleaned up")
|
|
|
|
|
|
# Summary
print("\n" + "=" * 80, "TEST SUMMARY", "=" * 80, sep="\n")
print(
    f"Total tests: {tests_passed + tests_failed}",
    f"✓ Passed: {tests_passed}",
    f"❌ Failed: {tests_failed}",
    sep="\n",
)

all_passed = tests_failed == 0
if all_passed:
    print("\n🎉 All tests passed!")
else:
    print(f"\n⚠ {tests_failed} test(s) failed")

print("=" * 80 + "\n")

# Exit status: 0 when every check passed, 1 otherwise.
sys.exit(0 if all_passed else 1)
|