Complexities of Media Streaming

June 21, 2025

Intro

If you’ve used YouTube, Spotify, Netflix, or any similar apps before, you’re probably familiar with the concept of media streaming. Embedding media content that can be loaded directly from a remote source is a common feature for apps that support audio or video data. There are a number of libraries that can handle this - ffmpeg, gstreamer, and mpv, to name a few. However, these are heavy dependencies, and you may want to use something more lightweight and easy to embed in your app.

It can be fairly easy to create something that works under optimal conditions, but an efficient solution that handles edge cases correctly can be deceptively complicated due to the number of concurrency-related challenges you’ll encounter. I created a Rust library to solve this specific problem and I’ll talk a bit about how it works in this post. We’ll talk specifically about audio streaming here, but most of these concepts apply to other types of media as well.

Background: Anatomy of an audio program

Let’s look at an example architecture for a simple audio program. Our audio application will consist of several components:

Putting it all together, the flow looks something like this:

[Figure: audio program workflow]

Note

A detailed description of how an audio decoder works is outside the scope of this post. The example here is meant to convey the basic concepts.

We’ll create a simple audio program that decodes a list of files from some user input and plays them in order. We’ll use a queue to send file objects from the main thread to the decoder and a ring buffer to send decoded PCM data from the decoder to the audio output device, as described above.

Note

The code used throughout this article is simplified to avoid being overly burdened by implementation details. It is not intended to be executed verbatim.

async fn main() -> Result<()> {
    let (queue_producer, queue_consumer) = Queue::new();
    let (ring_buffer_producer, ring_buffer_consumer) = RingBuffer::new();

    // Spawn the decoder thread.
    thread::spawn(move || decode_input(queue_consumer, ring_buffer_producer));
    // Spawn the audio thread.
    thread::spawn(move || process_audio_output(ring_buffer_consumer));

    // Enqueue any new files we receive from user input.
    while let Some(path) = next_input().await {
        let file = File::open(path)?;
        // Add the file to the queue.
        queue_producer.enqueue(file);
    }

    Ok(())
}

fn decode_input(
    queue_consumer: QueueConsumer<File>,
    ring_buffer_producer: RingBufferProducer<f32>,
) -> Result<()> {
    // Keep processing inputs until the queue is closed.
    while let Some(file) = queue_consumer.dequeue() {
        let mut decoder = Decoder::new(file);
        // We need to iterate through the file and decode one chunk at a time,
        // so we can fill the output stream as quickly as possible.
        while !decoder.is_finished() {
            // Decode the next chunk of data from the file.
            decoder.decode_next_chunk()?;
            // Read the decoded samples from the decoder and
            // write them into the ring buffer.
            // This will block until there's space available.

            // (the word "sample" refers to a single data point in the stream)
            ring_buffer_producer.write(decoder.decoded_samples())?;
        }
    }
    Ok(())
}

fn process_audio_output(ring_buffer_consumer: RingBufferConsumer<f32>) -> Result<()> {
    let mut audio_device = AudioOutputDevice::new()?;
    audio_device.start_output_stream()?;
    let mut buf = Vec::new();
    loop {
        // Poll the audio device and wait for the next buffer to become available.
        let buffer_size = audio_device.poll_next_buffer()?;
        // Ensure the buffer has the correct size.
        buf.resize(buffer_size, 0.0);
        // Read as many samples as possible from the ring buffer.
        let written = ring_buffer_consumer.read(&mut buf)?;
        // Mute the remaining samples to prevent any noise.
        buf[written..].iter_mut().for_each(|s| *s = 0.0);
        // Write them to the audio device.
        audio_device.write_buffer(&buf)?;
    }
    Ok(())
}
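The RingBuffer type above is left undefined, so here is a rough sketch of the behavior the example relies on: a producer whose write blocks until there's space, and a consumer whose read never blocks. The names (Shared, Producer, Consumer, sample_buffer) are made up for illustration, and it assumes a non-zero capacity. It uses a Mutex and Condvar from the standard library to keep things short; a real audio thread would usually want a lock-free ring buffer instead.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// Hypothetical shared state for a bounded buffer of samples.
struct Shared {
    samples: Mutex<VecDeque<f32>>,
    space_available: Condvar,
    capacity: usize,
}

struct Producer(Arc<Shared>);
struct Consumer(Arc<Shared>);

// Creates a connected producer/consumer pair. Assumes capacity > 0.
fn sample_buffer(capacity: usize) -> (Producer, Consumer) {
    let shared = Arc::new(Shared {
        samples: Mutex::new(VecDeque::with_capacity(capacity)),
        space_available: Condvar::new(),
        capacity,
    });
    (Producer(shared.clone()), Consumer(shared))
}

impl Producer {
    // Blocks until every sample in `input` has been written.
    fn write(&self, mut input: &[f32]) {
        let mut samples = self.0.samples.lock().unwrap();
        while !input.is_empty() {
            // Sleep while the buffer is full; the consumer wakes us up.
            samples = self
                .0
                .space_available
                .wait_while(samples, |s| s.len() == self.0.capacity)
                .unwrap();
            let free = self.0.capacity - samples.len();
            let (now, later) = input.split_at(free.min(input.len()));
            samples.extend(now.iter().copied());
            input = later;
        }
    }
}

impl Consumer {
    // Never blocks: copies whatever is available and returns the count.
    fn read(&self, out: &mut [f32]) -> usize {
        let mut samples = self.0.samples.lock().unwrap();
        let count = out.len().min(samples.len());
        for (slot, sample) in out.iter_mut().zip(samples.drain(..count)) {
            *slot = sample;
        }
        // Let a blocked producer know there may be space now.
        self.0.space_available.notify_one();
        count
    }
}
```

The asymmetry is deliberate: the decoder can afford to wait, but the audio output has to hand a buffer to the device on time, which is why it fills the remainder with silence instead of blocking.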

This architecture works fine if you already have the complete contents available in a file that can be read synchronously, but this is not the case when streaming content from a remote server.

Let’s talk about some solutions for handling this, in order of increasing complexity.

Solution #1: Download It First

Instead of receiving file paths as user input, let’s change the code to receive URLs instead. We’ll use an HTTP client to fetch the media content, save it to a temporary file, and pass the file object to the decoder.

async fn main() -> Result<()> {
    let (queue_producer, queue_consumer) = Queue::new();
    let (ring_buffer_producer, ring_buffer_consumer) = RingBuffer::new();

    // Spawn the decoder thread.
    thread::spawn(move || decode_input(queue_consumer, ring_buffer_producer));
    // Spawn the audio thread.
    thread::spawn(move || process_audio_output(ring_buffer_consumer));

    let client = HttpClient::new();
    // Enqueue any new URLs we receive from user input.
    while let Some(url) = next_input().await {
        // Download the whole file.
        let bytes = client.get(url).await?.bytes();
        // Write the content to a temporary file.
        let mut temp_file = TempFile::new();
        temp_file.write_all(&bytes)?;
        // Add the file to the queue.
        queue_producer.enqueue(temp_file.into_file());
    }

    Ok(())
}

This is a simple solution that may be adequate under the following conditions:

However, this solution is problematic unless all of these points hold. It’s not very efficient to wait for the entire file to download before decoding any of the contents. What if you need to download a thirty-minute audio file and your connection speed suddenly slows down? You could be waiting several minutes for the source to start playing.

Solution #2: Just-in-time Streaming

So far, we’ve exclusively used files as input for our audio decoder, but this isn’t strictly necessary. A robust decoder library should be able to operate on a generic interface for reading bytes rather than being coupled with file objects.

Our custom I/O source can utilize HTTP range requests to avoid requesting the entire file contents each time. Using the Range header, we can ask for a specific range of bytes, which can yield significant performance improvements for larger content.
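As a small illustration of how the header value is built, here's a sketch of a helper that turns a read request into a Range value. The function name range_header is hypothetical; the only facts it relies on are that byte ranges are zero-based and that the upper bound is inclusive.

```rust
// Builds the value of a Range header for a read of `len` bytes starting at
// `start`, or None when there is nothing left to request.
// `range_header` is a hypothetical helper, not part of any HTTP library.
fn range_header(start: u64, len: usize, content_length: u64) -> Option<String> {
    if len == 0 || start >= content_length {
        return None;
    }
    // Clamp the exclusive end to the stream length, then convert it to the
    // inclusive upper bound that the header expects.
    let end_exclusive = start.saturating_add(len as u64).min(content_length);
    Some(format!("bytes={}-{}", start, end_exclusive - 1))
}
```

For a 100-byte stream, asking for 10 bytes at position 0 gives "bytes=0-9", and asking for 10 bytes at position 95 gets clamped to "bytes=95-99".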

Implementing the std::io::Read interface allows us to fill a buffer with our input data whenever the caller requests it.

use std::io;

struct StreamReader {
    client: HttpClient,
    content_length: u64,
    url: Url,
    position: usize,
}

impl StreamReader {
    async fn new(url: Url) -> Result<Self> {
        let client = HttpClient::new();
        // The Content-Length response header tells us the size of the stream in bytes
        let content_length: u64 = client.head(&url).await?.header("Content-Length")?.parse()?;
        Ok(Self {
            client,
            content_length,
            url,
            position: 0,
        })
    }
}

impl io::Read for StreamReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let start = self.position as u64;
        // Nothing left to read once we reach the end of the stream.
        // Returning early also avoids an underflow in the subtraction below.
        if buf.is_empty() || start >= self.content_length {
            return Ok(0);
        }
        // Range headers use inclusive upper bounds,
        // so we have to subtract 1 from the length to get the end value.
        // We also need to ensure we don't request data past
        // the end of the stream.
        let end = (start + buf.len() as u64).min(self.content_length) - 1;

        // Send a request for the next chunk of data only
        let bytes = self
            .client
            .header("Range", format!("bytes={start}-{end}"))
            .get_blocking(&self.url)
            .bytes();

        let bytes_len = bytes.len();
        // Fill the buffer with the response data.
        buf[..bytes_len].copy_from_slice(&bytes);
        // Update our stream position.
        self.position += bytes_len;

        Ok(bytes_len)
    }
}

Decoders also need to be able to perform seek operations to move to different locations within the file. Decoders can issue seek requests for a variety of reasons, such as fast forward or rewind operations.

All we need to do here is update our current position. The next call to read will request data from the new position.

use std::io::{self, SeekFrom};

impl io::Seek for StreamReader {
    fn seek(&mut self, position: SeekFrom) -> io::Result<u64> {
        match position {
            SeekFrom::Start(from_start) => {
                self.position = from_start as usize;
            }
            SeekFrom::Current(from_current) => {
                self.position = (self.position as i64 + from_current) as usize;
            }
            SeekFrom::End(from_end) => {
                self.position = (self.content_length as i64 + from_end) as usize;
            }
        }
        // Return the new stream position
        Ok(self.position as u64)
    }
}
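The casts in the example are fine for a sketch, but a negative offset that lands before the start of the stream would silently wrap around to a huge position. Here's one way to resolve the target position with checked arithmetic instead. The resolve_seek helper is hypothetical; checked_add_signed is a standard library method on u64.

```rust
use std::io::{self, SeekFrom};

// Resolves a SeekFrom into an absolute position, returning an error instead
// of wrapping when the result would be negative or overflow.
// `resolve_seek` is a hypothetical helper used for illustration.
fn resolve_seek(target: SeekFrom, current: u64, content_length: u64) -> io::Result<u64> {
    let resolved = match target {
        SeekFrom::Start(offset) => Some(offset),
        SeekFrom::Current(delta) => current.checked_add_signed(delta),
        SeekFrom::End(delta) => content_length.checked_add_signed(delta),
    };
    resolved.ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "seek to a negative or overflowing position",
        )
    })
}
```

Seeking past the end of the stream is still allowed here, which matches how files behave; the next read would simply return zero bytes.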

To use our new StreamReader, we need to change our main loop to instantiate a new source for each URL we receive. We’ll also change our queue to accept any type implementing io::Read and io::Seek. Almost everything else remains unchanged.

async fn main() -> Result<()> {
    let (queue_producer, queue_consumer) = Queue::new();
    let (ring_buffer_producer, ring_buffer_consumer) = RingBuffer::new();

    // Spawn the decoder thread.
    thread::spawn(move || decode_input(queue_consumer, ring_buffer_producer));
    // Spawn the audio thread.
    thread::spawn(move || process_audio_output(ring_buffer_consumer));

    // Enqueue any new URLs we receive from user input.
    while let Some(url) = next_input().await {
        let source = StreamReader::new(url).await?;
        // Add the source to the queue.
        queue_producer.enqueue(source);
    }

    Ok(())
}

fn decode_input<T>(
    queue_consumer: QueueConsumer<T>,
    ring_buffer_producer: RingBufferProducer<f32>,
) -> Result<()>
where
    T: io::Read + io::Seek,
{
    ...
}

What did we gain from this solution? We can now start decoding content from each incoming source as soon as the first call to read completes. But is this enough?

We still need to complete a round trip to the server every time the decoder requests data from the source. Some decoders may request data in small chunks - as little as a few kilobytes at a time. This could result in thousands of HTTP requests to finish decoding a single audio track. Again, this may be okay if both our client and server are sufficiently fast, but it’s imperative that the decoder is able to work through the source quickly so that the audio stream never runs out of data. A few slow requests could be enough to cause stuttering.
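To put rough numbers on this, here's a back-of-the-envelope calculation. The figures are purely illustrative assumptions (an 8 MiB track, 4 KiB reads, 50 ms per round trip), and both helper functions are hypothetical.

```rust
// Number of range requests needed to read the whole stream, rounding up.
// Assumes chunk_size is non-zero.
fn request_count(content_length: u64, chunk_size: u64) -> u64 {
    (content_length + chunk_size - 1) / chunk_size
}

// Total time spent waiting on the network if the requests run one at a time.
fn total_wait_ms(requests: u64, round_trip_ms: u64) -> u64 {
    requests * round_trip_ms
}
```

With those assumptions, request_count(8 * 1024 * 1024, 4096) comes out to 2048 requests, and total_wait_ms(2048, 50) is 102,400 ms. That's well over a minute and a half of pure latency for a single track, before counting any transfer time.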

Solution #3: Ahead-of-time Streaming

We need something that combines the benefits of both solutions discussed so far - a way to support efficient data retrieval and minimal delay to start the process. To do this, we need to start downloading the content, begin decoding, and maintain a local cache of the data that allows for fast random access.

We’ll need a few things:

When we’re done, the flow will look like this:

[Figure: ahead-of-time streaming diagram]

First, let’s take a look at the new version of StreamReader. We have a temporary file that we’ll use to store the downloaded content along with a StreamProgress struct that we’ll use to track and interact with the stream download task.

struct StreamReader {
    reader: TempFile,
    stream_progress: StreamProgress,
    content_length: u64,
}

impl StreamReader {
    fn new(reader: TempFile, stream_progress: StreamProgress, content_length: u64) -> Self {
        Self {
            reader,
            stream_progress,
            content_length,
        }
    }
}

Our Read implementation involves checking the current download progress and blocking the current thread until the download has progressed enough to fill the requested buffer.

use std::io;

impl io::Read for StreamReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Get our current position in the stream
        let stream_position = self.reader.stream_position()?;
        let requested_position = stream_position + buf.len() as u64;

        // Get the range of downloaded bytes that intersects the current position
        if let Some(closest_set) = self.stream_progress.intersection(stream_position) {
            if closest_set.end >= requested_position {
                // We've already downloaded the data we need,
                // so we can just read it from the file.
                return self.reader.read(buf);
            }
        }
        // Wait for the stream to download enough data.
        // This blocks the thread until it completes.
        self.stream_progress.request_position(requested_position);
        self.stream_progress.wait_for_position();

        self.reader.read(buf)
    }
}

The new implementation of Seek looks similar, except for the last part where we again need to ensure the download has progressed far enough for our operation to complete.

use std::io::{self, SeekFrom};

impl io::Seek for StreamReader {
    fn seek(&mut self, position: SeekFrom) -> io::Result<u64> {
        let absolute_position = match position {
            SeekFrom::Start(from_start) => from_start,
            SeekFrom::Current(from_current) => {
                (self.reader.stream_position()? as i64 + from_current) as u64
            }
            SeekFrom::End(from_end) => (self.content_length as i64 + from_end) as u64,
        };

        // Check to see if we've already downloaded what we need.
        if self.stream_progress.intersection(absolute_position).is_none() {
            // If not, reset the stream to the requested position so we don't have to
            // wait for the download to catch up.
            self.stream_progress.request_seek(absolute_position);
            // Wait for the stream to restart at our new position.
            self.stream_progress.wait_for_position();
        }

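        // Seek to the absolute position we computed. The temp file may not have
        // reached its final length yet, so SeekFrom::End can't be passed through as-is.
        self.reader.seek(SeekFrom::Start(absolute_position))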
    }
}

Next, let’s look at the implementation of StreamProgress. This is the structure we saw previously that will act as the bridge between our reader and downloader tasks.

First, we’ll add the logic required to track the download progress. We’ll use something called a RangeSet for this - a data structure for storing disjoint sets of numeric ranges. If no seek operations occur, we may only have one range in the set ([0..current_progress]), but if we perform any seek operations before the download finishes, there may be missing data in the middle. For example, if we download bytes 0 through 1000, seek forward to 2000, then download 1000 more bytes, the two ranges stored in our set will be [0..1000, 2000..3000].

use parking_lot::{Condvar, Mutex, RwLock};
use std::ops::Range;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;

// Cloneable so the reader and the downloader can share the same state.
#[derive(Clone)]
struct StreamProgress {
    downloaded_ranges: Arc<RwLock<RangeSet<u64>>>,
    requested_position: Arc<AtomicI64>,
    seek_tx: mpsc::Sender<u64>,
    monitor: Arc<(Mutex<Waiter>, Condvar)>,
}

// Default lets us build the monitor with Default::default().
#[derive(Default)]
struct Waiter {
    position_reached: bool,
    stream_done: bool,
}

impl StreamProgress {
    fn new(seek_tx: mpsc::Sender<u64>) -> Self {
        Self {
            downloaded_ranges: Default::default(),
            requested_position: Arc::new(AtomicI64::new(-1)),
            seek_tx,
            monitor: Default::default(),
        }
    }

    fn insert(&self, range: Range<u64>) {
        // Store the download progress.
        // Overlapping ranges are automatically unified.
        //
        // Example: if we have [0..12, 15..30] already stored and we add [10..20],
        // the result becomes [0..30].
        self.downloaded_ranges.write().insert(range);
    }

    fn intersection(&self, position: u64) -> Option<Range<u64>> {
        // Returns the range that intersects the requested position
        //
        // Example: given ranges of [0..12, 15..30] and position = 17, it will return [15..30].
        self.downloaded_ranges.read().get(position)
    }

    fn next_gap(&self, content_length: u64) -> Option<Range<u64>> {
        // Finds the first gap in the range set, used for seeing what hasn't been downloaded yet.
        //
        // Example: given ranges of [0..12, 15..30], this will return [12..15]
        self.downloaded_ranges
            .read()
            .gaps(0..content_length)
            .next()
    }

    fn request_seek(&self, position: u64) {
        // We wait for the seek operation to complete each time, so this shouldn't fail
        self.seek_tx
            .try_send(position)
            .expect("failed to send seek request");
    }
}
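The RangeSet type itself isn't shown above, so here's a minimal sketch of the behavior we're relying on, backed by a sorted Vec. The method names get and first_gap are assumptions made for this example and may not match any particular crate's API.

```rust
use std::ops::Range;

// A tiny range set for illustration. Ranges are kept sorted,
// non-overlapping, and non-adjacent.
#[derive(Debug, Default)]
struct RangeSet {
    ranges: Vec<Range<u64>>,
}

impl RangeSet {
    // Inserts a range, merging it with any ranges it overlaps or touches.
    fn insert(&mut self, new: Range<u64>) {
        if new.is_empty() {
            return;
        }
        let mut merged = new;
        let mut result = Vec::with_capacity(self.ranges.len() + 1);
        for existing in self.ranges.drain(..) {
            if existing.end < merged.start || existing.start > merged.end {
                // No contact with the new range, so keep it as-is.
                result.push(existing);
            } else {
                merged = merged.start.min(existing.start)..merged.end.max(existing.end);
            }
        }
        result.push(merged);
        result.sort_by_key(|range| range.start);
        self.ranges = result;
    }

    // Returns the stored range containing `position`, if any.
    fn get(&self, position: u64) -> Option<Range<u64>> {
        self.ranges.iter().find(|r| r.contains(&position)).cloned()
    }

    // Returns the first part of `outer` that isn't covered by the set.
    fn first_gap(&self, outer: Range<u64>) -> Option<Range<u64>> {
        let mut cursor = outer.start;
        for range in &self.ranges {
            if cursor >= outer.end {
                break;
            }
            if range.start > cursor {
                return Some(cursor..range.start.min(outer.end));
            }
            cursor = cursor.max(range.end);
        }
        (cursor < outer.end).then(|| cursor..outer.end)
    }
}
```

Using the same numbers as the comments above: with [0..12, 15..30] stored, get(17) returns 15..30 and first_gap(0..30) returns 12..15. After inserting 10..20, everything collapses into a single 0..30 range and there are no gaps left.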

Next, we’ll add a few methods to allow the downloader and reader to communicate. Another new data structure used here is the monitor. This is an implementation of the monitor pattern, which allows multiple threads to safely wait for and signal state changes.

Note

We use parking_lot’s synchronization primitives here because its implementation of Condvar isn’t susceptible to spurious wakeups, which simplifies the logic a bit.

struct StreamProgress {
    downloaded_ranges: Arc<RwLock<RangeSet<u64>>>,
    requested_position: Arc<AtomicI64>,
    seek_tx: mpsc::Sender<u64>,
    monitor: Arc<(Mutex<Waiter>, Condvar)>,
}

impl StreamProgress {
    fn notify_position_reached(&self) {
        // Can use relaxed ordering since we're not using these for thread synchronization
        self.requested_position.store(-1, Ordering::Relaxed);
        // Notify any waiters that we've reached the requested position
        let (mutex, cvar) = self.monitor.as_ref();
        mutex.lock().position_reached = true;
        cvar.notify_all();
    }

    fn notify_stream_done(&self) {
        let (mutex, cvar) = self.monitor.as_ref();
        mutex.lock().stream_done = true;
        cvar.notify_all();
    }

    fn wait_for_position(&self) {
        let (mutex, cvar) = self.monitor.as_ref();
        let mut waiter = mutex.lock();
        if waiter.stream_done {
            return;
        }
        // Wait for the downloader thread to call notify()
        cvar.wait_while(&mut waiter, |waiter| {
            !waiter.stream_done && !waiter.position_reached
        });

        // If we're not done downloading yet, reset the position_reached
        // variable before we unlock the mutex.
        // This will ensure any subsequent calls to this method continue to block appropriately.
        if !waiter.stream_done {
            waiter.position_reached = false;
        }
    }

    fn request_position(&self, position: u64) {
        self.requested_position
            .store(position as i64, Ordering::Relaxed);
    }

    fn requested_position(&self) -> Option<u64> {
        let val = self.requested_position.load(Ordering::Relaxed);
        if val == -1 {
            None
        } else {
            Some(val as u64)
        }
    }
}
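If you’d rather stick to the standard library, the same monitor can be sketched with std::sync. The standard Condvar can wake up spuriously, but its wait_while method re-checks the predicate after every wakeup, so the logic barely changes. The Monitor wrapper type here is a hypothetical name used only for this example.

```rust
use std::sync::{Arc, Condvar, Mutex};

#[derive(Default)]
struct Waiter {
    position_reached: bool,
    stream_done: bool,
}

// Hypothetical wrapper around the (Mutex, Condvar) pair.
#[derive(Clone, Default)]
struct Monitor(Arc<(Mutex<Waiter>, Condvar)>);

impl Monitor {
    fn notify_position_reached(&self) {
        let (mutex, cvar) = &*self.0;
        mutex.lock().unwrap().position_reached = true;
        cvar.notify_all();
    }

    fn notify_stream_done(&self) {
        let (mutex, cvar) = &*self.0;
        mutex.lock().unwrap().stream_done = true;
        cvar.notify_all();
    }

    fn wait_for_position(&self) {
        let (mutex, cvar) = &*self.0;
        let guard = mutex.lock().unwrap();
        // wait_while keeps sleeping until the predicate returns false,
        // which also covers spurious wakeups.
        let mut waiter = cvar
            .wait_while(guard, |w| !w.stream_done && !w.position_reached)
            .unwrap();
        // Reset the flag so the next call blocks again.
        if !waiter.stream_done {
            waiter.position_reached = false;
        }
    }
}
```

The important detail in both versions is that the flag is set while holding the mutex. Without that, a notification sent just before the reader starts waiting could be lost.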

Finally, we have the Downloader, which will download the stream content concurrently while the StreamReader reads from a temp file. The general approach here is to stream the content in small chunks, tracking the progress and responding to any seek requests along the way.

use tokio::sync::mpsc;

struct Downloader {
    url: Url,
    writer: TempFile,
    stream_progress: StreamProgress,
    seek_rx: mpsc::Receiver<u64>,
    content_length: u64,
    http_client: HttpClient,
}

impl Downloader {
    async fn new(
        url: Url,
        writer: TempFile,
        stream_progress: StreamProgress,
        seek_rx: mpsc::Receiver<u64>,
    ) -> Result<Self> {
        let http_client = HttpClient::new();
        // The Content-Length response header tells us the size of the stream in bytes
        let content_length: u64 = http_client
            .head(&url)
            .await?
            .header("Content-Length")?
            .parse()?;
        Ok(Self {
            url,
            writer,
            stream_progress,
            seek_rx,
            content_length,
            http_client,
        })
    }

    fn content_length(&self) -> u64 {
        self.content_length
    }

    async fn download(&mut self) -> Result<()> {
        // Stream the content in small chunks over a single connection.
        // This is more efficient than sending range requests, which requires opening and closing
        // a connection for every request.
        let mut stream = self.http_client.get(&self.url).await?.bytes_stream();

        loop {
            // Concurrently handle seek requests while downloading
            tokio::select! {
                seek_position = self.seek_rx.recv() => {
                    // recv() only yields None once every sender has been dropped.
                    if let Some(seek_position) = seek_position {
                        stream = self.seek_range(seek_position).await?;
                        // Wake the reader, which waits for the stream to restart
                        // after requesting a seek.
                        self.stream_progress.notify_position_reached();
                    }
                }
                bytes = stream.next() => {
                    let Some(bytes) = bytes else {
                        // We finished the stream, but there could be some gaps missing due to
                        // seek requests.
                        if let Some(gap) = self.stream_progress.next_gap(self.content_length) {
                            stream = self.seek_range(gap.start).await?;
                            continue;
                        } else {
                            self.stream_progress.notify_stream_done();
                            break;
                        }
                    };
                    let position = self.writer.stream_position()?;

                    self.writer.write_all(&bytes)?;
                    let new_position = position + bytes.len() as u64;
                    // Keep track of the byte ranges we've downloaded so far
                    self.stream_progress.insert(position..new_position);
                    // Check if the reader is waiting for a specific position
                    if let Some(requested_position) = self.stream_progress.requested_position() {
                        if new_position >= requested_position {
                            // Notify the reader that we've downloaded enough for
                            // the read operation to succeed.
                            self.stream_progress.notify_position_reached();
                        }
                    }
                }
            }
        }

        Ok(())
    }

    async fn seek_range(&mut self, seek_position: u64) -> Result<ByteStream> {
        // Restart the stream from the requested position.
        let stream = self
            .http_client
            .header("Range", format!("bytes={}-", seek_position))
            .get(&self.url)
            .await?
            .bytes_stream();
        // Synchronize the writer with the new stream position.
        self.writer.seek(SeekFrom::Start(seek_position))?;
        Ok(stream)
    }
}

And now we update our main loop to use the Downloader. We’ll start a new reader and downloader pair for every new input that comes in. This allows us to download multiple sources simultaneously.

async fn main() -> Result<()> {
    let (queue_producer, queue_consumer) = Queue::new();
    let (ring_buffer_producer, ring_buffer_consumer) = RingBuffer::new();

    // Spawn the decoder thread.
    thread::spawn(move || decode_input(queue_consumer, ring_buffer_producer));
    // Spawn the audio thread.
    thread::spawn(move || process_audio_output(ring_buffer_consumer));

    // Enqueue any new URLs we receive from user input.
    while let Some(url) = next_input().await {
        // We wait for the seek operation to complete when we send a message,
        // so a buffer size of 1 is fine here
        let (seek_tx, seek_rx) = mpsc::channel(1);
        let stream_progress = StreamProgress::new(seek_tx);
        let temp_file = TempFile::new();

        // The reader and the downloader each need a handle to the same temp file.
        let mut downloader =
            Downloader::new(url, temp_file.clone(), stream_progress.clone(), seek_rx).await?;
        let reader = StreamReader::new(temp_file, stream_progress, downloader.content_length());
        // Run the downloader and input reader concurrently
        tokio::task::spawn(async move { downloader.download().await });
        // Add the source to the queue.
        queue_producer.enqueue(reader);
    }

    Ok(())
}

There are a few other considerations that we haven’t addressed here:

Appendix: Live Streaming and Bounded Storage

There are some use cases which don’t quite work with the previous implementation. We rely on knowing the content length to support some functionality such as seeking, but some streams don’t have a content length. Consider something like internet radio - it has no finite length and therefore no Content-Length header present in the response.

Since the stream doesn’t have an end, seeking from the end of the stream no longer makes sense. Our logic to find gaps in the downloaded content no longer works since it requires knowledge of the stream length.

There’s also a bigger issue here that we need to consider. If we leave a single stream running for a long time, the space taken by our temporary file is going to continue to grow indefinitely. On a computer with a large drive, this may be fine for any reasonable duration, but let’s say we want to stream a radio station 24/7 on something like a Raspberry Pi. Our Pi may have a small SD card, so we could eventually fill up the storage. To get around this, we could implement a ring buffer over our storage layer that will overwrite older contents as we continue to receive more data.

To solve this, we’ll need a new reader and writer implementation plus a struct that can share state between the two. Let’s start with the state:

use parking_lot::Mutex;
use std::sync::Arc;

struct RingBufState {
    read_position: usize,
    write_position: usize,
    size: usize,
}

impl RingBufState {
    // Get the read position relative to the buffer size
    // This will wrap around on overflow
    fn offset_read_position(&self, read_len: usize) -> usize {
        let current_relative_read_position = self.read_position % self.size;
        (current_relative_read_position + read_len) % self.size
    }

    // Same thing for the write position
    fn offset_write_position(&self, write_len: usize) -> usize {
        let current_relative_write_position = self.write_position % self.size;
        (current_relative_write_position + write_len) % self.size
    }
}

RingBufReader reads from a fixed-size file, wrapping around to the beginning when it reaches the end. The writer needs to supply enough data so it doesn’t re-read old segments during subsequent iterations.

use std::io::{self, SeekFrom};

struct RingBufReader {
    file: TempFile,
    state: Arc<Mutex<RingBufState>>,
}

impl io::Read for RingBufReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // An empty buffer requires no work (and would underflow the math below).
        if buf.is_empty() {
            return Ok(0);
        }
        let mut state = self.state.lock();
        // We need to get the start and end read positions
        // based on the current read position, the size of our ring buffer,
        // and the size of the requested input.
        let start = state.offset_read_position(0);
        let end = state.offset_read_position(buf.len() - 1) + 1;

        self.file.seek(SeekFrom::Start(start as u64))?;
        if start < end {
            let read_len = end - start;
            self.file.read_exact(&mut buf[..read_len])?;
        } else {
            // The buffer is non-contiguous, so we need to read the first segment
            // until we reach the end of the buffer and then wrap around
            // to the start to read the rest.
            let first_seg_len = state.size - start;
            self.file.read_exact(&mut buf[..first_seg_len])?;
            self.file.seek(SeekFrom::Start(0))?;
            self.file.read_exact(&mut buf[first_seg_len..])?;
        }

        state.read_position += buf.len();
        Ok(buf.len())
    }
}

impl io::Seek for RingBufReader {
    fn seek(&mut self, position: SeekFrom) -> io::Result<u64> {
        let mut state = self.state.lock();
        let new_position = match position {
            SeekFrom::Start(position) => position as usize,
            SeekFrom::Current(from_current) => {
                (state.read_position as i64 + from_current) as usize
            }
            // Seeking from the end doesn't make sense with infinite streams
            SeekFrom::End(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    "seek from end not supported",
                ));
            }
        };

        state.read_position = new_position;
        Ok(new_position as u64)
    }
}
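The wrap-around handling in read can also be thought of as splitting each access into at most two contiguous segments. Here’s a small sketch of that arithmetic on its own. The wrap_segments function is hypothetical, and it assumes a single access never exceeds the size of the ring buffer.

```rust
use std::ops::Range;

// Splits an access of `len` bytes at logical `position` into the segment
// before the wrap point and the segment after it (which may be empty).
// Both ranges are offsets into the fixed-size backing file.
fn wrap_segments(position: usize, len: usize, size: usize) -> (Range<usize>, Range<usize>) {
    assert!(size > 0 && len <= size, "access must fit in the ring buffer");
    let start = position % size;
    // Everything up to the end of the file goes in the first segment.
    let first_len = len.min(size - start);
    // Whatever is left over wraps around to the beginning.
    (start..start + first_len, 0..len - first_len)
}
```

For an 8-byte buffer, a 4-byte access at position 6 becomes 6..8 followed by 0..2, while a 3-byte access at position 10 maps to 2..5 with nothing left to wrap.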

RingBufWriter is very similar, swapping out the calls to read with calls to write.

use std::io::{self, SeekFrom};

struct RingBufWriter {
    file: TempFile,
    state: Arc<Mutex<RingBufState>>,
}

impl io::Write for RingBufWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // An empty buffer requires no work (and would underflow the math below).
        if buf.is_empty() {
            return Ok(0);
        }
        let mut state = self.state.lock();

        // This is essentially the same logic as the read method.
        let start = state.offset_write_position(0);
        let end = state.offset_write_position(buf.len() - 1) + 1;

        self.file.seek(SeekFrom::Start(start as u64))?;
        if start < end {
            self.file.write_all(buf)?;
        } else {
            let first_seg_len = state.size - start;
            self.file.write_all(&buf[..first_seg_len])?;
            self.file.seek(SeekFrom::Start(0))?;
            self.file.write_all(&buf[first_seg_len..])?;
        }

        state.write_position += buf.len();
        Ok(buf.len())
    }

    // io::Write also requires flush, so we delegate to the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}

impl io::Seek for RingBufWriter {
    fn seek(&mut self, position: SeekFrom) -> io::Result<u64> {
        let mut state = self.state.lock();
        let new_position = match position {
            SeekFrom::Start(position) => position as usize,
            SeekFrom::Current(from_current) => {
                (state.write_position as i64 + from_current) as usize
            }
            SeekFrom::End(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    "seek from end not supported",
                ));
            }
        };

        state.write_position = new_position;
        Ok(new_position as u64)
    }
}

To use it, we need a method that returns both the reader and writer.

fn ring_buf_reader_writer(buffer_size: usize) -> (RingBufReader, RingBufWriter) {
    let state = Arc::new(Mutex::new(RingBufState {
        read_position: 0,
        write_position: 0,
        size: buffer_size,
    }));
    let temp_file = TempFile::new();
    let reader = RingBufReader {
        state: state.clone(),
        file: temp_file.clone(),
    };
    let writer = RingBufWriter {
        state,
        file: temp_file,
    };

    (reader, writer)
}

Again, there are a few issues that aren’t handled in our simple implementation:

Conclusion

This was a brief look at some of the scenarios that need to be considered when designing a media streaming application. We looked at three different approaches to handle these challenges, each with various tradeoffs between performance and complexity. For a full implementation of the final approach presented here, check out the library I created. Thanks!