Skip to content

Commit

Permalink
fix: event_queue fixes (#282)
Browse files Browse the repository at this point in the history
* fix: pg notify for async events

* feat: handle conflicts when inserting into event_queue

* do not insert unknown component status updates to event_queue
  • Loading branch information
adityathebe authored Sep 19, 2023
1 parent 2fc433a commit 75277f7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 49 deletions.
101 changes: 61 additions & 40 deletions views/007_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ CREATE OR REPLACE FUNCTION insert_playbook_spec_approval_in_event_queue()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.spec->'approval' != NEW.spec->'approval' THEN
INSERT INTO event_queue(name, properties) VALUES ('playbook.spec.approval.updated', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'playbook.spec.approval.updated';
INSERT INTO event_queue(name, properties) VALUES ('playbook.spec.approval.updated', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;

RETURN NULL;
Expand All @@ -20,8 +20,8 @@ EXECUTE PROCEDURE insert_playbook_spec_approval_in_event_queue();
CREATE OR REPLACE FUNCTION insert_new_playbook_approvals_to_event_queue()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO event_queue(name, properties) VALUES ('playbook.approval.inserted', jsonb_build_object('id', NEW.id, 'run_id', NEW.run_id));
NOTIFY event_queue_updates, 'playbook.approval.inserted';
INSERT INTO event_queue(name, properties) VALUES ('playbook.approval.inserted', jsonb_build_object('id', NEW.id, 'run_id', NEW.run_id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Expand All @@ -34,8 +34,8 @@ EXECUTE PROCEDURE insert_new_playbook_approvals_to_event_queue();
-- Insert incident created in event_queue
CREATE OR REPLACE FUNCTION insert_incident_creation_in_event_queue() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO event_queue(name, properties) VALUES ('incident.created', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.created';
INSERT INTO event_queue(name, properties) VALUES ('incident.created', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
RETURN NULL;
END
$$ LANGUAGE plpgsql;
Expand All @@ -54,8 +54,8 @@ BEGIN
END IF;

event_name := 'incident.status.' || NEW.status;
INSERT INTO event_queue(name, properties) VALUES (event_name, jsonb_build_object('id', NEW.id));
PERFORM pg_notify('event_queue_updates', event_name);
INSERT INTO event_queue(name, properties) VALUES (event_name, jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
RETURN NULL;
END
$$ LANGUAGE plpgsql;
Expand All @@ -69,12 +69,12 @@ EXECUTE PROCEDURE insert_incident_updates_in_event_queue();
CREATE OR REPLACE FUNCTION insert_responder_in_event_queue() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO event_queue(name, properties) VALUES ('incident.responder.added', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.responder.added';
INSERT INTO event_queue(name, properties) VALUES ('incident.responder.added', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSIF TG_OP = 'UPDATE' THEN
IF OLD.deleted_at IS NULL AND NEW.deleted_at IS NOT NULL THEN
INSERT INTO event_queue(name, properties) VALUES ('incident.responder.removed', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.responder.removed';
INSERT INTO event_queue(name, properties) VALUES ('incident.responder.removed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;
END IF;

Expand All @@ -90,8 +90,8 @@ EXECUTE PROCEDURE insert_responder_in_event_queue();
-- Insert incident comment creation in event_queue
CREATE OR REPLACE FUNCTION insert_comment_in_event_queue () RETURNS TRIGGER AS $$
BEGIN
INSERT INTO event_queue(name, properties) VALUES ('incident.comment.added', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.comment.added';
INSERT INTO event_queue(name, properties) VALUES ('incident.comment.added', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
RETURN NULL;
END
$$ LANGUAGE plpgsql;
Expand All @@ -110,21 +110,21 @@ BEGIN

IF OLD.definition_of_done != NEW.definition_of_done THEN
IF NEW.definition_of_done THEN
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.added', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.dod.added';
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.added', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSE
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.removed', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.dod.removed';
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.removed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;
END IF;

IF OLD.done != NEW.done THEN
IF NEW.done THEN
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.passed', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.dod.passed';
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.passed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSE
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.regressed', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'incident.dod.regressed';
INSERT INTO event_queue(name, properties) VALUES ('incident.dod.regressed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;
END IF;

Expand All @@ -145,11 +145,11 @@ BEGIN
END IF;

IF NEW.status = 'healthy' THEN
INSERT INTO event_queue(name, properties) VALUES ('check.passed', jsonb_build_object('id', NEW.id)) ON CONFLICT (name, properties) DO NOTHING;
NOTIFY event_queue_updates, 'check.passed';
INSERT INTO event_queue(name, properties) VALUES ('check.passed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSEIF NEW.status = 'unhealthy' THEN
INSERT INTO event_queue(name, properties) VALUES ('check.failed', jsonb_build_object('id', NEW.id)) ON CONFLICT (name, properties) DO NOTHING;
NOTIFY event_queue_updates, 'check.failed';
INSERT INTO event_queue(name, properties) VALUES ('check.failed', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;

RETURN NULL;
Expand All @@ -169,10 +169,15 @@ BEGIN
RETURN NULL;
END IF;

IF NEW.status = 'unknown' THEN
RETURN NULL;
END If;

event_name := 'component.status.' || NEW.status;
INSERT INTO event_queue(name, properties) VALUES (event_name, jsonb_build_object('id', NEW.id));

PERFORM pg_notify('event_queue_updates', event_name);
INSERT INTO event_queue (name, properties) VALUES (event_name, jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;

RETURN NULL;
END
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -218,14 +223,14 @@ CREATE
OR REPLACE FUNCTION insert_team_in_event_queue () RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO event_queue(name, properties) VALUES ('team.delete', jsonb_build_object('team_id', OLD.id));
NOTIFY event_queue_updates, 'team.delete';
RETURN OLD;
INSERT INTO event_queue(name, properties) VALUES ('team.delete', jsonb_build_object('team_id', OLD.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSE
INSERT INTO event_queue(name, properties) VALUES ('team.update', jsonb_build_object('team_id', NEW.id));
NOTIFY event_queue_updates, 'team.update';
RETURN NEW;
INSERT INTO event_queue(name, properties) VALUES ('team.update', jsonb_build_object('team_id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;

RETURN NULL;
END
$$ LANGUAGE plpgsql;

Expand All @@ -239,18 +244,34 @@ CREATE OR REPLACE FUNCTION notifications_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO event_queue(name, properties) VALUES ('notification.delete', jsonb_build_object('id', OLD.id));
NOTIFY event_queue_updates, 'notification.delete';
RETURN OLD;
INSERT INTO event_queue(name, properties) VALUES ('notification.delete', jsonb_build_object('id', OLD.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
ELSE
INSERT INTO event_queue(name, properties) VALUES ('notification.update', jsonb_build_object('id', NEW.id));
NOTIFY event_queue_updates, 'notification.update';
RETURN NEW;
INSERT INTO event_queue(name, properties) VALUES ('notification.update', jsonb_build_object('id', NEW.id))
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;
END IF;

RETURN NULL;
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER notification_update_enqueue
AFTER INSERT OR UPDATE OR DELETE ON notifications
FOR EACH ROW
EXECUTE PROCEDURE notifications_trigger_function ();

-- Publish Notify on new events
CREATE OR REPLACE FUNCTION notify_new_events_function()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify('event_queue_updates', NEW.name);
RETURN NULL;
END IF;
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER notify_new_events
AFTER INSERT ON event_queue
FOR EACH ROW
EXECUTE PROCEDURE notify_new_events_function();
13 changes: 4 additions & 9 deletions views/012_changelog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,11 @@ BEGIN
payload = jsonb_build_object('id', rec.id);
END CASE;

-- Log changes to event queue
priority = (priority_table->>TG_TABLE_NAME)::integer;
INSERT INTO
event_queue (name, properties, priority)
VALUES
('push_queue.create', jsonb_build_object('table', TG_TABLE_NAME) || payload, priority)
ON CONFLICT
(name, properties)
DO UPDATE SET
attempts = 0;
NOTIFY event_queue_updates, 'push_queue.create';
INSERT INTO event_queue (name, properties, priority) VALUES ('push_queue.create', jsonb_build_object('table', TG_TABLE_NAME) || payload, priority)
ON CONFLICT (name, properties) DO UPDATE SET created_at = NOW(), last_attempt = NULL, attempts = 0;

RETURN NULL;
END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
Expand Down

0 comments on commit 75277f7

Please sign in to comment.