Feature/perf upgrades2 (#92)
* feat: limit acks and pending msgs maps

* fix: remove dead code and add missing route in controller

* fix: incr test coverage

* fix: credo code recommendations

* fix: apply code changes flagged by dialyzer

* fix: separate ci workflows

* fix: separate release workflow

* feat: added limit on retries for unack'ed messages

* fix: resolved PR comments

* fix: add test and doc yaml props
gabheadz authored Dec 10, 2024
1 parent 6af21ff commit 8b24404
Showing 53 changed files with 1,126 additions and 257 deletions.
106 changes: 106 additions & 0 deletions .github/workflows/build-channel-sender.yaml
@@ -0,0 +1,106 @@
name: "build channel-sender"
on:
  push:
    branches:
      - main
      - feature/*
      - fix/*
    paths:
      - 'channel-sender/**'

  pull_request:
    branches:
      - main
    paths:
      - 'channel-sender/**'

jobs:
  build:
    if: ${{ !contains(github.event.head_commit.message, '[skip ci]') }}
    permissions:
      contents: write
      issues: write
      pull-requests: write
    name: Build and test
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: ./channel-sender

    steps:
      - name: Generate a token of Github APP
        id: generate_token
        if: github.ref == 'refs/heads/main'
        uses: tibdex/github-app-token@3beb63f4bd073e61482598c45c71c1019b59b73a # v2.1.0
        with:
          app_id: ${{ secrets.APP_ID_ADMIN_GITHUB }}
          private_key: ${{ secrets.APP_PRIVATE_KEY_ADMIN_GITHUB }}
      - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
        if: github.ref == 'refs/heads/main'
        with:
          token: ${{ steps.generate_token.outputs.token }}
      - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
        if: github.ref != 'refs/heads/main'

      - name: Verify Conventional Commits
        uses: amannn/action-semantic-pull-request@0723387faaf9b38adef4775cd42cfd5155ed6017 # v5.5.3
        if: github.event_name == 'pull_request' || github.event_name == 'pull_request_target'
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      - name: Set up NodeJS
        if: github.ref == 'refs/heads/main'
        uses: actions/setup-node@1e60f620b9541d16bece96c5465dc8ee9832be0b # v4.0.3
        with:
          node-version-file: './channel-sender/.nvmrc'
      - name: Set up Semantic Release
        if: github.ref == 'refs/heads/main'
        run: npm -g install @semantic-release/git [email protected]
      - name: Semantic Release
        if: github.ref == 'refs/heads/main'
        run: npx [email protected]
        env:
          GITHUB_TOKEN: ${{ steps.generate_token.outputs.token }}

      - name: Set up Elixir
        uses: erlef/[email protected]
        with:
          version-type: strict
          version-file: "./channel-sender/.tool-versions"
      - name: Restore dependencies cache
        uses: actions/cache@v4
        with:
          path: deps
          key: ${{ runner.os }}-mix-${{ hashFiles('**/mix.lock') }}
          restore-keys: ${{ runner.os }}-mix-
      - name: Install dependencies
        run: mix deps.get
        env:
          SKIP_GIT_HOOKS: 'true'
      - name: Code analysis
        run: mix credo --strict && mix dialyzer
      - name: Test generated code
        run: mix test
      - name: Tests & Coverage (main)
        if: github.ref == 'refs/heads/main'
        run: mix coveralls.github --umbrella
        env:
          CI_ENV: 'true'
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      - name: Tests & Coverage (pull requests)
        if: github.event_name == 'pull_request' || github.event_name == 'pull_request_target'
        run: mix coveralls.lcov --umbrella
        env:
          CI_ENV: 'true'
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      - name: Setup PR Report tool
        if: github.event_name == 'pull_request' || github.event_name == 'pull_request_target'
        uses: hrishikesh-kadam/setup-lcov@v1
      - name: Validate code coverage
        if: github.event_name == 'pull_request' || github.event_name == 'pull_request_target'
        uses: zgosalvez/github-actions-report-lcov@v3
        with:
          coverage-files: ./channel-sender/cover/lcov.info
          minimum-coverage: 70
          artifact-name: code-coverage-report
          github-token: ${{ secrets.GITHUB_TOKEN }}
          update-comment: true
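The coverage steps above rely on the excoveralls Mix tasks (`mix coveralls.github`, `mix coveralls.lcov`). A minimal sketch of the mix.exs wiring those tasks assume — illustrative only, not necessarily the project's actual configuration:

```elixir
# Sketch only: typical ExCoveralls setup assumed by the `mix coveralls.*`
# steps in the workflow above. Module name, version and dependency pin are
# illustrative, not taken from the repository.
defmodule ChannelSenderEx.MixProject do
  use Mix.Project

  def project do
    [
      app: :channel_sender_ex,
      version: "0.0.0",
      elixir: "~> 1.16",
      # route `mix test` coverage through ExCoveralls
      test_coverage: [tool: ExCoveralls],
      preferred_cli_env: [
        coveralls: :test,
        "coveralls.github": :test,
        "coveralls.lcov": :test
      ],
      deps: [{:excoveralls, "~> 0.18", only: :test}]
    ]
  end
end
```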
5 changes: 5 additions & 0 deletions .github/workflows/build-channel-streams.yaml
@@ -5,9 +5,14 @@ on:
      - main
      - feature/*
      - fix/*
    paths:
      - 'channel-streams/**'

  pull_request:
    branches:
      - main
    paths:
      - 'channel-streams/**'

jobs:
  build:
26 changes: 26 additions & 0 deletions .github/workflows/release-streams.yml
@@ -0,0 +1,26 @@
name: Deploy image Async DataFlow channel streams
on:
  push:
    tags:
      - 'streams_*' # Push events to matching streams_*, i.e. streams_1.5.0
jobs:
  deploy:
    defaults:
      run:
        working-directory: channel-streams
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@d171c3b028d844f2bf14e9fdec0c58114451e4bf
      - name: Docker Login
        uses: Azure/docker-login@74280010feb87216b500bd8971cf1402e1165da2
        with:
          username: ${{ secrets.DOCKER_USER }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Config Builder
        run: docker buildx create --name mbuilder && docker buildx use mbuilder
      - name: Docker Build Multiplatform
        run: docker buildx build --platform linux/amd64 -t bancolombia/async-dataflow-channel-streams:${GITHUB_REF##*_} --push .
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -23,4 +23,4 @@ jobs:
      - name: Config Builder
        run: docker buildx create --name mbuilder && docker buildx use mbuilder
      - name: Docker Build Multiplatform
        run: docker buildx build --platform linux/amd64 -t bancolombia/async-dataflow-channel-sender:${GITHUB_REF##*/} --push .
        run: docker buildx build --platform linux/amd64 -t bancolombia/async-dataflow-channel-sender:${GITHUB_REF##*_} --push .
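The one-character fix above changes the shell parameter expansion from `${GITHUB_REF##*/}` to `${GITHUB_REF##*_}`, so the Docker tag becomes everything after the last underscore in the Git ref rather than everything after the last slash. A rough Elixir illustration of the difference, using a hypothetical tag `sender_1.2.3`:

```elixir
# Illustrative only; the tag name "sender_1.2.3" is hypothetical.
ref = "refs/tags/sender_1.2.3"

# Old expansion ${GITHUB_REF##*/}: strip everything up to the last "/"
ref |> String.split("/") |> List.last()  # => "sender_1.2.3"

# New expansion ${GITHUB_REF##*_}: strip everything up to the last "_"
ref |> String.split("_") |> List.last()  # => "1.2.3"
```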
1 change: 1 addition & 0 deletions channel-sender/.nvmrc
@@ -0,0 +1 @@
v20.11.0
2 changes: 2 additions & 0 deletions channel-sender/.tool-versions
@@ -0,0 +1,2 @@
erlang 26.2.5.3
elixir 1.16.3-otp-26
2 changes: 0 additions & 2 deletions channel-sender/bench/channel.exs
@@ -50,5 +50,3 @@ Benchee.run(
  # parallel: 6,
  formatters: [{Benchee.Formatters.Console, extended_statistics: true}]
)

IO.inspect(:erlang.process_info(socket_pid2))
54 changes: 45 additions & 9 deletions channel-sender/config/config-local.yaml
@@ -4,24 +4,60 @@ channel_sender_ex:
  secret_generator:
    base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc"
    salt: "socket auth"
  # Max time (in seconds) for a token to be valid
  # this parameter is also used to hold the channel state machine in waiting state
  # before it is closed
  max_age: 900

  # initial time in seconds to wait before re-sending a message not acked to a channel
  initial_redelivery_time: 900

  # max time in seconds to wait for the client to send the auth token
  # before closing the channel
  socket_idle_timeout: 30000

  # Specifies the maximum time (in milliseconds) that the supervisor waits
  # for child processes to terminate after sending it an exit signal (:shutdown).
  channel_shutdown_tolerance: 10000

  # Specifies the maximum drift time (in seconds) the channel process
  # will consider for emitting a new secret token before the current one expires.
  # Time to generate will be the greater value between (max_age / 2) and
  # (max_age - min_disconnection_tolerance)
  min_disconnection_tolerance: 50

  on_connected_channel_reply_timeout: 2000

  # max time a channel process will wait to perform the send operation before timing out
  accept_channel_reply_timeout: 1000

  # Allowed max number of unacknowledged messages per client connection
  # after this limit is reached, the oldest unacknowledged messages will be dropped
  max_unacknowledged_queue: 100

  # Allowed max number of retries to re-send an unack'ed message to a channel
  max_unacknowledged_retries: 10

  # Allowed max number of messages pending to be sent to a channel,
  # received by the sender while in waiting state (no socket connection)
  max_pending_queue: 100

  no_start: false
  topology:
    strategy: Elixir.Cluster.Strategy.Kubernetes
    config:
      mode: :hostname
      kubernetes_ip_lookup_mode: :pods
      kubernetes_service_name: "adfsender-headless"
      kubernetes_node_basename: "channel_sender_ex"
      kubernetes_selector: "cluster=beam"
      namespace: "sendernm"
      polling_interval: 5000
    strategy: Elixir.Cluster.Strategy.Gossip # for local development

    # strategy: Elixir.Cluster.Strategy.Kubernetes # topology for kubernetes
    # config:
    #   mode: :hostname
    #   kubernetes_ip_lookup_mode: :pods
    #   kubernetes_service_name: "adfsender-headless"
    #   kubernetes_node_basename: "channel_sender_ex"
    #   kubernetes_selector: "cluster=beam"
    #   namespace: "sendernm"
    #   polling_interval: 5000

  # see https://github.com/bancolombia/async-dataflow/tree/master/channel-sender/deploy_samples/k8s
  # for more information about the kubernetes configuration with libcluster

logger:
  level: debug
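A quick check of the token-refresh window described by the `min_disconnection_tolerance` comment above, using the sample values from this file (a sketch of the stated formula, not code from the repository):

```elixir
# Sample values from config-local.yaml above.
max_age = 900
min_disconnection_tolerance = 50

# "The greater value between (max_age / 2) and (max_age - min_disconnection_tolerance)"
refresh_after = max(div(max_age, 2), max_age - min_disconnection_tolerance)
# => max(450, 850) => 850 seconds, i.e. 50 seconds before the 900-second expiry
```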
11 changes: 7 additions & 4 deletions channel-sender/lib/channel_sender_ex/application.ex
@@ -2,9 +2,12 @@ defmodule ChannelSenderEx.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false
  alias ChannelSenderEx.Transport.Rest.RestController
  alias ChannelSenderEx.Transport.EntryPoint

  alias ChannelSenderEx.ApplicationConfig
  alias ChannelSenderEx.Core.RulesProvider.Helper
  alias ChannelSenderEx.Transport.EntryPoint
  alias ChannelSenderEx.Transport.Rest.RestController
  alias ChannelSenderEx.Utils.ClusterUtils

  use Application
  require Logger
@@ -13,8 +16,8 @@ defmodule ChannelSenderEx.Application do

    _config = ApplicationConfig.load()

    ChannelSenderEx.Utils.ClusterUtils.discover_and_connect_local()
    ChannelSenderEx.Core.RulesProvider.Helper.compile(:channel_sender_ex)
    ClusterUtils.discover_and_connect_local()
    Helper.compile(:channel_sender_ex)

    no_start_param = Application.get_env(:channel_sender_ex, :no_start)
    if !no_start_param do
19 changes: 15 additions & 4 deletions channel-sender/lib/channel_sender_ex/application_config.ex
@@ -5,7 +5,7 @@ defmodule ChannelSenderEx.ApplicationConfig do

  require Logger

  def load() do
  def load do
    config_file = Application.get_env(:channel_sender_ex, :config_file)
    Logger.info("Loading configuration from #{inspect(config_file)}")

@@ -34,7 +34,6 @@ defmodule ChannelSenderEx.ApplicationConfig do
        setup_config(%{})
    end


  end

  def setup_config(config) do
@@ -91,9 +90,21 @@ defmodule ChannelSenderEx.ApplicationConfig do
      Map.get(fetch(config, :channel_sender_ex), "socket_idle_timeout", 30_000)
    )

    Application.put_env(:channel_sender_ex, :max_unacknowledged_retries,
      Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_retries", 20)
    )

    Application.put_env(:channel_sender_ex, :max_unacknowledged_queue,
      Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_queue", 100)
    )

    Application.put_env(:channel_sender_ex, :max_pending_queue,
      Map.get(fetch(config, :channel_sender_ex), "max_pending_queue", 100)
    )

    Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config))

    if (config == %{}) do
    if config == %{} do
      Logger.warning("No valid configuration found!!!, Loading pre-defined default values : #{inspect(Application.get_all_env(:channel_sender_ex))}")
    else
      Logger.info("Succesfully loaded configuration: #{inspect(inspect(Application.get_all_env(:channel_sender_ex)))}")
@@ -107,7 +118,7 @@ defmodule ChannelSenderEx.ApplicationConfig do
    case topology do
      nil ->
        Logger.warning("No libcluster topology defined!!! -> Using Default [Gossip]")
        [ strategy: Cluster.Strategy.Gossip ]
        [strategy: Cluster.Strategy.Gossip]
      _ ->
        [
          strategy: String.to_existing_atom(topology["strategy"]),
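Once `setup_config/1` has stored the new limits, the rest of the application can read them back with `Application.get_env/3`. A minimal sketch (variable names and fallback defaults here are illustrative):

```elixir
# Sketch only: reading the limits written by setup_config/1 above.
max_unacked_queue = Application.get_env(:channel_sender_ex, :max_unacknowledged_queue, 100)
max_unacked_retries = Application.get_env(:channel_sender_ex, :max_unacknowledged_retries, 20)
max_pending_queue = Application.get_env(:channel_sender_ex, :max_pending_queue, 100)

IO.inspect({max_unacked_queue, max_unacked_retries, max_pending_queue}, label: "limits")
```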
76 changes: 76 additions & 0 deletions channel-sender/lib/channel_sender_ex/core/bounded_map.ex
@@ -0,0 +1,76 @@
defmodule ChannelSenderEx.Core.BoundedMap do
  @moduledoc """
  A map with a maximum size, evicting the oldest key-value pair when the limit is exceeded.
  """

  @type t :: {map(), list()}

  # Initialize a new BoundedMap
  def new, do: {%{}, []}

  def size({map, _keys}), do: map_size(map)

  @doc """
  Put a key-value pair into the map. If the key already exists, update the value.
  The oldest key-value pair is evicted when the size limit is exceeded.
  The limit is set by the `max_size` parameter, defaulting to 100.
  """
  @spec put(t, String.t, any, integer()) :: t
  def put({map, keys}, key, value, max_size \\ 100) do
    if Map.has_key?(map, key) do
      # If the key already exists, update the map without changing keys
      {Map.put(map, key, value), keys}
    else
      # Add new key-value pair
      new_map = Map.put(map, key, value)
      new_keys = [key | keys]

      # Enforce the size limit
      if map_size(new_map) > max_size do
        oldest_key = List.last(new_keys)
        {Map.delete(new_map, oldest_key), List.delete_at(new_keys, -1)}
      else
        {new_map, new_keys}
      end
    end
  end

  # Retrieve a value by key
  def get({map, _keys}, key) do
    Map.get(map, key)
  end

  # Pop a key-value pair
  def pop({map, keys}, key) do
    if Map.has_key?(map, key) do
      {value, new_map} = Map.pop(map, key)
      new_keys = List.delete(keys, key)
      {value, {new_map, new_keys}}
    else
      # If the key does not exist, return the structure unchanged
      {:noop, {map, keys}}
    end
  end

  # delete a key-value pair
  def delete({map, keys}, key) do
    if Map.has_key?(map, key) do
      new_map = Map.delete(map, key)
      new_keys = List.delete(keys, key)
      {new_map, new_keys}
    else
      # If the key does not exist, return the structure unchanged
      {map, keys}
    end
  end

  # Convert to a plain map
  def to_map({map, _keys}), do: map

  def merge({map, keys}, {map2, keys2}) do
    new_map = Map.merge(map, map2)
    new_keys = keys ++ keys2
    {new_map, new_keys}
  end

end
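A short usage sketch of `BoundedMap` as defined above, showing eviction once `max_size` is exceeded (values are illustrative):

```elixir
alias ChannelSenderEx.Core.BoundedMap

bm = BoundedMap.new()                 # {%{}, []}
bm = BoundedMap.put(bm, "a", 1, 2)    # {%{"a" => 1}, ["a"]}
bm = BoundedMap.put(bm, "b", 2, 2)    # {%{"a" => 1, "b" => 2}, ["b", "a"]}
bm = BoundedMap.put(bm, "c", 3, 2)    # size limit 2 exceeded: oldest key "a" is evicted

BoundedMap.size(bm)                   # => 2
BoundedMap.get(bm, "a")               # => nil (evicted)
{value, bm} = BoundedMap.pop(bm, "b") # value => 2
BoundedMap.pop(bm, "missing")         # => {:noop, bm} when the key is absent
```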

