mod domain; mod repositories; mod services; mod parsers; // Empty for now, but kept for structure use anyhow::Result; use osmpbf::{Element, ElementReader}; use std::collections::HashMap; use tokio::task::JoinSet; use std::sync::Arc; use crate::domain::DbTask; use crate::repositories::{ scylla_repository::ScyllaRepository, node_store::NodeStore, way_store::WayStore, railway_store::RailwayStore, }; use crate::services::{ filtering_service::FilteringService, tile_service::TileService, geometry_service::GeometryService, multipolygon_service::MultipolygonService, railway_service::RailwayService, }; #[tokio::main] async fn main() -> Result<()> { // Load .env file if present dotenv::dotenv().ok(); // Connect to ScyllaDB let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); println!("Connecting to ScyllaDB at {}...", uri); let scylla_repo = Arc::new(ScyllaRepository::connect(&uri).await?); // Truncate tables scylla_repo.truncate_tables().await?; // Initialize GPU mesh generation service println!("Initializing GPU mesh generation service..."); let mesh_service = Arc::new( pollster::block_on(services::mesh_service::MeshGenerationService::new())? ); println!("Mesh service initialized!"); let path = std::env::var("OSM_PBF_PATH") .or_else(|_| std::env::var("HOST_PBF_PATH")) .unwrap_or_else(|_| "europe-latest.osm.pbf".to_string()); println!("Reading {}...", path); let reader = ElementReader::from_path(path)?; // Cache for node coordinates: ID -> (lat, lon) let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string()); let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin"); println!("Using node cache at {:?}", cache_path); let mut node_store = NodeStore::new(cache_path.clone())?; // Channel for backpressure let (tx, mut rx) = tokio::sync::mpsc::channel::(10_000); let scylla_repo_clone = scylla_repo.clone(); let consumer_handle = tokio::spawn(async move { let mut join_set = JoinSet::new(); let mut inserted_count = 0; let max_concurrent = std::env::var("CONCURRENT_INSERTS") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(1024); // Default to 1024 concurrent inserts println!("Starting consumer with max_concurrent={}", max_concurrent); while let Some(task) = rx.recv().await { let repo = scylla_repo_clone.clone(); // Backpressure: limit concurrent inserts while join_set.len() >= max_concurrent { join_set.join_next().await; } match task { DbTask::Node { zoom, id, lat, lon, tags, x, y } => { join_set.spawn(async move { let _ = repo.insert_node(zoom, id, lat, lon, tags, x, y).await; }); } DbTask::Way { zoom, table, id, tags, points, vertex_buffer, x, y } => { join_set.spawn(async move { let _ = repo.insert_way(table, zoom, id, tags, points, vertex_buffer, x, y).await; }); } } inserted_count += 1; } // Wait for remaining tasks while let Some(_) = join_set.join_next().await {} println!("Consumer finished. Total inserted tasks: {}", inserted_count); }); // Run the PBF reader in a blocking task let tx_clone = tx.clone(); let mesh_service_clone = mesh_service.clone(); let reader_handle = tokio::task::spawn_blocking(move || -> Result<(usize, usize, usize)> { let tx = tx_clone; let mesh_svc = mesh_service_clone; let mut node_count = 0; let mut way_count = 0; let mut relation_count = 0; let mut ways_pending = false; let mut relations_pending = false; let mut way_store = WayStore::new(); let mut railway_store = RailwayStore::new(); reader.for_each(|element| { match element { Element::Node(node) => { node_count += 1; let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); let lat = node.lat(); let lon = node.lon(); let tags: HashMap = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); for &zoom in &FilteringService::ZOOM_LEVELS { if FilteringService::should_include(&tags, zoom) { let (x, y) = TileService::lat_lon_to_tile(lat, lon, zoom); let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y }; let _ = tx.blocking_send(task); } } } } Element::DenseNode(node) => { node_count += 1; let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); let lat = node.lat(); let lon = node.lon(); let tags: HashMap = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); for &zoom in &FilteringService::ZOOM_LEVELS { if FilteringService::should_include(&tags, zoom) { let (x, y) = TileService::lat_lon_to_tile(lat, lon, zoom); let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y }; let _ = tx.blocking_send(task); } } } } Element::Way(way) => { if !ways_pending { println!("Switching to Way processing. Flushing node cache..."); if let Err(e) = node_store.prepare_for_reading() { eprintln!("Failed to prepare node store: {}", e); return; } ways_pending = true; } way_count += 1; let node_refs: Vec = way.refs().collect(); way_store.insert(way.id(), node_refs.clone()); let tags: HashMap = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); let is_highway = tags.contains_key("highway"); let is_building = tags.contains_key("building"); let is_water_area = tags.get("natural").map(|v| v == "water" || v == "wetland" || v == "bay" || v == "strait").unwrap_or(false) || tags.get("place").map(|v| v == "sea" || v == "ocean").unwrap_or(false) || tags.get("waterway").map(|v| v == "riverbank" || v == "dock").unwrap_or(false) || tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false); let is_water_line = tags.get("waterway").map(|v| v == "stream" || v == "river" || v == "canal" || v == "drain" || v == "ditch").unwrap_or(false); let is_landuse = tags.contains_key("leisure") || tags.contains_key("landuse") || tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath" || v == "wetland").unwrap_or(false); let is_railway = tags.contains_key("railway"); if is_highway || is_building || is_water_area || is_water_line || is_landuse || is_railway { let mut points = Vec::new(); for node_id in way.refs() { if let Some((lat, lon)) = node_store.get(node_id) { points.push((lat, lon)); } } if points.len() >= 2 { let id = way.id(); let (first_lat, first_lon) = points[0]; let is_closed = points.first() == points.last(); let mut treat_as_water_area = is_water_area && is_closed; let mut treat_as_landuse = is_landuse && is_closed; let mut treat_as_building = is_building && is_closed; let mut treat_as_water_line = is_water_line || (is_water_area && !is_closed); if (is_landuse || is_building) && !is_closed { return; } for &zoom in &FilteringService::ZOOM_LEVELS { if !FilteringService::should_include(&tags, zoom) { continue; } let base_epsilon = match zoom { 2 => 0.01, 4 => 0.002, 6 => 0.0005, 9 => 0.0001, 12 => 0.000005, _ => 0.0, }; let epsilon = if treat_as_water_area || treat_as_landuse || is_highway || treat_as_water_line { if zoom <= 4 && treat_as_landuse { 0.0 } else if treat_as_water_area || treat_as_landuse { if zoom >= 9 { 0.0 } else { base_epsilon * 0.01 } } else { base_epsilon * 0.5 } } else { base_epsilon }; let simplified_points = if epsilon > 0.0 { GeometryService::simplify_points(&points, epsilon) } else { points.clone() }; let mut final_points = simplified_points.clone(); // Create blob for line features (highways/railways/water lines) let mut line_blob = Vec::with_capacity(simplified_points.len() * 8); for (lat, lon) in &simplified_points { line_blob.extend_from_slice(&(*lat as f32).to_le_bytes()); line_blob.extend_from_slice(&(*lon as f32).to_le_bytes()); } if treat_as_building || treat_as_water_area || treat_as_landuse { final_points = GeometryService::triangulate_polygon(&final_points); } if final_points.len() < 3 && (treat_as_building || treat_as_water_area || treat_as_landuse) { continue; } if simplified_points.len() < 2 && (is_highway || is_railway || treat_as_water_line) { continue; } let (first_lat, first_lon) = simplified_points[0]; let (x, y) = TileService::lat_lon_to_tile(first_lat, first_lon, zoom); let zoom_i32 = zoom as i32; let mut polygon_blob = Vec::with_capacity(final_points.len() * 8); for (lat, lon) in &final_points { polygon_blob.extend_from_slice(&(*lat as f32).to_le_bytes()); polygon_blob.extend_from_slice(&(*lon as f32).to_le_bytes()); } if is_highway || treat_as_water_line { // Generate road geometry let projected_points_raw: Vec<[f32; 2]> = simplified_points.iter() .map(|(lat, lon)| { let (x, y) = GeometryService::project(*lat, *lon); [x, y] }) .collect(); // Fix degenerate segments: Deduplicate consecutive points that are too close let mut projected_points = Vec::with_capacity(projected_points_raw.len()); if !projected_points_raw.is_empty() { projected_points.push(projected_points_raw[0]); for i in 1..projected_points_raw.len() { let prev = projected_points.last().unwrap(); let curr = projected_points_raw[i]; let dx = curr[0] - prev[0]; let dy = curr[1] - prev[1]; // 1.0e-11 is approx (3e-6)^2, which ensures we are safely above the 1e-6 degenerate threshold if (dx * dx + dy * dy) > 1.0e-11 { projected_points.push(curr); } } } let highway_tag = tags.get("highway").map(|s| s.as_str()); let road_type = match highway_tag.unwrap_or("") { "motorway" | "motorway_link" | "trunk" | "trunk_link" => 0.0, "primary" | "primary_link" => 1.0, "secondary" | "secondary_link" => 2.0, _ => 3.0, }; let default_lanes: f32 = match highway_tag.unwrap_or("") { "motorway" | "trunk" => 4.0, "motorway_link" | "trunk_link" | "primary" => 2.0, _ => 2.0, }; let lanes: f32 = tags.get("lanes") .and_then(|s| s.parse().ok()) .unwrap_or(default_lanes); // DEBUG: Enable verbose logging for specific way ID via environment variable let debug_way_id: Option = std::env::var("DEBUG_WAY_ID") .ok() .and_then(|s| s.parse().ok()); if debug_way_id == Some(id) { println!("DEBUG Way {}: Processing at zoom {}", id, zoom); println!(" - is_highway: {}", is_highway); println!(" - treat_as_water_line: {}", treat_as_water_line); println!(" - is_water_line: {}", is_water_line); println!(" - is_water_area: {}", is_water_area); println!(" - Original points: {}", points.len()); println!(" - Simplified points: {}", simplified_points.len()); println!(" - Projected points: {}", projected_points.len()); println!(" - First 5 simplified (lat/lon):"); for (i, p) in simplified_points.iter().take(5).enumerate() { println!(" [{:2}] lat={:.8}, lon={:.8}", i, p.0, p.1); } println!(" - First 5 projected (x/y):"); for (i, p) in projected_points.iter().take(5).enumerate() { println!(" [{:2}] x={:.8}, y={:.8}", i, p[0], p[1]); } // Check for consecutive duplicates let mut duplicates = 0; for i in 0..projected_points.len().saturating_sub(1) { let p1 = projected_points[i]; let p2 = projected_points[i + 1]; let dx = p2[0] - p1[0]; let dy = p2[1] - p1[1]; let dist = (dx * dx + dy * dy).sqrt(); if dist < 0.000001 { duplicates += 1; if duplicates <= 3 { println!(" DEGENERATE segment {}: dist={:.12}, p1=[{:.8},{:.8}], p2=[{:.8},{:.8}]", i, dist, p1[0], p1[1], p2[0], p2[1]); } } } if duplicates > 0 { println!(" - Total degenerate segments: {}/{}", duplicates, projected_points.len() - 1); } println!(" - Tags: {:?}", tags); } let vertex_buffer = if treat_as_water_line { if debug_way_id == Some(id) { println!(" - Using generate_polygon_geometry for water line"); } mesh_svc.generate_polygon_geometry(&projected_points) } else { if debug_way_id == Some(id) { println!(" - Using generate_road_geometry with lanes={}, road_type={}", lanes, road_type); } mesh_svc.generate_road_geometry(&projected_points, lanes, road_type) }; // DEBUG: Log buffer size for tracked way if debug_way_id == Some(id) { println!(" - vertex_buffer size = {} bytes", vertex_buffer.len()); println!(" - Expected vertex size: {} bytes", if treat_as_water_line { 8 } else { 24 }); } let task = DbTask::Way { zoom: zoom_i32, table: "ways", id, tags: tags.clone(), points: line_blob.clone(), vertex_buffer, x, y }; let _ = tx.blocking_send(task); } if treat_as_building { // Generate building mesh let projected_points: Vec<[f32; 2]> = final_points.iter() .map(|(lat, lon)| { let (x, y) = GeometryService::project(*lat, *lon); [x, y] }) .collect(); let building_type = tags.get("building").map(|s| s.as_str()).unwrap_or("yes"); let color: [f32; 3] = match building_type { "house" | "apartments" | "residential" | "detached" | "semidetached_house" | "terrace" | "dormitory" => [0.95, 0.94, 0.91], "commercial" | "retail" | "office" | "supermarket" | "kiosk" | "hotel" => [0.91, 0.89, 0.86], "industrial" | "warehouse" | "factory" | "manufacture" => [0.85, 0.84, 0.80], _ => [0.85, 0.85, 0.85], }; let vertex_buffer = mesh_svc.generate_building_geometry(&projected_points, color); let task = DbTask::Way { zoom: zoom_i32, table: "buildings", id, tags: tags.clone(), points: polygon_blob.clone(), vertex_buffer, x, y }; let _ = tx.blocking_send(task); } if treat_as_water_area { // Calculate bounding box for multi-tile insertion let mut min_lat = f64::MAX; let mut max_lat = f64::MIN; let mut min_lon = f64::MAX; let mut max_lon = f64::MIN; for (lat, lon) in &final_points { if *lat < min_lat { min_lat = *lat; } if *lat > max_lat { max_lat = *lat; } if *lon < min_lon { min_lon = *lon; } if *lon > max_lon { max_lon = *lon; } } // Get tiles covered by bounding box let (min_tile_x, min_tile_y) = TileService::lat_lon_to_tile(min_lat, min_lon, zoom); let (max_tile_x, max_tile_y) = TileService::lat_lon_to_tile(max_lat, max_lon, zoom); // Iterate over all tiles in bbox for tile_x in min_tile_x..=max_tile_x { for tile_y in min_tile_y..=max_tile_y { // Calculate tile origin and scale for relative coordinates let tile_count = 2_f64.powi(zoom as i32); let tile_size = 1.0 / tile_count; let tile_origin_x = tile_x as f64 * tile_size; let tile_origin_y = tile_y as f64 * tile_size; // Project points to global space then make relative to this tile let projected_points: Vec<[f32; 2]> = final_points.iter() .map(|(lat, lon)| { let (global_x, global_y) = GeometryService::project(*lat, *lon); // Convert to tile-relative (0..1 within tile) let relative_x = ((global_x as f64 - tile_origin_x) / tile_size) as f32; let relative_y = ((global_y as f64 - tile_origin_y) / tile_size) as f32; [relative_x, relative_y] }) .collect(); let vertex_buffer = mesh_svc.generate_polygon_geometry(&projected_points); let task = DbTask::Way { zoom: zoom_i32, table: "water", id, tags: tags.clone(), points: polygon_blob.clone(), vertex_buffer, x: tile_x, y: tile_y, }; let _ = tx.blocking_send(task); } } } if treat_as_landuse { // Generate landuse polygon mesh let projected_points: Vec<[f32; 2]> = final_points.iter() .map(|(lat, lon)| { let (x, y) = GeometryService::project(*lat, *lon); [x, y] }) .collect(); let vertex_buffer = mesh_svc.generate_polygon_geometry(&projected_points); let task = DbTask::Way { zoom: zoom_i32, table: "landuse", id, tags: tags.clone(), points: polygon_blob.clone(), vertex_buffer, x, y }; let _ = tx.blocking_send(task); } if is_railway { let (first_lat, first_lon) = simplified_points[0]; // Serialize as f64 for high precision let mut railway_blob = Vec::with_capacity(simplified_points.len() * 16); for (lat, lon) in &simplified_points { railway_blob.extend_from_slice(&lat.to_le_bytes()); railway_blob.extend_from_slice(&lon.to_le_bytes()); } railway_store.insert_way(id, tags.clone(), railway_blob, first_lat, first_lon); } } } } } Element::Relation(rel) => { if !relations_pending { println!("Switching to Relation processing..."); relations_pending = true; } relation_count += 1; let tags: HashMap = rel.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); if let Some(colour) = RailwayService::get_route_color(&tags) { for member in rel.members() { if let osmpbf::RelMemberType::Way = member.member_type { railway_store.set_color(member.member_id, colour.clone()); } } } // Also extract line ref (e.g., S1, U4) from route relations if let Some(line_ref) = tags.get("ref") { // Only propagate S-Bahn/U-Bahn style refs (starts with S or U followed by digit) if (line_ref.starts_with('S') || line_ref.starts_with('U')) && line_ref.len() >= 2 { // Only log if verbose debugging is enabled if std::env::var("VERBOSE_DEBUG").is_ok() { let member_count = rel.members().filter(|m| matches!(m.member_type, osmpbf::RelMemberType::Way)).count(); println!("DEBUG: Found transit line ref '{}' with {} way members", line_ref, member_count); } for member in rel.members() { if let osmpbf::RelMemberType::Way = member.member_type { railway_store.set_ref(member.member_id, line_ref.clone()); } } } } if tags.get("type").map(|t| t == "multipolygon").unwrap_or(false) { let is_water = tags.get("natural").map(|v| v == "water" || v == "wetland" || v == "bay").unwrap_or(false) || tags.get("waterway").map(|v| v == "riverbank" || v == "river" || v == "canal").unwrap_or(false) || tags.get("water").is_some() || tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false); let is_landuse = tags.get("landuse").is_some() || tags.get("leisure").map(|v| v == "park" || v == "nature_reserve" || v == "garden").unwrap_or(false) || tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath").unwrap_or(false); if is_water || is_landuse { let mut outer_ways: Vec = Vec::new(); for member in rel.members() { if member.role().unwrap_or("") == "outer" { if let osmpbf::RelMemberType::Way = member.member_type { outer_ways.push(member.member_id); } } } if !outer_ways.is_empty() { let rings = MultipolygonService::assemble_rings(&outer_ways, &way_store); for ring_node_ids in rings { let mut points: Vec<(f64, f64)> = Vec::new(); for node_id in &ring_node_ids { if let Some((lat, lon)) = node_store.get(*node_id) { points.push((lat, lon)); } } if points.len() >= 4 { let id = rel.id(); let (first_lat, first_lon) = points[0]; for &zoom in &FilteringService::ZOOM_LEVELS { if !FilteringService::should_include(&tags, zoom) { continue; } // No simplification for multipolygons let final_points = GeometryService::triangulate_polygon(&points); if final_points.len() < 3 { continue; } let (x, y) = TileService::lat_lon_to_tile(first_lat, first_lon, zoom); let zoom_i32 = zoom as i32; let mut polygon_blob = Vec::with_capacity(final_points.len() * 8); for (lat, lon) in &final_points { polygon_blob.extend_from_slice(&(*lat as f32).to_le_bytes()); polygon_blob.extend_from_slice(&(*lon as f32).to_le_bytes()); } let table = if is_water { "water" } else { "landuse" }; // For water, use multi-tile insertion with tile-relative coords if is_water { // Calculate bounding box let mut min_lat = f64::MAX; let mut max_lat = f64::MIN; let mut min_lon = f64::MAX; let mut max_lon = f64::MIN; for (lat, lon) in &final_points { if *lat < min_lat { min_lat = *lat; } if *lat > max_lat { max_lat = *lat; } if *lon < min_lon { min_lon = *lon; } if *lon > max_lon { max_lon = *lon; } } let (min_tile_x, min_tile_y) = TileService::lat_lon_to_tile(min_lat, min_lon, zoom); let (max_tile_x, max_tile_y) = TileService::lat_lon_to_tile(max_lat, max_lon, zoom); // Iterate over all tiles for tile_x in min_tile_x..=max_tile_x { for tile_y in min_tile_y..=max_tile_y { let tile_count = 2_f64.powi(zoom as i32); let tile_size = 1.0 / tile_count; let tile_origin_x = tile_x as f64 * tile_size; let tile_origin_y = tile_y as f64 * tile_size; let projected_points: Vec<[f32; 2]> = final_points.iter() .map(|(lat, lon)| { let (global_x, global_y) = GeometryService::project(*lat, *lon); let relative_x = ((global_x as f64 - tile_origin_x) / tile_size) as f32; let relative_y = ((global_y as f64 - tile_origin_y) / tile_size) as f32; [relative_x, relative_y] }) .collect(); let vertex_buffer = mesh_svc.generate_polygon_geometry(&projected_points); let task = DbTask::Way { zoom: zoom_i32, table, id, tags: tags.clone(), points: polygon_blob.clone(), vertex_buffer, x: tile_x, y: tile_y, }; let _ = tx.blocking_send(task); } } } else { // Landuse: keep old single-tile logic for now let (x, y) = TileService::lat_lon_to_tile(first_lat, first_lon, zoom); let projected_points: Vec<[f32; 2]> = final_points.iter() .map(|(lat, lon)| { let (x, y) = GeometryService::project(*lat, *lon); [x, y] }) .collect(); let vertex_buffer = mesh_svc.generate_polygon_geometry(&projected_points); let task = DbTask::Way { zoom: zoom_i32, table, id, tags: tags.clone(), points: polygon_blob.clone(), vertex_buffer, x, y, }; let _ = tx.blocking_send(task); } } } } } } } } _ => {} } if (node_count + way_count + relation_count) % 100_000 == 0 { println!("Processed {} nodes, {} ways, {} relations...", node_count, way_count, relation_count); } })?; let (railways, colors, refs) = railway_store.into_data(); println!("Inserting {} railway ways with colors and line refs...", railways.len()); for (id, railway) in railways { let mut tags = railway.tags; if let Some(colour) = colors.get(&id) { tags.insert("colour".to_string(), colour.clone()); } if let Some(line_ref) = refs.get(&id) { tags.insert("line_ref".to_string(), line_ref.clone()); } // Insert for all applicable zoom levels for &zoom in &FilteringService::ZOOM_LEVELS { if !FilteringService::should_include(&tags, zoom) { continue; } let (x, y) = TileService::lat_lon_to_tile(railway.first_lat, railway.first_lon, zoom); let zoom_i32 = zoom as i32; // Parse geometry from blob (f64) and generate tile-relative railway mesh let mut points: Vec<[f32; 2]> = Vec::new(); // Calculate tile origin and scale let tile_count = 2_f64.powi(zoom as i32); let tile_size = 1.0 / tile_count; let tile_origin_x = x as f64 * tile_size; let tile_origin_y = y as f64 * tile_size; for chunk in railway.points.chunks(16) { if chunk.len() < 16 { break; } let lat = f64::from_le_bytes(chunk[0..8].try_into().unwrap_or([0u8; 8])); let lon = f64::from_le_bytes(chunk[8..16].try_into().unwrap_or([0u8; 8])); // Project to Global let (gx, gy) = GeometryService::project_high_precision(lat, lon); // Convert to Tile-Relative (0.0 to 1.0) let rx = ((gx as f64 - tile_origin_x) / tile_size) as f32; let ry = ((gy as f64 - tile_origin_y) / tile_size) as f32; points.push([rx, ry]); } // Parse color and rail type let color_str = tags.get("colour").or(tags.get("color")); let color = color_str .map(|c| { let c = c.trim_start_matches('#'); if c.len() == 6 { let r = u8::from_str_radix(&c[0..2], 16).unwrap_or(0) as f32 / 255.0; let g = u8::from_str_radix(&c[2..4], 16).unwrap_or(0) as f32 / 255.0; let b = u8::from_str_radix(&c[4..6], 16).unwrap_or(0) as f32 / 255.0; [r, g, b] } else { [0.0, 0.0, 0.0] } }) .unwrap_or([0.0, 0.0, 0.0]); let rail_type_str = tags.get("railway").map(|s| s.as_str()).unwrap_or("rail"); let rail_type: f32 = match rail_type_str { "subway" => 1.0, // "tram" => 2.0, // Disabled _ => 0.0, }; let vertex_buffer = mesh_svc.generate_railway_geometry(&points, color, rail_type); let task = DbTask::Way { zoom: zoom_i32, table: "railways", id, tags: tags.clone(), points: railway.points.clone(), vertex_buffer, x, y }; let _ = tx.blocking_send(task); } } println!("Railway insertion complete."); Ok((node_count, way_count, relation_count)) }); let (node_count, way_count, relation_count) = reader_handle.await??; println!("Finished reading PBF. Nodes: {}, Ways: {}, Relations: {}. Waiting for consumer...", node_count, way_count, relation_count); // Drop sender to signal consumer to finish drop(tx); // Wait for consumer consumer_handle.await?; // Clean up cache let _ = std::fs::remove_file(cache_path); // Run major compaction to clean up tombstones from TRUNCATE println!("Running major compaction to clean up tombstones..."); let tables = ["nodes", "ways", "buildings", "water", "landuse", "railways"]; for table in &tables { println!("Compacting map_data.{}...", table); let query = format!("ALTER TABLE map_data.{} WITH gc_grace_seconds = 0", table); let _ = scylla_repo.get_session().query(query, &[]).await; } println!("Compaction settings updated. Tombstones will be cleaned during next compaction cycle."); println!("For immediate compaction, run: docker exec scylla nodetool compact map_data"); println!("Import complete!"); Ok(()) }