diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index 39b7e5ab34..47f4c47c73 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -690,10 +690,6 @@ module LavinMQ send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") elsif !@user.can_write?(@vhost.name, frame.queue_name) send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - elsif q.is_a?(LavinMQ::MQTT::Session) - send_access_refused(frame, "Not allowed to bind to an MQTT Session") - elsif @vhost.exchanges[frame.exchange_name].is_a?(LavinMQ::MQTT::Exchange) - send_access_refused(frame, "Not allowed to bind to an MQTT Exchange") elsif queue_exclusive_to_other_client?(q) send_resource_locked(frame, "Exclusive queue") else @@ -757,8 +753,8 @@ module LavinMQ send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") elsif !@user.can_write?(@vhost.name, frame.destination) send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") - elsif source.is_a?(LavinMQ::MQTT::Exchange) || destination.is_a?(LavinMQ::MQTT::Exchange) - send_access_refused(frame, "Not allowed to bind to an MQTT Exchange") + # elsif source.is_a?(LavinMQ::MQTT::Exchange) || destination.is_a?(LavinMQ::MQTT::Exchange) + # send_access_refused(frame, "Not allowed to bind to an MQTT Exchange") else @vhost.apply(frame) send AMQP::Frame::Exchange::BindOk.new(frame.channel) unless frame.no_wait diff --git a/src/lavinmq/exchange/direct.cr b/src/lavinmq/exchange/direct.cr index af559859f1..4cbc96d8a5 100644 --- a/src/lavinmq/exchange/direct.cr +++ b/src/lavinmq/exchange/direct.cr @@ -27,6 +27,10 @@ module LavinMQ true end + def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool + raise LavinMQ::Exchange::AccessRefused.new(self) + end + def unbind(destination : Destination, routing_key, headers = nil) : Bool rk_bindings = @bindings[routing_key] return false unless rk_bindings.delete destination diff --git a/src/lavinmq/exchange/exchange.cr b/src/lavinmq/exchange/exchange.cr index 0a271b0395..1ddb5a0e86 100644 --- a/src/lavinmq/exchange/exchange.cr +++ b/src/lavinmq/exchange/exchange.cr @@ -5,6 +5,7 @@ require "../sortable_json" require "../observable" require "./event" require "../amqp/queue" +require "../mqtt/session" module LavinMQ alias Destination = Queue | Exchange diff --git a/src/lavinmq/exchange/fanout.cr b/src/lavinmq/exchange/fanout.cr index 623b59a2a3..14a3729f61 100644 --- a/src/lavinmq/exchange/fanout.cr +++ b/src/lavinmq/exchange/fanout.cr @@ -23,6 +23,10 @@ module LavinMQ true end + def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool + raise LavinMQ::Exchange::AccessRefused.new(self) + end + def unbind(destination : Destination, routing_key, headers = nil) return false unless @bindings.delete destination binding_key = BindingKey.new("") diff --git a/src/lavinmq/exchange/headers.cr b/src/lavinmq/exchange/headers.cr index 98165aa930..bd2e358b58 100644 --- a/src/lavinmq/exchange/headers.cr +++ b/src/lavinmq/exchange/headers.cr @@ -36,6 +36,10 @@ module LavinMQ true end + def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool + raise LavinMQ::Exchange::AccessRefused.new(self) + end + def unbind(destination : Destination, routing_key, headers) args = headers ? @arguments.clone.merge!(headers) : @arguments bds = @bindings[args] diff --git a/src/lavinmq/exchange/topic.cr b/src/lavinmq/exchange/topic.cr index 117a782143..ad0f67863c 100644 --- a/src/lavinmq/exchange/topic.cr +++ b/src/lavinmq/exchange/topic.cr @@ -27,6 +27,10 @@ module LavinMQ true end + def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool + raise LavinMQ::Exchange::AccessRefused.new(self) + end + def unbind(destination : Destination, routing_key, headers = nil) rks = routing_key.split(".") bds = @bindings[routing_key.split(".")] diff --git a/src/lavinmq/http/controller/bindings.cr b/src/lavinmq/http/controller/bindings.cr index 3893725389..b516e519d6 100644 --- a/src/lavinmq/http/controller/bindings.cr +++ b/src/lavinmq/http/controller/bindings.cr @@ -50,10 +50,6 @@ module LavinMQ user = user(context) if !user.can_read?(vhost, e.name) access_refused(context, "User doesn't have read permissions to exchange '#{e.name}'") - elsif q.is_a?(LavinMQ::MQTT::Session) - access_refused(context, "Not allowed to bind to an MQTT session") - elsif e.is_a?(LavinMQ::MQTT::Exchange) - access_refused(context, "Not allowed to bind to the default MQTT exchange") elsif !user.can_write?(vhost, q.name) access_refused(context, "User doesn't have write permissions to queue '#{q.name}'") elsif e.name.empty? @@ -134,8 +130,6 @@ module LavinMQ user = user(context) if !user.can_read?(vhost, source.name) access_refused(context, "User doesn't have read permissions to exchange '#{source.name}'") - elsif destination.is_a?(LavinMQ::MQTT::Exchange) || source.is_a?(LavinMQ::MQTT::Exchange) - access_refused(context, "Not allowed to bind to the default MQTT exchange") elsif !user.can_write?(vhost, destination.name) access_refused(context, "User doesn't have write permissions to exchange '#{destination.name}'") elsif source.name.empty? || destination.name.empty?