Skip to content

Commit

Permalink
response -1 for empty archives
Browse files Browse the repository at this point in the history
  • Loading branch information
tmcgroul committed Oct 23, 2023
1 parent 9b9c4f6 commit 73e9db8
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions crates/router-controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, SystemTime};
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
Expand All @@ -20,6 +20,10 @@ pub type Dataset = Url;
pub type WorkerState = HashMap<Dataset, RangeSet>;


const INITIAL_VALUE: i64 = -2;
const EMPTY_VALUE: i64 = -1;


#[derive(Clone)]
struct Worker {
id: WorkerId,
Expand Down Expand Up @@ -61,7 +65,7 @@ struct Schedule {
pub struct Controller {
schedule: parking_lot::Mutex<Schedule>,
workers: Atom<Vec<Worker>>,
datasets_height: HashMap<String, AtomicU32>,
datasets_height: HashMap<String, AtomicI64>,
managed_datasets: HashMap<String, Dataset>,
managed_workers: HashSet<WorkerId>,
data_replication: usize,
Expand Down Expand Up @@ -129,15 +133,18 @@ impl Controller {
}.map(|url| Self::format_worker_url(url, dataset))
}

pub fn get_height(&self, dataset_name: &str) -> Option<u32> {
pub fn get_height(&self, dataset_name: &str) -> Option<i32> {
let dataset = match self.managed_datasets.get(dataset_name) {
Some(ds) => ds,
None => return None
};

self.datasets_height
match self.datasets_height
.get(dataset)
.map(|height| height.load(Ordering::Relaxed))
.map(|height| height.load(Ordering::Relaxed)) {
Some(INITIAL_VALUE) | None => None,
Some(value) => value.try_into().ok(),
}
}

fn format_worker_url(base: &Url, dataset: &Dataset) -> String {
Expand Down Expand Up @@ -199,10 +206,15 @@ impl Controller {
if Self::import_new_chunks(chunks, |next_block| {
f(dataset, next_block)
}) {
let height = self.datasets_height.get(dataset).unwrap();
if let Some(chunk) = chunks.last() {
let height = self.datasets_height.get(dataset).unwrap();
let last_block = chunk.last_block();
height.store(last_block, Ordering::Relaxed);
height.store(last_block.into(), Ordering::Relaxed);
} else {
let value = height.load(Ordering::Relaxed);
if value == INITIAL_VALUE {
height.store(EMPTY_VALUE, Ordering::Relaxed);
}
}

let plan = self.schedule_dataset(
Expand Down Expand Up @@ -471,7 +483,7 @@ impl ControllerBuilder {
assignment: HashMap::new()
}),
datasets_height: self.managed_datasets.values()
.map(|name| (name.clone(), AtomicU32::new(0)))
.map(|name| (name.clone(), AtomicI64::new(INITIAL_VALUE)))
.collect(),
workers: Atom::new(Arc::new(Vec::new())),
managed_datasets: self.managed_datasets.clone(),
Expand Down

0 comments on commit 73e9db8

Please sign in to comment.