Rename codebook table and params to match lantern pq
var77 committed Feb 19, 2024
1 parent 4cc5977 commit 1155b57
Showing 8 changed files with 169 additions and 143 deletions.

ci/scripts/build.sh (2 changes: 1 addition & 1 deletion)

@@ -48,7 +48,7 @@ function setup_postgres() {
 }
 
 function setup_lantern() {
-  LANTERN_VERSION=v0.1.1
+  LANTERN_VERSION=main
   git clone --recursive https://github.com/lanterndata/lantern.git /tmp/lantern
   pushd /tmp/lantern
   git checkout ${LANTERN_VERSION} && \

lantern_pq/src/cli.rs (28 changes: 14 additions & 14 deletions)

@@ -40,15 +40,15 @@ pub struct PQArgs {
     #[arg(long, default_value_t = false)]
     pub skip_table_setup: bool,
 
-    /// If true vectors will not be compressed and exported to the table
+    /// If true vectors will not be quantized and exported to the table
     #[arg(long, default_value_t = false)]
-    pub skip_vector_compression: bool,
+    pub skip_vector_quantization: bool,
 
     /// If true codebook will not be created
     #[arg(long, default_value_t = false)]
     pub skip_codebook_creation: bool,
 
-    /// Primary key of the table, needed for compression job
+    /// Primary key of the table, needed for quantization job
     #[arg(long, default_value = "id")]
     pub pk: String,
 
@@ -60,9 +60,9 @@ pub struct PQArgs {
     #[arg(long)]
     pub parallel_task_count: Option<usize>,
 
-    /// Task id of currently running compression job (used in gcp batch jobs)
+    /// Task id of currently running quantization job (used in gcp batch jobs)
     #[arg(long)]
-    pub compression_task_id: Option<usize>,
+    pub quantization_task_id: Option<usize>,
 
     // GCP ARGS
     /// If true job will be submitted to gcp
@@ -85,16 +85,16 @@ pub struct PQArgs {
     #[arg(long)]
     pub gcp_image: Option<String>,
 
-    /// Task count for compression. default: calculated automatically based on dataset size
+    /// Task count for quantization. default: calculated automatically based on dataset size
     #[arg(long)]
-    pub gcp_compression_task_count: Option<usize>,
+    pub gcp_quantization_task_count: Option<usize>,
 
-    /// Parallel tasks for compression. default: calculated automatically based on
+    /// Parallel tasks for quantization. default: calculated automatically based on
     /// max connections
     #[arg(long)]
-    pub gcp_compression_task_parallelism: Option<usize>,
+    pub gcp_quantization_task_parallelism: Option<usize>,
 
-    /// Parallel tasks for compression. default: calculated automatically based on
+    /// Parallel tasks for quantization. default: calculated automatically based on
     /// max connections and dataset size
     #[arg(long)]
     pub gcp_clustering_task_parallelism: Option<usize>,
@@ -111,11 +111,11 @@ pub struct PQArgs {
     #[arg(long)]
     pub gcp_clustering_memory_gb: Option<usize>,
 
-    /// CPU count for one VM in compression task. default: calculated based on dataset size
+    /// CPU count for one VM in quantization task. default: calculated based on dataset size
     #[arg(long)]
-    pub gcp_compression_cpu: Option<usize>,
+    pub gcp_quantization_cpu: Option<usize>,
 
-    /// Memory GB for one VM in compression task. default: calculated based on CPU count
+    /// Memory GB for one VM in quantization task. default: calculated based on CPU count
     #[arg(long)]
-    pub gcp_compression_memory_gb: Option<usize>,
+    pub gcp_quantization_memory_gb: Option<usize>,
 }
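
Because clap derives kebab-case long flags from these field names, the renames above also change the CLI surface: --skip-vector-compression becomes --skip-vector-quantization, --compression-task-id becomes --quantization-task-id, and so on. A minimal sketch of the derived flags, assuming clap 4 with the derive feature (DemoArgs and the sample values are hypothetical, not part of this commit):

// Standalone sketch: clap turns snake_case field names into kebab-case
// long flags, so renaming the fields renames the user-facing options.
use clap::Parser;

#[derive(Parser, Debug)]
struct DemoArgs {
    /// If true vectors will not be quantized and exported to the table
    #[arg(long, default_value_t = false)]
    skip_vector_quantization: bool,

    /// Task id of currently running quantization job
    #[arg(long)]
    quantization_task_id: Option<usize>,
}

fn main() {
    // The old `--skip-vector-compression` spelling no longer parses;
    // the renamed fields are addressed like this:
    let args = DemoArgs::parse_from([
        "demo",
        "--skip-vector-quantization",
        "--quantization-task-id",
        "3",
    ]);
    assert!(args.skip_vector_quantization);
    assert_eq!(args.quantization_task_id, Some(3));
}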

lantern_pq/src/codebook.rs (2 changes: 1 addition & 1 deletion)

@@ -216,7 +216,7 @@ pub fn create_codebook<'a>(
         total_fetch_start_time.elapsed().as_secs()
     ));
 
-    // progress indicator is: 5% load, 70% codebook, 15% compression, 10% export
+    // progress indicator is: 5% load, 70% codebook, 15% quantization, 10% export
     report_progress(&progress_cb, &logger, &args.main_progress, 5);
 
     let mut codebooks_hashmap: HashMap<usize, Vec<Vec<f32>>> = HashMap::new();

lantern_pq/src/gcp_batch.rs (78 changes: 43 additions & 35 deletions)

@@ -22,7 +22,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{
           "entrypoint": "/bin/sh",
           "commands": [
             "-c",
-            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --parallel-task-count ${PARALLEL_TASK_COUNT} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-compression; exit $?"
+            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --parallel-task-count ${PARALLEL_TASK_COUNT} --subvector-id ${BATCH_TASK_INDEX} --skip-table-setup --skip-vector-quantization; exit $?"
           ]
         },
         "environment": {
@@ -32,7 +32,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{
             "COLUMN": "{column}",
             "CLUSTERS": "{cluster_count}",
             "SPLITS": "{splits}",
-            "PARALLEL_TASK_COUNT": "{gcp_compression_task_parallelism}"
+            "PARALLEL_TASK_COUNT": "{gcp_quantization_task_parallelism}"
           }
         }
       }
@@ -53,7 +53,7 @@ static CLUSTERING_TASK_TEMPLATE: &'static str = r#"{
   }
 }"#;
 
-static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{
+static QUANTIZATION_TASK_TEMPLATE: &'static str = r#"{
   "taskGroups": [{
     "taskSpec": {
       "runnables": [
@@ -64,7 +64,7 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{
           "entrypoint": "/bin/sh",
           "commands": [
             "-c",
-            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --skip-table-setup --skip-codebook-creation --total-task-count ${COMPRESSION_TASK_COUNT} --parallel-task-count ${PARALLEL_TASK_COUNT} --compression-task-id ${BATCH_TASK_INDEX}; exit $?"
+            "/lantern-cli pq-table --uri ${DB_URI} --table ${TABLE} --column ${COLUMN} --clusters ${CLUSTERS} --splits ${SPLITS} --skip-table-setup --skip-codebook-creation --total-task-count ${QUANTIZATION_TASK_COUNT} --parallel-task-count ${PARALLEL_TASK_COUNT} --quantization-task-id ${BATCH_TASK_INDEX}; exit $?"
           ]
         },
         "environment": {
@@ -74,8 +74,8 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{
             "COLUMN": "{column}",
             "CLUSTERS": "{cluster_count}",
             "SPLITS": "{splits}",
-            "COMPRESSION_TASK_COUNT": "{gcp_compression_task_count}",
-            "PARALLEL_TASK_COUNT": "{gcp_compression_task_parallelism}"
+            "QUANTIZATION_TASK_COUNT": "{gcp_quantization_task_count}",
+            "PARALLEL_TASK_COUNT": "{gcp_quantization_task_parallelism}"
           }
         }
       }
@@ -87,9 +87,9 @@ static COMPRESSION_TASK_TEMPLATE: &'static str = r#"{
       "maxRetryCount": 1,
       "maxRunDuration": "2000s"
     },
-    "taskCount": "{gcp_compression_task_count}",
+    "taskCount": "{gcp_quantization_task_count}",
     "taskCountPerNode": 1,
-    "parallelism": "{gcp_compression_task_parallelism}"
+    "parallelism": "{gcp_quantization_task_parallelism}"
   }],
   "logsPolicy": {
     "destination": "CLOUD_LOGGING"
@@ -191,7 +191,7 @@ pub fn quantize_table_on_gcp(
     main_progress: AtomicU8,
     db_uri: &str,
     full_table_name: &str,
-    codebook_table_name: &str,
+    full_codebook_table_name: &str,
     pq_column_name: &str,
     progress_cb: Option<ProgressCbFn>,
     logger: &Logger,
@@ -229,12 +229,12 @@ pub fn quantize_table_on_gcp(
 
     let total_row_count = total_row_count.try_get::<usize, i64>(0)? as usize;
 
-    let gcp_compression_cpu_count = args.gcp_compression_cpu.unwrap_or(4);
-    let gcp_compression_memory_gb = args
-        .gcp_compression_memory_gb
-        .unwrap_or((gcp_compression_cpu_count as f64 * 3.75) as usize);
+    let gcp_quantization_cpu_count = args.gcp_quantization_cpu.unwrap_or(4);
+    let gcp_quantization_memory_gb = args
+        .gcp_quantization_memory_gb
+        .unwrap_or((gcp_quantization_cpu_count as f64 * 3.75) as usize);
 
-    let gcp_clustering_cpu_count = args.gcp_compression_cpu.unwrap_or_else(|| {
+    let gcp_clustering_cpu_count = args.gcp_quantization_cpu.unwrap_or_else(|| {
         if total_row_count < 100_000 {
             8
         } else if total_row_count < 1_000_000 {
@@ -254,20 +254,24 @@ pub fn quantize_table_on_gcp(
         .unwrap_or((gcp_clustering_cpu_count as f64 * 3.75) as usize);
 
     // Let each vm process max 50k rows
-    let gcp_compression_task_count = args
-        .gcp_compression_task_count
+    let gcp_quantization_task_count = args
+        .gcp_quantization_task_count
         .unwrap_or(cmp::max(total_row_count / 50000, 1));
 
     // Limit parallel task count to not exceed max connection limit
-    let gcp_compression_task_parallelism = args
-        .gcp_compression_task_parallelism
-        .unwrap_or(cmp::max(1, max_connections / gcp_compression_task_count));
+    let gcp_quantization_task_parallelism = args
+        .gcp_quantization_task_parallelism
+        .unwrap_or(cmp::max(1, max_connections / gcp_quantization_task_count));
 
-    let gcp_compression_task_parallelism =
-        cmp::min(gcp_compression_task_parallelism, gcp_compression_task_count);
+    let gcp_quantization_task_parallelism = cmp::min(
+        gcp_quantization_task_parallelism,
+        gcp_quantization_task_count,
+    );
 
-    let gcp_compression_task_parallelism =
-        cmp::min(gcp_compression_task_parallelism, gcp_compression_task_count);
+    let gcp_quantization_task_parallelism = cmp::min(
+        gcp_quantization_task_parallelism,
+        gcp_quantization_task_count,
+    );
 
     // Limit parallel task count to not exceed max connection limit
     let gcp_clustering_task_parallelism = args.gcp_clustering_task_parallelism.unwrap_or(cmp::min(
@@ -280,15 +284,15 @@ pub fn quantize_table_on_gcp(
         crate::setup::setup_tables(
             &mut transaction,
             &full_table_name,
-            &codebook_table_name,
+            &full_codebook_table_name,
             &pq_column_name,
             &logger,
         )?;
 
         crate::setup::setup_triggers(
             &mut transaction,
             &full_table_name,
-            &codebook_table_name,
+            &full_codebook_table_name,
             &pq_column_name,
             &args.column,
             "l2sq",
@@ -298,6 +302,7 @@ pub fn quantize_table_on_gcp(
         // Creating new transaction, because current transaction will lock table reads
         // and block the process
         transaction.commit()?;
+        transaction = db_client.transaction()?;
         set_and_report_progress(&progress_cb, &logger, &main_progress, 5);
     }
 
@@ -336,12 +341,16 @@ pub fn quantize_table_on_gcp(
             "Clustering duration: {}s",
             task_start.elapsed().as_secs()
         ));
+        crate::setup::make_codebook_logged_and_readonly(
+            &mut transaction,
+            &full_codebook_table_name,
+        )?;
         set_and_report_progress(&progress_cb, &logger, &main_progress, 90);
     }
 
-    if !args.skip_vector_compression {
+    if !args.skip_vector_quantization {
         let task_start = Instant::now();
-        let mut body_json: Value = serde_json::from_str(COMPRESSION_TASK_TEMPLATE)?;
+        let mut body_json: Value = serde_json::from_str(QUANTIZATION_TASK_TEMPLATE)?;
         body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["container"]["imageUri"] =
             json!(gcp_image);
         body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["container"]
@@ -357,27 +366,26 @@ pub fn quantize_table_on_gcp(
         body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
             ["SPLITS"] = json!(args.splits.to_string());
         body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
-            ["COMPRESSION_TASK_COUNT"] = json!(gcp_compression_task_count.to_string());
+            ["QUANTIZATION_TASK_COUNT"] = json!(gcp_quantization_task_count.to_string());
         body_json["taskGroups"][0]["taskSpec"]["runnables"][0]["environment"]["variables"]
-            ["PARALLEL_TASK_COUNT"] = json!(gcp_compression_task_parallelism.to_string());
+            ["PARALLEL_TASK_COUNT"] = json!(gcp_quantization_task_parallelism.to_string());
         body_json["taskGroups"][0]["taskSpec"]["computeResource"]["cpuMilli"] =
-            json!(gcp_compression_cpu_count * 1000);
+            json!(gcp_quantization_cpu_count * 1000);
         body_json["taskGroups"][0]["taskSpec"]["computeResource"]["memoryMib"] =
-            json!(gcp_compression_memory_gb * 1000);
-        body_json["taskGroups"][0]["taskCount"] = json!(gcp_compression_task_count);
-        body_json["taskGroups"][0]["parallelism"] = json!(gcp_compression_task_parallelism);
+            json!(gcp_quantization_memory_gb * 1000);
+        body_json["taskGroups"][0]["taskCount"] = json!(gcp_quantization_task_count);
+        body_json["taskGroups"][0]["parallelism"] = json!(gcp_quantization_task_parallelism);
 
         run_batch_job(
             &logger,
             &body_json.to_string(),
             &format!("projects/{gcp_project_id}/locations/{gcp_region}"),
         )?;
         logger.debug(&format!(
-            "Compression duration: {}s",
+            "quantization duration: {}s",
             task_start.elapsed().as_secs()
        ));
    }
 
     set_and_report_progress(&progress_cb, &logger, &main_progress, 100);
     Ok(())
 }
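
The sizing logic renamed above follows a simple rule: one Batch task per 50k rows, with parallelism capped first by the Postgres connection budget and then by the task count itself. A self-contained sketch of that arithmetic, where the inputs (400,000 rows and 100 max connections) are hypothetical values, not from this commit:

// Standalone sketch of the task sizing math above, with made-up inputs.
use std::cmp;

fn main() {
    let total_row_count: usize = 400_000;
    let max_connections: usize = 100;

    // One task per 50k rows: 400_000 / 50_000 = 8.
    let quantization_task_count = cmp::max(total_row_count / 50_000, 1);

    // Cap by the connection budget: 100 / 8 = 12 (integer division),
    // then never run more tasks in parallel than exist: min(12, 8) = 8.
    let quantization_task_parallelism = cmp::min(
        cmp::max(1, max_connections / quantization_task_count),
        quantization_task_count,
    );

    assert_eq!(quantization_task_count, 8);
    assert_eq!(quantization_task_parallelism, 8);
    println!("tasks={quantization_task_count}, parallelism={quantization_task_parallelism}");
}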