Skip to content

Commit

Permalink
feat(video): replace async_frames with frame_pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
stakach committed Jul 3, 2023
1 parent b03286b commit 4197957
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 37 deletions.
7 changes: 6 additions & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: ffmpeg
version: 0.5.1
version: 0.6.0

dependencies:
# we use stumpy to extract each pixels colour and draw bounding boxes
Expand All @@ -10,6 +10,11 @@ dependencies:
ipaddress:
github: Sija/ipaddress.cr

# for async frame processing
tasker:
github: spider-gazelle/tasker
version: ~> 2.1

development_dependencies:
# we use stumpy to extract each pixels colour and draw bounding boxes
stumpy_png:
Expand Down
36 changes: 15 additions & 21 deletions spec/video_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module FFmpeg
Spec.before_each do
File.delete?("./output.png")
File.delete?("./output2.png")
File.delete?("./async_output.png")
end

it "calculates scaled size properly" do
Expand Down Expand Up @@ -52,28 +53,21 @@ module FFmpeg
it "skips frames while processing images" do
video = Video.open(Path.new("./test.mp4"))

ready = Channel(Nil).new(1)
data = Channel(Tuple(StumpyCore::Canvas, Bool)).new(1)

spawn do
write_frame = 60
frame_count = 0

loop do
ready.send nil
frame, key_frame = data.receive
frame_count += 1
next if frame_count < write_frame
puts "writing async output"
StumpyPNG.write(frame, "./async_output.png")
break
end

ready.close
data.close
end
write_frame = 60
frame_count = 0

video.async_frames(ready, data)
pipeline = nil
pipeline = Tasker::Pipeline(Tuple(StumpyCore::Canvas, Bool), StumpyCore::Canvas).new("processor") { |(frame, _key_frame)|
# process frames here
frame_count += 1
next frame if frame_count < write_frame
puts "writing async output"
StumpyPNG.write(frame, "./async_output.png")
pipeline.try &.close
frame
}

video.frame_pipeline(pipeline.as(Tasker::Pipeline(Tuple(StumpyCore::Canvas, Bool), StumpyCore::Canvas)))
File.exists?("./async_output.png").should be_true
end

Expand Down
2 changes: 1 addition & 1 deletion src/ffmpeg/format.cr
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FFmpeg::Format
# Lazy load the packet
getter packet : Packet { Packet.new(@buffer_size) }

def read
def read(&)
status = LibAV::Format.read_frame(@context, packet)
raise "failed to read a frame with #{status}" if status < 0
begin
Expand Down
24 changes: 10 additions & 14 deletions src/ffmpeg/video.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "stumpy_core"
require "../ffmpeg"
require "tasker"

abstract class FFmpeg::Video
def finalize
Expand Down Expand Up @@ -85,7 +86,8 @@ abstract class FFmpeg::Video
def each_frame(
output_width : Int? = nil,
output_height : Int? = nil,
scaling_method : ScalingAlgorithm = ScalingAlgorithm::Bicublin
scaling_method : ScalingAlgorithm = ScalingAlgorithm::Bicublin,
&
)
codec, scaler, rgb_frame, frame_buffer, stream_index, canvas, cropped, requires_cropping = configure(output_width, output_height, scaling_method)

Expand All @@ -105,34 +107,28 @@ abstract class FFmpeg::Video
GC.collect
end

def async_frames(
ready : Channel(Nil),
data : Channel(Tuple(StumpyCore::Canvas, Bool)),
def frame_pipeline(
pipeline : Tasker::Pipeline,
output_width : Int? = nil,
output_height : Int? = nil,
scaling_method : ScalingAlgorithm = ScalingAlgorithm::Bicublin
)
codec, scaler, rgb_frame, frame_buffer, stream_index, canvas, cropped, requires_cropping = configure(output_width, output_height, scaling_method)

Log.trace { "extracting frames" }
while !closed?
while !closed? && !pipeline.closed?
format.read do |packet|
if packet.stream_index == stream_index
if frame = codec.decode(packet)
select
when ready.receive
data.send scale_and_extract(scaler, frame, rgb_frame, canvas, frame_buffer, requires_cropping, cropped)
else
Log.trace { "skipping frame" }
end
if (frame = codec.decode(packet)) && pipeline.idle?
pipeline.process scale_and_extract(scaler, frame, rgb_frame, canvas, frame_buffer, requires_cropping, cropped)
else
Log.trace { "skipping frame" }
end
end
end
end
rescue Channel::ClosedError
ensure
ready.close
data.close
close
@format = Format.new
GC.collect
Expand Down

0 comments on commit 4197957

Please sign in to comment.