From 1155b57991572e175090055107608476957bc527 Mon Sep 17 00:00:00 2001 From: Varik Matevosyan Date: Mon, 19 Feb 2024 15:14:09 +0400 Subject: [PATCH] Rename codebook table and params to match lantern pq --- ci/scripts/build.sh | 2 +- lantern_pq/src/cli.rs | 28 +++---- lantern_pq/src/codebook.rs | 2 +- lantern_pq/src/gcp_batch.rs | 78 ++++++++++--------- lantern_pq/src/lib.rs | 57 ++++++++------ .../src/{compression.rs => quantization.rs} | 56 ++++++------- lantern_pq/src/setup.rs | 27 ++++--- lantern_pq/tests/pq_test_with_db.rs | 62 +++++++-------- 8 files changed, 169 insertions(+), 143 deletions(-) rename lantern_pq/src/{compression.rs => quantization.rs} (86%) diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index e45bd5d..04970d7 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -48,7 +48,7 @@ function setup_postgres() { } function setup_lantern() { - LANTERN_VERSION=v0.1.1 + LANTERN_VERSION=main git clone --recursive https://github.com/lanterndata/lantern.git /tmp/lantern pushd /tmp/lantern git checkout ${LANTERN_VERSION} && \ diff --git a/lantern_pq/src/cli.rs b/lantern_pq/src/cli.rs index 6b411d2..5cd8282 100644 --- a/lantern_pq/src/cli.rs +++ b/lantern_pq/src/cli.rs @@ -40,15 +40,15 @@ pub struct PQArgs { #[arg(long, default_value_t = false)] pub skip_table_setup: bool, - /// If true vectors will not be compressed and exported to the table + /// If true vectors will not be quantized and exported to the table #[arg(long, default_value_t = false)] - pub skip_vector_compression: bool, + pub skip_vector_quantization: bool, /// If true codebook will not be created #[arg(long, default_value_t = false)] pub skip_codebook_creation: bool, - /// Primary key of the table, needed for compression job + /// Primary key of the table, needed for quantization job #[arg(long, default_value = "id")] pub pk: String, @@ -60,9 +60,9 @@ pub struct PQArgs { #[arg(long)] pub parallel_task_count: Option, - /// Task id of currently running compression job (used in gcp batch jobs) + /// Task id of currently running quantization job (used in gcp batch jobs) #[arg(long)] - pub compression_task_id: Option, + pub quantization_task_id: Option, // GCP ARGS /// If true job will be submitted to gcp @@ -85,16 +85,16 @@ pub struct PQArgs { #[arg(long)] pub gcp_image: Option, - /// Task count for compression. default: calculated automatically based on dataset size + /// Task count for quantization. default: calculated automatically based on dataset size #[arg(long)] - pub gcp_compression_task_count: Option, + pub gcp_quantization_task_count: Option, - /// Parallel tasks for compression. default: calculated automatically based on + /// Parallel tasks for quantization. default: calculated automatically based on /// max connections #[arg(long)] - pub gcp_compression_task_parallelism: Option, + pub gcp_quantization_task_parallelism: Option, - /// Parallel tasks for compression. default: calculated automatically based on + /// Parallel tasks for quantization. default: calculated automatically based on /// max connections and dataset size #[arg(long)] pub gcp_clustering_task_parallelism: Option, @@ -111,11 +111,11 @@ pub struct PQArgs { #[arg(long)] pub gcp_clustering_memory_gb: Option, - /// CPU count for one VM in compression task. default: calculated based on dataset size + /// CPU count for one VM in quantization task. default: calculated based on dataset size #[arg(long)] - pub gcp_compression_cpu: Option, + pub gcp_quantization_cpu: Option, - /// Memory GB for one VM in compression task. 
default: calculated based on CPU count + /// Memory GB for one VM in quantization task. default: calculated based on CPU count #[arg(long)] - pub gcp_compression_memory_gb: Option, + pub gcp_quantization_memory_gb: Option, } diff --git a/lantern_pq/src/codebook.rs b/lantern_pq/src/codebook.rs index 5455081..f6d26fe 100644 --- a/lantern_pq/src/codebook.rs +++ b/lantern_pq/src/codebook.rs @@ -216,7 +216,7 @@ pub fn create_codebook<'a> ( total_fetch_start_time.elapsed().as_secs() )); - // progress indicator is: 5% load, 70% codebook, 15% compression, 10% export + // progress indicator is: 5% load, 70% codebook, 15% quantization, 10% export report_progress(&progress_cb, &logger, &args.main_progress, 5); let mut codebooks_hashmap: HashMap>> = HashMap::new(); diff --git a/lantern_pq/src/gcp_batch.rs b/lantern_pq/src/gcp_batch.rs index 45ef958..a0dbc92 100644 --- a/lantern_pq/src/gcp_batch.rs +++ b/lantern_pq/src/gcp_batch.rs @@ -22,7 +22,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{ "entrypoint": "/bin/sh", "commands": [ "-c", - "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --parallel-task-count ${PARALLEL_TASK_COUNT} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-compression; exit $?" + "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --parallel-task-count ${PARALLEL_TASK_COUNT} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-quantization; exit $?" ] }, "environment": { @@ -32,7 +32,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{ "COLUMN": "{column}", "CLUSTERS": "{cluster_count}", "SPLITS": "{splits}", - "PARALLEL_TASK_COUNT": "{gcp_compression_task_parallelism}" + "PARALLEL_TASK_COUNT": "{gcp_quantization_task_parallelism}" } } } @@ -53,7 +53,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{ } }"#; -static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{ +static QUANTIZATION_TASK_TEMPLATE: &'static str = r#"{ "taskGroups": [{ "taskSpec": { "runnables": [ @@ -64,7 +64,7 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{ "entrypoint": "/bin/sh", "commands": [ "-c", - "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --skip-table-setup --skip-codebook-creation --total-task-count ${COMPRESSION_TASK_COUNT} --parallel-task-count ${PARALLEL_TASK_COUNT} --compression-task-id ${BATCH_TASK_INDEX}; exit $?" + "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --skip-table-setup --skip-codebook-creation --total-task-count ${QUANTIZATION_TASK_COUNT} --parallel-task-count ${PARALLEL_TASK_COUNT} --quantization-task-id ${BATCH_TASK_INDEX}; exit $?" 
] }, "environment": { @@ -74,8 +74,8 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{ "COLUMN": "{column}", "CLUSTERS": "{cluster_count}", "SPLITS": "{splits}", - "COMPRESSION_TASK_COUNT": "{gcp_compression_task_count}", - "PARALLEL_TASK_COUNT": "{gcp_compression_task_parallelism}" + "QUANTIZATION_TASK_COUNT": "{gcp_quantization_task_count}", + "PARALLEL_TASK_COUNT": "{gcp_quantization_task_parallelism}" } } } @@ -87,9 +87,9 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{ "maxRetryCount": 1, "maxRunDuration": "2000s" }, - "taskCount": "{gcp_compression_task_count}", + "taskCount": "{gcp_quantization_task_count}", "taskCountPerNode": 1, - "parallelism": "{gcp_compression_task_parallelism}" + "parallelism": "{gcp_quantization_task_parallelism}" }], "logsPolicy": { "destination": "CLOUD_LOGGING" @@ -191,7 +191,7 @@ pub fn quantize_table_on_gcp( main_progress: AtomicU8, db_uri: &str, full_table_name: &str, - codebook_table_name: &str, + full_codebook_table_name: &str, pq_column_name: &str, progress_cb: Option, logger: &Logger, @@ -229,12 +229,12 @@ pub fn quantize_table_on_gcp( let total_row_count = total_row_count.try_get::(0)? as usize; - let gcp_compression_cpu_count = args.gcp_compression_cpu.unwrap_or(4); - let gcp_compression_memory_gb = args - .gcp_compression_memory_gb - .unwrap_or((gcp_compression_cpu_count as f64 * 3.75) as usize); + let gcp_quantization_cpu_count = args.gcp_quantization_cpu.unwrap_or(4); + let gcp_quantization_memory_gb = args + .gcp_quantization_memory_gb + .unwrap_or((gcp_quantization_cpu_count as f64 * 3.75) as usize); - let gcp_clustering_cpu_count = args.gcp_compression_cpu.unwrap_or_else(|| { + let gcp_clustering_cpu_count = args.gcp_quantization_cpu.unwrap_or_else(|| { if total_row_count < 100_000 { 8 } else if total_row_count < 1_000_000 { @@ -254,20 +254,24 @@ pub fn quantize_table_on_gcp( .unwrap_or((gcp_clustering_cpu_count as f64 * 3.75) as usize); // Let each vm process max 50k rows - let gcp_compression_task_count = args - .gcp_compression_task_count + let gcp_quantization_task_count = args + .gcp_quantization_task_count .unwrap_or(cmp::max(total_row_count / 50000, 1)); // Limit parallel task count to not exceed max connection limit - let gcp_compression_task_parallelism = args - .gcp_compression_task_parallelism - .unwrap_or(cmp::max(1, max_connections / gcp_compression_task_count)); + let gcp_quantization_task_parallelism = args + .gcp_quantization_task_parallelism + .unwrap_or(cmp::max(1, max_connections / gcp_quantization_task_count)); - let gcp_compression_task_parallelism = - cmp::min(gcp_compression_task_parallelism, gcp_compression_task_count); + let gcp_quantization_task_parallelism = cmp::min( + gcp_quantization_task_parallelism, + gcp_quantization_task_count, + ); - let gcp_compression_task_parallelism = - cmp::min(gcp_compression_task_parallelism, gcp_compression_task_count); + let gcp_quantization_task_parallelism = cmp::min( + gcp_quantization_task_parallelism, + gcp_quantization_task_count, + ); // Limit parallel task count to not exceed max connection limit let gcp_clustering_task_parallelism = args.gcp_clustering_task_parallelism.unwrap_or(cmp::min( @@ -280,7 +284,7 @@ pub fn quantize_table_on_gcp( crate::setup::setup_tables( &mut transaction, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, &pq_column_name, &logger, )?; @@ -288,7 +292,7 @@ pub fn quantize_table_on_gcp( crate::setup::setup_triggers( &mut transaction, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, 
&pq_column_name, &args.column, "l2sq", @@ -298,6 +302,7 @@ pub fn quantize_table_on_gcp( // Creating new transaction, because current transaction will lock table reads // and block the process transaction.commit()?; + transaction = db_client.transaction()?; set_and_report_progress(&progress_cb, &logger, &main_progress, 5); } @@ -336,12 +341,16 @@ pub fn quantize_table_on_gcp( "Clustering duration: {}s", task_start.elapsed().as_secs() )); + crate::setup::make_codebook_logged_and_readonly( + &mut transaction, + &full_codebook_table_name, + )?; set_and_report_progress(&progress_cb, &logger, &main_progress, 90); } - if !args.skip_vector_compression { + if !args.skip_vector_quantization { let task_start = Instant::now(); - let mut body_json: Value = serde_json::from_str(COMPRESSION_TASK_TEMPLATE)?; + let mut body_json: Value = serde_json::from_str(QUANTIZATION_TASK_TEMPLATE)?; body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["container"]["imageUri"] = json!(gcp_image); body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["container"] @@ -357,15 +366,15 @@ pub fn quantize_table_on_gcp( body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"] ["SPLITS"] = json!(args.splits.to_string()); body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"] - ["COMPRESSION_TASK_COUNT"] = json!(gcp_compression_task_count.to_string()); + ["QUANTIZATION_TASK_COUNT"] = json!(gcp_quantization_task_count.to_string()); body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"] - ["PARALLEL_TASK_COUNT"] = json!(gcp_compression_task_parallelism.to_string()); + ["PARALLEL_TASK_COUNT"] = json!(gcp_quantization_task_parallelism.to_string()); body_json["taskGroups"][0]["taskSpec"]["computeResource"]["cpuMilli"] = - json!(gcp_compression_cpu_count * 1000); + json!(gcp_quantization_cpu_count * 1000); body_json["taskGroups"][0]["taskSpec"]["computeResource"]["memoryMib"] = - json!(gcp_compression_memory_gb * 1000); - body_json["taskGroups"][0]["taskCount"] = json!(gcp_compression_task_count); - body_json["taskGroups"][0]["parallelism"] = json!(gcp_compression_task_parallelism); + json!(gcp_quantization_memory_gb * 1000); + body_json["taskGroups"][0]["taskCount"] = json!(gcp_quantization_task_count); + body_json["taskGroups"][0]["parallelism"] = json!(gcp_quantization_task_parallelism); run_batch_job( &logger, &body_json.to_string(), &format!("projects/{gcp_project_id}/locations/{gcp_region}"), )?; logger.debug(&format!( - "Compression duration: {}s", + "Quantization duration: {}s", task_start.elapsed().as_secs() )); } - set_and_report_progress(&progress_cb, &logger, &main_progress, 100); Ok(()) } diff --git a/lantern_pq/src/lib.rs b/lantern_pq/src/lib.rs index 52a64cb..c762c89 100644 --- a/lantern_pq/src/lib.rs +++ b/lantern_pq/src/lib.rs @@ -1,7 +1,7 @@ use codebook::CreateCodebookArgs; -use compression::CompressAndWriteVectorArgs; use lantern_logger::{LogLevel, Logger}; use lantern_utils::{append_params_to_uri, get_full_table_name, quote_ident}; +use quantization::QuantizeAndWriteVectorArgs; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -10,8 +10,8 @@ use postgres::{Client, NoTls}; pub mod cli; mod codebook; -mod compression; mod gcp_batch; +mod quantization; mod setup; type AnyhowVoidResult = Result<(), anyhow::Error>; @@ -61,18 +61,18 @@ struct DatasetItem { // This code can be used in 2 modes // The first one is to quantize the whole table for all
subvectors // In this mode whole vectors will be fetched from the table and kmeans will be run for all -// subvectors then codebook will be created, vectors will be compressed and written to table +// subvectors then codebook will be created, vectors will be quantized and written to table // // The second mode is meant to horizontally scale this job, so only one subvector will be fetched // for the job and codebook will be created for that subvector -// Then separate job will be run to compress vectors and write to table +// Then separate job will be run to quantize vectors and write to table fn quantize_table_local( args: cli::PQArgs, main_progress: AtomicU8, db_uri: &str, full_table_name: &str, - codebook_table_name: &str, + full_codebook_table_name: &str, pq_column_name: &str, progress_cb: Option, is_canceled: Option>>, @@ -91,7 +91,7 @@ fn quantize_table_local( setup::setup_tables( &mut transaction, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, &pq_column_name, &logger, )?; @@ -99,7 +99,7 @@ fn quantize_table_local( setup::setup_triggers( &mut transaction, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, &pq_column_name, column, "l2sq", @@ -112,7 +112,7 @@ fn quantize_table_local( transaction = client.transaction()?; // Commit and return if the task is to only set up tables - if args.skip_codebook_creation && args.skip_vector_compression { + if args.skip_codebook_creation && args.skip_vector_quantization { set_and_report_progress(&progress_cb, &logger, &main_progress, 100); return Ok(()); } @@ -134,15 +134,15 @@ fn quantize_table_local( )?; let max_connections = max_connections.get::(0) as usize; - // Only compress will be passed if task is run from Batch job + // If --skip-codebook-creation is passed, that means we only need to quantize and write vectors // As there will be three phases - // 1. table setup, 2. codebook creation 3. table quantization 4.
trigger setup // 2 and 3 phases will be run in parallel - if args.skip_codebook_creation { + if args.skip_codebook_creation && !args.skip_vector_quantization { drop(transaction); - compression::compress_and_write_vectors( - CompressAndWriteVectorArgs { - codebook_table_name: &codebook_table_name, + quantization::quantize_and_write_vectors( + QuantizeAndWriteVectorArgs { + codebook_table_name: &full_codebook_table_name, full_table_name: &full_table_name, db_uri, schema, @@ -154,7 +154,7 @@ fn quantize_table_local( total_row_count, total_task_count: &args.total_task_count, parallel_task_count: &args.parallel_task_count, - compression_task_id: &args.compression_task_id, + quantization_task_id: &args.quantization_task_id, max_connections, main_progress: &main_progress, progress_cb: &progress_cb, @@ -191,7 +191,7 @@ fn quantize_table_local( pk: &args.pk, column, full_table_name: &full_table_name, - codebook_table_name: &codebook_table_name, + codebook_table_name: &full_codebook_table_name, total_row_count, max_connections, splits: args.splits, @@ -204,12 +204,20 @@ fn quantize_table_local( &mut transaction, )?; - // Compress vectors using codebook + if args.subvector_id.is_none() { + // We will only run this if clustering is run for the whole dataset + // As we cannot know if this is the last task or not + // So it is the responsibility of the workflow orchestrator + // To make the codebook table logged and readonly + setup::make_codebook_logged_and_readonly(&mut transaction, &full_codebook_table_name)?; + } + + // Quantize vectors using codebook // And write results to target table - if !args.skip_vector_compression { + if !args.skip_vector_quantization { let codebooks_hashmap = Arc::new(RwLock::new(codebooks_hashmap)); - let dataset = compression::compress_vectors( + let dataset = quantization::quantize_vectors( &dataset, vector_dim, subvector_dim, @@ -225,14 +233,14 @@ fn quantize_table_local( anyhow::bail!("Job canceled"); } - compression::write_compressed_rows( + quantization::write_quantized_rows( &mut transaction, &dataset, &args.schema, &args.table, &pq_column_name, &args.pk, - "compress", + "quantize", &main_progress, &progress_cb, &logger, @@ -261,7 +269,8 @@ pub fn quantize_table( let codebook_table_name = args .codebook_table_name .clone() - .unwrap_or(format!("_lantern_codebook_{}", args.table)); + .unwrap_or(format!("_codebook_{}_{}", args.table, args.column)); + let full_codebook_table_name = get_full_table_name("_lantern_internal", &codebook_table_name); let pq_column_name = format!("{}_pq", args.column); let db_uri = append_params_to_uri(&args.uri, CONNECTION_PARAMS); @@ -271,7 +280,7 @@ pub fn quantize_table( main_progress, &db_uri, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, &pq_column_name, progress_cb, &logger, @@ -282,7 +291,7 @@ pub fn quantize_table( main_progress, &db_uri, &full_table_name, - &codebook_table_name, + &full_codebook_table_name, &pq_column_name, progress_cb, is_canceled, diff --git a/lantern_pq/src/compression.rs b/lantern_pq/src/quantization.rs similarity index 86% rename from lantern_pq/src/compression.rs rename to lantern_pq/src/quantization.rs index aacaaa1..09e8bea 100644 --- a/lantern_pq/src/compression.rs +++ b/lantern_pq/src/quantization.rs @@ -39,8 +39,8 @@ fn get_closest_centroid(centroids: &Vec>, subvector: &[f32]) -> u8 { // Will parallel iterate over the dataset // Then iterate over each subvector of the vector and return // closest centroid id for that subvector -// Result will be vector with row id and compressed vector
-pub fn compress_vectors( +// Result will be vector with row id and quantized vector +pub fn quantize_vectors( dataset: &Vec, vector_dim: usize, subvector_dim: usize, @@ -48,7 +48,7 @@ pub fn compress_vectors( codebooks_hashmap: Arc>>>>, logger: &Logger, ) -> Result)>, anyhow::Error> { - let compression_start = Instant::now(); + let quantization_start = Instant::now(); let rows: Vec<_> = dataset .iter() .map(|r| r.clone()) @@ -71,17 +71,17 @@ pub fn compress_vectors( .collect(); logger.debug(&format!( - "Vector compression duration: {}s", - compression_start.elapsed().as_secs() + "Vector quantization duration: {}s", + quantization_start.elapsed().as_secs() )); Ok(rows) } -// This function will write compressed vector into temporary table +// This function will write quantized vector into temporary table // Using COPY protocol and then update the original table via pk mapping -// So we will use only one UPDATE query to write compressed vectors +// So we will use only one UPDATE query to write quantized vectors // This function can be run in parallel -pub fn write_compressed_rows<'a>( +pub fn write_quantized_rows<'a>( transaction: &mut Transaction<'a>, rows: &Vec<(String, Vec)>, schema: &str, @@ -101,7 +101,7 @@ pub fn write_compressed_rows<'a>( transaction .execute( &format!( - "CREATE TEMPORARY TABLE {temp_table_name} AS SELECT {pk} as id, '{{}}'::PQVEC AS {pq_column} FROM {full_table_name} LIMIT 0", + "CREATE TEMPORARY TABLE {temp_table_name} AS SELECT {pk} as id, '{{1}}'::PQVEC AS {pq_column} FROM {full_table_name} LIMIT 0", pq_column = quote_ident(pq_column), pk = quote_ident(pk) ), @@ -155,8 +155,8 @@ pub fn write_compressed_rows<'a>( // It is optimized for parallel runs // The data read/write will be done in parallel using rayon // It can operate over range of data from the whole table, -// so it can be split over multiple vm instances to speed up compression times -pub struct CompressAndWriteVectorArgs<'a> { +// so it can be split over multiple vm instances to speed up quantization times +pub struct QuantizeAndWriteVectorArgs<'a> { pub codebook_table_name: &'a str, pub full_table_name: &'a str, pub db_uri: &'a str, @@ -169,18 +169,19 @@ pub struct CompressAndWriteVectorArgs<'a> { pub total_row_count: usize, pub total_task_count: &'a Option, pub parallel_task_count: &'a Option, - pub compression_task_id: &'a Option, + pub quantization_task_id: &'a Option, pub max_connections: usize, pub main_progress: &'a AtomicU8, pub progress_cb: &'a Option, pub logger: &'a Logger, } -pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: Client) -> crate::AnyhowVoidResult { +pub fn quantize_and_write_vectors(args: QuantizeAndWriteVectorArgs, mut client: Client) -> crate::AnyhowVoidResult { let mut transaction = client.transaction()?; let logger = args.logger; let db_uri = args.db_uri; let full_table_name = args.full_table_name; + let full_codebook_table_name = args.codebook_table_name; let column = args.column; let splits = args.splits; let schema = args.schema; @@ -195,23 +196,22 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: // In batch mode each task will operate on a range of vectors from dataset // Here we will determine the range from the task id - if let Some(compression_task_id) = args.compression_task_id { + if let Some(quantization_task_id) = args.quantization_task_id { if args.total_task_count.is_none() { - anyhow::bail!("Please provide --total-task-count when providing --compression-task-id"); + anyhow::bail!("Please provide 
--total-task-count when providing --quantization-task-id"); } - let compression_task_count = args.total_task_count.as_ref().unwrap(); + let quantization_task_count = args.total_task_count.as_ref().unwrap(); - let chunk_per_task = limit_end / compression_task_count; - limit_start = chunk_per_task * compression_task_id; - limit_end = if *compression_task_id == compression_task_count - 1 { limit_end } else { limit_start + chunk_per_task }; + let chunk_per_task = limit_end / quantization_task_count; + limit_start = chunk_per_task * quantization_task_id; + limit_end = if *quantization_task_id == quantization_task_count - 1 { limit_end } else { limit_start + chunk_per_task }; } // Read all codebook and create a hashmap from it let codebook_read_start = Instant::now(); let codebook_rows = transaction.query( &format!( - "SELECT subvector_id, centroid_id, c FROM {codebook_table_name} ORDER BY centroid_id ASC;", - codebook_table_name = quote_ident(&args.codebook_table_name), + "SELECT subvector_id, centroid_id, c FROM {full_codebook_table_name} ORDER BY centroid_id ASC;" ), &[], )?; @@ -254,13 +254,13 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: // Here we will read the range of data for this chunk in parallel // Based on total task count and machine CPU count - // Then we will compress the range chunk and write to database + // Then we will quantize the range chunk and write to database let range_row_count = limit_end - limit_start; let num_cores: usize = std::thread::available_parallelism().unwrap().into(); - let num_connections: usize = if args.compression_task_id.is_some() { + let num_connections: usize = if args.quantization_task_id.is_some() { // This will never fail as it is checked on start to be specified if task id is present let parallel_task_count = args.parallel_task_count.as_ref().unwrap_or(args.total_task_count.as_ref().unwrap()); - // If there's compression task id we expect this to be batch job + // If there's quantization task id we expect this to be batch job // So each task will get (max_connections / parallel task count) connection pool // But it won't be higher than cpu count cmp::min(num_cores, (args.max_connections - 2) / parallel_task_count) @@ -277,7 +277,7 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: logger.debug(&format!("max_connections: {}, num_cores: {num_cores}, num_connections: {num_connections}, chunk_count: {chunk_count}", args.max_connections)); - let compression_and_write_start_time = Instant::now(); + let quantization_and_write_start_time = Instant::now(); let results = (0..num_connections) .into_par_iter() @@ -321,7 +321,7 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: }) .collect::>(); let vector_dim = rows[0].vec.len(); - let rows = compress_vectors( + let rows = quantize_vectors( &rows, vector_dim, subvector_dim, @@ -330,7 +330,7 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: &logger, )?; - write_compressed_rows( + write_quantized_rows( &mut transaction, &rows, schema, @@ -350,7 +350,7 @@ pub fn compress_and_write_vectors(args: CompressAndWriteVectorArgs, mut client: result?; } - logger.debug(&format!("Vectors compressed and exported in {}s", compression_and_write_start_time.elapsed().as_secs())); + logger.debug(&format!("Vectors quantized and exported in {}s", quantization_and_write_start_time.elapsed().as_secs())); transaction.commit()?; Ok(()) } diff --git a/lantern_pq/src/setup.rs b/lantern_pq/src/setup.rs 
index 5ebb259..19c1550 100644 --- a/lantern_pq/src/setup.rs +++ b/lantern_pq/src/setup.rs @@ -8,22 +8,21 @@ use crate::AnyhowVoidResult; pub fn setup_tables<'a>( transaction: &mut Transaction<'a>, full_table_name: &str, - codebook_table_name: &str, + full_codebook_table_name: &str, pq_column_name: &str, logger: &Logger, ) -> AnyhowVoidResult { transaction.batch_execute(&format!( " - CREATE TABLE {codebook_table_name} (subvector_id INT, centroid_id INT, c REAL[]); + CREATE UNLOGGED TABLE {full_codebook_table_name} (subvector_id INT, centroid_id INT, c REAL[]); ALTER TABLE {full_table_name} ADD COLUMN {pq_column_name} PQVEC; - CREATE INDEX ON {codebook_table_name} USING BTREE(subvector_id, centroid_id); - CREATE INDEX ON {codebook_table_name} USING BTREE(centroid_id); + CREATE INDEX ON {full_codebook_table_name} USING BTREE(subvector_id, centroid_id); + CREATE INDEX ON {full_codebook_table_name} USING BTREE(centroid_id); ", - codebook_table_name = quote_ident(&codebook_table_name), pq_column_name = quote_ident(&pq_column_name) ))?; logger.info(&format!( - "{codebook_table_name} table and {pq_column_name} column created successfully" + "{full_codebook_table_name} table and {pq_column_name} column created successfully" )); Ok(()) } @@ -32,7 +31,7 @@ pub fn setup_tables<'a>( pub fn setup_triggers<'a>( transaction: &mut Transaction<'a>, full_table_name: &str, - codebook_table_name: &str, + full_codebook_table_name: &str, pq_column: &str, column: &str, distance_metric: &str, @@ -56,7 +55,7 @@ pub fn setup_triggers<'a>( IF NEW.{column} IS NULL THEN NEW.{pq_column} := NULL; ELSE - NEW.{pq_column} := _lantern_internal.compress_vector(NEW.{column}, {splits}, {codebook_table_name}::regclass, '{distance_metric}'); + NEW.{pq_column} := _lantern_internal.quantize_vector(NEW.{column}, {splits}, '{full_codebook_table_name}'::regclass, '{distance_metric}'); END IF; RETURN NEW; END @@ -64,7 +63,17 @@ pub fn setup_triggers<'a>( CREATE TRIGGER {insert_trigger_name} BEFORE INSERT ON {full_table_name} FOR EACH ROW EXECUTE FUNCTION {trigger_fn_name}(); CREATE TRIGGER {update_trigger_name} BEFORE UPDATE OF {column} ON {full_table_name} FOR EACH ROW EXECUTE FUNCTION {trigger_fn_name}(); + ", pq_column=quote_ident(pq_column), column=quote_ident(column) ))?; + Ok(()) +} - ", pq_column=quote_ident(pq_column), column=quote_ident(column), codebook_table_name=quote_ident(codebook_table_name)))?; +pub fn make_codebook_logged_and_readonly<'a>( + transaction: &mut Transaction<'a>, + full_codebook_table_name: &str, +) -> AnyhowVoidResult { + transaction.batch_execute(&format!(" + ALTER TABLE {full_codebook_table_name} SET LOGGED; + CREATE TRIGGER readonly_guard BEFORE INSERT OR UPDATE OR DELETE ON {full_codebook_table_name} EXECUTE PROCEDURE _lantern_internal.forbid_table_change(); + "))?; Ok(()) } diff --git a/lantern_pq/tests/pq_test_with_db.rs b/lantern_pq/tests/pq_test_with_db.rs index 82065c5..534b83a 100644 --- a/lantern_pq/tests/pq_test_with_db.rs +++ b/lantern_pq/tests/pq_test_with_db.rs @@ -36,7 +36,7 @@ fn drop_db_tables(client: &mut Client, table_name: &str, codebook_table_name: &s fn test_full_pq() { let db_url = env::var("DB_URL").expect("`DB_URL` not specified"); let table_name = String::from("_lantern_pq_test"); - let codebook_table_name = String::from("_lantern_codebook_lantern_pq_test"); + let codebook_table_name = String::from("_lantern_internal._codebook__lantern_pq_test_v"); let mut db_client = Client::connect(&db_url, NoTls).expect("Database connection failed"); drop_db_tables(&mut db_client, 
&table_name, &codebook_table_name); setup_db_tables(&mut db_client, &table_name); @@ -54,30 +54,30 @@ fn test_full_pq() { column: "v".to_owned(), table: table_name.clone(), schema: "public".to_owned(), - codebook_table_name: Some(codebook_table_name.clone()), + codebook_table_name: None, clusters: 10, splits: 32, subvector_id: None, skip_table_setup: false, - skip_vector_compression: false, + skip_vector_quantization: false, skip_codebook_creation: false, pk: "id".to_owned(), total_task_count: None, parallel_task_count: None, - compression_task_id: None, + quantization_task_id: None, run_on_gcp: false, gcp_cli_image_tag: None, gcp_project: None, gcp_region: None, gcp_image: None, - gcp_compression_task_count: None, - gcp_compression_task_parallelism: None, + gcp_quantization_task_count: None, + gcp_quantization_task_parallelism: None, gcp_clustering_task_parallelism: None, gcp_enable_image_streaming: false, gcp_clustering_cpu: None, gcp_clustering_memory_gb: None, - gcp_compression_cpu: None, - gcp_compression_memory_gb: None, + gcp_quantization_cpu: None, + gcp_quantization_memory_gb: None, }, Some(Box::new(callback)), None, @@ -116,7 +116,7 @@ fn test_full_pq() { fn test_chunked_pq() { let db_url = env::var("DB_URL").expect("`DB_URL` not specified"); let table_name = String::from("_lantern_pq_test_2"); - let codebook_table_name = String::from("_lantern_codebook_lantern_pq_test_2"); + let codebook_table_name = String::from("_lantern_internal._codebook__lantern_pq_test_2_v"); let mut db_client = Client::connect(&db_url, NoTls).expect("Database connection failed"); drop_db_tables(&mut db_client, &table_name, &codebook_table_name); setup_db_tables(&mut db_client, &table_name); @@ -128,30 +128,30 @@ fn test_chunked_pq() { column: "v".to_owned(), table: table_name.clone(), schema: "public".to_owned(), - codebook_table_name: Some(codebook_table_name.clone()), + codebook_table_name: None, clusters: 10, splits: 32, subvector_id: None, skip_table_setup: false, - skip_vector_compression: true, + skip_vector_quantization: true, skip_codebook_creation: true, pk: "id".to_owned(), total_task_count: None, parallel_task_count: None, - compression_task_id: None, + quantization_task_id: None, run_on_gcp: false, gcp_cli_image_tag: None, gcp_project: None, gcp_region: None, gcp_image: None, - gcp_compression_task_count: None, - gcp_compression_task_parallelism: None, + gcp_quantization_task_count: None, + gcp_quantization_task_parallelism: None, gcp_clustering_task_parallelism: None, gcp_enable_image_streaming: false, gcp_clustering_cpu: None, gcp_clustering_memory_gb: None, - gcp_compression_cpu: None, - gcp_compression_memory_gb: None, + gcp_quantization_cpu: None, + gcp_quantization_memory_gb: None, }, None, None, @@ -191,30 +191,30 @@ fn test_chunked_pq() { column: "v".to_owned(), table: table_name.clone(), schema: "public".to_owned(), - codebook_table_name: Some(codebook_table_name.clone()), + codebook_table_name: None, clusters: 10, splits: 32, subvector_id: Some(i), skip_table_setup: true, - skip_vector_compression: true, + skip_vector_quantization: true, skip_codebook_creation: false, pk: "id".to_owned(), total_task_count: None, parallel_task_count: Some(1), - compression_task_id: None, + quantization_task_id: None, run_on_gcp: false, gcp_cli_image_tag: None, gcp_project: None, gcp_region: None, gcp_image: None, - gcp_compression_task_count: None, - gcp_compression_task_parallelism: None, + gcp_quantization_task_count: None, + gcp_quantization_task_parallelism: None, gcp_clustering_task_parallelism: 
None, gcp_enable_image_streaming: false, gcp_clustering_cpu: None, gcp_clustering_memory_gb: None, - gcp_compression_cpu: None, - gcp_compression_memory_gb: None, + gcp_quantization_cpu: None, + gcp_quantization_memory_gb: None, }, None, None, @@ -249,7 +249,7 @@ fn test_chunked_pq() { assert_eq!(cnt, 1000); // ================================================================================== - // ================= Run compression job ================ + // ================= Run quantization job ================ for i in 0..3 { lantern_pq::quantize_table( cli::PQArgs { @@ -257,30 +257,30 @@ fn test_chunked_pq() { column: "v".to_owned(), table: table_name.clone(), schema: "public".to_owned(), - codebook_table_name: Some(codebook_table_name.clone()), + codebook_table_name: None, clusters: 10, splits: 32, subvector_id: None, skip_table_setup: true, - skip_vector_compression: false, + skip_vector_quantization: false, skip_codebook_creation: true, pk: "id".to_owned(), total_task_count: Some(3), parallel_task_count: Some(1), - compression_task_id: Some(i), + quantization_task_id: Some(i), run_on_gcp: false, gcp_cli_image_tag: None, gcp_project: None, gcp_region: None, gcp_image: None, - gcp_compression_task_count: None, - gcp_compression_task_parallelism: None, + gcp_quantization_task_count: None, + gcp_quantization_task_parallelism: None, gcp_clustering_task_parallelism: None, gcp_enable_image_streaming: false, gcp_clustering_cpu: None, gcp_clustering_memory_gb: None, - gcp_compression_cpu: None, - gcp_compression_memory_gb: None, + gcp_quantization_cpu: None, + gcp_quantization_memory_gb: None, }, None, None,