// Files
// maps/importer/src/main.rs
// Dongho Kim 1dcdce3ef1 update
// 2025-12-18 07:36:51 +09:00
//
// 437 lines
// 21 KiB
// Rust
mod domain;
mod repositories;
mod services;
mod parsers; // Empty for now, but kept for structure
use anyhow::Result;
use osmpbf::{Element, ElementReader};
use std::collections::HashMap;
use tokio::task::JoinSet;
use std::sync::Arc;
use crate::domain::DbTask;
use crate::repositories::{
scylla_repository::ScyllaRepository,
node_store::NodeStore,
way_store::WayStore,
railway_store::RailwayStore,
};
use crate::services::{
filtering_service::FilteringService,
tile_service::TileService,
geometry_service::GeometryService,
multipolygon_service::MultipolygonService,
railway_service::RailwayService,
};
/// Entry point for the OSM PBF → ScyllaDB map-tile importer.
///
/// Pipeline:
/// 1. Connect to ScyllaDB (`SCYLLA_URI`, default `127.0.0.1:9042`) and truncate every
///    target table — the importer always rebuilds from scratch.
/// 2. A blocking reader task streams the PBF file (`OSM_PBF_PATH` / `HOST_PBF_PATH`)
///    element by element, translating nodes, ways and multipolygon relations into
///    `DbTask`s pushed through a bounded channel (capacity 10_000) for backpressure.
/// 3. A consumer task drains the channel and performs inserts with at most
///    `CONCURRENT_INSERTS` (default 1024) writes in flight.
/// 4. Railway ways are buffered in `RailwayStore` until the relation pass has supplied
///    route colours, then flushed after the read completes.
#[tokio::main]
async fn main() -> Result<()> {
    // Load .env file if present.
    dotenv::dotenv().ok();

    // Connect to ScyllaDB.
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    println!("Connecting to ScyllaDB at {}...", uri);
    let scylla_repo = Arc::new(ScyllaRepository::connect(&uri).await?);

    // Truncate tables — a full re-import replaces all existing data.
    scylla_repo.truncate_tables().await?;

    let path = std::env::var("OSM_PBF_PATH")
        .or_else(|_| std::env::var("HOST_PBF_PATH"))
        .unwrap_or_else(|_| "europe-latest.osm.pbf".to_string());
    println!("Reading {}...", path);
    let reader = ElementReader::from_path(path)?;

    // On-disk cache for node coordinates (ID -> (lat, lon)); a continent-scale node set
    // does not fit comfortably in memory, so NodeStore spills to node_cache.bin.
    let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string());
    let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin");
    println!("Using node cache at {:?}", cache_path);
    let mut node_store = NodeStore::new(cache_path.clone())?;

    // Bounded channel: the reader stalls (blocking_send) when the database falls behind.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<DbTask>(10_000);

    let scylla_repo_clone = scylla_repo.clone();
    let consumer_handle = tokio::spawn(async move {
        let mut join_set = JoinSet::new();
        // u64 so the counter cannot overflow on planet-scale imports (i32 would).
        let mut inserted_count: u64 = 0;
        let max_concurrent = std::env::var("CONCURRENT_INSERTS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(1024); // Default to 1024 concurrent inserts
        println!("Starting consumer with max_concurrent={}", max_concurrent);
        while let Some(task) = rx.recv().await {
            let repo = scylla_repo_clone.clone();
            // Backpressure: cap the number of in-flight insert futures.
            while join_set.len() >= max_concurrent {
                join_set.join_next().await;
            }
            match task {
                DbTask::Node { zoom, id, lat, lon, tags, x, y } => {
                    join_set.spawn(async move {
                        // Best-effort insert, but log failures instead of dropping them.
                        if let Err(e) = repo.insert_node(zoom, id, lat, lon, tags, x, y).await {
                            eprintln!("insert_node failed (id={}): {}", id, e);
                        }
                    });
                }
                DbTask::Way { zoom, table, id, tags, points, x, y } => {
                    join_set.spawn(async move {
                        if let Err(e) = repo.insert_way(table, zoom, id, tags, points, x, y).await {
                            eprintln!("insert_way failed (table={}, id={}): {}", table, id, e);
                        }
                    });
                }
            }
            inserted_count += 1;
        }
        // Channel closed: wait for the remaining in-flight inserts.
        while join_set.join_next().await.is_some() {}
        println!("Consumer finished. Total inserted tasks: {}", inserted_count);
    });

    // Run the PBF reader in a blocking task — osmpbf's reader is synchronous.
    let tx_clone = tx.clone();
    let reader_handle = tokio::task::spawn_blocking(move || -> Result<(usize, usize, usize)> {
        let tx = tx_clone;
        let mut node_count = 0;
        let mut way_count = 0;
        let mut relation_count = 0;
        // PBF files are ordered nodes -> ways -> relations; these flags detect the phase
        // transitions so one-time work (flushing the node cache) runs exactly once.
        let mut ways_pending = false;
        let mut relations_pending = false;
        let mut way_store = WayStore::new();
        let mut railway_store = RailwayStore::new();

        // Sends one DbTask::Node per zoom level at which the tagged node is visible.
        // Shared by the Node and DenseNode branches, which are otherwise identical.
        fn emit_node(
            tx: &tokio::sync::mpsc::Sender<DbTask>,
            id: i64,
            lat: f64,
            lon: f64,
            tags: &HashMap<String, String>,
        ) {
            for &zoom in &FilteringService::ZOOM_LEVELS {
                if FilteringService::should_include(tags, zoom) {
                    let (x, y) = TileService::lat_lon_to_tile(lat, lon, zoom);
                    let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y };
                    // Send error means the consumer died; nothing useful to do here.
                    let _ = tx.blocking_send(task);
                }
            }
        }

        reader.for_each(|element| {
            match element {
                Element::Node(node) => {
                    node_count += 1;
                    // Cache every node's coordinates: untagged nodes are still needed
                    // later to resolve way geometries.
                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
                    if node.tags().count() > 0 {
                        let tags: HashMap<String, String> =
                            node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                        emit_node(&tx, node.id(), node.lat(), node.lon(), &tags);
                    }
                }
                Element::DenseNode(node) => {
                    node_count += 1;
                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
                    if node.tags().count() > 0 {
                        let tags: HashMap<String, String> =
                            node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                        emit_node(&tx, node.id(), node.lat(), node.lon(), &tags);
                    }
                }
                Element::Way(way) => {
                    if !ways_pending {
                        println!("Switching to Way processing. Flushing node cache...");
                        // NOTE(review): if prepare_for_reading fails, ways_pending stays
                        // false and this retries (and re-prints) on every subsequent way —
                        // preserved as-is; confirm whether a hard abort would be better.
                        if let Err(e) = node_store.prepare_for_reading() {
                            eprintln!("Failed to prepare node store: {}", e);
                            return;
                        }
                        ways_pending = true;
                    }
                    way_count += 1;
                    // Keep the raw node refs: relations may reference this way later when
                    // assembling multipolygon rings.
                    let node_refs: Vec<i64> = way.refs().collect();
                    way_store.insert(way.id(), node_refs);
                    let tags: HashMap<String, String> =
                        way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();

                    // Classify the way by its tags into renderable feature categories.
                    let is_highway = tags.contains_key("highway");
                    let is_building = tags.contains_key("building");
                    let is_water_area = tags.get("natural").map(|v| v == "water" || v == "wetland" || v == "bay" || v == "strait").unwrap_or(false)
                        || tags.get("place").map(|v| v == "sea" || v == "ocean").unwrap_or(false)
                        || tags.get("waterway").map(|v| v == "riverbank" || v == "dock").unwrap_or(false)
                        || tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false);
                    let is_water_line = tags.get("waterway").map(|v| v == "stream" || v == "river" || v == "canal" || v == "drain" || v == "ditch").unwrap_or(false);
                    let is_landuse = tags.contains_key("leisure")
                        || tags.contains_key("landuse")
                        || tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath" || v == "wetland").unwrap_or(false);
                    let is_railway = tags.contains_key("railway");

                    if is_highway || is_building || is_water_area || is_water_line || is_landuse || is_railway {
                        // Resolve node refs to coordinates; nodes missing from the cache
                        // (e.g. outside the extract) are silently skipped.
                        let mut points = Vec::new();
                        for node_id in way.refs() {
                            if let Some((lat, lon)) = node_store.get(node_id) {
                                points.push((lat, lon));
                            }
                        }
                        if points.len() >= 2 {
                            let id = way.id();
                            let is_closed = points.first() == points.last();
                            // Area semantics require a closed ring; an unclosed "water
                            // area" degrades to a line, unclosed landuse/buildings drop.
                            let treat_as_water_area = is_water_area && is_closed;
                            let treat_as_landuse = is_landuse && is_closed;
                            let treat_as_building = is_building && is_closed;
                            let treat_as_water_line = is_water_line || (is_water_area && !is_closed);
                            if (is_landuse || is_building) && !is_closed {
                                return;
                            }
                            for &zoom in &FilteringService::ZOOM_LEVELS {
                                if !FilteringService::should_include(&tags, zoom) { continue; }
                                // Simplification tolerance per zoom (degrees).
                                let base_epsilon = match zoom {
                                    2 => 0.01,
                                    4 => 0.002,
                                    6 => 0.0005,
                                    9 => 0.0001,
                                    12 => 0.000005,
                                    _ => 0.0,
                                };
                                // Areas tolerate far less simplification than roads, and
                                // at high zoom are not simplified at all.
                                let epsilon = if treat_as_water_area || treat_as_landuse || is_highway || treat_as_water_line {
                                    if zoom <= 4 && treat_as_landuse {
                                        0.0
                                    } else if treat_as_water_area || treat_as_landuse {
                                        if zoom >= 9 { 0.0 } else { base_epsilon * 0.01 }
                                    } else {
                                        base_epsilon * 0.5
                                    }
                                } else {
                                    base_epsilon
                                };
                                let simplified_points = if epsilon > 0.0 {
                                    GeometryService::simplify_points(&points, epsilon)
                                } else {
                                    points.clone()
                                };
                                let mut final_points = simplified_points.clone();
                                // Blob for line features (highways/railways/water lines):
                                // packed little-endian f32 (lat, lon) pairs.
                                let mut line_blob = Vec::with_capacity(simplified_points.len() * 8);
                                for (lat, lon) in &simplified_points {
                                    line_blob.extend_from_slice(&(*lat as f32).to_le_bytes());
                                    line_blob.extend_from_slice(&(*lon as f32).to_le_bytes());
                                }
                                // Area features are stored pre-triangulated.
                                if treat_as_building || treat_as_water_area || treat_as_landuse {
                                    final_points = GeometryService::triangulate_polygon(&final_points);
                                }
                                if final_points.len() < 3 && (treat_as_building || treat_as_water_area || treat_as_landuse) { continue; }
                                if simplified_points.len() < 2 && (is_highway || is_railway || treat_as_water_line) { continue; }
                                // Tile coordinates are keyed off the first vertex.
                                let (first_lat, first_lon) = simplified_points[0];
                                let (x, y) = TileService::lat_lon_to_tile(first_lat, first_lon, zoom);
                                let zoom_i32 = zoom as i32;
                                let mut polygon_blob = Vec::with_capacity(final_points.len() * 8);
                                for (lat, lon) in &final_points {
                                    polygon_blob.extend_from_slice(&(*lat as f32).to_le_bytes());
                                    polygon_blob.extend_from_slice(&(*lon as f32).to_le_bytes());
                                }
                                if is_highway || treat_as_water_line {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "ways", id, tags: tags.clone(), points: line_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if treat_as_building {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "buildings", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if treat_as_water_area {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "water", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if treat_as_landuse {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "landuse", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if is_railway {
                                    // Railways are buffered until relation colours are
                                    // known, then flushed after the read pass.
                                    // NOTE(review): this runs once per passing zoom level
                                    // for the same id — presumably later calls overwrite
                                    // earlier ones, keeping the finest geometry; confirm
                                    // against RailwayStore::insert_way.
                                    railway_store.insert_way(id, tags.clone(), line_blob.clone(), first_lat, first_lon);
                                }
                            }
                        }
                    }
                }
                Element::Relation(rel) => {
                    if !relations_pending {
                        println!("Switching to Relation processing...");
                        relations_pending = true;
                    }
                    relation_count += 1;
                    let tags: HashMap<String, String> =
                        rel.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                    // Route relations carry the line colour for their member railway ways.
                    if let Some(colour) = RailwayService::get_route_color(&tags) {
                        for member in rel.members() {
                            if let osmpbf::RelMemberType::Way = member.member_type {
                                railway_store.set_color(member.member_id, colour.clone());
                            }
                        }
                    }
                    if tags.get("type").map(|t| t == "multipolygon").unwrap_or(false) {
                        let is_water = tags.get("natural").map(|v| v == "water" || v == "wetland" || v == "bay").unwrap_or(false)
                            || tags.get("waterway").map(|v| v == "riverbank" || v == "river" || v == "canal").unwrap_or(false)
                            || tags.get("water").is_some()
                            || tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false);
                        let is_landuse = tags.get("landuse").is_some()
                            || tags.get("leisure").map(|v| v == "park" || v == "nature_reserve" || v == "garden").unwrap_or(false)
                            || tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath").unwrap_or(false);
                        if is_water || is_landuse {
                            // Collect the outer-ring member ways and stitch them into
                            // closed rings of node IDs.
                            let mut outer_ways: Vec<i64> = Vec::new();
                            for member in rel.members() {
                                if member.role().unwrap_or("") == "outer" {
                                    if let osmpbf::RelMemberType::Way = member.member_type {
                                        outer_ways.push(member.member_id);
                                    }
                                }
                            }
                            if !outer_ways.is_empty() {
                                let rings = MultipolygonService::assemble_rings(&outer_ways, &way_store);
                                for ring_node_ids in rings {
                                    let mut points: Vec<(f64, f64)> = Vec::new();
                                    for node_id in &ring_node_ids {
                                        if let Some((lat, lon)) = node_store.get(*node_id) {
                                            points.push((lat, lon));
                                        }
                                    }
                                    // A closed ring needs at least 4 points (first == last).
                                    if points.len() >= 4 {
                                        let id = rel.id();
                                        let (first_lat, first_lon) = points[0];
                                        // No simplification for multipolygons; the
                                        // triangulation and blob are zoom-independent,
                                        // so build them once instead of per zoom level.
                                        let final_points = GeometryService::triangulate_polygon(&points);
                                        if final_points.len() >= 3 {
                                            let mut polygon_blob = Vec::with_capacity(final_points.len() * 8);
                                            for (lat, lon) in &final_points {
                                                polygon_blob.extend_from_slice(&(*lat as f32).to_le_bytes());
                                                polygon_blob.extend_from_slice(&(*lon as f32).to_le_bytes());
                                            }
                                            let table = if is_water { "water" } else { "landuse" };
                                            for &zoom in &FilteringService::ZOOM_LEVELS {
                                                if !FilteringService::should_include(&tags, zoom) { continue; }
                                                let (x, y) = TileService::lat_lon_to_tile(first_lat, first_lon, zoom);
                                                let task = DbTask::Way { zoom: zoom as i32, table, id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                                let _ = tx.blocking_send(task);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                _ => {}
            }
            // Progress heartbeat. Exactly one counter increments per matched element, so
            // this fires every 100k elements (skipped for elements that return early).
            if (node_count + way_count + relation_count) % 100_000 == 0 {
                println!("Processed {} nodes, {} ways, {} relations...", node_count, way_count, relation_count);
            }
        })?;

        // Flush buffered railway ways now that relation colours are known.
        let (railways, colors) = railway_store.into_data();
        println!("Inserting {} railway ways with colors...", railways.len());
        for (id, railway) in railways {
            let mut tags = railway.tags;
            if let Some(colour) = colors.get(&id) {
                tags.insert("colour".to_string(), colour.clone());
            }
            // Insert for all applicable zoom levels.
            for &zoom in &FilteringService::ZOOM_LEVELS {
                if !FilteringService::should_include(&tags, zoom) { continue; }
                let (x, y) = TileService::lat_lon_to_tile(railway.first_lat, railway.first_lon, zoom);
                let task = DbTask::Way {
                    zoom: zoom as i32,
                    table: "railways",
                    id,
                    tags: tags.clone(),
                    points: railway.points.clone(),
                    x,
                    y
                };
                let _ = tx.blocking_send(task);
            }
        }
        println!("Railway insertion complete.");
        Ok((node_count, way_count, relation_count))
    });

    let (node_count, way_count, relation_count) = reader_handle.await??;
    println!("Finished reading PBF. Nodes: {}, Ways: {}, Relations: {}. Waiting for consumer...", node_count, way_count, relation_count);
    // Drop our sender so the consumer's channel drains and closes.
    drop(tx);
    // Wait for consumer.
    consumer_handle.await?;
    // The node cache is only needed during the import.
    let _ = std::fs::remove_file(cache_path);

    // NOTE: this does not run a compaction itself — it sets gc_grace_seconds = 0 so the
    // tombstones left by TRUNCATE become eligible for removal on the next compaction.
    println!("Running major compaction to clean up tombstones...");
    let tables = ["nodes", "ways", "buildings", "water", "landuse", "railways"];
    for table in &tables {
        println!("Compacting map_data.{}...", table);
        let query = format!("ALTER TABLE map_data.{} WITH gc_grace_seconds = 0", table);
        let _ = scylla_repo.get_session().query(query, &[]).await;
    }
    println!("Compaction settings updated. Tombstones will be cleaned during next compaction cycle.");
    println!("For immediate compaction, run: docker exec scylla nodetool compact map_data");
    println!("Import complete!");
    Ok(())
}