#!/usr/bin/env python3
"""
Deutsche Bahn API Client - Fetch S-Bahn disruptions using Selenium
"""
import requests
from datetime import datetime
import time


class DBClient:
    """Client for Deutsche Bahn (S-Bahn) disruptions"""

    # DB S-Bahn München map page
    MAP_URL = "https://karte.bahn.de/en/region/DB_SBahn_Muenchen"

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
        })
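
    # Typical usage (mirrors test_db_client() at the bottom of this file):
    #   client = DBClient()
    #   disruptions = client.get_sbahn_disruptions()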
    def get_sbahn_disruptions(self):
        """
        Fetch S-Bahn disruptions for Munich from DB Karte using Selenium

        Returns:
            list: Disruption data
        """
        print("\n🔍 Fetching S-Bahn disruptions from DB Karte (using Selenium)...")
        driver = None
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.chrome.service import Service
            from selenium.webdriver.common.by import By
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC
            import os

            # Set up Chrome options for Chromium
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-blink-features=AutomationControlled')
            chrome_options.add_argument('--window-size=1920,1080')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
            chrome_options.add_experimental_option('useAutomationExtension', False)
            # Set a realistic user agent
            chrome_options.add_argument('user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')

            # Use system Chromium if available (Docker container)
            chrome_bin = os.getenv('CHROME_BIN', '/usr/bin/chromium')
            chromedriver_path = os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
            if os.path.exists(chrome_bin):
                chrome_options.binary_location = chrome_bin
                print(f" Using system Chromium: {chrome_bin}")

            print(" Starting Chromium browser...")
            # Try to use the system chromedriver first
            try:
                if os.path.exists(chromedriver_path):
                    service = Service(chromedriver_path)
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                else:
                    driver = webdriver.Chrome(options=chrome_options)
            except Exception as e:
                print(f" ✗ Failed to start Chromium: {e}")
                print(" Falling back to webdriver-manager...")
                try:
                    from webdriver_manager.chrome import ChromeDriverManager
                    service = Service(ChromeDriverManager().install())
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                except Exception as e2:
                    print(f" ✗ webdriver-manager also failed: {e2}")
                    raise
print(f" Loading: {self.MAP_URL}")
driver.get(self.MAP_URL)
# Wait for page to load
print(" Waiting for page to load...")
# Wait for disruption boxes to appear
try:
print(" Waiting for disruption boxes...")
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']"))
)
# Give extra time for all boxes to load
time.sleep(3)
print(" ✓ Disruption boxes should be loaded")
except Exception as e:
print(f" ⚠ Timeout waiting for disruption boxes: {e}")
time.sleep(5)
print(f" ✓ Page loaded (title: {driver.title[:50]}...)")
# Debug: Save screenshot and page source
try:
screenshot_path = "/tmp/db_karte_screenshot.png"
driver.save_screenshot(screenshot_path)
print(f" 📸 Screenshot saved to: {screenshot_path}")
except:
pass
# Debug: Print page structure
print(" Analyzing page structure...")
page_source = driver.page_source
# Save page source for inspection
try:
with open("/tmp/db_karte_source.html", "w", encoding="utf-8") as f:
f.write(page_source)
print(f" 📄 Page source saved to: /tmp/db_karte_source.html")
except:
pass
            # Look for disruption markers/icons on the map
            disruptions = self._find_and_click_disruptions(driver)

            # If no disruptions were found via clicking, parse the page source
            if not disruptions:
                print(" No clickable disruptions found, parsing page source...")
                # Debug: show what elements are on the page
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(page_source, 'html.parser')
                # Count different element types
                print(f" Page stats: {len(soup.find_all('div'))} divs, {len(soup.find_all('button'))} buttons")
                # Look for any text mentioning disruptions
                text = soup.get_text().lower()
                if 'disruption' in text or 'störung' in text or 'incident' in text:
                    print(" Page contains disruption-related text")
                # Check for common map libraries
                if 'leaflet' in page_source.lower():
                    print(" Page uses Leaflet maps")
                if 'mapbox' in page_source.lower():
                    print(" Page uses Mapbox")
                if 'google.maps' in page_source.lower():
                    print(" Page uses Google Maps")
                disruptions = self._parse_selenium_page(page_source, driver)

            if disruptions:
                print(f"✓ Found {len(disruptions)} S-Bahn disruptions")
            else:
                print(" No S-Bahn disruptions found (all lines operating normally)")
            return disruptions
        except ImportError as e:
            print(f" ✗ Selenium not available: {e}")
            print(" Install with: pip install selenium webdriver-manager")
            return []
        except Exception as e:
            print(f" ✗ Error: {e}")
            import traceback
            traceback.print_exc()
            return []
        finally:
            if driver:
                driver.quit()
    def _find_and_click_disruptions(self, driver):
        """Find disruption boxes in the sidebar"""
        try:
            from selenium.webdriver.common.by import By
            # WebDriverWait and EC are used in the second pass below; they were
            # previously only imported in get_sbahn_disruptions(), which caused
            # a NameError here
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC

            disruptions = []
            print(" Looking for disruption boxes...")
            # Find all disruption boxes in the sidebar
            disruption_boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']")
            if not disruption_boxes:
                print(" No disruption boxes found")
                return []
            print(f" Found {len(disruption_boxes)} disruption boxes")

            # First pass: collect all basic info without clicking
            basic_info = []
            for i, box in enumerate(disruption_boxes):
                try:
                    # Extract the disruption ID
                    disruption_id = box.get_attribute('id')
                    # Extract the title
                    title_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxTitle']")
                    title = title_elem.text.strip()
                    # Extract the subtitle (type)
                    subtitle_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxSubtitle']")
                    subtitle = subtitle_elem.text.strip()
                    # Extract affected lines
                    lines = []
                    badge_list = box.find_element(By.CSS_SELECTOR, "div[data-cy='disruptionBadgeList']")
                    badges = badge_list.find_elements(By.CSS_SELECTOR, "span[data-cy='disruptionBadge']")
                    for badge in badges:
                        line_text = badge.text.strip()
                        if line_text and line_text.startswith('S'):
                            lines.append(line_text)
                    # Determine severity from the icon color
                    severity = 'medium'
                    try:
                        icon = box.find_element(By.CSS_SELECTOR, "img[data-cy='disruptionboxIcon']")
                        icon_src = icon.get_attribute('src')
                        if 'red' in icon_src:
                            severity = 'high'
                        elif 'orange' in icon_src:
                            severity = 'medium'
                        elif 'yellow' in icon_src:
                            severity = 'low'
                    except Exception:
                        pass
                    # Store the basic info
                    basic_info.append({
                        'id': disruption_id or f"sbahn_{i}",
                        'title': title,
                        'subtitle': subtitle,
                        'lines': lines,
                        'severity': severity,
                        'index': i
                    })
                    print(f" ✓ [{i}] {title[:60]}... (Lines: {', '.join(lines)})")
                except Exception as e:
                    print(f" ✗ Error extracting disruption {i}: {e}")
                    continue
            # Second pass: click each one to get time details
            print(f"\n Extracting time details for {len(basic_info)} disruptions...")
            for info in basic_info:
                print(f" Processing disruption {info['index']}...")
                try:
                    # Make sure we're back at the list view
                    driver.execute_script("window.scrollTo(0, 0);")
                    time.sleep(0.5)
                    # Wait for the boxes to be present again
                    try:
                        WebDriverWait(driver, 3).until(
                            EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']"))
                        )
                    except Exception:
                        pass
                    # Refetch the boxes each time, since clicking invalidates references
                    boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']")
                    print(f" Found {len(boxes)} boxes after refetch")
                    if info['index'] >= len(boxes):
                        print(f" ⚠ Box {info['index']} not found (only {len(boxes)} boxes available)")
                        continue
                    # Get fresh references to the box and its button
                    box = boxes[info['index']]
                    button = box.find_element(By.TAG_NAME, "button")
                    # Click to open the details
                    driver.execute_script("arguments[0].scrollIntoView(true);", button)
                    time.sleep(0.3)
                    driver.execute_script("arguments[0].click();", button)  # Use a JS click
                    time.sleep(1.5)  # Wait for the detail panel to fully open
                    # Extract the time range from the page text
                    detail_text = driver.find_element(By.TAG_NAME, "body").text
                    # Debug: show a snippet of the detail text
                    if "From:" in detail_text and "To:" in detail_text:
                        snippet_start = detail_text.find("From:")
                        snippet_end = detail_text.find("To:", snippet_start) + 50
                        snippet = detail_text[snippet_start:snippet_end]
                        print(f" Time snippet: {snippet.replace(chr(10), ' ')}")
                    start_time, end_time = self._extract_time_range(detail_text)
                    # Go back to the original page to reset the view
                    driver.get(self.MAP_URL)
                    time.sleep(3)  # Wait for the page to reload and boxes to appear
                    # Create the disruption object
                    disruption_type = self._classify_type(info['title'] + ' ' + info['subtitle'])
                    disruption = {
                        'id': info['id'],
                        'title': info['title'],
                        'description': info['subtitle'],
                        'lines': info['lines'],
                        'type': disruption_type,
                        'start_time': start_time,
                        'end_time': end_time,
                        'severity': info['severity'],
                        'source': 'db_karte_sidebar',
                        'created_at': datetime.utcnow()
                    }
                    disruptions.append(disruption)
                    time_info = ""
                    if start_time:
                        time_info += f" From: {start_time.strftime('%d.%m %H:%M')}"
                    if end_time:
                        time_info += f" To: {end_time.strftime('%d.%m %H:%M')}"
                    if time_info:
                        print(f" ✓ [{info['index']}]{time_info}")
                except Exception as e:
                    print(f" ⚠ Could not get time for disruption {info['index']}: {e}")
                    # Still add the disruption, just without time info
                    disruption = {
                        'id': info['id'],
                        'title': info['title'],
                        'description': info['subtitle'],
                        'lines': info['lines'],
                        'type': self._classify_type(info['title']),
                        'start_time': None,
                        'end_time': None,
                        'severity': info['severity'],
                        'source': 'db_karte_sidebar',
                        'created_at': datetime.utcnow()
                    }
                    disruptions.append(disruption)
            return disruptions
        except Exception as e:
            print(f" ✗ Error finding disruption boxes: {e}")
            return []
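
    # Sketch of the dict every extraction path in this class produces (field
    # values here are illustrative, not scraped from live data):
    #   {
    #       'id': 'sbahn_0',
    #       'title': 'Stammstrecke: ...',
    #       'description': '...',
    #       'lines': ['S3', 'S8'],
    #       'type': 'disruption',           # see _classify_type()
    #       'start_time': datetime | None,  # see _extract_time_range()
    #       'end_time': datetime | None,
    #       'severity': 'high' | 'medium' | 'low',
    #       'source': 'db_karte_sidebar',   # varies by extraction path
    #       'created_at': datetime.utcnow()
    #   }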
    def _extract_disruption_details(self, driver):
        """Extract disruption details from a popup/modal"""
        try:
            from selenium.webdriver.common.by import By

            # Look for popup/modal/tooltip containers
            popup_selectors = [
                "div[class*='popup']",
                "div[class*='modal']",
                "div[class*='tooltip']",
                "div[class*='detail']",
                "div[class*='info']",
                "[role='dialog']",
                "[role='tooltip']",
            ]
            popup = None
            for selector in popup_selectors:
                try:
                    elements = driver.find_elements(By.CSS_SELECTOR, selector)
                    for elem in elements:
                        if elem.is_displayed() and len(elem.text) > 20:
                            popup = elem
                            break
                    if popup:
                        break
                except Exception:
                    continue
            if not popup:
                # Fall back to any recently appeared text in the body
                body = driver.find_element(By.TAG_NAME, "body")
                popup_text = body.text
            else:
                popup_text = popup.text

            # Check whether it's S-Bahn related
            if not self._contains_sbahn_reference(popup_text):
                return None

            # Extract the title (usually the first line or heading)
            title = popup_text.split('\n')[0][:100] if '\n' in popup_text else popup_text[:100]
            # Extract time information
            start_time, end_time = self._extract_time_range(popup_text)
            # Extract affected lines
            lines = self._extract_lines_from_text(popup_text)
            return {
                'id': f"sbahn_detail_{hash(popup_text) % 10000}",
                'title': title,
                'description': popup_text[:500],
                'lines': lines,
                'type': self._classify_type(title),
                'start_time': start_time,
                'end_time': end_time,
                'severity': self._determine_severity(popup_text),
                'source': 'db_karte_detail',
                'created_at': datetime.utcnow()
            }
        except Exception:
            return None
    def _extract_time_range(self, text):
        """Extract start and end time from text"""
        import re

        start_time = None
        end_time = None
        # Look for the specific format, allowing for newlines.
        # Pattern: From: XX. YYYY-MM-DD, HH:MM To: XX. YYYY-MM-DD, HH:MM
        # Remove newlines first to make matching easier
        text_clean = text.replace('\n', ' ').replace('\r', ' ')
        pattern = r'From:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})\s*To:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})'
        match = re.search(pattern, text_clean)
        if match:
            try:
                start_date = match.group(1)      # e.g. 2025-11-13
                start_time_str = match.group(2)  # e.g. 10:02
                end_date = match.group(3)        # e.g. 2025-11-13
                end_time_str = match.group(4)    # e.g. 14:30
                start_time = datetime.strptime(f"{start_date} {start_time_str}", "%Y-%m-%d %H:%M")
                end_time = datetime.strptime(f"{end_date} {end_time_str}", "%Y-%m-%d %H:%M")
            except Exception as e:
                print(f" ⚠ Error parsing time: {e}")

        # Fallback: try other German formats
        if not start_time:
            # Look for "ab DD.MM.YYYY HH:MM" or "bis DD.MM.YYYY HH:MM"
            # ("ab" = from, "bis" = until)
            ab_pattern = r'ab\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})'
            bis_pattern = r'bis\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})'
            ab_match = re.search(ab_pattern, text, re.IGNORECASE)
            if ab_match:
                try:
                    start_time = datetime.strptime(f"{ab_match.group(1)} {ab_match.group(2)}", "%d.%m.%Y %H:%M")
                except ValueError:
                    pass
            bis_match = re.search(bis_pattern, text, re.IGNORECASE)
            if bis_match:
                try:
                    end_time = datetime.strptime(f"{bis_match.group(1)} {bis_match.group(2)}", "%d.%m.%Y %H:%M")
                except ValueError:
                    pass
        return start_time, end_time
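
    # Worked example for _extract_time_range (input format assumed from the
    # regex above; "Do." is the German weekday abbreviation for Thursday):
    #   "From: Do. 2025-11-13, 10:02 To: Do. 2025-11-13, 14:30"
    # yields start_time = datetime(2025, 11, 13, 10, 2) and
    # end_time = datetime(2025, 11, 13, 14, 30).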
    def _determine_severity(self, text):
        """Determine severity based on keywords"""
        text_lower = text.lower()
        # 'Ausfall' = cancellation, 'gesperrt' = closed, 'eingestellt' = suspended, 'komplett' = complete(ly)
        if any(word in text_lower for word in ['ausfall', 'gesperrt', 'eingestellt', 'komplett']):
            return 'high'
        # 'Verspätung'/'Verzögerung' = delay, 'teilweise' = partial(ly)
        elif any(word in text_lower for word in ['verspätung', 'verzögerung', 'teilweise']):
            return 'medium'
        else:
            return 'low'
    def _parse_selenium_page(self, page_source, driver):
        """Parse the page loaded by Selenium"""
        try:
            from bs4 import BeautifulSoup
            from selenium.webdriver.common.by import By

            print(" Analyzing rendered page...")
            soup = BeautifulSoup(page_source, 'html.parser')
            disruptions = []

            # Method 1: try to find disruption elements directly via Selenium
            try:
                # Look for common disruption indicators
                selectors = [
                    "div[class*='disruption']",
                    "div[class*='stoerung']",
                    "div[class*='incident']",
                    "div[class*='message']",
                    "div[class*='alert']",
                    "[data-disruption]",
                    "[data-incident]"
                ]
                for selector in selectors:
                    try:
                        elements = driver.find_elements(By.CSS_SELECTOR, selector)
                        if elements:
                            print(f" Found {len(elements)} elements with selector: {selector}")
                            for elem in elements:
                                text = elem.text.strip()
                                if len(text) > 20 and self._contains_sbahn_reference(text):
                                    disruptions.append(self._create_disruption_from_text(text))
                    except Exception:
                        continue
            except Exception as e:
                print(f" ✗ Selenium element search error: {e}")

            # Method 2: parse the page source with BeautifulSoup
            if not disruptions:
                print(" Trying BeautifulSoup parsing...")
                disruptions = self._parse_map_page(page_source.encode(), page_source)

            # Method 3: check for any text mentioning S-Bahn lines with disruptions
            if not disruptions:
                print(" Checking page text for S-Bahn mentions...")
                page_text = soup.get_text()
                if self._contains_sbahn_reference(page_text):
                    # Extract paragraphs or sections mentioning the S-Bahn
                    for elem in soup.find_all(['p', 'div', 'span']):
                        text = elem.get_text(strip=True)
                        if len(text) > 30 and self._contains_sbahn_reference(text):
                            lines = self._extract_lines_from_text(text)
                            if lines:
                                disruptions.append(self._create_disruption_from_text(text))

            # Remove duplicates
            seen = set()
            unique = []
            for d in disruptions:
                key = d['title'][:50]
                if key not in seen:
                    seen.add(key)
                    unique.append(d)
            return unique
        except Exception as e:
            print(f" ✗ Parse error: {e}")
            import traceback
            traceback.print_exc()
            return []
    def _contains_sbahn_reference(self, text):
        """Check if text contains S-Bahn line references"""
        import re
        return bool(re.search(r'S[\s-]?[1-8]', text, re.IGNORECASE))
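
    # The pattern above matches "S1", "s 7", "S-8", etc. It is deliberately
    # loose and can false-positive on unrelated text such as "Haus 1" (the 's'
    # plus space plus digit also satisfies it), which is why callers combine
    # it with minimum-length checks and _extract_lines_from_text().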
    def _create_disruption_from_text(self, text):
        """Create disruption object from text"""
        # Extract first sentence or first 100 chars as title
        sentences = text.split('.')
        title = sentences[0][:100] if sentences else text[:100]
        return {
            'id': f"sbahn_{hash(text) % 10000}",
            'title': title,
            'description': text[:500],
            'lines': self._extract_lines_from_text(text),
            'type': self._classify_type(title),
            'start_time': None,
            'end_time': None,
            'severity': 'medium',
            'source': 'db_karte_selenium',
            'created_at': datetime.utcnow()
        }
    def _parse_map_page(self, html_content, html_text):
        """Parse the DB Karte map page for S-Bahn disruptions"""
        try:
            from bs4 import BeautifulSoup
            import re
            import json

            disruptions = []
            # Method 1: look for embedded JSON data in script tags
            print(" Analyzing page for disruption data...")
            # The map page likely has JSON data embedded in <script> tags
            soup = BeautifulSoup(html_content, 'html.parser')
            scripts = soup.find_all('script')
            for script in scripts:
                if script.string:
                    # Look for JSON data containing disruption/Störung information
                    script_text = script.string
                    # Try to find JSON objects
                    json_pattern = r'\{[^{}]*(?:"disruption"|"störung"|"incident"|"message")[^{}]*\}'
                    matches = re.finditer(json_pattern, script_text, re.IGNORECASE)
                    for match in matches:
                        try:
                            data = json.loads(match.group())
                            # Process the found JSON data
                            if self._is_disruption_data(data):
                                disruption = self._parse_disruption_json(data)
                                if disruption:
                                    disruptions.append(disruption)
                        except json.JSONDecodeError:
                            continue

            # Method 2: look for API endpoint URLs in the page
            api_pattern = r'https?://[^\s"\']+(?:api|disruption|stoerung)[^\s"\']+'
            api_urls = re.findall(api_pattern, html_text, re.IGNORECASE)
            if api_urls:
                print(f" Found {len(api_urls)} potential API endpoints")
                for api_url in set(api_urls[:3]):  # Try the first 3 URLs, deduplicated
                    try:
                        print(f" Trying API: {api_url[:60]}...")
                        api_response = self.session.get(api_url, timeout=10)
                        if api_response.status_code == 200:
                            api_data = api_response.json()
                            api_disruptions = self._parse_api_response(api_data)
                            disruptions.extend(api_disruptions)
                    except Exception:
                        continue

            # Method 3: look for visible disruption messages on the page
            if not disruptions:
                print(" Checking for visible disruption messages...")
                disruptions = self._scrape_visible_disruptions(soup)

            # Remove duplicates based on the title
            seen_titles = set()
            unique_disruptions = []
            for d in disruptions:
                if d['title'] not in seen_titles:
                    seen_titles.add(d['title'])
                    unique_disruptions.append(d)
            return unique_disruptions
        except Exception as e:
            print(f" ✗ Parse error: {e}")
            import traceback
            traceback.print_exc()
            return []
    def _is_disruption_data(self, data):
        """Check if JSON data contains disruption information"""
        if not isinstance(data, dict):
            return False
        disruption_keys = ['disruption', 'störung', 'incident', 'message', 'title', 'description']
        return any(key in str(data).lower() for key in disruption_keys)
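
    # Note: the check above stringifies the whole dict, so it matches on keys
    # *or* values. For example, {'headline': 'Incident on S3'} passes because
    # 'incident' appears in a value even though none of the keys match.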
    def _parse_disruption_json(self, data):
        """Parse a disruption from JSON data"""
        try:
            title = data.get('title') or data.get('headline') or data.get('message', '')
            if not title or len(title) < 5:
                return None
            return {
                'id': data.get('id', f"json_{hash(title)}"),
                'title': title,
                'description': data.get('description') or data.get('text') or data.get('content', ''),
                'lines': self._extract_lines_from_text(title),
                'type': self._classify_type(title),
                'start_time': None,
                'end_time': None,
                'severity': data.get('severity', 'medium'),
                'source': 'db_karte_json',
                'created_at': datetime.utcnow()
            }
        except Exception:
            return None
    def _parse_api_response(self, data):
        """Parse an API response for disruptions"""
        disruptions = []
        try:
            # Handle different response formats
            if isinstance(data, dict):
                if 'disruptions' in data:
                    data = data['disruptions']
                elif 'items' in data:
                    data = data['items']
                elif 'data' in data:
                    data = data['data']
                else:
                    data = [data]
            if isinstance(data, list):
                for item in data:
                    disruption = self._parse_disruption_json(item)
                    if disruption:
                        disruptions.append(disruption)
        except Exception:
            pass
        return disruptions
    def _scrape_visible_disruptions(self, soup):
        """Scrape visible disruption messages from the page"""
        disruptions = []
        try:
            # Look for common disruption container classes
            selectors = [
                'div[class*="disruption"]',
                'div[class*="stoerung"]',
                'div[class*="incident"]',
                'div[class*="message"]',
                'div[class*="alert"]',
                'article[class*="disruption"]',
            ]
            for selector in selectors:
                elements = soup.select(selector)
                for elem in elements:
                    text = elem.get_text(strip=True)
                    if len(text) > 20 and any(word in text.lower() for word in ['s-bahn', 's1', 's2', 's3', 's4', 's6', 's7', 's8']):
                        # Extract the title (first heading, or the leading text)
                        title_elem = elem.find(['h1', 'h2', 'h3', 'h4', 'strong'])
                        title = title_elem.get_text(strip=True) if title_elem else text[:100]
                        disruptions.append({
                            'id': f"visible_{len(disruptions)}",
                            'title': title,
                            'description': text[:500],
                            'lines': self._extract_lines_from_text(text),
                            'type': self._classify_type(title),
                            'start_time': None,
                            'end_time': None,
                            'severity': 'medium',
                            'source': 'db_karte_page',
                            'created_at': datetime.utcnow()
                        })
        except Exception:
            pass
        return disruptions
    def _extract_lines_from_text(self, text):
        """Extract S-Bahn line numbers from text"""
        import re
        # Match S1, S2, S 3, S-4, etc.
        pattern = r'S[\s-]?[1-8]'
        matches = re.findall(pattern, text, re.IGNORECASE)
        # Normalize to a format like "S1", "S2"
        lines = [re.sub(r'[^\dS]', '', m.upper()) for m in matches]
        return list(set(lines))  # Remove duplicates
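
    # Example: "Delays on S3 and S 8" -> ['S3', 'S8'] (via the set(), so the
    # order of the returned lines is not guaranteed).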
    def _classify_type(self, title):
        """Classify the disruption type based on the title"""
        title_lower = title.lower()
        if 'bauarbeit' in title_lower or 'wartung' in title_lower:  # construction / maintenance work
            return 'maintenance'
        elif 'ausfall' in title_lower or 'störung' in title_lower:  # cancellation / disruption
            return 'disruption'
        elif 'verspätung' in title_lower:  # delay
            return 'delay'
        else:
            return 'info'
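
    # Example classifications (titles are illustrative, not scraped values):
    #   "Bauarbeiten zwischen Pasing und Laim" -> 'maintenance'
    #   "Störung Stellwerk"                   -> 'disruption'
    #   "Verspätungen auf der S2"             -> 'delay'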

def test_db_client():
    """Test the DB client and print the results"""
    print("=" * 70)
    print("🚆 Deutsche Bahn S-Bahn Client Test")
    print("=" * 70)
    client = DBClient()
    disruptions = client.get_sbahn_disruptions()
    if not disruptions:
        print("\n⚠ No S-Bahn disruptions found (or not yet implemented)")
        return
    print(f"\n📊 Total S-Bahn Disruptions: {len(disruptions)}")
    print("=" * 70)
    for i, d in enumerate(disruptions, 1):
        print(f"\n[{i}] {d['title']}")
        print(f" Lines: {', '.join(d['lines'])}")
        print(f" Type: {d['type']}")
    print("\n" + "=" * 70)


if __name__ == '__main__':
    test_db_client()