Skip to content

Commit

Permalink
Merge pull request #108 from kafka4beam/1214-sync-emqx-fork
Browse files Browse the repository at this point in the history
1214 sync emqx fork
  • Loading branch information
zmstone authored Dec 14, 2022
2 parents afe2e11 + c2cbd10 commit f0100ad
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 26 deletions.
14 changes: 14 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@
- Ported changes from EMQX's fork (based on 2.3.6) back to master branch
- Included an pushback twards 'no_acl' callers. https://github.com/emqx/kafka_protocol/pull/1
- Avoid crashing on decoding unknown error codes
- Improve SNI (server_name_indication) config.
- Prior to this change, SNI is auto-added only when SSL option
'verify' is set to 'verify_peer'.
This retriction is unnecessary, because SNI is a part of
client-hello in the handshake, it does not have anything to do
with server certificate (and hostname) verification.
- The connection config is shared between bootstrap connection
and partition leader connection.
Using a static SNI may work for bootstrap connections but
may then fail for partition leaders if they happen to be
different hosts (which is always the case in confluent cloud).
To fix it, we now allow users to use two special values for SNI:
- auto: use the exact connecting host name (FQDN or IP)
- none: do not use anything at all.
* 4.1.0
- Added pass SASL version to kpro_auth_backend behaviour modules
- The application ˋstartˋ method must return the `pid` of the top supervisor
Expand Down
49 changes: 25 additions & 24 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -295,38 +295,39 @@ get_tcp_mod(_SslOpts = true) -> ssl;
get_tcp_mod(_SslOpts = [_|_]) -> ssl;
get_tcp_mod(_) -> gen_tcp.

%% If SslOpts contains {verify, verify_peer}, we insert
%% {server_name_indication, Host}. This is necessary as of OTP 20, to
%% ensure that peer verification is done against the correct host name
%% (otherwise the IP will be used, which is almost certainly
%% incorrect).
insert_server_name_indication(SslOpts, Host) ->
VerifyOpt = proplists:get_value(verify, SslOpts),
insert_server_name_indication(VerifyOpt, SslOpts, Host).

insert_server_name_indication(verify_peer, SslOpts, Host) ->
case proplists:get_value(server_name_indication, SslOpts) of
undefined ->
%% insert {server_name_indication, Host} if not already present
[{server_name_indication, ensure_string(Host)} | SslOpts];
_ ->
SslOpts
end;

insert_server_name_indication(_, SslOpts, _) ->
SslOpts.
%% insert {server_name_indication, Host} if not already present
%% Some special values are allowed:
%% * auto: use the host as SNI
%% * none: do not use SNI
%% * undefined: same as auto (for backward compatibility)
insert_server_name_indication(SslOpts0, Host) ->
SNI = ensure_string(proplists:get_value(server_name_indication, SslOpts0)),
SslOpts = proplists:delete(server_name_indication, SslOpts0),
insert_server_name_indication(SNI, ensure_string(Host), SslOpts).

insert_server_name_indication("", Host, SslOpts) ->
[{server_name_indication, Host} | SslOpts];
insert_server_name_indication("undefined", Host, SslOpts) ->
[{server_name_indication, Host} | SslOpts];
insert_server_name_indication("auto", Host, SslOpts) ->
[{server_name_indication, Host} | SslOpts];
insert_server_name_indication("none", _Host, SslOpts) ->
SslOpts;
insert_server_name_indication(SNI, _Host, SslOpts) ->
[{server_name_indication, SNI} | SslOpts].

%% inet:hostname() is atom() | string()
%% however sni() is only allowed to be string()
ensure_string(Host) when is_atom(Host) -> atom_to_list(Host);
ensure_string(Host) when is_binary(Host) -> binary_to_list(Host);
ensure_string(Host) -> Host.

maybe_upgrade_to_ssl(Sock, _Mod = ssl, SslOpts0, Host, Timeout) ->
SslOpts = case SslOpts0 of
true -> [];
[_|_] -> insert_server_name_indication(SslOpts0, Host)
SslOpts1 = case SslOpts0 of
true -> [{verify, verify_none}];
[_|_] -> SslOpts0
end,

SslOpts = insert_server_name_indication(SslOpts1, Host),
case ssl:connect(Sock, SslOpts, Timeout) of
{ok, NewSock} -> NewSock;
{error, Reason} -> erlang:error({failed_to_upgrade_to_ssl, Reason})
Expand Down
18 changes: 18 additions & 0 deletions test/kpro_produce_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ magic_v0_basic_test_() ->
end,
[MkTest(Vsn) || Vsn <- lists:seq(Min, Max)].

ssl_test_() ->
[{"sni=none", fun() -> ssl_test_with_sni(none) end},
{"sni=auto", fun() -> ssl_test_with_sni(auto) end},
{"sni=undefined", fun() -> ssl_test_with_sni(undefined) end},
{"sni=static", fun() -> ssl_test_with_sni(<<"localhost">>) end}
].

ssl_test_with_sni(SNI) ->
{_, Vsn} = get_api_vsn_range(),
Msg = #{value => make_value(?LINE), headers => [{<<"foo">>, <<"bar">>}]},
Req = kpro_req_lib:produce(Vsn, topic(), ?PARTI, [Msg]),
with_connection(#{ssl => [{verify, verify_none}, {server_name_indication, SNI}],
sasl => kpro_test_lib:sasl_config(plain)},
fun(Pid) ->
{ok, Rsp} = kpro:request_sync(Pid, Req, ?TIMEOUT),
?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp)
end).

%% Timestamp within batch may not have to be monotonic.
non_monotoic_ts_in_batch_test() ->
{_, Vsn} = get_api_vsn_range(),
Expand Down
5 changes: 3 additions & 2 deletions test/kpro_schema_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ test_api(API) ->
end, lists:seq(MinV, MaxV)).

error_code_test() ->
?assertEqual(invalid_recod, kpro_schema:ec(87)),
%% insure it's added (if we ever regenerate the schema module)
?assertEqual(invalid_record, kpro_schema:ec(87)),
%% unknown error code should not crash
?assertEqual({unknown_error_code, 99999}, kpro_schema:ec(99999)).
?assertEqual(99999, kpro_schema:ec(99999)).

%%%_* Emacs ====================================================================
%%% Local Variables:
Expand Down
2 changes: 2 additions & 0 deletions test/kpro_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ ssl_options() ->
[ {cacertfile, CaCertFile}
, {keyfile, osenv("KPRO_TEST_SSL_KEY_FILE")}
, {certfile, osenv("KPRO_TEST_SSL_CERT_FILE")}
, {verify, verify_none}
]
end
end.
Expand All @@ -255,6 +256,7 @@ default_ssl_options() ->
[ {cacertfile, Fname("ca.crt")}
, {keyfile, Fname("client.key")}
, {certfile, Fname("client.crt")}
, {verify, verify_none}
].

osenv(Name) ->
Expand Down

0 comments on commit f0100ad

Please sign in to comment.