diff --git a/README.md b/README.md
index 78cd070..613b86f 100644
--- a/README.md
+++ b/README.md
@@ -347,3 +347,56 @@ CREATE TABLE "public"."index_parameter_experiment_results" (
     build_time DOUBLE PRECISION NULL
 );
 ```
+
+## Lantern PQ
+
+### Description
+
+Use external product quantization to compress table vectors using k-means clustering.
+
+### Usage
+
+Run `lantern-cli pq-table --help` to list the available CLI options.
+
+The job can run either on a local machine or through GCP Batch jobs, parallelizing the workload over hundreds of VMs to speed up clustering.
+
+To run locally use:
+
+```bash
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32
+```
+
+The job will run on the current machine, utilizing all available cores.
+
+For big datasets (over 1M rows) it is more convenient to run the job using GCP Batch jobs.
+Make sure you have GCP credentials set up before running this command:
+
+```bash
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --run-on-gcp
+```
+
+If you prefer to orchestrate the tasks yourself on your own on-premise servers, follow these 3 steps; a shell-loop sketch for steps 2 and 3 follows the list:
+
+1. Run the setup job. This will create the necessary tables and add a `pqvec` column to the target table.
+
+```bash
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --skip-codebook-creation --skip-vector-compression
+```
+
+2. Run the clustering job. This will create a codebook for the table and export it to a Postgres table.
+
+```bash
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --skip-table-setup --skip-vector-compression --parallel-task-count 10 --subvector-id 0
+```
+
+Run this command 32 times, once for each subvector in the range [0-31]. `--parallel-task-count` caps how many of these tasks run in parallel (here, at most 10), so the Postgres max connection limit is not exceeded.
+
+3. Run the compression job. This will compress the vectors using the generated codebook and write the results to the `pqvec` column.
+
+```bash
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --skip-table-setup --skip-codebook-creation --parallel-task-count 10 --total-task-count 10 --compression-task-id 0
+```
+
+Run this command 10 times, once for each codebook part in the range [0-9]; again, `--parallel-task-count` limits how many tasks run concurrently against Postgres.
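+
+For example, steps 2 and 3 can be driven with a shell loop. The sketch below assumes GNU `xargs` is available; `-P 10` matches `--parallel-task-count 10`, and the second loop must only start after the first has finished, since compression needs the completed codebook:
+
+```bash
+# Step 2: 32 clustering tasks (one per subvector), at most 10 in flight
+seq 0 31 | xargs -P 10 -I{} lantern-cli pq-table \
+  --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v \
+  --clusters 256 --splits 32 --skip-table-setup --skip-vector-compression \
+  --parallel-task-count 10 --subvector-id {}
+
+# Step 3: 10 compression tasks (one per codebook part), at most 10 in flight
+seq 0 9 | xargs -P 10 -I{} lantern-cli pq-table \
+  --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v \
+  --clusters 256 --splits 32 --skip-table-setup --skip-codebook-creation \
+  --parallel-task-count 10 --total-task-count 10 --compression-task-id {}
+```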
+
+The table must have a primary key for this job to work.
+If the primary key is not named `id`, pass its name via the `--pk` argument.
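+
+Earlier versions exposed `--only-setup` and `--only-compress` flags; as of this change they are removed, and the equivalent behavior is expressed by combining the skip flags:
+
+```bash
+# formerly --only-setup: create the codebook table and pqvec column, then stop
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --skip-codebook-creation --skip-vector-compression
+
+# formerly --only-compress: assume the codebook already exists and only compress vectors
+lantern-cli pq-table --uri 'postgres://postgres@127.0.0.1:5432/postgres' --table sift10k --column v --clusters 256 --splits 32 --skip-table-setup --skip-codebook-creation
+```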
diff --git a/lantern_pq/src/cli.rs b/lantern_pq/src/cli.rs
index 90bfba6..6b411d2 100644
--- a/lantern_pq/src/cli.rs
+++ b/lantern_pq/src/cli.rs
@@ -48,14 +48,6 @@ pub struct PQArgs {
     #[arg(long, default_value_t = false)]
     pub skip_codebook_creation: bool,
 
-    /// If true only codebook table and pq column will be created
-    #[arg(long, default_value_t = false)]
-    pub only_setup: bool,
-
-    /// If true we will assume that codebook already exists and only will compress table vectors
-    #[arg(long, default_value_t = false)]
-    pub only_compress: bool,
-
     /// Primary key of the table, needed for compression job
     #[arg(long, default_value = "id")]
     pub pk: String,
diff --git a/lantern_pq/src/gcp_batch.rs b/lantern_pq/src/gcp_batch.rs
index 38ec6d1..45ef958 100644
--- a/lantern_pq/src/gcp_batch.rs
+++ b/lantern_pq/src/gcp_batch.rs
@@ -22,7 +22,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{
         "entrypoint": "/bin/sh",
         "commands": [
             "-c",
-            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-compression; exit $?"
+            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --parallel-task-count ${PARALLEL_TASK_COUNT} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-compression; exit $?"
         ]
     },
     "environment": {
@@ -318,6 +318,8 @@ pub fn quantize_table_on_gcp(
         ["CLUSTERS"] = json!(args.clusters.to_string());
     body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
         ["SPLITS"] = json!(args.splits.to_string());
+    body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
+        ["PARALLEL_TASK_COUNT"] = json!(gcp_clustering_task_parallelism.to_string());
     body_json["taskGroups"][0]["taskSpec"]["computeResource"]["cpuMilli"] =
         json!(gcp_clustering_cpu_count * 1000);
     body_json["taskGroups"][0]["taskSpec"]["computeResource"]["memoryMib"] =
@@ -356,6 +358,8 @@ pub fn quantize_table_on_gcp(
         ["SPLITS"] = json!(args.splits.to_string());
     body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
         ["COMPRESSION_TASK_COUNT"] = json!(gcp_compression_task_count.to_string());
+    body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
+        ["PARALLEL_TASK_COUNT"] = json!(gcp_compression_task_parallelism.to_string());
     body_json["taskGroups"][0]["taskSpec"]["computeResource"]["cpuMilli"] =
         json!(gcp_compression_cpu_count * 1000);
     body_json["taskGroups"][0]["taskSpec"]["computeResource"]["memoryMib"] =
diff --git a/lantern_pq/src/lib.rs b/lantern_pq/src/lib.rs
index 6a96d70..52a64cb 100644
--- a/lantern_pq/src/lib.rs
+++ b/lantern_pq/src/lib.rs
@@ -112,7 +112,7 @@ fn quantize_table_local(
     transaction = client.transaction()?;
 
     // Commit and return if the task is to only set up tables
-    if args.only_setup {
+    if args.skip_codebook_creation && args.skip_vector_compression {
         set_and_report_progress(&progress_cb, &logger, &main_progress, 100);
         return Ok(());
     }
@@ -138,7 +138,7 @@ fn quantize_table_local(
     // As there will be four phases
     // 1. table setup, 2. codebook creation, 3. table compression, 4. trigger setup
     // phases 2 and 3 will be run in parallel
-    if args.only_compress {
+    if args.skip_codebook_creation {
         drop(transaction);
         compression::compress_and_write_vectors(
             CompressAndWriteVectorArgs {
diff --git a/lantern_pq/tests/pq_test_with_db.rs b/lantern_pq/tests/pq_test_with_db.rs
index ab453cd..82065c5 100644
--- a/lantern_pq/tests/pq_test_with_db.rs
+++ b/lantern_pq/tests/pq_test_with_db.rs
@@ -61,8 +61,6 @@ fn test_full_pq() {
         skip_table_setup: false,
         skip_vector_compression: false,
         skip_codebook_creation: false,
-        only_setup: false,
-        only_compress: false,
         pk: "id".to_owned(),
         total_task_count: None,
         parallel_task_count: None,
@@ -137,8 +135,6 @@ fn test_chunked_pq() {
         skip_table_setup: false,
         skip_vector_compression: true,
         skip_codebook_creation: true,
-        only_setup: true,
-        only_compress: false,
         pk: "id".to_owned(),
         total_task_count: None,
         parallel_task_count: None,
@@ -202,8 +198,6 @@ fn test_chunked_pq() {
         skip_table_setup: true,
         skip_vector_compression: true,
         skip_codebook_creation: false,
-        only_setup: false,
-        only_compress: false,
         pk: "id".to_owned(),
         total_task_count: None,
         parallel_task_count: Some(1),
@@ -270,8 +264,6 @@ fn test_chunked_pq() {
         skip_table_setup: true,
         skip_vector_compression: false,
         skip_codebook_creation: true,
-        only_setup: false,
-        only_compress: true,
         pk: "id".to_owned(),
         total_task_count: Some(3),
         parallel_task_count: Some(1),
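Once the pipeline has run, a quick way to verify the result is to check that every row received a compressed vector. A minimal sketch, assuming the example table and connection settings used in the README section above:

```bash
psql 'postgres://postgres@127.0.0.1:5432/postgres' -c \
  "SELECT count(*) AS rows, count(pqvec) AS compressed FROM sift10k;"
```

The two counts should match once the compression job has finished.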