Skip to content

Commit

Permalink
Merge common code for indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
var77 committed Mar 19, 2024
1 parent 405bd19 commit f463fdf
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 169 deletions.
94 changes: 16 additions & 78 deletions lantern_cli/src/external_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use postgres::{Client, NoTls};
use postgres_large_objects::LargeObject;
use postgres_types::FromSql;

use self::utils::check_available_memory_for_index;
use self::utils::{
check_available_memory_for_index, get_and_validate_dimensions, get_codebook,
get_count_estimation_query, get_portal_query,
};

pub mod cli;
mod postgres_large_objects;
Expand Down Expand Up @@ -232,87 +235,31 @@ pub fn create_usearch_index(
&[],
)?;

let rows = transaction.query(&format!("SELECT ARRAY_LENGTH({col}, 1) as dim FROM {full_table_name} WHERE {col} IS NOT NULL LIMIT 1",col=quote_ident(&args.column)), &[])?;

if rows.len() == 0 {
anyhow::bail!("Cannot create an external index on empty table");
}

let row = rows.first().unwrap();
let infered_dimensions = row.try_get::<usize, i32>(0)? as usize;

if args.dims != 0 && infered_dimensions != args.dims {
// I didn't complitely remove the dimensions from args
// To have extra validation when reindexing external index
// This is invariant and should never be a case
anyhow::bail!("Infered dimensions ({infered_dimensions}) does not match with the provided dimensions ({dims})", dims=args.dims);
}

let dimensions = infered_dimensions;
let dimensions =
get_and_validate_dimensions(&full_table_name, &args.column, args.dims, &mut transaction)?;

logger.info(&format!(
"Creating index with parameters dimensions={} m={} ef={} ef_construction={}",
dimensions, args.m, args.ef, args.efc
));

let mut pq_codebook: *const f32 = std::ptr::null();
let mut v: Vec<f32> = vec![];
let v: Vec<f32>;
let mut num_centroids: usize = 0;
let mut num_subvectors: usize = 0;

if args.pq {
let codebook_table_name = format!(
"pq_{table_name}_{column_name}",
table_name = &args.table,
column_name = &args.column
);
let full_codebook_table_name =
get_full_table_name("_lantern_internal", &codebook_table_name);

let rows_codebook_exists = transaction.query("SELECT true FROM information_schema.tables WHERE table_schema='_lantern_internal' AND table_name=$1;", &[&codebook_table_name])?;

if rows_codebook_exists.len() == 0 {
anyhow::bail!("Codebook table {full_codebook_table_name} does not exist");
}

let rows_c = transaction.query(
&format!("SELECT COUNT(*) FROM {full_codebook_table_name} WHERE subvector_id = 0;"),
&[],
)?;
let rows_sv = transaction.query(
&format!("SELECT COUNT(*) FROM {full_codebook_table_name} WHERE centroid_id = 0;"),
&[],
)?;

if rows_c.len() == 0 || rows_sv.len() == 0 {
anyhow::bail!("Invalid codebook table");
}

num_centroids = rows_c.first().unwrap().get::<usize, i64>(0) as usize;
num_subvectors = rows_sv.first().unwrap().get::<usize, i64>(0) as usize;

v.resize(num_centroids * dimensions, 0.);

let rows = transaction.query(
&format!("SELECT subvector_id, centroid_id, c FROM {full_codebook_table_name};",),
&[],
)?;
let (codebook_vector, count_c, count_sv) =
get_codebook(&args.table, &args.column, dimensions, &mut transaction)?;
v = codebook_vector;
num_centroids = count_c;
num_subvectors = count_sv;

logger.info(&format!(
"Codebook has {} rows - {num_centroids} centroids and {num_subvectors} subvectors",
rows.len()
v.len()
));

for r in rows {
let subvector_id: i32 = r.get(0);
let centroid_id: i32 = r.get(1);
let subvector: Vec<f32> = r.get(2);
for i in 0..subvector.len() {
v[centroid_id as usize * dimensions
+ subvector_id as usize * subvector.len()
+ i] = subvector[i];
}
}
pq_codebook = v.as_ptr();
}

Expand Down Expand Up @@ -344,12 +291,10 @@ pub fn create_usearch_index(
let start_time = Instant::now();

let rows = transaction.query(
&format!(
"SELECT COUNT(*) FROM {full_table_name} WHERE {} IS NOT NULL;",
quote_ident(&args.column)
),
&get_count_estimation_query(&full_table_name, &args.column),
&[],
)?;

logger.debug(&format!(
"Count estimation took {}s",
start_time.elapsed().as_secs()
Expand All @@ -369,14 +314,7 @@ pub fn create_usearch_index(

let is_canceled = is_canceled.unwrap_or(Arc::new(RwLock::new(false)));
// With portal we can execute a query and poll values from it in chunks
let portal = transaction.bind(
&format!(
"SELECT ctid, {col} FROM {table} WHERE {col} IS NOT NULL;",
col = quote_ident(&args.column),
table = get_full_table_name(&args.schema, &args.table)
),
&[],
)?;
let portal = transaction.bind(&get_portal_query(&full_table_name, &args.column), &[])?;

loop {
// poll 2000 rows from portal and send it to worker threads via channel
Expand Down
167 changes: 165 additions & 2 deletions lantern_cli/src/external_index/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::types::AnyhowVoidResult;
use crate::{
types::AnyhowVoidResult,
utils::{get_full_table_name, quote_ident},
};
use postgres::Transaction;
use sysinfo::{System, SystemExt};

pub fn bytes_to_integer_le<T>(bytes: &[u8]) -> T
Expand Down Expand Up @@ -90,7 +94,6 @@ pub fn check_available_memory_for_index(
(sys.total_memory() - sys.used_memory()) + (sys.total_swap() - sys.used_swap());
let total_free_mem = total_free_mem as f64;

println!("Free mem {total_free_mem}, Estimated memory {estimated_memory}");
if total_free_mem < estimated_memory {
let mem_needed = estimated_memory as usize / 1024 / 1024 / 1024;
let mem_avail = total_free_mem as usize / 1024 / 1024 / 1024;
Expand All @@ -99,3 +102,163 @@ pub fn check_available_memory_for_index(

Ok(())
}

pub trait UnifiedClient<'a> {
fn infer_column_dimensions(
&mut self,
full_table_name: &str,
column: &str,
) -> Result<Option<usize>, anyhow::Error>;
fn codebook_exists(&mut self, table_name: &str) -> Result<bool, anyhow::Error>;
fn get_centroid_count(&mut self, full_table_name: &str) -> Result<usize, anyhow::Error>;
fn get_subvector_count(&mut self, full_table_name: &str) -> Result<usize, anyhow::Error>;
fn get_codebook_data(
&mut self,
full_table_name: &str,
) -> Result<Vec<(i32, i32, Vec<f32>)>, anyhow::Error>;
}

pub fn get_infer_dims_query(full_table_name: &str, column: &str) -> String {
format!("SELECT ARRAY_LENGTH({col}, 1) as dim FROM {full_table_name} WHERE {col} IS NOT NULL LIMIT 1",col=quote_ident(column))
}

pub fn get_codebook_exists_query(table_name: &str) -> String {
format!("SELECT true FROM information_schema.tables WHERE table_schema='_lantern_internal' AND table_name='{table_name}'")
}

pub fn get_centroid_count_query(full_table_name: &str) -> String {
format!("SELECT COUNT(*) as cnt FROM {full_table_name} WHERE subvector_id = 0")
}

pub fn get_subvector_count_query(full_table_name: &str) -> String {
format!("SELECT COUNT(*) as cnt FROM {full_table_name} WHERE centroid_id = 0")
}

pub fn get_codebook_data_query(full_table_name: &str) -> String {
format!("SELECT subvector_id, centroid_id, c FROM {full_table_name}")
}

pub fn get_count_estimation_query(full_table_name: &str, column: &str) -> String {
format!(
"SELECT COUNT(*) as cnt FROM {full_table_name} WHERE {} IS NOT NULL",
quote_ident(column)
)
}

pub fn get_portal_query(full_table_name: &str, column: &str) -> String {
format!(
"SELECT ctid, {col} as v FROM {full_table_name} WHERE {col} IS NOT NULL",
col = quote_ident(column)
)
}

impl<'a> UnifiedClient<'a> for Transaction<'a> {
fn infer_column_dimensions(
&mut self,
full_table_name: &str,
column: &str,
) -> Result<Option<usize>, anyhow::Error> {
let rows = self.query(&get_infer_dims_query(full_table_name, column), &[])?;

if rows.len() == 0 {
return Ok(None);
}

Ok(Some(rows.first().unwrap().get::<usize, i32>(0) as usize))
}

fn codebook_exists(&mut self, table_name: &str) -> Result<bool, anyhow::Error> {
let rows_codebook_exists = self.query(&get_codebook_exists_query(table_name), &[])?;

Ok(rows_codebook_exists.len() > 0)
}

fn get_centroid_count(&mut self, full_table_name: &str) -> Result<usize, anyhow::Error> {
let rows = self.query(&get_centroid_count_query(full_table_name), &[])?;
Ok(rows.first().unwrap().get::<usize, i64>(0) as usize)
}

fn get_subvector_count(&mut self, full_table_name: &str) -> Result<usize, anyhow::Error> {
let rows = self.query(&get_subvector_count_query(full_table_name), &[])?;
Ok(rows.first().unwrap().get::<usize, i64>(0) as usize)
}

fn get_codebook_data(
&mut self,
full_table_name: &str,
) -> Result<Vec<(i32, i32, Vec<f32>)>, anyhow::Error> {
let rows = self.query(&get_codebook_data_query(full_table_name), &[])?;

Ok(rows
.iter()
.map(|r| (r.get(0), r.get(1), r.get(2)))
.collect())
}
}

pub fn get_and_validate_dimensions<'a>(
full_table_name: &str,
column: &str,
dimensions: usize,
client: &mut impl UnifiedClient<'a>,
) -> Result<usize, anyhow::Error> {
let infered_dimensions = client.infer_column_dimensions(full_table_name, column)?;

if infered_dimensions.is_none() {
anyhow::bail!("Cannot create an external index on empty table");
}

let infered_dimensions = infered_dimensions.unwrap();

if dimensions != 0 && infered_dimensions != dimensions {
// I didn't complitely remove the dimensions from args
// To have extra validation when reindexing external index
// This is invariant and should never be a case
anyhow::bail!("Infered dimensions ({infered_dimensions}) does not match with the provided dimensions ({dimensions})");
}

if infered_dimensions == 0 {
anyhow::bail!("Column does not have dimensions");
}

Ok(infered_dimensions)
}

pub fn get_codebook<'a>(
table: &str,
column: &str,
dimensions: usize,
client: &mut impl UnifiedClient<'a>,
) -> Result<(Vec<f32>, usize, usize), anyhow::Error> {
let mut v: Vec<f32> = vec![];
let codebook_table_name = format!("pq_{table}_{column}",);
let full_codebook_table_name = get_full_table_name("_lantern_internal", &codebook_table_name);

let codebook_exists = client.codebook_exists(&codebook_table_name)?;
if !codebook_exists {
anyhow::bail!("Codebook table {full_codebook_table_name} does not exist");
}

let num_centroids = client.get_centroid_count(&full_codebook_table_name)?;
let num_subvectors = client.get_subvector_count(&full_codebook_table_name)?;

if num_centroids == 0 || num_subvectors == 0 {
anyhow::bail!("Invalid codebook table");
}

v.resize(num_centroids * dimensions, 0.);

let rows = client.get_codebook_data(&full_codebook_table_name)?;

for r in rows {
let subvector_id: i32 = r.0;
let centroid_id: i32 = r.1;
let subvector: Vec<f32> = r.2;
for i in 0..subvector.len() {
v[centroid_id as usize * dimensions + subvector_id as usize * subvector.len() + i] =
subvector[i];
}
}

Ok((v, num_centroids, num_subvectors))
}
Loading

0 comments on commit f463fdf

Please sign in to comment.