diff --git a/Cargo.lock b/Cargo.lock index 9887712..3f37ef5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1292,6 +1292,7 @@ name = "importer" version = "0.1.0" dependencies = [ "anyhow", + "memmap2 0.9.9", "osmpbf", "scylla", "tokio", @@ -1502,6 +1503,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490" +dependencies = [ + "libc", +] + [[package]] name = "metal" version = "0.27.0" @@ -1790,7 +1800,7 @@ checksum = "5d689d6e9f254bbd63893ce00a27147e41fe94bf3abca70f85b5345afb3cb728" dependencies = [ "byteorder", "flate2", - "memmap2", + "memmap2 0.5.10", "protobuf", "protobuf-codegen", "rayon", diff --git a/docker-compose.yml b/docker-compose.yml index 13197df..03d5682 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,9 +30,11 @@ services: container_name: map-importer volumes: - ./oberbayern-251125.osm.pbf:/app/data.osm.pbf + - importer_cache:/cache environment: - SCYLLA_URI=scylla:9042 - OSM_PBF_PATH=/app/data.osm.pbf + - CACHE_DIR=/cache depends_on: - scylla profiles: @@ -40,3 +42,4 @@ services: volumes: scylla_data: + importer_cache: diff --git a/importer/Cargo.toml b/importer/Cargo.toml index 5fc4866..a6f53f3 100644 --- a/importer/Cargo.toml +++ b/importer/Cargo.toml @@ -8,4 +8,4 @@ osmpbf = "0.3" # Pure Rust PBF parser, easier to build than osmium (C++ bindings scylla = "0.12" tokio = { version = "1.0", features = ["full"] } anyhow = "1.0" -sled = "0.34" +memmap2 = "0.9" diff --git a/importer/src/main.rs b/importer/src/main.rs index 234b712..c0f7470 100644 --- a/importer/src/main.rs +++ b/importer/src/main.rs @@ -1,12 +1,87 @@ use anyhow::Result; use osmpbf::{Element, ElementReader}; -use scylla::{Session, SessionBuilder}; +use scylla::SessionBuilder; use std::collections::HashMap; use tokio::task::JoinSet; - +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Write, 
Seek, SeekFrom}; +use std::path::{Path, PathBuf}; +use memmap2::Mmap; const ZOOM_LEVELS: [u32; 4] = [6, 9, 12, 14]; +struct NodeStore { + writer: Option<BufWriter<File>>, + mmap: Option<Mmap>, + path: PathBuf, + last_id: i64, +} + +impl NodeStore { + fn new<P: AsRef<Path>>(path: P) -> Result<Self> { + let path = path.as_ref().to_path_buf(); + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&path)?; + + let writer = BufWriter::with_capacity(10 * 1024 * 1024, file); // 10MB buffer + + Ok(Self { + writer: Some(writer), + mmap: None, + path, + last_id: -1, + }) + } + + fn insert(&mut self, id: i64, lat: f64, lon: f64) -> Result<()> { + if let Some(writer) = &mut self.writer { + if id > self.last_id + 1 { + let gap = id - self.last_id - 1; + writer.seek(SeekFrom::Current(gap * 8))?; + } else if id <= self.last_id { + writer.seek(SeekFrom::Start(id as u64 * 8))?; + } + + let lat_i32 = (lat * 1e7) as i32; + let lon_i32 = (lon * 1e7) as i32; + writer.write_all(&lat_i32.to_le_bytes())?; + writer.write_all(&lon_i32.to_le_bytes())?; + + self.last_id = id; + } + Ok(()) + } + + fn prepare_for_reading(&mut self) -> Result<()> { + self.writer = None; // Flush and close writer + + let file = File::open(&self.path)?; + let mmap = unsafe { Mmap::map(&file)? 
}; + self.mmap = Some(mmap); + Ok(()) + } + + fn get(&self, id: i64) -> Option<(f64, f64)> { + if let Some(mmap) = &self.mmap { + let offset = id as usize * 8; + if offset + 8 <= mmap.len() { + let chunk = &mmap[offset..offset+8]; + let lat_i32 = i32::from_le_bytes(chunk[0..4].try_into().unwrap()); + let lon_i32 = i32::from_le_bytes(chunk[4..8].try_into().unwrap()); + + if lat_i32 == 0 && lon_i32 == 0 { return None; } + + return Some((lat_i32 as f64 / 1e7, lon_i32 as f64 / 1e7)); + } + } + None + } +} + fn should_include(tags: &HashMap<String, String>, zoom: u32) -> bool { if zoom >= 14 { return true; } @@ -80,11 +155,11 @@ async fn main() -> Result<()> { let reader = ElementReader::from_path(path)?; // Cache for node coordinates: ID -> (lat, lon) - // Use sled for disk-based caching to avoid OOM, limit cache to 512MB - let node_cache = sled::Config::new() - .path("node_cache") - .cache_capacity(512 * 1024 * 1024) - .open()?; + // Use flat file with mmap + let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string()); + let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin"); + println!("Using node cache at {:?}", cache_path); + let mut node_store = NodeStore::new(cache_path.clone())?; // Channel for backpressure // Producer (reader) -> Consumer (writer) @@ -147,19 +222,17 @@ async fn main() -> Result<()> { let tx = tx_clone; let mut node_count = 0; let mut way_count = 0; + let mut ways_pending = false; // We process sequentially: Nodes first, then Ways. + // osmpbf yields nodes then ways. + // We need to detect when we switch from nodes to ways to prepare the store. 
+ reader.for_each(|element| { match element { Element::Node(node) => { node_count += 1; - - // Store in sled: key=id (8 bytes), value=lat+lon (16 bytes) - let id_bytes = node.id().to_be_bytes(); - let mut coords = [0u8; 16]; - coords[0..8].copy_from_slice(&node.lat().to_be_bytes()); - coords[8..16].copy_from_slice(&node.lon().to_be_bytes()); - let _ = node_cache.insert(id_bytes, &coords); + let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); @@ -178,13 +251,7 @@ async fn main() -> Result<()> { } Element::DenseNode(node) => { node_count += 1; - - // Store in sled - let id_bytes = node.id().to_be_bytes(); - let mut coords = [0u8; 16]; - coords[0..8].copy_from_slice(&node.lat().to_be_bytes()); - coords[8..16].copy_from_slice(&node.lon().to_be_bytes()); - let _ = node_cache.insert(id_bytes, &coords); + let _ = node_store.insert(node.id(), node.lat(), node.lon()); if node.tags().count() > 0 { let id = node.id(); @@ -202,6 +269,16 @@ async fn main() -> Result<()> { } } Element::Way(way) => { + if !ways_pending { + // First way encountered. Prepare store for reading. + println!("Switching to Way processing. 
Flushing node cache..."); + if let Err(e) = node_store.prepare_for_reading() { + eprintln!("Failed to prepare node store: {}", e); + return; + } + ways_pending = true; + } + way_count += 1; let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect(); @@ -219,12 +296,9 @@ if is_highway || is_building || is_water || is_landuse || is_railway { let mut points = Vec::new(); - // Resolve nodes from sled + // Resolve nodes from store for node_id in way.refs() { - let id_bytes = node_id.to_be_bytes(); - if let Ok(Some(coords_bytes)) = node_cache.get(id_bytes) { - let lat = f64::from_be_bytes(coords_bytes[0..8].try_into().unwrap()); - let lon = f64::from_be_bytes(coords_bytes[8..16].try_into().unwrap()); + if let Some((lat, lon)) = node_store.get(node_id) { points.push((lat, lon)); } } @@ -298,7 +372,7 @@ async fn main() -> Result<()> { consumer_handle.await?; // Clean up cache - let _ = std::fs::remove_dir_all("node_cache"); + let _ = std::fs::remove_file(cache_path); println!("Done!"); Ok(())