diff --git a/backend/app.py b/backend/app.py
index b198667..e1c5816 100644
--- a/backend/app.py
+++ b/backend/app.py
@@ -10,6 +10,7 @@ from routes.newsletter_routes import newsletter_bp
 from routes.tracking_routes import tracking_bp
 from routes.analytics_routes import analytics_bp
 from routes.admin_routes import admin_bp
+from routes.transport_routes import transport_bp

 # Initialize Flask app
 app = Flask(__name__)
@@ -27,6 +28,7 @@ app.register_blueprint(newsletter_bp)
 app.register_blueprint(tracking_bp)
 app.register_blueprint(analytics_bp)
 app.register_blueprint(admin_bp)
+app.register_blueprint(transport_bp)

 # Health check endpoint
 @app.route('/health')
diff --git a/backend/requirements.txt b/backend/requirements.txt
index c014e54..51643e0 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -5,4 +5,5 @@ python-dotenv==1.0.0
 pymongo==4.6.1
 requests==2.31.0
 Jinja2==3.1.2
+redis==5.0.1
diff --git a/backend/routes/admin_routes.py b/backend/routes/admin_routes.py
index d8a0aa8..1dd67c0 100644
--- a/backend/routes/admin_routes.py
+++ b/backend/routes/admin_routes.py
@@ -12,7 +12,8 @@ admin_bp = Blueprint('admin', __name__)
 @admin_bp.route('/api/admin/trigger-crawl', methods=['POST'])
 def trigger_crawl():
     """
-    Manually trigger the news crawler
+    Manually trigger the news crawler asynchronously via the Redis queue
+    (non-blocking: the request returns as soon as the task is enqueued)

     Request body (optional):
     {
@@ -20,6 +21,10 @@ def trigger_crawl():
     }
     """
     try:
+        import redis
+        import json
+        from datetime import datetime, timezone
+
         # Handle both JSON and empty body
         try:
             data = request.get_json(silent=True) or {}
@@ -35,41 +39,29 @@ def trigger_crawl():
                 'error': 'max_articles must be an integer between 1 and 100'
             }), 400

-        # Execute crawler in crawler container using docker exec
-        try:
-            result = subprocess.run(
-                ['docker', 'exec', 'munich-news-crawler', 'python', 'crawler_service.py', str(max_articles)],
-                capture_output=True,
-                text=True,
-                timeout=300  # 5 minute timeout
-            )
-
-            # Check result
-            success = result.returncode == 0
-
-            return jsonify({
-                'success': success,
-                'message': f'Crawler {"executed successfully" if success else "failed"}',
-                'max_articles': max_articles,
-                'output': result.stdout[-1000:] if result.stdout else '',  # Last 1000 chars
-                'errors': result.stderr[-500:] if result.stderr else ''
-            }), 200 if success else 500
-
-        except FileNotFoundError:
-            return jsonify({
-                'success': False,
-                'error': 'Docker command not found. Make sure Docker is installed and the socket is mounted.'
-            }), 500
-
-        except subprocess.TimeoutExpired:
+        # Get Redis client
+        redis_url = os.getenv('REDIS_URL', 'redis://redis:6379')
+        r = redis.from_url(redis_url, decode_responses=True)
+
+        # Publish message to Redis queue
+        message = {
+            'task': 'crawl_news',
+            'max_articles': max_articles,
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+        r.lpush('news_crawl_queue', json.dumps(message))
+
+        # Return immediately without waiting
         return jsonify({
-            'success': False,
-            'error': 'Crawler timed out after 5 minutes'
-        }), 500
+            'success': True,
+            'message': 'News crawl task queued',
+            'max_articles': max_articles
+        }), 202  # 202 Accepted
+
     except Exception as e:
         return jsonify({
             'success': False,
-            'error': f'Failed to run crawler: {str(e)}'
+            'error': f'Failed to queue news crawl: {str(e)}'
         }), 500
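The queueing contract this route relies on, shown as a standalone sketch (illustration only, not part of this change set; assumes a Redis instance reachable at localhost): LPUSH pushes to the head of the list and BRPOP pops from the tail, so together they behave as a FIFO queue between the Flask route and the worker.

```python
# Illustration only: LPUSH pushes to the head, BRPOP pops from the tail,
# so messages are consumed in the order they were queued (FIFO).
import json
import redis

r = redis.from_url('redis://localhost:6379', decode_responses=True)

# Producer side (what trigger_crawl does):
r.lpush('news_crawl_queue', json.dumps({'task': 'crawl_news', 'max_articles': 5}))

# Consumer side (what the worker does):
_queue, raw = r.brpop('news_crawl_queue')
print(json.loads(raw))  # {'task': 'crawl_news', 'max_articles': 5}
```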
diff --git a/backend/routes/transport_routes.py b/backend/routes/transport_routes.py
new file mode 100644
index 0000000..26884b2
--- /dev/null
+++ b/backend/routes/transport_routes.py
@@ -0,0 +1,75 @@
+from flask import Blueprint, jsonify
+from database import db
+from datetime import datetime, timezone
+import redis
+import os
+import json
+
+transport_bp = Blueprint('transport', __name__)
+
+REDIS_URL = os.getenv('REDIS_URL', 'redis://redis:6379')
+
+
+def get_redis_client():
+    """Get Redis client"""
+    return redis.from_url(REDIS_URL, decode_responses=True)
+
+
+@transport_bp.route('/api/transport/crawl', methods=['POST'])
+def trigger_transport_crawl():
+    """Trigger transport disruption crawl asynchronously via Redis queue"""
+    try:
+        r = get_redis_client()
+
+        # Publish message to Redis queue
+        message = {
+            'task': 'crawl_transport',
+            'timestamp': datetime.now(timezone.utc).isoformat()
+        }
+        r.lpush('transport_crawl_queue', json.dumps(message))
+
+        # Return immediately without waiting
+        return jsonify({
+            'status': 'success',
+            'message': 'Transport crawl task queued'
+        }), 202  # 202 Accepted
+
+    except Exception as e:
+        return jsonify({
+            'status': 'error',
+            'message': 'Failed to queue transport crawl',
+            'details': str(e)
+        }), 500
+
+
+@transport_bp.route('/api/transport/disruptions', methods=['GET'])
+def get_transport_disruptions():
+    """Get current transport disruptions from MongoDB"""
+    try:
+        collection = db['transport_alerts']
+
+        # Get active disruptions, most recently updated first
+        disruptions = list(collection.find(
+            {'is_active': True},
+            {'_id': 0}
+        ).sort('updated_at', -1))
+
+        # Convert datetime fields to ISO format for JSON serialization
+        for d in disruptions:
+            if d.get('start_time'):
+                d['start_time'] = d['start_time'].isoformat()
+            if d.get('end_time'):
+                d['end_time'] = d['end_time'].isoformat()
+            if d.get('updated_at'):
+                d['updated_at'] = d['updated_at'].isoformat()
+
+        return jsonify({
+            'total': len(disruptions),
+            'disruptions': disruptions
+        }), 200
+
+    except Exception as e:
+        return jsonify({
+            'error': 'Failed to fetch disruptions from database',
+            'details': str(e)
+        }), 500
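For manual verification, a hypothetical smoke test of the two new endpoints; the host and port (localhost:5001) follow the docker-compose mapping below, and the payload shapes mirror the handlers above.

```python
# Hypothetical usage sketch - exercises the new transport endpoints.
import requests

# Queue a crawl; the handler returns 202 immediately without waiting.
resp = requests.post('http://localhost:5001/api/transport/crawl')
print(resp.status_code, resp.json())

# Read back whatever the transport crawler has stored in MongoDB.
resp = requests.get('http://localhost:5001/api/transport/disruptions')
data = resp.json()
print(f"{data['total']} active disruptions")
for d in data['disruptions']:
    print(d.get('lines'), '-', d.get('title'))
```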
diff --git a/docker-compose.yml b/docker-compose.yml
index f75b0c5..437b700 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -57,6 +57,20 @@ services:
     command: sh /setup-ollama-model.sh
     restart: on-failure

+  # Redis - Message queue for async tasks (Internal only - not exposed to host)
+  redis:
+    image: redis:7-alpine
+    container_name: munich-news-redis
+    restart: unless-stopped
+    # No ports exposed - only accessible within Docker network
+    networks:
+      - munich-news-network
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+
   # MongoDB Database (Internal only - not exposed to host)
   mongodb:
     image: mongo:latest
@@ -90,8 +104,10 @@
     depends_on:
       - mongodb
       - ollama
+      - redis
     environment:
       - MONGODB_URI=mongodb://${MONGO_USERNAME:-admin}:${MONGO_PASSWORD:-changeme}@mongodb:27017/
+      - REDIS_URL=redis://redis:6379
       - TZ=Europe/Berlin
     volumes:
       - ./backend/.env:/app/.env:ro
@@ -112,10 +128,12 @@
     restart: unless-stopped
     depends_on:
       - mongodb
+      - redis
     ports:
       - "5001:5001"
     environment:
       - MONGODB_URI=mongodb://${MONGO_USERNAME:-admin}:${MONGO_PASSWORD:-changeme}@mongodb:27017/
+      - REDIS_URL=redis://redis:6379
       - FLASK_PORT=5001
       - TZ=Europe/Berlin
     volumes:
@@ -130,6 +148,32 @@
       retries: 3
       start_period: 40s

+  # Transport Crawler - API service for MVG disruptions (Internal only - not exposed to host)
+  transport-crawler:
+    build:
+      context: ./transport_crawler
+      dockerfile: Dockerfile
+    container_name: munich-news-transport-crawler
+    restart: unless-stopped
+    depends_on:
+      - mongodb
+      - redis
+    # No ports exposed - only accessible within Docker network
+    environment:
+      - MONGODB_URI=mongodb://${MONGO_USERNAME:-admin}:${MONGO_PASSWORD:-changeme}@mongodb:27017/
+      - REDIS_URL=redis://redis:6379
+      - TZ=Europe/Berlin
+    volumes:
+      - ./backend/.env:/app/.env:ro
+    networks:
+      - munich-news-network
+    healthcheck:
+      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5002/health')"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 40s
+
   # Newsletter Sender - Runs at 7 AM Berlin time
   sender:
     build:
@@ -141,6 +185,7 @@
       - mongodb
       - backend
       - crawler
+      - transport-crawler
     environment:
       - MONGODB_URI=mongodb://${MONGO_USERNAME:-admin}:${MONGO_PASSWORD:-changeme}@mongodb:27017/
       - TZ=Europe/Berlin
diff --git a/news_crawler/Dockerfile b/news_crawler/Dockerfile
index 4a825bc..53cae85 100644
--- a/news_crawler/Dockerfile
+++ b/news_crawler/Dockerfile
@@ -12,12 +12,12 @@ COPY backend/config.py /app/config.py
 # Copy crawler files (includes ollama_client.py)
 COPY news_crawler/ /app/

-# Make the scheduler executable
-RUN chmod +x scheduled_crawler.py
+# Make scripts executable
+RUN chmod +x scheduled_crawler.py start.sh

 # Set timezone to Berlin
 ENV TZ=Europe/Berlin
 RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

-# Run the scheduled crawler
-CMD ["python", "-u", "scheduled_crawler.py"]
+# Run both scheduler and worker
+CMD ["/app/start.sh"]
diff --git a/news_crawler/requirements.txt b/news_crawler/requirements.txt
index 19b585e..1c6e14f 100644
--- a/news_crawler/requirements.txt
+++ b/news_crawler/requirements.txt
@@ -6,3 +6,4 @@ pymongo==4.6.1
 python-dotenv==1.0.0
 schedule==1.2.0
 pytz==2023.3
+redis==5.0.1
diff --git a/news_crawler/start.sh b/news_crawler/start.sh
new file mode 100644
index 0000000..3776229
--- /dev/null
+++ b/news_crawler/start.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+# Start both the scheduler and the worker
+
+# Start the worker in the background
+python -u worker.py &
+
+# Start the scheduler in the foreground
+python -u scheduled_crawler.py
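Note that short-form `depends_on` only orders container startup; it does not wait for Redis to pass its healthcheck (that would require `condition: service_healthy`). A worker-side retry loop — a sketch under that assumption, not included in this change set — is one way to tolerate Redis starting slowly:

```python
# Minimal connect-with-retry sketch for workers that may start before Redis.
import time
import redis

def connect_with_retry(url, attempts=10, delay=2.0):
    """Return a Redis client, retrying until the server answers PING."""
    for attempt in range(1, attempts + 1):
        try:
            client = redis.from_url(url, decode_responses=True)
            client.ping()  # raises ConnectionError while Redis is still starting
            return client
        except redis.exceptions.ConnectionError:
            print(f"Redis not ready (attempt {attempt}/{attempts}), retrying...")
            time.sleep(delay)
    raise RuntimeError('Redis never became reachable')
```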
diff --git a/news_crawler/worker.py b/news_crawler/worker.py
new file mode 100644
index 0000000..0d6892e
--- /dev/null
+++ b/news_crawler/worker.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+"""
+News Crawler Worker - Listens to Redis queue and processes crawl tasks
+"""
+import redis
+import json
+import os
+import time
+from crawler_service import crawl_all_feeds
+
+REDIS_URL = os.getenv('REDIS_URL', 'redis://redis:6379')
+QUEUE_NAME = 'news_crawl_queue'
+
+
+def get_redis_client():
+    """Get Redis client"""
+    return redis.from_url(REDIS_URL, decode_responses=True)
+
+
+def process_crawl_task(message):
+    """Process a crawl task"""
+    try:
+        max_articles = message.get('max_articles', 10)
+        print(f"\n📨 Received task: {message.get('task')}")
+        print(f"   Max articles per feed: {max_articles}")
+        print(f"   Timestamp: {message.get('timestamp')}")
+
+        # Run the crawler
+        result = crawl_all_feeds(max_articles_per_feed=max_articles)
+
+        print(f"✅ Task completed: {result.get('total_articles_crawled')} articles crawled")
+        return True
+
+    except Exception as e:
+        print(f"❌ Task failed: {e}")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+def main():
+    """Main worker loop"""
+    print("="*70)
+    print("📰 News Crawler Worker Starting")
+    print("="*70)
+    print(f"Redis URL: {REDIS_URL}")
+    print(f"Queue: {QUEUE_NAME}")
+    print("Waiting for tasks...")
+    print("="*70)
+
+    r = get_redis_client()
+
+    while True:
+        try:
+            # Block and wait for messages (timeout 1 second)
+            result = r.brpop(QUEUE_NAME, timeout=1)
+
+            if result:
+                queue_name, message_json = result
+                message = json.loads(message_json)
+                process_crawl_task(message)
+
+        except KeyboardInterrupt:
+            print("\n\n👋 Worker stopped by user")
+            break
+        except Exception as e:
+            print(f"\n❌ Worker error: {e}")
+            time.sleep(5)  # Wait before retrying
+
+
+if __name__ == '__main__':
+    main()
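The worker uses plain BRPOP, so a message popped immediately before a crash is lost. If task loss matters, a reliable-queue variant can park in-flight messages in a processing list and remove them only after success. A sketch of that pattern (not part of this change set; `news_crawl_processing` is a hypothetical list name):

```python
# Sketch of a 'reliable queue' consumer step using BRPOPLPUSH.
import json
import redis

r = redis.from_url('redis://redis:6379', decode_responses=True)

# Atomically move the oldest task into a processing list (or time out).
raw = r.brpoplpush('news_crawl_queue', 'news_crawl_processing', timeout=1)
if raw:
    try:
        message = json.loads(raw)
        # ... process the task here ...
        r.lrem('news_crawl_processing', 1, raw)  # acknowledge: drop the in-flight copy
    except Exception:
        # Leave the message in news_crawl_processing for inspection or requeue.
        pass
```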
diff --git a/news_sender/newsletter_template.html b/news_sender/newsletter_template.html
index d2b22e0..bdf11dc 100644
--- a/news_sender/newsletter_template.html
+++ b/news_sender/newsletter_template.html
@@ -247,6 +247,63 @@



+        {% if transport_disruptions and transport_disruptions|length > 0 %}
+        <!-- S-Bahn Disruptions Section -->
+        <tr>
+            <td style="padding: 20px 30px;">
+                <table role="presentation" width="100%" cellpadding="0" cellspacing="0">
+                    <tr>
+                        <td style="padding-bottom: 10px; font-size: 18px; font-weight: bold;">
+                            🚆 S-Bahn Disruptions Today
+                        </td>
+                    </tr>
+                    <tr>
+                        <td style="padding-bottom: 10px; font-size: 14px; color: #555555;">
+                            Current service disruptions affecting Munich S-Bahn:
+                        </td>
+                    </tr>
+                    {% for disruption in transport_disruptions %}
+                    <tr>
+                        <td style="padding: 10px 0; border-bottom: 1px solid #eeeeee;">
+                            <!-- Affected lines -->
+                            <div style="font-weight: bold; font-size: 14px;">
+                                {{ disruption.severity_icon }} {{ disruption.lines_str }}
+                            </div>
+                            <!-- Title -->
+                            <div style="font-size: 14px; margin-top: 4px;">
+                                {{ disruption.title }}
+                            </div>
+                            <!-- Description -->
+                            {% if disruption.description %}
+                            <div style="font-size: 13px; color: #666666; margin-top: 4px;">
+                                {{ disruption.description }}
+                            </div>
+                            {% endif %}
+                            <!-- Time range -->
+                            {% if disruption.start_time_str or disruption.end_time_str %}
+                            <div style="font-size: 13px; color: #666666; margin-top: 4px;">
+                                ⏰
+                                {% if disruption.start_time_str %}
+                                From {{ disruption.start_time_str }}
+                                {% endif %}
+                                {% if disruption.end_time_str %}
+                                until {{ disruption.end_time_str }}
+                                {% endif %}
+                            </div>
+                            {% endif %}
+                        </td>
+                    </tr>
+                    {% endfor %}
+                    <tr>
+                        <td style="padding-top: 10px; font-size: 13px; color: #555555;">
+                            💡 Plan your commute accordingly. Check MVG.de for real-time updates.
+                        </td>
+                    </tr>
+                </table>
+            </td>
+        </tr>
+        {% endif %}
+

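The block consumes display-ready fields (`severity_icon`, `lines_str`, `start_time_str`, `end_time_str`) that `sender_service.py` precomputes below. A quick way to sanity-check that data contract — a simplified stand-in for the real template, not part of this change set — is an inline Jinja2 template rendered over the same variables:

```python
# Renders the same variables the disruption block expects, via a toy template.
from jinja2 import Template

block = Template(
    "{% for d in transport_disruptions %}"
    "{{ d.severity_icon }} {{ d.lines_str }}: {{ d.title }}"
    "{% if d.start_time_str %} (from {{ d.start_time_str }}"
    "{% if d.end_time_str %} until {{ d.end_time_str }}{% endif %}){% endif %}\n"
    "{% endfor %}"
)

print(block.render(transport_disruptions=[{
    'severity_icon': '🔴',
    'lines_str': 'S3, S4',
    'title': 'Signal failure between Pasing and Laim',
    'start_time_str': '06:30',
    'end_time_str': '14:00',
}]))
```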
diff --git a/news_sender/sender_service.py b/news_sender/sender_service.py
index 7cc883a..cfe732d 100644
--- a/news_sender/sender_service.py
+++ b/news_sender/sender_service.py
@@ -77,6 +77,72 @@ client = MongoClient(Config.MONGODB_URI)
 db = client[Config.DB_NAME]
 articles_collection = db['articles']
 subscribers_collection = db['subscribers']
+transport_alerts_collection = db['transport_alerts']
+
+
+def get_today_transport_disruptions():
+    """
+    Get active S-Bahn disruptions for today from the MongoDB
+    transport_alerts collection
+
+    Returns:
+        list: Active disruptions with display-ready fields
+    """
+    try:
+        from datetime import datetime
+
+        # Get active disruptions; one compound sort is required here,
+        # since chained .sort() calls in pymongo override each other
+        disruptions = list(transport_alerts_collection.find(
+            {'is_active': True},
+            {'_id': 0}
+        ).sort([('severity', -1), ('updated_at', -1)]))
+
+        # Filter for disruptions happening today
+        today = datetime.utcnow().date()
+        today_disruptions = []
+
+        for d in disruptions:
+            # Check if disruption is active today
+            start_time = d.get('start_time')
+            end_time = d.get('end_time')
+
+            is_today = False
+            if start_time and end_time:
+                start_date = start_time.date() if hasattr(start_time, 'date') else today
+                end_date = end_time.date() if hasattr(end_time, 'date') else today
+                is_today = start_date <= today <= end_date
+            elif start_time:
+                start_date = start_time.date() if hasattr(start_time, 'date') else today
+                is_today = start_date <= today
+            else:
+                is_today = True  # No time info, assume it's relevant
+
+            if is_today:
+                # Format times for display
+                if start_time:
+                    d['start_time_str'] = start_time.strftime('%H:%M') if hasattr(start_time, 'strftime') else str(start_time)
+                if end_time:
+                    d['end_time_str'] = end_time.strftime('%H:%M') if hasattr(end_time, 'strftime') else str(end_time)
+
+                # Format lines as comma-separated string
+                d['lines_str'] = ', '.join(d.get('lines', []))
+
+                # Get severity icon
+                severity_icons = {
+                    'high': '🔴',
+                    'medium': '🟡',
+                    'low': '🟢'
+                }
+                d['severity_icon'] = severity_icons.get(d.get('severity', 'medium'), '🟡')
+
+                today_disruptions.append(d)
+
+        print(f"✓ Found {len(today_disruptions)} transport disruptions for today")
+        return today_disruptions
+
+    except Exception as e:
+        print(f"✗ Error fetching transport disruptions: {e}")
+        return []


 def get_latest_articles_by_categories(categories=None, articles_per_category=3, hours=24):
@@ -296,6 +362,9 @@ def render_newsletter_html(articles, subscriber_categories=None, tracking_enable
     from weather_service import get_munich_weather
     weather = get_munich_weather()

+    # Get transport disruptions for today
+    transport_disruptions = get_today_transport_disruptions()
+
     # Prepare template data
     now = datetime.now()
     total_articles = sum(len(section['articles']) for section in category_sections)
@@ -308,7 +377,8 @@ def render_newsletter_html(articles, subscriber_categories=None, tracking_enable
         'preferences_link': f'{Config.WEBSITE_URL}/preferences.html',
         'website_link': Config.WEBSITE_URL,
         'tracking_enabled': tracking_enabled,
-        'weather': weather
+        'weather': weather,
+        'transport_disruptions': transport_disruptions
     }

     # Render HTML
diff --git a/start-with-gpu.sh b/start-with-gpu.sh
index 976e2b5..244832c 100755
--- a/start-with-gpu.sh
+++ b/start-with-gpu.sh
@@ -17,7 +17,7 @@ if command -v nvidia-smi &> /dev/null; then
     echo "✓ NVIDIA Docker runtime is available"
     echo ""
     echo "Starting services with GPU support..."
- docker-compose -f docker-compose.yml -f docker-compose.gpu.yml up -d + docker compose -f docker-compose.yml -f docker-compose.gpu.yml up -d echo "" echo "✓ Services started with GPU acceleration!" echo "" diff --git a/transport_crawler/Dockerfile b/transport_crawler/Dockerfile new file mode 100644 index 0000000..878c6aa --- /dev/null +++ b/transport_crawler/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install Chromium and dependencies for Selenium (works on both ARM64 and AMD64) +RUN apt-get update && apt-get install -y \ + chromium \ + chromium-driver \ + && rm -rf /var/lib/apt/lists/* + +# Set environment variable for Chromium +ENV CHROME_BIN=/usr/bin/chromium +ENV CHROMEDRIVER_PATH=/usr/bin/chromedriver + +# Install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy crawler files +COPY *.py /app/ +COPY *.sh /app/ + +# Make start script executable +RUN chmod +x /app/start.sh + +# Expose port for API +EXPOSE 5002 + +# Run both API and worker +CMD ["/app/start.sh"] diff --git a/transport_crawler/api_service.py b/transport_crawler/api_service.py new file mode 100644 index 0000000..e971616 --- /dev/null +++ b/transport_crawler/api_service.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +""" +Transport Crawler API - Flask service for triggering crawls +""" +from flask import Flask, jsonify +from crawler_service import run_crawler +import os + +app = Flask(__name__) + +@app.route('/health', methods=['GET']) +def health(): + """Health check endpoint""" + return jsonify({'status': 'ok'}), 200 + + +@app.route('/api/transport/crawl', methods=['POST']) +def trigger_crawl(): + """Trigger a transport disruption crawl""" + try: + result = run_crawler() + return jsonify(result), 200 + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/transport/disruptions', methods=['GET']) +def get_disruptions(): + """Get current active disruptions from MongoDB""" + try: + from pymongo import MongoClient + + mongo_uri = os.getenv('MONGODB_URI', 'mongodb://admin:changeme@mongodb:27017/') + client = MongoClient(mongo_uri) + db = client['munich_news'] + collection = db['transport_alerts'] + + # Get active disruptions + disruptions = list(collection.find( + {'is_active': True}, + {'_id': 0} + )) + + # Convert datetime to ISO format + for d in disruptions: + if d.get('start_time'): + d['start_time'] = d['start_time'].isoformat() + if d.get('end_time'): + d['end_time'] = d['end_time'].isoformat() + if d.get('updated_at'): + d['updated_at'] = d['updated_at'].isoformat() + + return jsonify({ + 'total': len(disruptions), + 'disruptions': disruptions + }), 200 + + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +if __name__ == '__main__': + port = int(os.getenv('FLASK_PORT', 5002)) + app.run(host='0.0.0.0', port=port, debug=False) diff --git a/transport_crawler/crawler_service.py b/transport_crawler/crawler_service.py new file mode 100644 index 0000000..73c33ea --- /dev/null +++ b/transport_crawler/crawler_service.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +""" +Transport Crawler Service - Main orchestrator +Fetches disruptions from multiple sources and displays them +""" +from datetime import datetime +from mvg_api_client import MVGClient +from db_api_client import DBClient + +def print_header(): + """Print header""" + print("\n" + "="*70) + print("🚇 Munich Transport Disruption Crawler") + print("="*70) + print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("="*70) + +def 
print_disruption_summary(all_disruptions): + """Print summary of all disruptions""" + if not all_disruptions: + print("\n✅ No disruptions found - All lines operating normally!") + return + + print(f"\n📊 SUMMARY: {len(all_disruptions)} Active Disruptions") + print("="*70) + + # Group by type + by_type = {} + for d in all_disruptions: + dtype = d.get('type', 'unknown') + by_type[dtype] = by_type.get(dtype, 0) + 1 + + print("\nBy Type:") + for dtype, count in sorted(by_type.items()): + icon = { + 'maintenance': '🔧', + 'disruption': '⚠️', + 'delay': '⏱️', + 'info': 'ℹ️' + }.get(dtype, '❓') + print(f" {icon} {dtype.title()}: {count}") + + # Group by source + by_source = {} + for d in all_disruptions: + source = d.get('source', 'unknown') + by_source[source] = by_source.get(source, 0) + 1 + + print("\nBy Source:") + for source, count in sorted(by_source.items()): + print(f" • {source}: {count}") + +def print_disruptions(disruptions, title): + """Print disruptions in a formatted way""" + if not disruptions: + return + + print(f"\n{title}") + print("-"*70) + + for i, d in enumerate(disruptions, 1): + # Icon based on type + icon = { + 'maintenance': '🔧', + 'disruption': '⚠️', + 'delay': '⏱️', + 'info': 'ℹ️' + }.get(d.get('type', 'info'), '❓') + + print(f"\n{icon} [{i}] {d.get('title', 'No title')}") + + # Lines affected + lines = d.get('lines', []) + if lines: + line_str = ', '.join(lines) + print(f" 🚇 Lines: {line_str}") + + # Time range + start = d.get('start_time') + end = d.get('end_time') + if start or end: + time_str = "" + if start: + time_str += f"From: {start.strftime('%d.%m %H:%M')}" + if end: + if time_str: + time_str += " → " + time_str += f"Until: {end.strftime('%d.%m %H:%M')}" + print(f" ⏰ {time_str}") + + # Description + desc = d.get('description', '') + if desc: + # Truncate long descriptions + if len(desc) > 150: + desc = desc[:150] + "..." + print(f" 📝 {desc}") + + # Severity + severity = d.get('severity', 'medium') + severity_icon = { + 'high': '🔴', + 'medium': '🟡', + 'low': '🟢' + }.get(severity, '⚪') + print(f" {severity_icon} Severity: {severity}") + +def save_to_mongodb(disruptions): + """Save disruptions to MongoDB""" + try: + from pymongo import MongoClient + import os + + mongo_uri = os.getenv('MONGODB_URI', 'mongodb://admin:changeme@mongodb:27017/') + client = MongoClient(mongo_uri) + db = client['munich_news'] + collection = db['transport_alerts'] + + print("\n💾 Saving to MongoDB...") + + # Mark all existing alerts as inactive + collection.update_many({}, {'$set': {'is_active': False}}) + + # Insert or update current disruptions + saved_count = 0 + for d in disruptions: + # Use disruption ID as unique identifier + collection.update_one( + {'alert_id': d['id']}, + { + '$set': { + 'alert_id': d['id'], + 'title': d['title'], + 'description': d['description'], + 'lines': d['lines'], + 'type': d['type'], + 'severity': d['severity'], + 'start_time': d['start_time'], + 'end_time': d['end_time'], + 'source': d['source'], + 'is_active': True, + 'updated_at': datetime.utcnow() + } + }, + upsert=True + ) + saved_count += 1 + + print(f"✓ Saved {saved_count} disruptions to MongoDB") + return True + + except Exception as e: + print(f"✗ MongoDB error: {e}") + return False + + +def run_crawler(): + """Main crawler function""" + print_header() + + all_disruptions = [] + + # 1. 
Fetch MVG disruptions (U-Bahn, Tram, Bus) + print("\n📡 Fetching data from sources...") + print("-"*70) + + mvg_client = MVGClient() + mvg_disruptions = mvg_client.get_disruptions() + all_disruptions.extend(mvg_disruptions) + + # 2. Fetch S-Bahn disruptions + db_client = DBClient() + sbahn_disruptions = db_client.get_sbahn_disruptions() + all_disruptions.extend(sbahn_disruptions) + + # 3. Print summary + print_disruption_summary(all_disruptions) + + # 4. Print detailed disruptions + if mvg_disruptions: + print_disruptions(mvg_disruptions, "\n🚇 MVG DISRUPTIONS (U-Bahn, Tram, Bus)") + + if sbahn_disruptions: + print_disruptions(sbahn_disruptions, "\n🚆 S-BAHN DISRUPTIONS") + + # 5. Output JSON + print("\n" + "="*70) + print("📄 JSON OUTPUT") + print("="*70) + + import json + output = { + 'timestamp': datetime.now().isoformat(), + 'total_disruptions': len(all_disruptions), + 'mvg_disruptions': len(mvg_disruptions), + 'sbahn_disruptions': len(sbahn_disruptions), + 'disruptions': [] + } + + for d in all_disruptions: + output['disruptions'].append({ + 'id': d.get('id'), + 'title': d.get('title'), + 'description': d.get('description'), + 'lines': d.get('lines', []), + 'type': d.get('type'), + 'severity': d.get('severity'), + 'start_time': d.get('start_time').isoformat() if d.get('start_time') else None, + 'end_time': d.get('end_time').isoformat() if d.get('end_time') else None, + 'source': d.get('source') + }) + + print(json.dumps(output, indent=2, ensure_ascii=False)) + + # 6. Save to MongoDB + save_to_mongodb(all_disruptions) + + # Footer + print("\n" + "="*70) + print("✓ Crawler finished") + print("="*70 + "\n") + + return output + +if __name__ == '__main__': + try: + disruptions = run_crawler() + except KeyboardInterrupt: + print("\n\n👋 Crawler stopped by user") + except Exception as e: + print(f"\n\n❌ Crawler error: {e}") + import traceback + traceback.print_exc() diff --git a/transport_crawler/db_api_client.py b/transport_crawler/db_api_client.py new file mode 100644 index 0000000..b42fd8c --- /dev/null +++ b/transport_crawler/db_api_client.py @@ -0,0 +1,789 @@ +#!/usr/bin/env python3 +""" +Deutsche Bahn API Client - Fetch S-Bahn disruptions using Selenium +""" +import requests +from datetime import datetime +import time + +class DBClient: + """Client for Deutsche Bahn (S-Bahn) disruptions""" + + # DB S-Bahn München map page + MAP_URL = "https://karte.bahn.de/en/region/DB_SBahn_Muenchen" + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.9,de;q=0.8', + }) + + def get_sbahn_disruptions(self): + """ + Fetch S-Bahn disruptions for Munich from DB Karte using Selenium + + Returns: + list: Disruption data + """ + print("\n🔍 Fetching S-Bahn disruptions from DB Karte (using Selenium)...") + + driver = None + try: + from selenium import webdriver + from selenium.webdriver.chrome.options import Options + from selenium.webdriver.chrome.service import Service + from selenium.webdriver.common.by import By + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + import os + + # Setup Chrome options for Chromium + chrome_options = Options() + chrome_options.add_argument('--headless') + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + 
chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_argument('--window-size=1920,1080') + chrome_options.add_argument('--disable-gpu') + chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + chrome_options.add_experimental_option('useAutomationExtension', False) + + # Set realistic user agent + chrome_options.add_argument('user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36') + + # Use system Chromium if available (Docker container) + chrome_bin = os.getenv('CHROME_BIN', '/usr/bin/chromium') + chromedriver_path = os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver') + + if os.path.exists(chrome_bin): + chrome_options.binary_location = chrome_bin + print(f" Using system Chromium: {chrome_bin}") + + print(" Starting Chromium browser...") + + # Try to use system chromedriver + try: + if os.path.exists(chromedriver_path): + service = Service(chromedriver_path) + driver = webdriver.Chrome(service=service, options=chrome_options) + else: + driver = webdriver.Chrome(options=chrome_options) + except Exception as e: + print(f" ✗ Failed to start Chromium: {e}") + print(f" ℹ️ Falling back to webdriver-manager...") + try: + from webdriver_manager.chrome import ChromeDriverManager + service = Service(ChromeDriverManager().install()) + driver = webdriver.Chrome(service=service, options=chrome_options) + except Exception as e2: + print(f" ✗ webdriver-manager also failed: {e2}") + raise + + print(f" Loading: {self.MAP_URL}") + driver.get(self.MAP_URL) + + # Wait for page to load + print(" Waiting for page to load...") + + # Wait for disruption boxes to appear + try: + print(" Waiting for disruption boxes...") + WebDriverWait(driver, 15).until( + EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']")) + ) + # Give extra time for all boxes to load + time.sleep(3) + print(" ✓ Disruption boxes should be loaded") + except Exception as e: + print(f" ⚠ Timeout waiting for disruption boxes: {e}") + time.sleep(5) + + print(f" ✓ Page loaded (title: {driver.title[:50]}...)") + + # Debug: Save screenshot and page source + try: + screenshot_path = "/tmp/db_karte_screenshot.png" + driver.save_screenshot(screenshot_path) + print(f" 📸 Screenshot saved to: {screenshot_path}") + except: + pass + + # Debug: Print page structure + print(" Analyzing page structure...") + page_source = driver.page_source + + # Save page source for inspection + try: + with open("/tmp/db_karte_source.html", "w", encoding="utf-8") as f: + f.write(page_source) + print(f" 📄 Page source saved to: /tmp/db_karte_source.html") + except: + pass + + # Look for disruption markers/icons on the map + disruptions = self._find_and_click_disruptions(driver) + + # If no disruptions found via clicking, parse the page source + if not disruptions: + print(" No clickable disruptions found, parsing page source...") + + # Debug: Show what elements are on the page + from bs4 import BeautifulSoup + soup = BeautifulSoup(page_source, 'html.parser') + + # Count different element types + print(f" Page stats: {len(soup.find_all('div'))} divs, {len(soup.find_all('button'))} buttons") + + # Look for any text mentioning disruptions + text = soup.get_text().lower() + if 'disruption' in text or 'störung' in text or 'incident' in text: + print(f" ℹ️ Page contains disruption-related text") + + # Check for common map libraries + if 'leaflet' in page_source.lower(): + print(f" ℹ️ Page uses Leaflet maps") + if 'mapbox' in 
page_source.lower(): + print(f" ℹ️ Page uses Mapbox") + if 'google.maps' in page_source.lower(): + print(f" ℹ️ Page uses Google Maps") + + disruptions = self._parse_selenium_page(page_source, driver) + + if disruptions: + print(f"✓ Found {len(disruptions)} S-Bahn disruptions") + else: + print(f" ℹ️ No S-Bahn disruptions found (all lines operating normally)") + + return disruptions + + except ImportError as e: + print(f" ✗ Selenium not available: {e}") + print(f" ℹ️ Install with: pip install selenium webdriver-manager") + return [] + except Exception as e: + print(f" ✗ Error: {e}") + import traceback + traceback.print_exc() + return [] + finally: + if driver: + driver.quit() + + def _find_and_click_disruptions(self, driver): + """Find disruption boxes in the sidebar""" + try: + from selenium.webdriver.common.by import By + + disruptions = [] + + print(" Looking for disruption boxes...") + + # Find all disruption boxes in the sidebar + disruption_boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']") + + if not disruption_boxes: + print(" No disruption boxes found") + return [] + + print(f" Found {len(disruption_boxes)} disruption boxes") + + # First pass: collect all basic info without clicking + basic_info = [] + for i, box in enumerate(disruption_boxes): + try: + + # Extract disruption ID + disruption_id = box.get_attribute('id') + + # Extract title + title_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxTitle']") + title = title_elem.text.strip() + + # Extract subtitle (type) + subtitle_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxSubtitle']") + subtitle = subtitle_elem.text.strip() + + # Extract affected lines + lines = [] + badge_list = box.find_element(By.CSS_SELECTOR, "div[data-cy='disruptionBadgeList']") + badges = badge_list.find_elements(By.CSS_SELECTOR, "span[data-cy='disruptionBadge']") + for badge in badges: + line_text = badge.text.strip() + if line_text and line_text.startswith('S'): + lines.append(line_text) + + # Determine severity from icon + severity = 'medium' + try: + icon = box.find_element(By.CSS_SELECTOR, "img[data-cy='disruptionboxIcon']") + icon_src = icon.get_attribute('src') + if 'red' in icon_src: + severity = 'high' + elif 'orange' in icon_src: + severity = 'medium' + elif 'yellow' in icon_src: + severity = 'low' + except: + pass + + # Store basic info + basic_info.append({ + 'id': disruption_id or f"sbahn_{i}", + 'title': title, + 'subtitle': subtitle, + 'lines': lines, + 'severity': severity, + 'index': i + }) + + print(f" ✓ [{i}] {title[:60]}... 
(Lines: {', '.join(lines)})") + + except Exception as e: + print(f" ✗ Error extracting disruption {i}: {e}") + continue + + # Second pass: click each one to get time details + print(f"\n Extracting time details for {len(basic_info)} disruptions...") + for info in basic_info: + print(f" Processing disruption {info['index']}...") + try: + # Make sure we're back at the list view + driver.execute_script("window.scrollTo(0, 0);") + time.sleep(0.5) + + # Wait for boxes to be present again + try: + WebDriverWait(driver, 3).until( + EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']")) + ) + except: + pass + + # Refetch boxes each time + boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']") + print(f" Found {len(boxes)} boxes after refetch") + + if info['index'] >= len(boxes): + print(f" ⚠ Box {info['index']} not found (only {len(boxes)} boxes available)") + continue + + # Get fresh reference to the box and button + box = boxes[info['index']] + button = box.find_element(By.TAG_NAME, "button") + + # Click to open details + driver.execute_script("arguments[0].scrollIntoView(true);", button) + time.sleep(0.3) + driver.execute_script("arguments[0].click();", button) # Use JS click + time.sleep(1.5) # Wait for detail panel to fully open + + # Extract time from page text + detail_text = driver.find_element(By.TAG_NAME, "body").text + + # Debug: show a snippet of the detail text + if "From:" in detail_text and "To:" in detail_text: + snippet_start = detail_text.find("From:") + snippet_end = detail_text.find("To:", snippet_start) + 50 + snippet = detail_text[snippet_start:snippet_end] + print(f" Time snippet: {snippet.replace(chr(10), ' ')}") + + start_time, end_time = self._extract_time_range(detail_text) + + # Go back to original page to reset the view + driver.get(self.MAP_URL) + time.sleep(3) # Wait for page to reload and boxes to appear + + # Create disruption object + disruption_type = self._classify_type(info['title'] + ' ' + info['subtitle']) + + disruption = { + 'id': info['id'], + 'title': info['title'], + 'description': info['subtitle'], + 'lines': info['lines'], + 'type': disruption_type, + 'start_time': start_time, + 'end_time': end_time, + 'severity': info['severity'], + 'source': 'db_karte_sidebar', + 'created_at': datetime.utcnow() + } + + disruptions.append(disruption) + + time_info = "" + if start_time: + time_info += f" From: {start_time.strftime('%d.%m %H:%M')}" + if end_time: + time_info += f" To: {end_time.strftime('%d.%m %H:%M')}" + + if time_info: + print(f" ✓ [{info['index']}]{time_info}") + + except Exception as e: + print(f" ⚠ Could not get time for disruption {info['index']}: {e}") + # Still add the disruption without time info + disruption = { + 'id': info['id'], + 'title': info['title'], + 'description': info['subtitle'], + 'lines': info['lines'], + 'type': self._classify_type(info['title']), + 'start_time': None, + 'end_time': None, + 'severity': info['severity'], + 'source': 'db_karte_sidebar', + 'created_at': datetime.utcnow() + } + disruptions.append(disruption) + + return disruptions + + except Exception as e: + print(f" ✗ Error finding disruption boxes: {e}") + return [] + + def _extract_disruption_details(self, driver): + """Extract disruption details from popup/modal""" + try: + from selenium.webdriver.common.by import By + + # Look for popup/modal/tooltip containers + popup_selectors = [ + "div[class*='popup']", + "div[class*='modal']", + "div[class*='tooltip']", + "div[class*='detail']", + "div[class*='info']", + 
"[role='dialog']", + "[role='tooltip']", + ] + + popup = None + for selector in popup_selectors: + try: + elements = driver.find_elements(By.CSS_SELECTOR, selector) + for elem in elements: + if elem.is_displayed() and len(elem.text) > 20: + popup = elem + break + if popup: + break + except: + continue + + if not popup: + # Try to get any recently appeared text + body = driver.find_element(By.TAG_NAME, "body") + popup_text = body.text + else: + popup_text = popup.text + + # Check if it's S-Bahn related + if not self._contains_sbahn_reference(popup_text): + return None + + # Extract title (usually first line or heading) + title = popup_text.split('\n')[0][:100] if '\n' in popup_text else popup_text[:100] + + # Extract time information + start_time, end_time = self._extract_time_range(popup_text) + + # Extract affected lines + lines = self._extract_lines_from_text(popup_text) + + return { + 'id': f"sbahn_detail_{hash(popup_text) % 10000}", + 'title': title, + 'description': popup_text[:500], + 'lines': lines, + 'type': self._classify_type(title), + 'start_time': start_time, + 'end_time': end_time, + 'severity': self._determine_severity(popup_text), + 'source': 'db_karte_detail', + 'created_at': datetime.utcnow() + } + + except Exception as e: + return None + + def _extract_time_range(self, text): + """Extract start and end time from text""" + import re + from datetime import datetime + + start_time = None + end_time = None + + # Look for the specific format with possible newlines + # Pattern: From:XX.YYYY-MM-DD, HH:MMTo:XX.YYYY-MM-DD, HH:MM + # Remove newlines first to make matching easier + text_clean = text.replace('\n', ' ').replace('\r', ' ') + + pattern = r'From:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})\s*To:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})' + match = re.search(pattern, text_clean) + + if match: + try: + start_date = match.group(1) # 2025-11-13 + start_time_str = match.group(2) # 10:02 + end_date = match.group(3) # 2025-11-13 + end_time_str = match.group(4) # 14:30 + + start_time = datetime.strptime(f"{start_date} {start_time_str}", "%Y-%m-%d %H:%M") + end_time = datetime.strptime(f"{end_date} {end_time_str}", "%Y-%m-%d %H:%M") + except Exception as e: + print(f" ⚠ Error parsing time: {e}") + + # Fallback: Try other German formats + if not start_time: + # Look for "ab DD.MM.YYYY HH:MM" or "bis DD.MM.YYYY HH:MM" + ab_pattern = r'ab\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})' + bis_pattern = r'bis\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})' + + ab_match = re.search(ab_pattern, text, re.IGNORECASE) + if ab_match: + try: + start_time = datetime.strptime(f"{ab_match.group(1)} {ab_match.group(2)}", "%d.%m.%Y %H:%M") + except: + pass + + bis_match = re.search(bis_pattern, text, re.IGNORECASE) + if bis_match: + try: + end_time = datetime.strptime(f"{bis_match.group(1)} {bis_match.group(2)}", "%d.%m.%Y %H:%M") + except: + pass + + return start_time, end_time + + def _determine_severity(self, text): + """Determine severity based on keywords""" + text_lower = text.lower() + + if any(word in text_lower for word in ['ausfall', 'gesperrt', 'eingestellt', 'komplett']): + return 'high' + elif any(word in text_lower for word in ['verspätung', 'verzögerung', 'teilweise']): + return 'medium' + else: + return 'low' + + def _parse_selenium_page(self, page_source, driver): + """Parse page loaded by Selenium""" + try: + from bs4 import BeautifulSoup + from selenium.webdriver.common.by import By + + print(" Analyzing rendered page...") + soup = 
BeautifulSoup(page_source, 'html.parser') + disruptions = [] + + # Method 1: Try to find disruption elements directly via Selenium + try: + # Look for common disruption indicators + selectors = [ + "div[class*='disruption']", + "div[class*='stoerung']", + "div[class*='incident']", + "div[class*='message']", + "div[class*='alert']", + "[data-disruption]", + "[data-incident]" + ] + + for selector in selectors: + try: + elements = driver.find_elements(By.CSS_SELECTOR, selector) + if elements: + print(f" Found {len(elements)} elements with selector: {selector}") + for elem in elements: + text = elem.text.strip() + if len(text) > 20 and self._contains_sbahn_reference(text): + disruptions.append(self._create_disruption_from_text(text)) + except: + continue + except Exception as e: + print(f" ✗ Selenium element search error: {e}") + + # Method 2: Parse the page source with BeautifulSoup + if not disruptions: + print(" Trying BeautifulSoup parsing...") + disruptions = self._parse_map_page(page_source.encode(), page_source) + + # Method 3: Check for any text mentioning S-Bahn lines with disruptions + if not disruptions: + print(" Checking page text for S-Bahn mentions...") + page_text = soup.get_text() + if self._contains_sbahn_reference(page_text): + # Extract paragraphs or sections mentioning S-Bahn + for elem in soup.find_all(['p', 'div', 'span']): + text = elem.get_text(strip=True) + if len(text) > 30 and self._contains_sbahn_reference(text): + lines = self._extract_lines_from_text(text) + if lines: + disruptions.append(self._create_disruption_from_text(text)) + + # Remove duplicates + seen = set() + unique = [] + for d in disruptions: + key = d['title'][:50] + if key not in seen: + seen.add(key) + unique.append(d) + + return unique + + except Exception as e: + print(f" ✗ Parse error: {e}") + import traceback + traceback.print_exc() + return [] + + def _contains_sbahn_reference(self, text): + """Check if text contains S-Bahn line references""" + import re + return bool(re.search(r'S[\s-]?[1-8]', text, re.IGNORECASE)) + + def _create_disruption_from_text(self, text): + """Create disruption object from text""" + # Extract first sentence or first 100 chars as title + sentences = text.split('.') + title = sentences[0][:100] if sentences else text[:100] + + return { + 'id': f"sbahn_{hash(text) % 10000}", + 'title': title, + 'description': text[:500], + 'lines': self._extract_lines_from_text(text), + 'type': self._classify_type(title), + 'start_time': None, + 'end_time': None, + 'severity': 'medium', + 'source': 'db_karte_selenium', + 'created_at': datetime.utcnow() + } + + def _parse_map_page(self, html_content, html_text): + """Parse DB Karte map page for S-Bahn disruptions""" + try: + from bs4 import BeautifulSoup + import re + import json + + disruptions = [] + + # Method 1: Look for embedded JSON data in script tags + print(" Analyzing page for disruption data...") + + # The map page likely has JSON data embedded in