first commit

This commit is contained in:
2025-11-25 16:58:24 +01:00
commit f5f5f10338
17 changed files with 5389 additions and 0 deletions

164
importer/src/main.rs Normal file
View File

@@ -0,0 +1,164 @@
use anyhow::Result;
use osmpbf::{Element, ElementReader};
use scylla::{Session, SessionBuilder};
use std::collections::HashMap;
use tokio::task::JoinSet;
#[tokio::main]
async fn main() -> Result<()> {
// Connect to ScyllaDB
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
println!("Connecting to ScyllaDB at {}...", uri);
let session = SessionBuilder::new().known_node(uri).build().await?;
let session = std::sync::Arc::new(session);
// Ensure schema exists
session.query("CREATE KEYSPACE IF NOT EXISTS map_data WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }", &[]).await?;
// Create tables
session.query("CREATE TABLE IF NOT EXISTS map_data.nodes (zoom int, tile_x int, tile_y int, id bigint, lat double, lon double, tags map<text, text>, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
session.query("CREATE TABLE IF NOT EXISTS map_data.ways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
let path = "sample.osm.pbf";
println!("Reading {}...", path);
let reader = ElementReader::from_path(path)?;
// Cache for node coordinates: ID -> (lat, lon)
let mut node_cache = HashMap::<i64, (f64, f64)>::new();
let mut join_set = JoinSet::new();
let mut node_count = 0;
let mut way_count = 0;
let mut inserted_nodes = 0;
let mut inserted_ways = 0;
// We process sequentially: Nodes first, then Ways.
reader.for_each(|element| {
match element {
Element::Node(node) => {
node_count += 1;
node_cache.insert(node.id(), (node.lat(), node.lon()));
if node.tags().count() > 0 {
inserted_nodes += 1;
let session = session.clone();
let id = node.id();
let lat = node.lat();
let lon = node.lon();
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
let (x, y) = lat_lon_to_tile(lat, lon, 10);
join_set.spawn(async move {
let _ = session.query(
"INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
(10, x, y, id, lat, lon, tags),
).await;
});
}
}
Element::DenseNode(node) => {
node_count += 1;
node_cache.insert(node.id(), (node.lat(), node.lon()));
if node.tags().count() > 0 {
inserted_nodes += 1;
let session = session.clone();
let id = node.id();
let lat = node.lat();
let lon = node.lon();
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
let (x, y) = lat_lon_to_tile(lat, lon, 10);
join_set.spawn(async move {
let _ = session.query(
"INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
(10, x, y, id, lat, lon, tags),
).await;
});
}
}
Element::Way(way) => {
way_count += 1;
let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
// Filter for highways/roads OR buildings
let is_highway = tags.contains_key("highway");
let is_building = tags.contains_key("building");
if is_highway || is_building {
let mut points = Vec::new();
// Resolve nodes
for node_id in way.refs() {
if let Some(&coords) = node_cache.get(&node_id) {
points.push(coords);
}
}
if points.len() >= 2 {
let session = session.clone();
let id = way.id();
// Insert into the tile of the first point
let (first_lat, first_lon) = points[0];
let (x, y) = lat_lon_to_tile(first_lat, first_lon, 10);
// Serialize points to blob (f64, f64) pairs
let mut blob = Vec::with_capacity(points.len() * 16);
for (lat, lon) in points {
blob.extend_from_slice(&lat.to_be_bytes());
blob.extend_from_slice(&lon.to_be_bytes());
}
if is_highway {
inserted_ways += 1;
let tags_clone = tags.clone();
let blob_clone = blob.clone();
let session = session.clone();
join_set.spawn(async move {
let _ = session.query(
"INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
(10, x, y, id, tags_clone, blob_clone),
).await;
});
}
if is_building {
// inserted_buildings += 1; // Need to add this counter
let session = session.clone();
join_set.spawn(async move {
let _ = session.query(
"INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
(10, x, y, id, tags, blob),
).await;
});
}
}
}
}
_ => {}
}
if (node_count + way_count) % 100_000 == 0 {
println!("Processed {} nodes, {} ways...", node_count, way_count);
}
})?;
println!("Finished processing. Nodes: {}, Ways: {}. Inserted Nodes: {}, Inserted Ways: {}", node_count, way_count, inserted_nodes, inserted_ways);
println!("Waiting for pending inserts...");
while let Some(_) = join_set.join_next().await {}
println!("Done!");
Ok(())
}
fn lat_lon_to_tile(lat: f64, lon: f64, zoom: u32) -> (i32, i32) {
let n = 2.0f64.powi(zoom as i32);
let x = (lon + 180.0) / 360.0 * n;
let lat_rad = lat.to_radians();
let y = (1.0 - (lat_rad.tan() + (1.0 / lat_rad.cos())).ln() / std::f64::consts::PI) / 2.0 * n;
(x as i32, y as i32)
}