Skip to content

Commit

Permalink
usearch new interface (#70)
Browse the repository at this point in the history
* Changes to support new usearch interface

* Change usearch upstream

* Remove unnecessary var

* Print info about available models when model is not found

* Update versions to build latest HEAD

* Fix version name typo

* Lantern external index: use SHARE lock instead of ACCESS EXCLUSIVE

* Update versions

* Change usearch branch

---------

Co-authored-by: Narek Galstyan <[email protected]>
  • Loading branch information
var77 and Ngalstyan4 authored Feb 7, 2024
1 parent c8adb7f commit 326a244
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 30 deletions.
4 changes: 2 additions & 2 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ function setup_postgres() {
}

function setup_lantern() {
LANTERN_VERSION=0.0.11
LANTERN_VERSION=v0.1.0
git clone --recursive https://github.com/lanterndata/lantern.git /tmp/lantern
pushd /tmp/lantern
git checkout v${LANTERN_VERSION} && \
git checkout ${LANTERN_VERSION} && \
git submodule update --recursive && \
mkdir build
pushd build
Expand Down
6 changes: 5 additions & 1 deletion lantern_embeddings_core/src/ort_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,11 @@ impl<'a> OrtRuntime<'a> {
let model_info = map.get(model_name);

if model_info.is_none() {
anyhow::bail!("Model \"{}\" not found", model_name)
anyhow::bail!(
"Model \"{}\" not found.\nAvailable models: {}",
model_name,
map.keys().join(", ")
)
}

let model_info = model_info.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions lantern_external_index/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lantern_external_index"
version = "0.0.5"
version = "0.1.0"
edition = "2021"

[lib]
Expand All @@ -13,7 +13,7 @@ clap = { version = "4.4.0", features = ["derive"] }
cxx = "1.0.106"
postgres = "0.19.7"
postgres-types = { version = "0.2.6", features = ["derive"] }
usearch = { git = "https://github.com/Ngalstyan4/usearch.git", branch = "pg-rebase" }
usearch = { git = "https://github.com/Ngalstyan4/usearch.git", branch = "main-lantern" }
lantern_logger = { path = "../lantern_logger" }
lantern_utils = { path = "../lantern_utils" }
rand = "0.8.5"
2 changes: 1 addition & 1 deletion lantern_external_index/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl UMetricKind {
pub fn value(&self) -> MetricKind {
match self {
UMetricKind::L2sq => {
return MetricKind::L2Sq;
return MetricKind::L2sq;
}
UMetricKind::Cos => {
return MetricKind::Cos;
Expand Down
35 changes: 12 additions & 23 deletions lantern_external_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::{mpsc, RwLock};
use std::sync::{Arc, Mutex};
use std::{fs, io};
use usearch::ffi::{IndexOptions, ScalarKind};
use usearch::Index;

use cxx::UniquePtr;
use lantern_logger::{LogLevel, Logger};
use lantern_utils::{get_full_table_name, quote_ident};
use postgres::{Client, NoTls, Row};
use postgres_large_objects::LargeObject;
use postgres_types::FromSql;
use usearch::ffi::*;

mod postgres_large_objects;
pub mod utils;
Expand Down Expand Up @@ -63,33 +63,22 @@ impl<'a> FromSql<'a> for Tid {
}
}

fn index_chunk(
rows: Vec<Row>,
thread_n: usize,
index: Arc<ThreadSafeIndex>,
logger: Arc<Logger>,
) -> AnyhowVoidResult {
let row_count = rows.len();

fn index_chunk(rows: Vec<Row>, index: Arc<ThreadSafeIndex>) -> AnyhowVoidResult {
for row in rows {
let ctid: Tid = row.get(0);
let vec: Vec<f32> = row.get(1);
index.add_in_thread(ctid.label, &vec, thread_n)?;
index.add(ctid.label, &vec)?;
}
logger.debug(&format!(
"{} items added to index from thread {}",
row_count, thread_n
));
Ok(())
}

struct ThreadSafeIndex {
inner: UniquePtr<usearch::ffi::Index>,
inner: Index,
}

impl ThreadSafeIndex {
fn add_in_thread(&self, label: u64, data: &Vec<f32>, thread: usize) -> AnyhowVoidResult {
self.inner.add_in_thread(label, data, thread)?;
fn add(&self, label: u64, data: &[f32]) -> AnyhowVoidResult {
self.inner.add(label, data)?;
Ok(())
}
fn save(&self, path: &str) -> AnyhowVoidResult {
Expand Down Expand Up @@ -126,7 +115,7 @@ pub fn create_usearch_index(

transaction.execute("SET lock_timeout='5s'", &[])?;
transaction.execute(
&format!("LOCK TABLE ONLY {full_table_name} IN ACCESS EXCLUSIVE MODE"),
&format!("LOCK TABLE ONLY {full_table_name} IN SHARE MODE"),
&[],
)?;

Expand Down Expand Up @@ -157,11 +146,12 @@ pub fn create_usearch_index(
dimensions,
metric: args.metric_kind.value(),
quantization: ScalarKind::F32,
multi: false,
connectivity: args.m,
expansion_add: args.efc,
expansion_search: args.ef,
};
let index = new_index(&options)?;
let index = Index::new(&options)?;

let rows = transaction.query(
&format!(
Expand Down Expand Up @@ -206,10 +196,9 @@ pub fn create_usearch_index(
});

let processed_cnt = Arc::new(AtomicU64::new(0));
for n in 0..num_cores {
for _ in 0..num_cores {
// spawn thread
let index_ref = index_arc.clone();
let logger_ref = logger.clone();
let receiver = rx_arc.clone();
let is_canceled = is_canceled.clone();
let progress_tx = progress_tx.clone();
Expand Down Expand Up @@ -241,7 +230,7 @@ pub fn create_usearch_index(

let rows = rows.unwrap();
let rows_cnt = rows.len();
index_chunk(rows, n, index_ref.clone(), logger_ref.clone())?;
index_chunk(rows, index_ref.clone())?;
let all_count = processed_cnt.fetch_add(rows_cnt as u64, Ordering::SeqCst);
let mut progress = (all_count as f64 / count as f64 * 100.0) as u8;
if should_create_index {
Expand Down
2 changes: 1 addition & 1 deletion lantern_extras/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lantern_extras"
version = "0.0.12"
version = "0.1.0"
edition = "2021"

[lib]
Expand Down

0 comments on commit 326a244

Please sign in to comment.