Skip to content

Commit

Permalink
Fix naming issues, add --dataset-limit argument
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Feb 20, 2024
1 parent 1155b57 commit 50fdb62
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 19 deletions.
8 changes: 6 additions & 2 deletions lantern_pq/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ pub struct PQArgs {
#[arg(long)]
pub codebook_table_name: Option<String>,

/// Stream data to output table while still generating
/// Dataset limit. Limit should be greater than or equal to cluster count
#[arg(long)]
pub dataset_limit: Option<usize>,

/// Cluster count for kmeans
#[arg(long, default_value_t = 256)]
pub clusters: usize,

/// Stream data to output table while still generating
/// Subvector count to split vector
#[arg(long, default_value_t = 1)]
pub splits: usize,

Expand Down
4 changes: 2 additions & 2 deletions lantern_pq/src/codebook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ pub fn create_codebook<'a> (
let mut client = Client::connect(db_uri, NoTls)?;
let mut transaction = client.transaction()?;
let range_start = i * chunk_count;
let range_end = if i == num_cores - 1 { total_row_count + 1 } else { range_start + chunk_count + 1 };
let range_end = if i == num_cores - 1 { total_row_count } else { range_start + chunk_count };

let fetch_start_time = Instant::now();
let rows = transaction.query(
&format!(
"SELECT {pk}::text, {column}[{start_idx}:{end_idx}] FROM {full_table_name} WHERE {pk} > {range_start} AND {pk} < {range_end} ORDER BY id;",
"SELECT {pk}::text, {column}[{start_idx}:{end_idx}] FROM {full_table_name} WHERE {pk} >= {range_start} AND {pk} < {range_end} ORDER BY id;",
pk = quote_ident(&pk),
column = quote_ident(column),
start_idx = subvector_start_idx + 1,
Expand Down
20 changes: 19 additions & 1 deletion lantern_pq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type AnyhowVoidResult = Result<(), anyhow::Error>;
pub type ProgressCbFn = Box<dyn Fn(u8) + Send + Sync>;

/// Extra connection parameters that `quantize_table` appends to the
/// user-supplied database URI (via `append_params_to_uri`).
static CONNECTION_PARAMS: &'static str = "connect_timeout=10";
/// Name of the schema holding Lantern's internal objects. It qualifies the
/// codebook table name as well as the trigger function, `quantize_vector`
/// and `forbid_table_change` references emitted in the generated SQL.
pub static LANTERN_INTERNAL_SCHEMA_NAME: &'static str = "_lantern_internal";

// This function will increment current progress and report it
fn report_progress(
Expand Down Expand Up @@ -118,6 +119,16 @@ fn quantize_table_local(
}
}

let limit = if let Some(limit) = args.dataset_limit {
limit
} else {
0
};

if limit > 0 && limit < args.clusters {
anyhow::bail!("--dataset-limit should be greater than or equal to cluster count");
}

let total_row_count = transaction.query_one(
&format!(
"SELECT COUNT({pk}) FROM {full_table_name};",
Expand All @@ -128,6 +139,12 @@ fn quantize_table_local(

let total_row_count = total_row_count.try_get::<usize, i64>(0)? as usize;

let total_row_count = if limit > 0 && limit <= total_row_count {
limit
} else {
total_row_count
};

let max_connections = transaction.query_one(
"SELECT setting::int FROM pg_settings WHERE name = 'max_connections'",
&[],
Expand Down Expand Up @@ -270,7 +287,8 @@ pub fn quantize_table(
.codebook_table_name
.clone()
.unwrap_or(format!("_codebook_{}_{}", args.table, args.column));
let full_codebook_table_name = get_full_table_name("_lantern_internal", &codebook_table_name);
let full_codebook_table_name =
get_full_table_name(LANTERN_INTERNAL_SCHEMA_NAME, &codebook_table_name);
let pq_column_name = format!("{}_pq", args.column);
let db_uri = append_params_to_uri(&args.uri, CONNECTION_PARAMS);

Expand Down
8 changes: 4 additions & 4 deletions lantern_pq/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use lantern_logger::Logger;
use lantern_utils::quote_ident;
use postgres::Transaction;

use crate::AnyhowVoidResult;
use crate::{AnyhowVoidResult, LANTERN_INTERNAL_SCHEMA_NAME};

// Will create a codebook table, add necessary indexes and add a PQVEC column to the target table
pub fn setup_tables<'a>(
Expand Down Expand Up @@ -41,7 +41,7 @@ pub fn setup_triggers<'a>(
let name_hash = md5::compute(format!("{}{}", full_table_name, pq_column));
let insert_trigger_name = format!("_pq_trigger_in_{:x}", name_hash);
let update_trigger_name = format!("_pq_trigger_up_{:x}", name_hash);
let trigger_fn_name = format!("_set_pq_col_{:x}", name_hash);
let trigger_fn_name = format!("{LANTERN_INTERNAL_SCHEMA_NAME}._set_pq_col_{:x}", name_hash);

transaction.batch_execute(&format!("
DROP TRIGGER IF EXISTS {insert_trigger_name} ON {full_table_name};
Expand All @@ -55,7 +55,7 @@ pub fn setup_triggers<'a>(
IF NEW.{column} IS NULL THEN
NEW.{pq_column} := NULL;
ELSE
NEW.{pq_column} := _lantern_internal.quantize_vector(NEW.{column}, {splits}, '{full_codebook_table_name}'::regclass, '{distance_metric}');
NEW.{pq_column} := {LANTERN_INTERNAL_SCHEMA_NAME}.quantize_vector(NEW.{column}, {splits}, '{full_codebook_table_name}'::regclass, '{distance_metric}');
END IF;
RETURN NEW;
END
Expand All @@ -73,7 +73,7 @@ pub fn make_codebook_logged_and_readonly<'a>(
) -> AnyhowVoidResult {
transaction.batch_execute(&format!("
ALTER TABLE {full_codebook_table_name} SET LOGGED;
CREATE TRIGGER readonly_guard BEFORE INSERT OR UPDATE OR DELETE ON {full_codebook_table_name} EXECUTE PROCEDURE _lantern_internal.forbid_table_change();
CREATE TRIGGER readonly_guard BEFORE INSERT OR UPDATE OR DELETE ON {full_codebook_table_name} EXECUTE PROCEDURE {LANTERN_INTERNAL_SCHEMA_NAME}.forbid_table_change();
"))?;
Ok(())
}
Loading

0 comments on commit 50fdb62

Please sign in to comment.