diff --git a/importer/src/main.rs b/importer/src/main.rs index c0f7470..a82abf5 100644 --- a/importer/src/main.rs +++ b/importer/src/main.rs @@ -150,6 +150,16 @@ async fn main() -> Result<()> { session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?; + // Prepare statements + println!("Preparing statements..."); + let insert_node = session.prepare("INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)").await?; + let insert_ways = session.prepare("INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; + let insert_buildings = session.prepare("INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; + let insert_water = session.prepare("INSERT INTO map_data.water (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; + let insert_landuse = session.prepare("INSERT INTO map_data.landuse (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; + let insert_railways = session.prepare("INSERT INTO map_data.railways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?; + println!("Statements prepared."); + let path = std::env::var("OSM_PBF_PATH").unwrap_or_else(|_| "europe-latest.osm.pbf".to_string()); println!("Reading {}...", path); let reader = ElementReader::from_path(path)?; @@ -177,7 +187,7 @@ async fn main() -> Result<()> { let max_concurrent = std::env::var("CONCURRENT_INSERTS") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(128); // Default to 128 concurrent inserts + .unwrap_or(1024); // Default to 1024 concurrent inserts println!("Starting consumer with max_concurrent={}", max_concurrent); @@ -191,18 +201,27 @@ async fn main() -> Result<()> { match task { DbTask::Node { zoom, id, lat, lon, tags, x, y } => { + let statement = insert_node.clone(); join_set.spawn(async move { - let _ = session.query( - "INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)", + let _ = session.execute( + &statement, (zoom, x, y, id, lat, lon, tags), ).await; }); } DbTask::Way { zoom, table, id, tags, points, x, y } => { - let query = format!("INSERT INTO map_data.{} (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)", table); + let statement = match table { + "ways" => insert_ways.clone(), + "buildings" => insert_buildings.clone(), + "water" => insert_water.clone(), + "landuse" => insert_landuse.clone(), + "railways" => insert_railways.clone(), + _ => panic!("Unknown table: {}", table), + }; + join_set.spawn(async move { - let _ = session.query( - query, + let _ = session.execute( + &statement, (zoom, x, y, id, tags, points), ).await; });