Skip to content

Commit

Permalink
Implement BEFORE/AFTER/BETWEEN
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Oct 28, 2024
1 parent b87c630 commit 82d4546
Showing 1 changed file with 93 additions and 7 deletions.
100 changes: 93 additions & 7 deletions sable_history/src/pg_history_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ impl<'a> HistoryService for PgHistoryService<'a> {
historic_users::dsl::account_name,
))
.filter(messages::dsl::target_channel.eq(db_channel_id));
match request {
Ok(match request {
HistoryRequest::Latest { to_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
Ok(match to_ts {
match to_ts {
Some(to_ts) => {
let to_ts = DateTime::from_timestamp(to_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
Expand Down Expand Up @@ -145,13 +145,50 @@ impl<'a> HistoryService for PgHistoryService<'a> {
.expect("could not parse all records")
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>())
.collect::<Vec<_>>()
}
HistoryRequest::Before { from_ts, limit } => {
todo!("before")
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let from_ts = DateTime::from_timestamp(from_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
Box::new(
base_query
.filter(messages::dsl::timestamp.lt(from_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit)
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>()
}
HistoryRequest::After { start_ts, limit } => {
todo!("after")
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
Box::new(
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit)
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
}
HistoryRequest::Around { around_ts, limit } => {

Check warning on line 193 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `around_ts`

Check warning on line 193 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `limit`

Check warning on line 193 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `around_ts`

Check warning on line 193 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `limit`
todo!("around")
Expand All @@ -161,9 +198,58 @@ impl<'a> HistoryService for PgHistoryService<'a> {
end_ts,
limit,
} => {
todo!("between")
if start_ts <= end_ts {
let start_ts = DateTime::from_timestamp(start_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
Box::new(
base_query
.filter(messages::dsl::timestamp.gt(start_ts))
.filter(messages::dsl::timestamp.lt(end_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp, messages::dsl::id))
.limit(limit)
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
} else {
let start_ts = DateTime::from_timestamp(start_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
.naive_utc();
let end_ts = DateTime::from_timestamp(end_ts, 999_999)
.unwrap_or(DateTime::<Utc>::MIN_UTC)
.naive_utc();
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
Box::new(
base_query
.filter(messages::dsl::timestamp.gt(end_ts))
.filter(messages::dsl::timestamp.lt(start_ts))
// total order, consistent across requests
.order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
.limit(limit)
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>()
}
}
}
})
}
}

Expand Down

0 comments on commit 82d4546

Please sign in to comment.