390 lines
15 KiB
Python
390 lines
15 KiB
Python
#!/usr/bin/env python
|
|
"""
|
|
Test privacy compliance features for email tracking
|
|
Run from backend directory with venv activated:
|
|
cd backend
|
|
source venv/bin/activate # or venv\Scripts\activate on Windows
|
|
python test_privacy.py
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from pymongo import MongoClient
|
|
|
|
# Add backend directory to path
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from config import Config
|
|
from services.tracking_service import (
|
|
create_newsletter_tracking,
|
|
anonymize_old_tracking_data,
|
|
delete_subscriber_tracking_data
|
|
)
|
|
from database import (
|
|
newsletter_sends_collection,
|
|
link_clicks_collection,
|
|
subscriber_activity_collection,
|
|
subscribers_collection
|
|
)
|
|
from app import app
|
|
|
|
print("\n" + "="*80)
|
|
print("Privacy Compliance Tests")
|
|
print("="*80)
|
|
|
|
# Test counters
|
|
tests_passed = 0
|
|
tests_failed = 0
|
|
|
|
def test_result(test_name, passed, message=""):
|
|
"""Print test result"""
|
|
global tests_passed, tests_failed
|
|
if passed:
|
|
tests_passed += 1
|
|
print(f"✓ {test_name}")
|
|
if message:
|
|
print(f" {message}")
|
|
else:
|
|
tests_failed += 1
|
|
print(f"❌ {test_name}")
|
|
if message:
|
|
print(f" {message}")
|
|
|
|
|
|
# Setup: Clean up test data
|
|
print("\n" + "-"*80)
|
|
print("Setup: Cleaning test data")
|
|
print("-"*80)
|
|
|
|
test_newsletter_id = 'privacy-test-newsletter'
|
|
test_email = 'privacy-test@example.com'
|
|
test_email_opted_out = 'opted-out@example.com'
|
|
|
|
newsletter_sends_collection.delete_many({'newsletter_id': test_newsletter_id})
|
|
link_clicks_collection.delete_many({'newsletter_id': test_newsletter_id})
|
|
subscriber_activity_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out]}})
|
|
subscribers_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out]}})
|
|
|
|
print("✓ Test data cleaned")
|
|
|
|
|
|
# Test 1: Data Anonymization
|
|
print("\n" + "-"*80)
|
|
print("Test 1: Data Anonymization")
|
|
print("-"*80)
|
|
|
|
try:
|
|
# Create old tracking records (older than 90 days)
|
|
old_date = datetime.utcnow() - timedelta(days=100)
|
|
|
|
old_newsletter_doc = {
|
|
'newsletter_id': test_newsletter_id,
|
|
'subscriber_email': 'old-user@example.com',
|
|
'tracking_id': 'old-tracking-id-1',
|
|
'sent_at': old_date,
|
|
'opened': True,
|
|
'first_opened_at': old_date,
|
|
'last_opened_at': old_date,
|
|
'open_count': 3,
|
|
'created_at': old_date
|
|
}
|
|
newsletter_sends_collection.insert_one(old_newsletter_doc)
|
|
|
|
old_link_doc = {
|
|
'tracking_id': 'old-link-tracking-id-1',
|
|
'newsletter_id': test_newsletter_id,
|
|
'subscriber_email': 'old-user@example.com',
|
|
'article_url': 'https://example.com/old-article',
|
|
'article_title': 'Old Article',
|
|
'clicked': True,
|
|
'clicked_at': old_date,
|
|
'created_at': old_date
|
|
}
|
|
link_clicks_collection.insert_one(old_link_doc)
|
|
|
|
# Create recent tracking records (within 90 days)
|
|
recent_date = datetime.utcnow() - timedelta(days=30)
|
|
|
|
recent_newsletter_doc = {
|
|
'newsletter_id': test_newsletter_id,
|
|
'subscriber_email': 'recent-user@example.com',
|
|
'tracking_id': 'recent-tracking-id-1',
|
|
'sent_at': recent_date,
|
|
'opened': True,
|
|
'first_opened_at': recent_date,
|
|
'last_opened_at': recent_date,
|
|
'open_count': 1,
|
|
'created_at': recent_date
|
|
}
|
|
newsletter_sends_collection.insert_one(recent_newsletter_doc)
|
|
|
|
# Run anonymization
|
|
result = anonymize_old_tracking_data(retention_days=90)
|
|
|
|
# Check that old records were anonymized
|
|
old_newsletter_after = newsletter_sends_collection.find_one({'tracking_id': 'old-tracking-id-1'})
|
|
old_anonymized = old_newsletter_after and old_newsletter_after['subscriber_email'] == 'anonymized'
|
|
test_result("Anonymizes old newsletter records", old_anonymized,
|
|
f"Email: {old_newsletter_after.get('subscriber_email', 'N/A') if old_newsletter_after else 'N/A'}")
|
|
|
|
old_link_after = link_clicks_collection.find_one({'tracking_id': 'old-link-tracking-id-1'})
|
|
link_anonymized = old_link_after and old_link_after['subscriber_email'] == 'anonymized'
|
|
test_result("Anonymizes old link click records", link_anonymized,
|
|
f"Email: {old_link_after.get('subscriber_email', 'N/A') if old_link_after else 'N/A'}")
|
|
|
|
# Check that aggregated metrics are preserved
|
|
metrics_preserved = (
|
|
old_newsletter_after and
|
|
old_newsletter_after['open_count'] == 3 and
|
|
old_newsletter_after['opened'] == True
|
|
)
|
|
test_result("Preserves aggregated metrics", metrics_preserved,
|
|
f"Open count: {old_newsletter_after.get('open_count', 0) if old_newsletter_after else 0}")
|
|
|
|
# Check that recent records were NOT anonymized
|
|
recent_newsletter_after = newsletter_sends_collection.find_one({'tracking_id': 'recent-tracking-id-1'})
|
|
recent_not_anonymized = (
|
|
recent_newsletter_after and
|
|
recent_newsletter_after['subscriber_email'] == 'recent-user@example.com'
|
|
)
|
|
test_result("Does not anonymize recent records", recent_not_anonymized,
|
|
f"Email: {recent_newsletter_after.get('subscriber_email', 'N/A') if recent_newsletter_after else 'N/A'}")
|
|
|
|
# Check return counts
|
|
correct_counts = result['newsletter_sends_anonymized'] >= 1 and result['link_clicks_anonymized'] >= 1
|
|
test_result("Returns correct anonymization counts", correct_counts,
|
|
f"Newsletter: {result['newsletter_sends_anonymized']}, Links: {result['link_clicks_anonymized']}")
|
|
|
|
except Exception as e:
|
|
test_result("Data anonymization", False, f"Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
# Test 2: User Data Deletion
|
|
print("\n" + "-"*80)
|
|
print("Test 2: User Data Deletion")
|
|
print("-"*80)
|
|
|
|
try:
|
|
# Create tracking records for a specific user
|
|
article_links = [
|
|
{'url': 'https://example.com/article1', 'title': 'Article 1'},
|
|
{'url': 'https://example.com/article2', 'title': 'Article 2'}
|
|
]
|
|
|
|
tracking_data = create_newsletter_tracking(
|
|
newsletter_id=test_newsletter_id,
|
|
subscriber_email=test_email,
|
|
article_links=article_links
|
|
)
|
|
|
|
# Create subscriber activity record
|
|
subscriber_activity_collection.insert_one({
|
|
'email': test_email,
|
|
'status': 'active',
|
|
'last_opened_at': datetime.utcnow(),
|
|
'total_opens': 5,
|
|
'total_clicks': 3
|
|
})
|
|
|
|
# Verify records exist
|
|
newsletter_count_before = newsletter_sends_collection.count_documents({'subscriber_email': test_email})
|
|
link_count_before = link_clicks_collection.count_documents({'subscriber_email': test_email})
|
|
activity_count_before = subscriber_activity_collection.count_documents({'email': test_email})
|
|
|
|
records_exist = newsletter_count_before > 0 and link_count_before > 0 and activity_count_before > 0
|
|
test_result("Creates test tracking records", records_exist,
|
|
f"Newsletter: {newsletter_count_before}, Links: {link_count_before}, Activity: {activity_count_before}")
|
|
|
|
# Delete all tracking data for the user
|
|
delete_result = delete_subscriber_tracking_data(test_email)
|
|
|
|
# Verify all records were deleted
|
|
newsletter_count_after = newsletter_sends_collection.count_documents({'subscriber_email': test_email})
|
|
link_count_after = link_clicks_collection.count_documents({'subscriber_email': test_email})
|
|
activity_count_after = subscriber_activity_collection.count_documents({'email': test_email})
|
|
|
|
all_deleted = newsletter_count_after == 0 and link_count_after == 0 and activity_count_after == 0
|
|
test_result("Deletes all tracking records", all_deleted,
|
|
f"Remaining - Newsletter: {newsletter_count_after}, Links: {link_count_after}, Activity: {activity_count_after}")
|
|
|
|
# Check return counts
|
|
correct_delete_counts = (
|
|
delete_result['newsletter_sends_deleted'] == newsletter_count_before and
|
|
delete_result['link_clicks_deleted'] == link_count_before and
|
|
delete_result['subscriber_activity_deleted'] == activity_count_before
|
|
)
|
|
test_result("Returns correct deletion counts", correct_delete_counts,
|
|
f"Deleted - Newsletter: {delete_result['newsletter_sends_deleted']}, Links: {delete_result['link_clicks_deleted']}, Activity: {delete_result['subscriber_activity_deleted']}")
|
|
|
|
except Exception as e:
|
|
test_result("User data deletion", False, f"Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
# Test 3: Tracking Opt-Out
|
|
print("\n" + "-"*80)
|
|
print("Test 3: Tracking Opt-Out")
|
|
print("-"*80)
|
|
|
|
try:
|
|
# Create subscriber with tracking disabled
|
|
subscribers_collection.insert_one({
|
|
'email': test_email_opted_out,
|
|
'subscribed_at': datetime.utcnow(),
|
|
'tracking_enabled': False
|
|
})
|
|
|
|
# Try to create tracking for opted-out subscriber
|
|
article_links = [
|
|
{'url': 'https://example.com/article1', 'title': 'Article 1'}
|
|
]
|
|
|
|
tracking_data_opted_out = create_newsletter_tracking(
|
|
newsletter_id=test_newsletter_id,
|
|
subscriber_email=test_email_opted_out,
|
|
article_links=article_links
|
|
)
|
|
|
|
# Check that no tracking was created
|
|
no_pixel_id = tracking_data_opted_out['pixel_tracking_id'] is None
|
|
test_result("Does not create pixel tracking for opted-out users", no_pixel_id,
|
|
f"Pixel ID: {tracking_data_opted_out['pixel_tracking_id']}")
|
|
|
|
empty_link_map = len(tracking_data_opted_out['link_tracking_map']) == 0
|
|
test_result("Does not create link tracking for opted-out users", empty_link_map,
|
|
f"Link map size: {len(tracking_data_opted_out['link_tracking_map'])}")
|
|
|
|
tracking_disabled_flag = tracking_data_opted_out.get('tracking_enabled') == False
|
|
test_result("Returns tracking_enabled=False for opted-out users", tracking_disabled_flag)
|
|
|
|
# Verify no database records were created
|
|
newsletter_count = newsletter_sends_collection.count_documents({'subscriber_email': test_email_opted_out})
|
|
link_count = link_clicks_collection.count_documents({'subscriber_email': test_email_opted_out})
|
|
|
|
no_db_records = newsletter_count == 0 and link_count == 0
|
|
test_result("Does not create database records for opted-out users", no_db_records,
|
|
f"Newsletter records: {newsletter_count}, Link records: {link_count}")
|
|
|
|
# Test opt-in/opt-out endpoints
|
|
with app.test_client() as client:
|
|
# Create a subscriber with tracking enabled
|
|
subscribers_collection.insert_one({
|
|
'email': test_email,
|
|
'subscribed_at': datetime.utcnow(),
|
|
'tracking_enabled': True
|
|
})
|
|
|
|
# Opt out
|
|
response = client.post(f'/api/tracking/subscriber/{test_email}/opt-out')
|
|
opt_out_success = response.status_code == 200 and response.json.get('success') == True
|
|
test_result("Opt-out endpoint works", opt_out_success,
|
|
f"Status: {response.status_code}")
|
|
|
|
# Verify tracking is disabled
|
|
subscriber = subscribers_collection.find_one({'email': test_email})
|
|
tracking_disabled = subscriber and subscriber.get('tracking_enabled') == False
|
|
test_result("Opt-out disables tracking in database", tracking_disabled)
|
|
|
|
# Opt back in
|
|
response = client.post(f'/api/tracking/subscriber/{test_email}/opt-in')
|
|
opt_in_success = response.status_code == 200 and response.json.get('success') == True
|
|
test_result("Opt-in endpoint works", opt_in_success,
|
|
f"Status: {response.status_code}")
|
|
|
|
# Verify tracking is enabled
|
|
subscriber = subscribers_collection.find_one({'email': test_email})
|
|
tracking_enabled = subscriber and subscriber.get('tracking_enabled') == True
|
|
test_result("Opt-in enables tracking in database", tracking_enabled)
|
|
|
|
except Exception as e:
|
|
test_result("Tracking opt-out", False, f"Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
# Test 4: Privacy API Endpoints
|
|
print("\n" + "-"*80)
|
|
print("Test 4: Privacy API Endpoints")
|
|
print("-"*80)
|
|
|
|
try:
|
|
with app.test_client() as client:
|
|
# Create test tracking data
|
|
article_links = [{'url': 'https://example.com/test', 'title': 'Test'}]
|
|
create_newsletter_tracking(
|
|
newsletter_id=test_newsletter_id,
|
|
subscriber_email='api-test@example.com',
|
|
article_links=article_links
|
|
)
|
|
|
|
# Test deletion endpoint
|
|
response = client.delete('/api/tracking/subscriber/api-test@example.com')
|
|
|
|
delete_endpoint_works = response.status_code == 200 and response.json.get('success') == True
|
|
test_result("Deletion endpoint returns success", delete_endpoint_works,
|
|
f"Status: {response.status_code}")
|
|
|
|
# Verify data was deleted
|
|
remaining_records = newsletter_sends_collection.count_documents({'subscriber_email': 'api-test@example.com'})
|
|
data_deleted = remaining_records == 0
|
|
test_result("Deletion endpoint removes data", data_deleted,
|
|
f"Remaining records: {remaining_records}")
|
|
|
|
# Test anonymization endpoint
|
|
response = client.post('/api/tracking/anonymize', json={'retention_days': 90})
|
|
|
|
anonymize_endpoint_works = response.status_code == 200 and response.json.get('success') == True
|
|
test_result("Anonymization endpoint returns success", anonymize_endpoint_works,
|
|
f"Status: {response.status_code}")
|
|
|
|
has_counts = 'anonymized_counts' in response.json
|
|
test_result("Anonymization endpoint returns counts", has_counts)
|
|
|
|
except Exception as e:
|
|
test_result("Privacy API endpoints", False, f"Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
# Clean up test data
|
|
print("\n" + "-"*80)
|
|
print("Cleaning up test data...")
|
|
print("-"*80)
|
|
|
|
try:
|
|
newsletter_sends_collection.delete_many({'newsletter_id': test_newsletter_id})
|
|
link_clicks_collection.delete_many({'newsletter_id': test_newsletter_id})
|
|
subscriber_activity_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out, 'api-test@example.com']}})
|
|
subscribers_collection.delete_many({'email': {'$in': [test_email, test_email_opted_out, 'api-test@example.com']}})
|
|
|
|
# Clean up anonymized records
|
|
newsletter_sends_collection.delete_many({'subscriber_email': 'anonymized'})
|
|
link_clicks_collection.delete_many({'subscriber_email': 'anonymized'})
|
|
|
|
print("✓ Test data cleaned up")
|
|
except Exception as e:
|
|
print(f"⚠ Error cleaning up: {str(e)}")
|
|
|
|
|
|
# Summary
|
|
print("\n" + "="*80)
|
|
print("TEST SUMMARY")
|
|
print("="*80)
|
|
print(f"Total tests: {tests_passed + tests_failed}")
|
|
print(f"✓ Passed: {tests_passed}")
|
|
print(f"❌ Failed: {tests_failed}")
|
|
|
|
if tests_failed == 0:
|
|
print("\n🎉 All privacy compliance tests passed!")
|
|
else:
|
|
print(f"\n⚠ {tests_failed} test(s) failed")
|
|
|
|
print("="*80 + "\n")
|
|
|
|
# Exit with appropriate code
|
|
sys.exit(0 if tests_failed == 0 else 1)
|