use anyhow::Result; use earcutr::earcut; use osmpbf::{Element, ElementReader}; use scylla::SessionBuilder; use std::collections::HashMap; use tokio::task::JoinSet; use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write, Seek, SeekFrom}; use std::path::{Path, PathBuf}; use memmap2::Mmap; const ZOOM_LEVELS: [u32; 6] = [2, 4, 6, 9, 12, 14]; struct NodeStore { writer: Option>, mmap: Option, path: PathBuf, last_id: i64, } impl NodeStore { fn new>(path: P) -> Result { let path = path.as_ref().to_path_buf(); let file = OpenOptions::new() .read(true) .write(true) .create(true) .truncate(true) .open(&path)?; let writer = BufWriter::with_capacity(10 * 1024 * 1024, file); // 10MB buffer Ok(Self { writer: Some(writer), mmap: None, path, last_id: -1, }) } fn insert(&mut self, id: i64, lat: f64, lon: f64) -> Result<()> { if let Some(writer) = &mut self.writer { if id > self.last_id + 1 { let gap = id - self.last_id - 1; writer.seek(SeekFrom::Current(gap * 8))?; } else if id <= self.last_id { writer.seek(SeekFrom::Start(id as u64 * 8))?; } let lat_i32 = (lat * 1e7) as i32; let lon_i32 = (lon * 1e7) as i32; writer.write_all(&lat_i32.to_le_bytes())?; writer.write_all(&lon_i32.to_le_bytes())?; self.last_id = id; } Ok(()) } fn prepare_for_reading(&mut self) -> Result<()> { self.writer = None; // Flush and close writer let file = File::open(&self.path)?; let mmap = unsafe { Mmap::map(&file)? }; self.mmap = Some(mmap); Ok(()) } fn get(&self, id: i64) -> Option<(f64, f64)> { if let Some(mmap) = &self.mmap { let offset = id as usize * 8; if offset + 8 <= mmap.len() { let chunk = &mmap[offset..offset+8]; let lat_i32 = i32::from_le_bytes(chunk[0..4].try_into().unwrap()); let lon_i32 = i32::from_le_bytes(chunk[4..8].try_into().unwrap()); if lat_i32 == 0 && lon_i32 == 0 { return None; } return Some((lat_i32 as f64 / 1e7, lon_i32 as f64 / 1e7)); } } None } } // Ramer-Douglas-Peucker simplification fn perpendicular_distance(p: (f64, f64), line_start: (f64, f64), line_end: (f64, f64)) -> f64 { let (x, y) = p; let (x1, y1) = line_start; let (x2, y2) = line_end; let dx = x2 - x1; let dy = y2 - y1; if dx == 0.0 && dy == 0.0 { return ((x - x1).powi(2) + (y - y1).powi(2)).sqrt(); } let num = (dy * x - dx * y + x2 * y1 - y2 * x1).abs(); let den = (dx.powi(2) + dy.powi(2)).sqrt(); num / den } fn simplify_points(points: &[(f64, f64)], epsilon: f64) -> Vec<(f64, f64)> { if points.len() < 3 { return points.to_vec(); } let start = points[0]; let end = points[points.len() - 1]; let mut max_dist = 0.0; let mut index = 0; for i in 1..points.len() - 1 { let dist = perpendicular_distance(points[i], start, end); if dist > max_dist { max_dist = dist; index = i; } } if max_dist > epsilon { let mut left = simplify_points(&points[..=index], epsilon); let mut right = simplify_points(&points[index..], epsilon); // Remove duplicate point at split left.pop(); left.extend(right); left } else { vec![start, end] } } fn triangulate_polygon(points: &[(f64, f64)]) -> Vec<(f64, f64)> { let mut flat_points = Vec::with_capacity(points.len() * 2); for (lat, lon) in points { flat_points.push(*lat); flat_points.push(*lon); } // We assume simple polygons (no holes) for now as we are just processing ways let indices = earcut(&flat_points, &[], 2).unwrap_or_default(); let mut triangles = Vec::with_capacity(indices.len()); for i in indices { triangles.push(points[i]); } triangles } fn should_include(tags: &HashMap, zoom: u32) -> bool { if zoom >= 14 { return true; } let highway = tags.get("highway").map(|s| s.as_str()); let place = tags.get("place").map(|s| s.as_str()); let natural = tags.get("natural").map(|s| s.as_str()); let railway = tags.get("railway").map(|s| s.as_str()); let waterway = tags.get("waterway").map(|s| s.as_str()); match zoom { 2 => { // Space View: Continents and Countries matches!(place, Some("continent" | "country")) || matches!(natural, Some("water")) || // Major water bodies matches!(highway, Some("motorway")) || // Added motorway matches!(tags.get("landuse").map(|s| s.as_str()), Some("forest" | "grass" | "meadow" | "farmland" | "residential")) || // Added more green + farmland/residential matches!(tags.get("leisure").map(|s| s.as_str()), Some("park" | "nature_reserve")) || // Added parks matches!(natural, Some("wood" | "scrub")) // Added wood/scrub }, 4 => { // Regional View (NEW) matches!(highway, Some("motorway" | "trunk")) || matches!(place, Some("city" | "town")) || matches!(natural, Some("water" | "wood" | "scrub" | "heath" | "wetland")) || matches!(tags.get("landuse").map(|s| s.as_str()), Some("forest" | "grass" | "meadow" | "farmland" | "residential")) || matches!(tags.get("leisure").map(|s| s.as_str()), Some("park" | "nature_reserve")) || matches!(waterway, Some("river")) }, 6 => { // Enterprise Grade: ONLY Motorways and Trunk roads. No primary/secondary. // ONLY Cities. No nature/landuse. matches!(highway, Some("motorway" | "trunk" | "primary")) || // Added primary matches!(place, Some("city")) || matches!(natural, Some("water" | "wood" | "scrub" | "heath" | "wetland")) || matches!(tags.get("landuse").map(|s| s.as_str()), Some("forest" | "grass" | "meadow" | "farmland" | "residential")) || matches!(tags.get("leisure").map(|s| s.as_str()), Some("park" | "nature_reserve")) || matches!(waterway, Some("river")) }, 9 => { // Enterprise Grade: Add Primary roads. // Add Towns. // Limited nature. matches!(highway, Some("motorway" | "trunk" | "primary")) || matches!(place, Some("city" | "town")) || matches!(railway, Some("rail")) || matches!(natural, Some("water" | "wood")) || matches!(tags.get("landuse").map(|s| s.as_str()), Some("forest")) || matches!(tags.get("leisure").map(|s| s.as_str()), Some("park")) || matches!(waterway, Some("river" | "riverbank")) }, 12 => { matches!(highway, Some("motorway" | "trunk" | "primary" | "secondary" | "tertiary")) || matches!(place, Some("city" | "town" | "village")) || matches!(railway, Some("rail")) || tags.contains_key("building") || tags.contains_key("landuse") || tags.contains_key("leisure") || matches!(natural, Some("water" | "wood" | "scrub" | "wetland" | "heath")) || matches!(waterway, Some("river" | "riverbank" | "stream")) }, _ => false } } #[tokio::main] async fn main() -> Result<()> { // Load .env file if present dotenv::dotenv().ok(); // Connect to ScyllaDB let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); println!("Connecting to ScyllaDB at {}...", uri); let session = loop { match SessionBuilder::new().known_node(&uri).build().await { Ok(session) => break session, Err(e) => { println!("Failed to connect to ScyllaDB: {}. Retrying in 5 seconds...", e); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } }; let session = std::sync::Arc::new(session); // Ensure schema exists session.query("CREATE KEYSPACE IF NOT EXISTS map_data WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }", &[]).await?; // Create tables session.query("CREATE TABLE IF NOT EXISTS map_data.nodes (zoom int, tile_x int, tile_y int, id bigint, lat double, lon double, tags map, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.ways (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.buildings (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.water (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; // Prepare statements println!("Preparing statements..."); let insert_node = session.prepare("INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)").await?; let insert_ways = session.prepare("INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; let insert_buildings = session.prepare("INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; let insert_water = session.prepare("INSERT INTO map_data.water (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; let insert_landuse = session.prepare("INSERT INTO map_data.landuse (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; let insert_railways = session.prepare("INSERT INTO map_data.railways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; println!("Statements prepared."); let path = std::env::var("OSM_PBF_PATH") .or_else(|_| std::env::var("HOST_PBF_PATH")) .unwrap_or_else(|_| "europe-latest.osm.pbf".to_string()); println!("Reading {}...", path); let reader = ElementReader::from_path(path)?; // Cache for node coordinates: ID -> (lat, lon) // Use flat file with mmap let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string()); let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin"); println!("Using node cache at {:?}", cache_path); let mut node_store = NodeStore::new(cache_path.clone())?; // Channel for backpressure // Producer (reader) -> Consumer (writer) enum DbTask { Node { zoom: i32, id: i64, lat: f64, lon: f64, tags: HashMap, x: i32, y: i32 }, Way { zoom: i32, table: &'static str, id: i64, tags: HashMap, points: Vec, x: i32, y: i32 }, } let (tx, mut rx) = tokio::sync::mpsc::channel::(10_000); let session_clone = session.clone(); let consumer_handle = tokio::spawn(async move { let mut join_set = JoinSet::new(); let mut inserted_count = 0; let max_concurrent = std::env::var("CONCURRENT_INSERTS") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(1024); // Default to 1024 concurrent inserts println!("Starting consumer with max_concurrent={}", max_concurrent); while let Some(task) = rx.recv().await { let session = session_clone.clone(); // Backpressure: limit concurrent inserts while join_set.len() >= max_concurrent { join_set.join_next().await; } match task { DbTask::Node { zoom, id, lat, lon, tags, x, y } => { let statement = insert_node.clone(); join_set.spawn(async move { let _ = session.execute( &statement, (zoom, x, y, id, lat, lon, tags), ).await; }); } DbTask::Way { zoom, table, id, tags, points, x, y } => { let statement = match table { "ways" => insert_ways.clone(), "buildings" => insert_buildings.clone(), "water" => insert_water.clone(), "landuse" => insert_landuse.clone(), "railways" => insert_railways.clone(), _ => panic!("Unknown table: {}", table), }; join_set.spawn(async move { let _ = session.execute( &statement, (zoom, x, y, id, tags, points), ).await; }); } } inserted_count += 1; } // Wait for remaining tasks while let Some(_) = join_set.join_next().await {} println!("Consumer finished. Total inserted tasks: {}", inserted_count); }); // Run the PBF reader in a blocking task to allow blocking_send let tx_clone = tx.clone(); let reader_handle = tokio::task::spawn_blocking(move || -> Result<(usize, usize)> { let tx = tx_clone; let mut node_count = 0; let mut way_count = 0; let mut ways_pending = false; // We process sequentially: Nodes first, then Ways. // osmpbf yields nodes then ways. // We need to detect when we switch from nodes to ways to prepare the store. reader.for_each(|element| { match element { Element::Node(node) => { node_count += 1; let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); let lat = node.lat(); let lon = node.lon(); let tags: HashMap = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); for &zoom in &ZOOM_LEVELS { if should_include(&tags, zoom) { let (x, y) = lat_lon_to_tile(lat, lon, zoom); let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y }; let _ = tx.blocking_send(task); } } } } Element::DenseNode(node) => { node_count += 1; let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); let lat = node.lat(); let lon = node.lon(); let tags: HashMap = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); for &zoom in &ZOOM_LEVELS { if should_include(&tags, zoom) { let (x, y) = lat_lon_to_tile(lat, lon, zoom); let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y }; let _ = tx.blocking_send(task); } } } } Element::Way(way) => { if !ways_pending { // First way encountered. Prepare store for reading. println!("Switching to Way processing. Flushing node cache..."); if let Err(e) = node_store.prepare_for_reading() { eprintln!("Failed to prepare node store: {}", e); return; } ways_pending = true; } way_count += 1; let tags: HashMap = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); // Filter for highways/roads OR buildings OR landuse OR water OR railways let is_highway = tags.contains_key("highway"); let is_building = tags.contains_key("building"); let is_water = tags.get("natural").map(|v| v == "water" || v == "wetland").unwrap_or(false) || tags.get("waterway").map(|v| v == "riverbank" || v == "stream" || v == "river").unwrap_or(false) || tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false); let is_landuse = tags.contains_key("leisure") || tags.contains_key("landuse") || tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath" || v == "wetland").unwrap_or(false); let is_railway = tags.contains_key("railway"); if is_highway || is_building || is_water || is_landuse || is_railway { let mut points = Vec::new(); // Resolve nodes from store for node_id in way.refs() { if let Some((lat, lon)) = node_store.get(node_id) { points.push((lat, lon)); } } if points.len() >= 2 { let id = way.id(); // Insert into the tile of the first point let (first_lat, first_lon) = points[0]; for &zoom in &ZOOM_LEVELS { if !should_include(&tags, zoom) { continue; } // Apply simplification based on zoom level let base_epsilon = match zoom { 2 => 0.0001, 4 => 0.00005, 6 => 0.00002, 9 => 0.00001, 12 => 0.000005, _ => 0.0, }; let epsilon = if is_water || is_landuse || is_highway { base_epsilon * 0.5 // Preserve more detail for natural features AND roads } else { base_epsilon }; let simplified_points = if epsilon > 0.0 { simplify_points(&points, epsilon) } else { points.clone() }; // Serialize points let mut final_points = simplified_points; // Triangulate if it's a polygon type if is_building || is_water || is_landuse { // Close the loop if not closed if final_points.first() != final_points.last() { final_points.push(final_points[0]); } final_points = triangulate_polygon(&final_points); } if final_points.len() < 2 { continue; } let (first_lat, first_lon) = final_points[0]; let (x, y) = lat_lon_to_tile(first_lat, first_lon, zoom); let zoom_i32 = zoom as i32; let mut blob = Vec::with_capacity(final_points.len() * 8); // 4 bytes lat + 4 bytes lon for (lat, lon) in final_points { blob.extend_from_slice(&(lat as f32).to_le_bytes()); blob.extend_from_slice(&(lon as f32).to_le_bytes()); } if is_highway { let task = DbTask::Way { zoom: zoom_i32, table: "ways", id, tags: tags.clone(), points: blob.clone(), x, y }; let _ = tx.blocking_send(task); } if is_building { let task = DbTask::Way { zoom: zoom_i32, table: "buildings", id, tags: tags.clone(), points: blob.clone(), x, y }; let _ = tx.blocking_send(task); } if is_water { let task = DbTask::Way { zoom: zoom_i32, table: "water", id, tags: tags.clone(), points: blob.clone(), x, y }; let _ = tx.blocking_send(task); } if is_landuse { let task = DbTask::Way { zoom: zoom_i32, table: "landuse", id, tags: tags.clone(), points: blob.clone(), x, y }; let _ = tx.blocking_send(task); } if is_railway { let task = DbTask::Way { zoom: zoom_i32, table: "railways", id, tags: tags.clone(), points: blob.clone(), x, y }; let _ = tx.blocking_send(task); } } } } } _ => {} } if (node_count + way_count) % 100_000 == 0 { println!("Processed {} nodes, {} ways...", node_count, way_count); } })?; Ok((node_count, way_count)) }); let (node_count, way_count) = reader_handle.await??; println!("Finished reading PBF. Nodes: {}, Ways: {}. Waiting for consumer...", node_count, way_count); // Drop sender to signal consumer to finish drop(tx); // Wait for consumer consumer_handle.await?; // Clean up cache let _ = std::fs::remove_file(cache_path); println!("Done!"); Ok(()) } fn lat_lon_to_tile(lat: f64, lon: f64, zoom: u32) -> (i32, i32) { let n = 2.0f64.powi(zoom as i32); let x = (lon + 180.0) / 360.0 * n; let lat_rad = lat.to_radians(); let y = (1.0 - (lat_rad.tan() + (1.0 / lat_rad.cos())).ln() / std::f64::consts::PI) / 2.0 * n; (x as i32, y as i32) }