Skip to content

Commit

Permalink
Implement channel merging.
Browse files Browse the repository at this point in the history
This is a non-destructive fix for issue #4, but performance is terrible.

Sadly, using `channel = ANY(…)` brings Postgres to its knees because it
stops using its index. Maybe we could try unrolling it into an OR list
manually to see if that helps.
  • Loading branch information
zopieux committed Oct 5, 2021
1 parent 19fb25f commit c0f88c6
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 31 deletions.
34 changes: 22 additions & 12 deletions ircj-serve/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use itertools::Itertools;
use lazy_static::lazy_static;
use std::{collections::HashSet, str::FromStr};

use crate::{ChannelInfo, Day};
use crate::{ChannelInfo, ChannelRemap, Day};
use ircjournal::{
model::{Message, ServerChannel},
Database,
Expand All @@ -19,14 +20,16 @@ pub(crate) struct Paginated<U> {
pub(crate) page_count: i64,
}

pub(crate) async fn channels(db: &Database) -> Vec<ServerChannel> {
pub(crate) async fn channels(db: &Database, remap: &ChannelRemap) -> Vec<ServerChannel> {
// language=sql
sqlx::query!(r#"SELECT "channel" FROM all_channels()"#)
.fetch_all(db)
.await
.unwrap_or_default()
.iter()
.filter_map(|s| ServerChannel::from_str(s.channel.as_ref().unwrap()).ok())
.map(|sc| remap.canonical(&sc))
.unique()
.collect()
}

Expand All @@ -45,18 +48,19 @@ pub(crate) async fn channel_exists(db: &Database, sc: &ServerChannel) -> bool {
pub(crate) async fn channel_info(
db: &Database,
sc: &ServerChannel,
remap: &ChannelRemap,
before: &Day,
) -> Option<ChannelInfo> {
let channel = sc.to_string();
let channels = remap.aliases_str(sc);
// language=sql
sqlx::query!(r#"
WITH "ts" AS (SELECT min("timestamp") "first!", max("timestamp") "last!" FROM "message" WHERE "channel" = $1)
WITH "ts" AS (SELECT min("timestamp") "first!", max("timestamp") "last!" FROM "message" WHERE "channel" = ANY($1))
SELECT "first!", "last!", array(SELECT "nick" FROM all_nicks($1, $2)) "nicks!",
(SELECT row("message".*) FROM "message"
WHERE "channel" = $1 AND "opcode" = 'topic' AND coalesce("payload", '') != '' AND "timestamp" < $3
WHERE "channel" = ANY($1) AND "opcode" = 'topic' AND coalesce("payload", '') != '' AND "timestamp" < $3
ORDER BY "timestamp" DESC LIMIT 1) "topic?:Message"
FROM "ts" GROUP BY 1, 2, 3 LIMIT 1
"#, &channel, HARD_NICK_LIMIT as i64, before.succ().midnight())
"#, &channels, HARD_NICK_LIMIT as i64, before.succ().midnight())
.fetch_optional(db)
.await
.unwrap()
Expand All @@ -72,18 +76,20 @@ pub(crate) async fn channel_info(
pub(crate) async fn messages_channel_day(
db: &Database,
sc: &ServerChannel,
remap: &ChannelRemap,
day: &Day,
) -> Vec<Message> {
let channels = remap.aliases_str(sc);
// language=sql
sqlx::query_as!(
Message,
r#"
SELECT * FROM "message"
WHERE "channel" = $1 AND "timestamp" >= $2 AND "timestamp" < $3
WHERE "channel" = ANY($1) AND "timestamp" >= $2 AND "timestamp" < $3
ORDER BY "timestamp"
LIMIT $4
"#,
sc.to_string(),
&channels,
day.midnight(),
day.succ().midnight(),
HARD_MESSAGE_LIMIT as i64
Expand All @@ -96,20 +102,22 @@ pub(crate) async fn messages_channel_day(
pub(crate) async fn channel_month_index(
db: &Database,
sc: &ServerChannel,
remap: &ChannelRemap,
year: i32,
month: u32,
) -> HashSet<u32> {
let channels = remap.aliases_str(sc);
let from: Day = chrono::NaiveDate::from_ymd(year, month, 1).into();
let to: Day = chrono::NaiveDate::from_ymd(year + month as i32 / 12, 1 + month % 12, 1).into();
// language=sql
sqlx::query!(
r#"
SELECT DISTINCT EXTRACT(DAY FROM "timestamp") "day!"
FROM "message"
WHERE "channel" = $1 AND ("opcode" IS NULL OR "opcode" = 'me')
WHERE "channel" = ANY($1) AND ("opcode" IS NULL OR "opcode" = 'me')
AND "timestamp" >= $2 AND "timestamp" < $3
"#,
sc.to_string(),
&channels,
from.midnight(),
to.midnight()
)
Expand All @@ -124,10 +132,12 @@ pub(crate) async fn channel_month_index(
pub(crate) async fn channel_search(
db: &Database,
sc: &ServerChannel,
remap: &ChannelRemap,
query: &str,
page: i64,
) -> Paginated<Message> {
// Try to find nick:<something> to build a non-empty nick filter.
let channels = remap.aliases_str(sc);
lazy_static! {
static ref NICK: regex::Regex =
regex::Regex::new(r#"\b(nick:[A-Za-z_0-9|.`\*-]+)"#).unwrap();
Expand Down Expand Up @@ -162,14 +172,14 @@ pub(crate) async fn channel_search(
SELECT row("message".*) "message!:Message",
ts_headline('english', "line", plainto_tsquery('english', $2), U&'StartSel=\E000, StopSel=\E001') "headline!"
FROM "message"
WHERE "channel" || '' = $1
WHERE "channel" || '' = ANY($1)
AND coalesce("opcode", '') = ''
AND CASE WHEN $2 = '' THEN TRUE ELSE to_tsvector('english', "nick" || ' ' || "line") @@ plainto_tsquery('english', $2) END
AND CASE WHEN $5 = '' THEN TRUE ELSE "nick" LIKE $5 END
)
SELECT *, COUNT(*) OVER () "total!"
FROM "query" t LIMIT $3 OFFSET $4
"#, sc.to_string(), &query, per_page, offset, nick_filter)
"#, &channels, &query, per_page, offset, nick_filter)
.fetch_all(db)
.await
.unwrap();
Expand Down
41 changes: 40 additions & 1 deletion ircj-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ extern crate rocket;

use chrono::{Datelike, NaiveDate};
use ircjournal::model::{Datetime, Message, ServerChannel};
use std::{collections::HashSet, str::FromStr};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};

mod db;
pub mod route;
Expand All @@ -14,6 +17,42 @@ pub mod watch;

pub(crate) type Nicks = HashSet<String>;

/// Alias table used to merge old channel names into a single channel.
///
/// Both maps are filled by `ChannelRemap::new` from the `aliases`
/// configuration map and are never mutated afterwards.
#[derive(Clone)]
pub struct ChannelRemap {
/// Maps each old channel to the channel it is merged into.
forward: HashMap<ServerChannel, ServerChannel>,
/// Maps each merged-into channel to the old channels listed for it.
reverse: HashMap<ServerChannel, Vec<ServerChannel>>,
}

impl ChannelRemap {
    /// Builds the lookup tables from the `aliases` configuration map.
    ///
    /// Each key of `aliases` is the channel that old names are merged into;
    /// its value lists those old names. Every string is parsed with
    /// `ServerChannel::from_str`.
    ///
    /// If the same old name is listed under several keys, whichever key is
    /// visited last wins in the forward table (`HashMap` iteration order is
    /// unspecified).
    ///
    /// # Errors
    ///
    /// Fails as soon as any key or alias cannot be parsed as a
    /// `ServerChannel`.
    pub fn new(aliases: &HashMap<String, Vec<String>>) -> std::io::Result<Self> {
        let mut reverse = HashMap::with_capacity(aliases.len());
        let mut forward = HashMap::with_capacity(aliases.len());
        for (new, olds) in aliases {
            let new = ServerChannel::from_str(new)?;
            let mut parsed_olds = Vec::with_capacity(olds.len());
            for old in olds {
                let old = ServerChannel::from_str(old)?;
                // `old` is needed twice (map key + list entry), so one clone
                // is required; the list takes ownership of the original.
                forward.insert(old.clone(), new.clone());
                parsed_olds.push(old);
            }
            // Last use of `new`: move it into the map instead of cloning.
            reverse.insert(new, parsed_olds);
        }
        Ok(Self { forward, reverse })
    }

    /// Returns the channel `sc` is merged into, or a clone of `sc` itself
    /// when it has no entry in the forward table.
    pub(crate) fn canonical(&self, sc: &ServerChannel) -> ServerChannel {
        self.forward.get(sc).unwrap_or(sc).clone()
    }

    /// Returns the string form of `sc` followed by the string forms of every
    /// old channel registered under it.
    ///
    /// `sc` itself is always the first element. Only the reverse table is
    /// consulted, so passing an old (non-canonical) name yields just that
    /// name.
    pub(crate) fn aliases_str(&self, sc: &ServerChannel) -> Vec<String> {
        let mut out = vec![sc.to_string()];
        if let Some(olds) = self.reverse.get(sc) {
            out.extend(olds.iter().map(|old| old.to_string()));
        }
        out
    }
}

#[derive(Debug)]
pub struct ChannelInfo {
pub(crate) sc: ServerChannel,
Expand Down
11 changes: 10 additions & 1 deletion ircj-serve/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ extern crate rocket;

use figment::providers::Format;
use rocket::fairing::AdHoc;
use std::collections::HashMap;

/// Application configuration, extracted from the figment providers in
/// `get_rocket`.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Config {
/// Database setting; presumably the connection URI — TODO confirm against
/// the code that opens the database.
pub db: String,
/// Channel aliases handed to `ChannelRemap::new`: each key is the channel
/// to merge into, each value the list of old channel names.
pub aliases: HashMap<String, Vec<String>>,
}

impl Default for Config {
fn default() -> Self {
Self { db: "".to_owned() }
Self {
db: "".to_owned(),
aliases: Default::default(),
}
}
}

Expand All @@ -23,8 +28,12 @@ async fn get_rocket() -> rocket::Rocket<rocket::Build> {
.merge(figment::providers::Toml::file("ircj-serve.toml"))
.merge(figment::providers::Env::prefixed("IRCJ_"));

let config = figment.extract::<Config>().unwrap();
let remap = ircj_serve::ChannelRemap::new(&config.aliases).unwrap();

rocket::custom(figment)
.attach(AdHoc::config::<Config>())
.manage(remap)
.attach(AdHoc::on_ignite(
"Connect to database and migrate",
|rocket| async move {
Expand Down
37 changes: 26 additions & 11 deletions ircj-serve/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ use tokio::{
pub use crate::route_static::StaticFiles;
use ircjournal::{model::ServerChannel, Database, MessageEvent};

use crate::{view, Day};
use crate::{view, ChannelRemap, Day};

#[get("/")]
async fn home(db: &State<Database>) -> Option<Markup> {
let channels = crate::db::channels(db).await;
async fn home(db: &State<Database>, remap: &State<ChannelRemap>) -> Option<Markup> {
let channels = crate::db::channels(db, remap).await;
Some(view::home(&channels))
}

#[get("/<sc>")]
async fn channel_redirect(db: &State<Database>, sc: ServerChannel) -> Redirect {
async fn channel_redirect(
db: &State<Database>,
remap: &State<ChannelRemap>,
sc: ServerChannel,
) -> Redirect {
let sc = remap.canonical(&sc);
Redirect::temporary(
if let Some(ts) = ircjournal::db::last_message_ts(db, &sc).await {
uri!(channel(&sc, ts.into()))
Expand All @@ -38,11 +43,13 @@ async fn channel_redirect(db: &State<Database>, sc: ServerChannel) -> Redirect {

#[get("/<sc>/stream")]
async fn channel_stream(
sc: ServerChannel,
db: &State<Database>,
remap: &State<ChannelRemap>,
queue: &State<Sender<MessageEvent>>,
sc: ServerChannel,
mut end: rocket::Shutdown,
) -> Option<EventStream![]> {
let sc = remap.canonical(&sc);
if !crate::db::channel_exists(db, &sc).await {
return None;
}
Expand All @@ -63,12 +70,18 @@ async fn channel_stream(
}

#[get("/<sc>/<day>")]
async fn channel(db: &State<Database>, sc: ServerChannel, day: Day) -> Option<Markup> {
async fn channel(
db: &State<Database>,
remap: &State<ChannelRemap>,
sc: ServerChannel,
day: Day,
) -> Option<Markup> {
let sc = remap.canonical(&sc);
let (messages, info, active_days) = {
tokio::join!(
crate::db::messages_channel_day(db, &sc, &day),
crate::db::channel_info(db, &sc, &day),
crate::db::channel_month_index(db, &sc, day.0.year(), day.0.month()),
crate::db::messages_channel_day(db, &sc, remap, &day),
crate::db::channel_info(db, &sc, remap, &day),
crate::db::channel_month_index(db, &sc, remap, day.0.year(), day.0.month()),
)
};
let truncated = messages.len() == crate::db::HARD_MESSAGE_LIMIT;
Expand All @@ -84,17 +97,19 @@ async fn channel(db: &State<Database>, sc: ServerChannel, day: Day) -> Option<Ma
#[get("/<sc>/search?<query>&<page>")]
async fn channel_search(
db: &State<Database>,
remap: &State<ChannelRemap>,
sc: ServerChannel,
query: &str,
page: Option<u64>,
) -> Option<Markup> {
let sc = remap.canonical(&sc);
let page = page.unwrap_or(1);
let (result_page, info) = {
let query = query.to_string();
let today = Day::today();
tokio::join!(
crate::db::channel_search(db, &sc, &query, page as i64),
crate::db::channel_info(db, &sc, &today),
crate::db::channel_search(db, &sc, remap, &query, page as i64),
crate::db::channel_info(db, &sc, remap, &today),
)
};
let messages: Vec<_> = result_page
Expand Down
6 changes: 5 additions & 1 deletion ircj-serve/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use rocket::fairing::AdHoc;
use std::{str::FromStr, time::Duration};
use tokio::sync::broadcast;

use crate::ChannelRemap;
use ircjournal::{
model::{Message, ServerChannel},
Database, MessageEvent,
Expand All @@ -12,6 +13,7 @@ const AWAKE_LISTEN_INTERVAL: Duration = Duration::from_secs(60);

pub fn broadcast_message_task(
db: Database,
remap: ChannelRemap,
broadcast: broadcast::Sender<MessageEvent>,
mut shutdown: rocket::Shutdown,
) {
Expand All @@ -29,7 +31,8 @@ pub fn broadcast_message_task(
Ok(notification) = listener.recv() => {
if let Ok(message) = serde_json::from_str::<Message>(notification.payload()) {
let sc = ServerChannel::from_str(message.channel.as_ref().unwrap()).unwrap();
let nicks = crate::db::channel_info(&db, &sc, &message.timestamp.into()).await
let sc = remap.canonical(&sc);
let nicks = crate::db::channel_info(&db, &sc, &remap, &message.timestamp.into()).await
.map(|info| info.nicks).unwrap_or_default();
let _ = broadcast.send((sc.clone(), crate::view::formatted_message(&message, &nicks)));
debug!("New message for {:?}, id {}", &sc, message.id);
Expand All @@ -56,6 +59,7 @@ fn watch_fairing() -> AdHoc {
.state::<Database>()
.unwrap() // attached above
.clone(),
rocket.state::<ChannelRemap>().unwrap().clone(),
rocket
.state::<broadcast::Sender<MessageEvent>>()
.unwrap() // attached above
Expand Down
6 changes: 3 additions & 3 deletions ircjournal/migrations/20210915165747_initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ CREATE INDEX "channel_ts" ON "message" ("channel", "timestamp");
CREATE INDEX "channel_line_fts" ON "message" USING gin (channel, to_tsvector('english', nick || ' ' || line));

-- https://wiki.postgresql.org/wiki/Loose_indexscan
CREATE OR REPLACE FUNCTION all_nicks(chan text, n numeric)
CREATE OR REPLACE FUNCTION all_nicks(chan text[], n numeric)
RETURNS TABLE
(
nick text
Expand All @@ -29,9 +29,9 @@ $$
WITH RECURSIVE t AS (
SELECT min(nick) AS nick, 1 AS cnt
FROM message
WHERE channel = chan AND 1 <= n
WHERE channel = ANY(chan) AND 1 <= n
UNION ALL
SELECT (SELECT min(nick) FROM message WHERE nick > t.nick AND channel = chan), cnt + 1 AS cnt
SELECT (SELECT min(nick) FROM message WHERE nick > t.nick AND channel = ANY(chan)), cnt + 1 AS cnt
FROM t
WHERE t.nick IS NOT NULL AND cnt < n
)
Expand Down
2 changes: 1 addition & 1 deletion ircjournal/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::time::Duration;
use crate::{
model::{Datetime, NewMessage, ServerChannel},
Database,
};
use std::time::Duration;

pub async fn create_db(uri: &str) -> Result<Database, sqlx::Error> {
// TODO: configurable options.
Expand Down
4 changes: 3 additions & 1 deletion ircjournal/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use std::io::ErrorKind;

pub type Datetime = chrono::DateTime<chrono::Utc>;

#[derive(PartialEq, Clone, Debug)]
#[derive(PartialEq, Hash, Clone, Debug)]
pub struct ServerChannel {
pub server: String,
pub channel: String,
}

// Marker impl: `ServerChannel` is used as a `HashMap` key (see
// `ChannelRemap`), which requires `Eq` in addition to `Hash`.
impl Eq for ServerChannel {}

#[derive(PartialEq, Debug, serde::Deserialize, sqlx::Type)]
pub struct Message {
pub id: i32,
Expand Down

0 comments on commit c0f88c6

Please sign in to comment.