From bef2fb7ea0001b9e954ec0323ada41db9697101e Mon Sep 17 00:00:00 2001 From: Frank Hunleth Date: Sat, 23 May 2020 18:51:54 -0400 Subject: [PATCH] Switch to custom circular buffer implementation This builds off in progress updates to the `circular_buffer` library to optimize it for the circular buffer case. This update reduces the work involved with storing log records. Currently other parts of RingLogger dominate in timings so the improvement from this update can only be seen if carefully monitoring the amount of memory used. --- lib/ring_logger/circular_buffer.ex | 140 +++++++++++++++++++++++++++++ lib/ring_logger/server.ex | 114 ++++++++++------------- 2 files changed, 189 insertions(+), 65 deletions(-) create mode 100644 lib/ring_logger/circular_buffer.ex diff --git a/lib/ring_logger/circular_buffer.ex b/lib/ring_logger/circular_buffer.ex new file mode 100644 index 0000000..070ca68 --- /dev/null +++ b/lib/ring_logger/circular_buffer.ex @@ -0,0 +1,140 @@ +defmodule RingLogger.CircularBuffer do + @moduledoc """ + Circular Buffer + + This is a modified version of https://github.com/keathley/circular_buffer + that doesn't use `:queue`. It creates less garbage than the `:queue` version + and is slightly faster in trivial benchmarks. RingLogger currently has other + limitations that make it hard to see these improvements. + + When creating a circular buffer you must specify the max size: + + ``` + cb = CircularBuffer.new(10) + ``` + """ + + # How does this work? + # + # There are two lists, `a` and `b`. New items are placed into list `a`. Old + # items are removed from list `b`. + # + # List `a` is ordered from newest to oldest, and list `b` is ordered from + # oldest to newest. Everything in list `a` is newer than list `b`. + # + # When the circular buffer is full, the normal case, inserting an + # item involves prepending it to `a` and removing the first item + # in list `b`. The list ordering makes these both O(1). + # + # When no more items can be removed from list `b`, list `a` is + # reversed and becomes the new list `b`. + # + # The functions for getting the oldest and newest items are also + # fast: The oldest item is the head of list `b`. The newest item + # is the head of list `a`. + + defstruct [:a, :b, :max_size, :count] + + alias __MODULE__, as: CB + + @doc """ + Creates a new circular buffer with a given size. + """ + def new(size) when is_integer(size) and size > 0 do + %CB{a: [], b: [], max_size: size, count: 0} + end + + @doc """ + Inserts a new item into the next location of the circular buffer + + Amortized run time: O(1) + Worst case run time: O(n) + """ + def insert(%CB{b: b} = cb, item) when b != [] do + %CB{cb | a: [item | cb.a], b: tl(b)} + end + + def insert(%CB{count: count, max_size: max_size} = cb, item) when count < max_size do + %CB{cb | a: [item | cb.a], count: cb.count + 1} + end + + def insert(%CB{b: []} = cb, item) do + new_b = cb.a |> Enum.reverse() |> tl() + %CB{cb | a: [item], b: new_b} + end + + @doc """ + Converts a circular buffer to a list. + + The list is ordered from oldest to newest elements based on their insertion + order. + + Worst case run time: O(n) + """ + def to_list(%CB{} = cb) do + cb.b ++ Enum.reverse(cb.a) + end + + @doc """ + Returns the newest element in the buffer + + Runs in constant time. + """ + def newest(%CB{a: [newest | _rest]}), do: newest + def newest(%CB{b: []}), do: nil + + @doc """ + Returns the oldest element in the buffer + + Mostly runs in constant time. Worst case O(n). + """ + def oldest(%CB{b: [oldest | _rest]}), do: oldest + def oldest(%CB{a: a}), do: List.last(a) + + @doc """ + Checks the buffer to see if its empty + + Runs in constant time + """ + def empty?(%CB{} = cb) do + cb.count == 0 + end + + defimpl Enumerable do + def count(cb) do + {:ok, cb.count} + end + + def member?(cb, element) do + {:ok, Enum.member?(cb.a, element) or Enum.member?(cb.b, element)} + end + + def reduce(cb, acc, fun) do + Enumerable.List.reduce(CB.to_list(cb), acc, fun) + end + + def slice(_cb) do + {:error, __MODULE__} + end + end + + defimpl Collectable do + def into(original) do + collector_fn = fn + cb, {:cont, elem} -> CB.insert(cb, elem) + cb, :done -> cb + _cb, :halt -> :ok + end + + {original, collector_fn} + end + end + + defimpl Inspect do + import Inspect.Algebra + + def inspect(cb, opts) do + concat(["#CircularBuffer<", to_doc(CB.to_list(cb), opts), ">"]) + end + end +end diff --git a/lib/ring_logger/server.ex b/lib/ring_logger/server.ex index 994647b..01be956 100644 --- a/lib/ring_logger/server.ex +++ b/lib/ring_logger/server.ex @@ -2,20 +2,15 @@ defmodule RingLogger.Server do use GenServer @moduledoc false + @default_max_size 1024 - alias RingLogger.Client - - @opts [:max_size] + alias RingLogger.{CircularBuffer, Client} defmodule State do @moduledoc false - @default_max_size 1024 - defstruct clients: [], - buffer: :queue.new(), - size: 0, - max_size: @default_max_size, + cb: nil, index: 0 end @@ -77,24 +72,32 @@ defmodule RingLogger.Server do @impl GenServer def init(opts) do - {:ok, merge_opts(opts, %State{})} + max_size = Keyword.get(opts, :max_size, @default_max_size) + + {:ok, %State{cb: CircularBuffer.new(max_size)}} end @impl GenServer def handle_call(:clear, _from, state) do - {:reply, :ok, %{state | buffer: :queue.new(), size: 0, index: state.index + state.size}} + max_size = state.cb.max_size + + {:reply, :ok, %{state | cb: CircularBuffer.new(max_size)}} end def handle_call(:config, _from, state) do - config = - Map.take(state, @opts) - |> Map.to_list() + config = %{max_size: state.cb.max_size} {:reply, config, state} end def handle_call({:configure, opts}, _from, state) do - {:reply, :ok, merge_opts(opts, state)} + case Keyword.get(opts, :max_size) do + nil -> + {:reply, :ok, state} + + max_size -> + {:reply, :ok, %State{state | cb: CircularBuffer.new(max_size)}} + end end def handle_call({:attach, client_pid}, _from, state) do @@ -105,29 +108,30 @@ defmodule RingLogger.Server do {:reply, :ok, detach_client(pid, state)} end - def handle_call({:get, start_index, n}, _from, state) do - resp = - cond do - start_index <= state.index -> - :queue.to_list(state.buffer) + def handle_call({:get, start_index, 0}, _from, state) do + first_index = state.index - Enum.count(state.cb) + adjusted_start_index = max(start_index - first_index, 0) + items = Enum.drop(state.cb, adjusted_start_index) - start_index >= state.index + state.size -> - [] + {:reply, items, state} + end + + def handle_call({:get, start_index, n}, _from, state) do + first_index = state.index - Enum.count(state.cb) + last_index = state.index - true -> - {_, buffer_range} = :queue.split(start_index - state.index, state.buffer) - :queue.to_list(buffer_range) - end + {adjusted_start_index, adjusted_n} = + {start_index, n} + |> adjust_left(first_index) + |> adjust_right(last_index) - paged_resp = if n <= 0, do: resp, else: Enum.take(resp, n) + items = Enum.slice(state.cb, adjusted_start_index, adjusted_n) - {:reply, paged_resp, state} + {:reply, items, state} end def handle_call({:tail, n}, _from, state) do - start = max(0, state.size - n) - {_, last_n} = :queue.split(start, state.buffer) - {:reply, :queue.to_list(last_n), state} + {:reply, Enum.take(state.cb, -n), state} end @impl GenServer @@ -146,6 +150,18 @@ defmodule RingLogger.Server do :ok end + defp adjust_left({offset, n}, i) when i > offset do + {i, max(0, n - (i - offset))} + end + + defp adjust_left(loc, _i), do: loc + + defp adjust_right({offset, n}, i) when i < offset + n do + {offset, i - offset} + end + + defp adjust_right(loc, _i), do: loc + defp attach_client(client_pid, state) do if !client_info(client_pid, state) do ref = Process.monitor(client_pid) @@ -172,46 +188,14 @@ defmodule RingLogger.Server do List.keyfind(state.clients, client_pid, 0) end - defp merge_opts(opts, state) do - opts = - opts - |> Keyword.take(@opts) - |> Enum.into(%{}) - - state - |> Map.merge(opts) - |> trim - end - - defp trim(%{max_size: max_size, size: size, buffer: buffer} = state) - when size > max_size do - trim = max_size - size - - buffer = Enum.reduce(1..trim, buffer, fn _, buf -> :queue.drop(buf) end) - - %{state | buffer: buffer, size: size} - end - - defp trim(state), do: state - defp push(level, {mod, msg, ts, md}, state) do - index = state.index + state.size + index = state.index log_entry = {level, {mod, msg, ts, Keyword.put(md, :index, index)}} Enum.each(state.clients, &send_log(&1, log_entry)) - ring_insert(state, log_entry) - end - - defp ring_insert(state, item) do - if state.size == state.max_size do - buffer = :queue.drop(state.buffer) - buffer = :queue.in(item, buffer) - %{state | buffer: buffer, index: state.index + 1} - else - buffer = :queue.in(item, state.buffer) - %{state | buffer: buffer, size: state.size + 1} - end + new_cb = CircularBuffer.insert(state.cb, log_entry) + %{state | cb: new_cb, index: index + 1} end defp send_log({client_pid, _ref}, log_entry) do