This commit is contained in:
2025-11-11 14:09:21 +01:00
parent bcd0a10576
commit 1075a91eac
57 changed files with 5598 additions and 1366 deletions

View File

@@ -0,0 +1,451 @@
#!/usr/bin/env python
"""
Test analytics functionality for email tracking
Run from backend directory with venv activated:
cd backend
source venv/bin/activate # or venv\Scripts\activate on Windows
python test_analytics.py
"""
import sys
import os
from datetime import datetime, timedelta
# Add backend directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from services.analytics_service import (
get_open_rate,
get_click_rate,
get_newsletter_metrics,
get_article_performance,
get_subscriber_activity_status,
update_subscriber_activity_statuses
)
from database import (
newsletter_sends_collection,
link_clicks_collection,
subscriber_activity_collection
)
from app import app
print("\n" + "="*80)
print("Analytics Service Tests")
print("="*80)
# Test counters
tests_passed = 0
tests_failed = 0
def test_result(test_name, passed, message=""):
"""Print test result"""
global tests_passed, tests_failed
if passed:
tests_passed += 1
print(f"{test_name}")
if message:
print(f" {message}")
else:
tests_failed += 1
print(f"{test_name}")
if message:
print(f" {message}")
# Setup test data
print("\n" + "-"*80)
print("Setting up test data...")
print("-"*80)
try:
# Clean up existing test data
newsletter_sends_collection.delete_many({'newsletter_id': {'$regex': '^test-analytics-'}})
link_clicks_collection.delete_many({'newsletter_id': {'$regex': '^test-analytics-'}})
subscriber_activity_collection.delete_many({'email': {'$regex': '^test-analytics-'}})
# Create test newsletter sends
test_newsletter_id = 'test-analytics-newsletter-001'
# Create 10 newsletter sends: 7 opened, 3 not opened
for i in range(10):
opened = i < 7 # First 7 are opened
doc = {
'newsletter_id': test_newsletter_id,
'subscriber_email': f'test-analytics-user{i}@example.com',
'tracking_id': f'test-pixel-{i}',
'sent_at': datetime.utcnow(),
'opened': opened,
'first_opened_at': datetime.utcnow() if opened else None,
'last_opened_at': datetime.utcnow() if opened else None,
'open_count': 1 if opened else 0,
'created_at': datetime.utcnow()
}
newsletter_sends_collection.insert_one(doc)
# Create test link clicks for an article
test_article_url = 'https://example.com/test-analytics-article'
# Create 10 link tracking records: 4 clicked, 6 not clicked
for i in range(10):
clicked = i < 4 # First 4 are clicked
doc = {
'tracking_id': f'test-link-{i}',
'newsletter_id': test_newsletter_id,
'subscriber_email': f'test-analytics-user{i}@example.com',
'article_url': test_article_url,
'article_title': 'Test Analytics Article',
'clicked': clicked,
'clicked_at': datetime.utcnow() if clicked else None,
'user_agent': 'Test Agent' if clicked else None,
'created_at': datetime.utcnow()
}
link_clicks_collection.insert_one(doc)
print("✓ Test data created")
except Exception as e:
print(f"❌ Error setting up test data: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
# Test 1: Open Rate Calculation
print("\n" + "-"*80)
print("Test 1: Open Rate Calculation")
print("-"*80)
try:
open_rate = get_open_rate(test_newsletter_id)
# Expected: 7 out of 10 = 70%
is_correct = open_rate == 70.0
test_result("Calculate open rate", is_correct, f"Open rate: {open_rate}% (expected 70%)")
# Test with non-existent newsletter
open_rate_empty = get_open_rate('non-existent-newsletter')
handles_empty = open_rate_empty == 0.0
test_result("Handle non-existent newsletter", handles_empty,
f"Open rate: {open_rate_empty}% (expected 0%)")
except Exception as e:
test_result("Open rate calculation", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 2: Click Rate Calculation
print("\n" + "-"*80)
print("Test 2: Click Rate Calculation")
print("-"*80)
try:
click_rate = get_click_rate(test_article_url)
# Expected: 4 out of 10 = 40%
is_correct = click_rate == 40.0
test_result("Calculate click rate", is_correct, f"Click rate: {click_rate}% (expected 40%)")
# Test with non-existent article
click_rate_empty = get_click_rate('https://example.com/non-existent')
handles_empty = click_rate_empty == 0.0
test_result("Handle non-existent article", handles_empty,
f"Click rate: {click_rate_empty}% (expected 0%)")
except Exception as e:
test_result("Click rate calculation", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 3: Newsletter Metrics
print("\n" + "-"*80)
print("Test 3: Newsletter Metrics")
print("-"*80)
try:
metrics = get_newsletter_metrics(test_newsletter_id)
# Verify all expected fields
has_all_fields = all(key in metrics for key in [
'newsletter_id', 'total_sent', 'total_opened', 'open_rate',
'total_clicks', 'unique_clickers', 'click_through_rate'
])
test_result("Returns all required fields", has_all_fields)
# Verify values
correct_sent = metrics['total_sent'] == 10
test_result("Correct total_sent", correct_sent, f"Total sent: {metrics['total_sent']}")
correct_opened = metrics['total_opened'] == 7
test_result("Correct total_opened", correct_opened, f"Total opened: {metrics['total_opened']}")
correct_open_rate = metrics['open_rate'] == 70.0
test_result("Correct open_rate", correct_open_rate, f"Open rate: {metrics['open_rate']}%")
correct_clicks = metrics['total_clicks'] == 4
test_result("Correct total_clicks", correct_clicks, f"Total clicks: {metrics['total_clicks']}")
correct_unique_clickers = metrics['unique_clickers'] == 4
test_result("Correct unique_clickers", correct_unique_clickers,
f"Unique clickers: {metrics['unique_clickers']}")
correct_ctr = metrics['click_through_rate'] == 40.0
test_result("Correct click_through_rate", correct_ctr,
f"CTR: {metrics['click_through_rate']}%")
except Exception as e:
test_result("Newsletter metrics", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 4: Article Performance
print("\n" + "-"*80)
print("Test 4: Article Performance")
print("-"*80)
try:
performance = get_article_performance(test_article_url)
# Verify all expected fields
has_all_fields = all(key in performance for key in [
'article_url', 'total_sent', 'total_clicks', 'click_rate',
'unique_clickers', 'newsletters'
])
test_result("Returns all required fields", has_all_fields)
# Verify values
correct_sent = performance['total_sent'] == 10
test_result("Correct total_sent", correct_sent, f"Total sent: {performance['total_sent']}")
correct_clicks = performance['total_clicks'] == 4
test_result("Correct total_clicks", correct_clicks, f"Total clicks: {performance['total_clicks']}")
correct_click_rate = performance['click_rate'] == 40.0
test_result("Correct click_rate", correct_click_rate, f"Click rate: {performance['click_rate']}%")
correct_unique = performance['unique_clickers'] == 4
test_result("Correct unique_clickers", correct_unique,
f"Unique clickers: {performance['unique_clickers']}")
has_newsletters = len(performance['newsletters']) > 0
test_result("Returns newsletter list", has_newsletters,
f"Newsletters: {performance['newsletters']}")
except Exception as e:
test_result("Article performance", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 5: Activity Status Classification
print("\n" + "-"*80)
print("Test 5: Activity Status Classification")
print("-"*80)
try:
# Create test data for activity classification
now = datetime.utcnow()
# Active user (opened 10 days ago)
newsletter_sends_collection.insert_one({
'newsletter_id': 'test-analytics-activity',
'subscriber_email': 'test-analytics-active@example.com',
'tracking_id': 'test-active-pixel',
'sent_at': now - timedelta(days=10),
'opened': True,
'first_opened_at': now - timedelta(days=10),
'last_opened_at': now - timedelta(days=10),
'open_count': 1,
'created_at': now - timedelta(days=10)
})
# Inactive user (opened 45 days ago)
newsletter_sends_collection.insert_one({
'newsletter_id': 'test-analytics-activity',
'subscriber_email': 'test-analytics-inactive@example.com',
'tracking_id': 'test-inactive-pixel',
'sent_at': now - timedelta(days=45),
'opened': True,
'first_opened_at': now - timedelta(days=45),
'last_opened_at': now - timedelta(days=45),
'open_count': 1,
'created_at': now - timedelta(days=45)
})
# Dormant user (opened 90 days ago)
newsletter_sends_collection.insert_one({
'newsletter_id': 'test-analytics-activity',
'subscriber_email': 'test-analytics-dormant@example.com',
'tracking_id': 'test-dormant-pixel',
'sent_at': now - timedelta(days=90),
'opened': True,
'first_opened_at': now - timedelta(days=90),
'last_opened_at': now - timedelta(days=90),
'open_count': 1,
'created_at': now - timedelta(days=90)
})
# New user (never opened)
newsletter_sends_collection.insert_one({
'newsletter_id': 'test-analytics-activity',
'subscriber_email': 'test-analytics-new@example.com',
'tracking_id': 'test-new-pixel',
'sent_at': now - timedelta(days=5),
'opened': False,
'first_opened_at': None,
'last_opened_at': None,
'open_count': 0,
'created_at': now - timedelta(days=5)
})
# Test classifications
active_status = get_subscriber_activity_status('test-analytics-active@example.com')
is_active = active_status == 'active'
test_result("Classify active user", is_active, f"Status: {active_status}")
inactive_status = get_subscriber_activity_status('test-analytics-inactive@example.com')
is_inactive = inactive_status == 'inactive'
test_result("Classify inactive user", is_inactive, f"Status: {inactive_status}")
dormant_status = get_subscriber_activity_status('test-analytics-dormant@example.com')
is_dormant = dormant_status == 'dormant'
test_result("Classify dormant user", is_dormant, f"Status: {dormant_status}")
new_status = get_subscriber_activity_status('test-analytics-new@example.com')
is_new = new_status == 'new'
test_result("Classify new user", is_new, f"Status: {new_status}")
except Exception as e:
test_result("Activity status classification", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 6: Batch Update Activity Statuses
print("\n" + "-"*80)
print("Test 6: Batch Update Activity Statuses")
print("-"*80)
try:
updated_count = update_subscriber_activity_statuses()
# Should update all test subscribers
has_updates = updated_count > 0
test_result("Updates subscriber records", has_updates,
f"Updated {updated_count} subscribers")
# Verify a record was created
activity_record = subscriber_activity_collection.find_one({
'email': 'test-analytics-active@example.com'
})
record_exists = activity_record is not None
test_result("Creates activity record", record_exists)
if activity_record:
has_required_fields = all(key in activity_record for key in [
'email', 'status', 'total_opens', 'total_clicks',
'newsletters_received', 'newsletters_opened', 'updated_at'
])
test_result("Activity record has required fields", has_required_fields)
correct_status = activity_record['status'] == 'active'
test_result("Activity record has correct status", correct_status,
f"Status: {activity_record['status']}")
except Exception as e:
test_result("Batch update activity statuses", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 7: Analytics API Endpoints
print("\n" + "-"*80)
print("Test 7: Analytics API Endpoints")
print("-"*80)
try:
with app.test_client() as client:
# Test newsletter analytics endpoint
response = client.get(f'/api/analytics/newsletter/{test_newsletter_id}')
is_200 = response.status_code == 200
test_result("Newsletter endpoint returns 200", is_200, f"Status: {response.status_code}")
if is_200:
data = response.get_json()
has_data = data is not None and 'open_rate' in data
test_result("Newsletter endpoint returns data", has_data)
# Test article analytics endpoint
response = client.get(f'/api/analytics/article/{test_article_url}')
is_200 = response.status_code == 200
test_result("Article endpoint returns 200", is_200, f"Status: {response.status_code}")
if is_200:
data = response.get_json()
has_data = data is not None and 'click_rate' in data
test_result("Article endpoint returns data", has_data)
# Test subscriber analytics endpoint
response = client.get('/api/analytics/subscriber/test-analytics-active@example.com')
is_200 = response.status_code == 200
test_result("Subscriber endpoint returns 200", is_200, f"Status: {response.status_code}")
if is_200:
data = response.get_json()
has_data = data is not None and 'status' in data
test_result("Subscriber endpoint returns data", has_data)
# Test update activity endpoint
response = client.post('/api/analytics/update-activity')
is_200 = response.status_code == 200
test_result("Update activity endpoint returns 200", is_200, f"Status: {response.status_code}")
if is_200:
data = response.get_json()
has_count = data is not None and 'updated_count' in data
test_result("Update activity endpoint returns count", has_count)
except Exception as e:
test_result("Analytics API endpoints", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Clean up test data
print("\n" + "-"*80)
print("Cleaning up test data...")
print("-"*80)
try:
newsletter_sends_collection.delete_many({'newsletter_id': {'$regex': '^test-analytics-'}})
link_clicks_collection.delete_many({'newsletter_id': {'$regex': '^test-analytics-'}})
subscriber_activity_collection.delete_many({'email': {'$regex': '^test-analytics-'}})
print("✓ Test data cleaned up")
except Exception as e:
print(f"⚠ Error cleaning up: {str(e)}")
# Summary
print("\n" + "="*80)
print("TEST SUMMARY")
print("="*80)
print(f"Total tests: {tests_passed + tests_failed}")
print(f"✓ Passed: {tests_passed}")
print(f"❌ Failed: {tests_failed}")
if tests_failed == 0:
print("\n🎉 All tests passed!")
else:
print(f"\n{tests_failed} test(s) failed")
print("="*80 + "\n")
# Exit with appropriate code
sys.exit(0 if tests_failed == 0 else 1)

View File

@@ -0,0 +1,389 @@
#!/usr/bin/env python
"""
Test privacy compliance features for email tracking
Run from backend directory with venv activated:
cd backend
source venv/bin/activate # or venv\Scripts\activate on Windows
python test_privacy.py
"""
import sys
import os
from datetime import datetime, timedelta
from pymongo import MongoClient
# Add backend directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import Config
from services.tracking_service import (
create_newsletter_tracking,
anonymize_old_tracking_data,
delete_subscriber_tracking_data
)
from database import (
newsletter_sends_collection,
link_clicks_collection,
subscriber_activity_collection,
subscribers_collection
)
from app import app
print("\n" + "="*80)
print("Privacy Compliance Tests")
print("="*80)
# Test counters
tests_passed = 0
tests_failed = 0
def test_result(test_name, passed, message=""):
"""Print test result"""
global tests_passed, tests_failed
if passed:
tests_passed += 1
print(f"{test_name}")
if message:
print(f" {message}")
else:
tests_failed += 1
print(f"{test_name}")
if message:
print(f" {message}")
# Setup: Clean up test data
print("\n" + "-"*80)
print("Setup: Cleaning test data")
print("-"*80)
test_newsletter_id = 'privacy-test-newsletter'
test_email = 'privacy-test@example.com'
test_email_opted_out = 'opted-out@example.com'
newsletter_sends_collection.delete_many({'newsletter_id': test_newsletter_id})
link_clicks_collection.delete_many({'newsletter_id': test_newsletter_id})
subscriber_activity_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out]}})
subscribers_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out]}})
print("✓ Test data cleaned")
# Test 1: Data Anonymization
print("\n" + "-"*80)
print("Test 1: Data Anonymization")
print("-"*80)
try:
# Create old tracking records (older than 90 days)
old_date = datetime.utcnow() - timedelta(days=100)
old_newsletter_doc = {
'newsletter_id': test_newsletter_id,
'subscriber_email': 'old-user@example.com',
'tracking_id': 'old-tracking-id-1',
'sent_at': old_date,
'opened': True,
'first_opened_at': old_date,
'last_opened_at': old_date,
'open_count': 3,
'created_at': old_date
}
newsletter_sends_collection.insert_one(old_newsletter_doc)
old_link_doc = {
'tracking_id': 'old-link-tracking-id-1',
'newsletter_id': test_newsletter_id,
'subscriber_email': 'old-user@example.com',
'article_url': 'https://example.com/old-article',
'article_title': 'Old Article',
'clicked': True,
'clicked_at': old_date,
'created_at': old_date
}
link_clicks_collection.insert_one(old_link_doc)
# Create recent tracking records (within 90 days)
recent_date = datetime.utcnow() - timedelta(days=30)
recent_newsletter_doc = {
'newsletter_id': test_newsletter_id,
'subscriber_email': 'recent-user@example.com',
'tracking_id': 'recent-tracking-id-1',
'sent_at': recent_date,
'opened': True,
'first_opened_at': recent_date,
'last_opened_at': recent_date,
'open_count': 1,
'created_at': recent_date
}
newsletter_sends_collection.insert_one(recent_newsletter_doc)
# Run anonymization
result = anonymize_old_tracking_data(retention_days=90)
# Check that old records were anonymized
old_newsletter_after = newsletter_sends_collection.find_one({'tracking_id': 'old-tracking-id-1'})
old_anonymized = old_newsletter_after and old_newsletter_after['subscriber_email'] == 'anonymized'
test_result("Anonymizes old newsletter records", old_anonymized,
f"Email: {old_newsletter_after.get('subscriber_email', 'N/A') if old_newsletter_after else 'N/A'}")
old_link_after = link_clicks_collection.find_one({'tracking_id': 'old-link-tracking-id-1'})
link_anonymized = old_link_after and old_link_after['subscriber_email'] == 'anonymized'
test_result("Anonymizes old link click records", link_anonymized,
f"Email: {old_link_after.get('subscriber_email', 'N/A') if old_link_after else 'N/A'}")
# Check that aggregated metrics are preserved
metrics_preserved = (
old_newsletter_after and
old_newsletter_after['open_count'] == 3 and
old_newsletter_after['opened'] == True
)
test_result("Preserves aggregated metrics", metrics_preserved,
f"Open count: {old_newsletter_after.get('open_count', 0) if old_newsletter_after else 0}")
# Check that recent records were NOT anonymized
recent_newsletter_after = newsletter_sends_collection.find_one({'tracking_id': 'recent-tracking-id-1'})
recent_not_anonymized = (
recent_newsletter_after and
recent_newsletter_after['subscriber_email'] == 'recent-user@example.com'
)
test_result("Does not anonymize recent records", recent_not_anonymized,
f"Email: {recent_newsletter_after.get('subscriber_email', 'N/A') if recent_newsletter_after else 'N/A'}")
# Check return counts
correct_counts = result['newsletter_sends_anonymized'] >= 1 and result['link_clicks_anonymized'] >= 1
test_result("Returns correct anonymization counts", correct_counts,
f"Newsletter: {result['newsletter_sends_anonymized']}, Links: {result['link_clicks_anonymized']}")
except Exception as e:
test_result("Data anonymization", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 2: User Data Deletion
print("\n" + "-"*80)
print("Test 2: User Data Deletion")
print("-"*80)
try:
# Create tracking records for a specific user
article_links = [
{'url': 'https://example.com/article1', 'title': 'Article 1'},
{'url': 'https://example.com/article2', 'title': 'Article 2'}
]
tracking_data = create_newsletter_tracking(
newsletter_id=test_newsletter_id,
subscriber_email=test_email,
article_links=article_links
)
# Create subscriber activity record
subscriber_activity_collection.insert_one({
'email': test_email,
'status': 'active',
'last_opened_at': datetime.utcnow(),
'total_opens': 5,
'total_clicks': 3
})
# Verify records exist
newsletter_count_before = newsletter_sends_collection.count_documents({'subscriber_email': test_email})
link_count_before = link_clicks_collection.count_documents({'subscriber_email': test_email})
activity_count_before = subscriber_activity_collection.count_documents({'email': test_email})
records_exist = newsletter_count_before > 0 and link_count_before > 0 and activity_count_before > 0
test_result("Creates test tracking records", records_exist,
f"Newsletter: {newsletter_count_before}, Links: {link_count_before}, Activity: {activity_count_before}")
# Delete all tracking data for the user
delete_result = delete_subscriber_tracking_data(test_email)
# Verify all records were deleted
newsletter_count_after = newsletter_sends_collection.count_documents({'subscriber_email': test_email})
link_count_after = link_clicks_collection.count_documents({'subscriber_email': test_email})
activity_count_after = subscriber_activity_collection.count_documents({'email': test_email})
all_deleted = newsletter_count_after == 0 and link_count_after == 0 and activity_count_after == 0
test_result("Deletes all tracking records", all_deleted,
f"Remaining - Newsletter: {newsletter_count_after}, Links: {link_count_after}, Activity: {activity_count_after}")
# Check return counts
correct_delete_counts = (
delete_result['newsletter_sends_deleted'] == newsletter_count_before and
delete_result['link_clicks_deleted'] == link_count_before and
delete_result['subscriber_activity_deleted'] == activity_count_before
)
test_result("Returns correct deletion counts", correct_delete_counts,
f"Deleted - Newsletter: {delete_result['newsletter_sends_deleted']}, Links: {delete_result['link_clicks_deleted']}, Activity: {delete_result['subscriber_activity_deleted']}")
except Exception as e:
test_result("User data deletion", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 3: Tracking Opt-Out
print("\n" + "-"*80)
print("Test 3: Tracking Opt-Out")
print("-"*80)
try:
# Create subscriber with tracking disabled
subscribers_collection.insert_one({
'email': test_email_opted_out,
'subscribed_at': datetime.utcnow(),
'tracking_enabled': False
})
# Try to create tracking for opted-out subscriber
article_links = [
{'url': 'https://example.com/article1', 'title': 'Article 1'}
]
tracking_data_opted_out = create_newsletter_tracking(
newsletter_id=test_newsletter_id,
subscriber_email=test_email_opted_out,
article_links=article_links
)
# Check that no tracking was created
no_pixel_id = tracking_data_opted_out['pixel_tracking_id'] is None
test_result("Does not create pixel tracking for opted-out users", no_pixel_id,
f"Pixel ID: {tracking_data_opted_out['pixel_tracking_id']}")
empty_link_map = len(tracking_data_opted_out['link_tracking_map']) == 0
test_result("Does not create link tracking for opted-out users", empty_link_map,
f"Link map size: {len(tracking_data_opted_out['link_tracking_map'])}")
tracking_disabled_flag = tracking_data_opted_out.get('tracking_enabled') == False
test_result("Returns tracking_enabled=False for opted-out users", tracking_disabled_flag)
# Verify no database records were created
newsletter_count = newsletter_sends_collection.count_documents({'subscriber_email': test_email_opted_out})
link_count = link_clicks_collection.count_documents({'subscriber_email': test_email_opted_out})
no_db_records = newsletter_count == 0 and link_count == 0
test_result("Does not create database records for opted-out users", no_db_records,
f"Newsletter records: {newsletter_count}, Link records: {link_count}")
# Test opt-in/opt-out endpoints
with app.test_client() as client:
# Create a subscriber with tracking enabled
subscribers_collection.insert_one({
'email': test_email,
'subscribed_at': datetime.utcnow(),
'tracking_enabled': True
})
# Opt out
response = client.post(f'/api/tracking/subscriber/{test_email}/opt-out')
opt_out_success = response.status_code == 200 and response.json.get('success') == True
test_result("Opt-out endpoint works", opt_out_success,
f"Status: {response.status_code}")
# Verify tracking is disabled
subscriber = subscribers_collection.find_one({'email': test_email})
tracking_disabled = subscriber and subscriber.get('tracking_enabled') == False
test_result("Opt-out disables tracking in database", tracking_disabled)
# Opt back in
response = client.post(f'/api/tracking/subscriber/{test_email}/opt-in')
opt_in_success = response.status_code == 200 and response.json.get('success') == True
test_result("Opt-in endpoint works", opt_in_success,
f"Status: {response.status_code}")
# Verify tracking is enabled
subscriber = subscribers_collection.find_one({'email': test_email})
tracking_enabled = subscriber and subscriber.get('tracking_enabled') == True
test_result("Opt-in enables tracking in database", tracking_enabled)
except Exception as e:
test_result("Tracking opt-out", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 4: Privacy API Endpoints
print("\n" + "-"*80)
print("Test 4: Privacy API Endpoints")
print("-"*80)
try:
with app.test_client() as client:
# Create test tracking data
article_links = [{'url': 'https://example.com/test', 'title': 'Test'}]
create_newsletter_tracking(
newsletter_id=test_newsletter_id,
subscriber_email='api-test@example.com',
article_links=article_links
)
# Test deletion endpoint
response = client.delete('/api/tracking/subscriber/api-test@example.com')
delete_endpoint_works = response.status_code == 200 and response.json.get('success') == True
test_result("Deletion endpoint returns success", delete_endpoint_works,
f"Status: {response.status_code}")
# Verify data was deleted
remaining_records = newsletter_sends_collection.count_documents({'subscriber_email': 'api-test@example.com'})
data_deleted = remaining_records == 0
test_result("Deletion endpoint removes data", data_deleted,
f"Remaining records: {remaining_records}")
# Test anonymization endpoint
response = client.post('/api/tracking/anonymize', json={'retention_days': 90})
anonymize_endpoint_works = response.status_code == 200 and response.json.get('success') == True
test_result("Anonymization endpoint returns success", anonymize_endpoint_works,
f"Status: {response.status_code}")
has_counts = 'anonymized_counts' in response.json
test_result("Anonymization endpoint returns counts", has_counts)
except Exception as e:
test_result("Privacy API endpoints", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Clean up test data
print("\n" + "-"*80)
print("Cleaning up test data...")
print("-"*80)
try:
newsletter_sends_collection.delete_many({'newsletter_id': test_newsletter_id})
link_clicks_collection.delete_many({'newsletter_id': test_newsletter_id})
subscriber_activity_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out, 'api-test@example.com']}})
subscribers_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out, 'api-test@example.com']}})
# Clean up anonymized records
newsletter_sends_collection.delete_many({'subscriber_email': 'anonymized'})
link_clicks_collection.delete_many({'subscriber_email': 'anonymized'})
print("✓ Test data cleaned up")
except Exception as e:
print(f"⚠ Error cleaning up: {str(e)}")
# Summary
print("\n" + "="*80)
print("TEST SUMMARY")
print("="*80)
print(f"Total tests: {tests_passed + tests_failed}")
print(f"✓ Passed: {tests_passed}")
print(f"❌ Failed: {tests_failed}")
if tests_failed == 0:
print("\n🎉 All privacy compliance tests passed!")
else:
print(f"\n{tests_failed} test(s) failed")
print("="*80 + "\n")
# Exit with appropriate code
sys.exit(0 if tests_failed == 0 else 1)

View File

@@ -0,0 +1,128 @@
#!/usr/bin/env python
"""
Test RSS feed URL extraction
Run from backend directory with venv activated:
cd backend
source venv/bin/activate # or venv\Scripts\activate on Windows
python test_rss_extraction.py
"""
from pymongo import MongoClient
from config import Config
import feedparser
from utils.rss_utils import extract_article_url, extract_article_summary, extract_published_date
print("\n" + "="*80)
print("RSS Feed URL Extraction Test")
print("="*80)
# Connect to database
print(f"\nConnecting to MongoDB: {Config.MONGODB_URI}")
client = MongoClient(Config.MONGODB_URI)
db = client[Config.DB_NAME]
# Get RSS feeds
print("Fetching RSS feeds from database...")
feeds = list(db['rss_feeds'].find())
if not feeds:
print("\n❌ No RSS feeds in database!")
print("\nAdd a feed first:")
print(" curl -X POST http://localhost:5001/api/rss-feeds \\")
print(" -H 'Content-Type: application/json' \\")
print(" -d '{\"name\": \"Süddeutsche Politik\", \"url\": \"https://rss.sueddeutsche.de/rss/Politik\"}'")
exit(1)
print(f"✓ Found {len(feeds)} feed(s)\n")
# Test each feed
total_success = 0
total_fail = 0
for feed_doc in feeds:
name = feed_doc.get('name', 'Unknown')
url = feed_doc.get('url', '')
active = feed_doc.get('active', True)
print("\n" + "="*80)
print(f"Feed: {name}")
print(f"URL: {url}")
print(f"Active: {'Yes' if active else 'No'}")
print("="*80)
if not active:
print("⏭ Skipping (inactive)")
continue
try:
# Parse RSS
print("\nFetching RSS feed...")
feed = feedparser.parse(url)
if not feed.entries:
print("❌ No entries found in feed")
continue
print(f"✓ Found {len(feed.entries)} entries")
# Test first 3 entries
print(f"\nTesting first 3 entries:")
print("-" * 80)
for i, entry in enumerate(feed.entries[:3], 1):
print(f"\n📰 Entry {i}:")
# Title
title = entry.get('title', 'No title')
print(f" Title: {title[:65]}")
# Test URL extraction
article_url = extract_article_url(entry)
if article_url:
print(f" ✓ URL: {article_url}")
total_success += 1
else:
print(f" ❌ Could not extract URL")
print(f" Available fields: {list(entry.keys())[:10]}")
print(f" link: {entry.get('link', 'N/A')}")
print(f" guid: {entry.get('guid', 'N/A')}")
print(f" id: {entry.get('id', 'N/A')}")
total_fail += 1
# Test summary
summary = extract_article_summary(entry)
if summary:
print(f" ✓ Summary: {summary[:70]}...")
else:
print(f" ⚠ No summary")
# Test date
pub_date = extract_published_date(entry)
if pub_date:
print(f" ✓ Date: {pub_date}")
else:
print(f" ⚠ No date")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
# Summary
print("\n" + "="*80)
print("SUMMARY")
print("="*80)
print(f"Total URLs tested: {total_success + total_fail}")
print(f"✓ Successfully extracted: {total_success}")
print(f"❌ Failed to extract: {total_fail}")
if total_fail == 0:
print("\n🎉 All URLs extracted successfully!")
print("\nYou can now run the crawler:")
print(" cd ../news_crawler")
print(" pip install -r requirements.txt")
print(" python crawler_service.py 5")
else:
print(f"\n{total_fail} URL(s) could not be extracted")
print("Check the output above for details")
print("="*80 + "\n")

View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python
"""
Test email tracking functionality
Run from backend directory with venv activated:
cd backend
source venv/bin/activate # or venv\Scripts\activate on Windows
python test_tracking.py
"""
import sys
import os
from datetime import datetime
from pymongo import MongoClient
# Add backend directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import Config
from services.tracking_service import generate_tracking_id, create_newsletter_tracking
from database import newsletter_sends_collection, link_clicks_collection
from app import app
print("\n" + "="*80)
print("Email Tracking System Tests")
print("="*80)
# Test counters
tests_passed = 0
tests_failed = 0
def test_result(test_name, passed, message=""):
"""Print test result"""
global tests_passed, tests_failed
if passed:
tests_passed += 1
print(f"{test_name}")
if message:
print(f" {message}")
else:
tests_failed += 1
print(f"{test_name}")
if message:
print(f" {message}")
# Test 1: Tracking ID Generation
print("\n" + "-"*80)
print("Test 1: Tracking ID Generation")
print("-"*80)
try:
tracking_id = generate_tracking_id()
# Check format (UUID4)
is_valid_uuid = len(tracking_id) == 36 and tracking_id.count('-') == 4
test_result("Generate tracking ID", is_valid_uuid, f"Generated ID: {tracking_id}")
# Check uniqueness
tracking_id2 = generate_tracking_id()
is_unique = tracking_id != tracking_id2
test_result("Tracking IDs are unique", is_unique, f"ID1: {tracking_id[:8]}... ID2: {tracking_id2[:8]}...")
except Exception as e:
test_result("Generate tracking ID", False, f"Error: {str(e)}")
# Test 2: Create Newsletter Tracking
print("\n" + "-"*80)
print("Test 2: Create Newsletter Tracking")
print("-"*80)
try:
# Clean up test data first
newsletter_sends_collection.delete_many({'newsletter_id': 'test-newsletter-001'})
link_clicks_collection.delete_many({'newsletter_id': 'test-newsletter-001'})
# Create tracking with article links
article_links = [
{'url': 'https://example.com/article1', 'title': 'Test Article 1'},
{'url': 'https://example.com/article2', 'title': 'Test Article 2'}
]
tracking_data = create_newsletter_tracking(
newsletter_id='test-newsletter-001',
subscriber_email='test@example.com',
article_links=article_links
)
# Verify return data structure
has_pixel_id = 'pixel_tracking_id' in tracking_data
test_result("Returns pixel tracking ID", has_pixel_id)
has_link_map = 'link_tracking_map' in tracking_data
test_result("Returns link tracking map", has_link_map)
correct_link_count = len(tracking_data.get('link_tracking_map', {})) == 2
test_result("Creates tracking for all links", correct_link_count,
f"Created {len(tracking_data.get('link_tracking_map', {}))} link tracking records")
# Verify database records
newsletter_record = newsletter_sends_collection.find_one({
'tracking_id': tracking_data['pixel_tracking_id']
})
record_exists = newsletter_record is not None
test_result("Creates newsletter_sends record", record_exists)
if newsletter_record:
correct_initial_state = (
newsletter_record['opened'] == False and
newsletter_record['open_count'] == 0 and
newsletter_record['first_opened_at'] is None
)
test_result("Newsletter record has correct initial state", correct_initial_state)
# Verify link click records
link_records = list(link_clicks_collection.find({'newsletter_id': 'test-newsletter-001'}))
correct_link_records = len(link_records) == 2
test_result("Creates link_clicks records", correct_link_records,
f"Created {len(link_records)} link click records")
except Exception as e:
test_result("Create newsletter tracking", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 3: Tracking Pixel Endpoint
print("\n" + "-"*80)
print("Test 3: Tracking Pixel Endpoint")
print("-"*80)
try:
with app.test_client() as client:
# Test with valid tracking ID
pixel_tracking_id = tracking_data['pixel_tracking_id']
response = client.get(f'/api/track/pixel/{pixel_tracking_id}')
is_png = response.content_type == 'image/png'
test_result("Returns PNG for valid tracking_id", is_png,
f"Content-Type: {response.content_type}")
is_200 = response.status_code == 200
test_result("Returns 200 status", is_200, f"Status: {response.status_code}")
# Verify database was updated
updated_record = newsletter_sends_collection.find_one({
'tracking_id': pixel_tracking_id
})
was_logged = (
updated_record and
updated_record['opened'] == True and
updated_record['open_count'] == 1 and
updated_record['first_opened_at'] is not None
)
test_result("Logs email open event", was_logged,
f"Open count: {updated_record.get('open_count', 0) if updated_record else 0}")
# Test multiple opens
response2 = client.get(f'/api/track/pixel/{pixel_tracking_id}')
updated_record2 = newsletter_sends_collection.find_one({
'tracking_id': pixel_tracking_id
})
handles_multiple = (
updated_record2 and
updated_record2['open_count'] == 2 and
updated_record2['last_opened_at'] != updated_record2['first_opened_at']
)
test_result("Handles multiple opens", handles_multiple,
f"Open count: {updated_record2.get('open_count', 0) if updated_record2 else 0}")
# Test with invalid tracking ID
response3 = client.get('/api/track/pixel/invalid-tracking-id-12345')
fails_silently = response3.status_code == 200 and response3.content_type == 'image/png'
test_result("Returns PNG for invalid tracking_id (fails silently)", fails_silently)
except Exception as e:
test_result("Tracking pixel endpoint", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Test 4: Link Redirect Endpoint
print("\n" + "-"*80)
print("Test 4: Link Redirect Endpoint")
print("-"*80)
try:
with app.test_client() as client:
# Test with valid tracking ID
article_url = 'https://example.com/article1'
link_tracking_id = tracking_data['link_tracking_map'][article_url]
response = client.get(f'/api/track/click/{link_tracking_id}', follow_redirects=False)
is_redirect = response.status_code == 302
test_result("Returns 302 redirect", is_redirect, f"Status: {response.status_code}")
correct_location = response.location == article_url
test_result("Redirects to correct URL", correct_location,
f"Location: {response.location}")
# Verify database was updated
click_record = link_clicks_collection.find_one({
'tracking_id': link_tracking_id
})
was_logged = (
click_record and
click_record['clicked'] == True and
click_record['clicked_at'] is not None
)
test_result("Logs click event", was_logged)
# Test with invalid tracking ID
response2 = client.get('/api/track/click/invalid-tracking-id-12345', follow_redirects=False)
redirects_on_invalid = response2.status_code == 302
test_result("Redirects on invalid tracking_id", redirects_on_invalid,
f"Redirects to: {response2.location}")
except Exception as e:
test_result("Link redirect endpoint", False, f"Error: {str(e)}")
import traceback
traceback.print_exc()
# Clean up test data
print("\n" + "-"*80)
print("Cleaning up test data...")
print("-"*80)
try:
newsletter_sends_collection.delete_many({'newsletter_id': 'test-newsletter-001'})
link_clicks_collection.delete_many({'newsletter_id': 'test-newsletter-001'})
print("✓ Test data cleaned up")
except Exception as e:
print(f"⚠ Error cleaning up: {str(e)}")
# Summary
print("\n" + "="*80)
print("TEST SUMMARY")
print("="*80)
print(f"Total tests: {tests_passed + tests_failed}")
print(f"✓ Passed: {tests_passed}")
print(f"❌ Failed: {tests_failed}")
if tests_failed == 0:
print("\n🎉 All tests passed!")
else:
print(f"\n{tests_failed} test(s) failed")
print("="*80 + "\n")
# Exit with appropriate code
sys.exit(0 if tests_failed == 0 else 1)