From bbcb4f4535bde7b583196e016dd9978326cef85f Mon Sep 17 00:00:00 2001 From: kidp330 Date: Tue, 21 Nov 2023 13:47:46 +0100 Subject: [PATCH 01/18] Update `handle_process` and `handle_write` to `handle_buffer` --- get_started_with_membrane/03_elements.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/get_started_with_membrane/03_elements.md b/get_started_with_membrane/03_elements.md index 54f9946..afcd605 100644 --- a/get_started_with_membrane/03_elements.md +++ b/get_started_with_membrane/03_elements.md @@ -84,8 +84,7 @@ Each option can have the following fields: - `default` - The value that the option will have if it's not specified. If the default is not provided, the option must be always explicitly specified. - `description` - Write here what the option does. It will be included in the module documentation. -We'll see a practical example of defining options in the [sample element](#sample-element). - +We'll see a practical example of defining options in the [sample element](#sample-element). ## Callbacks Apart from specifying pads and options, creating an element involves implementing callbacks. They have different responsibilities and are called in a specific order. As in the case of pipelines, callbacks interact with the framework by returning [actions](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html). Here are some most useful callbacks: @@ -113,7 +112,7 @@ After `handle_playing`, you should expect the following callbacks to be called: - [handle_start_of_stream](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_start_of_stream/3) is called just before the first buffer arrives from the preceding element -- [handle_process](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_process/4) or [handle_write](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_write/4) is called every time a buffer arrives from the preceding element +- [handle_buffer](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_buffer/4) is called every time a buffer arrives from the preceding element - [handle_event](https://hexdocs.pm/membrane_core/Membrane.Element.Base.html#c:handle_event/4) is called once an event arrives from the preceding or subsequent element @@ -157,7 +156,7 @@ defmodule VolumeKnob do end @impl true - def handle_process(:input, buffer, ctx, state) do + def handle_buffer(:input, buffer, ctx, state) do stream_format = ctx.pads.input.stream_format sample_size = RawAudio.sample_size(stream_format) payload = @@ -213,11 +212,11 @@ end The callback does not return any actions (thus the empty list), but it saves the gain passed through options in the state. -Then goes the main part of the element - the `handle_process` callback: +Then goes the main part of the element - the `handle_buffer` callback: ```elixir @impl true -def handle_process(:input, buffer, ctx, state) do +def handle_buffer(:input, buffer, ctx, state) do ``` The callback is called whenever a buffer arrives on a pad, and receives four arguments: From 56a97a94466ef247145df5fb78fea88ec20254b9 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Tue, 21 Nov 2023 13:48:03 +0100 Subject: [PATCH 02/18] Fix hyperlinks --- get_started_with_membrane/02_pipelines.md | 2 +- get_started_with_membrane/08_native_code_integration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/get_started_with_membrane/02_pipelines.md b/get_started_with_membrane/02_pipelines.md index fbbc664..3b05037 100644 --- a/get_started_with_membrane/02_pipelines.md +++ b/get_started_with_membrane/02_pipelines.md @@ -49,7 +49,7 @@ To run the snippet, follow the steps below: - On Mac OS: `brew install libmad portaudio pkg-config` - On Debian: `apt install libmad0-dev portaudio19-dev` -- Option 1: Click [here](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fmembraneframework%2Fmembrane_core%2Fblob%2Fmaster%2Fexample.livemd). You'll be directed to install [Livebook](livebook.dev), an interactive notebook similar to Jupyter, and it'll open the snippet in there for you. Then just click the 'run' button in there. +- Option 1: Click [here](https://livebook.dev/run?url=https%3A%2F%2Fgithub.com%2Fmembraneframework%2Fmembrane_core%2Fblob%2Fmaster%2Fexample.livemd). You'll be directed to install [Livebook](https://livebook.dev), an interactive notebook similar to Jupyter, and it'll open the snippet in there for you. Then just click the 'run' button in there. - Option 2: If you don't want to use Livebook, you can [install Elixir](https://elixir-lang.org/install.html), type `iex` to run the interactive shell and paste the snippet there. diff --git a/get_started_with_membrane/08_native_code_integration.md b/get_started_with_membrane/08_native_code_integration.md index a12cf03..a64ec38 100644 --- a/get_started_with_membrane/08_native_code_integration.md +++ b/get_started_with_membrane/08_native_code_integration.md @@ -15,5 +15,5 @@ For more information, see [Bundlex's GitHub page](https://github.com/membranefra Process of creating natives is not only difficult but also quite arduous, because it requires using cumbersome Erlang APIs, and thus a lot of boilerplate code. To make it more pleasant, we created [Unifex](https://github.com/membraneframework/unifex), a tool that is responsible for generating interfaces between C or C++ libraries and Elixir on the base of short `.exs` configuration files. An important feature of Unifex is that you can write the C/C++ code once and use it either as a NIF or as a C node. -A quick introduction to Unifex is available [here](https://hexdocs.pm/unifex/creating_unifex_nif.html). +A quick introduction to Unifex is available [here](https://hexdocs.pm/unifex/creating_unifex_natives.html). From 34493b689793444348f6a1e0749bf2d0c87c4485 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 29 Nov 2023 16:14:37 +0100 Subject: [PATCH 03/18] Update 03_Source to v1.0 --- basic_pipeline/03_Source.md | 112 +++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 47 deletions(-) diff --git a/basic_pipeline/03_Source.md b/basic_pipeline/03_Source.md index 3961548..455de63 100644 --- a/basic_pipeline/03_Source.md +++ b/basic_pipeline/03_Source.md @@ -7,8 +7,8 @@ The first thing you need to be aware of is that `Membrane. Element` describes a Our process keeps a state which is updated in callbacks. We only need to provide an implementation of some callbacks in order to make our element act in the desired way. The set of callbacks that can be implemented depends on the type of the elements and we will get familiar with them during the implementation of these elements. -However, each callback is required to return a tuple of a [specific form](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#t:callback_return_t/0). -As you can see, we are returning a status of the operation, an optional list of [actions](https://hexdocs.pm/membrane_core/Membrane.Pipeline.Action.html#t:t/0) to be performed, and +However, each callback is required to return a tuple of a [specific form](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#t:callback_return/0). +As you can see, we are returning an optional list of [actions](https://hexdocs.pm/membrane_core/Membrane.Pipeline.Action.html#t:t/0) to be performed, and the updated state (which later on will be passed to the next invoked callback). Take your time and read about the possible actions which can be requested to be performed while returning from the callback. Their usage is crucial for the pipeline to work. @@ -38,8 +38,14 @@ Later on, we will make use of [macros](https://elixir-lang.org/getting-started/m ```elixir defmodule Basic.Elements.Source do ... - def_options location: [type: :string, description: "Path to the file"] - def_output_pad :output, [caps: {Packet, type: :custom_packets}, mode: :pull] + def_options location: [ + spec: String.t(), + description: "Path to the file" + ] + + def_output_pad :output, + accepted_format: %Packet{type: :custom_packets}, + flow_control: :manual ... end ``` @@ -53,75 +59,82 @@ Later on, while instantiating the Source element, we will be able to write: and the `:location` option will be passed during the construction of the element. -The second macro, `def_output_pad`, lets us define the output pad. The pad name will be `:output` (which is a default name for the output pad). The second argument of the macro describes the `:caps` - which is the type of data sent through the pad. As the code states, we want to send data in `Basic.Formats.Packet` format. -What's more, we have specified that the `:output` pad will work in the `:pull` mode. -You can read more on the pad specification [here](https://hexdocs.pm/membrane_core/Membrane.Pad.html#t:common_spec_options_t/0). +The second macro, `def_output_pad`, lets us define the output pad. The pad name will be `:output` (which is a default name for the output pad). The second argument of the macro describes the `:accepted_format` - which is the type of data sent through the pad. As the code states, we want to send data in `Basic.Formats.Packet` format. +What's more, we have specified that the `:output` pad will work in `:manual` mode. +You can read more on pad specification [here](https://hexdocs.pm/membrane_core/Membrane.Pad.html#types). ## Initialization of the element -Let's define our first callback! Why not start with [`handle_init/1`](https://hexdocs.pm/membrane_core/Membrane.Element.Base.html#c:handle_init/1), which gets called once the element is created? +Let's define our first callback! Why not start with [`handle_init/2`](https://hexdocs.pm/membrane_core/Membrane.Element.Base.html#c:handle_init/2), which gets called once the element is created? **_`lib/elements/Source.ex`_** ```elixir defmodule Basic.Elements.Source do ... - @impl true - def handle_init(options) do - {:ok, - %{ - location: options.location, - content: nil - } - } - end + @impl true + def handle_init(_context, options) do + {[setup: :incomplete], + %{ + location: options.location, + content: nil + }} + end ... end ``` -As said before, `handle_init/1` expects a structure with the previously defined parameters to be passed as an argument. -All we need to do there is to initialize the state - our state will be in a form of a map, and for now on we will put there a `location` (a path to the input file) and the `content`, where we will be holding packets read from the file, which haven't been sent yet. For now, the content is set to nil as we haven't read anything from the input file yet. +As said before, `handle_init/2` expects a structure with the previously defined parameters to be passed as an argument. +All we need to do there is to initialize the state - our state will be in a form of a map, and for now on we will put there a `location` (a path to the input file) and the `content`, where we will be holding packets read from the file, which haven't been sent yet. For now, the content is set to nil as we haven't read anything from the input file yet. That's also why we send back the action `setup: :incomplete`. In the next section we'll talk about more involved initialization and resources managed by our element. > ### TIP > > You might also wonder what is the purpose of the `@impl true` specifier, put just above the function signature - this is simply a way to tell the compiler > that the function defined below is about to be a callback. If we have misspelled the function name (or provided a wrong arguments list), we will be informed in the compilation time. -## Playback states +## Preparing our element -Before going further you should stop for the moment and read about the [playback states](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:playback_change_t/0) in which the Pipeline (and therefore - its elements) can be. Generally speaking, there are three playback states: **stopped**, **prepared**, and **playing**. The transition between the states can happen automatically or as a result of a user's explicit action. -The callbacks we are about to implement will be called once the transition between playback states occurs. +When an element requires more time to initialise, you should delegate complex tasks to `handle_setup/2`. This callback runs after `handle_init/2` if it returns the `setup: :incomplete` action. +In our example, we'd like to open, read and save the contents of the input file. We then save it in our state as `content`. **_`lib/elements/Source.ex`_** ```elixir defmodule Basic.Elements.Source do ... - @impl true - def handle_stopped_to_prepared(_ctx, state) do - raw_file_binary = File.read!(state.location) - content = String.split(raw_file_binary, "\n") - state = %{state | content: content} - { {:ok, [caps: {:output, %Packet{type: :custom_packets} }]}, state} - end + @impl true + def handle_setup(_context, state) do + content = + File.read!(state.location) + |> String.split("\n") + + new_state = %{state | content: content} + {[], new_state} + end + ... +end +``` - @impl true - def handle_prepared_to_stopped(_ctx, state) do - state = %{state | content: nil} - {:ok, state} - end +When the setup is complete, the element goes into `playing` state. It can then demand buffers from previous elements and send its `:stream_format` to receiving elements. Since we are implementing a sink we do not have anything to demand from, but we can specify the format. We can do this, for example, in `handle_playing/2`: + +**_`lib/elements/Source.ex`_** + +```elixir +defmodule Basic.Elements.Source do + ... + @impl true + def handle_playing(_context, state) do + {[stream_format: {:output, %Packet{type: :custom_packets}}], state} + end ... end ``` -In the case of the first callback, `handle_stopped_to_prepared/2`, what we do is that we are reading the file from the location specified in the options structure (which we have saved in the state of the element). -Then we split the content of the file to get the particular packets and save the list of those packets in the state of the element. -An interesting thing here is the action we are returning - the `:caps` action. That means that we want to transmit the information about the supported [caps](../glossary/glossary.md#caps) through the `output` pad, to the next element in the pipeline. In the [chapter 4](../basic_pipeline/04_Caps.md) you will find out more about caps and formats and learn why it is required to do so. -The second callback, `handle_prepared_to_stopped`, defines the behavior of the Source element while we are stopping the pipeline. What we want to do is to clear the content buffer in the state of our element. +The `:stream_format` action means that we want to transmit the information about the supported [formats](../glossary/glossary.md#stream-format-formerly-caps) through the `output` pad, to the next element in the pipeline. In [chapter 4](../basic_pipeline/04_Caps.md) you will find out more about stream formats and learn why it is required to do so. ## Demands -Before going any further let's stop for a moment and talk about the demands. Do you remember, that the `:output` pad is working in the pulling mode? That means that the succeeding element have to ask the Source element for the data to be sent and our element has to take care of keeping that data in some kind of buffer until it is requested. +Before going any further let's stop for a moment and talk about the demands. Do you remember, that the `:output` pad is working in `:manual` mode? That means that the succeeding element has to ask the Source element for the data to be sent and our element has to take care of keeping that data in some kind of buffer until it is requested. Once the succeeding element requests for the data, the `handle_demand/4` callback will be invoked - therefore it would be good for us to define it: **_`lib/elements/Source.ex`_** @@ -133,13 +146,17 @@ defmodule Basic.Elements.Source do @impl true def handle_demand(:output, _size, :buffers, _ctx, state) do if state.content == [] do - { {:ok, end_of_stream: :output}, state} + { [end_of_stream: :output], state} else [first_packet | rest] = state.content - state = %{state | content: rest} - action = [buffer: {:output, %Buffer{payload: first_packet} }] - action = action ++ [redemand: :output] - { {:ok, action}, state} + new_state = %{state | content: rest} + + actions = [ + buffer: {:output, %Buffer{payload: first_packet}}, + redemand: :output + ] + + {actions, new_state} end end ... @@ -148,8 +165,9 @@ end The callback's body describes the situation in which some buffers were requested. Then we are checking if we have any packets left in the list persisting in the state of the element. If that list is empty, we are sending an `end_of_stream` action, indicating that there will be no more buffers sent through the `:output` pad and that is why there is no point in requesting more buffers. However, in case of the `content` list of packets being non-empty, we are taking the head of that list, and storing the remaining tail of the list in the state of the element. Later on, we are defining the actions we want to take - that is, we want to return a buffer with the head packet from the original list. We make use of the [`buffer:` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:buffer_t/0), and specify that we want to transmit the [`%Buffer`](https://hexdocs.pm/membrane_core/Membrane.Buffer.html#t:t/0) structure through the `:output` pad. Note the fields available in the `%Buffer` structure - in our case, we make use of only the `:payload` field, which, according to the documentation, can be of `any` type - however, in almost all cases you will need to send binary data within this field. Any structured data (just like timestamps etc.) should be passed in the other fields available in the `%Buffer`, designed especially for that cases. -However, there is the other action that is taken - the `:redemand` action, queued to take place on the `:output` pad. This action will simply invoke the `handle_demand/4` callback once again, which is helpful when the whole demand cannot be completely fulfilled in the single `handle_demand` invocation we are just processing. The great thing here is that the `size` of the demand will be automatically determined by the element and we do not need to specify it anyhow. Redemanding, in the context of sources, helps us simplify the logic of the `handle_demand` callback since all we need to do in that callback is to supply a single piece of data and in case this is not enough, take a [`:redemand`](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:redemand_t/0) action and invoke that callback once again. As you will see later, the process of redemanding is even more powerful in the context of the [filter elements](../glossary/glossary.md#filter). +However, there is the other action that is taken - the `:redemand` action, queued to take place on the `:output` pad. This action will simply invoke the `handle_demand/4` callback once again, which is helpful when the whole demand cannot be completely fulfilled in the single `handle_demand` invocation we are just processing. The great thing here is that the `size` of the demand will be automatically determined by the element and we do not need to specify it anyhow. Redemanding, in the context of sources, helps us simplify the logic of the `handle_demand` callback since all we need to do in that callback is to supply a single piece of data and in case this is not enough, take a [`:redemand`](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:redemand_t/0) action and invoke that callback once again. As you will see later, the process of redemanding is even more powerful in the context of [filter elements](../glossary/glossary.md#filter). +But don't give up if you don't grasp demands just yet! :) Membrane also supports `:auto` flow control, which takes care of demands and should be enough for 90% of use cases. -By now you should have created `Basic.Element.Source` element, with options and output pads defined and its `handle_init/1`, `handle_stopped_to_prepared/2`, `handle_prepared_to_stopped/2` and `handle_demand/5` callbacks implemented. +By now you should have created a `Basic.Element.Source` element, with options and output pads defined and its `handle_init/2`, `handle_setup/2`, `handle_playing/2` and `handle_demand/5` callbacks implemented. -In the next chapter we will explore what `caps` are in Membrane. +In the next chapter we will explore what stream formats are in Membrane. From 3ceb2d0142ef176720c234017bcb81b0da5d7cd0 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 29 Nov 2023 17:16:39 +0100 Subject: [PATCH 04/18] Update chapter 4 --- basic_pipeline/04_Caps.md | 136 ------------------------------ basic_pipeline/04_StreamFormat.md | 136 ++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 136 deletions(-) delete mode 100644 basic_pipeline/04_Caps.md create mode 100644 basic_pipeline/04_StreamFormat.md diff --git a/basic_pipeline/04_Caps.md b/basic_pipeline/04_Caps.md deleted file mode 100644 index 995832c..0000000 --- a/basic_pipeline/04_Caps.md +++ /dev/null @@ -1,136 +0,0 @@ -# Caps - -We owe you something...and we would like to pay it back as soon as possible! -As promised in the [3rd chapter](03_Source.md), we will talk more about the concept of caps - which in fact we have used in the previous chapter, but which weren't described sufficiently. - -## What are caps? - -Caps (an abbreviation of the _capabilities_) is a concept allowing us to define what kind of data is flowing through the [pad](../glossary/glossary.md#pad). -In the Membrane Framework's nomenclature, we say, that we define a caps specification for a given [element](../glossary/glossary.md#element). - -We believe that an example might speak here louder than a plain definition, so we will try to describe the caps with the real-life scenario example. -Let's say that we are connecting two elements that process the video multimedia. -The link is made between the pads which are working on raw video data. -Here is where caps come up - they can be defined with the following constraints: - -- data format - in our case, we are having a raw video format -- some additional constraints - i.e. [frame](../glossary/glossary.md#frame) resolution (480p) , framerate (30 fps) etc. - -Caps help us find out if the given elements are capable to communicate with each other. Not only we cannot send the data between the pads if the format they are expecting is different - we need to take into consideration some other constraints! We can think of a situation in which the format would be the same (i.e. raw video data), but the element which receives the data performs a much more complex computation on that data than the sender, and therefore cannot digest such a great amount of data as the sender is capable of transmitting. Then their caps wouldn't be compatible, which could be expressed by adding some constraint, i.e. framerate. - -Caps help us define a contract between elements and prevent us from connecting incompatible elements. That is why it is always better to define precise caps rather than using caps of type `:any`. - -## When the caps are compatible? - -A comparison between caps is made when the pads are connected. Due to freedom in defining the type, the comparison is not that straightforward. It would be good for a responsible Membrane's element architect to be aware of how the caps are compared. You can refer to the implementation of the caps matcher, available [here](https://github.com/membraneframework/membrane_core/blob/82d6162e3df94cd9abc508c58bc0267367b02d58/lib/membrane/caps/matcher.ex#L124)... or follow on this chapter, and learn it by an example. -Here is how you define a caps specification: - -1. First you need to specify the format module - -```elixir -defmodule Formats.Raw do - defstruct [:pixel_format, :framerate, :width, :height] -end -``` - -Module name defines the type of the caps, however it is possible to pass some other options in a form of a struct. That is why we have defined a structure with the use of `defstruct`. Our format will be described with the following options: - -- :pixel_format - pixel format, i.e. [I420](https://en.wikipedia.org/wiki/Chroma_subsampling) ([YUV](https://en.wikipedia.org/wiki/YUV)) or RGB888 -- :framerate - number of frames per second, i.e. 30 (FPS) -- :width - width of the picture in pixels, i.e. 480 (px) -- :height - height of the picture in pixels, i.e. 300 (px) - -2. We specify the pad of the element with the format we have just defined, using the `:caps` option. For the purpose of an example, let it be the `:input` pad: - -```elixir -def_input_pad(:input, - demand_unit: :buffers, - caps: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] -) -``` - -As you can see, we pass a list of compatible formats, each described with the tuple, consisting of our module name, and the keywords list fulfilling the -structure defined in that module. For the format's options, we can use the `range/2` or `one_of/1` specifier, which will modify the way in which the comparison between the caps specification and the actual caps received by the element is performed. - -3. Once the caps event comes to the element's pad, the caps description sent in that event is confronted with each of the formats in the caps specification list of the pad. If the event's caps description matches even one of the caps formats present in the list it means that caps are matching. - To match the caps with the particular format (one from the caps specification list), the module (first element of the tuple in caps format description) must be the same and all the options must match. For each option, a value sent within the event is confronted with the specification of the option. The way comparison occurs is dependent on how we defined that option in the specification: - -- We have used `framerate: range(30, 60)`, so will accept the framerate value in the given interval, between 30 and 60 FPS. -- We have also used `pixel_format: one_of([:I420, :I422]`, and that will accept caps, whose pixel format is either I420 or I422 -- We have used a plain value to specify the `width` and the `height` of a picture - the caps will match if that option will be equal to the value passed in the specification - -4. As noted previously, one can specify the caps as `:any`. Such a specification will match all the caps sent on the pad, however, it is not a recommended way to develop the element - caps are there for some reason! - -Our journey with caps does not end here. We know how to describe caps specification...but we also need to make our elements send the caps events so that the following elements will be aware of what type of data our element is producing! - -An element can send caps as one of the [actions](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html) it can take - the [`:caps` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:caps_t/0). - -Another thing is that we can specify the behavior of an element when it receives the caps with the use of [`handle_caps/4` callback](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_caps/4). - -For all the [filter elements](../glossary/glossary.md#filter), `handle_caps/4` has a default implementation, which is relaying the received caps on all the output pads of that filter. -However, if your filter is changing the format of data being sent, it should override the implementation of that callback to prevent caps flying through it, and send the proper caps via the output pads. - -For the [source element](../glossary/glossary.md#source), it is necessary to send the caps as in each [pipeline](../glossary/glossary.md#pipeline) the source is the first element - caps wouldn't flow through the pipeline if the source element wouldn't have sent them. Sending can be done in the `handle_stopped_to_prepared/2` callback. - -## Example - -Imagine a pipeline, which starts with the source producing a video, which is then passed to the filter, responsible for reducing the quality of that video if it is too high. -For the source element, we should have the `:output` pads caps which would allow us to send video in the higher and in the lower quality. The same caps should be specified on the input of the filter element. However, the caps on the output of the filter should accept only video in the lower quality. -Here is the definition of the source element: - -```elixir -# Source element - -defmodule Source do - def_output_pad(:output, - demand_unit: :buffers, - caps: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] - ) - ... - def handle_stopped_to_prepared(_ctx, state) do - ... - { {:ok, [caps: {:output, %Formats.Raw{pixel_format: I420, framerate: 45, width: 720, height: 300} }]}, state} - end -``` - -While returning from the `handle_stopped_to_prepared/2` callback, the element will send the caps described by the `Formats.Raw` structure, through the `:output` pad. -Will those caps meet the caps specification provided by us? Think about it! -In fact, they will. The format matches (both in the caps being sent and in the caps specification of the pad, we have `Format.Raw` module). When it comes to the options, we see, that `I420` is in the `one_of` list, acceptable by the caps specification format for `width` equal to 720 and `height` equal to 480, and the `framerate`, equal to 45, is in the `range` between 30 and 60, as defined in the caps specification. -It means that the caps can be sent through the `:output` pad. -Below there is the draft of the filter implementation: - -```elixir -# Filter - -defmodule Filter do - def_input_pad(:input, - demand_unit: :buffers, - caps: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] - ) - - def_output_pad(:output, - demand_unit: :buffers, - caps: {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - ) - - ... - def handle_caps(_pad, _caps, _context, state) do - ... - { {:ok, [caps: {:output, %Formats.Raw{pixel_format: I420, framerate: 60, width: 480, height:300} }]}, state} - end - -end -``` - -When we receive the caps on the input pad, we do not propagate them to our `:output` pad - instead, we send other caps, with reduced quality (width and height options of the format are lower). - -We hope by now you have a better understanding of what `caps` are. This knowledge will helpful in the following chapters. diff --git a/basic_pipeline/04_StreamFormat.md b/basic_pipeline/04_StreamFormat.md new file mode 100644 index 0000000..13eb9d1 --- /dev/null +++ b/basic_pipeline/04_StreamFormat.md @@ -0,0 +1,136 @@ +# Stream format + +We owe you something...and we would like to pay it back as soon as possible! +As promised in the [3rd chapter](03_Source.md), we will talk more about the concept of stream formats - which in fact we have used in the previous chapter, but which weren't described sufficiently. + +## Why formats are important + +Specifying stream formats allows us to define what kind of data is flowing through the [pad](../glossary/glossary.md#pad). +This isn't necessarily limited to data formats, as you'll see below. +In the Membrane Framework's nomenclature, we say, that we define a stream format specification for a given [element](../glossary/glossary.md#element). + +We believe that an example might speak here louder than a plain definition, so we will try to describe it with a real-life scenario example. +Let's say that we are connecting two elements that process the video multimedia. +The link is made between the pads which are working on raw video data. +Here is where stream formats come up - they can be defined with the following constraints: + +- data format - in our case, we are having a raw video format +- some additional constraints - i.e. [frame](../glossary/glossary.md#frame) resolution (480p) , framerate (30 fps) etc. + +The `:stream_format` action helps us find out if the given elements are capable to communicate with each other. Not only can we not send the data between the pads if the format they are expecting is different - we need to take into consideration some other constraints! We can think of a situation in which the _data format_ would be the same (i.e. raw video data), but the element which receives the data performs a much more complex computation on that data than the sender, and therefore cannot digest such a great amount of data as the sender is capable of transmitting. Then their stream formats wouldn't be compatible, which could be expressed by adding some constraint, i.e. framerate. + +Stream formats help us define a contract between elements and prevent us from connecting incompatible elements. That is why it is always better to define precise constraints rather than using `stream_format: :any`. + +## When are stream formats compatible? + +A comparison between formats is made when an input pad receives the `stream_format` action, and checks whether it matches its [accepted format](https://hexdocs.pm/membrane_core/Membrane.Pad.html#t:accepted_format/0). +Here is how you define a stream format specification: + +1. First you need to specify the format module + +```elixir +defmodule Formats.Raw do + defstruct [:pixel_format, :framerate, :width, :height] +end +``` + +Module name defines the type of the format, however it is possible to pass some other options in a form of a struct. That is why we have defined a structure with the use of `defstruct`. Our format will be described with the following options: + +- :pixel_format - pixel format, i.e. [I420](https://en.wikipedia.org/wiki/Chroma_subsampling) ([YUV](https://en.wikipedia.org/wiki/YUV)) or RGB888 +- :framerate - number of frames per second, i.e. 30 (FPS) +- :width - width of the picture in pixels, i.e. 480 (px) +- :height - height of the picture in pixels, i.e. 300 (px) + +2. We specify the pad of the element with the format we have just defined, using the `:accepted_format` option. For the purpose of an example, let it be the `:input` pad: + +```elixir +def_input_pad(:input, + demand_unit: :buffers, + accepted_format: [ + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} + ] +) +``` + +As you can see, we pass a list of compatible formats, each described with the tuple, consisting of our module name, and the keywords list fulfilling the +structure defined in that module. For the format's options, we can use the `range/2` or `one_of/1` specifier, which will modify the way in which the comparison between the accepted specification and the actual format received by the element is performed. + +3. Once the `:stream_format` event comes to the element's pad, the format description sent in that event is confronted with each of the formats in the specification list of the pad. If the event's format description matches even one of the formats present in the list it means that they are matching. + +- We have used `framerate: range(30, 60)`, so will accept the framerate value in the given interval, between 30 and 60 FPS. +- We have also used `pixel_format: one_of([:I420, :I422]`, and that will accept formats, whose pixel format is either I420 or I422 +- We have used a plain value to specify the `width` and the `height` of a picture - the format will match if that option will be equal to the value passed in the specification + +4. As noted previously, one can specify the format as `:any`. Such a specification will match all the formats sent on the pad, however, it is not a recommended way to develop the element - formats are there for a reason! + +Our journey with stream formats does not end here. We know how to describe their specification...but we also need to make our elements send the `:stream_format` events so that the following elements will be aware of what type of data our element is producing! + +An element can send a stream format as one of the [actions](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html) it can take - the [`:stream_format` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:stream_format/0). + +Another thing is that we can specify the behavior of an element when it receives the stream format with the use of [`handle_stream_format/4` callback](https://hexdocs.pm/membrane_core/Membrane.Element.WithInputPads.html#c:handle_stream_format/4). + +For all the [filter elements](../glossary/glossary.md#filter), `handle_stream_format/4` has a default implementation, which is relaying the received format on all the output pads of that filter. +However, if your filter is changing the format of data being sent, it should override the implementation of that callback to prevent formats flying through it, and send the proper spec via the output pads. + +For the [source element](../glossary/glossary.md#source), it is necessary to send the format as in each [pipeline](../glossary/glossary.md#pipeline) the source is the first element - formats wouldn't flow through the pipeline if the source element wouldn't have sent them. Sending can be done in the `handle_playing/2` callback. + +## Example + +Imagine a pipeline, which starts with the source producing a video, which is then passed to the filter, responsible for reducing the quality of that video if it is too high. +For the source element, we should have the `:output` pads format which would allow us to send video in the higher and in the lower quality. The same format should be specified on the input of the filter element. However, the stream format on the output of the filter should accept only video in the lower quality. +Here is the definition of the source element: + +```elixir +# Source element + +defmodule Source do + def_output_pad(:output, + demand_unit: :buffers, + stream_format: [ + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} + ] + ) + ... + def handle_playing(_ctx, state) do + ... + { {[stream_format: {:output, %Formats.Raw{pixel_format: I420, framerate: 45, width: 720, height: 300} }]}, state} + end +``` + +While returning from the `handle_playing/2` callback, the element will send the format described by the `Formats.Raw` structure, through the `:output` pad. +Will this format meet the accepted specification provided by us? Think about it! +In fact, it will. The format matches (both in the event being sent and in the accepted specification of the pad, we have `Format.Raw` module). When it comes to the options, we see, that `I420` is in the `one_of` list, acceptable by the specification format for `width` equal to 720 and `height` equal to 480, and the `framerate`, equal to 45, is in the `range` between 30 and 60, as defined in the specification. +It means that the format can be sent through the `:output` pad. +Below there is the draft of the filter implementation: + +```elixir +# Filter + +defmodule Filter do + def_input_pad(:input, + demand_unit: :buffers, + accepted_format: [ + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, + {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} + ] + ) + + def_output_pad(:output, + demand_unit: :buffers, + accepted_format: {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, + ) + + ... + def handle_stream_format(_pad, _stream_format, _context, state) do + ... + { {:ok, [stream_format: {:output, %Formats.Raw{pixel_format: I420, framerate: 60, width: 480, height:300} }]}, state} + end + +end +``` + +When we receive the spec on the input pad, we do not propagate it to our `:output` pad - instead, we send a different format, with reduced quality (width and height options are lower). + +We hope by now you have a better understanding of what stream formats are. This knowledge will be helpful in the following chapters. From 9c0130356034631e47613543c61775bdb505002c Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 29 Nov 2023 17:16:59 +0100 Subject: [PATCH 05/18] Update chapter 5 --- basic_pipeline/05_Formats.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/basic_pipeline/05_Formats.md b/basic_pipeline/05_Formats.md index 54c41bf..7a8a205 100644 --- a/basic_pipeline/05_Formats.md +++ b/basic_pipeline/05_Formats.md @@ -1,10 +1,10 @@ -# Formats +# Stream formats in action -Since we have already discussed what caps are and how to use them, let's make use of them in our project! +Since we have already discussed what stream formats are and how to use them, let's make use of them in our project! The first thing to do is to define modules responsible for describing the formats used in our pipeline. We will put them in a separate directory - `lib/formats`. Let's start with the format describing the [packets](../glossary/glossary.md#packet): -**_`lib/formats/PacketFormat.ex`_** +**_`lib/formats/packet_format.ex`_** ```elixir defmodule Basic.Formats.Packet do @@ -15,11 +15,11 @@ defmodule Basic.Formats.Packet do end ``` -The definition of the module is not complicated, as you can see in the code snippet above - we are only defining a structure within that module, with a `:type` parameter, which default value is `:custom_packtes`. +The definition of the module is not complicated, as you can see in the code snippet above - we are only defining a structure within that module, with a `:type` parameter, whose default value is `:custom_packets`. In our [pipeline](../glossary/glossary.md#pipeline) we will also send another type of data - [frames](../glossary/glossary.md#frame). Let's define a format for them: -**_`lib/formats/FrameFormat.ex`_** +**_`lib/formats/frame_format.ex`_** ```elixir defmodule Basic.Formats.Frame do @@ -32,7 +32,7 @@ end Same as in the case of the previous format - we are defining a structure with a single field, called `:encoding`, and the default value of that field - `:utf8`. -That's it! Format modules are really simple - the more complicated thing is to make use of them - which we will do in the subsequent chapters while defining the caps! +That's it! Format modules are really simple - the more complicated thing is to make use of them - which we will do in the subsequent chapters while defining the specs! Before advancing you can test the `Source` [element](../glossary/glossary.md/#source), using the tests provided in `/test` directory. From 3f71657a513a0f620e0338945f7df64afb65d684 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 11:18:05 +0100 Subject: [PATCH 06/18] Further fixes to 03-04 --- basic_pipeline/03_Source.md | 44 +++++++++++++++---------------- basic_pipeline/04_StreamFormat.md | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/basic_pipeline/03_Source.md b/basic_pipeline/03_Source.md index 455de63..5c80571 100644 --- a/basic_pipeline/03_Source.md +++ b/basic_pipeline/03_Source.md @@ -12,13 +12,13 @@ As you can see, we are returning an optional list of [actions](https://hexdocs.p the updated state (which later on will be passed to the next invoked callback). Take your time and read about the possible actions which can be requested to be performed while returning from the callback. Their usage is crucial for the pipeline to work. -As you can judge based on the structure of the project, all the elements will be put in the `lib/elements` directory. Therefore there is a place where `Source.ex` with the `Basic.Elements.Source` module's definition should be placed. +As you can judge based on the structure of the project, all the elements will be put in the `lib/elements` directory. Therefore there is a place where `source.ex` with the `Basic.Elements.Source` module's definition should be placed. ## What makes our module a Membrane Framework's element? Let's start with specifying that our module will implement the `Membrane.Source` behavior as well as alias the modules which will be used later in the module's code: -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do @@ -33,7 +33,7 @@ end Later on, we will make use of [macros](https://elixir-lang.org/getting-started/meta/macros.html) defined in the `Membrane.Source` module: -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do @@ -67,7 +67,7 @@ You can read more on pad specification [here](https://hexdocs.pm/membrane_core/M Let's define our first callback! Why not start with [`handle_init/2`](https://hexdocs.pm/membrane_core/Membrane.Element.Base.html#c:handle_init/2), which gets called once the element is created? -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do @@ -97,7 +97,7 @@ All we need to do there is to initialize the state - our state will be in a form When an element requires more time to initialise, you should delegate complex tasks to `handle_setup/2`. This callback runs after `handle_init/2` if it returns the `setup: :incomplete` action. In our example, we'd like to open, read and save the contents of the input file. We then save it in our state as `content`. -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do @@ -117,7 +117,7 @@ end When the setup is complete, the element goes into `playing` state. It can then demand buffers from previous elements and send its `:stream_format` to receiving elements. Since we are implementing a sink we do not have anything to demand from, but we can specify the format. We can do this, for example, in `handle_playing/2`: -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do @@ -137,28 +137,28 @@ The `:stream_format` action means that we want to transmit the information about Before going any further let's stop for a moment and talk about the demands. Do you remember, that the `:output` pad is working in `:manual` mode? That means that the succeeding element has to ask the Source element for the data to be sent and our element has to take care of keeping that data in some kind of buffer until it is requested. Once the succeeding element requests for the data, the `handle_demand/4` callback will be invoked - therefore it would be good for us to define it: -**_`lib/elements/Source.ex`_** +**_`lib/elements/source.ex`_** ```elixir defmodule Basic.Elements.Source do ... - @impl true - def handle_demand(:output, _size, :buffers, _ctx, state) do - if state.content == [] do - { [end_of_stream: :output], state} - else - [first_packet | rest] = state.content - new_state = %{state | content: rest} - - actions = [ - buffer: {:output, %Buffer{payload: first_packet}}, - redemand: :output - ] - - {actions, new_state} + @impl true + def handle_demand(:output, _size, :buffers, _context, state) do + if state.content == [] do + {[end_of_stream: :output], state} + else + [first_packet | rest] = state.content + new_state = %{state | content: rest} + + actions = [ + buffer: {:output, %Buffer{payload: first_packet}}, + redemand: :output + ] + + {actions, new_state} + end end - end ... end ``` diff --git a/basic_pipeline/04_StreamFormat.md b/basic_pipeline/04_StreamFormat.md index 13eb9d1..49bb58f 100644 --- a/basic_pipeline/04_StreamFormat.md +++ b/basic_pipeline/04_StreamFormat.md @@ -93,7 +93,7 @@ defmodule Source do ] ) ... - def handle_playing(_ctx, state) do + def handle_playing(_context, state) do ... { {[stream_format: {:output, %Formats.Raw{pixel_format: I420, framerate: 45, width: 720, height: 300} }]}, state} end From 7045f9be15bcdbd8f34f41047c5fcfca0b0af84a Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 11:37:17 +0100 Subject: [PATCH 07/18] Update 06_OrderingBuffer.md --- basic_pipeline/06_OrderingBuffer.md | 114 +++++++++++++++------------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/basic_pipeline/06_OrderingBuffer.md b/basic_pipeline/06_OrderingBuffer.md index 1ad19ac..6b416de 100644 --- a/basic_pipeline/06_OrderingBuffer.md +++ b/basic_pipeline/06_OrderingBuffer.md @@ -4,76 +4,80 @@ In this chapter we will deal with the next [element](../glossary/glossary.md#ele As stated in the [chapter about the system architecture](02_SystemArchitecture.md), this element is responsible for ordering the incoming [packets](../glossary/glossary.md#packet), based on their sequence id. Because Ordering Buffer is a filtering element, we need to specify both the input and the output [pads](../glossary/glossary.md#pad): -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do use Membrane.Filter alias Basic.Formats.Packet - def_input_pad(:input, demand_unit: :buffers, caps: {Packet, type: :custom_packets}) + def_input_pad :input, + flow_control: :manual, + demand_unit: :buffers, + accepted_format: %Packet{type: :custom_packets} - def_output_pad(:output, caps: {Packet, type: :custom_packets}) - ... + def_output_pad :output, + flow_control: :manual, + accepted_format: %Packet{type: :custom_packets} + ... end ``` -Note the caps specification definition there - we expect `Basic.Formats.Packet` of type `:custom_packets` to be sent on the input pad, and the same type of packets to be sent through the output pad. +Note the format specification definition there - we expect `Basic.Formats.Packet` of type `:custom_packets` to be sent on the input pad, and the same type of packets to be sent through the output pad. In the next step let's specify how we want the state of our element to look like: -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do - ... + ... @impl true - def handle_init(_options) do - {:ok, - %{ - ordered_packets: [], - last_sent_seq_id: 0 - } - } + def handle_init(_context, _options) do + {[], + %{ + ordered_packets: [], + last_sent_seq_id: 0 + }} end - ... + ... end ``` If you don't remember what is the purpose of the Ordering Buffer, please refer to the [2nd chapter](02_SystemArchitecture.md). We will need to hold a list of ordered packets, as well as a sequence id of the packet, which most recently was sent through the output pad (we need to know if there are some packets missing between the last sent packet and the first packet in our ordered list). -Handling demand is quite straightforward: +Handling demands is quite straightforward: -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do ... - @impl true - def handle_demand(_ref, size, _unit, _ctx, state) do - { {:ok, demand: {Pad.ref(:input), size} }, state} - end + @impl true + def handle_demand(:output, size, _unit, _context, state) do + {[demand: {:input, size}], state} + end ... end ``` We simply send the `:demand` on the `:input` pad once we receive a demand on the `:output` pad. One packet on input corresponds to one packet on output so for each 1 unit of demand we send 1 unit of demand to the `:input` pad. -Now we can go to the main part of the Ordering Buffer implementation - the `handle_process/4` callback. +Now we can go to the main part of the Ordering Buffer implementation - the `handle_buffer/4` callback. The purpose of this callback is to process the incoming buffer. It gets called once a new buffer is available and waiting to be processed. -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do - ... + ... @impl true - def handle_process(:input, buffer, _context, state) do + def handle_buffer(:input, buffer, _context, state) do packet = unzip_packet(buffer.payload) ordered_packets = [packet | state.ordered_packets] |> Enum.sort() state = %{state | ordered_packets: ordered_packets} - {last_seq_id, _} = Enum.at(ordered_packets, 0) - ... + [{last_seq_id, _} | _] = ordered_packets + ... end defp unzip_packet(packet) do @@ -81,7 +85,7 @@ defmodule Basic.Elements.OrderingBuffer do %{"data" => data, "seq_id" => seq_id} = Regex.named_captures(regex, packet) {String.to_integer(seq_id), %Membrane.Buffer{payload: data} } end - ... + ... end ``` @@ -118,64 +122,70 @@ The result of `Regex.named_captures/2` applied to that regex description and the {"seq_id"=>7, "data"=>"[frameid:2][timestamp:3]data"} ``` -Once we unzip the header of the packet in the `handle_process/4` callback, we can put the incoming packet in the `ordered_packets` list and sort that list. Due to the fact, that elements of this list are tuples, whose first element is a sequence id (a value that is unique), the list will be sorted based on the sequence id. +Once we unzip the header of the packet in the `handle_buffer/4` callback, we can put the incoming packet in the `ordered_packets` list and sort that list. Due to the fact, that elements of this list are tuples, whose first element is a sequence id (a value that is unique), the list will be sorted based on the sequence id. We also get the sequence id of the first element in the updated `ordered_packets` list. -Here comes the rest of the `handle_process/4` definition: +Here comes the rest of the `handle_buffer/4` definition: -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do - ... - def handle_process(:input, buffer, _context, state) do - ... + ... + def handle_buffer(:input, buffer, _context, state) do + ... if state.last_sent_seq_id + 1 == last_seq_id do - {reversed_ready_packets_sequence, ordered_packets} = get_ready_packets_sequence(ordered_packets, []) - [{last_sent_seq_id, _} | _] = reversed_ready_packets_sequence + {ready_packets_sequence, ordered_packets_left} = + get_ready_packets_sequence(ordered_packets, []) + + {last_sent_seq_id, _} = List.last(ready_packets_sequence) state = %{ state - | ordered_packets: ordered_packets, + | ordered_packets: ordered_packets_left, last_sent_seq_id: last_sent_seq_id } - buffers = Enum.reverse(reversed_ready_packets_sequence) |> Enum.map(fn {_seq_id, data} -> data end) - { {:ok, buffer: {:output, buffers} }, state} + ready_buffers = Enum.map(ready_packets_sequence, &elem(&1, 1)) + + {[buffer: {:output, ready_buffers}], state} else - { {:ok, redemand: :output}, state} + {[redemand: :output], state} end end - ... + ... end ``` We need to distinguish between two situations: the currently processed packet can have a sequence id which is subsequent to the sequence id of the last sent packet or there might be some packets not yet delivered to us, with sequence ids in between the last sent sequence id and the sequence id of a currently processed packet. In the second case, we should store the packet and wait for the next packets to arrive. We will accomplish that using [`redemands` mechanism](../glossary/glossary.md#redemands), which will be explained in detail in the next chapter. However, in the first situation, we need to get the ready packet's sequence - that means, a consistent batch of packets from the `:ordered_packets`. This can be done in the following way: -**_`lib/elements/OrderingBuffer.ex`_** +**_`lib/elements/ordering_buffer.ex`_** ```elixir defmodule Basic.Elements.OrderingBuffer do - ... - defp get_ready_packets_sequence([], acc) do - {acc, []} + ... + defp get_ready_packets_sequence([first_packet | ordered_rest], []) do + get_ready_packets_sequence(ordered_rest, [first_packet]) end defp get_ready_packets_sequence( - [{first_id, _first_data} = first_seq | [{second_id, second_data} | rest]], acc) - when first_id + 1 == second_id do - get_ready_packets_sequence([{second_id, second_data} | rest], [first_seq | acc]) + [next_seq = {next_id, _} | ordered_rest], + [{last_id, _} | _] = ready_sequence + ) + when next_id == last_id + 1 do + get_ready_packets_sequence(ordered_rest, [next_seq | ready_sequence]) end - defp get_ready_packets_sequence([first_seq | rest], acc) do - {[first_seq | acc], rest} - end + defp get_ready_packets_sequence(ordered_packets, ready_sequence) do + {Enum.reverse(ready_sequence), ordered_packets} + end + ... end ``` Note the order of the definitions, since we are taking advantage of the pattern matching mechanism! -The algorithm implemented in the snippet above is really simple - we are recursively taking the next packet out of the `:ordered_packets` buffer until it becomes empty or there is a missing packet (`first_id + 1 == second_id`) between the last taken packet and the next packet in the buffer. +The algorithm implemented in the snippet above is really simple - we are recursively taking the next packet out of the `:ordered_packets` buffer until it becomes empty or there is a missing packet (`next_id == last_id + 1`) between the last taken packet and the next packet in the buffer. Once we have a consistent batch of packets, we can update the state (both the`:ordered_packets` and the `:last_sent_seq_id` need to be updated) and output the ready packets by defining the `:buffer` action. Test the `OrderingBuffer`: From bc3a6b99ab0f33ebcf0c1cd02b7e0065f0ad440f Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 11:42:55 +0100 Subject: [PATCH 08/18] Update 08_Depayloader.md --- basic_pipeline/08_Depayloader.md | 157 ++++++++++++++++++------------- 1 file changed, 89 insertions(+), 68 deletions(-) diff --git a/basic_pipeline/08_Depayloader.md b/basic_pipeline/08_Depayloader.md index 0714290..f19b2b6 100644 --- a/basic_pipeline/08_Depayloader.md +++ b/basic_pipeline/08_Depayloader.md @@ -1,11 +1,10 @@ -# Depeyloader - +# Depayloader Since we have [packets](../glossary/glossary.md#packet) put in order by the [Ordering Buffer](../basic_pipeline/06_OrderingBuffer.md), we can assemble them into the original [frames](../glossary/glossary.md#frame). The Depayloader is an element responsible for this task. Specifically speaking, it unpacks the payload from the packets - and that is why it's called 'depayloader'. -Let's create a new module in the `lib/elements/Depayloader.ex` file: +Let's create a new module in the `lib/elements/depayloader.ex` file: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir defmodule Basic.Elements.Depayloader do @@ -18,11 +17,15 @@ end What input data do we expect? Of course in `Basic.Format.Packet` format! -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir defmodule Basic.Elements.Depayloader do - def_input_pad(:input, demand_unit: :buffers, caps: {Packet, type: :custom_packets}) + ... + def_input_pad :input, + flow_control: :manual, + demand_unit: :buffers, + accepted_format: %Packet{type: :custom_packets} ... end ``` @@ -30,91 +33,103 @@ end However, our element will process that input data in a way that will change the format - on output, there will be frames instead of packets! We need to specify it while defining the `:output` pad: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir defmodule Basic.Elements.Depayloader do ... - def_output_pad(:output, caps: {Frame, encoding: :utf8}) + def_output_pad :output, + flow_control: :manual, + accepted_format: %Frame{encoding: :utf8} ... end ``` We will also need a parameter describing how many packets should we request once we receive a demand for a frame: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir defmodule Basic.Elements.Depayloader do ... - def_options( - packets_per_frame: [ - type: :integer, - spec: pos_integer, - description: - "Positive integer, describing how many packets form a single frame. Used to demand the proper number of packets while assembling the frame." - ] - ) + def_options packets_per_frame: [ + spec: pos_integer, + description: + "Positive integer, describing how many packets form a single frame. Used to demand the proper number of packets while assembling the frame." + ] ... end ``` -In the `handle_init/1` callback we are simply saving the value of that parameter in the state of our element: +In the `handle_init/2` callback we are simply saving the value of that parameter in the state of our element: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -@impl true -def handle_init(options) do -{:ok, - %{ - frame: [], - packets_per_frame: options.packets_per_frame - } } +defmodule Basic.Elements.Depayloader do + ... + @impl true + def handle_init(_context, options) do + {[], + %{ + frame: [], + packets_per_frame: options.packets_per_frame + }} + end + ... end ``` Within the state, we will also hold a (potentially not complete) `:frame` - a list of packets, which form a particular frame. We will aggregate the packets in the `:frame` until the moment the frame is complete. -As noted in the [chapter dedicated to the caps](04_Caps.md), since we are changing the type of data within the element, we cannot rely on the default implementation of the `handle_caps/4` callback. We need to explicitly send the updated version of caps: +As noted in the [chapter dedicated to stream formats](04_StreamFormat.md), since we are changing the type of data within the element, we cannot rely on the default implementation of the `handle_stream_format/4` callback. We need to explicitly send the updated version of the format: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -@impl true -def handle_caps(_pad, _caps, _context, state) do - caps = %Frame{encoding: :utf8} - { {:ok, caps: {:output, caps} }, state} +defmodule Basic.Elements.Depayloader do + ... + @impl true + def handle_stream_format(:input, _stream_format, _context, state) do + {[stream_format: {:output, %Frame{encoding: :utf8}}], state} + end + ... end ``` -As in most elements, the `handle_demand/5` implementation is quite easy - what we do is simply to make a demand on our `:input` pad once we receive a demand on the `:output` pad. However, since we are expected to produce a frame (which is formed from a particular number of packets) on the `:output` pad, we need to request a particular number of packets on the `:input` pad - that is why we have defined the `:packets_per_frame` option and now we will be making usage of it. In case we would have been asked to produce 10 frames, and each frame would have been made out of 5 packets, then we would need to ask for 10\*5 = 50 packets on the `:input`. +As in most elements, the `handle_demand/5` implementation is quite easy - what we do is simply make a demand on our `:input` pad once we receive a demand on the `:output` pad. However, since we are expected to produce a frame (which is formed from a particular number of packets) on the `:output` pad, we need to request a particular number of packets on the `:input` pad - that is why we have defined the `:packets_per_frame` option and now we will be making use of it. In case we would have been asked to produce 10 frames, and each frame would have been made out of 5 packets, then we would need to ask for 10\*5 = 50 packets on the `:input`. -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -@impl true -def handle_demand(_ref, size, _unit, _ctx, state) do - { {:ok, demand: {Pad.ref(:input), size * state.packets_per_frame} }, state} +defmodule Basic.Elements.Depayloader do + ... + @impl true + def handle_demand(:output, size, :buffers, _context, state) do + {[demand: {:input, size * state.packets_per_frame}], state} + end + ... end ``` -There is nothing left apart from processing the input data - that is - the packets. Since the packets are coming in order, we can simply hold them in the `:frame` list until all the packets forming that frame will be there. As you might remember, each packet has a frame id in its header, which can be followed by a 'b' or 'e' character, indicating the type of the packet (the one begging a frame or the one ending the frame). We will use information about the type to find a moment in which we should produce a frame out of the packets list. +There is nothing left apart from processing the input data - that is - the packets. Since the packets are coming in order, we can simply hold them in the `:frame` list until all the packets forming that frame will be there. As you might remember, each packet has a frame id in its header, which can be followed by a 'b' or 'e' character, indicating the type of the packet (the one beginning a frame or the one ending the frame). We will use information about the type to find a moment in which we should produce a frame out of the packets list. -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -@impl true -def handle_process(_ref, buffer, _ctx, state) do - packet = buffer.payload +defmodule Basic.Elements.Depayloader do + ... + @impl true + def handle_buffer(:input, buffer, _context, state) do + packet = buffer.payload - regex = - ~r/^\[frameid\:(?\d+(?[s|e]*))\]\[timestamp\:(?\d+)\](?.*)$/ + regex = + ~r/^\[frameid\:(?\d+(?[s|e]*))\]\[timestamp\:(?\d+)\](?.*)$/ - %{"data" => data, "frame_id" => _frame_id, "type" => type, "timestamp" => timestamp} = - Regex.named_captures(regex, packet) + %{"data" => data, "frame_id" => _frame_id, "type" => type, "timestamp" => timestamp} = + Regex.named_captures(regex, packet) - frame = [data | state.frame] + frame = [data | state.frame] ... end ``` @@ -122,34 +137,40 @@ end Once again we are taking advantage of the `Regex.named_captures`. Once we fetch the interesting values of the header's parameters, we can update the `:frame`. -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -@impl true -def handle_process(_ref, buffer, _ctx, state) do - ... -case type do - "e" -> - frame = prepare_frame(frame) - state = Map.put(state, :frame, []) - buffer = %Membrane.Buffer{payload: frame, pts: String.to_integer(timestamp)} - { {:ok, [buffer: {:output, buffer}]}, state} - - _ -> - state = Map.put(state, :frame, frame) - {:ok, state} - end +defmodule Basic.Elements.Depayloader do + ... + @impl true + def handle_buffer(:input, buffer, _context, state) do + ... + if type == "e" do + buffer = %Membrane.Buffer{ + payload: prepare_frame(frame), + pts: String.to_integer(timestamp) + } + + {[buffer: {:output, buffer}], %{state | frame: []}} + else + {[], %{state | frame: frame}} + end + ... end ``` Now, depending on the type of frame, we perform different actions. -If we have the 'ending' packet, we are making the `:buffer` action with the frame made out of the packets (that's where `prepare_frame/1` function comes in handy), and clear the `:frame` buffer. Here is how can the `prepare_frame/1` function be implemented: +If we have the 'ending' packet, we are making the `:buffer` action with the frame made out of the packets (that's where `prepare_frame/1` function comes in handy), and clear the `:frame` buffer. Here is how the `prepare_frame/1` function can be implemented: -**_`lib/elements/Depayloader.ex`_** +**_`lib/elements/depayloader.ex`_** ```elixir -defp prepare_frame(frame) do - frame |> Enum.reverse() |> Enum.join("") +defmodule Basic.Elements.Depayloader do + ... + defp prepare_frame(frame) do + frame |> Enum.reverse() |> Enum.join("") + end + ... end ``` @@ -161,5 +182,5 @@ Test the `Depayloader`: mix test test/elements/depayloader_test.exs ``` -With the [`Source`](../glossary/glossary.md#source), [`OrderingBuffer`](../glossary/glossary.md#jitter-buffer--ordering-buffer) and [`Depayloader`](../glossary/glossary.md#payloader-and-depayloader) elements ready we are able to read packets from file, order them based on their sequence id and assemble them back into frames. -In the next chapter we will be dealing with [`Mixer`](../glossary/glossary.md#mixer) which will merge two message streams in order to create complete conversation. +With the [`Source`](../glossary/glossary.md#source), [`OrderingBuffer`](../glossary/glossary.md#jitter-buffer--ordering-buffer) and [`Depayloader`](../glossary/glossary.md#payloader-and-depayloader) elements ready we are able to read packets from file, order them based on their sequence ID and assemble them back into frames. +In the next chapter we will be dealing with the [`Mixer`](../glossary/glossary.md#mixer) which will merge two message streams in order to create complete conversation. From 8afb2b7028223ce1df4fc7b9b8417152694c58a2 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 14:34:38 +0100 Subject: [PATCH 09/18] Update 09_Mixer.md --- basic_pipeline/09_Mixer.md | 269 +++++++++++++++++-------------------- 1 file changed, 120 insertions(+), 149 deletions(-) diff --git a/basic_pipeline/09_Mixer.md b/basic_pipeline/09_Mixer.md index f68000d..e11df67 100644 --- a/basic_pipeline/09_Mixer.md +++ b/basic_pipeline/09_Mixer.md @@ -3,21 +3,29 @@ Here comes the mixer - an [element](../glossary/glossary.md#element) responsible for mixing two streams of [frames](../glossary/glossary.md#frame), coming from two different sources. Once again we start with defining the initialization options and the pads of both types: -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do - @moduledoc """ - The element responsible for mixing the frames coming from two sources, based on their timestamps. - """ - use Membrane.Filter - alias Basic.Formats.Frame - - def_input_pad(:first_input, demand_unit: :buffers, caps: {Frame, encoding: :utf8}) - - def_input_pad(:second_input, demand_unit: :buffers, caps: {Frame, encoding: :utf8}) - - def_output_pad(:output, caps: {Frame, encoding: :utf8}) + @moduledoc """ + Element responsible for mixing the frames coming from two sources, basing on their timestamps. + """ + use Membrane.Filter + alias Basic.Formats.Frame + + def_input_pad :first_input, + flow_control: :manual, + demand_unit: :buffers, + accepted_format: %Frame{encoding: :utf8} + + def_input_pad :second_input, + flow_control: :manual, + demand_unit: :buffers, + accepted_format: %Frame{encoding: :utf8} + + def_output_pad :output, + flow_control: :manual, + accepted_format: %Frame{encoding: :utf8} ... end ``` @@ -25,25 +33,25 @@ end Note, that we have defined two input [pads](../glossary/glossary.md#pad): `:first_input` and the `:second_input`. Each of these input pads will have a corresponding incoming [track](../glossary/glossary.md#track) in form of a [buffers](../glossary/glossary.md#buffer) stream. We need a structure that will hold the state of the track. Let's create it by defining a `Track` inside the mixer module: -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do ... - defmodule Track do - @type t :: %__MODULE__{ - buffer: Membrane.Buffer.t(), - status: :started | :finished - } - defstruct buffer: nil, status: :started - end + defmodule Track do + @type t :: %__MODULE__{ + buffer: Membrane.Buffer.t(), + status: :started | :finished + } + defstruct buffer: nil, status: :started + end ... end ``` As you can see in the code snippet above, the `Track` will consist of the `:buffer` field, holding the very last buffer received on the corresponding input pad, and the `:status` fields, indicating the status of the track - `:started`, in case we are still expecting some buffers to come (that means - in case `:end_of_stream` event hasn't been received yet) and `:finished` otherwise. -It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some nasty to be spotted errors. -A careful reader might notice, that we are holding only one buffer for each track, instead of a list of all the potentially unprocessed buffers - does it mean that we are losing some of them? Not at all, since we are taking advantage of the elements which have appeared earlier in the [pipeline](../glossary/glossary.md#pipeline) and which provide us an ordered list of frames on each of the inputs - however, we will need to process each buffer just at the moment it comes on the pad. +It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some hard to spot errors. +A careful reader might notice, that we are holding only one buffer for each track, instead of a list of all the potentially unprocessed buffers - does it mean that we are losing some of them? Not at all, since we are taking advantage of the elements which have appeared earlier in the [pipeline](../glossary/glossary.md#pipeline) and which provide us with an ordered list of frames on each of the inputs - however, we will need to process each buffer just at the moment it comes on the pad. The logic we're going to implement can be described in the following three steps: @@ -53,186 +61,149 @@ The logic we're going to implement can be described in the following three steps The next step in our element implementation is quite an obvious one: -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do ... - @impl true - def handle_init(_options) do - {:ok, - %{ - tracks: %{first_input: %Track{}, second_input: %Track{} } - } } - end + @impl true + def handle_init(_context, _options) do + {[], + %{ + tracks: %{first_input: %Track{}, second_input: %Track{}} + }} + end ... end ``` -We have provided an `handle_init/1` callback, which does not expect any options to be passed. We are simply setting up the structure of the element state. -As mentioned previously, we will have a `Track` structure for each of the input pads. -Following on the callbacks implementation, let's continue with `handle_process/4` implementation: +We have provided a `handle_init/2` callback, which does not expect any options to be passed. We are simply setting up the structure of the element state. +As mentioned previously, we will have a `Track` structure for each of the input pads. +What's interesting is this is where the mixer having exactly two inputs stops being important. The missing functionality can be defined generically without much hassle. +Following on the callbacks implementation, let's continue with the `handle_buffer/4` implementation: -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do ... - @impl true - def handle_process(pad, buffer, _context, state) do - tracks = - Map.update!(state.tracks, pad, fn track -> - %Track{track | buffer: buffer} - end) - - state = %{state | tracks: tracks} - { {:ok, [{:redemand, :output}]}, state} - end + @impl true + def handle_buffer(pad, buffer, _context, state) do + tracks = Map.update!(state.tracks, pad, &%Track{&1 | buffer: buffer}) + {tracks, buffer_actions} = get_output_buffers_actions(tracks) + state = %{state | tracks: tracks} + + {buffer_actions ++ [redemand: :output], state} + end ... end ``` -What we do is that we are simply putting the incoming `buffer` into the `Track` structure for the given pad. Note, that we have to be sure that we are not losing any information which is in the `Track`'s buffer before the update. In case there is a buffer on a given `Track`, it has to be processed before another buffer comes. Why can we be sure of that in our implementation? That's before we precisely steer the flow of our program and ask for the next buffer after we empty the buffer hold in the state of the element. +In this callback we update the mixer's state by assigning the incoming buffer to its track. We can be sure no overwriting of an existing buffer happens, which will become more apparent as we delve further into the logic's implementation. +Once the state is updated we gather all buffers that can be sent (might be none) in `get_output_buffers_actions/1` and return the coresponding `buffer` actions. In case any demands should be sent afterwards we also tell the output pad to redemand. -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do ... - @impl true - def handle_end_of_stream(pad, _context, state) do - tracks = - Map.update!(state.tracks, pad, fn track -> - %Track{track | status: :finished} - end) - - state = %{state | tracks: tracks} - { {:ok, [{:redemand, :output}]}, state} - end + @impl true + def handle_end_of_stream(pad, _context, state) do + tracks = Map.update!(state.tracks, pad, &%Track{&1 | status: :finished}) + {tracks, buffer_actions} = get_output_buffers_actions(tracks) + state = %{state | tracks: tracks} + + if Enum.all?(tracks, fn {track_id, track} -> + track.status == :finished and not has_buffer?({track_id, track}) + end) do + {buffer_actions ++ [end_of_stream: :output], state} + else + {buffer_actions ++ [redemand: :output], state} + end + end ... end ``` -What we did here was similar to the logic defined in the `handle_process/4` - we have just updated the state of the track (in that case - by setting its status as `:finished`) and then we called the `handle_demand/5` callback using the `:redemand` actions. The `handle_demand/5` will take care of the fact that the track state has changed. -There is nothing left to do apart from defining the `handle_demand/5` itself! +What we did here was similar to the logic defined in `handle_buffer/4` - we have just updated the state of the track (in that case - by setting its status as `:finished`), gather the buffers and send them. The important difference is that in case all inputs have closed, we should forward an `end_of_stream` action instead of a `redemand`, signaling the mixer has finished its processing. +The `has_buffer?/1` function is a private utility that will show up in a few more places in our element's definition and is nothing more than a simple check `track.buffer != nil`. +Let's now implement the `handle_demand/5` callback: -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do - ... - @impl true - def handle_demand(:output, _size, _unit, ctx, state) do - {state, buffer_actions} = get_output_buffers_actions(state) - {state, end_of_stream_actions} = maybe_send_end_of_stream(state) - {state, demand_actions} = get_demand_actions(state, ctx.pads) - - actions = buffer_actions ++ end_of_stream_actions ++ demand_actions - { {:ok, actions}, state} + ... + def handle_demand(:output, _size, _unit, context, state) do + demand_actions = + state.tracks + |> Enum.reject(&has_buffer?/1) + |> Enum.filter(fn {track_id, track} -> + track.status != :finished and context.pads[track_id].demand == 0 + end) + |> Enum.map(fn {track_id, _track} -> {:demand, {track_id, 1}} end) + + {demand_actions, state} end - ... + ... end ``` -The tracks processing presented in the code snippet above has been split into the following steps: +Since it should be responsible for producing and sending `demand` actions to the corresponding input pads, we accordingly filter tracks for ones that are empty, started, and with no demands pending. +It should also become clearer why in `handle_buffer/4` the receiving track is sure to have an empty buffer ready to be overwritten, since we only send demands to input pads of empty tracks. +All that's left now is to implement the main processing logic, gathering buffers that are ready to be sent: -- outputing the ready buffers -- sending `:end_of_stream` notification if necessary -- demanding on empty tracks -Each of these steps has a corresponding private function. - -**_`lib/elements/Mixer.ex`_** +**_`lib/elements/mixer.ex`_** ```elixir defmodule Basic.Elements.Mixer do ... - defp get_output_buffers_actions(state) do - {buffers, tracks} = prepare_buffers(state.tracks) - state = %{state | tracks: tracks} - buffer_actions = Enum.map(buffers, fn buffer -> {:buffer, {:output, buffer} } end) - {state, buffer_actions} - end - - defp prepare_buffers(tracks) do - active_tracks = - tracks - |> Enum.reject(fn {_track_id, track} -> - track.status == :finished and track.buffer == nil - end) - |> Map.new() - - if active_tracks != %{} and - Enum.all?(active_tracks, fn {_, track} -> track.buffer != nil end) do - {track_id, track} = - active_tracks - |> Enum.min_by(fn {_track_id, track} -> track.buffer.pts end) - - buffer = track.buffer - tracks = Map.put(tracks, track_id, %Track{track | buffer: nil}) - {buffers, tracks} = prepare_buffers(tracks) - {[buffer | buffers], tracks} - else - {[], tracks} + defp has_buffer?({_track_id, track}), + do: track.buffer != nil + + defp can_send_buffer?(tracks) do + started_tracks = + Enum.filter( + tracks, + fn {_track_id, track} -> track.status != :finished end + ) + + (started_tracks == [] and Enum.any?(tracks, &has_buffer?/1)) or + (started_tracks != [] and Enum.all?(started_tracks, &has_buffer?/1)) end - end - ... -end -``` -In order to output the buffers, we need to fetch the desired buffers - that is what we do with the `prepare_buffers/1` function. Later on, we are simply creating the `:buffer` action, basing on the list of buffers to be output. -In the `prepare_bufers/1` we get all the active tracks (by 'active' we mean that there is still an unprocessed buffer in the `Track` structure - independent of the status of that track). If all the active tracks have the buffers we can output the one with the lowest presentation timestamp and recursively call the `prepare_buffers/1` (in case there are some buffers that still need to be output - this can happen in a 'corner case' of processing the buffer from the track in the `:finished` state). Surely, we also need to update the state so that to remove the processed buffers. -Now let's focus on preparing `:end_of_stream` action: + defp get_output_buffers_actions(tracks) do + {buffers, tracks} = prepare_buffers(tracks) + buffer_actions = Enum.map(buffers, fn buffer -> {:buffer, {:output, buffer}} end) + {tracks, buffer_actions} + end -**_`lib/elements/Mixer.ex`_** + defp prepare_buffers(tracks) do + if can_send_buffer?(tracks) do + {next_track_id, next_track} = + tracks + |> Enum.filter(&has_buffer?/1) + |> Enum.min_by(fn {_track_id, track} -> track.buffer.pts end) -```elixir -defmodule Basic.Elements.Mixer do - ... - defp maybe_send_end_of_stream(state) do - end_of_stream_actions = - if Enum.all?(state.tracks, fn {_, track} -> track.status == :finished end) do - [end_of_stream: :output] + tracks = Map.put(tracks, next_track_id, %Track{next_track | buffer: nil}) + {buffers, tracks} = prepare_buffers(tracks) + {[next_track.buffer | buffers], tracks} else - [] + {[], tracks} end - {state, end_of_stream_actions} - end - ... -end -``` - -This action needs to be sent if both the tracks are in the `:finished` state - since the `maybe_send_end_of_stream/1` function gets called after the `get_output_buffers_actions/1`, we can be sure, that all the buffers which could possibly be on those tracks, despite they are in the `:finished` state, are already processed. - -**_`lib/elements/Mixer.ex`_** - -```elixir -defmodule Basic.Elements.Mixer do - ... - defp get_demand_actions(state, pads) do - actions = - state.tracks - |> Enum.filter(fn {track_id, track} -> - track.status != :finished and track.buffer == nil and pads[track_id].demand == 0 - end) - |> Enum.map(fn {track_id, _} -> {:demand, {Pad.ref(track_id), 1} } end) - - {state, actions} - end - ... + end end ``` -The last type of actions we need to generate are `:demand` actions. From the [context](https://hexdocs.pm/membrane_core/Membrane.Element.CallbackContext.Demand.html#t:t/0) passed as one of the arguments in the `handle_demand/5` callback, we have passed the `context.pads`. -That is how we can fetch the information about the current demand size on the given pad. -For all the tracks which are not yet `:finished`, do not have the buffer and the demand was not made on behalf of that pad (there is where we are making usage of the context information - `pads[track_id].demand==0`), we are making such a demand for one buffer. +The `prepare_buffers/1` function is the most involved here, so let's start with that. We first check whether we can send a buffer at all. The next buffer to send in order will of course be one with lowest `.pts`. We then empty the corresponding track's buffer. There might be more than one buffer ready to send and so we iterate the gathering recursively. +We define `can_send_buffer?` as follows. If there's any `:started` track still waiting on a buffer we cannot send more, since whatever buffers the mixer's currently holding might come after the one that's yet to be received on this track. +Otherwise, if all tracks have finished it can still be the case that some have non-empty buffers. We can happily send all of these in order since it is guaranteed there is no buffer preceding them that we would have to wait for. -Test the `Mixer`: +And that's all! the mixer's ready to mix, and ready to be tested: ```console mix test test/elements/mixer_test.exs ``` -Starting from that moment, our mixer should be capable of mixing the inputs from two sources! In the following part of this tutorial, we will extend the mixer so that it will be able to mix any number of tracks. - Now all that's left to do is to save our stream to file using [`Sink`](../glossary/glossary.md#sink). From 00aaab88ea94943e76bcc26a2d78f8f1521aa093 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 14:47:29 +0100 Subject: [PATCH 10/18] Update 10_Sink.md --- basic_pipeline/10_Sink.md | 62 +++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/basic_pipeline/10_Sink.md b/basic_pipeline/10_Sink.md index 4795919..41bd68b 100644 --- a/basic_pipeline/10_Sink.md +++ b/basic_pipeline/10_Sink.md @@ -4,77 +4,75 @@ The sink is the last [element](../glossary/glossary.md#element) in our [pipeline In contrast to the [filter elements](../glossary/glossary.md#filter), it won't have any output [pad](../glossary/glossary.md#pad) - that is why we need to make our element `use Membrane.Sink` and define the input pad only. Since we want to parameterize the usage of that element, it will be good to define the options structure, so that we can specify the path to the file where the output should be saved. This stuff is done in the code snippet below: -**_`lib/elements/Sink.ex`_** +**_`lib/elements/sink.ex`_** ```elixir - defmodule Basic.Elements.Sink do - @moduledoc """ - An element writing the data to the text file. - """ - - use Membrane.Sink + use Membrane.Sink - def_options(location: [type: :string, description: "Path to the file"]) + def_options location: [ + spec: String.t(), + description: "Path to the file" + ] - def_input_pad(:input, demand_unit: :buffers, caps: :any) + def_input_pad :input, + flow_control: :manual, + demand_unit: :buffers, + accepted_format: _any ... end ``` No surprises there - now we need to specify the element's behavior by defining the relevant callbacks! -**_`lib/elements/Sink.ex`_** +**_`lib/elements/sink.ex`_** ```elixir defmodule Basic.Elements.Sink do ... - @impl true - def handle_init(options) do - {:ok, - %{ - location: options.location - } } - end + @impl true + def handle_init(_context, options) do + {[], %{location: options.location}} + end ... end ``` -We have started with `handle_init/1`, where we are initializing the state of the element (we need to store the path to the output files). +We have started with `handle_init/2`, where we are initializing the state of the element (we need to store the path to the output files). -Later on, we can specify the `handle_prepared_to_playing/2` callback - this callback gets called once the pipeline gets in the `:playing` state - that is a moment when we can demand the [buffers](../glossary/glossary.md#buffer) for the first time (since the pipeline is already prepared to work): +Later on, we can specify the `handle_playing/2` callback - this callback gets called once the pipeline's setup finishes - that is a moment when we can demand the [buffers](../glossary/glossary.md#buffer) for the first time (since the pipeline is already prepared to work): -**_`lib/elements/Sink.ex`_** +**_`lib/elements/sink.ex`_** ```elixir defmodule Basic.Elements.Sink do ... - @impl true - def handle_prepared_to_playing(_ctx, state) do - { {:ok, demand: {:input, 10} }, state} - end + @impl true + def handle_playing(_context, state) do + {[demand: {:input, 10}], state} + end ... end ``` -There is only one more callback that needs to be specified - `handle_write/4`, which get's called once there are some buffers that can be processed (which means, that there are buffers to be written since there are no output pads through which we could be transmitting these buffers): +There is only one more callback that needs to be specified - `handle_buffer/4`, which get's called once there are some buffers that can be processed (which means, that there are buffers to be written since there are no output pads through which we could be transmitting these buffers): -**_`lib/elements/Sink.ex`_** +**_`lib/elements/sink.ex`_** ```elixir defmodule Basic.Elements.Sink do ... - @impl true - def handle_write(:input, buffer, _ctx, state) do - File.write!(state.location, buffer.payload <> "\n", [:append]) - { {:ok, demand: {:input, 10} }, state} - end + @impl true + def handle_buffer(:input, buffer, _context, state) do + File.write!(state.location, buffer.payload <> "\n", [:append]) + {[demand: {:input, 10}], state} + end end ``` -Note, that after the successful writing, we are taking the `:demand` action and we ask for some more buffer. +Note that after the successful writing, we are taking the `:demand` action and we ask for some more buffer. With the `Sink` completed, we have implemented all elements of our pipeline. Now let's move to the very last step - defining the actual pipeline using the elements we have created. From 84e2c132a425df21a9fa53e7e13b784059109b0e Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 15:12:37 +0100 Subject: [PATCH 11/18] Redemands WIP --- basic_pipeline/07_Redemands.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/basic_pipeline/07_Redemands.md b/basic_pipeline/07_Redemands.md index 7ddb7d2..6ebdf9b 100644 --- a/basic_pipeline/07_Redemands.md +++ b/basic_pipeline/07_Redemands.md @@ -6,7 +6,7 @@ Generally speaking, it can be used in two situations: - in the [source elements](../glossary/glossary.md#source) - in the [filter elements](../glossary/glossary.md#filter) -To comprehensively understand the concept behind redemanding, you need to be aware of the typical control flow which occurs in the Membrane's [elements](../glossary/glossary.md#element) - which you could have seen in the elements we have already defined. +To comprehensively understand the concept behind redemanding, you need to be aware of the typical control flow which occurs in Membrane's [elements](../glossary/glossary.md#element) - which you could have seen in the elements we have already defined. ## In Source elements @@ -40,14 +40,14 @@ end ``` ## In Filter elements - + In the filter element, the situation is quite different. -Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_process/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. The behavior which is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet)), becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. However, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? -We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_process/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it makes us sure, that the demand will be done before handling any other event. -Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: +Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. The behavior which is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))), becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. However, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? +We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it makes us sure, that the demand will be done before handling any other event. +Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: -- updating the buffers' list in the `handle_process/4` -- updating the status of the [track](../glossary/glossary.md#track) in the `handle_end_of_stream/3` +- updating the buffers' list in `handle_buffer/4` +- updating the status of the [track](../glossary/glossary.md#track) in `handle_end_of_stream/3` Therefore, we were simply returning the `:redemand` action, and the `handle_demand/5` was called sequentially after on, trying to produce the output buffer. As you can see, redemand mechanism in filters helps us deal with situations, where we do not know how many input buffers to demand in order to be able to produce an output buffer/buffers. From 1c20407f8a4b3d11fb29a1346cf375ade4f63e4e Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 15:29:15 +0100 Subject: [PATCH 12/18] Update source after start.exs fix --- basic_pipeline/03_Source.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/basic_pipeline/03_Source.md b/basic_pipeline/03_Source.md index 5c80571..ac153b3 100644 --- a/basic_pipeline/03_Source.md +++ b/basic_pipeline/03_Source.md @@ -74,7 +74,7 @@ defmodule Basic.Elements.Source do ... @impl true def handle_init(_context, options) do - {[setup: :incomplete], + {[], %{ location: options.location, content: nil @@ -85,7 +85,7 @@ end ``` As said before, `handle_init/2` expects a structure with the previously defined parameters to be passed as an argument. -All we need to do there is to initialize the state - our state will be in a form of a map, and for now on we will put there a `location` (a path to the input file) and the `content`, where we will be holding packets read from the file, which haven't been sent yet. For now, the content is set to nil as we haven't read anything from the input file yet. That's also why we send back the action `setup: :incomplete`. In the next section we'll talk about more involved initialization and resources managed by our element. +All we need to do there is to initialize the state - our state will be in a form of a map, and for now on we will put there a `location` (a path to the input file) and the `content`, where we will be holding packets read from the file, which haven't been sent yet. The recommended approach is to keep `handle_init/2` lean and defer any heavier initialization to `handle_setup/2` which runs automatically afterwards, if defined. In the next section we'll talk about more involved initialization and resources managed by our element. > ### TIP > @@ -94,8 +94,7 @@ All we need to do there is to initialize the state - our state will be in a form ## Preparing our element -When an element requires more time to initialise, you should delegate complex tasks to `handle_setup/2`. This callback runs after `handle_init/2` if it returns the `setup: :incomplete` action. -In our example, we'd like to open, read and save the contents of the input file. We then save it in our state as `content`. +In the `handle_setup/2` callback, we'd like to open, read and save the contents of the input file. We then save it in our state as `content`. **_`lib/elements/source.ex`_** From 0274c1a23c46c64d7f0cdfa22088141c2105ea1f Mon Sep 17 00:00:00 2001 From: kidp330 Date: Wed, 6 Dec 2023 16:03:05 +0100 Subject: [PATCH 13/18] Update 11_Pipeline.md --- basic_pipeline/11_Pipeline.md | 127 ++++++++++------------------------ 1 file changed, 37 insertions(+), 90 deletions(-) diff --git a/basic_pipeline/11_Pipeline.md b/basic_pipeline/11_Pipeline.md index b0691dc..472cd06 100644 --- a/basic_pipeline/11_Pipeline.md +++ b/basic_pipeline/11_Pipeline.md @@ -6,30 +6,30 @@ In many real-life scenarios, this part would be the only thing you would need to ## Defining the pipeline -The pipeline is another behavior introduced by the Membrane Framework. To make the module a pipeline, we need to make it `use Membrane.Pipeline`. That is how we will start our implementation of the pipeline module, in the `lib/Pipeline.ex` file: +The pipeline is another behavior introduced by the Membrane Framework. To make the module a pipeline, we need to make it `use Membrane.Pipeline`. That is how we will start our implementation of the pipeline module, in the `lib/pipeline.ex` file: -**_`lib/Pipeline.ex`_** +**_`lib/pipeline.ex`_** ```elixir defmodule Basic.Pipeline do - use Membrane.Pipeline + use Membrane.Pipeline ... end ``` -You could have guessed - all we need to do now is to describe the desired behavior by implementing the callbacks! In fact, the only callback we want to implement if the pipeline is the `handle_init/1` callback, called once the pipeline is initialized - of course, there are plenty of other callbacks which you might find useful while dealing with a more complex task. You can read about them [here](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#callbacks). +You could have guessed - all we need to do now is to describe the desired behavior by implementing the callbacks! In fact, the only callback we want to implement if the pipeline is the `handle_init/2` callback, called once the pipeline is initialized - of course, there are plenty of other callbacks which you might find useful while dealing with a more complex task. You can read about them [here](https://hexdocs.pm/membrane_core/Membrane.Pipeline.html#callbacks). Please add the following callback signature to our `Basic.Pipeline` module: -**_`lib/Pipeline.ex`_** +**_`lib/pipeline.ex`_** ```elixir defmodule Basic.Pipeline do ... @impl true - def handle_init(_opts) do + def handle_init(_context, _options) do end end @@ -45,110 +45,57 @@ With such a concept in mind, it's possible to create reliable and fault-tolerant [Here](https://blog.appsignal.com/2021/08/23/using-supervisors-to-organize-your-elixir-application.html) there is a really nice article describing that concept and providing an example of the actor system. Feel free to stop here and read about the supervision mechanism in Elixir if you have never met with that concept before. Our pipeline will also be an actor system - with the `Basic.Pipeline` module being the supervisor of all its elements. As you have heard before - it is the supervisor's responsibility to launch its children's processes. -In the Membrane Framework's pipeline there is a special action designed for that purpose - [`:spec`](https://hexdocs.pm/membrane_core/Membrane.Pipeline.Action.html#t:spec_t/0). -As you can see, you need to specify the `Membrane.ParentSpec` for that purpose. -It consists of: +In the Membrane Framework's pipeline there is a special action designed for that purpose - [`:spec`](https://hexdocs.pm/membrane_core/Membrane.Pipeline.Action.html#t:spec/0). +As you can see, you need to specify the `Membrane.ChildrenSpec` for that purpose. -- `:children` - map of the pipeline's (or a bin) elements -- `:links` - list consisting of links between the elements - -Please stop for a moment and read about the [`Membrane.ParentSpec`](https://hexdocs.pm/membrane_core/Membrane.ParentSpec.html). +Please stop for a moment and read about the [`Membrane.ChildrenSpec`](https://hexdocs.pm/membrane_core/Membrane.ChildrenSpec.html). We will wait for you and once you are ready, we will define our own children and links ;) -Let's start with defining what children we need inside the `handle_init/1` callback! If you have forgotten what structure we want to achieve please refer to the [2nd chapter](02_SystemArchitecture.md) and recall what elements we need inside of our pipeline. +Let's start with defining what children we need inside the `handle_init/2` callback! If you have forgotten what structure we want to achieve please refer to the [2nd chapter](02_SystemArchitecture.md) and recall what elements we need inside of our pipeline. -**_`lib/Pipeline.ex`_** +**_`lib/pipeline.ex`_** ```elixir - -@impl true -def handle_init(_opts) do - children = %{ - input1: %Basic.Elements.Source{location: "input.A.txt"}, - ordering_buffer1: Basic.Elements.OrderingBuffer, - depayloader1: %Basic.Elements.Depayloader{packets_per_frame: 4}, - - input2: %Basic.Elements.Source{location: "input.B.txt"}, - ordering_buffer2: Basic.Elements.OrderingBuffer, - depayloader2: %Basic.Elements.Depayloader{packets_per_frame: 4}, - - mixer: Basic.Elements.Mixer, - output: %Basic.Elements.Sink{location: "output.txt"} - } -end -``` - -Remember to pass the desired file paths in the `:location` option! -We have just created the map, which keys are in the form of atoms that describe each of the children and the values are particular module built-in structures (with all the required fields passed). - -Now we have a `children` map which we will use to launch the processes. But the Membrane needs to know how those children elements are connected (and, in fact, how the pipeline is defined!). Therefore let's create a `links` list with the description of the links between the elements: - -**_`lib/Pipeline.ex`_** - -```elixir - -def handle_init(_opts) do - ... - links = [ - link(:input1) |> to(:ordering_buffer1) |> to(:depayloader1), - link(:input2) |> to(:ordering_buffer2) |> to(:depayloader2), - link(:depayloader1) |> via_in(:first_input) |> to(:mixer), - link(:depayloader2) |> via_in(:second_input) |> to(:mixer), - link(:mixer) |> to(:output) - ] - ... -end -``` - -We hope the syntax is visually descriptive enough to show what is the desired result of that definition. Just to be sure let us go through it bit-by-bit. -Each pipeline's "branch" starts with the `link/1` which takes as an argument an atom corresponding to a given element. All the further elements in the branch can be accessed with the use of the `to/1` function, expecting an atom that identifies that element to be passed as an argument. Note, that the mentioned atoms must be the same as the ones you have used as keys in the `children` map! -`|>` operator allows "linking" of the elements accessed in the way described above, via their [pads](../glossary/glossary.md#pad). By default, the elements' link will be using an `:input` pad as the input pad and an `:output` pad as the output pad. -`via_in/1` allows specifying an input pad with a given name. Since in a [mixer](../glossary/glossary.md#mixer) there are two input pads (`:first_input` and `:second_input`, defined in `Basic.Elements.Mixer` module with `def_input_pad` and `def_output_pad` macros), we need to distinguish between them while linking the elements. -Of course, there is also a `via_out/1` function, which is used to point the output pad with the given identifier, but there was no need to use it. -In the case of other elements we do not need to explicitly point the desired pads since we are taking advantage of the default pads name - `:input` for the input pads and `:output` for the output ones (see what names we have given to our pads in the elements other than the mixer!). However, we could rewrite the following `links` definitions and explicitly specify the pad names: - -**_`lib/Pipeline.ex`_** - -```elixir - -def handle_init(_opts) do +defmodule Basic.Pipeline do ... - links = [ - link(:input1) |> via_out(:output) |> via_in(:input) |> to(:ordering_buffer1) |> via_out(:output) |> via_in(:input) |> to(:depayloader1), - link(:input2) |> via_out(:output) |> via_in(:input) |> to(:ordering_buffer2) |> via_out(:output) |> via_in(:input) |>to(:depayloader2), - link(:depayloader1) |> via_out(:output) |> via_in(:first_input) |> to(:mixer), - link(:depayloader2) |> via_out(:output) |> via_in(:second_input) |> to(:mixer), - link(:mixer) |> via_out(:output) |> via_in(:input) |> to(:output) - ] + @impl true + def handle_init(_context, _options) do + spec = [ + child(:input1, %Basic.Elements.Source{location: "input.A.txt"}) + |> child(:ordering_buffer1, Basic.Elements.OrderingBuffer) + |> child(:depayloader1, %Basic.Elements.Depayloader{packets_per_frame: 4}) + |> via_in(:first_input) + |> child(:mixer, Basic.Elements.Mixer) + |> child(:output, %Basic.Elements.Sink{location: "output.txt"}), + child(:input2, %Basic.Elements.Source{location: "input.B.txt"}) + |> child(:ordering_buffer2, Basic.Elements.OrderingBuffer) + |> child(:depayloader2, %Basic.Elements.Depayloader{packets_per_frame: 4}) + |> via_in(:second_input) + |> get_child(:mixer) + ] + + {[spec: spec], %{}} + end ... -end ``` -That's almost it! All we need to do is to return a proper tuple from the `handle_init/1` callback, with the `:spec` action defined: - -**_`lib/Pipeline.ex`_** - -```elixir +Remember to pass the desired file paths in the `:location` option! -def handle_init(_opts) do - ... - spec = %ParentSpec{children: children, links: links} - { {:ok, spec: spec}, %{} } -end -``` +Now... that's it! :) +The spec list using Membrane's DSL is enough to describe our pipeline's topology. The child keywords spawn components of the specified type and we can use the `|>` operator to link them together. When the pads that should be linked are unamibiguous this is straightforward but for links like those with `Mixer` we can specify the pads using `via_in/1`. There also exists a `via_out/1` keyword which works in a similar way. +As you can see the first argument to `child/2` is a component identifier, but it's also possible to have anonymous children using `child/1`, which just has Membrane generate a unique id under the hood. Our pipeline is ready! Let's try to launch it. We will do so by starting the pipeline, and then playing it. For the ease of use we will do it in a script. -**_`lib/start.exs`_** +**_`start.exs`_** ```elixir -{:ok, pid} = Basic.Pipeline.start() -Basic.Pipeline.play(pid) +{:ok, _sup, _pipeline} = Membrane.Pipeline.start_link(Basic.Pipeline) Process.sleep(500) ``` -You can execute it by running `mix run lib/start.exs` in the terminal. +You can execute it by running `mix run start.exs` in the terminal. In the output file (the one specified in the `handle_init/1` callback of the pipeline) you should see the recovered conversation. From f66c2e424c1303d98ab5570258970df381cf965f Mon Sep 17 00:00:00 2001 From: kidp330 Date: Tue, 19 Dec 2023 17:28:53 +0100 Subject: [PATCH 14/18] Changes after review --- basic_pipeline/03_Source.md | 19 ++++++++++++------- basic_pipeline/04_StreamFormat.md | 14 ++++++++------ basic_pipeline/05_Formats.md | 2 +- basic_pipeline/11_Pipeline.md | 14 ++++++++++---- 4 files changed, 31 insertions(+), 18 deletions(-) diff --git a/basic_pipeline/03_Source.md b/basic_pipeline/03_Source.md index ac153b3..82cac21 100644 --- a/basic_pipeline/03_Source.md +++ b/basic_pipeline/03_Source.md @@ -114,7 +114,7 @@ defmodule Basic.Elements.Source do end ``` -When the setup is complete, the element goes into `playing` state. It can then demand buffers from previous elements and send its `:stream_format` to receiving elements. Since we are implementing a sink we do not have anything to demand from, but we can specify the format. We can do this, for example, in `handle_playing/2`: +When the setup is complete, the element goes into `:playing` state. It can then demand buffers from previous elements and send its `:stream_format` to subsequent elements. Since we are implementing a sink we do not have any previous element to demand from, but we can specify the format. We can do this, for example, in `handle_playing/2`: **_`lib/elements/source.ex`_** @@ -129,7 +129,7 @@ defmodule Basic.Elements.Source do end ``` -The `:stream_format` action means that we want to transmit the information about the supported [formats](../glossary/glossary.md#stream-format-formerly-caps) through the `output` pad, to the next element in the pipeline. In [chapter 4](../basic_pipeline/04_Caps.md) you will find out more about stream formats and learn why it is required to do so. +The `:stream_format` action means that we want to transmit the information about the supported [formats](../glossary/glossary.md#stream-format-formerly-caps) through the `output` pad, to the next element in the pipeline. In [chapter 4](../basic_pipeline/04_StreamFormat.md) you will find out more about stream formats and learn why it is required to do so. ## Demands @@ -162,11 +162,16 @@ defmodule Basic.Elements.Source do end ``` -The callback's body describes the situation in which some buffers were requested. Then we are checking if we have any packets left in the list persisting in the state of the element. If that list is empty, we are sending an `end_of_stream` action, indicating that there will be no more buffers sent through the `:output` pad and that is why there is no point in requesting more buffers. -However, in case of the `content` list of packets being non-empty, we are taking the head of that list, and storing the remaining tail of the list in the state of the element. Later on, we are defining the actions we want to take - that is, we want to return a buffer with the head packet from the original list. We make use of the [`buffer:` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:buffer_t/0), and specify that we want to transmit the [`%Buffer`](https://hexdocs.pm/membrane_core/Membrane.Buffer.html#t:t/0) structure through the `:output` pad. Note the fields available in the `%Buffer` structure - in our case, we make use of only the `:payload` field, which, according to the documentation, can be of `any` type - however, in almost all cases you will need to send binary data within this field. Any structured data (just like timestamps etc.) should be passed in the other fields available in the `%Buffer`, designed especially for that cases. -However, there is the other action that is taken - the `:redemand` action, queued to take place on the `:output` pad. This action will simply invoke the `handle_demand/4` callback once again, which is helpful when the whole demand cannot be completely fulfilled in the single `handle_demand` invocation we are just processing. The great thing here is that the `size` of the demand will be automatically determined by the element and we do not need to specify it anyhow. Redemanding, in the context of sources, helps us simplify the logic of the `handle_demand` callback since all we need to do in that callback is to supply a single piece of data and in case this is not enough, take a [`:redemand`](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:redemand_t/0) action and invoke that callback once again. As you will see later, the process of redemanding is even more powerful in the context of [filter elements](../glossary/glossary.md#filter). -But don't give up if you don't grasp demands just yet! :) Membrane also supports `:auto` flow control, which takes care of demands and should be enough for 90% of use cases. +The callback's body describes the situation in which some buffers were requested. Then we are checking if we have any packets left in the list persisting in the state of the element. If that list is empty, we are sending an `end_of_stream` action, indicating that there will be no more buffers sent through the `:output` pad and that is why there is no point in requesting more buffers. -By now you should have created a `Basic.Element.Source` element, with options and output pads defined and its `handle_init/2`, `handle_setup/2`, `handle_playing/2` and `handle_demand/5` callbacks implemented. +However, in case of the `content` list of packets being non-empty, we are taking the head of that list, and storing the remaining tail of the list in the state of the element. Later on, we are defining the actions we want to take - that is, we want to return a buffer with the head packet from the original list. We make use of the [`buffer:` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:buffer_t/0), and specify that we want to transmit the [`%Buffer`](https://hexdocs.pm/membrane_core/Membrane.Buffer.html#t:t/0) structure through the `:output` pad. Note the fields available in the `%Buffer` structure - in our case, we make use of only the `:payload` field, which, according to the documentation, can be of `any` type - however, in almost all cases you will need to send binary data within this field. Any structured data (just like timestamps etc.) should be passed in the other fields available in the `%Buffer`, designed especially for that cases. + +Then there's the other action that is taken - the `:redemand` action, queued to take place on the `:output` pad. This action will simply invoke the `handle_demand/4` callback once again, which is helpful when the whole demand cannot be completely fulfilled in the single `handle_demand` invocation we are just processing. The great thing here is that the `size` of the demand will be automatically determined by the element and we do not need to specify it anyhow. Redemanding, in the context of sources, helps us simplify the logic of the `handle_demand` callback since all we need to do in that callback is to supply a single piece of data and in case this is not enough, take a [`:redemand`](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:redemand_t/0) action and invoke that callback once again. As you will see later, the process of redemanding is even more powerful in the context of [filter elements](../glossary/glossary.md#filter). + +> ### TIP +> +> Membrane also supports `:auto` demand mode, which should cover 90% of use cases. +> +> You can learn more about demand modes [here](https://membrane.stream/learn/get_started_with_membrane/6). In the next chapter we will explore what stream formats are in Membrane. diff --git a/basic_pipeline/04_StreamFormat.md b/basic_pipeline/04_StreamFormat.md index 49bb58f..a159b6e 100644 --- a/basic_pipeline/04_StreamFormat.md +++ b/basic_pipeline/04_StreamFormat.md @@ -44,13 +44,15 @@ Module name defines the type of the format, however it is possible to pass some 2. We specify the pad of the element with the format we have just defined, using the `:accepted_format` option. For the purpose of an example, let it be the `:input` pad: ```elixir -def_input_pad(:input, +def_input_pad :input, demand_unit: :buffers, - accepted_format: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] -) + accepted_format: + any_of([ + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480, height: 300} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60, + %Format.Raw{pixel_format: pixel_format, framerate: range(30, 60), width: 720, height: 480} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 + ]) ``` As you can see, we pass a list of compatible formats, each described with the tuple, consisting of our module name, and the keywords list fulfilling the diff --git a/basic_pipeline/05_Formats.md b/basic_pipeline/05_Formats.md index 7a8a205..c3f3822 100644 --- a/basic_pipeline/05_Formats.md +++ b/basic_pipeline/05_Formats.md @@ -32,7 +32,7 @@ end Same as in the case of the previous format - we are defining a structure with a single field, called `:encoding`, and the default value of that field - `:utf8`. -That's it! Format modules are really simple - the more complicated thing is to make use of them - which we will do in the subsequent chapters while defining the specs! +That's it! Format modules are really simple, and using them is not that hard either, as you will see in the subsequent chapters - defining the accepted stream formats and sending stream format actions! Before advancing you can test the `Source` [element](../glossary/glossary.md/#source), using the tests provided in `/test` directory. diff --git a/basic_pipeline/11_Pipeline.md b/basic_pipeline/11_Pipeline.md index 472cd06..513f692 100644 --- a/basic_pipeline/11_Pipeline.md +++ b/basic_pipeline/11_Pipeline.md @@ -82,8 +82,9 @@ defmodule Basic.Pipeline do Remember to pass the desired file paths in the `:location` option! Now... that's it! :) -The spec list using Membrane's DSL is enough to describe our pipeline's topology. The child keywords spawn components of the specified type and we can use the `|>` operator to link them together. When the pads that should be linked are unamibiguous this is straightforward but for links like those with `Mixer` we can specify the pads using `via_in/1`. There also exists a `via_out/1` keyword which works in a similar way. -As you can see the first argument to `child/2` is a component identifier, but it's also possible to have anonymous children using `child/1`, which just has Membrane generate a unique id under the hood. +The spec list using Membrane's DSL is enough to describe our pipeline's topology. The child keywords spawn components of the specified type and we can use the `|>` operator to link them together. When the pads that should be linked are unamibiguous this is straightforward but for links like those with `Mixer` we can specify the pads using `via_in/1`. There also exists a `via_out/1` keyword which works similarly for output pads. + +As you can see the first argument to `child/2` is a component identifier. If we had omitted it Membrane would generate a unique identifier under the hood. For more about the `child` functions please refer to [the docs](https://hexdocs.pm/membrane_core/Membrane.ChildrenSpec.html#child/1). Our pipeline is ready! Let's try to launch it. We will do so by starting the pipeline, and then playing it. For the ease of use we will do it in a script. @@ -91,8 +92,13 @@ We will do so by starting the pipeline, and then playing it. For the ease of use **_`start.exs`_** ```elixir -{:ok, _sup, _pipeline} = Membrane.Pipeline.start_link(Basic.Pipeline) -Process.sleep(500) +{:ok, _sup, pipeline} = Membrane.Pipeline.start_link(Basic.Pipeline) +Process.monitor(pipeline) + +# Wait for the pipeline to terminate +receive do + {:DOWN, _monitor, :process, ^pipeline, _reason} -> :ok +end ``` You can execute it by running `mix run start.exs` in the terminal. From d6980dcf968c6f5139d75d31c25adcad683da864 Mon Sep 17 00:00:00 2001 From: kidp330 Date: Tue, 19 Dec 2023 17:48:39 +0100 Subject: [PATCH 15/18] Restructure 09_Mixer.md --- basic_pipeline/09_Mixer.md | 67 +++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/basic_pipeline/09_Mixer.md b/basic_pipeline/09_Mixer.md index e11df67..c3e4dca 100644 --- a/basic_pipeline/09_Mixer.md +++ b/basic_pipeline/09_Mixer.md @@ -49,8 +49,10 @@ defmodule Basic.Elements.Mixer do end ``` -As you can see in the code snippet above, the `Track` will consist of the `:buffer` field, holding the very last buffer received on the corresponding input pad, and the `:status` fields, indicating the status of the track - `:started`, in case we are still expecting some buffers to come (that means - in case `:end_of_stream` event hasn't been received yet) and `:finished` otherwise. -It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some hard to spot errors. +As you can see in the code snippet above, the `Track` will consist of the `:buffer` field, holding the very last buffer received on the corresponding input pad, and the `:status` fields, indicating the status of the track - `:started`, in case we are still expecting some buffers to come (that means - in case `:end_of_stream` event hasn't been received yet) and `:finished` otherwise. + +It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some hard to spot errors. + A careful reader might notice, that we are holding only one buffer for each track, instead of a list of all the potentially unprocessed buffers - does it mean that we are losing some of them? Not at all, since we are taking advantage of the elements which have appeared earlier in the [pipeline](../glossary/glossary.md#pipeline) and which provide us with an ordered list of frames on each of the inputs - however, we will need to process each buffer just at the moment it comes on the pad. The logic we're going to implement can be described in the following three steps: @@ -79,7 +81,9 @@ end We have provided a `handle_init/2` callback, which does not expect any options to be passed. We are simply setting up the structure of the element state. As mentioned previously, we will have a `Track` structure for each of the input pads. -What's interesting is this is where the mixer having exactly two inputs stops being important. The missing functionality can be defined generically without much hassle. + +What's interesting is this is where the mixer having exactly two inputs stops being important. The missing functionality can be defined generically without much hassle. + Following on the callbacks implementation, let's continue with the `handle_buffer/4` implementation: **_`lib/elements/mixer.ex`_** @@ -100,6 +104,7 @@ end ``` In this callback we update the mixer's state by assigning the incoming buffer to its track. We can be sure no overwriting of an existing buffer happens, which will become more apparent as we delve further into the logic's implementation. + Once the state is updated we gather all buffers that can be sent (might be none) in `get_output_buffers_actions/1` and return the coresponding `buffer` actions. In case any demands should be sent afterwards we also tell the output pad to redemand. **_`lib/elements/mixer.ex`_** @@ -125,34 +130,9 @@ defmodule Basic.Elements.Mixer do end ``` -What we did here was similar to the logic defined in `handle_buffer/4` - we have just updated the state of the track (in that case - by setting its status as `:finished`), gather the buffers and send them. The important difference is that in case all inputs have closed, we should forward an `end_of_stream` action instead of a `redemand`, signaling the mixer has finished its processing. -The `has_buffer?/1` function is a private utility that will show up in a few more places in our element's definition and is nothing more than a simple check `track.buffer != nil`. -Let's now implement the `handle_demand/5` callback: - -**_`lib/elements/mixer.ex`_** - -```elixir -defmodule Basic.Elements.Mixer do - ... - def handle_demand(:output, _size, _unit, context, state) do - demand_actions = - state.tracks - |> Enum.reject(&has_buffer?/1) - |> Enum.filter(fn {track_id, track} -> - track.status != :finished and context.pads[track_id].demand == 0 - end) - |> Enum.map(fn {track_id, _track} -> {:demand, {track_id, 1}} end) - - {demand_actions, state} - end - ... -end -``` - -Since it should be responsible for producing and sending `demand` actions to the corresponding input pads, we accordingly filter tracks for ones that are empty, started, and with no demands pending. -It should also become clearer why in `handle_buffer/4` the receiving track is sure to have an empty buffer ready to be overwritten, since we only send demands to input pads of empty tracks. -All that's left now is to implement the main processing logic, gathering buffers that are ready to be sent: +What we did here was similar to the logic defined in `handle_buffer/4` - we have just updated the state of the track (in that case - by setting its status to `:finished`), gather the buffers and send them. The important difference is that in case all inputs have closed, we should forward an `end_of_stream` action instead of a `redemand`, signaling the mixer has finished its processing. +Let's now implement gathering ready buffers: **_`lib/elements/mixer.ex`_** @@ -197,9 +177,36 @@ end ``` The `prepare_buffers/1` function is the most involved here, so let's start with that. We first check whether we can send a buffer at all. The next buffer to send in order will of course be one with lowest `.pts`. We then empty the corresponding track's buffer. There might be more than one buffer ready to send and so we iterate the gathering recursively. + We define `can_send_buffer?` as follows. If there's any `:started` track still waiting on a buffer we cannot send more, since whatever buffers the mixer's currently holding might come after the one that's yet to be received on this track. + Otherwise, if all tracks have finished it can still be the case that some have non-empty buffers. We can happily send all of these in order since it is guaranteed there is no buffer preceding them that we would have to wait for. +All that's left now is to handle redemands. + +**_`lib/elements/mixer.ex`_** + +```elixir +defmodule Basic.Elements.Mixer do + ... + def handle_demand(:output, _size, _unit, context, state) do + demand_actions = + state.tracks + |> Enum.reject(&has_buffer?/1) + |> Enum.filter(fn {track_id, track} -> + track.status != :finished and context.pads[track_id].demand == 0 + end) + |> Enum.map(fn {track_id, _track} -> {:demand, {track_id, 1}} end) + + {demand_actions, state} + end + ... +end +``` + +Since it should be responsible for producing and sending `demand` actions to the corresponding input pads, we accordingly filter tracks for ones that are empty, started, and with no demands pending. +It should also become clearer why in `handle_buffer/4` the receiving track is sure to have an empty buffer ready to be overwritten, since we only send demands to input pads of empty tracks. + And that's all! the mixer's ready to mix, and ready to be tested: ```console From 8f978ddbbb9938ca0abcee10a5daaa866d9d8a86 Mon Sep 17 00:00:00 2001 From: varsill Date: Thu, 21 Dec 2023 11:17:30 +0100 Subject: [PATCH 16/18] Update description of stream formats --- basic_pipeline/04_StreamFormat.md | 65 ++++++++++++++++--------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/basic_pipeline/04_StreamFormat.md b/basic_pipeline/04_StreamFormat.md index a159b6e..0a13f5d 100644 --- a/basic_pipeline/04_StreamFormat.md +++ b/basic_pipeline/04_StreamFormat.md @@ -43,6 +43,17 @@ Module name defines the type of the format, however it is possible to pass some 2. We specify the pad of the element with the format we have just defined, using the `:accepted_format` option. For the purpose of an example, let it be the `:input` pad: +```elixir + def_input_pad :input, + demand_unit: :buffers, + accepted_format: + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480, height: 300} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 +``` +As you can see, the argument of that option is simply a match pattern. The incoming stream format is later confronted against that match pattern. If it does not match, an exception is thrown at the runtime. + +To simplify the pattern definition, there is `any_of/1` helper function that allows to define a alternative of match patterns - the matching will succeed if the stream format received on the pad matches any of the patterns listed as `any_of/1` argument. Below you can see an example of defining alternative of match patterns: + ```elixir def_input_pad :input, demand_unit: :buffers, @@ -50,22 +61,11 @@ def_input_pad :input, any_of([ %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480, height: 300} when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60, - %Format.Raw{pixel_format: pixel_format, framerate: range(30, 60), width: 720, height: 480} + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 720, height: 480} when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 ]) ``` -As you can see, we pass a list of compatible formats, each described with the tuple, consisting of our module name, and the keywords list fulfilling the -structure defined in that module. For the format's options, we can use the `range/2` or `one_of/1` specifier, which will modify the way in which the comparison between the accepted specification and the actual format received by the element is performed. - -3. Once the `:stream_format` event comes to the element's pad, the format description sent in that event is confronted with each of the formats in the specification list of the pad. If the event's format description matches even one of the formats present in the list it means that they are matching. - -- We have used `framerate: range(30, 60)`, so will accept the framerate value in the given interval, between 30 and 60 FPS. -- We have also used `pixel_format: one_of([:I420, :I422]`, and that will accept formats, whose pixel format is either I420 or I422 -- We have used a plain value to specify the `width` and the `height` of a picture - the format will match if that option will be equal to the value passed in the specification - -4. As noted previously, one can specify the format as `:any`. Such a specification will match all the formats sent on the pad, however, it is not a recommended way to develop the element - formats are there for a reason! - Our journey with stream formats does not end here. We know how to describe their specification...but we also need to make our elements send the `:stream_format` events so that the following elements will be aware of what type of data our element is producing! An element can send a stream format as one of the [actions](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html) it can take - the [`:stream_format` action](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html#t:stream_format/0). @@ -89,10 +89,12 @@ Here is the definition of the source element: defmodule Source do def_output_pad(:output, demand_unit: :buffers, - stream_format: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] + stream_format: any_of([ + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480, height: 300} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60, + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 720, height: 480} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 + ]) ) ... def handle_playing(_context, state) do @@ -103,36 +105,37 @@ defmodule Source do While returning from the `handle_playing/2` callback, the element will send the format described by the `Formats.Raw` structure, through the `:output` pad. Will this format meet the accepted specification provided by us? Think about it! -In fact, it will. The format matches (both in the event being sent and in the accepted specification of the pad, we have `Format.Raw` module). When it comes to the options, we see, that `I420` is in the `one_of` list, acceptable by the specification format for `width` equal to 720 and `height` equal to 480, and the `framerate`, equal to 45, is in the `range` between 30 and 60, as defined in the specification. -It means that the format can be sent through the `:output` pad. +In fact, it will, as the `Formats.Raw` structure sent with `:stream_format` action matches the pattern - the value of `:pixel_format` field is one of `:I420` and `:I422`, and the `:framerate` is in the range between 30 and 60. In case the structure didn't match the pattern, a runtime exception would be thrown. + Below there is the draft of the filter implementation: ```elixir # Filter defmodule Filter do - def_input_pad(:input, + def_input_pad:input, demand_unit: :buffers, - accepted_format: [ - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 720, height: 480} - ] - ) + accepted_format: any_of([ + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480, height: 300} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60, + %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 720, height: 480} + when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 + ]) - def_output_pad(:output, - demand_unit: :buffers, - accepted_format: {Format.Raw, pixel_format: one_of([:I420, :I422]), framerate: range(30, 60), width: 480, height: 300}, - ) + def_output_pad :output, + demand_unit: :buffers, + accepted_format: %Format.Raw{pixel_format: pixel_format, framerate: framerate, width: 480,height: 300} when pixel_format in [:I420, :I422] and framerate >= 30 and framerate <= 60 ... + def handle_stream_format(_pad, _stream_format, _context, state) do - ... - { {:ok, [stream_format: {:output, %Formats.Raw{pixel_format: I420, framerate: 60, width: 480, height:300} }]}, state} + ... + { {[stream_format: {:output, %Formats.Raw{pixel_format: I420, framerate: 60, width: 480, height: 300} }]}, state} end end ``` -When we receive the spec on the input pad, we do not propagate it to our `:output` pad - instead, we send a different format, with reduced quality (width and height options are lower). +When we receive the spec on the input pad, we do not propagate it to our `:output` pad - instead, we send a different format, with reduced quality (values of the `width` and `height` fields might be lower). We hope by now you have a better understanding of what stream formats are. This knowledge will be helpful in the following chapters. From 7d6ad56af64762867ace3ee6f09ac8267345f1a1 Mon Sep 17 00:00:00 2001 From: varsill Date: Thu, 21 Dec 2023 12:12:42 +0100 Subject: [PATCH 17/18] Update description of redemands --- basic_pipeline/07_Redemands.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/basic_pipeline/07_Redemands.md b/basic_pipeline/07_Redemands.md index 6ebdf9b..62c71ee 100644 --- a/basic_pipeline/07_Redemands.md +++ b/basic_pipeline/07_Redemands.md @@ -40,16 +40,16 @@ end ``` ## In Filter elements - In the filter element, the situation is quite different. -Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. The behavior which is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))), becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. However, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? +Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. That behavior is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))). However it becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. At the same time, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it makes us sure, that the demand will be done before handling any other event. -Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: +Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: - updating the buffers' list in `handle_buffer/4` - updating the status of the [track](../glossary/glossary.md#track) in `handle_end_of_stream/3` Therefore, we were simply returning the `:redemand` action, and the `handle_demand/5` was called sequentially after on, trying to produce the output buffer. As you can see, redemand mechanism in filters helps us deal with situations, where we do not know how many input buffers to demand in order to be able to produce an output buffer/buffers. +In case we don't provide enough buffers in the `handle_demand/5` callback (or we are not sure that we do provide), we should call `:redemand` somewhere else (usually in the `handle_buffer/4`) to make sure that the demand is not lost. With that knowledge let's carry on with the next element in our pipeline - `Depayloader`. From 76d3942d9f0fbe396f91b5189cb7694bd309a8af Mon Sep 17 00:00:00 2001 From: kidp330 Date: Thu, 21 Dec 2023 14:40:35 +0100 Subject: [PATCH 18/18] Formatting fixes --- basic_pipeline/07_Redemands.md | 7 +++++-- get_started_with_membrane/03_elements.md | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/basic_pipeline/07_Redemands.md b/basic_pipeline/07_Redemands.md index 62c71ee..d9de38f 100644 --- a/basic_pipeline/07_Redemands.md +++ b/basic_pipeline/07_Redemands.md @@ -41,8 +41,11 @@ end ## In Filter elements In the filter element, the situation is quite different. -Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. That behavior is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))). However it becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. At the same time, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? -We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it makes us sure, that the demand will be done before handling any other event. +Since the filter's responsibility is to process the data sent via the input pads and transmit it through the output pads, there is no 'side-channel' from which we could take data. That is why in normal circumstances you would transmit the buffer through the output pad in the `handle_buffer/4` callback (which means - once your element receives a buffer, you process it, and then you 'mark' it as ready to be output with the `:buffer` action). When it comes to the `handle_demand/5` action on the output pad, all you need to do is to demand the appropriate number of buffers on the element's input pad. + +That behavior is easy to specify when we exactly know how many input buffers correspond to the one output buffer (recall the situation in the [Depayloader](../glossary/glossary.md#payloader-and-depayloader) of our pipeline, where we *a priori* knew, that each output buffer ([frame](../glossary/glossary.md#frame)) consists of a given number of input buffers ([packets](../glossary/glossary.md#packet))). However it becomes impossible to define if the output buffer might be a combination of a discretionary set number of input buffers. At the same time, we have dealt with an unknown number of required buffers in the OrderingBuffer implementation, where we didn't know how many input buffers do we need to demand to fulfill the missing spaces between the packets ordered in the list. How did we manage to do it? + +We simply used the `:redemand` action! In case there was a missing space between the packets, we returned the `:redemand` action, which immediately called the `handle_demand/5` callback (implemented in a way to request for a buffer on the input pad). The fact, that that callback invocation was immediate, which means - the callback was called synchronously, right after returning from the `handle_buffer/4` callback, before processing any other message from the element's mailbox - might be crucial in some situations, since it guarantees that the demand will be done before handling any other event. Recall the situation in the [Mixer](../glossary/glossary.md#mixer), where we were producing the output buffers right in the `handle_demand/5` callback. We needed to attempt to create the output buffer after: - updating the buffers' list in `handle_buffer/4` diff --git a/get_started_with_membrane/03_elements.md b/get_started_with_membrane/03_elements.md index afcd605..dd1c583 100644 --- a/get_started_with_membrane/03_elements.md +++ b/get_started_with_membrane/03_elements.md @@ -84,7 +84,8 @@ Each option can have the following fields: - `default` - The value that the option will have if it's not specified. If the default is not provided, the option must be always explicitly specified. - `description` - Write here what the option does. It will be included in the module documentation. -We'll see a practical example of defining options in the [sample element](#sample-element). +We'll see a practical example of defining options in the [sample element](#sample-element). + ## Callbacks Apart from specifying pads and options, creating an element involves implementing callbacks. They have different responsibilities and are called in a specific order. As in the case of pipelines, callbacks interact with the framework by returning [actions](https://hexdocs.pm/membrane_core/Membrane.Element.Action.html). Here are some most useful callbacks: