update
This commit is contained in:
@@ -9,7 +9,16 @@ async fn main() -> Result<()> {
|
||||
// Connect to ScyllaDB
|
||||
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
|
||||
println!("Connecting to ScyllaDB at {}...", uri);
|
||||
let session = SessionBuilder::new().known_node(uri).build().await?;
|
||||
|
||||
let session = loop {
|
||||
match SessionBuilder::new().known_node(&uri).build().await {
|
||||
Ok(session) => break session,
|
||||
Err(e) => {
|
||||
println!("Failed to connect to ScyllaDB: {}. Retrying in 5 seconds...", e);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
let session = std::sync::Arc::new(session);
|
||||
|
||||
// Ensure schema exists
|
||||
@@ -18,40 +27,52 @@ async fn main() -> Result<()> {
|
||||
// Create tables
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.nodes (zoom int, tile_x int, tile_y int, id bigint, lat double, lon double, tags map<text, text>, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.ways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.buildings (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.water (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||
|
||||
let path = "sample.osm.pbf";
|
||||
let path = std::env::var("OSM_PBF_PATH").unwrap_or_else(|_| "europe-latest.osm.pbf".to_string());
|
||||
println!("Reading {}...", path);
|
||||
let reader = ElementReader::from_path(path)?;
|
||||
|
||||
// Cache for node coordinates: ID -> (lat, lon)
|
||||
let mut node_cache = HashMap::<i64, (f64, f64)>::new();
|
||||
// Use sled for disk-based caching to avoid OOM, limit cache to 512MB
|
||||
let node_cache = sled::Config::new()
|
||||
.path("node_cache")
|
||||
.cache_capacity(512 * 1024 * 1024)
|
||||
.open()?;
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut node_count = 0;
|
||||
let mut way_count = 0;
|
||||
let mut inserted_nodes = 0;
|
||||
let mut inserted_ways = 0;
|
||||
let mut inserted_buildings = 0;
|
||||
let mut inserted_water = 0;
|
||||
let mut inserted_landuse = 0;
|
||||
// Channel for backpressure
|
||||
// Producer (reader) -> Consumer (writer)
|
||||
// Unit of work handed from the PBF reader (producer) to the database writer
// (consumer) over the mpsc channel. In both variants `x`/`y` are the tile
// indices the reader obtains from `lat_lon_to_tile(lat, lon, 10)`; the writer
// binds them to the `tile_x`/`tile_y` columns next to a literal zoom of 10.
enum DbTask {
|
||||
// A node that carries at least one tag; the writer inserts it into `map_data.nodes`.
Node { id: i64, lat: f64, lon: f64, tags: HashMap<String, String>, x: i32, y: i32 },
|
||||
// A way to insert into `map_data.<table>` (the reader passes "ways", "buildings",
// "water", "landuse" or "railways"). `points` is the way's resolved coordinates
// serialized as consecutive big-endian f64 (lat, lon) pairs, 16 bytes per point.
Way { table: &'static str, id: i64, tags: HashMap<String, String>, points: Vec<u8>, x: i32, y: i32 },
|
||||
}
|
||||
|
||||
// We process sequentially: Nodes first, then Ways.
|
||||
reader.for_each(|element| {
|
||||
match element {
|
||||
Element::Node(node) => {
|
||||
node_count += 1;
|
||||
node_cache.insert(node.id(), (node.lat(), node.lon()));
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<DbTask>(10_000);
|
||||
|
||||
if node.tags().count() > 0 {
|
||||
inserted_nodes += 1;
|
||||
let session = session.clone();
|
||||
let id = node.id();
|
||||
let lat = node.lat();
|
||||
let lon = node.lon();
|
||||
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
|
||||
let (x, y) = lat_lon_to_tile(lat, lon, 10);
|
||||
let session_clone = session.clone();
|
||||
let consumer_handle = tokio::spawn(async move {
|
||||
let mut join_set = JoinSet::new();
|
||||
let mut inserted_count = 0;
|
||||
let max_concurrent = std::env::var("CONCURRENT_INSERTS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(128); // Default to 128 concurrent inserts
|
||||
|
||||
println!("Starting consumer with max_concurrent={}", max_concurrent);
|
||||
|
||||
while let Some(task) = rx.recv().await {
|
||||
let session = session_clone.clone();
|
||||
|
||||
// Backpressure: limit concurrent inserts
|
||||
while join_set.len() >= max_concurrent {
|
||||
join_set.join_next().await;
|
||||
}
|
||||
|
||||
match task {
|
||||
DbTask::Node { id, lat, lon, tags, x, y } => {
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
@@ -59,135 +80,168 @@ async fn main() -> Result<()> {
|
||||
).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Element::DenseNode(node) => {
|
||||
node_count += 1;
|
||||
node_cache.insert(node.id(), (node.lat(), node.lon()));
|
||||
|
||||
if node.tags().count() > 0 {
|
||||
inserted_nodes += 1;
|
||||
let session = session.clone();
|
||||
let id = node.id();
|
||||
let lat = node.lat();
|
||||
let lon = node.lon();
|
||||
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
|
||||
let (x, y) = lat_lon_to_tile(lat, lon, 10);
|
||||
|
||||
DbTask::Way { table, id, tags, points, x, y } => {
|
||||
let query = format!("INSERT INTO map_data.{} (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)", table);
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(10, x, y, id, lat, lon, tags),
|
||||
query,
|
||||
(10, x, y, id, tags, points),
|
||||
).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Element::Way(way) => {
|
||||
way_count += 1;
|
||||
let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
|
||||
// Filter for highways/roads OR buildings OR landuse OR water
|
||||
let is_highway = tags.contains_key("highway");
|
||||
let is_building = tags.contains_key("building");
|
||||
let is_water = tags.get("natural").map(|v| v == "water").unwrap_or(false) ||
|
||||
tags.get("waterway").map(|v| v == "riverbank").unwrap_or(false) ||
|
||||
tags.get("landuse").map(|v| v == "basin").unwrap_or(false);
|
||||
let is_landuse = tags.get("leisure").map(|v| v == "park" || v == "garden").unwrap_or(false) ||
|
||||
tags.get("landuse").map(|v| v == "grass" || v == "forest" || v == "meadow").unwrap_or(false) ||
|
||||
tags.get("natural").map(|v| v == "wood" || v == "scrub").unwrap_or(false);
|
||||
|
||||
if is_highway || is_building || is_water || is_landuse {
|
||||
let mut points = Vec::new();
|
||||
|
||||
// Resolve nodes
|
||||
for node_id in way.refs() {
|
||||
if let Some(&coords) = node_cache.get(&node_id) {
|
||||
points.push(coords);
|
||||
}
|
||||
}
|
||||
|
||||
if points.len() >= 2 {
|
||||
let session = session.clone();
|
||||
let id = way.id();
|
||||
|
||||
// Insert into the tile of the first point
|
||||
let (first_lat, first_lon) = points[0];
|
||||
let (x, y) = lat_lon_to_tile(first_lat, first_lon, 10);
|
||||
|
||||
// Serialize points to blob (f64, f64) pairs
|
||||
let mut blob = Vec::with_capacity(points.len() * 16);
|
||||
for (lat, lon) in points {
|
||||
blob.extend_from_slice(&lat.to_be_bytes());
|
||||
blob.extend_from_slice(&lon.to_be_bytes());
|
||||
}
|
||||
|
||||
if is_highway {
|
||||
inserted_ways += 1;
|
||||
let tags_clone = tags.clone();
|
||||
let blob_clone = blob.clone();
|
||||
let session = session.clone();
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(10, x, y, id, tags_clone, blob_clone),
|
||||
).await;
|
||||
});
|
||||
}
|
||||
|
||||
if is_building {
|
||||
let tags_clone = tags.clone();
|
||||
let blob_clone = blob.clone();
|
||||
let session = session.clone();
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(10, x, y, id, tags_clone, blob_clone),
|
||||
).await;
|
||||
});
|
||||
inserted_buildings += 1;
|
||||
}
|
||||
|
||||
if is_water {
|
||||
let tags_clone = tags.clone();
|
||||
let blob_clone = blob.clone();
|
||||
let session = session.clone();
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.water (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(10, x, y, id, tags_clone, blob_clone),
|
||||
).await;
|
||||
});
|
||||
inserted_water += 1;
|
||||
}
|
||||
|
||||
if is_landuse {
|
||||
let tags_clone = tags.clone();
|
||||
let blob_clone = blob.clone();
|
||||
let session = session.clone();
|
||||
join_set.spawn(async move {
|
||||
let _ = session.query(
|
||||
"INSERT INTO map_data.landuse (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(10, x, y, id, tags_clone, blob_clone),
|
||||
).await;
|
||||
});
|
||||
inserted_landuse += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
inserted_count += 1;
|
||||
}
|
||||
|
||||
// Wait for remaining tasks
|
||||
while let Some(_) = join_set.join_next().await {}
|
||||
println!("Consumer finished. Total inserted tasks: {}", inserted_count);
|
||||
});
|
||||
|
||||
// Run the PBF reader in a blocking task to allow blocking_send
|
||||
let tx_clone = tx.clone();
|
||||
let reader_handle = tokio::task::spawn_blocking(move || -> Result<(usize, usize)> {
|
||||
let tx = tx_clone;
|
||||
let mut node_count = 0;
|
||||
let mut way_count = 0;
|
||||
|
||||
// We process sequentially: Nodes first, then Ways.
|
||||
reader.for_each(|element| {
|
||||
match element {
|
||||
Element::Node(node) => {
|
||||
node_count += 1;
|
||||
|
||||
// Store in sled: key=id (8 bytes), value=lat+lon (16 bytes)
|
||||
let id_bytes = node.id().to_be_bytes();
|
||||
let mut coords = [0u8; 16];
|
||||
coords[0..8].copy_from_slice(&node.lat().to_be_bytes());
|
||||
coords[8..16].copy_from_slice(&node.lon().to_be_bytes());
|
||||
let _ = node_cache.insert(id_bytes, &coords);
|
||||
|
||||
if node.tags().count() > 0 {
|
||||
let id = node.id();
|
||||
let lat = node.lat();
|
||||
let lon = node.lon();
|
||||
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
let (x, y) = lat_lon_to_tile(lat, lon, 10);
|
||||
|
||||
let task = DbTask::Node { id, lat, lon, tags, x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
}
|
||||
Element::DenseNode(node) => {
|
||||
node_count += 1;
|
||||
|
||||
// Store in sled
|
||||
let id_bytes = node.id().to_be_bytes();
|
||||
let mut coords = [0u8; 16];
|
||||
coords[0..8].copy_from_slice(&node.lat().to_be_bytes());
|
||||
coords[8..16].copy_from_slice(&node.lon().to_be_bytes());
|
||||
let _ = node_cache.insert(id_bytes, &coords);
|
||||
|
||||
if node.tags().count() > 0 {
|
||||
let id = node.id();
|
||||
let lat = node.lat();
|
||||
let lon = node.lon();
|
||||
let tags: HashMap<String, String> = node.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
let (x, y) = lat_lon_to_tile(lat, lon, 10);
|
||||
|
||||
let task = DbTask::Node { id, lat, lon, tags, x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
}
|
||||
Element::Way(way) => {
|
||||
way_count += 1;
|
||||
let tags: HashMap<String, String> = way.tags().map(|(k, v)| (k.to_string(), v.to_string())).collect();
|
||||
|
||||
// Filter for highways/roads OR buildings OR landuse OR water OR railways
|
||||
let is_highway = tags.contains_key("highway");
|
||||
let is_building = tags.contains_key("building");
|
||||
let is_water = tags.get("natural").map(|v| v == "water").unwrap_or(false) ||
|
||||
tags.get("waterway").map(|v| v == "riverbank").unwrap_or(false) ||
|
||||
tags.get("landuse").map(|v| v == "basin").unwrap_or(false);
|
||||
let is_landuse = tags.get("leisure").map(|v| v == "park" || v == "garden").unwrap_or(false) ||
|
||||
tags.get("landuse").map(|v| v == "grass" || v == "forest" || v == "meadow").unwrap_or(false) ||
|
||||
tags.get("natural").map(|v| v == "wood" || v == "scrub").unwrap_or(false);
|
||||
let is_railway = tags.contains_key("railway");
|
||||
|
||||
if is_highway || is_building || is_water || is_landuse || is_railway {
|
||||
let mut points = Vec::new();
|
||||
|
||||
// Resolve nodes from sled
|
||||
for node_id in way.refs() {
|
||||
let id_bytes = node_id.to_be_bytes();
|
||||
if let Ok(Some(coords_bytes)) = node_cache.get(id_bytes) {
|
||||
let lat = f64::from_be_bytes(coords_bytes[0..8].try_into().unwrap());
|
||||
let lon = f64::from_be_bytes(coords_bytes[8..16].try_into().unwrap());
|
||||
points.push((lat, lon));
|
||||
}
|
||||
}
|
||||
|
||||
if points.len() >= 2 {
|
||||
let id = way.id();
|
||||
|
||||
// Insert into the tile of the first point
|
||||
let (first_lat, first_lon) = points[0];
|
||||
let (x, y) = lat_lon_to_tile(first_lat, first_lon, 10);
|
||||
|
||||
// Serialize points to blob (f64, f64) pairs
|
||||
let mut blob = Vec::with_capacity(points.len() * 16);
|
||||
for (lat, lon) in points {
|
||||
blob.extend_from_slice(&lat.to_be_bytes());
|
||||
blob.extend_from_slice(&lon.to_be_bytes());
|
||||
}
|
||||
|
||||
if is_highway {
|
||||
let task = DbTask::Way { table: "ways", id, tags: tags.clone(), points: blob.clone(), x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
|
||||
if is_building {
|
||||
let task = DbTask::Way { table: "buildings", id, tags: tags.clone(), points: blob.clone(), x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
|
||||
if is_water {
|
||||
let task = DbTask::Way { table: "water", id, tags: tags.clone(), points: blob.clone(), x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
|
||||
if is_landuse {
|
||||
let task = DbTask::Way { table: "landuse", id, tags: tags.clone(), points: blob.clone(), x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
|
||||
if is_railway {
|
||||
let task = DbTask::Way { table: "railways", id, tags: tags.clone(), points: blob.clone(), x, y };
|
||||
let _ = tx.blocking_send(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if (node_count + way_count) % 100_000 == 0 {
|
||||
println!("Processed {} nodes, {} ways...", node_count, way_count);
|
||||
}
|
||||
})?;
|
||||
|
||||
if (node_count + way_count) % 100_000 == 0 {
|
||||
println!("Processed {} nodes, {} ways...", node_count, way_count);
|
||||
}
|
||||
})?;
|
||||
Ok((node_count, way_count))
|
||||
});
|
||||
|
||||
println!("Finished processing. Nodes: {}, Ways: {}. Inserted Nodes: {}, Inserted Ways: {}, Buildings: {}, Water: {}, Landuse: {}",
|
||||
node_count, way_count, inserted_nodes, inserted_ways, inserted_buildings, inserted_water, inserted_landuse);
|
||||
let (node_count, way_count) = reader_handle.await??;
|
||||
|
||||
println!("Finished reading PBF. Nodes: {}, Ways: {}. Waiting for consumer...", node_count, way_count);
|
||||
|
||||
println!("Waiting for pending inserts...");
|
||||
while let Some(_) = join_set.join_next().await {}
|
||||
// Drop sender to signal consumer to finish
|
||||
drop(tx);
|
||||
|
||||
// Wait for consumer
|
||||
consumer_handle.await?;
|
||||
|
||||
// Clean up cache
|
||||
let _ = std::fs::remove_dir_all("node_cache");
|
||||
println!("Done!");
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user