performance
This commit is contained in:
@@ -150,6 +150,16 @@ async fn main() -> Result<()> {
|
|||||||
session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
session.query("CREATE TABLE IF NOT EXISTS map_data.landuse (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||||
session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
session.query("CREATE TABLE IF NOT EXISTS map_data.railways (zoom int, tile_x int, tile_y int, id bigint, tags map<text, text>, points blob, PRIMARY KEY ((zoom, tile_x, tile_y), id))", &[]).await?;
|
||||||
|
|
||||||
|
// Prepare statements
|
||||||
|
println!("Preparing statements...");
|
||||||
|
let insert_node = session.prepare("INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
let insert_ways = session.prepare("INSERT INTO map_data.ways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
let insert_buildings = session.prepare("INSERT INTO map_data.buildings (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
let insert_water = session.prepare("INSERT INTO map_data.water (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
let insert_landuse = session.prepare("INSERT INTO map_data.landuse (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
let insert_railways = session.prepare("INSERT INTO map_data.railways (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)").await?;
|
||||||
|
println!("Statements prepared.");
|
||||||
|
|
||||||
let path = std::env::var("OSM_PBF_PATH").unwrap_or_else(|_| "europe-latest.osm.pbf".to_string());
|
let path = std::env::var("OSM_PBF_PATH").unwrap_or_else(|_| "europe-latest.osm.pbf".to_string());
|
||||||
println!("Reading {}...", path);
|
println!("Reading {}...", path);
|
||||||
let reader = ElementReader::from_path(path)?;
|
let reader = ElementReader::from_path(path)?;
|
||||||
@@ -177,7 +187,7 @@ async fn main() -> Result<()> {
|
|||||||
let max_concurrent = std::env::var("CONCURRENT_INSERTS")
|
let max_concurrent = std::env::var("CONCURRENT_INSERTS")
|
||||||
.ok()
|
.ok()
|
||||||
.and_then(|s| s.parse().ok())
|
.and_then(|s| s.parse().ok())
|
||||||
.unwrap_or(128); // Default to 128 concurrent inserts
|
.unwrap_or(1024); // Default to 1024 concurrent inserts
|
||||||
|
|
||||||
println!("Starting consumer with max_concurrent={}", max_concurrent);
|
println!("Starting consumer with max_concurrent={}", max_concurrent);
|
||||||
|
|
||||||
@@ -191,18 +201,27 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
match task {
|
match task {
|
||||||
DbTask::Node { zoom, id, lat, lon, tags, x, y } => {
|
DbTask::Node { zoom, id, lat, lon, tags, x, y } => {
|
||||||
|
let statement = insert_node.clone();
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let _ = session.query(
|
let _ = session.execute(
|
||||||
"INSERT INTO map_data.nodes (zoom, tile_x, tile_y, id, lat, lon, tags) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
&statement,
|
||||||
(zoom, x, y, id, lat, lon, tags),
|
(zoom, x, y, id, lat, lon, tags),
|
||||||
).await;
|
).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
DbTask::Way { zoom, table, id, tags, points, x, y } => {
|
DbTask::Way { zoom, table, id, tags, points, x, y } => {
|
||||||
let query = format!("INSERT INTO map_data.{} (zoom, tile_x, tile_y, id, tags, points) VALUES (?, ?, ?, ?, ?, ?)", table);
|
let statement = match table {
|
||||||
|
"ways" => insert_ways.clone(),
|
||||||
|
"buildings" => insert_buildings.clone(),
|
||||||
|
"water" => insert_water.clone(),
|
||||||
|
"landuse" => insert_landuse.clone(),
|
||||||
|
"railways" => insert_railways.clone(),
|
||||||
|
_ => panic!("Unknown table: {}", table),
|
||||||
|
};
|
||||||
|
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let _ = session.query(
|
let _ = session.execute(
|
||||||
query,
|
&statement,
|
||||||
(zoom, x, y, id, tags, points),
|
(zoom, x, y, id, tags, points),
|
||||||
).await;
|
).await;
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user