diff --git a/lib/bgp/message/update/attribute/as_path.ex b/lib/bgp/message/update/attribute/as_path.ex index d6a6376..0d56cf2 100644 --- a/lib/bgp/message/update/attribute/as_path.ex +++ b/lib/bgp/message/update/attribute/as_path.ex @@ -6,7 +6,7 @@ defmodule BGP.Message.UPDATE.Attribute.ASPath do @type type :: :as_sequence | :as_set | :as_confed_sequence | :as_confed_set @type length :: non_neg_integer() - @type t :: %__MODULE__{value: [{type(), length(), OPEN.asn()}]} + @type t :: %__MODULE__{value: {type(), length(), [OPEN.asn()]}} defstruct value: [] diff --git a/lib/bgp/server.ex b/lib/bgp/server.ex index 7864592..5a32c7e 100644 --- a/lib/bgp/server.ex +++ b/lib/bgp/server.ex @@ -169,6 +169,7 @@ defmodule BGP.Server do Supervisor.init( [ {Registry, keys: :unique, name: session_registry(args[:server])}, + {BGP.Server.Session.Group, args[:server]}, {BGP.Server.RDE, server: args[:server]}, {BGP.Server.Session.Supervisor, args[:server]}, { @@ -244,5 +245,8 @@ defmodule BGP.Server do def session_registry(server), do: Module.concat(server, "Session.Registry") @spec session_via(t(), IP.Address.t()) :: {:via, module(), term()} - def session_via(server, host), do: {:via, Registry, {session_registry(server), host}} + def session_via(server, hostname), do: {:via, Registry, {session_registry(server), hostname}} + + @spec session_group_for(t()) :: module() + def session_group_for(server), do: Module.concat(server, "Session.Group") end diff --git a/lib/bgp/server/rde.ex b/lib/bgp/server/rde.ex index 7540a39..b419561 100644 --- a/lib/bgp/server/rde.ex +++ b/lib/bgp/server/rde.ex @@ -1,25 +1,369 @@ defmodule BGP.Server.RDE do - @moduledoc false + @moduledoc """ + RDE implementation based on RFC4271 section 9 (https://www.rfc-editor.org/rfc/rfc4271#section-9): + + * performs preference calculation for received routes. + * performs route selection and maintains Adj-RIB-In, Loc-Rib, Adjs-RIB-Out in ETS. + * performs route dissemination to peers after processing updates. + + ```mermaid + stateDiagram-v2 + [*] --> Idle + Idle --> Processing : state_timeout + Idle --> Processing : calculate + Processing --> Idle : route_dissemination + Processing --> Idle + ``` + """ + + @behaviour :gen_statem alias BGP.{Message.UPDATE, Server} + alias BGP.Server.Session + alias BGP.Server.Session.Group + alias BGP.Message.UPDATE.Attribute + alias BGP.Message.UPDATE.Attribute.{ASPath, LocalPref, MultiExitDisc, Origin} + + require Logger + + @enforce_keys [:adj_rib_in, :loc_rib, :queue, :server] + + defstruct adj_rib_in: nil, loc_rib: nil, queue: nil, server: nil + + @doc false + def child_spec(server), + do: %{id: Server.rde_for(server), start: {__MODULE__, :start_link, [server]}} + + @spec process_update(Session.data(), UPDATE.t()) :: :ok + def process_update(%Session{} = data, update), + do: :gen_statem.call(Server.rde_for(data.server), {:process_update, data, update}) + + @spec start_link(term()) :: :gen_statem.start_ret() + def start_link(server) do + :gen_statem.start_link( + {:local, Server.rde_for(server)}, + __MODULE__, + [server], + # debug: [:trace] + [] + ) + end + + def get_loc_rib(server) do + server + |> :ets.whereis() + |> :ets.tab2list() + end + + @impl :gen_statem + # def callback_mode, do: [:handle_event_function, :state_enter] + def callback_mode, do: :handle_event_function + + @impl :gen_statem + def init([server]) do + Group.monitor(server) + + data = %__MODULE__{ + adj_rib_in: :ets.new(:adj_rib_in, [:set, :private]), + loc_rib: :ets.new(server, [:named_table, :set, :protected]), + queue: :queue.new(), + server: server + } + + {:ok, :idle, data, [{:state_timeout, 1_000, nil}]} + end + + @impl :gen_statem + # def handle_event(:enter, old_state, new_state, data) do + # Logger.debug("RDE: #{old_state} -> #{new_state}") + + # all = :ets.tab2list(data.adj_rib_in) + # IO.inspect(all, label: :aaaaa) + + # :keep_state_and_data + # end + + def handle_event(:state_timeout, _, :idle, data) do + { + :next_state, + :processing, + data, + [{:next_event, :internal, :process_update_internal}] + } + end + + def handle_event(:internal, :calculate, :processing, data) do + # Phase 2: Route Selection + out_res = + data.adj_rib_in + |> :ets.tab2list() + # |> Enum.filter(& filter_next_hop/1) + |> Enum.filter(&filter_as_path/1) + |> Enum.group_by(fn {{_host, _pid, prefix}, _path_attributes, _loc_pref, _session} -> + prefix + end) + |> Enum.map(fn {prefix, adj_rib_in_items} -> + res = + adj_rib_in_items + |> Enum.reduce( + [], + fn + {_, _, _, _} = item, [] -> + [item] + + {_, _, loc_pref, _} = item, [{_, _, loc_pref_acc, _} | _] = acc -> + cond do + loc_pref > loc_pref_acc -> [item] + loc_pref == loc_pref_acc -> [item | acc] + true -> acc + end + end + ) + + {prefix, res} + end) + |> Enum.map(fn {_prefix, adj_rib_in_items} -> + adj_rib_in_items + |> Enum.reduce( + [], + fn + {_, _, _, _} = item, [] -> + [item] + + {_, path_attributes, _, _} = item, [{_, path_attributes_acc, _, _} | _] = acc -> + as_path = filter_as_path_by_length(path_attributes) + acc_as_path = filter_as_path_by_length(path_attributes_acc) + + cond do + as_path < acc_as_path -> [item] + as_path == acc_as_path -> [item | acc] + true -> acc + end + end + ) + |> Enum.reduce( + [], + fn + {_, _, _, _} = item, [] -> + [item] + + {_, path_attributes, _, _} = item, [{_, path_attributes_acc, _, _} | _] = acc -> + origin = filter_origin(path_attributes) + acc_origin = filter_origin(path_attributes_acc) + + cond do + origin < acc_origin -> [item] + origin == acc_origin -> [item | acc] + true -> acc + end + end + ) + |> Enum.reduce( + %{}, + fn + {{_host, _pid, _prefix}, path_attributes, _loc_pref, %Session{asn: asn}} = item, + acc -> + case acc[asn] || [] do + [] -> + Map.put(acc, asn, [item]) + + [_] -> + acc + + [{_, path_attributes_acc, _, _} | _] = items -> + multi_exit_disc = filter_multi_exit_disc(path_attributes) + acc_multi_exit_disc = filter_multi_exit_disc(path_attributes_acc) + + cond do + multi_exit_disc < acc_multi_exit_disc -> Map.put(acc, asn, [item]) + multi_exit_disc == acc_multi_exit_disc -> Map.put(acc, asn, [item | items]) + true -> acc + end + end + end + ) + |> Enum.flat_map(fn {_asn, prefixes} -> prefixes end) + |> Kernel.then(fn items -> + case filter_ebgp(items) do + [] -> items + ebgps -> ebgps + end + end) + |> Enum.reduce( + [], + fn + {_, _, _, _} = item, [] -> + [item] - use GenServer + {_, _, _, %Session{bgp_id: bgp_id}} = item, + [{_, _, _, %Session{bgp_id: acc_bgp_id}} | _] = acc -> + cond do + bgp_id > acc_bgp_id -> [item] + bgp_id == acc_bgp_id -> [item | acc] + true -> acc + end + end + ) + |> Enum.reduce( + [], + fn + {{_host, _pid, prefix}, path_attributes, _, _}, [] -> + [{prefix, path_attributes}] - @spec start_link(Keyword.t()) :: GenServer.on_start() - def start_link(args), - do: GenServer.start_link(__MODULE__, args, name: Server.rde_for(args[:server])) + {{_host, _pid, prefix}, path_attributes, _, %Session{host: host}}, + [{_, _, _, %Session{host: acc_host}} | _] = acc -> + cond do + host < acc_host -> [{prefix, path_attributes}] + true -> acc + end + end + ) + |> List.flatten() + end) + + :ets.delete_all_objects(data.loc_rib) + :ets.insert(data.loc_rib, out_res) + + { + :next_state, + :idle, + data, + {:state_timeout, 1_000, nil} + } + end + + def handle_event(:info, {_ref, :join, _group, _pids}, _state, _data) do + :keep_state_and_data + end + + def handle_event(:info, {_ref, :leave, host, pids}, _state, data) do + pids + |> Enum.each(fn pid -> + :ets.match_delete(data.adj_rib_in, {{host, pid, :_}, :_}) + end) + + all = :ets.tab2list(data.adj_rib_in) + IO.inspect(all, label: :aaaaa) + + :keep_state_and_data + end + + def handle_event( + {:call, {pid, _} = from}, + {:process_update, %Session{} = session, update}, + :idle, + %__MODULE__{} = data + ) do + { + :keep_state, + %{data | queue: :queue.in({session, update, pid}, data.queue)}, + {:reply, from, :ok} + } + end + + def handle_event({:call, from}, {:process_update, _session, _update}, _state, _data) do + {:postpone, {:reply, from, :ok}} + end + + def handle_event(:internal, :process_update_internal, :processing, %__MODULE__{} = data) do + Stream.resource( + fn -> data.queue end, + fn queue -> + case :queue.out(queue) do + {{:value, update}, queue} -> {[update], queue} + {:empty, queue} -> {:halt, queue} + end + end, + fn _queue -> nil end + ) + |> Enum.each(fn {%Session{} = session, %UPDATE{} = update, pid} -> + update.withdrawn_routes + |> Enum.each(fn prefix -> + :ets.delete(data.adj_rib_in, {session.host, pid, prefix}) + end) + + object = + update.nlri + |> Enum.map(fn prefix -> + loc_pref = degree_of_preference(session.ibgp, update.path_attributes) + {{session.host, pid, prefix}, update.path_attributes, loc_pref, session} + end) + + :ets.insert(data.adj_rib_in, object) + end) + + { + :keep_state, + %{data | queue: :queue.new()}, + [{:next_event, :internal, :calculate}] + } + end + + # Phase 1: Calculation of Degree of Preference (https://www.rfc-editor.org/rfc/rfc4271#section-9.1.1) + defp degree_of_preference(false, _path_attributes), do: 1 + + defp degree_of_preference(true, path_attributes) do + Enum.find_value(path_attributes, 1, fn + %Attribute{value: %LocalPref{} = local_pref} -> local_pref.value + _ -> nil + end) + end + + defp filter_as_path({{_host, _pid, _prefix}, path_attributes, _loc_pref, %Session{} = session}) do + Enum.find_value(path_attributes, true, fn + %Attribute{value: %ASPath{} = as_path} -> + Enum.find_value(as_path.value, true, fn + {_, _, asn} -> asn != session.asn + _ -> nil + end) + + _ -> + nil + end) + end + + defp filter_as_path_by_length({{_host, _pid, _prefix}, path_attributes, _loc_pref, _session}) do + Enum.find_value(path_attributes, fn + %Attribute{value: %ASPath{value: {type, _length, _path}}} + when type in [:as_set, :as_confed_set] -> + 1 + + %Attribute{value: %ASPath{value: {_type, length, _path}}} -> + length + + _ -> + nil + end) + end + + defp filter_origin({{_host, _pid, _prefix}, path_attributes, _loc_pref, _session}) do + Enum.find_value(path_attributes, fn + %Attribute{value: %Origin{origin: origin}} -> + case origin do + :igp -> 0 + :egp -> 1 + :incomplete -> 2 + end + + _ -> + nil + end) + end - @spec process_update(Server.t(), UPDATE.t()) :: :ok - def process_update(server, update), - do: GenServer.call(Server.rde_for(server), {:process_update, update}) + defp filter_multi_exit_disc({{_host, _pid, _prefix}, path_attributes, _loc_pref, _session}) do + Enum.find_value(path_attributes, fn + %Attribute{value: %MultiExitDisc{value: value}} -> + value - @impl GenServer - def init(_args) do - {:ok, %{rib: MapSet.new(), rib_in: MapSet.new(), rib_out: MapSet.new()}} + _ -> + nil + end) end - @impl GenServer - def handle_call({:process_update, %UPDATE{}}, _from, state) do - {:reply, :ok, state} + defp filter_ebgp(items) do + Enum.filter(items, fn {{_host, _pid, _prefix}, _path_attributes, _loc_pref, + %Session{ibgp: ibgp}} -> + not ibgp + end) end end diff --git a/lib/bgp/server/session.ex b/lib/bgp/server/session.ex index 26abb7d..ba158d7 100644 --- a/lib/bgp/server/session.ex +++ b/lib/bgp/server/session.ex @@ -43,7 +43,7 @@ defmodule BGP.Server.Session do alias BGP.Message.UPDATE.Attribute alias BGP.Message.UPDATE.Attribute.{ASPath, NextHop, Origin} alias BGP.Server.RDE - alias BGP.Server.Session.{Timer, Transport} + alias BGP.Server.Session.{Group, Timer, Transport} alias ThousandIsland.Socket @@ -191,7 +191,10 @@ defmodule BGP.Server.Session do { :keep_state, %{data | buffer: <<>>, socket: nil}, - [{:next_event, :internal, {:tcp_connection, :fails}}] + [ + {:next_event, :internal, {:tcp_connection, :fails}}, + {:next_event, :internal, :leave_session_group} + ] } end @@ -326,6 +329,16 @@ defmodule BGP.Server.Session do } end + def handle_event(:internal, :join_session_group, _state, %__MODULE__{} = data) do + Group.join(data.server, data.host) + :keep_state_and_data + end + + def handle_event(:internal, :leave_session_group, _state, %__MODULE__{} = data) do + Group.leave(data.server, data.host) + :keep_state_and_data + end + def handle_event({:call, from}, {:check_collision, _peer_bgp_id}, :established, _data), do: {:keep_state_and_data, [{:reply, from, {:error, :collision}}]} @@ -1071,7 +1084,8 @@ defmodule BGP.Server.Session do {:next_event, :internal, {:restart_timer, :as_origination, nil}}, {:next_event, :internal, {:restart_timer, :hold_time, nil}}, {:next_event, :internal, {:restart_timer, :route_advertisement, nil}}, - {:next_event, :internal, {:send, compose_as_update(data)}} + {:next_event, :internal, {:send, compose_as_update(data)}}, + {:next_event, :internal, :join_session_group} ] } end @@ -1168,10 +1182,27 @@ defmodule BGP.Server.Session do end end - def handle_event({:timeout, :route_advertisement}, _event, :established, _data) do + def handle_event({:timeout, :route_advertisement}, _event, :established, data) do + IO.inspect(:WWWWWWWWWWWWWWW) + + events = + data.server + |> RDE.get_loc_rib() + |> Enum.map(fn {prefix, path_attributes} -> + {:next_event, :internal, + {:send, + %UPDATE{ + path_attributes: path_attributes, + nlri: [prefix] + }}} + end) + |> IO.inspect(label: :aaaaaa) + { :keep_state_and_data, - [{:next_event, :internal, {:restart_timer, :route_advertisement, nil}}] + [ + {:next_event, :internal, {:restart_timer, :route_advertisement, nil}} | events + ] } end @@ -1247,11 +1278,11 @@ defmodule BGP.Server.Session do :keep_state_and_data %UPDATE{} when hold_time > 0 -> - RDE.process_update(data.server, msg) + RDE.process_update(data, msg) {:keep_state_and_data, [{:next_event, :internal, {:restart_timer, :hold_time, nil}}]} %UPDATE{} -> - RDE.process_update(data.server, msg) + RDE.process_update(data, msg) :keep_state_and_data end end diff --git a/lib/bgp/server/session/group.ex b/lib/bgp/server/session/group.ex new file mode 100644 index 0000000..8ebc9a3 --- /dev/null +++ b/lib/bgp/server/session/group.ex @@ -0,0 +1,37 @@ +defmodule BGP.Server.Session.Group do + @moduledoc false + + alias BGP.Server + + @typedoc "Server Session Scope" + @type scope :: module() + + @typedoc "Server Session Group" + @type group :: atom() + + @doc false + def child_spec(server) do + %{ + id: Server.session_group_for(server), + start: {__MODULE__, :start_link, [server]} + } + end + + @spec start_link(Server.t()) :: {:ok, pid()} | {:error, any()} + def start_link(server) do + :pg.start_link(Server.session_group_for(server)) + end + + @spec join(Server.t(), group()) :: :ok + def join(server, group), + do: :pg.join(Server.session_group_for(server), group, self()) + + @spec leave(Server.t(), group()) :: :ok | :not_joined + def leave(server, group), + do: :pg.leave(Server.session_group_for(server), group, self()) + + @spec monitor(Server.t()) :: {reference(), %{optional(group()) => [pid()]}} + def monitor(server) do + :pg.monitor_scope(Server.session_group_for(server)) + end +end