Skip to content

Commit

Permalink
Add Session Group and bootstrap Sever RDE
Browse files Browse the repository at this point in the history
  • Loading branch information
lucazulian committed Dec 5, 2023
1 parent 0873716 commit c86614c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 16 deletions.
8 changes: 6 additions & 2 deletions lib/bgp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ defmodule BGP.Server do
Supervisor.init(
[
{Registry, keys: :unique, name: session_registry(args[:server])},
{BGP.Server.RDE, server: args[:server]},
{BGP.Server.Session.Group, args[:server]},
{BGP.Server.RDE, args[:server]},
{BGP.Server.Session.Supervisor, args[:server]},
{
ThousandIsland,
Expand Down Expand Up @@ -239,5 +240,8 @@ defmodule BGP.Server do
def session_registry(server), do: Module.concat(server, "Session.Registry")

@spec session_via(t(), IP.Address.t()) :: {:via, module(), term()}
def session_via(server, host), do: {:via, Registry, {session_registry(server), host}}
def session_via(server, hostname), do: {:via, Registry, {session_registry(server), hostname}}

@spec session_group_for(t()) :: module()
def session_group_for(server), do: Module.concat(server, "Session.Group")
end
100 changes: 89 additions & 11 deletions lib/bgp/server/rde.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,105 @@ defmodule BGP.Server.RDE do
* performs route selection and maintains Adj-RIB-In, Loc-Rib, Adjs-RIB-Out in ETS.
* performs route dissemination to peers after processing updates.
```mermaid
stateDiagram-v2
[*] --> Idle
Idle --> Processing : state_timeout
Idle --> Processing : calculate
Processing --> Idle : route_dissemination
Processing --> Idle
```
"""

# TODO
# implement handle info for monitor
# {Ref, join, Group, [JoinPid1, JoinPid2]}

@behaviour :gen_statem

alias BGP.{Message.UPDATE, Server}
alias BGP.Server.Session.Group

use GenServer
require Logger

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(args),
do: GenServer.start_link(__MODULE__, args, name: Server.rde_for(args[:server]))
@doc false
def child_spec(server),
do: %{id: Server.rde_for(server), start: {__MODULE__, :start_link, [server]}}

@spec process_update(Server.t(), UPDATE.t()) :: :ok
def process_update(server, update),
do: GenServer.call(Server.rde_for(server), {:process_update, update})
do: :gen_statem.call(Server.rde_for(server), {:process_update, update})

@spec start_link(term()) :: :gen_statem.start_ret()
def start_link(server) do
:gen_statem.start_link({:local, Server.rde_for(server)}, __MODULE__, [server],
debug: [:trace]
)
end

@impl :gen_statem
def callback_mode, do: [:handle_event_function, :state_enter]

@impl :gen_statem
def init([server]) do
Group.monitor(server)

data = %{}
actions = [{:next_event, :internal, :accept_updates}]

{:ok, :idle, data, actions}
end

@impl GenServer
def init(_args) do
{:ok, %{rib: MapSet.new(), rib_in: MapSet.new(), rib_out: MapSet.new()}}
@impl :gen_statem
def handle_event(:enter, old_state, new_state, data) do
Logger.debug("RDE #{data}: #{old_state} -> #{new_state}")
:keep_state_and_data
end

@impl GenServer
def handle_call({:process_update, %UPDATE{}}, _from, state) do
{:reply, :ok, state}
def handle_event(:internal, :accept_updates, :idle, _data) do
{
:keep_state_and_data,
[{:state_timeout, 1_000}]
}
end

def handle_event(:state_timeout, _, :idle, data) do
{
:next_state,
:processing,
data,
[{:next_event, :internal, :calculate}]
}
end

def handle_event(:internal, :calculate, :processing, data) do
# TODO implementation
{
:next_state,
:idle,
data,
[
{:next_event, :internal, :route_dissemination}
]
}
end

def handle_event(:internal, :route_dissemination, :idle, _data) do
# TODO implementation
:keep_state_and_data
end

# @spec start_link(Keyword.t()) :: GenServer.on_start()
# def start_link(args),
# do: GenServer.start_link(__MODULE__, args, name: Server.rde_for(args[:server]))

# @impl GenServer
# def init(_args) do
# {:ok, %{rib: MapSet.new(), rib_in: MapSet.new(), rib_out: MapSet.new()}}
# end

# @impl GenServer
# def handle_call({:process_update, %UPDATE{}}, _from, state) do
# {:reply, :ok, state}
# end
end
20 changes: 17 additions & 3 deletions lib/bgp/server/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule BGP.Server.Session do
alias BGP.Message.UPDATE.Attribute
alias BGP.Message.UPDATE.Attribute.{ASPath, NextHop, Origin}
alias BGP.Server.RDE
alias BGP.Server.Session.{Timer, Transport}
alias BGP.Server.Session.{Group, Timer, Transport}

alias ThousandIsland.Socket

Expand Down Expand Up @@ -184,7 +184,10 @@ defmodule BGP.Server.Session do
{
:keep_state,
%{data | buffer: <<>>, socket: nil},
[{:next_event, :internal, {:tcp_connection, :fails}}]
[
{:next_event, :internal, {:tcp_connection, :fails}},
{:next_event, :internal, :leave_session_group}
]
}
end

Expand Down Expand Up @@ -319,6 +322,16 @@ defmodule BGP.Server.Session do
}
end

def handle_event(:internal, :join_session_group, _state, %__MODULE__{} = data) do
Group.join(data.server, data.host)
:keep_state_and_data
end

def handle_event(:internal, :leave_session_group, _state, %__MODULE__{} = data) do
Group.leave(data.server, data.host)
:keep_state_and_data
end

def handle_event({:call, from}, {:check_collision, _peer_bgp_id}, :established, _data),
do: {:keep_state_and_data, [{:reply, from, {:error, :collision}}]}

Expand Down Expand Up @@ -1064,7 +1077,8 @@ defmodule BGP.Server.Session do
{:next_event, :internal, {:restart_timer, :as_origination, nil}},
{:next_event, :internal, {:restart_timer, :hold_time, nil}},
{:next_event, :internal, {:restart_timer, :route_advertisement, nil}},
{:next_event, :internal, {:send, compose_as_update(data)}}
{:next_event, :internal, {:send, compose_as_update(data)}},
{:next_event, :internal, :join_session_group}
]
}
end
Expand Down
37 changes: 37 additions & 0 deletions lib/bgp/server/session/group.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule BGP.Server.Session.Group do
@moduledoc false

alias BGP.Server

@typedoc "Server Session Scope"
@type scope :: module()

@typedoc "Server Session Group"
@type group :: atom()

@doc false
def child_spec(server) do
%{
id: Server.session_group_for(server),
start: {__MODULE__, :start_link, [server]}
}
end

@spec start_link(Server.t()) :: {:ok, pid()} | {:error, any()}
def start_link(server) do
:pg.start_link(Server.session_group_for(server))
end

@spec join(Server.t(), group()) :: :ok
def join(server, group),
do: :pg.join(Server.session_group_for(server), group, self())

@spec leave(Server.t(), group()) :: :ok | :not_joined
def leave(server, group),
do: :pg.leave(Server.session_group_for(server), group, self())

@spec monitor(Server.t()) :: {reference(), [pid()]}
def monitor(server) do
:pg.monitor_scope(Server.session_group_for(server))
end
end

0 comments on commit c86614c

Please sign in to comment.