""" Article Clustering Module Detects and groups similar articles from different sources using Ollama AI """ from difflib import SequenceMatcher from datetime import datetime, timedelta from typing import List, Dict, Optional from ollama_client import OllamaClient class ArticleClusterer: """ Clusters articles about the same story from different sources using Ollama AI """ def __init__(self, ollama_client: OllamaClient, similarity_threshold=0.75, time_window_hours=24): """ Initialize clusterer Args: ollama_client: OllamaClient instance for AI-based similarity detection similarity_threshold: Minimum similarity to consider articles as same story (0-1) time_window_hours: Time window to look for similar articles """ self.ollama_client = ollama_client self.similarity_threshold = similarity_threshold self.time_window_hours = time_window_hours def normalize_title(self, title: str) -> str: """ Normalize title for comparison Args: title: Article title Returns: Normalized title (lowercase, stripped) """ return title.lower().strip() def simple_stem(self, word: str) -> str: """ Simple German word stemming (remove common suffixes) Args: word: Word to stem Returns: Stemmed word """ # Remove common German suffixes suffixes = ['ungen', 'ung', 'en', 'er', 'e', 'n', 's'] for suffix in suffixes: if len(word) > 5 and word.endswith(suffix): return word[:-len(suffix)] return word def extract_keywords(self, text: str) -> set: """ Extract important keywords from text with simple stemming Args: text: Article title or content Returns: Set of stemmed keywords """ # Common German stop words to ignore stop_words = { 'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einer', 'eines', 'und', 'oder', 'aber', 'in', 'im', 'am', 'um', 'für', 'von', 'zu', 'nach', 'bei', 'mit', 'auf', 'an', 'aus', 'über', 'unter', 'gegen', 'durch', 'ist', 'sind', 'war', 'waren', 'hat', 'haben', 'wird', 'werden', 'wurde', 'wurden', 'neue', 'neuer', 'neues', 'neuen', 'sich', 'auch', 'nicht', 'nur', 'noch', 'mehr', 'als', 'wie', 'beim', 'zum', 'zur', 'vom', 'ins', 'ans' } # Normalize and split words = text.lower().strip().split() # Filter out stop words, short words, and apply stemming keywords = set() for word in words: # Remove punctuation word = ''.join(c for c in word if c.isalnum() or c == '-') if len(word) > 3 and word not in stop_words: # Apply simple stemming stemmed = self.simple_stem(word) keywords.add(stemmed) return keywords def check_same_story_with_ai(self, article1: Dict, article2: Dict) -> bool: """ Use Ollama AI to determine if two articles are about the same story Args: article1: First article article2: Second article Returns: True if same story, False otherwise """ if not self.ollama_client.enabled: # Fallback to keyword-based similarity return self.calculate_similarity(article1, article2) >= self.similarity_threshold title1 = article1.get('title', '') title2 = article2.get('title', '') content1 = article1.get('content', '')[:300] # First 300 chars content2 = article2.get('content', '')[:300] prompt = f"""Compare these two news articles and determine if they are about the SAME story/event. Article 1: Title: {title1} Content: {content1} Article 2: Title: {title2} Content: {content2} Answer with ONLY "YES" if they are about the same story/event, or "NO" if they are different stories. Consider them the same story if they report on the same event, even if from different perspectives. Answer:""" try: response = self.ollama_client.generate(prompt, max_tokens=10) answer = response.get('text', '').strip().upper() return 'YES' in answer except Exception as e: print(f" ⚠ AI clustering failed: {e}, using fallback") # Fallback to keyword-based similarity return self.calculate_similarity(article1, article2) >= self.similarity_threshold def calculate_similarity(self, article1: Dict, article2: Dict) -> float: """ Calculate similarity between two articles using title and content Args: article1: First article (dict with 'title' and optionally 'content') article2: Second article (dict with 'title' and optionally 'content') Returns: Similarity score (0-1) """ title1 = article1.get('title', '') title2 = article2.get('title', '') content1 = article1.get('content', '') content2 = article2.get('content', '') # Extract keywords from titles title_keywords1 = self.extract_keywords(title1) title_keywords2 = self.extract_keywords(title2) # Calculate title similarity if title_keywords1 and title_keywords2: title_intersection = title_keywords1.intersection(title_keywords2) title_union = title_keywords1.union(title_keywords2) title_similarity = len(title_intersection) / len(title_union) if title_union else 0 else: # Fallback to string similarity t1 = self.normalize_title(title1) t2 = self.normalize_title(title2) title_similarity = SequenceMatcher(None, t1, t2).ratio() # If we have content, use it for better accuracy if content1 and content2: # Extract keywords from first 500 chars of content (for performance) content_keywords1 = self.extract_keywords(content1[:500]) content_keywords2 = self.extract_keywords(content2[:500]) if content_keywords1 and content_keywords2: content_intersection = content_keywords1.intersection(content_keywords2) content_union = content_keywords1.union(content_keywords2) content_similarity = len(content_intersection) / len(content_union) if content_union else 0 # Weighted average: title (40%) + content (60%) return (title_similarity * 0.4) + (content_similarity * 0.6) # If no content, use only title similarity return title_similarity def find_cluster(self, article: Dict, existing_articles: List[Dict]) -> Optional[str]: """ Find if article belongs to an existing cluster using AI Args: article: New article to cluster (dict with 'title' and optionally 'content') existing_articles: List of existing articles Returns: cluster_id if found, None otherwise """ cutoff_time = datetime.utcnow() - timedelta(hours=self.time_window_hours) for existing in existing_articles: # Only compare recent articles published_at = existing.get('published_at') if published_at and published_at < cutoff_time: continue # Use AI to check if same story if self.check_same_story_with_ai(article, existing): return existing.get('cluster_id', str(existing.get('_id'))) return None def cluster_article(self, article: Dict, existing_articles: List[Dict]) -> Dict: """ Cluster a single article Args: article: Article to cluster existing_articles: List of existing articles Returns: Article with cluster_id and is_primary fields """ cluster_id = self.find_cluster(article, existing_articles) if cluster_id: # Add to existing cluster article['cluster_id'] = cluster_id article['is_primary'] = False else: # Create new cluster article['cluster_id'] = str(article.get('_id', datetime.utcnow().timestamp())) article['is_primary'] = True return article def get_cluster_articles(self, cluster_id: str, articles_collection) -> List[Dict]: """ Get all articles in a cluster Args: cluster_id: Cluster ID articles_collection: MongoDB collection Returns: List of articles in the cluster """ return list(articles_collection.find({'cluster_id': cluster_id}))