#!/usr/bin/env python3
"""
Deutsche Bahn API Client - Fetch S-Bahn disruptions using Selenium
"""
import requests
from datetime import datetime
import time


class DBClient:
    """Client for Deutsche Bahn (S-Bahn) disruptions"""

    # DB S-Bahn München map page
    MAP_URL = "https://karte.bahn.de/en/region/DB_SBahn_Muenchen"

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
        })

    def get_sbahn_disruptions(self):
        """
        Fetch S-Bahn disruptions for Munich from DB Karte using Selenium

        Returns:
            list: Disruption data
        """
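        # Illustrative shape of one returned item (field values here are
        # examples, not real data); every dict built by this client carries
        # these keys:
        #   {
        #       'id': 'sbahn_0',
        #       'title': '<headline from the sidebar box>',
        #       'description': '<subtitle / body text>',
        #       'lines': ['S1', 'S8'],
        #       'type': 'disruption',   # or 'maintenance' / 'delay' / 'info'
        #       'start_time': datetime or None,
        #       'end_time': datetime or None,
        #       'severity': 'high' / 'medium' / 'low',
        #       'source': 'db_karte_sidebar',  # or another 'db_karte_*' tag
        #       'created_at': datetime (UTC),
        #   }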
print("\n🔍 Fetching S-Bahn disruptions from DB Karte (using Selenium)...")
|
||
|
||
driver = None
|
||
try:
|
||
from selenium import webdriver
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
import os
|
||
|
||
# Setup Chrome options for Chromium
|
||
chrome_options = Options()
|
||
chrome_options.add_argument('--headless')
|
||
chrome_options.add_argument('--no-sandbox')
|
||
chrome_options.add_argument('--disable-dev-shm-usage')
|
||
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
|
||
chrome_options.add_argument('--window-size=1920,1080')
|
||
chrome_options.add_argument('--disable-gpu')
|
||
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||
|
||
# Set realistic user agent
|
||
chrome_options.add_argument('user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
|
||
|
||
# Use system Chromium if available (Docker container)
|
||
chrome_bin = os.getenv('CHROME_BIN', '/usr/bin/chromium')
|
||
chromedriver_path = os.getenv('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
|
||
|
||
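
            # Hypothetical container setup matching the defaults above (paths
            # vary between distributions; adjust for your base image):
            #   ENV CHROME_BIN=/usr/bin/chromium \
            #       CHROMEDRIVER_PATH=/usr/bin/chromedriver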
            if os.path.exists(chrome_bin):
                chrome_options.binary_location = chrome_bin
                print(f" Using system Chromium: {chrome_bin}")

            print(" Starting Chromium browser...")

            # Try to use system chromedriver
            try:
                if os.path.exists(chromedriver_path):
                    service = Service(chromedriver_path)
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                else:
                    driver = webdriver.Chrome(options=chrome_options)
            except Exception as e:
                print(f" ✗ Failed to start Chromium: {e}")
                print(" ℹ️ Falling back to webdriver-manager...")
                try:
                    from webdriver_manager.chrome import ChromeDriverManager
                    service = Service(ChromeDriverManager().install())
                    driver = webdriver.Chrome(service=service, options=chrome_options)
                except Exception as e2:
                    print(f" ✗ webdriver-manager also failed: {e2}")
                    raise

            print(f" Loading: {self.MAP_URL}")
            driver.get(self.MAP_URL)

            # Wait for page to load
            print(" Waiting for page to load...")

            # Wait for disruption boxes to appear
            try:
                print(" Waiting for disruption boxes...")
                WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']"))
                )
                # Give extra time for all boxes to load
                time.sleep(3)
                print(" ✓ Disruption boxes should be loaded")
            except Exception as e:
                print(f" ⚠ Timeout waiting for disruption boxes: {e}")
                time.sleep(5)

            print(f" ✓ Page loaded (title: {driver.title[:50]}...)")

            # Debug: Save screenshot and page source
            try:
                screenshot_path = "/tmp/db_karte_screenshot.png"
                driver.save_screenshot(screenshot_path)
                print(f" 📸 Screenshot saved to: {screenshot_path}")
            except Exception:
                pass

            # Debug: Print page structure
            print(" Analyzing page structure...")
            page_source = driver.page_source

            # Save page source for inspection
            try:
                with open("/tmp/db_karte_source.html", "w", encoding="utf-8") as f:
                    f.write(page_source)
                print(" 📄 Page source saved to: /tmp/db_karte_source.html")
            except Exception:
                pass

            # Look for disruption markers/icons on the map
            disruptions = self._find_and_click_disruptions(driver)

            # If no disruptions found via clicking, parse the page source
            if not disruptions:
                print(" No clickable disruptions found, parsing page source...")

                # Debug: Show what elements are on the page
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(page_source, 'html.parser')

                # Count different element types
                print(f" Page stats: {len(soup.find_all('div'))} divs, {len(soup.find_all('button'))} buttons")

                # Look for any text mentioning disruptions
                text = soup.get_text().lower()
                if 'disruption' in text or 'störung' in text or 'incident' in text:
                    print(" ℹ️ Page contains disruption-related text")

                # Check for common map libraries
                if 'leaflet' in page_source.lower():
                    print(" ℹ️ Page uses Leaflet maps")
                if 'mapbox' in page_source.lower():
                    print(" ℹ️ Page uses Mapbox")
                if 'google.maps' in page_source.lower():
                    print(" ℹ️ Page uses Google Maps")

                disruptions = self._parse_selenium_page(page_source, driver)

            if disruptions:
                print(f"✓ Found {len(disruptions)} S-Bahn disruptions")
            else:
                print(" ℹ️ No S-Bahn disruptions found (all lines operating normally)")

            return disruptions

        except ImportError as e:
            print(f" ✗ Selenium not available: {e}")
            print(" ℹ️ Install with: pip install selenium webdriver-manager")
            return []
        except Exception as e:
            print(f" ✗ Error: {e}")
            import traceback
            traceback.print_exc()
            return []
        finally:
            if driver:
                driver.quit()

    def _find_and_click_disruptions(self, driver):
        """Find disruption boxes in the sidebar"""
        try:
            from selenium.webdriver.common.by import By
            # WebDriverWait/EC are used in the second pass below; importing
            # them here fixes a NameError (previously they were only imported
            # inside get_sbahn_disruptions's scope).
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC

            disruptions = []

            print(" Looking for disruption boxes...")

            # Find all disruption boxes in the sidebar
            disruption_boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']")

            if not disruption_boxes:
                print(" No disruption boxes found")
                return []

            print(f" Found {len(disruption_boxes)} disruption boxes")

            # First pass: collect all basic info without clicking
            basic_info = []
            for i, box in enumerate(disruption_boxes):
                try:
                    # Extract disruption ID
                    disruption_id = box.get_attribute('id')

                    # Extract title
                    title_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxTitle']")
                    title = title_elem.text.strip()

                    # Extract subtitle (type)
                    subtitle_elem = box.find_element(By.CSS_SELECTOR, "span[data-cy='disruptionboxSubtitle']")
                    subtitle = subtitle_elem.text.strip()

                    # Extract affected lines
                    lines = []
                    badge_list = box.find_element(By.CSS_SELECTOR, "div[data-cy='disruptionBadgeList']")
                    badges = badge_list.find_elements(By.CSS_SELECTOR, "span[data-cy='disruptionBadge']")
                    for badge in badges:
                        line_text = badge.text.strip()
                        if line_text and line_text.startswith('S'):
                            lines.append(line_text)

                    # Determine severity from icon
                    severity = 'medium'
                    try:
                        icon = box.find_element(By.CSS_SELECTOR, "img[data-cy='disruptionboxIcon']")
                        icon_src = icon.get_attribute('src') or ''  # src can be None
                        if 'red' in icon_src:
                            severity = 'high'
                        elif 'orange' in icon_src:
                            severity = 'medium'
                        elif 'yellow' in icon_src:
                            severity = 'low'
                    except Exception:
                        pass

                    # Store basic info
                    basic_info.append({
                        'id': disruption_id or f"sbahn_{i}",
                        'title': title,
                        'subtitle': subtitle,
                        'lines': lines,
                        'severity': severity,
                        'index': i
                    })

                    print(f" ✓ [{i}] {title[:60]}... (Lines: {', '.join(lines)})")

                except Exception as e:
                    print(f" ✗ Error extracting disruption {i}: {e}")
                    continue

            # Second pass: click each one to get time details
            print(f"\n Extracting time details for {len(basic_info)} disruptions...")
            for info in basic_info:
                print(f" Processing disruption {info['index']}...")
                try:
                    # Make sure we're back at the list view
                    driver.execute_script("window.scrollTo(0, 0);")
                    time.sleep(0.5)

                    # Wait for boxes to be present again
                    try:
                        WebDriverWait(driver, 3).until(
                            EC.presence_of_element_located((By.CSS_SELECTOR, "div[data-cy='disruptionbox']"))
                        )
                    except Exception:
                        pass

                    # Refetch boxes each time (old references go stale)
                    boxes = driver.find_elements(By.CSS_SELECTOR, "div[data-cy='disruptionbox']")
                    print(f" Found {len(boxes)} boxes after refetch")

                    if info['index'] >= len(boxes):
                        print(f" ⚠ Box {info['index']} not found (only {len(boxes)} boxes available)")
                        continue

                    # Get fresh reference to the box and button
                    box = boxes[info['index']]
                    button = box.find_element(By.TAG_NAME, "button")

                    # Click to open details
                    driver.execute_script("arguments[0].scrollIntoView(true);", button)
                    time.sleep(0.3)
                    driver.execute_script("arguments[0].click();", button)  # JS click avoids overlay interception
                    time.sleep(1.5)  # Wait for detail panel to fully open

                    # Extract time from page text
                    detail_text = driver.find_element(By.TAG_NAME, "body").text

                    # Debug: show a snippet of the detail text
                    if "From:" in detail_text and "To:" in detail_text:
                        snippet_start = detail_text.find("From:")
                        snippet_end = detail_text.find("To:", snippet_start) + 50
                        snippet = detail_text[snippet_start:snippet_end]
                        print(f" Time snippet: {snippet.replace(chr(10), ' ')}")

                    start_time, end_time = self._extract_time_range(detail_text)

                    # Go back to original page to reset the view
                    driver.get(self.MAP_URL)
                    time.sleep(3)  # Wait for page to reload and boxes to appear

                    # Create disruption object
                    disruption_type = self._classify_type(info['title'] + ' ' + info['subtitle'])

                    disruption = {
                        'id': info['id'],
                        'title': info['title'],
                        'description': info['subtitle'],
                        'lines': info['lines'],
                        'type': disruption_type,
                        'start_time': start_time,
                        'end_time': end_time,
                        'severity': info['severity'],
                        'source': 'db_karte_sidebar',
                        'created_at': datetime.utcnow()
                    }

                    disruptions.append(disruption)

                    time_info = ""
                    if start_time:
                        time_info += f" From: {start_time.strftime('%d.%m %H:%M')}"
                    if end_time:
                        time_info += f" To: {end_time.strftime('%d.%m %H:%M')}"

                    if time_info:
                        print(f" ✓ [{info['index']}]{time_info}")

                except Exception as e:
                    print(f" ⚠ Could not get time for disruption {info['index']}: {e}")
                    # Still add the disruption without time info
                    disruption = {
                        'id': info['id'],
                        'title': info['title'],
                        'description': info['subtitle'],
                        'lines': info['lines'],
                        'type': self._classify_type(info['title']),
                        'start_time': None,
                        'end_time': None,
                        'severity': info['severity'],
                        'source': 'db_karte_sidebar',
                        'created_at': datetime.utcnow()
                    }
                    disruptions.append(disruption)

            return disruptions

        except Exception as e:
            print(f" ✗ Error finding disruption boxes: {e}")
            return []

    def _extract_disruption_details(self, driver):
        """Extract disruption details from popup/modal"""
        try:
            from selenium.webdriver.common.by import By

            # Look for popup/modal/tooltip containers
            popup_selectors = [
                "div[class*='popup']",
                "div[class*='modal']",
                "div[class*='tooltip']",
                "div[class*='detail']",
                "div[class*='info']",
                "[role='dialog']",
                "[role='tooltip']",
            ]

            popup = None
            for selector in popup_selectors:
                try:
                    elements = driver.find_elements(By.CSS_SELECTOR, selector)
                    for elem in elements:
                        if elem.is_displayed() and len(elem.text) > 20:
                            popup = elem
                            break
                    if popup:
                        break
                except Exception:
                    continue

            if not popup:
                # Try to get any recently appeared text
                body = driver.find_element(By.TAG_NAME, "body")
                popup_text = body.text
            else:
                popup_text = popup.text

            # Check if it's S-Bahn related
            if not self._contains_sbahn_reference(popup_text):
                return None

            # Extract title (usually first line or heading)
            title = popup_text.split('\n')[0][:100] if '\n' in popup_text else popup_text[:100]

            # Extract time information
            start_time, end_time = self._extract_time_range(popup_text)

            # Extract affected lines
            lines = self._extract_lines_from_text(popup_text)

            return {
                'id': f"sbahn_detail_{hash(popup_text) % 10000}",
                'title': title,
                'description': popup_text[:500],
                'lines': lines,
                'type': self._classify_type(title),
                'start_time': start_time,
                'end_time': end_time,
                'severity': self._determine_severity(popup_text),
                'source': 'db_karte_detail',
                'created_at': datetime.utcnow()
            }

        except Exception:
            return None

    def _extract_time_range(self, text):
        """Extract start and end time from text"""
        import re
        from datetime import datetime

        start_time = None
        end_time = None

        # Primary format (English page), possibly split across newlines:
        #   From: <two-letter weekday abbrev>. YYYY-MM-DD, HH:MM To: <abbrev>. YYYY-MM-DD, HH:MM
        # Remove newlines first to make matching easier
        text_clean = text.replace('\n', ' ').replace('\r', ' ')

        pattern = r'From:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})\s*To:\s*[A-Za-z]{2}\.\s*(\d{4}-\d{2}-\d{2}),\s*(\d{2}:\d{2})'
        match = re.search(pattern, text_clean)

        if match:
            try:
                start_date = match.group(1)      # e.g. 2025-11-13
                start_time_str = match.group(2)  # e.g. 10:02
                end_date = match.group(3)        # e.g. 2025-11-13
                end_time_str = match.group(4)    # e.g. 14:30

                start_time = datetime.strptime(f"{start_date} {start_time_str}", "%Y-%m-%d %H:%M")
                end_time = datetime.strptime(f"{end_date} {end_time_str}", "%Y-%m-%d %H:%M")
            except Exception as e:
                print(f" ⚠ Error parsing time: {e}")

        # Fallback: Try German formats
        if not start_time:
            # Look for "ab DD.MM.YYYY HH:MM" (from) or "bis DD.MM.YYYY HH:MM" (until)
            ab_pattern = r'ab\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})'
            bis_pattern = r'bis\s+(\d{1,2}\.\d{1,2}\.\d{4})[,\s]+(\d{1,2}:\d{2})'

            ab_match = re.search(ab_pattern, text, re.IGNORECASE)
            if ab_match:
                try:
                    start_time = datetime.strptime(f"{ab_match.group(1)} {ab_match.group(2)}", "%d.%m.%Y %H:%M")
                except ValueError:
                    pass

            bis_match = re.search(bis_pattern, text, re.IGNORECASE)
            if bis_match:
                try:
                    end_time = datetime.strptime(f"{bis_match.group(1)} {bis_match.group(2)}", "%d.%m.%Y %H:%M")
                except ValueError:
                    pass

        return start_time, end_time

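    # Illustrative input for _extract_time_range (dates/times taken from the
    # inline comments above; the weekday token "Th." is hypothetical):
    #   "From: Th. 2025-11-13, 10:02 To: Th. 2025-11-13, 14:30"
    # parses to:
    #   (datetime(2025, 11, 13, 10, 2), datetime(2025, 11, 13, 14, 30))
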
    def _determine_severity(self, text):
        """Determine severity based on keywords"""
        text_lower = text.lower()

        if any(word in text_lower for word in ['ausfall', 'gesperrt', 'eingestellt', 'komplett']):
            return 'high'
        elif any(word in text_lower for word in ['verspätung', 'verzögerung', 'teilweise']):
            return 'medium'
        else:
            return 'low'

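    # Keyword gloss for _determine_severity (the page text is German):
    #   high:   'ausfall' (cancellation), 'gesperrt' (closed),
    #           'eingestellt' (suspended), 'komplett' (complete)
    #   medium: 'verspätung' (delay), 'verzögerung' (delay), 'teilweise' (partial)
    # Illustrative call: _determine_severity("Strecke gesperrt") -> 'high'
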
    def _parse_selenium_page(self, page_source, driver):
        """Parse page loaded by Selenium"""
        try:
            from bs4 import BeautifulSoup
            from selenium.webdriver.common.by import By

            print(" Analyzing rendered page...")
            soup = BeautifulSoup(page_source, 'html.parser')
            disruptions = []

            # Method 1: Try to find disruption elements directly via Selenium
            try:
                # Look for common disruption indicators
                selectors = [
                    "div[class*='disruption']",
                    "div[class*='stoerung']",
                    "div[class*='incident']",
                    "div[class*='message']",
                    "div[class*='alert']",
                    "[data-disruption]",
                    "[data-incident]"
                ]

                for selector in selectors:
                    try:
                        elements = driver.find_elements(By.CSS_SELECTOR, selector)
                        if elements:
                            print(f" Found {len(elements)} elements with selector: {selector}")
                            for elem in elements:
                                text = elem.text.strip()
                                if len(text) > 20 and self._contains_sbahn_reference(text):
                                    disruptions.append(self._create_disruption_from_text(text))
                    except Exception:
                        continue
            except Exception as e:
                print(f" ✗ Selenium element search error: {e}")

            # Method 2: Parse the page source with BeautifulSoup
            if not disruptions:
                print(" Trying BeautifulSoup parsing...")
                disruptions = self._parse_map_page(page_source.encode(), page_source)

            # Method 3: Check for any text mentioning S-Bahn lines with disruptions
            if not disruptions:
                print(" Checking page text for S-Bahn mentions...")
                page_text = soup.get_text()
                if self._contains_sbahn_reference(page_text):
                    # Extract paragraphs or sections mentioning S-Bahn
                    for elem in soup.find_all(['p', 'div', 'span']):
                        text = elem.get_text(strip=True)
                        if len(text) > 30 and self._contains_sbahn_reference(text):
                            lines = self._extract_lines_from_text(text)
                            if lines:
                                disruptions.append(self._create_disruption_from_text(text))

            # Remove duplicates (keyed on the first 50 chars of the title)
            seen = set()
            unique = []
            for d in disruptions:
                key = d['title'][:50]
                if key not in seen:
                    seen.add(key)
                    unique.append(d)

            return unique

        except Exception as e:
            print(f" ✗ Parse error: {e}")
            import traceback
            traceback.print_exc()
            return []

    def _contains_sbahn_reference(self, text):
        """Check if text contains S-Bahn line references"""
        import re
        return bool(re.search(r'S[\s-]?[1-8]', text, re.IGNORECASE))

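    # Note: the pattern is deliberately loose. With IGNORECASE and the optional
    # space/hyphen it also matches substrings such as "s 7" inside unrelated
    # text (e.g. "Bus 7"), so occasional false positives are possible.
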
    def _create_disruption_from_text(self, text):
        """Create disruption object from text"""
        # Extract first sentence or first 100 chars as title
        sentences = text.split('.')
        title = sentences[0][:100] if sentences else text[:100]

        return {
            'id': f"sbahn_{hash(text) % 10000}",
            'title': title,
            'description': text[:500],
            'lines': self._extract_lines_from_text(text),
            'type': self._classify_type(title),
            'start_time': None,
            'end_time': None,
            'severity': 'medium',
            'source': 'db_karte_selenium',
            'created_at': datetime.utcnow()
        }

    def _parse_map_page(self, html_content, html_text):
        """Parse DB Karte map page for S-Bahn disruptions"""
        try:
            from bs4 import BeautifulSoup
            import re
            import json

            disruptions = []

            # Method 1: Look for embedded JSON data in script tags
            print(" Analyzing page for disruption data...")

            # The map page likely has JSON data embedded in <script> tags
            soup = BeautifulSoup(html_content, 'html.parser')
            scripts = soup.find_all('script')

            for script in scripts:
                if script.string:
                    # Look for JSON data containing disruption/Störung information
                    script_text = script.string

                    # Try to find JSON objects
                    json_pattern = r'\{[^{}]*(?:"disruption"|"störung"|"incident"|"message")[^{}]*\}'
                    matches = re.finditer(json_pattern, script_text, re.IGNORECASE)

                    for match in matches:
                        try:
                            data = json.loads(match.group())
                            # Process found JSON data
                            if self._is_disruption_data(data):
                                disruption = self._parse_disruption_json(data)
                                if disruption:
                                    disruptions.append(disruption)
                        except json.JSONDecodeError:
                            continue

            # Method 2: Look for API endpoint URLs in the page
            api_pattern = r'https?://[^\s"\']+(?:api|disruption|stoerung)[^\s"\']+'
            api_urls = re.findall(api_pattern, html_text, re.IGNORECASE)

            if api_urls:
                print(f" Found {len(api_urls)} potential API endpoints")
                for api_url in set(api_urls[:3]):  # Try first 3 unique URLs
                    try:
                        print(f" Trying API: {api_url[:60]}...")
                        api_response = self.session.get(api_url, timeout=10)
                        if api_response.status_code == 200:
                            api_data = api_response.json()
                            api_disruptions = self._parse_api_response(api_data)
                            disruptions.extend(api_disruptions)
                    except Exception:
                        continue

            # Method 3: Look for visible disruption messages on the page
            if not disruptions:
                print(" Checking for visible disruption messages...")
                disruptions = self._scrape_visible_disruptions(soup)

            # Remove duplicates based on title
            seen_titles = set()
            unique_disruptions = []
            for d in disruptions:
                if d['title'] not in seen_titles:
                    seen_titles.add(d['title'])
                    unique_disruptions.append(d)

            return unique_disruptions

        except Exception as e:
            print(f" ✗ Parse error: {e}")
            import traceback
            traceback.print_exc()
            return []

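    # Caveat on Method 1 above: json_pattern uses [^{}]* on both sides, so it
    # can only match flat (non-nested) JSON objects. Nested payloads embedded
    # in scripts are missed and would have to come from Methods 2 or 3.
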
    def _is_disruption_data(self, data):
        """Check if JSON data contains disruption information"""
        if not isinstance(data, dict):
            return False

        # Loose check: the keywords may appear in keys or values
        disruption_keys = ['disruption', 'störung', 'incident', 'message', 'title', 'description']
        return any(key in str(data).lower() for key in disruption_keys)

    def _parse_disruption_json(self, data):
        """Parse disruption from JSON data"""
        try:
            title = data.get('title') or data.get('headline') or data.get('message', '')
            if not title or len(title) < 5:
                return None

            return {
                'id': data.get('id', f"json_{hash(title)}"),
                'title': title,
                'description': data.get('description') or data.get('text') or data.get('content', ''),
                'lines': self._extract_lines_from_text(title),
                'type': self._classify_type(title),
                'start_time': None,
                'end_time': None,
                'severity': data.get('severity', 'medium'),
                'source': 'db_karte_json',
                'created_at': datetime.utcnow()
            }
        except Exception:
            return None

    def _parse_api_response(self, data):
        """Parse API response for disruptions"""
        disruptions = []

        try:
            # Handle different response formats
            if isinstance(data, dict):
                if 'disruptions' in data:
                    data = data['disruptions']
                elif 'items' in data:
                    data = data['items']
                elif 'data' in data:
                    data = data['data']
                else:
                    data = [data]

            if isinstance(data, list):
                for item in data:
                    disruption = self._parse_disruption_json(item)
                    if disruption:
                        disruptions.append(disruption)
        except Exception:
            pass

        return disruptions

    def _scrape_visible_disruptions(self, soup):
        """Scrape visible disruption messages from the page"""
        disruptions = []

        try:
            # Look for common disruption container classes
            selectors = [
                'div[class*="disruption"]',
                'div[class*="stoerung"]',
                'div[class*="incident"]',
                'div[class*="message"]',
                'div[class*="alert"]',
                'article[class*="disruption"]',
            ]

            for selector in selectors:
                elements = soup.select(selector)
                for elem in elements:
                    text = elem.get_text(strip=True)
                    # No 's5' below: the Munich S-Bahn network has no S5 line
                    if len(text) > 20 and any(word in text.lower() for word in ['s-bahn', 's1', 's2', 's3', 's4', 's6', 's7', 's8']):
                        # Extract title (first line or heading)
                        title_elem = elem.find(['h1', 'h2', 'h3', 'h4', 'strong'])
                        title = title_elem.get_text(strip=True) if title_elem else text[:100]

                        disruptions.append({
                            'id': f"visible_{len(disruptions)}",
                            'title': title,
                            'description': text[:500],
                            'lines': self._extract_lines_from_text(text),
                            'type': self._classify_type(title),
                            'start_time': None,
                            'end_time': None,
                            'severity': 'medium',
                            'source': 'db_karte_page',
                            'created_at': datetime.utcnow()
                        })
        except Exception:
            pass

        return disruptions

    def _extract_lines_from_text(self, text):
        """Extract S-Bahn line numbers from text"""
        import re
        # Match S1, S2, S 3, S-4, etc.
        pattern = r'S[\s-]?[1-8]'
        matches = re.findall(pattern, text, re.IGNORECASE)
        # Normalize to the format "S1", "S2", ... (strip spaces/hyphens)
        lines = [re.sub(r'[^\dS]', '', m.upper()) for m in matches]
        return list(set(lines))  # Remove duplicates

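    # Illustrative normalization (hypothetical input):
    #   _extract_lines_from_text("Störung auf S1 und S-4, S 3 betroffen")
    #   -> ['S1', 'S4', 'S3']   (set order is arbitrary)
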
    def _classify_type(self, title):
        """Classify disruption type based on title"""
        title_lower = title.lower()
        if 'bauarbeit' in title_lower or 'wartung' in title_lower:
            return 'maintenance'  # 'Bauarbeiten' = construction, 'Wartung' = maintenance
        elif 'ausfall' in title_lower or 'störung' in title_lower:
            return 'disruption'  # 'Ausfall' = cancellation, 'Störung' = disruption
        elif 'verspätung' in title_lower:
            return 'delay'  # 'Verspätung' = delay
        else:
            return 'info'


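# Example consumer (a sketch, not used by DBClient itself): groups disruption
# titles by affected line, relying only on the 'lines' and 'title' keys that
# DBClient puts in every disruption dict. Usage (hits the live page):
#   by_line = summarize_by_line(DBClient().get_sbahn_disruptions())
#   print(by_line.get('S1', []))
def summarize_by_line(disruptions):
    """Map each S-Bahn line (e.g. 'S1') to the disruption titles affecting it."""
    summary = {}
    for d in disruptions:
        for line in d.get('lines', []):
            summary.setdefault(line, []).append(d.get('title', ''))
    return summary

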
def test_db_client():
    """Test the DB client and print results"""
    print("="*70)
    print("🚆 Deutsche Bahn S-Bahn Client Test")
    print("="*70)

    client = DBClient()
    disruptions = client.get_sbahn_disruptions()

    if not disruptions:
        print("\n⚠ No S-Bahn disruptions found (or the fetch failed)")
        return

    print(f"\n📊 Total S-Bahn Disruptions: {len(disruptions)}")
    print("="*70)

    for i, d in enumerate(disruptions, 1):
        print(f"\n[{i}] {d['title']}")
        print(f" Lines: {', '.join(d['lines'])}")
        print(f" Type: {d['type']}")

    print("\n" + "="*70)


if __name__ == '__main__':
    test_db_client()