This commit is contained in:
2025-11-18 14:45:41 +01:00
parent 2e80d64ff6
commit 84fce9a82c
19 changed files with 2437 additions and 3 deletions

30
backend/.env.local Normal file
View File

@@ -0,0 +1,30 @@
# Munich News Daily - Local Development Backend Configuration
# MongoDB Configuration
MONGODB_URI=mongodb://admin:changeme@mongodb:27017/
# Email Configuration (use test credentials or disable)
SMTP_SERVER=localhost
SMTP_PORT=587
EMAIL_USER=test@localhost
EMAIL_PASSWORD=test123
# Newsletter Settings
NEWSLETTER_MAX_ARTICLES=5
NEWSLETTER_HOURS_LOOKBACK=24
WEBSITE_URL=http://localhost:3000
# Tracking Configuration
TRACKING_ENABLED=true
TRACKING_API_URL=http://localhost:5001
TRACKING_DATA_RETENTION_DAYS=90
# Ollama Configuration (AI Summarization)
OLLAMA_ENABLED=true
OLLAMA_BASE_URL=http://ollama:11434
OLLAMA_MODEL=phi3:latest
OLLAMA_TIMEOUT=120
SUMMARY_MAX_WORDS=150
# Flask Server Configuration
FLASK_PORT=5001

View File

@@ -11,6 +11,8 @@ from routes.tracking_routes import tracking_bp
from routes.analytics_routes import analytics_bp
from routes.admin_routes import admin_bp
from routes.transport_routes import transport_bp
from routes.interests_routes import interests_bp
from routes.personalization_routes import personalization_bp
# Initialize Flask app
app = Flask(__name__)
@@ -29,6 +31,8 @@ app.register_blueprint(tracking_bp)
app.register_blueprint(analytics_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(transport_bp)
app.register_blueprint(interests_bp)
app.register_blueprint(personalization_bp)
# Health check endpoint
@app.route('/health')

View File

@@ -0,0 +1,239 @@
"""
User Interest Profile API routes for Munich News Daily.
Provides endpoints to view and manage user interest profiles.
"""
from flask import Blueprint, request, jsonify
from services.interest_profiling_service import (
get_user_interests,
get_top_interests,
build_interests_from_history,
decay_user_interests,
get_interest_statistics,
delete_user_interests
)
# Blueprint that groups the /api/interests/* endpoints defined in this module.
interests_bp = Blueprint('interests', __name__)
@interests_bp.route('/api/interests/<email>', methods=['GET'])
def get_interests(email):
    """
    Return the stored interest profile of a single subscriber.

    Args:
        email: Email address of the user (taken from the URL path)

    Returns:
        JSON response: 200 with the profile, 404 when no profile exists,
        500 with the error text when anything raises
    """
    try:
        profile = get_user_interests(email)

        if profile:
            # Strip MongoDB's internal _id before serializing the document
            profile.pop('_id', None)
            payload, status = {'success': True, 'profile': profile}, 200
        else:
            payload, status = {'success': False, 'error': 'User profile not found'}, 404

        return jsonify(payload), status

    except Exception as err:
        return jsonify({'success': False, 'error': str(err)}), 500
@interests_bp.route('/api/interests/<email>/top', methods=['GET'])
def get_top_user_interests(email):
    """
    Get user's top interests sorted by score.

    Query parameters:
        top_n: Number of top interests to return (default: 10). A value that
            cannot be parsed as an integer falls back to the default.

    Args:
        email: Email address of the user

    Returns:
        JSON response with top categories and keywords (200), a 400 error
        when top_n is not a positive integer, or a 500 error when the
        lookup raises
    """
    try:
        top_n = request.args.get('top_n', 10, type=int)

        # top_n ends up as a slice bound; zero or a negative value would
        # silently return nothing or drop entries from the end of the ranking
        if top_n < 1:
            return jsonify({
                'success': False,
                'error': 'top_n must be a positive integer'
            }), 400

        top_interests = get_top_interests(email, top_n)

        return jsonify({
            'success': True,
            'email': email,
            'top_categories': [
                {'category': cat, 'score': score}
                for cat, score in top_interests['top_categories']
            ],
            'top_keywords': [
                {'keyword': kw, 'score': score}
                for kw, score in top_interests['top_keywords']
            ]
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@interests_bp.route('/api/interests/<email>/rebuild', methods=['POST'])
def rebuild_interests(email):
    """
    Rebuild user interest profile from click history.

    Request body (optional):
        {
            "days_lookback": 30  // Number of days of history to analyze
        }

    Args:
        email: Email address of the user

    Returns:
        JSON response with rebuilt profile (200), a 400 error for an invalid
        body or days_lookback, or a 500 error when the rebuild raises
    """
    try:
        # The body is optional: silent=True makes a missing or non-JSON body
        # yield None instead of raising, so the defaults below apply
        data = request.get_json(silent=True) or {}

        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400

        days_lookback = data.get('days_lookback', 30)

        # bool is a subclass of int, so reject true/false explicitly
        if (
            isinstance(days_lookback, bool)
            or not isinstance(days_lookback, int)
            or days_lookback < 1
        ):
            return jsonify({
                'success': False,
                'error': 'days_lookback must be a positive integer'
            }), 400

        profile = build_interests_from_history(email, days_lookback)

        # Remove MongoDB _id field before serializing
        profile.pop('_id', None)

        return jsonify({
            'success': True,
            'message': f'Profile rebuilt from {days_lookback} days of history',
            'profile': profile
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@interests_bp.route('/api/interests/decay', methods=['POST'])
def decay_interests():
    """
    Decay interest scores for inactive users.

    Request body (optional):
        {
            "decay_factor": 0.95,   // Multiplier for scores (default: 0.95)
            "days_threshold": 7     // Only decay profiles older than N days
        }

    Returns:
        JSON response with decay statistics (200), a 400 error for an invalid
        body or parameter, or a 500 error when the decay run raises
    """
    try:
        # The body is optional: silent=True makes a missing or non-JSON body
        # yield None instead of raising, so the defaults below apply
        data = request.get_json(silent=True) or {}

        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Request body must be a JSON object'
            }), 400

        decay_factor = data.get('decay_factor', 0.95)
        days_threshold = data.get('days_threshold', 7)

        # bool is a subclass of int, so it is rejected explicitly; the chained
        # comparison also rejects NaN, which passes separate <= / > checks
        if (
            isinstance(decay_factor, bool)
            or not isinstance(decay_factor, (int, float))
            or not (0 < decay_factor <= 1)
        ):
            return jsonify({
                'success': False,
                'error': 'decay_factor must be between 0 and 1'
            }), 400

        if (
            isinstance(days_threshold, bool)
            or not isinstance(days_threshold, int)
            or days_threshold < 1
        ):
            return jsonify({
                'success': False,
                'error': 'days_threshold must be a positive integer'
            }), 400

        result = decay_user_interests(decay_factor, days_threshold)

        return jsonify({
            'success': True,
            'message': f'Decayed interests for profiles older than {days_threshold} days',
            'statistics': result
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@interests_bp.route('/api/interests/statistics', methods=['GET'])
def get_statistics():
    """
    Report aggregate interest statistics across every stored profile.

    Returns:
        JSON response: 200 with the statistics, or 500 with the error text
        when the computation raises
    """
    try:
        payload = {'success': True, 'statistics': get_interest_statistics()}
        return jsonify(payload), 200
    except Exception as err:
        failure = {'success': False, 'error': str(err)}
        return jsonify(failure), 500
@interests_bp.route('/api/interests/<email>', methods=['DELETE'])
def delete_interests(email):
    """
    Remove a subscriber's interest profile (GDPR compliance).

    Args:
        email: Email address of the user (taken from the URL path)

    Returns:
        JSON response: 200 with a confirmation message, 404 when there was
        no profile to delete, 500 with the error text when deletion raises
    """
    try:
        if delete_user_interests(email):
            body = {
                'success': True,
                'message': f'Interest profile deleted for {email}'
            }
            code = 200
        else:
            body = {'success': False, 'error': 'User profile not found'}
            code = 404

        return jsonify(body), code

    except Exception as err:
        return jsonify({'success': False, 'error': str(err)}), 500

View File

@@ -0,0 +1,135 @@
"""
Personalization API routes for Munich News Daily.
Provides endpoints to test and preview personalized content.
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from database import articles_collection
from services.personalization_service import (
rank_articles_for_user,
select_personalized_articles,
get_personalization_explanation,
get_personalization_stats
)
# Blueprint that groups the /api/personalize/* endpoints defined in this module.
personalization_bp = Blueprint('personalization', __name__)
@personalization_bp.route('/api/personalize/preview/<email>', methods=['GET'])
def preview_personalized_newsletter(email):
    """
    Preview personalized newsletter for a user.

    Query parameters:
        max_articles: Maximum articles to return (default: 10)
        hours_lookback: Hours of articles to consider (default: 24)
        Values that cannot be parsed as integers fall back to the defaults.

    Args:
        email: Email address of the user

    Returns:
        JSON with personalized article selection and statistics (200), a 400
        error when a query parameter is not a positive integer, or a 500
        error when the selection raises
    """
    try:
        max_articles = request.args.get('max_articles', 10, type=int)
        hours_lookback = request.args.get('hours_lookback', 24, type=int)

        # Non-positive values would produce negative slice bounds or a cutoff
        # in the future, so reject them up front
        for name, value in (
            ('max_articles', max_articles),
            ('hours_lookback', hours_lookback)
        ):
            if value < 1:
                return jsonify({
                    'success': False,
                    'error': f'{name} must be a positive integer'
                }), 400

        # Recent articles that already have a summary, newest first
        cutoff_date = datetime.utcnow() - timedelta(hours=hours_lookback)
        articles = list(articles_collection.find({
            'created_at': {'$gte': cutoff_date},
            'summary': {'$exists': True, '$ne': None}
        }).sort('created_at', -1))

        # Select personalized articles
        personalized = select_personalized_articles(
            articles,
            email,
            max_articles=max_articles
        )

        # Get statistics
        stats = get_personalization_stats(personalized, email)

        # Expose only the fields needed by the preview (no raw Mongo document)
        articles_response = [
            {
                'title': article.get('title', ''),
                'title_en': article.get('title_en'),
                'summary': article.get('summary', ''),
                'link': article.get('link', ''),
                'category': article.get('category', 'general'),
                'keywords': article.get('keywords', []),
                'personalization_score': article.get('personalization_score', 0.0),
                'published_at': article.get('published_at', '')
            }
            for article in personalized
        ]

        return jsonify({
            'success': True,
            'email': email,
            'articles': articles_response,
            'statistics': stats
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@personalization_bp.route('/api/personalize/explain', methods=['POST'])
def explain_recommendation():
    """
    Explain why an article was recommended to a user.

    Request body:
        {
            "email": "user@example.com",
            "article_id": "article-id-here"
        }

    Returns:
        JSON with explanation of recommendation (200), a 400 error when the
        body is missing/invalid or article_id is not a valid ObjectId, a 404
        error when the article does not exist, or a 500 error otherwise
    """
    try:
        # silent=True: a missing or non-JSON body yields None and is reported
        # as a 400 below instead of surfacing as a 500
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or 'email' not in data or 'article_id' not in data:
            return jsonify({
                'success': False,
                'error': 'email and article_id required'
            }), 400

        email = data['email']
        article_id = data['article_id']

        # A malformed id is a client error, not a server failure
        from bson import ObjectId
        from bson.errors import InvalidId
        try:
            object_id = ObjectId(article_id)
        except (InvalidId, TypeError):
            return jsonify({
                'success': False,
                'error': 'Invalid article_id'
            }), 400

        # Get article
        article = articles_collection.find_one({'_id': object_id})

        if not article:
            return jsonify({
                'success': False,
                'error': 'Article not found'
            }), 404

        # Get user interests (None when the user has no profile)
        from services.interest_profiling_service import get_user_interests
        user_interests = get_user_interests(email)

        # Generate explanation
        explanation = get_personalization_explanation(article, user_interests)

        return jsonify({
            'success': True,
            'email': email,
            'article_title': article.get('title', ''),
            'explanation': explanation
        }), 200

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

View File

@@ -79,8 +79,8 @@ def track_click(tracking_id):
"""
Track link clicks and redirect to original article URL.
Logs the click event and redirects the user to the original article URL.
Handles invalid tracking_id by redirecting to homepage.
Logs the click event, updates user interest profile, and redirects the user
to the original article URL. Handles invalid tracking_id by redirecting to homepage.
Ensures redirect completes within 200ms.
Args:
@@ -115,6 +115,19 @@ def track_click(tracking_id):
}
}
)
# Update user interest profile (Phase 3)
subscriber_email = tracking_record.get('subscriber_email')
keywords = tracking_record.get('keywords', [])
category = tracking_record.get('category', 'general')
if subscriber_email and subscriber_email != 'anonymized':
try:
from services.interest_profiling_service import update_user_interests
update_user_interests(subscriber_email, keywords, category)
except Exception as e:
# Don't fail the redirect if interest update fails
print(f"Error updating user interests: {str(e)}")
except Exception as e:
# Log error but still redirect
print(f"Error tracking click for {tracking_id}: {str(e)}")

View File

@@ -0,0 +1,323 @@
"""
User Interest Profiling Service for Munich News Daily.
Builds and maintains user interest profiles based on article click behavior.
"""
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from database import link_clicks_collection
from pymongo import MongoClient
from config import Config
# Connect to MongoDB and bind the 'user_interests' collection, which the
# functions below query and update by the 'email' field.
# NOTE(review): this opens its own MongoClient although the `database` module
# is already imported for link_clicks_collection - confirm whether the
# collection could be obtained from that module instead.
client = MongoClient(Config.MONGODB_URI)
db = client[Config.DB_NAME]
user_interests_collection = db['user_interests']
def update_user_interests(subscriber_email: str, keywords: List[str], category: str) -> Dict:
    """
    Update user interest profile based on a clicked article.

    Increments interest scores for the article's keywords and category.
    Creates a new profile if the user doesn't have one yet.

    Args:
        subscriber_email: Email address of the user
        keywords: List of keywords from the clicked article; None is treated
            as an empty list and empty keywords are skipped
        category: Category of the clicked article; a missing/empty value is
            counted as 'general'

    Returns:
        dict: Updated user interest profile
    """
    current_time = datetime.utcnow()

    # Click metadata can be null on older tracking records; fall back to
    # neutral defaults instead of failing the whole update
    category = category or 'general'
    keywords = keywords or []

    # NOTE: this is a read-modify-write sequence, so two concurrent clicks by
    # the same user can overwrite each other's increment
    profile = user_interests_collection.find_one({'email': subscriber_email})

    if not profile:
        # Create new profile
        profile = {
            'email': subscriber_email,
            'categories': {},
            'keywords': {},
            'total_clicks': 0,
            'last_updated': current_time,
            'created_at': current_time
        }

    category_scores = profile.setdefault('categories', {})
    keyword_scores = profile.setdefault('keywords', {})

    # Each click adds 0.1, capped at 1.0; rounding keeps repeated float
    # additions from accumulating noise such as 0.30000000000000004
    category_scores[category] = round(
        min(category_scores.get(category, 0.0) + 0.1, 1.0), 3
    )

    for keyword in keywords:
        if keyword:  # Skip empty keywords
            keyword_scores[keyword] = round(
                min(keyword_scores.get(keyword, 0.0) + 0.1, 1.0), 3
            )

    # Update metadata
    profile['total_clicks'] = profile.get('total_clicks', 0) + 1
    profile['last_updated'] = current_time

    # Upsert profile; _id is immutable, so it is left out of the $set payload
    update_fields = {
        field: value for field, value in profile.items() if field != '_id'
    }
    user_interests_collection.update_one(
        {'email': subscriber_email},
        {'$set': update_fields},
        upsert=True
    )

    return profile
def get_user_interests(subscriber_email: str) -> Optional[Dict]:
    """
    Look up the interest profile stored for a subscriber.

    Args:
        subscriber_email: Email address of the user

    Returns:
        dict: The matching profile document, or None when the user has none
    """
    lookup = {'email': subscriber_email}
    return user_interests_collection.find_one(lookup)
def decay_user_interests(decay_factor: float = 0.95, days_threshold: int = 7) -> Dict[str, int]:
    """
    Scale down interest scores of profiles without recent activity.

    Every profile whose 'last_updated' is older than the threshold has all of
    its category and keyword scores multiplied by decay_factor; entries whose
    scaled score drops below 0.05 are removed. 'last_updated' itself is not
    touched, only 'last_decayed' is stamped.

    Args:
        decay_factor: Multiplier for interest scores (default: 0.95 = 5% decay)
        days_threshold: Only decay profiles older than this many days (default: 7)

    Returns:
        dict: Statistics about the decay operation
            - profiles_decayed: Number of profiles that were decayed
            - profiles_checked: Total number of profiles checked
    """
    def _shrink(scores):
        # Scale each score, keep (rounded to 3 places) only those >= 0.05
        kept = {}
        for name, value in scores.items():
            scaled = value * decay_factor
            if scaled >= 0.05:
                kept[name] = round(scaled, 3)
        return kept

    stale_before = datetime.utcnow() - timedelta(days=days_threshold)
    stale_profiles = user_interests_collection.find(
        {'last_updated': {'$lt': stale_before}}
    )

    checked = 0
    decayed = 0

    for doc in stale_profiles:
        checked += 1

        new_categories = _shrink(doc.get('categories', {}))
        new_keywords = _shrink(doc.get('keywords', {}))

        # Persist the reduced scores for this profile
        user_interests_collection.update_one(
            {'email': doc['email']},
            {'$set': {
                'categories': new_categories,
                'keywords': new_keywords,
                'last_decayed': datetime.utcnow()
            }}
        )
        decayed += 1

    return {'profiles_decayed': decayed, 'profiles_checked': checked}
def get_top_interests(subscriber_email: str, top_n: int = 10) -> Dict[str, List[tuple]]:
    """
    Get user's top interests sorted by score.

    Args:
        subscriber_email: Email address of the user
        top_n: Number of top interests to return (default: 10); zero or a
            negative value yields empty lists

    Returns:
        dict: Top interests containing:
            - top_categories: List of (category, score) tuples, highest first
            - top_keywords: List of (keyword, score) tuples, highest first
    """
    profile = get_user_interests(subscriber_email)

    if not profile:
        return {
            'top_categories': [],
            'top_keywords': []
        }

    # A negative slice bound would drop entries from the END of the ranking
    # instead of limiting it, so clamp to zero
    limit = max(top_n, 0)

    def _top(scores):
        # Highest score first, truncated to the requested size
        return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:limit]

    return {
        'top_categories': _top(profile.get('categories') or {}),
        'top_keywords': _top(profile.get('keywords') or {})
    }
def build_interests_from_history(subscriber_email: str, days_lookback: int = 30) -> Dict:
    """
    Build or rebuild user interest profile from click history.

    Useful for:
    - Initializing profiles for existing users
    - Rebuilding profiles after algorithm changes
    - Backfilling data

    Every recorded click adds 0.1 to the score of its category and of each of
    its non-empty keywords, capped at 1.0. The profile is only written when
    at least one click was found; an existing stored profile is otherwise
    left as it is.

    Args:
        subscriber_email: Email address of the user
        days_lookback: Number of days of history to analyze (default: 30)

    Returns:
        dict: Newly built interest profile. Its 'created_at' is the build
            time; when a profile already existed, the stored document keeps
            its original 'created_at'.
    """
    now = datetime.utcnow()
    cutoff_date = now - timedelta(days=days_lookback)

    # Get all clicks from this user in the lookback period
    clicks = link_clicks_collection.find({
        'subscriber_email': subscriber_email,
        'clicked': True,
        'clicked_at': {'$gte': cutoff_date}
    })

    # Count hits first and convert to scores once, which avoids the float
    # noise of repeatedly adding 0.1
    category_hits = {}
    keyword_hits = {}
    total_clicks = 0

    for click in clicks:
        # Metadata may be missing or null on older tracking records
        category = click.get('category') or 'general'
        category_hits[category] = category_hits.get(category, 0) + 1

        for keyword in click.get('keywords') or []:
            if keyword:
                keyword_hits[keyword] = keyword_hits.get(keyword, 0) + 1

        total_clicks += 1

    def _score(hits):
        # 0.1 per click, capped at 1.0
        return min(round(hits * 0.1, 3), 1.0)

    profile = {
        'email': subscriber_email,
        'categories': {name: _score(hits) for name, hits in category_hits.items()},
        'keywords': {name: _score(hits) for name, hits in keyword_hits.items()},
        'total_clicks': total_clicks,
        'last_updated': now,
        'created_at': now
    }

    # Save profile
    if total_clicks > 0:
        # created_at is written only on insert so a rebuild does not reset
        # the creation date of an existing profile
        rebuilt_fields = {
            field: value for field, value in profile.items()
            if field != 'created_at'
        }
        user_interests_collection.update_one(
            {'email': subscriber_email},
            {
                '$set': rebuilt_fields,
                '$setOnInsert': {'created_at': now}
            },
            upsert=True
        )

    return profile
def get_interest_statistics() -> Dict:
    """
    Summarize interest data over all stored user profiles.

    Returns:
        dict: Statistics containing:
            - total_users: Total number of users with profiles
            - avg_clicks_per_user: Average number of clicks per user
            - most_popular_categories: Top 10 (category, summed score) pairs
            - most_popular_keywords: Top 10 (keyword, summed score) pairs
    """
    user_total = user_interests_collection.count_documents({})

    if user_total == 0:
        return {
            'total_users': 0,
            'avg_clicks_per_user': 0,
            'most_popular_categories': [],
            'most_popular_keywords': []
        }

    # Sum total_clicks over every profile in a single aggregation
    grouped = list(user_interests_collection.aggregate([
        {'$group': {'_id': None, 'total_clicks': {'$sum': '$total_clicks'}}}
    ]))
    click_total = grouped[0]['total_clicks'] if grouped else 0
    # user_total is known to be positive here because of the early return
    mean_clicks = click_total / user_total

    def _accumulate(totals, scores):
        # Add one profile's scores onto the running per-name totals
        for name, value in scores.items():
            totals[name] = totals.get(name, 0) + value

    def _top_ten(totals):
        return sorted(totals.items(), key=lambda pair: pair[1], reverse=True)[:10]

    category_totals = {}
    keyword_totals = {}
    for doc in user_interests_collection.find({}):
        _accumulate(category_totals, doc.get('categories', {}))
        _accumulate(keyword_totals, doc.get('keywords', {}))

    return {
        'total_users': user_total,
        'avg_clicks_per_user': round(mean_clicks, 2),
        'most_popular_categories': _top_ten(category_totals),
        'most_popular_keywords': _top_ten(keyword_totals)
    }
def delete_user_interests(subscriber_email: str) -> bool:
    """
    Remove the interest profile of a subscriber (for GDPR compliance).

    Args:
        subscriber_email: Email address of the user

    Returns:
        bool: True if a profile was deleted, False if none matched
    """
    outcome = user_interests_collection.delete_one({'email': subscriber_email})
    removed = outcome.deleted_count
    return removed > 0

View File

@@ -0,0 +1,295 @@
"""
Newsletter Personalization Service for Munich News Daily.
Ranks and selects articles based on user interest profiles.
"""
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from services.interest_profiling_service import get_user_interests
def calculate_article_score(
    article: Dict,
    user_interests: Optional[Dict],
    category_weight: float = 0.4,
    keyword_weight: float = 0.6
) -> float:
    """
    Calculate personalization score for an article based on user interests.

    Score is calculated as:
    - Category match: the user's stored score for the article's category
      (0.0 when the user has none)
    - Keyword match: average of the user's scores for the article keywords
      that appear in the profile (0.0 when none match)
    - Final score: (category_score * category_weight) +
      (keyword_score * keyword_weight), rounded to 3 decimals

    Args:
        article: Article dictionary with 'category' and 'keywords' fields;
            missing or null fields count as 'general' / no keywords
        user_interests: User interest profile (None for non-personalized)
        category_weight: Weight for category matching (default: 0.4)
        keyword_weight: Weight for keyword matching (default: 0.6)

    Returns:
        float: Personalization score; 0.5 (neutral) when there is no profile.
            Stays within 0.0-1.0 as long as profile scores are within 0.0-1.0
            and the weights sum to at most 1.
    """
    # If no user interests, return neutral score
    if not user_interests:
        return 0.5

    # `or` rather than a .get default: the field may exist with a null value
    article_category = article.get('category') or 'general'
    article_keywords = article.get('keywords') or []
    user_categories = user_interests.get('categories') or {}
    user_keywords = user_interests.get('keywords') or {}

    category_score = user_categories.get(article_category, 0.0)

    # Only keywords the user has a score for take part in the average
    matched_scores = [
        user_keywords[keyword]
        for keyword in article_keywords
        if keyword in user_keywords
    ]
    keyword_score = sum(matched_scores) / len(matched_scores) if matched_scores else 0.0

    # Weighted final score
    final_score = (category_score * category_weight) + (keyword_score * keyword_weight)

    return round(final_score, 3)
def rank_articles_for_user(
    articles: List[Dict],
    subscriber_email: str,
    personalization_ratio: float = 0.7
) -> List[Dict]:
    """
    Score every article against a user's interest profile and order them.

    Args:
        articles: List of article dictionaries
        subscriber_email: Email address of the user
        personalization_ratio: Ratio of personalized vs trending (default: 0.7);
            accepted for interface symmetry but not used by this function

    Returns:
        list: Shallow copies of the articles, each with a
            'personalization_score' key, sorted by that score (highest first)
    """
    interests = get_user_interests(subscriber_email)

    def _with_score(article):
        # Work on a copy so the caller's dictionaries stay untouched
        scored = article.copy()
        scored['personalization_score'] = calculate_article_score(article, interests)
        return scored

    return sorted(
        (_with_score(article) for article in articles),
        key=lambda item: item['personalization_score'],
        reverse=True
    )
def select_personalized_articles(
    articles: List[Dict],
    subscriber_email: str,
    max_articles: int = 10,
    personalization_ratio: float = 0.7,
    min_score_threshold: float = 0.1
) -> List[Dict]:
    """
    Select and rank articles for a personalized newsletter.

    Strategy:
    - Up to max_articles * personalization_ratio articles: highest scoring,
      and only those scoring at least min_score_threshold (personalized)
    - Remaining slots: articles not yet selected, taken in the order they
      were passed in (trending/diverse content). Input order is the recency
      signal, so callers should pass articles newest first.
    - Slots the personalized part could not fill are given to the trending
      part, so the result reaches max_articles whenever enough articles exist

    Args:
        articles: List of available articles, newest first
        subscriber_email: Email address of the user
        max_articles: Maximum number of articles to include (default: 10)
        personalization_ratio: Ratio of personalized content (default: 0.7)
        min_score_threshold: Minimum score to consider personalized (default: 0.1)

    Returns:
        list: Selected articles with personalization scores, personalized
            ones first
    """
    if not articles or max_articles <= 0:
        return []

    # Tag each article with its input position: this identifies articles
    # even when they carry no '_id' and preserves the caller's ordering for
    # the trending part. The tag is removed again before returning.
    position_key = '_selection_position'
    tagged = [
        {**article, position_key: position}
        for position, article in enumerate(articles)
    ]

    # Rank all articles
    ranked_articles = rank_articles_for_user(tagged, subscriber_email, personalization_ratio)

    # Calculate split (clamped so an out-of-range ratio cannot produce
    # negative slice bounds)
    num_personalized = max(0, min(max_articles, int(max_articles * personalization_ratio)))

    # Get personalized articles (high scoring)
    personalized = [
        a for a in ranked_articles
        if a['personalization_score'] >= min_score_threshold
    ][:num_personalized]

    # Get trending articles: everything not already selected, in input order
    chosen_positions = {a[position_key] for a in personalized}
    remaining = sorted(
        (a for a in ranked_articles if a[position_key] not in chosen_positions),
        key=lambda a: a[position_key]
    )
    trending = remaining[:max_articles - len(personalized)]

    # Combine: personalized first, then trending
    selected = personalized + trending
    for article in selected:
        article.pop(position_key, None)

    return selected
def get_personalization_explanation(
    article: Dict,
    user_interests: Optional[Dict]
) -> Dict[str, object]:
    """
    Generate explanation for why an article was recommended.

    Useful for transparency and debugging.

    Args:
        article: Article dictionary; missing or null 'category' / 'keywords'
            count as 'general' / no keywords
        user_interests: User interest profile (None or empty when the user
            has no profile)

    Returns:
        dict: Explanation containing:
            - score: Overall personalization score
            - category_match: Category score
            - keyword_matches: List of matching keywords with scores
            - reason: Human-readable explanation
    """
    if not user_interests:
        return {
            'score': 0.5,
            'category_match': 0.0,
            'keyword_matches': [],
            'reason': 'No personalization data available'
        }

    # `or` rather than a .get default: the field may exist with a null value
    article_category = article.get('category') or 'general'
    article_keywords = article.get('keywords') or []

    user_categories = user_interests.get('categories') or {}
    user_keywords = user_interests.get('keywords') or {}

    # Category match
    category_score = user_categories.get(article_category, 0.0)

    # Keyword matches, in the order the keywords appear on the article
    keyword_matches = [
        {'keyword': keyword, 'score': user_keywords[keyword]}
        for keyword in article_keywords
        if keyword in user_keywords
    ]

    # Calculate overall score
    overall_score = calculate_article_score(article, user_interests)

    # Generate reason; the 0.5 / 0.3 bands match get_personalization_stats
    if overall_score >= 0.5:
        reason = f"High match with your interests in {article_category}"
        if keyword_matches:
            top_keywords = [m['keyword'] for m in keyword_matches[:2]]
            reason += f" and topics like {', '.join(top_keywords)}"
    elif overall_score >= 0.3:
        reason = "Moderate match with your interests"
    else:
        reason = "Trending article for diverse content"

    return {
        'score': overall_score,
        'category_match': category_score,
        'keyword_matches': keyword_matches,
        'reason': reason
    }
def get_personalization_stats(
    selected_articles: List[Dict],
    subscriber_email: str
) -> Dict[str, object]:
    """
    Get statistics about personalization for a newsletter.

    Args:
        selected_articles: Articles selected for the newsletter; a missing or
            null 'personalization_score' counts as 0.0
        subscriber_email: Email address of the user (currently not used in
            the calculation)

    Returns:
        dict: Statistics containing:
            - total_articles: Number of articles
            - avg_score: Average personalization score (rounded to 3 decimals)
            - highly_personalized: Number of articles with score >= 0.5
            - moderately_personalized: Number with score 0.3-0.5
            - trending: Number with score < 0.3
    """
    if not selected_articles:
        return {
            'total_articles': 0,
            'avg_score': 0.0,
            'highly_personalized': 0,
            'moderately_personalized': 0,
            'trending': 0
        }

    # `or 0.0` also covers a key that is present but null
    scores = [a.get('personalization_score') or 0.0 for a in selected_articles]

    return {
        'total_articles': len(selected_articles),
        'avg_score': round(sum(scores) / len(scores), 3),
        'highly_personalized': sum(1 for s in scores if s >= 0.5),
        'moderately_personalized': sum(1 for s in scores if 0.3 <= s < 0.5),
        'trending': sum(1 for s in scores if s < 0.3)
    }
def batch_personalize_newsletters(
    articles: List[Dict],
    subscribers: List[str],
    max_articles_per_user: int = 10
) -> Dict[str, List[Dict]]:
    """
    Build a personalized article selection for each subscriber in a batch.

    Useful for batch newsletter generation.

    Args:
        articles: List of available articles
        subscribers: List of subscriber email addresses
        max_articles_per_user: Max articles per newsletter (default: 10)

    Returns:
        dict: Mapping of email -> personalized article list
    """
    # One selection run per subscriber, all drawing from the same article pool
    return {
        email: select_personalized_articles(
            articles,
            email,
            max_articles=max_articles_per_user
        )
        for email in subscribers
    }

View File

@@ -80,6 +80,9 @@ def create_newsletter_tracking(
link_tracking_map = {}
if article_links:
# Import here to avoid circular dependency
from database import articles_collection
for article in article_links:
article_url = article.get('url')
article_title = article.get('title', '')
@@ -87,13 +90,22 @@ def create_newsletter_tracking(
if article_url:
link_tracking_id = generate_tracking_id()
# Create link click tracking record
# Look up article metadata from database for personalization
article_doc = articles_collection.find_one({'link': article_url})
article_id = str(article_doc['_id']) if article_doc else None
category = article_doc.get('category', 'general') if article_doc else 'general'
keywords = article_doc.get('keywords', []) if article_doc else []
# Create link click tracking record with metadata
link_click_doc = {
'tracking_id': link_tracking_id,
'newsletter_id': newsletter_id,
'subscriber_email': subscriber_email,
'article_url': article_url,
'article_title': article_title,
'article_id': article_id, # NEW: Article database ID
'category': category, # NEW: Article category
'keywords': keywords, # NEW: Article keywords for personalization
'clicked': False,
'clicked_at': None,
'user_agent': None,

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Comprehensive test suite for the personalization system.
Tests all 4 phases: keyword extraction, click tracking, interest profiling, and personalization.
"""
import sys
from pymongo import MongoClient
from datetime import datetime
# Import services
from services.tracking_service import create_newsletter_tracking
from services.interest_profiling_service import (
update_user_interests,
get_user_interests,
get_top_interests,
build_interests_from_history
)
from services.personalization_service import (
calculate_article_score,
rank_articles_for_user,
select_personalized_articles,
get_personalization_stats
)
from config import Config
# Connect to MongoDB using the application configuration and bind the
# collections that the phase checks below read, seed and clean up directly.
# NOTE(review): these checks run against whatever database Config points at
# and delete documents for the test-phase*@example.com addresses - confirm
# this is never a production database.
client = MongoClient(Config.MONGODB_URI)
db = client[Config.DB_NAME]
articles_collection = db['articles']
link_clicks_collection = db['link_clicks']
user_interests_collection = db['user_interests']
def test_phase1_keywords():
    """Phase 1: check that at least one stored article has extracted keywords.

    Returns:
        bool: True when such an article exists, False otherwise
    """
    banner = "="*60
    print("\n" + banner)
    print("Phase 1: Keyword Extraction")
    print(banner)

    # Matches articles whose 'keywords' field exists and is not an empty list
    has_keywords = {'keywords': {'$exists': True, '$ne': []}}
    match_count = articles_collection.count_documents(has_keywords)

    if match_count == 0:
        print("❌ No articles with keywords found")
        print("   Run a crawl first to extract keywords")
        return False

    sample = articles_collection.find_one(has_keywords)
    print(f"✓ Found {match_count} articles with keywords")
    print(f"  Sample: {sample.get('title', 'N/A')[:50]}...")
    print(f"  Keywords: {sample.get('keywords', [])[:3]}")
    return True
def test_phase2_tracking():
    """Phase 2: Verify tracking includes keywords and metadata

    Creates tracking for one article that has keywords and checks that the
    stored link-click record carries the article id, category and keywords.
    Test records are removed again even when the tracking call raises.

    Returns:
        bool: True when the tracking record carries all metadata
    """
    print("\n" + "="*60)
    print("Phase 2: Click Tracking Enhancement")
    print("="*60)

    test_email = 'test-phase2@example.com'

    # Clean up leftovers from earlier runs
    link_clicks_collection.delete_many({'subscriber_email': test_email})

    # Get article with keywords
    article = articles_collection.find_one({'keywords': {'$exists': True, '$ne': []}})

    if not article:
        print("❌ No articles found")
        return False

    try:
        # Create tracking
        tracking_data = create_newsletter_tracking(
            newsletter_id='test-phase2',
            subscriber_email=test_email,
            article_links=[{
                'url': article['link'],
                'title': article.get('title', '')
            }]
        )

        # Look up the stored record; an empty map or a missing record is a
        # failed check, not a crash
        tracking_ids = list(tracking_data['link_tracking_map'].values())
        tracking_record = (
            link_clicks_collection.find_one({'tracking_id': tracking_ids[0]})
            if tracking_ids else None
        )
    finally:
        # Clean up, also when tracking creation raised
        link_clicks_collection.delete_many({'subscriber_email': test_email})
        db['newsletter_sends'].delete_many({'subscriber_email': test_email})

    has_metadata = (
        tracking_record is not None and
        tracking_record.get('article_id') is not None and
        tracking_record.get('category') is not None and
        len(tracking_record.get('keywords') or []) > 0
    )

    if has_metadata:
        print("✓ Tracking records include metadata")
        print(f"  Article ID: {tracking_record.get('article_id')}")
        print(f"  Category: {tracking_record.get('category')}")
        print(f"  Keywords: {len(tracking_record.get('keywords', []))} keywords")
        return True
    else:
        print("❌ Tracking records missing metadata")
        return False
def test_phase3_profiling():
    """Phase 3: Verify interest profiles are built from clicks

    Records two clicks for a throwaway address and checks that the stored
    profile counts both. The test profile is removed again even when a
    service call raises.

    Returns:
        bool: True when the profile exists and reports two clicks
    """
    print("\n" + "="*60)
    print("Phase 3: User Interest Profiling")
    print("="*60)

    test_email = 'test-phase3@example.com'

    # Clean up leftovers from earlier runs
    user_interests_collection.delete_many({'email': test_email})

    try:
        # Create profile
        update_user_interests(test_email, ['Bayern Munich', 'Football'], 'sports')
        update_user_interests(test_email, ['Transportation', 'Munich'], 'local')

        # Verify profile
        profile = get_user_interests(test_email)
    finally:
        # Clean up, also when one of the service calls raised
        user_interests_collection.delete_many({'email': test_email})

    if profile and profile['total_clicks'] == 2:
        print("✓ Interest profile created")
        print(f"  Total clicks: {profile['total_clicks']}")
        print(f"  Categories: {len(profile.get('categories', {}))}")
        print(f"  Keywords: {len(profile.get('keywords', {}))}")
        return True
    else:
        print("❌ Interest profile not created correctly")
        return False
def test_phase4_personalization():
    """Phase 4: Verify articles are ranked by user interests

    Seeds a throwaway profile, ranks and selects from up to five stored
    articles, and checks that every selected article carries a
    personalization score. The test profile is removed again even when a
    service call raises.

    Returns:
        bool: True when at least one article was selected and all selected
            articles have a score
    """
    print("\n" + "="*60)
    print("Phase 4: Personalized Newsletter Generation")
    print("="*60)

    test_email = 'test-phase4@example.com'

    # Clean up leftovers from earlier runs
    user_interests_collection.delete_many({'email': test_email})

    # Get articles
    articles = list(articles_collection.find(
        {'keywords': {'$exists': True, '$ne': []}},
        limit=5
    ))

    if len(articles) < 3:
        print("❌ Not enough articles found")
        return False

    try:
        # Create profile
        update_user_interests(test_email, ['Bayern Munich', 'Football'], 'sports')

        # Rank articles
        ranked = rank_articles_for_user(articles, test_email)

        # Select personalized
        selected = select_personalized_articles(articles, test_email, max_articles=3)
    finally:
        # Clean up, also when one of the service calls raised
        user_interests_collection.delete_many({'email': test_email})

    has_scores = all('personalization_score' in a for a in selected)

    if has_scores and len(selected) > 0:
        print("✓ Articles ranked and selected")
        print(f"  Total ranked: {len(ranked)}")
        print(f"  Selected: {len(selected)}")
        print(f"  Top score: {selected[0].get('personalization_score', 0):.3f}")
        return True
    else:
        print("❌ Personalization failed")
        return False
def main():
    """Run every phase check in order and print a pass/fail summary.

    Returns:
        int: 0 when all phases passed, 1 otherwise (used as exit status)
    """
    rule = "="*60

    print("\n" + rule)
    print("PERSONALIZATION SYSTEM TEST SUITE")
    print(rule)

    # Executed top to bottom; every phase runs even if an earlier one failed
    suite = [
        ('Phase 1: Keyword Extraction', test_phase1_keywords),
        ('Phase 2: Click Tracking', test_phase2_tracking),
        ('Phase 3: Interest Profiling', test_phase3_profiling),
        ('Phase 4: Personalization', test_phase4_personalization),
    ]
    results = {label: check() for label, check in suite}

    print("\n" + rule)
    print("TEST RESULTS")
    print(rule)

    for label, ok in results.items():
        marker = "✅ PASS" if ok else "❌ FAIL"
        print(f"{marker} - {label}")

    if not all(results.values()):
        print("\n❌ Some tests FAILED")
        return 1

    print("\n🎉 All personalization tests PASSED!")
    return 0
# Script entry point: run the suite and use its result (0 = all passed,
# 1 = at least one failure) as the process exit status.
if __name__ == '__main__':
    sys.exit(main())