Make scheduler not panic on dataset problems
tmcgroul committed Jun 14, 2023
1 parent 7576c2c commit 0ef5d58
Showing 6 changed files with 91 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions crates/router-controller/Cargo.toml
@@ -10,3 +10,4 @@ base64 = "0.21.0"
 parking_lot = "0.12.1"
 rand = "0.8.5"
 serde = { version = "1.0.147", features = ["derive"] }
+log = "0.4"
6 changes: 4 additions & 2 deletions crates/router-controller/src/controller.rs
@@ -254,9 +254,11 @@ impl Controller {
                 chunks.last().unwrap()
             };
             if next_block > c.first_block() {
-                panic!("Received overlapping chunks: {} and {}", p, c)
+                log::error!("Received overlapping chunks: {} and {}", p, c);
+                return false
             } else {
-                panic!("There is a gap between {} and {}", p, c)
+                log::error!("There is a gap between {} and {}", p, c);
+                return false
             }
         }
         next_block = c.last_block() + 1
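
A minimal, self-contained sketch of the pattern this hunk applies: log the inconsistency and report failure instead of panicking, so one bad dataset update cannot bring the scheduler down. Chunk and chunks_are_continuous are simplified stand-ins for the real DataChunk handling inside Controller, and the only dependency assumed is the log crate added above.

// Hypothetical, simplified stand-ins; not the actual Controller code.
#[derive(Debug)]
struct Chunk {
    first_block: u32,
    last_block: u32,
}

fn chunks_are_continuous(chunks: &[Chunk]) -> bool {
    let mut next_block = 0;
    for c in chunks {
        if c.first_block != next_block {
            if next_block > c.first_block {
                log::error!("Received overlapping chunks around block {}: {:?}", next_block, c);
            } else {
                log::error!("There is a gap before {:?} (expected block {})", c, next_block);
            }
            // Previously this kind of inconsistency would panic; now the
            // caller just sees `false` and can skip the bad update.
            return false;
        }
        next_block = c.last_block + 1;
    }
    true
}

fn main() {
    let chunks = vec![
        Chunk { first_block: 0, last_block: 999 },
        Chunk { first_block: 2000, last_block: 2999 }, // blocks 1000..=1999 are missing
    ];
    assert!(!chunks_are_continuous(&chunks));
}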
139 changes: 85 additions & 54 deletions crates/router/src/dataset.rs
@@ -1,65 +1,87 @@
 use std::str::FromStr;
 
-use crate::error::Error;
 use aws_sdk_s3::Client;
 use router_controller::data_chunk::DataChunk;
 use tokio::runtime::Handle;
 
 pub trait Storage {
     /// Get data chunks in the dataset.
-    fn get_chunks(&mut self, next_block: u32) -> Result<Vec<DataChunk>, Error>;
+    fn get_chunks(&mut self, next_block: u32) -> Result<Vec<DataChunk>, String>;
 }
 
-fn invalid_object_key(key: &str) -> Error {
-    Error::InvalidLayoutError(format!("invalid object key - {key}"))
+fn invalid_object_key(key: &str) -> String {
+    format!("invalid object key - {key}")
 }
 
-fn extract_last_block(last_key: &str) -> u32 {
-    let segments = last_key.split('/').collect::<Vec<&str>>();
-    let range = segments[1].split('-').collect::<Vec<&str>>();
-    range[1].parse().unwrap()
+fn trim_trailing_slash(value: &str) -> String {
+    let mut value = value.to_string();
+    value.pop();
+    value
 }
 
 pub struct S3Storage {
     client: Client,
     bucket: String,
-    last_key: Option<String>,
 }
 
 impl Storage for S3Storage {
-    fn get_chunks(&mut self, next_block: u32) -> Result<Vec<DataChunk>, Error> {
+    fn get_chunks(&mut self, next_block: u32) -> Result<Vec<DataChunk>, String> {
         let mut objects = vec![];
-        if let Some(last_key) = &self.last_key {
-            let last_block = extract_last_block(last_key);
-            assert_eq!(last_block + 1, next_block);
-        }
-
-        let handle = Handle::current();
-        let mut builder = self.client.list_objects_v2().bucket(&self.bucket);
-        if let Some(last_key) = &self.last_key {
-            builder = builder.start_after(last_key);
-        }
-        let output = handle
-            .block_on(builder.send())
-            .map_err(|err| Error::ReadDatasetError(Box::new(err)))?;
+        let prefix = None;
+        let tops = self.ls(prefix)?;
 
-        let mut continuation_token = output.next_continuation_token.clone();
-        if let Some(contents) = output.contents() {
-            objects.extend_from_slice(contents);
+        let mut top: Option<String> = None;
+        for item in tops.iter().rev() {
+            let block_num = item.parse::<u32>().map_err(|err| err.to_string())?;
+            if block_num <= next_block {
+                top = Some(item.clone());
+                break;
+            }
         }
-        while let Some(token) = continuation_token {
-            let future = self
-                .client
-                .list_objects_v2()
-                .bucket(&self.bucket)
-                .continuation_token(token)
-                .send();
-            let output = handle
-                .block_on(future)
-                .map_err(|err| Error::ReadDatasetError(Box::new(err)))?;
-            continuation_token = output.next_continuation_token.clone();
-            if let Some(contents) = output.contents() {
-                objects.extend_from_slice(contents);
+
+        if let Some(top) = top {
+            let prefix = format!("{}/", top);
+            let top_chunks = self.ls(Some(&prefix))?;
+
+            let mut next_chunk: Option<DataChunk> = None;
+            for chunk in top_chunks {
+                let chunk = DataChunk::from_str(&chunk)
+                    .map_err(|_| format!("invalid chunk name - {}", &chunk))?;
+                if chunk.first_block() == next_block {
+                    next_chunk = Some(chunk);
+                    break;
+                }
+            }
+
+            if let Some(chunk) = next_chunk {
+                let handle = Handle::current();
+                let start_after = chunk.to_string();
+                let builder = self
+                    .client
+                    .list_objects_v2()
+                    .bucket(&self.bucket)
+                    .start_after(start_after);
+                let output = handle
+                    .block_on(builder.send())
+                    .map_err(|err| err.to_string())?;
+                let mut continuation_token = output.next_continuation_token.clone();
+                if let Some(contents) = output.contents() {
+                    objects.extend_from_slice(contents);
+                }
+                while let Some(token) = continuation_token {
+                    let future = self
+                        .client
+                        .list_objects_v2()
+                        .bucket(&self.bucket)
+                        .continuation_token(token)
+                        .send();
+                    let output = handle.block_on(future).map_err(|err| err.to_string())?;
+                    continuation_token = output.next_continuation_token.clone();
+                    if let Some(contents) = output.contents() {
+                        objects.extend_from_slice(contents);
+                    }
+                }
             }
         }
 
@@ -75,28 +97,37 @@ impl Storage for S3Storage {
             }
         }
 
-        if let Some(last_chunk) = chunks.last() {
-            for object in objects.iter().rev() {
-                if let Some(key) = object.key() {
-                    let chunk = DataChunk::from_str(key).unwrap();
-                    if &chunk == last_chunk {
-                        self.last_key = Some(key.to_string());
-                        break;
-                    }
-                }
-            }
-        }
-
         Ok(chunks)
     }
 }
 
 impl S3Storage {
     pub fn new(client: Client, bucket: String) -> Self {
-        S3Storage {
-            client,
-            bucket,
-            last_key: None,
-        }
+        S3Storage { client, bucket }
     }
+
+    fn ls(&self, prefix: Option<&str>) -> Result<Vec<String>, String> {
+        let handle = Handle::current();
+        let mut builder = self
+            .client
+            .list_objects_v2()
+            .bucket(&self.bucket)
+            .delimiter('/');
+        if let Some(prefix) = prefix {
+            builder = builder.prefix(prefix)
+        }
+        let output = handle
+            .block_on(builder.send())
+            .map_err(|err| err.to_string())?;
+        let mut items = vec![];
+        if let Some(prefixes) = output.common_prefixes() {
+            for prefix in prefixes {
+                if let Some(value) = prefix.prefix() {
+                    let value = trim_trailing_slash(value);
+                    items.push(value);
+                }
+            }
+        }
+        Ok(items)
+    }
 }
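
The part of the new get_chunks worth calling out is how it finds a starting point now that the last_key field is gone: it lists the top-level prefixes of the bucket, takes the latest one whose numeric name is at or below next_block, and then looks under it for the chunk that starts exactly at next_block. Below is a small sketch of that selection step with plain strings standing in for the S3 listing; pick_top is an illustrative helper, and the zero-padded directory names are an assumption based on how the code parses them as u32.

// Pick the latest top-level directory whose starting block is <= next_block,
// mirroring the `tops.iter().rev()` scan in get_chunks above.
fn pick_top(tops: &[String], next_block: u32) -> Result<Option<String>, String> {
    for item in tops.iter().rev() {
        let block_num: u32 = item.parse().map_err(|err: std::num::ParseIntError| err.to_string())?;
        if block_num <= next_block {
            return Ok(Some(item.clone()));
        }
    }
    Ok(None)
}

fn main() {
    // Top-level "directories" as a delimiter-based ListObjectsV2 call might
    // return them, with the trailing slashes already trimmed.
    let tops: Vec<String> = vec![
        "0000000000".to_string(),
        "0000001000".to_string(),
        "0000002000".to_string(),
    ];
    let top = pick_top(&tops, 1500).expect("directory names should be numeric");
    assert_eq!(top.as_deref(), Some("0000001000"));
}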
5 changes: 0 additions & 5 deletions crates/router/src/error.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/router/src/main.rs
@@ -11,7 +11,6 @@ use url::Url;
 
 mod cli;
 mod dataset;
-mod error;
 mod logger;
 mod metrics;
 mod middleware;
