From c86614c2612c57c8e254678186a4ee4aac8eba9f Mon Sep 17 00:00:00 2001 From: lucazulian Date: Tue, 7 Nov 2023 14:44:43 +0100 Subject: [PATCH] Add Session Group and bootstrap Sever RDE --- lib/bgp/server.ex | 8 ++- lib/bgp/server/rde.ex | 100 ++++++++++++++++++++++++++++---- lib/bgp/server/session.ex | 20 ++++++- lib/bgp/server/session/group.ex | 37 ++++++++++++ 4 files changed, 149 insertions(+), 16 deletions(-) create mode 100644 lib/bgp/server/session/group.ex diff --git a/lib/bgp/server.ex b/lib/bgp/server.ex index 8ae86d0..eb29677 100644 --- a/lib/bgp/server.ex +++ b/lib/bgp/server.ex @@ -168,7 +168,8 @@ defmodule BGP.Server do Supervisor.init( [ {Registry, keys: :unique, name: session_registry(args[:server])}, - {BGP.Server.RDE, server: args[:server]}, + {BGP.Server.Session.Group, args[:server]}, + {BGP.Server.RDE, args[:server]}, {BGP.Server.Session.Supervisor, args[:server]}, { ThousandIsland, @@ -239,5 +240,8 @@ defmodule BGP.Server do def session_registry(server), do: Module.concat(server, "Session.Registry") @spec session_via(t(), IP.Address.t()) :: {:via, module(), term()} - def session_via(server, host), do: {:via, Registry, {session_registry(server), host}} + def session_via(server, hostname), do: {:via, Registry, {session_registry(server), hostname}} + + @spec session_group_for(t()) :: module() + def session_group_for(server), do: Module.concat(server, "Session.Group") end diff --git a/lib/bgp/server/rde.ex b/lib/bgp/server/rde.ex index b423144..148f868 100644 --- a/lib/bgp/server/rde.ex +++ b/lib/bgp/server/rde.ex @@ -6,27 +6,105 @@ defmodule BGP.Server.RDE do * performs route selection and maintains Adj-RIB-In, Loc-Rib, Adjs-RIB-Out in ETS. * performs route dissemination to peers after processing updates. + ```mermaid + stateDiagram-v2 + [*] --> Idle + Idle --> Processing : state_timeout + Idle --> Processing : calculate + Processing --> Idle : route_dissemination + Processing --> Idle + ``` """ + # TODO + # implement handle info for monitor + # {Ref, join, Group, [JoinPid1, JoinPid2]} + + @behaviour :gen_statem + alias BGP.{Message.UPDATE, Server} + alias BGP.Server.Session.Group - use GenServer + require Logger - @spec start_link(Keyword.t()) :: GenServer.on_start() - def start_link(args), - do: GenServer.start_link(__MODULE__, args, name: Server.rde_for(args[:server])) + @doc false + def child_spec(server), + do: %{id: Server.rde_for(server), start: {__MODULE__, :start_link, [server]}} @spec process_update(Server.t(), UPDATE.t()) :: :ok def process_update(server, update), - do: GenServer.call(Server.rde_for(server), {:process_update, update}) + do: :gen_statem.call(Server.rde_for(server), {:process_update, update}) + + @spec start_link(term()) :: :gen_statem.start_ret() + def start_link(server) do + :gen_statem.start_link({:local, Server.rde_for(server)}, __MODULE__, [server], + debug: [:trace] + ) + end + + @impl :gen_statem + def callback_mode, do: [:handle_event_function, :state_enter] + + @impl :gen_statem + def init([server]) do + Group.monitor(server) + + data = %{} + actions = [{:next_event, :internal, :accept_updates}] + + {:ok, :idle, data, actions} + end - @impl GenServer - def init(_args) do - {:ok, %{rib: MapSet.new(), rib_in: MapSet.new(), rib_out: MapSet.new()}} + @impl :gen_statem + def handle_event(:enter, old_state, new_state, data) do + Logger.debug("RDE #{data}: #{old_state} -> #{new_state}") + :keep_state_and_data end - @impl GenServer - def handle_call({:process_update, %UPDATE{}}, _from, state) do - {:reply, :ok, state} + def handle_event(:internal, :accept_updates, :idle, _data) do + { + :keep_state_and_data, + [{:state_timeout, 1_000}] + } end + + def handle_event(:state_timeout, _, :idle, data) do + { + :next_state, + :processing, + data, + [{:next_event, :internal, :calculate}] + } + end + + def handle_event(:internal, :calculate, :processing, data) do + # TODO implementation + { + :next_state, + :idle, + data, + [ + {:next_event, :internal, :route_dissemination} + ] + } + end + + def handle_event(:internal, :route_dissemination, :idle, _data) do + # TODO implementation + :keep_state_and_data + end + + # @spec start_link(Keyword.t()) :: GenServer.on_start() + # def start_link(args), + # do: GenServer.start_link(__MODULE__, args, name: Server.rde_for(args[:server])) + + # @impl GenServer + # def init(_args) do + # {:ok, %{rib: MapSet.new(), rib_in: MapSet.new(), rib_out: MapSet.new()}} + # end + + # @impl GenServer + # def handle_call({:process_update, %UPDATE{}}, _from, state) do + # {:reply, :ok, state} + # end end diff --git a/lib/bgp/server/session.ex b/lib/bgp/server/session.ex index 065c09e..ec721c5 100644 --- a/lib/bgp/server/session.ex +++ b/lib/bgp/server/session.ex @@ -43,7 +43,7 @@ defmodule BGP.Server.Session do alias BGP.Message.UPDATE.Attribute alias BGP.Message.UPDATE.Attribute.{ASPath, NextHop, Origin} alias BGP.Server.RDE - alias BGP.Server.Session.{Timer, Transport} + alias BGP.Server.Session.{Group, Timer, Transport} alias ThousandIsland.Socket @@ -184,7 +184,10 @@ defmodule BGP.Server.Session do { :keep_state, %{data | buffer: <<>>, socket: nil}, - [{:next_event, :internal, {:tcp_connection, :fails}}] + [ + {:next_event, :internal, {:tcp_connection, :fails}}, + {:next_event, :internal, :leave_session_group} + ] } end @@ -319,6 +322,16 @@ defmodule BGP.Server.Session do } end + def handle_event(:internal, :join_session_group, _state, %__MODULE__{} = data) do + Group.join(data.server, data.host) + :keep_state_and_data + end + + def handle_event(:internal, :leave_session_group, _state, %__MODULE__{} = data) do + Group.leave(data.server, data.host) + :keep_state_and_data + end + def handle_event({:call, from}, {:check_collision, _peer_bgp_id}, :established, _data), do: {:keep_state_and_data, [{:reply, from, {:error, :collision}}]} @@ -1064,7 +1077,8 @@ defmodule BGP.Server.Session do {:next_event, :internal, {:restart_timer, :as_origination, nil}}, {:next_event, :internal, {:restart_timer, :hold_time, nil}}, {:next_event, :internal, {:restart_timer, :route_advertisement, nil}}, - {:next_event, :internal, {:send, compose_as_update(data)}} + {:next_event, :internal, {:send, compose_as_update(data)}}, + {:next_event, :internal, :join_session_group} ] } end diff --git a/lib/bgp/server/session/group.ex b/lib/bgp/server/session/group.ex new file mode 100644 index 0000000..d181675 --- /dev/null +++ b/lib/bgp/server/session/group.ex @@ -0,0 +1,37 @@ +defmodule BGP.Server.Session.Group do + @moduledoc false + + alias BGP.Server + + @typedoc "Server Session Scope" + @type scope :: module() + + @typedoc "Server Session Group" + @type group :: atom() + + @doc false + def child_spec(server) do + %{ + id: Server.session_group_for(server), + start: {__MODULE__, :start_link, [server]} + } + end + + @spec start_link(Server.t()) :: {:ok, pid()} | {:error, any()} + def start_link(server) do + :pg.start_link(Server.session_group_for(server)) + end + + @spec join(Server.t(), group()) :: :ok + def join(server, group), + do: :pg.join(Server.session_group_for(server), group, self()) + + @spec leave(Server.t(), group()) :: :ok | :not_joined + def leave(server, group), + do: :pg.leave(Server.session_group_for(server), group, self()) + + @spec monitor(Server.t()) :: {reference(), [pid()]} + def monitor(server) do + :pg.monitor_scope(Server.session_group_for(server)) + end +end