diff --git a/big_tests/tests/graphql_sse_SUITE.erl b/big_tests/tests/graphql_sse_SUITE.erl index 820f354a91c..8a7a097c3ea 100644 --- a/big_tests/tests/graphql_sse_SUITE.erl +++ b/big_tests/tests/graphql_sse_SUITE.erl @@ -15,11 +15,13 @@ suite() -> all() -> [{group, admin}, - {group, user}]. + {group, user}, + {group, timeout}]. groups() -> [{admin, [parallel], admin_tests()}, - {user, [parallel], user_tests()}]. + {user, [parallel], user_tests()}, + {timeout, [], [sse_should_not_get_timeout]}]. init_per_suite(Config) -> Config1 = escalus:init_per_suite(Config), @@ -32,12 +34,22 @@ end_per_suite(Config) -> init_per_group(user, Config) -> graphql_helper:init_user(Config); init_per_group(admin, Config) -> - graphql_helper:init_admin_handler(Config). + graphql_helper:init_admin_handler(Config); +init_per_group(timeout, Config) -> + % Change the default idle_timeout for the listener to 1s to test if sse will override it + Listener = get_graphql_user_listener(), + mongoose_helper:change_listener_idle_timeout(Listener, 1000), + graphql_helper:init_user(Config). end_per_group(user, _Config) -> escalus_fresh:clean(), graphql_helper:clean(); end_per_group(admin, _Config) -> + graphql_helper:clean(); +end_per_group(timeout, _Config) -> + Listener = get_graphql_user_listener(), + mongoose_helper:restart_listener(mim(), Listener), + escalus_fresh:clean(), graphql_helper:clean(). init_per_testcase(CaseName, Config) -> @@ -123,8 +135,27 @@ user_invalid_operation_type(Config) -> user_invalid_operation_type_story(Alice) -> get_bad_request(execute_sse(user, #{query => query_doc()}, make_creds(Alice))). +sse_should_not_get_timeout(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun (Alice, Bob) -> + From = escalus_client:full_jid(Bob), + To = escalus_client:short_jid(Alice), + {200, Stream} = graphql_helper:execute_user_command_sse(<<"stanza">>, <<"subscribeForMessages">>, Alice, #{}, Config), + escalus:send(Bob, escalus_stanza:chat(From, To, <<"Hello!">>)), + sse_helper:wait_for_event(Stream), + timer:sleep(2000), + escalus:send(Bob, escalus_stanza:chat(From, To, <<"Hello again!">>)), + sse_helper:wait_for_event(Stream), + sse_helper:stop_sse(Stream) + end). + %% Helpers +get_graphql_user_listener() -> + Handler = #{module => mongoose_graphql_handler, schema_endpoint => user}, + ListenerOpts = #{handlers => [Handler]}, + [Listener] = mongoose_helper:get_listeners(mim(), ListenerOpts), + Listener. + %% Subscription - works only with the SSE handler doc() -> graphql_helper:get_doc(<<"stanza">>, <<"subscribeForMessages">>). diff --git a/big_tests/tests/rest_client_SUITE.erl b/big_tests/tests/rest_client_SUITE.erl index 52a6c46eefd..0d67699e984 100644 --- a/big_tests/tests/rest_client_SUITE.erl +++ b/big_tests/tests/rest_client_SUITE.erl @@ -38,7 +38,8 @@ all() -> {group, roster}, {group, messages_with_props}, {group, messages_with_thread}, - {group, security}]. + {group, security}, + {group, sse_timeout}]. groups() -> [{messages_with_props, [parallel], message_with_props_test_cases()}, @@ -48,7 +49,8 @@ groups() -> {muc_config, [], muc_config_cases()}, {muc_disabled, [parallel], muc_disabled_cases()}, {roster, [parallel], roster_test_cases()}, - {security, [], security_test_cases()}]. + {security, [], security_test_cases()}, + {sse_timeout, [], [sse_should_not_get_timeout]}]. message_test_cases() -> [msg_is_sent_and_delivered_over_xmpp, @@ -152,11 +154,19 @@ init_per_group(muc_disabled = GN, Config) -> Config1 = dynamic_modules:save_modules(HostType, Config), dynamic_modules:ensure_modules(HostType, required_modules(GN)), Config1; +init_per_group(sse_timeout, Config) -> + % Change the default idle_timeout for the listener to 1s to test if sse will override it + Listener = get_client_api_listener(), + mongoose_helper:change_listener_idle_timeout(Listener, 1000), + Config; init_per_group(_GN, Config) -> Config. end_per_group(muc_disabled, Config) -> dynamic_modules:restore_modules(Config); +end_per_group(sse_timeout, _Config) -> + Listener = get_client_api_listener(), + mongoose_helper:restart_listener(distributed_helper:mim(), Listener); end_per_group(_GN, _Config) -> ok. @@ -882,6 +892,19 @@ msg_without_thread_can_be_parsed(Config) -> MsgID = maps:get(id, _Msg) end). +sse_should_not_get_timeout(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun (Alice, Bob) -> + From = escalus_client:full_jid(Bob), + To = escalus_client:short_jid(Alice), + {200, Stream} = connect_to_sse({alice, Alice}), + escalus:send(Bob, escalus_stanza:chat(From, To, <<"Hello!">>)), + sse_helper:wait_for_event(Stream), + timer:sleep(2000), + escalus:send(Bob, escalus_stanza:chat(From, To, <<"Hello again!">>)), + sse_helper:wait_for_event(Stream), + sse_helper:stop_sse(Stream) + end). + assert_room_messages(RecvMsg, {_ID, _GenFrom, GenMsg}) -> escalus:assert(is_chat_message, [maps:get(body, RecvMsg)], GenMsg), ok. @@ -1447,3 +1470,9 @@ verify_server_name_in_header(Server, ExpectedName) -> config_to_muc_host(Config) -> ?config(muc_light_host, Config). + +get_client_api_listener() -> + Handler = #{module => mongoose_client_api}, + ListenerOpts = #{handlers => [Handler]}, + [Listener] = mongoose_helper:get_listeners(distributed_helper:mim(), ListenerOpts), + Listener. diff --git a/doc/configuration/listen.md b/doc/configuration/listen.md index a1fdc3988eb..ad9c982c99e 100644 --- a/doc/configuration/listen.md +++ b/doc/configuration/listen.md @@ -548,6 +548,14 @@ When set, enables authentication for the admin API, otherwise it is disabled. Re By default, when the option is not included, all GraphQL categories are enabled, so you don't need to add this option. When this option is added, only listed GraphQL categories will be processed. For others, the error "category disabled" will be returned. +#### `listen.http.handlers.mongoose_graphql_handler.sse_idle_timeout` +* **Syntax:** positive integer or the string `"infinity"` +* **Default:** 3600000 +* **Example:** `schema_endpoint = "admin"` + +This option specifies the time in milliseconds after which the SSE connection is closed when idle. +The default value is 1 hour. + ### Handler types: REST API - Admin - `mongoose_admin_api` The recommended configuration is shown in [Example 5](#example-5-admin-rest-api) below. diff --git a/doc/rest-api/Client-frontend_swagger.yml b/doc/rest-api/Client-frontend_swagger.yml index 6e1ee808fbe..3c9aaa7adc7 100644 --- a/doc/rest-api/Client-frontend_swagger.yml +++ b/doc/rest-api/Client-frontend_swagger.yml @@ -378,6 +378,7 @@ paths: "affiliation":"member"} ``` For more details please refer to the [rooms API specifications](#/Rooms) + Note: The SSE connection will be closed after 1 hour of idle time. tags: - "Server Sent Events" produces: diff --git a/src/graphql/mongoose_graphql_handler.erl b/src/graphql/mongoose_graphql_handler.erl index 8f6a047f924..17cf5caea25 100644 --- a/src/graphql/mongoose_graphql_handler.erl +++ b/src/graphql/mongoose_graphql_handler.erl @@ -45,13 +45,16 @@ -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{ - items = #{<<"username">> => #option{type = binary}, - <<"password">> => #option{type = binary}, - <<"schema_endpoint">> => #option{type = atom, - validate = {enum, [admin, domain_admin, user]}}, - <<"allowed_categories">> => #list{items = #option{type = binary, - validate = {enum, allowed_categories()}}, - validate = unique_non_empty}}, + items = #{<<"username">> => #option{type = binary}, + <<"password">> => #option{type = binary}, + <<"schema_endpoint">> => #option{type = atom, + validate = {enum, [admin, domain_admin, user]}}, + <<"allowed_categories">> => #list{items = #option{type = binary, + validate = {enum, allowed_categories()}}, + validate = unique_non_empty}, + <<"sse_idle_timeout">> => #option{type = int_or_infinity, + validate = positive}}, + defaults = #{<<"sse_idle_timeout">> => 3600000}, % 1h format_items = map, required = [<<"schema_endpoint">>], process = fun ?MODULE:process_config/1}. diff --git a/src/graphql/mongoose_graphql_sse_handler.erl b/src/graphql/mongoose_graphql_sse_handler.erl index bd19bb594a6..df4410daf42 100644 --- a/src/graphql/mongoose_graphql_sse_handler.erl +++ b/src/graphql/mongoose_graphql_sse_handler.erl @@ -24,8 +24,11 @@ -spec init(state(), any(), cowboy_req:req()) -> {ok, req(), state()} | {shutdown, cowboy:http_status(), cowboy:http_headers(), iodata(), req(), state()}. -init(State, _LastEvtId, Req) -> +init(#{sse_idle_timeout := Timeout} = State, _LastEvtId, Req) -> process_flag(trap_exit, true), % needed for 'terminate' to be called + cowboy_req:cast({set_options, #{ + idle_timeout => Timeout + }}, Req), case cowboy_req:method(Req) of <<"GET">> -> case mongoose_graphql_handler:check_auth_header(Req, State) of diff --git a/src/mongoose_client_api/mongoose_client_api_sse.erl b/src/mongoose_client_api/mongoose_client_api_sse.erl index f893c42fbcb..786dfba6d69 100644 --- a/src/mongoose_client_api/mongoose_client_api_sse.erl +++ b/src/mongoose_client_api/mongoose_client_api_sse.erl @@ -21,6 +21,10 @@ init(_InitArgs, _LastEvtId, Req) -> ?LOG_DEBUG(#{what => client_api_sse_init, req => Req}), {cowboy_rest, Req1, State0} = mongoose_client_api:init(Req, []), {Authorization, Req2, State} = mongoose_client_api:is_authorized(Req1, State0), + % set 1h timeout to prevent client from frequent disconnections + cowboy_req:cast({set_options, #{ + idle_timeout => 3600000 + }}, Req), maybe_init(Authorization, Req2, State#{id => 1}). maybe_init(true, Req, #{jid := JID} = State) -> diff --git a/test/common/config_parser_helper.erl b/test/common/config_parser_helper.erl index 825b49bca5c..acd77f3b24b 100644 --- a/test/common/config_parser_helper.erl +++ b/test/common/config_parser_helper.erl @@ -1083,7 +1083,8 @@ default_config([listen, http, handlers, mongoose_client_api]) -> module => mongoose_client_api}; default_config([listen, http, handlers, mongoose_graphql_handler]) -> #{module => mongoose_graphql_handler, - schema_endpoint => admin}; + schema_endpoint => admin, + sse_idle_timeout => 3600000}; default_config([listen, http, handlers, Module]) -> #{module => Module}; default_config([listen, http, transport]) -> diff --git a/test/common/mongoose_helper.erl b/test/common/mongoose_helper.erl index 4fe0c5c65f0..867c1f7b0a3 100644 --- a/test/common/mongoose_helper.erl +++ b/test/common/mongoose_helper.erl @@ -45,6 +45,7 @@ -export([auth_opts_with_password_format/1]). -export([get_listeners/2]). -export([restart_listener/2]). +-export([change_listener_idle_timeout/2]). -export([should_minio_be_running/1]). -export([new_mongoose_acc/1]). -export([print_debug_info_for_module/1]). @@ -490,15 +491,29 @@ build_new_password_opts(Type) -> #{format => Type}. get_listeners(#{} = Spec, Pattern) -> - Keys = maps:keys(Pattern), Listeners = rpc(Spec, mongoose_config, get_opt, [listen]), - lists:filter(fun(Listener) -> maps:with(Keys, Listener) =:= Pattern end, Listeners). + lists:filter(fun(Listener) -> matches_pattern(Listener, Pattern) end, Listeners). + +matches_pattern(Map, Pattern) when is_map(Map), is_map(Pattern) -> + Keys = maps:keys(Pattern), + lists:all(fun(Key) -> matches_pattern(maps:get(Key, Map, undefined), maps:get(Key, Pattern)) end, Keys); +matches_pattern([Head1 | List], [Head2 | Pattern]) -> + matches_pattern(Head1, Head2) andalso matches_pattern(List, Pattern); +matches_pattern(undefined, _) -> + false; +matches_pattern(Value, Pattern) -> + Value =:= Pattern. %% 'port', 'ip_tuple' and 'proto' options need to stay unchanged for a successful restart restart_listener(Spec, Listener) -> rpc(Spec, mongoose_listener, stop_listener, [Listener]), rpc(Spec, mongoose_listener, start_listener, [Listener]). +change_listener_idle_timeout(Listener, Timeout) -> + #{protocol := ProtocolOpts} = Listener, + NewConfig = Listener#{protocol => ProtocolOpts#{idle_timeout => Timeout}}, + restart_listener(mim(), NewConfig). + should_minio_be_running(Config) -> DBs = ct_helper:get_preset_var(Config, dbs, []), lists:member(minio, DBs). diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 85ba31fa255..df06350058e 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -616,8 +616,10 @@ listen_http_handlers_graphql(_Config) -> test_listen_http_handler_creds(P, T), ?cfg(P ++ [allowed_categories], [<<"muc">>, <<"inbox">>], T(#{<<"allowed_categories">> => [<<"muc">>, <<"inbox">>]})), + ?cfg(P ++ [sse_idle_timeout], 3600000, T(#{})), ?err(T(#{<<"allowed_categories">> => [<<"invalid">>]})), ?err(T(#{<<"schema_endpoint">> => <<"wrong_endpoint">>})), + ?err(T(#{<<"sse_idle_timeout">> => 0})), ?err(http_handler_raw(mongoose_graphql_handler, #{})). test_listen_http_handler_creds(P, T) ->