// maps/importer/src/main.rs
// Dongho Kim 169997eecd update — 2025-12-14 02:25:03 +09:00
// 559 lines, 25 KiB, Rust
use anyhow::Result;
use earcutr::earcut;
use osmpbf::{Element, ElementReader};
use scylla::SessionBuilder;
use std::collections::HashMap;
use tokio::task::JoinSet;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use memmap2::Mmap;
// Tile zoom levels the importer generates data for (zoom 14 keeps everything).
const ZOOM_LEVELS: [u32; 6] = [2, 4, 6, 9, 12, 14];

/// Disk-backed store mapping OSM node id -> (lat, lon).
///
/// Written sequentially during the node pass, then memory-mapped for random
/// access during the way pass. Each node occupies 8 bytes at offset `id * 8`
/// (two little-endian i32s in 1e7 fixed-point).
struct NodeStore {
    // Present only during the write phase; dropping it flushes and closes the file.
    writer: Option<BufWriter<File>>,
    // Present only during the read phase (after prepare_for_reading).
    mmap: Option<Mmap>,
    path: PathBuf,
    // Highest node id written so far; -1 before any write.
    last_id: i64,
}
impl NodeStore {
    /// Create (truncating any previous cache file) a store in write mode with a
    /// large write buffer to keep syscall counts low during the node pass.
    fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)?;
        let writer = BufWriter::with_capacity(10 * 1024 * 1024, file); // 10MB buffer
        Ok(Self {
            writer: Some(writer),
            mmap: None,
            path,
            last_id: -1,
        })
    }
    /// Record a node's coordinates at file offset `id * 8`.
    ///
    /// Assumes ids arrive mostly in ascending order (typical for PBF extracts —
    /// TODO confirm for all inputs): gaps are skipped with a relative forward
    /// seek (unwritten bytes read back as zero), and an out-of-order or
    /// duplicate id triggers an absolute seek to its slot.
    /// No-op after `prepare_for_reading` has dropped the writer.
    fn insert(&mut self, id: i64, lat: f64, lon: f64) -> Result<()> {
        if let Some(writer) = &mut self.writer {
            if id > self.last_id + 1 {
                // Skip the hole left by missing ids.
                let gap = id - self.last_id - 1;
                writer.seek(SeekFrom::Current(gap * 8))?;
            } else if id <= self.last_id {
                // Out-of-order/duplicate id: jump to its absolute slot.
                writer.seek(SeekFrom::Start(id as u64 * 8))?;
            }
            // 1e7 fixed-point keeps each coordinate in an i32.
            let lat_i32 = (lat * 1e7) as i32;
            let lon_i32 = (lon * 1e7) as i32;
            writer.write_all(&lat_i32.to_le_bytes())?;
            writer.write_all(&lon_i32.to_le_bytes())?;
            self.last_id = id;
        }
        Ok(())
    }
    /// Switch from write mode to read mode: drop the writer (flushing its
    /// buffer and closing the handle), then memory-map the file for `get`.
    fn prepare_for_reading(&mut self) -> Result<()> {
        self.writer = None; // Flush and close writer
        // SAFETY-NOTE(review): Mmap::map is unsafe because the mapped file must
        // not be mutated concurrently; the writer is closed above and nothing
        // else writes to this path afterwards.
        let file = File::open(&self.path)?;
        let mmap = unsafe { Mmap::map(&file)? };
        self.mmap = Some(mmap);
        Ok(())
    }
    /// Look up a node's coordinates.
    ///
    /// Returns None before `prepare_for_reading`, for ids beyond the end of the
    /// file, or for slots that were never written. Caveat: a node at exactly
    /// (0.0, 0.0) is indistinguishable from an unwritten slot and is treated as
    /// missing — NOTE(review): deliberate sentinel, confirm acceptable.
    fn get(&self, id: i64) -> Option<(f64, f64)> {
        if let Some(mmap) = &self.mmap {
            let offset = id as usize * 8;
            if offset + 8 <= mmap.len() {
                let chunk = &mmap[offset..offset+8];
                let lat_i32 = i32::from_le_bytes(chunk[0..4].try_into().unwrap());
                let lon_i32 = i32::from_le_bytes(chunk[4..8].try_into().unwrap());
                // All-zero slot == never written (see caveat above).
                if lat_i32 == 0 && lon_i32 == 0 { return None; }
                return Some((lat_i32 as f64 / 1e7, lon_i32 as f64 / 1e7));
            }
        }
        None
    }
}
// Ramer-Douglas-Peucker simplification

/// Perpendicular distance from point `p` to the infinite line through
/// `line_start` and `line_end`. Falls back to plain point-to-point distance
/// when the two endpoints coincide (degenerate segment).
fn perpendicular_distance(p: (f64, f64), line_start: (f64, f64), line_end: (f64, f64)) -> f64 {
    let (x, y) = p;
    let (x1, y1) = line_start;
    let (x2, y2) = line_end;
    let dx = x2 - x1;
    let dy = y2 - y1;
    if dx == 0.0 && dy == 0.0 {
        // Degenerate line: distance to the single point.
        return ((x - x1).powi(2) + (y - y1).powi(2)).sqrt();
    }
    // Standard 2D point-to-line distance: |cross(d, p - p1)| / |d|.
    let num = (dy * x - dx * y + x2 * y1 - y2 * x1).abs();
    let den = (dx.powi(2) + dy.powi(2)).sqrt();
    num / den
}

/// Ramer-Douglas-Peucker polyline simplification.
///
/// Recursively keeps the point farthest from the start-end chord whenever it
/// deviates by more than `epsilon`; otherwise the whole span collapses to its
/// two endpoints. Inputs with fewer than 3 points are returned unchanged.
fn simplify_points(points: &[(f64, f64)], epsilon: f64) -> Vec<(f64, f64)> {
    if points.len() < 3 {
        return points.to_vec();
    }
    let start = points[0];
    let end = points[points.len() - 1];
    // Find the interior point with the maximum distance from the chord.
    let mut max_dist = 0.0;
    let mut index = 0;
    for i in 1..points.len() - 1 {
        let dist = perpendicular_distance(points[i], start, end);
        if dist > max_dist {
            max_dist = dist;
            index = i;
        }
    }
    if max_dist > epsilon {
        // Split at the farthest point and simplify both halves.
        let mut left = simplify_points(&points[..=index], epsilon);
        let right = simplify_points(&points[index..], epsilon);
        // Remove duplicate point at split (both halves contain points[index]).
        left.pop();
        left.extend(right);
        left
    } else {
        // Everything within tolerance: keep only the endpoints.
        vec![start, end]
    }
}
/// Triangulate a simple (hole-free) polygon ring with earcut, returning the
/// triangle vertices as a flat list (3 entries per triangle). On earcut
/// failure an empty list is returned.
fn triangulate_polygon(points: &[(f64, f64)]) -> Vec<(f64, f64)> {
    // Flatten the (lat, lon) pairs into the interleaved buffer earcut expects.
    let flat: Vec<f64> = points.iter().flat_map(|&(a, b)| [a, b]).collect();
    // We assume simple polygons (no holes) for now as we are just processing ways
    let indices = earcut(&flat, &[], 2).unwrap_or_default();
    // Map the triangle indices back onto the original vertices.
    indices.into_iter().map(|i| points[i]).collect()
}
/// Decide whether an element with the given tags should be stored at `zoom`.
///
/// Zoom 14 and above keeps everything; lower zooms keep progressively fewer
/// feature classes (major roads, large places, water, green areas). Any other
/// zoom below 14 stores nothing.
fn should_include(tags: &HashMap<String, String>, zoom: u32) -> bool {
    // Highest zoom: no filtering at all.
    if zoom >= 14 {
        return true;
    }
    // True when `key` is present and its value is one of `values`.
    let tag_in = |key: &str, values: &[&str]| {
        tags.get(key).map_or(false, |v| values.contains(&v.as_str()))
    };
    match zoom {
        // Space view: continents/countries, major water, motorways, green areas.
        2 => {
            tag_in("place", &["continent", "country", "sea", "ocean"])
                || tag_in("natural", &["water", "bay", "strait", "wood", "scrub"])
                || tag_in("highway", &["motorway"])
                || tag_in("landuse", &["forest", "grass", "meadow", "farmland", "residential"])
                || tag_in("leisure", &["park", "nature_reserve"])
        }
        // Regional view: add trunk roads, cities/towns, more nature, rivers.
        4 => {
            tag_in("highway", &["motorway", "trunk"])
                || tag_in("place", &["city", "town", "sea", "ocean"])
                || tag_in("natural", &["water", "wood", "scrub", "heath", "wetland", "bay", "strait"])
                || tag_in("landuse", &["forest", "grass", "meadow", "farmland", "residential"])
                || tag_in("leisure", &["park", "nature_reserve"])
                || tag_in("waterway", &["river"])
        }
        // Country view: add primary roads; towns drop out.
        6 => {
            tag_in("highway", &["motorway", "trunk", "primary"])
                || tag_in("place", &["city", "sea", "ocean"])
                || tag_in("natural", &["water", "wood", "scrub", "heath", "wetland", "bay", "strait"])
                || tag_in("landuse", &["forest", "grass", "meadow", "farmland", "residential"])
                || tag_in("leisure", &["park", "nature_reserve"])
                || tag_in("waterway", &["river"])
        }
        // Area view: towns and rail return; nature is trimmed down.
        9 => {
            tag_in("highway", &["motorway", "trunk", "primary"])
                || tag_in("place", &["city", "town", "sea", "ocean"])
                || tag_in("railway", &["rail"])
                || tag_in("natural", &["water", "wood", "bay", "strait"])
                || tag_in("landuse", &["forest"])
                || tag_in("leisure", &["park"])
                || tag_in("waterway", &["river", "riverbank"])
        }
        // City view: secondary/tertiary roads, villages, buildings, any landuse/leisure.
        12 => {
            tag_in("highway", &["motorway", "trunk", "primary", "secondary", "tertiary"])
                || tag_in("place", &["city", "town", "village"])
                || tag_in("railway", &["rail"])
                || tags.contains_key("building")
                || tags.contains_key("landuse")
                || tags.contains_key("leisure")
                || tag_in("natural", &["water", "wood", "scrub", "wetland", "heath", "bay", "strait"])
                || tag_in("waterway", &["river", "riverbank", "stream"])
        }
        // Zooms not listed above (below 14) store nothing.
        _ => false,
    }
}
#[tokio::main]
async fn main() -> Result<()> {
    // Pipeline overview:
    //   1. Connect to ScyllaDB (retrying) and ensure keyspace/tables exist.
    //   2. Stream the PBF file on a blocking thread: node pass caches coords to
    //      a flat file, then the way pass resolves refs against that cache.
    //   3. Insert tasks flow over a bounded channel to an async consumer that
    //      executes prepared statements with capped concurrency.
    // Load .env file if present
    dotenv::dotenv().ok();
    // Connect to ScyllaDB
    let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
    println!("Connecting to ScyllaDB at {}...", uri);
    // Retry forever so the importer can start before the database is up
    // (e.g. under docker-compose).
    let session = loop {
        match SessionBuilder::new().known_node(&uri).build().await {
            Ok(session) => break session,
            Err(e) => {
                println!("Failed to connect to ScyllaDB: {}. Retrying in 5 seconds...", e);
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    };
    let session = std::sync::Arc::new(session);
    // Ensure schema exists
    session.query("CREATE KEYSPACE IF NOT EXISTS map_data WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }", &[]).await?;
    // Create tables
    // All tables share the partition key (zoom, tile_x, tile_y) so one tile is
    // a single-partition read.
    session.query("CREATE TABLE IF NOT EXISTS map_data.nodes (zoom int, tile_x int, tile_y int, id bigint, lat double, lon double, tags map<text, text>, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    session.query("CREATE TABLE IF NOT EXISTS map_data.ways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    session.query("CREATE TABLE IF NOT EXISTS map_data.buildings (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    session.query("CREATE TABLE IF NOT EXISTS map_data.water (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
    // Prepare statements
    println!("Preparing statements...");
    let insert_node = session.prepare("INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)").await?;
    let insert_ways = session.prepare("INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
    let insert_buildings = session.prepare("INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
    let insert_water = session.prepare("INSERT INTO map_data.water (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
    let insert_landuse = session.prepare("INSERT INTO map_data.landuse (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
    let insert_railways = session.prepare("INSERT INTO map_data.railways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
    println!("Statements prepared.");
    // Input file: OSM_PBF_PATH, then HOST_PBF_PATH, then a default filename.
    let path = std::env::var("OSM_PBF_PATH")
        .or_else(|_| std::env::var("HOST_PBF_PATH"))
        .unwrap_or_else(|_| "europe-latest.osm.pbf".to_string());
    println!("Reading {}...", path);
    let reader = ElementReader::from_path(path)?;
    // Cache for node coordinates: ID -> (lat, lon)
    // Use flat file with mmap
    let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string());
    let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin");
    println!("Using node cache at {:?}", cache_path);
    let mut node_store = NodeStore::new(cache_path.clone())?;
    // Channel for backpressure
    // Producer (reader) -> Consumer (writer)
    // A unit of DB work: a tagged node, or a way blob destined for one of the
    // five way-like tables (selected by `table`).
    enum DbTask {
        Node { zoom: i32, id: i64, lat: f64, lon: f64, tags: HashMap<String, String>, x: i32, y: i32 },
        Way { zoom: i32, table: &'static str, id: i64, tags: HashMap<String, String>, points: Vec<u8>, x: i32, y: i32 },
    }
    // Bounded channel: the blocking reader stalls when the DB falls behind.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<DbTask>(10_000);
    let session_clone = session.clone();
    // Consumer: drains the channel and executes prepared inserts, capping the
    // number of in-flight statements with a JoinSet.
    let consumer_handle = tokio::spawn(async move {
        let mut join_set = JoinSet::new();
        let mut inserted_count = 0;
        let max_concurrent = std::env::var("CONCURRENT_INSERTS")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(1024); // Default to 1024 concurrent inserts
        println!("Starting consumer with max_concurrent={}", max_concurrent);
        while let Some(task) = rx.recv().await {
            let session = session_clone.clone();
            // Backpressure: limit concurrent inserts
            while join_set.len() >= max_concurrent {
                join_set.join_next().await;
            }
            // NOTE(review): insert errors below are silently discarded
            // (`let _ = session.execute(...)`) — consider logging or retrying.
            match task {
                DbTask::Node { zoom, id, lat, lon, tags, x, y } => {
                    let statement = insert_node.clone();
                    join_set.spawn(async move {
                        let _ = session.execute(
                            &statement,
                            (zoom, x, y, id, lat, lon, tags),
                        ).await;
                    });
                }
                DbTask::Way { zoom, table, id, tags, points, x, y } => {
                    // Route to the prepared statement for the target table.
                    let statement = match table {
                        "ways" => insert_ways.clone(),
                        "buildings" => insert_buildings.clone(),
                        "water" => insert_water.clone(),
                        "landuse" => insert_landuse.clone(),
                        "railways" => insert_railways.clone(),
                        _ => panic!("Unknown table: {}", table),
                    };
                    join_set.spawn(async move {
                        let _ = session.execute(
                            &statement,
                            (zoom, x, y, id, tags, points),
                        ).await;
                    });
                }
            }
            // Counts tasks received/spawned, not confirmed DB writes.
            inserted_count += 1;
        }
        // Wait for remaining tasks
        while let Some(_) = join_set.join_next().await {}
        println!("Consumer finished. Total inserted tasks: {}", inserted_count);
    });
    // Run the PBF reader in a blocking task to allow blocking_send
    let tx_clone = tx.clone();
    let reader_handle = tokio::task::spawn_blocking(move || -> Result<(usize, usize)> {
        let tx = tx_clone;
        let mut node_count = 0;
        let mut way_count = 0;
        let mut ways_pending = false;
        // We process sequentially: Nodes first, then Ways.
        // osmpbf yields nodes then ways.
        // We need to detect when we switch from nodes to ways to prepare the store.
        reader.for_each(|element| {
            match element {
                Element::Node(node) => {
                    // Every node goes into the coordinate cache; only tagged
                    // nodes (POIs etc.) are also sent to the DB.
                    node_count += 1;
                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
                    if node.tags().count() > 0 {
                        let id = node.id();
                        let lat = node.lat();
                        let lon = node.lon();
                        let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                        for &zoom in &ZOOM_LEVELS {
                            if should_include(&tags, zoom) {
                                let (x, y) = lat_lon_to_tile(lat, lon, zoom);
                                let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y };
                                let _ = tx.blocking_send(task);
                            }
                        }
                    }
                }
                Element::DenseNode(node) => {
                    // Same handling as Element::Node, for the dense encoding.
                    node_count += 1;
                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
                    if node.tags().count() > 0 {
                        let id = node.id();
                        let lat = node.lat();
                        let lon = node.lon();
                        let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                        for &zoom in &ZOOM_LEVELS {
                            if should_include(&tags, zoom) {
                                let (x, y) = lat_lon_to_tile(lat, lon, zoom);
                                let task = DbTask::Node { zoom: zoom as i32, id, lat, lon, tags: tags.clone(), x, y };
                                let _ = tx.blocking_send(task);
                            }
                        }
                    }
                }
                Element::Way(way) => {
                    if !ways_pending {
                        // First way encountered. Prepare store for reading.
                        println!("Switching to Way processing. Flushing node cache...");
                        if let Err(e) = node_store.prepare_for_reading() {
                            eprintln!("Failed to prepare node store: {}", e);
                            return;
                        }
                        ways_pending = true;
                    }
                    way_count += 1;
                    let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
                    // Filter for highways/roads OR buildings OR landuse OR water OR railways
                    // A way may belong to several categories and then gets
                    // inserted into each matching table below.
                    let is_highway = tags.contains_key("highway");
                    let is_building = tags.contains_key("building");
                    let is_water = tags.get("natural").map(|v| v == "water" || v == "wetland" || v == "bay" || v == "strait").unwrap_or(false) ||
                        tags.get("place").map(|v| v == "sea" || v == "ocean").unwrap_or(false) ||
                        tags.get("waterway").map(|v| v == "riverbank" || v == "stream" || v == "river").unwrap_or(false) ||
                        tags.get("landuse").map(|v| v == "basin" || v == "reservoir").unwrap_or(false);
                    let is_landuse = tags.contains_key("leisure") ||
                        tags.contains_key("landuse") ||
                        tags.get("natural").map(|v| v == "wood" || v == "scrub" || v == "heath" || v == "wetland").unwrap_or(false);
                    let is_railway = tags.contains_key("railway");
                    if is_highway || is_building || is_water || is_landuse || is_railway {
                        let mut points = Vec::new();
                        // Resolve nodes from store
                        // Unresolvable refs are silently skipped, so the way may
                        // end up with fewer points than refs.
                        for node_id in way.refs() {
                            if let Some((lat, lon)) = node_store.get(node_id) {
                                points.push((lat, lon));
                            }
                        }
                        if points.len() >= 2 {
                            let id = way.id();
                            // Insert into the tile of the first point
                            // NOTE(review): this binding is unused — it is
                            // shadowed by the post-simplification binding below.
                            let (first_lat, first_lon) = points[0];
                            for &zoom in &ZOOM_LEVELS {
                                if !should_include(&tags, zoom) { continue; }
                                // Apply simplification based on zoom level
                                // (coarser zoom -> larger epsilon -> fewer points).
                                let base_epsilon = match zoom {
                                    2 => 0.0001,
                                    4 => 0.00005,
                                    6 => 0.00002,
                                    9 => 0.00001,
                                    12 => 0.000005,
                                    _ => 0.0,
                                };
                                let epsilon = if is_water || is_landuse || is_highway {
                                    if zoom <= 4 && is_landuse {
                                        0.0 // Disable simplification for landuse at low zoom to prevent disappearing polygons
                                    } else {
                                        base_epsilon * 0.5 // Preserve more detail for natural features AND roads
                                    }
                                } else {
                                    base_epsilon
                                };
                                let simplified_points = if epsilon > 0.0 {
                                    simplify_points(&points, epsilon)
                                } else {
                                    points.clone()
                                };
                                // Serialize points
                                let mut final_points = simplified_points.clone();
                                // For highways and railways, we DON'T triangulate - they're line data
                                // Create the highway/railway blob BEFORE triangulation
                                // Blob layout: consecutive (lat, lon) pairs as f32 LE.
                                let mut line_blob = Vec::with_capacity(simplified_points.len() * 8);
                                for (lat, lon) in &simplified_points {
                                    line_blob.extend_from_slice(&(*lat as f32).to_le_bytes());
                                    line_blob.extend_from_slice(&(*lon as f32).to_le_bytes());
                                }
                                // Triangulate for polygon types (buildings, water, landuse)
                                if is_building || is_water || is_landuse {
                                    // Close the loop if not closed
                                    if final_points.first() != final_points.last() {
                                        final_points.push(final_points[0]);
                                    }
                                    final_points = triangulate_polygon(&final_points);
                                }
                                // Drop degenerate results for either representation.
                                if final_points.len() < 3 && (is_building || is_water || is_landuse) { continue; }
                                if simplified_points.len() < 2 && (is_highway || is_railway) { continue; }
                                let (first_lat, first_lon) = simplified_points[0];
                                let (x, y) = lat_lon_to_tile(first_lat, first_lon, zoom);
                                let zoom_i32 = zoom as i32;
                                // Create polygon blob from triangulated points
                                let mut polygon_blob = Vec::with_capacity(final_points.len() * 8);
                                for (lat, lon) in &final_points {
                                    polygon_blob.extend_from_slice(&(*lat as f32).to_le_bytes());
                                    polygon_blob.extend_from_slice(&(*lon as f32).to_le_bytes());
                                }
                                // Use line_blob for highways/railways, polygon_blob for others
                                if is_highway {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "ways", id, tags: tags.clone(), points: line_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if is_building {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "buildings", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if is_water {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "water", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if is_landuse {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "landuse", id, tags: tags.clone(), points: polygon_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                                if is_railway {
                                    let task = DbTask::Way { zoom: zoom_i32, table: "railways", id, tags: tags.clone(), points: line_blob.clone(), x, y };
                                    let _ = tx.blocking_send(task);
                                }
                            }
                        }
                    }
                }
                _ => {}
            }
            // Progress log roughly every 100k elements.
            if (node_count + way_count) % 100_000 == 0 {
                println!("Processed {} nodes, {} ways...", node_count, way_count);
            }
        })?;
        Ok((node_count, way_count))
    });
    let (node_count, way_count) = reader_handle.await??;
    println!("Finished reading PBF. Nodes: {}, Ways: {}. Waiting for consumer...", node_count, way_count);
    // Drop sender to signal consumer to finish
    drop(tx);
    // Wait for consumer
    consumer_handle.await?;
    // Clean up cache
    let _ = std::fs::remove_file(cache_path);
    println!("Done!");
    Ok(())
}
/// Convert WGS84 lat/lon to Web Mercator (slippy-map) tile coordinates at `zoom`.
///
/// Latitude is clamped to the Web Mercator limit (~±85.0511°) so poles do not
/// blow up the projection (tan/sec diverge at ±90°), and the result is clamped
/// to the valid 0..2^zoom-1 range so `lon == 180.0` maps to the last column
/// instead of one past it. Normal in-range inputs are unaffected.
fn lat_lon_to_tile(lat: f64, lon: f64, zoom: u32) -> (i32, i32) {
    // Latitude where the Mercator y coordinate reaches the square world edge.
    const MAX_MERCATOR_LAT: f64 = 85.05112878;
    let n = 2.0f64.powi(zoom as i32);
    let lat = lat.clamp(-MAX_MERCATOR_LAT, MAX_MERCATOR_LAT);
    let x = (lon + 180.0) / 360.0 * n;
    let lat_rad = lat.to_radians();
    // Standard slippy-map formula: y = (1 - asinh(tan(lat)) / pi) / 2 * n,
    // with asinh(tan(lat)) written as ln(tan(lat) + sec(lat)).
    let y = (1.0 - (lat_rad.tan() + (1.0 / lat_rad.cos())).ln() / std::f64::consts::PI) / 2.0 * n;
    // Keep indices inside [0, n-1] even for out-of-range input.
    let max_tile = (n - 1.0).max(0.0);
    (x.clamp(0.0, max_tile) as i32, y.clamp(0.0, max_tile) as i32)
}