2025-11-27 12:15:40 +01:00
parent 0d73796900
commit 1886c8c010
4 changed files with 116 additions and 29 deletions

Cargo.lock generated

@@ -1292,6 +1292,7 @@ name = "importer"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "memmap2 0.9.9",
  "osmpbf",
  "scylla",
  "tokio",
@@ -1502,6 +1503,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "memmap2"
+version = "0.9.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "metal"
 version = "0.27.0"
@@ -1790,7 +1800,7 @@ checksum = "5d689d6e9f254bbd63893ce00a27147e41fe94bf3abca70f85b5345afb3cb728"
 dependencies = [
  "byteorder",
  "flate2",
- "memmap2",
+ "memmap2 0.5.10",
  "protobuf",
  "protobuf-codegen",
  "rayon",

docker-compose.yml

@@ -30,9 +30,11 @@ services:
     container_name: map-importer
     volumes:
       - ./oberbayern-251125.osm.pbf:/app/data.osm.pbf
+      - importer_cache:/cache
     environment:
       - SCYLLA_URI=scylla:9042
       - OSM_PBF_PATH=/app/data.osm.pbf
+      - CACHE_DIR=/cache
     depends_on:
       - scylla
     profiles:
@@ -40,3 +42,4 @@ services:
 
 volumes:
   scylla_data:
+  importer_cache:

Cargo.toml

@@ -8,4 +8,4 @@ osmpbf = "0.3" # Pure Rust PBF parser, easier to build than osmium (C++ bindings
 scylla = "0.12"
 tokio = { version = "1.0", features = ["full"] }
 anyhow = "1.0"
-sled = "0.34"
+memmap2 = "0.9"

src/main.rs

@@ -1,12 +1,87 @@
 use anyhow::Result;
 use osmpbf::{Element, ElementReader};
-use scylla::{Session, SessionBuilder};
+use scylla::SessionBuilder;
 use std::collections::HashMap;
 use tokio::task::JoinSet;
+use std::fs::{File, OpenOptions};
+use std::io::{BufWriter, Write, Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use memmap2::Mmap;
 
 const ZOOM_LEVELS: [u32; 4] = [6, 9, 12, 14];
 
+struct NodeStore {
+    writer: Option<BufWriter<File>>,
+    mmap: Option<Mmap>,
+    path: PathBuf,
+    last_id: i64,
+}
+
+impl NodeStore {
+    fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let path = path.as_ref().to_path_buf();
+        let file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&path)?;
+        let writer = BufWriter::with_capacity(10 * 1024 * 1024, file); // 10MB buffer
+        Ok(Self {
+            writer: Some(writer),
+            mmap: None,
+            path,
+            last_id: -1,
+        })
+    }
+
+    fn insert(&mut self, id: i64, lat: f64, lon: f64) -> Result<()> {
+        if let Some(writer) = &mut self.writer {
+            if id > self.last_id + 1 {
+                let gap = id - self.last_id - 1;
+                writer.seek(SeekFrom::Current(gap * 8))?;
+            } else if id <= self.last_id {
+                writer.seek(SeekFrom::Start(id as u64 * 8))?;
+            }
+            let lat_i32 = (lat * 1e7) as i32;
+            let lon_i32 = (lon * 1e7) as i32;
+            writer.write_all(&lat_i32.to_le_bytes())?;
+            writer.write_all(&lon_i32.to_le_bytes())?;
+            self.last_id = id;
+        }
+        Ok(())
+    }
+
+    fn prepare_for_reading(&mut self) -> Result<()> {
+        self.writer = None; // Flush and close writer
+        let file = File::open(&self.path)?;
+        let mmap = unsafe { Mmap::map(&file)? };
+        self.mmap = Some(mmap);
+        Ok(())
+    }
+
+    fn get(&self, id: i64) -> Option<(f64, f64)> {
+        if let Some(mmap) = &self.mmap {
+            let offset = id as usize * 8;
+            if offset + 8 <= mmap.len() {
+                let chunk = &mmap[offset..offset + 8];
+                let lat_i32 = i32::from_le_bytes(chunk[0..4].try_into().unwrap());
+                let lon_i32 = i32::from_le_bytes(chunk[4..8].try_into().unwrap());
+                if lat_i32 == 0 && lon_i32 == 0 { return None; }
+                return Some((lat_i32 as f64 / 1e7, lon_i32 as f64 / 1e7));
+            }
+        }
+        None
+    }
+}
+
 fn should_include(tags: &HashMap<String, String>, zoom: u32) -> bool {
     if zoom >= 14 { return true; }
@@ -80,11 +155,11 @@ async fn main() -> Result<()> {
     let reader = ElementReader::from_path(path)?;
 
     // Cache for node coordinates: ID -> (lat, lon)
-    // Use sled for disk-based caching to avoid OOM, limit cache to 512MB
-    let node_cache = sled::Config::new()
-        .path("node_cache")
-        .cache_capacity(512 * 1024 * 1024)
-        .open()?;
+    // Use flat file with mmap
+    let cache_dir = std::env::var("CACHE_DIR").unwrap_or_else(|_| ".".to_string());
+    let cache_path = std::path::Path::new(&cache_dir).join("node_cache.bin");
+    println!("Using node cache at {:?}", cache_path);
+    let mut node_store = NodeStore::new(cache_path.clone())?;
 
     // Channel for backpressure
     // Producer (reader) -> Consumer (writer)
@@ -147,19 +222,17 @@ async fn main() -> Result<()> {
         let tx = tx_clone;
         let mut node_count = 0;
         let mut way_count = 0;
+        let mut ways_pending = false;
 
         // We process sequentially: Nodes first, then Ways.
+        // osmpbf yields nodes then ways.
+        // We need to detect when we switch from nodes to ways to prepare the store.
 
         reader.for_each(|element| {
             match element {
                 Element::Node(node) => {
                     node_count += 1;
-                    // Store in sled: key=id (8 bytes), value=lat+lon (16 bytes)
-                    let id_bytes = node.id().to_be_bytes();
-                    let mut coords = [0u8; 16];
-                    coords[0..8].copy_from_slice(&node.lat().to_be_bytes());
-                    coords[8..16].copy_from_slice(&node.lon().to_be_bytes());
-                    let _ = node_cache.insert(id_bytes, &coords);
+                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
 
                     if node.tags().count() > 0 {
                         let id = node.id();
@@ -178,13 +251,7 @@ async fn main() -> Result<()> {
                 }
                 Element::DenseNode(node) => {
                     node_count += 1;
-                    // Store in sled
-                    let id_bytes = node.id().to_be_bytes();
-                    let mut coords = [0u8; 16];
-                    coords[0..8].copy_from_slice(&node.lat().to_be_bytes());
-                    coords[8..16].copy_from_slice(&node.lon().to_be_bytes());
-                    let _ = node_cache.insert(id_bytes, &coords);
+                    let _ = node_store.insert(node.id(), node.lat(), node.lon());
 
                     if node.tags().count() > 0 {
                         let id = node.id();
@@ -202,6 +269,16 @@ async fn main() -> Result<()> {
                     }
                 }
                 Element::Way(way) => {
+                    if !ways_pending {
+                        // First way encountered. Prepare store for reading.
+                        println!("Switching to Way processing. Flushing node cache...");
+                        if let Err(e) = node_store.prepare_for_reading() {
+                            eprintln!("Failed to prepare node store: {}", e);
+                            return;
+                        }
+                        ways_pending = true;
+                    }
+
                     way_count += 1;
 
                     let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
@@ -219,12 +296,9 @@ async fn main() -> Result<()> {
                     if is_highway || is_building || is_water || is_landuse || is_railway {
                         let mut points = Vec::new();
 
-                        // Resolve nodes from sled
+                        // Resolve nodes from store
                         for node_id in way.refs() {
-                            let id_bytes = node_id.to_be_bytes();
-                            if let Ok(Some(coords_bytes)) = node_cache.get(id_bytes) {
-                                let lat = f64::from_be_bytes(coords_bytes[0..8].try_into().unwrap());
-                                let lon = f64::from_be_bytes(coords_bytes[8..16].try_into().unwrap());
+                            if let Some((lat, lon)) = node_store.get(node_id) {
                                 points.push((lat, lon));
                             }
                         }
@@ -298,7 +372,7 @@ async fn main() -> Result<()> {
     consumer_handle.await?;
 
     // Clean up cache
-    let _ = std::fs::remove_dir_all("node_cache");
+    let _ = std::fs::remove_file(cache_path);
 
     println!("Done!");
     Ok(())
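
For reference, a minimal standalone sketch (not part of this commit) of the record layout the new NodeStore relies on: each node ID owns a fixed 8-byte slot at byte offset id * 8, holding lat and lon as little-endian i32s scaled by 1e7, and all-zero slots read back as "missing". The file name and coordinates below are made-up example values.

    // layout_demo.rs -- hypothetical example, not in the repository
    use std::fs::File;
    use std::io::{Read, Seek, SeekFrom, Write};

    fn main() -> std::io::Result<()> {
        let path = "node_cache_demo.bin"; // hypothetical demo path
        let (id, lat, lon) = (42_i64, 48.1374_f64, 11.5755_f64); // example node

        // Write: each node ID gets a fixed 8-byte slot at offset id * 8,
        // storing lat/lon as little-endian i32s scaled by 1e7.
        let mut f = File::create(path)?;
        f.seek(SeekFrom::Start(id as u64 * 8))?; // skipped slots stay zero-filled
        f.write_all(&((lat * 1e7) as i32).to_le_bytes())?;
        f.write_all(&((lon * 1e7) as i32).to_le_bytes())?;
        drop(f);

        // Read: the same offset arithmetic recovers the coordinates.
        let mut f = File::open(path)?;
        f.seek(SeekFrom::Start(id as u64 * 8))?;
        let mut rec = [0u8; 8];
        f.read_exact(&mut rec)?;
        let lat2 = i32::from_le_bytes(rec[0..4].try_into().unwrap()) as f64 / 1e7;
        let lon2 = i32::from_le_bytes(rec[4..8].try_into().unwrap()) as f64 / 1e7;
        println!("node {}: ({}, {})", id, lat2, lon2);

        std::fs::remove_file(path)?;
        Ok(())
    }

The commit's NodeStore does the same thing, but buffers sequential writes through a BufWriter while nodes stream in and then memory-maps the finished file for random lookups during the way pass.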