carb/audio/AudioStreamerUtils.h

File members: carb/audio/AudioStreamerUtils.h

// Copyright (c) 2019-2023, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once

#include "AudioUtils.h"
#include "IAudioData.h"
#include "IAudioPlayback.h"
#include "IAudioUtils.h"
#include "../Framework.h"
#include "../cpp/Atomic.h"
#include "../events/IEvents.h"
#include "../../omni/extras/DataStreamer.h"

#include <atomic>
#include <string.h>

#if CARB_PLATFORM_WINDOWS
#    define strdup _strdup
#endif

namespace carb
{
namespace audio
{

class StreamerWrapper : public Streamer
{
public:
    StreamerWrapper()
    {
        m_refCount = 1;
        acquireReference = streamerAcquire;
        releaseReference = streamerRelease;
        openStream = streamerOpen;
        writeStreamData = streamerWriteData;
        closeStream = streamerClose;
    }

    void acquire()
    {
        m_refCount.fetch_add(1, std::memory_order_relaxed);
    }

    void release()
    {
        if (m_refCount.fetch_sub(1, std::memory_order_release) == 1)
        {
            std::atomic_thread_fence(std::memory_order_acquire);
            delete this;
        }
    }

    template <class Rep, class Period>
    bool waitForClose(const std::chrono::duration<Rep, Period>& duration) noexcept
    {
        return m_open.wait_for(true, duration);
    }

    virtual bool open(SoundFormat* format) = 0;

    virtual StreamState writeData(const void* data, size_t bytes) = 0;

    virtual void close() = 0;

protected:
    virtual ~StreamerWrapper()
    {
        auto refCount = m_refCount.load(std::memory_order_relaxed);
        CARB_UNUSED(refCount);
        CARB_ASSERT(refCount == 0,
                    "deleting the streamer with refcount %zd - was it destroyed by a method other than calling release()?",
                    refCount);
    }

private:
    static void CARB_ABI streamerAcquire(Streamer* self)
    {
        StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
        ctxt->acquire();
    }

    static void CARB_ABI streamerRelease(Streamer* self)
    {
        StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
        ctxt->release();
    }

    static bool CARB_ABI streamerOpen(Streamer* self, SoundFormat* format)
    {
        StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
        ctxt->m_open = true;
        return ctxt->open(format);
    }

    static StreamState CARB_ABI streamerWriteData(Streamer* self, const void* data, size_t bytes)
    {
        StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
        return ctxt->writeData(data, bytes);
    }

    static void CARB_ABI streamerClose(Streamer* self)
    {
        StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
        ctxt->close();
        ctxt->m_open = false;
        ctxt->m_open.notify_all();
    }

    std::atomic<size_t> m_refCount;

    carb::cpp::atomic<bool> m_open{ false };
};

class OutputStreamer : public StreamerWrapper
{
public:
    typedef uint32_t Flags;

    static constexpr Flags fFlagRealtime = 0x00000001;

    static constexpr Flags fFlagFlush = 0x00000002;

    OutputStreamer(SampleFormat outputFormat = SampleFormat::eDefault, Flags flags = fFlagRealtime)
    {
        m_desc.flags = 0;
        m_desc.filename = nullptr;
        m_desc.inputFormat = SampleFormat::eDefault;
        m_desc.outputFormat = outputFormat;
        m_desc.frameRate = 0;
        m_desc.channels = 0;
        m_desc.encoderSettings = nullptr;
        m_desc.ext = nullptr;
        m_filename = nullptr;
        m_encoderSettings = nullptr;
        m_stream = nullptr;
        m_utils = nullptr;
        m_flags = flags;
    }

    OutputStreamDesc* getDescriptor()
    {
        return &m_desc;
    }

    void setFlags(Flags flags)
    {
        m_flags = flags;
    }

    Flags getFlags() const
    {
        return m_flags;
    }

    void setOutputFormat(SampleFormat format)
    {
        m_desc.outputFormat = format;
    }

    void setFilename(const char* filename)
    {
        char* temp;

        temp = strdup(filename);

        if (temp == nullptr)
            return;

        if (m_filename != nullptr)
            free(m_filename);

        m_filename = temp;
        m_desc.filename = m_filename;
    }

    const char* getFilename() const
    {
        return m_filename;
    }

    void setEncoderSettings(const void* settings, size_t sizeInBytes)
    {
        void* temp;

        if (settings == nullptr)
        {
            if (m_encoderSettings != nullptr)
                free(m_encoderSettings);

            m_encoderSettings = nullptr;
            m_desc.encoderSettings = nullptr;
            return;
        }

        temp = malloc(sizeInBytes);

        if (temp == nullptr)
            return;

        if (m_encoderSettings != nullptr)
            free(m_encoderSettings);

        memcpy(temp, settings, sizeInBytes);
        m_encoderSettings = temp;
        m_desc.encoderSettings = m_encoderSettings;
    }

    bool open(SoundFormat* format) override
    {
        m_utils = getFramework()->acquireInterface<carb::audio::IAudioUtils>();
        CARB_ASSERT(m_utils != nullptr, "the IAudioData interface was not successfully acquired!");
        CARB_ASSERT(m_desc.filename != nullptr, "call setFilename() first!");

        // update the output stream descriptor with the given format information and flags.
        if ((m_flags & fFlagFlush) != 0)
            m_desc.flags |= fStreamFlagFlushAfterWrite;

        m_desc.channels = format->channels;
        m_desc.frameRate = format->frameRate;
        m_desc.inputFormat = format->format;

        m_stream = m_utils->openOutputStream(getDescriptor());
        return m_stream != nullptr;
    }

    StreamState writeData(const void* data, size_t bytes) override
    {
        CARB_ASSERT(m_utils != nullptr);
        CARB_ASSERT(m_stream != nullptr);
        m_utils->writeDataToStream(m_stream, data, bytesToFrames(bytes, m_desc.channels, m_desc.inputFormat));
        return (m_flags & fFlagRealtime) != 0 ? StreamState::eNormal : StreamState::eCritical;
    }

    void close() override
    {
        CARB_ASSERT(m_utils != nullptr);

        if (m_stream == nullptr)
            return;

        m_utils->closeOutputStream(m_stream);
        m_stream = nullptr;
    }

protected:
    ~OutputStreamer() override
    {
        if (m_stream != nullptr)
            m_utils->closeOutputStream(m_stream);

        if (m_filename != nullptr)
            free(m_filename);

        if (m_encoderSettings != nullptr)
            free(m_encoderSettings);
    }

private:
    char* m_filename;

    void* m_encoderSettings;

    Flags m_flags;

    OutputStreamDesc m_desc;

    OutputStream* m_stream;

    IAudioUtils* m_utils;
};

class NullStreamer : public StreamerWrapper
{
public:
    NullStreamer()
    {
    }

    bool open(SoundFormat* format) override
    {
        CARB_UNUSED(format);
        return true;
    }

    StreamState writeData(const void* data, size_t bytes) override
    {
        CARB_UNUSED(data, bytes);
        return m_state;
    }

    void close() override
    {
    }

    void setStreamState(StreamState state)
    {
        m_state = state;
    }

protected:
    ~NullStreamer() override
    {
    }

    StreamState m_state = StreamState::eNormal;
};

constexpr carb::events::EventType kAudioStreamEventOpen = 1;

constexpr carb::events::EventType kAudioStreamEventClose = 2;

constexpr int32_t kEventStreamVersion = 1;

class EventListener : omni::extras::DataListener
{
public:
    EventListener(carb::events::IEventStreamPtr p,
                  std::function<void(const carb::audio::SoundFormat* fmt)> open,
                  std::function<void(const void* data, size_t bytes)> writeData,
                  std::function<void()> close)
        : omni::extras::DataListener(p)
    {
        OMNI_ASSERT(open, "this callback is not optional");
        OMNI_ASSERT(writeData, "this callback is not optional");
        OMNI_ASSERT(close, "this callback is not optional");
        m_openCallback = open;
        m_writeDataCallback = writeData;
        m_closeCallback = close;
    }

protected:
    void onDataReceived(const void* payload, size_t bytes, omni::extras::DataStreamType type) noexcept override
    {
        CARB_UNUSED(type);
        if (m_open)
        {
            m_writeDataCallback(payload, bytes);
        }
    }

    void onEventReceived(const carb::events::IEvent* e) noexcept override
    {
        carb::audio::SoundFormat fmt = {};
        auto getIntVal = [this](const carb::dictionary::Item* root, const char* name) -> size_t {
            const carb::dictionary::Item* child = m_dict->getItem(root, name);
            return (child == nullptr) ? 0 : m_dict->getAsInt64(child);
        };
        switch (e->type)
        {
            case kAudioStreamEventOpen:
            {
                int32_t ver = int32_t(getIntVal(e->payload, "version"));
                if (ver != kEventStreamVersion)
                {
                    CARB_LOG_ERROR("EventListener version %" PRId32 " tried to attach to data stream version  %" PRId32,
                                   kEventStreamVersion, ver);
                    disconnect();
                    return;
                }

                fmt.channels = getIntVal(e->payload, "channels");
                fmt.bitsPerSample = getIntVal(e->payload, "bitsPerSample");
                fmt.frameSize = getIntVal(e->payload, "frameSize");
                fmt.blockSize = getIntVal(e->payload, "blockSize");
                fmt.framesPerBlock = getIntVal(e->payload, "framesPerBlock");
                fmt.frameRate = getIntVal(e->payload, "frameRate");
                fmt.channelMask = getIntVal(e->payload, "channelMask");
                fmt.validBitsPerSample = getIntVal(e->payload, "validBitsPerSample");
                fmt.format = SampleFormat(getIntVal(e->payload, "format"));
                m_openCallback(&fmt);
                m_open = true;
                break;
            }

            case kAudioStreamEventClose:
                if (m_open)
                {
                    m_closeCallback();
                    m_open = false;
                }
                break;

            default:
                OMNI_LOG_ERROR("unknown event received %zd", size_t(e->type));
        }
    }

private:
    std::function<void(const carb::audio::SoundFormat* fmt)> m_openCallback;
    std::function<void(const void* data, size_t bytes)> m_writeDataCallback;
    std::function<void()> m_closeCallback;
    bool m_open = false;
};

class EventStreamer : public StreamerWrapper
{
public:
    EventStreamer()
    {
    }

    bool isWorking() noexcept
    {
        return m_streamer.isWorking();
    }

    void setFormat(const SoundFormat* format) noexcept
    {
        if (format != nullptr)
        {
            m_desiredFormat = *format;
        }
        else
        {
            m_desiredFormat = {};
        }
    }

    EventListener* createListener(std::function<void(const carb::audio::SoundFormat* fmt)> open,
                                  std::function<void(const void* data, size_t bytes)> writeData,
                                  std::function<void()> close)
    {
        return new (std::nothrow) EventListener(m_streamer.getEventStream(), open, writeData, close);
    }

    carb::events::IEventStreamPtr getEventStream() noexcept
    {
        return m_streamer.getEventStream();
    }

    void flush() noexcept
    {
        m_streamer.flush();
    }

private:
    bool open(carb::audio::SoundFormat* format) noexcept override
    {
        if (!m_streamer.isWorking())
        {
            return false;
        }

        if (m_desiredFormat.channels != 0)
        {
            format->channels = m_desiredFormat.channels;
            format->channelMask = kSpeakerModeDefault;
        }
        if (m_desiredFormat.frameRate != 0)
        {
            format->frameRate = m_desiredFormat.frameRate;
        }
        if (m_desiredFormat.channelMask != kSpeakerModeDefault)
        {
            format->channelMask = m_desiredFormat.channelMask;
        }
        if (m_desiredFormat.format != SampleFormat::eDefault)
        {
            format->format = m_desiredFormat.format;
        }

        m_streamer.getEventStream()->push(kAudioStreamEventOpen, std::make_pair("version", kEventStreamVersion),
                                          std::make_pair("channels", int64_t(format->channels)),
                                          std::make_pair("bitsPerSample", int64_t(format->bitsPerSample)),
                                          std::make_pair("frameSize", int64_t(format->frameSize)),
                                          std::make_pair("blockSize", int64_t(format->blockSize)),
                                          std::make_pair("framesPerBlock", int64_t(format->framesPerBlock)),
                                          std::make_pair("frameRate", int64_t(format->frameRate)),
                                          std::make_pair("channelMask", int64_t(format->channelMask)),
                                          std::make_pair("validBitsPerSample", int64_t(format->validBitsPerSample)),
                                          std::make_pair("format", int32_t(format->format)));
        m_streamer.pumpAsync();
        return true;
    }

    void close() noexcept override
    {
        if (!m_streamer.isWorking())
        {
            return;
        }
        m_streamer.getEventStream()->push(kAudioStreamEventClose);
        m_streamer.pumpAsync();
    }

    StreamState writeData(const void* data, size_t bytes) noexcept override
    {
        if (!m_streamer.isWorking())
        {
            return StreamState::eNormal;
        }
        // just push as bytes here, we'll clean up the type later
        m_streamer.pushData(static_cast<const uint8_t*>(data), bytes);
        m_streamer.pumpAsync();
        return StreamState::eNormal;
    }

    SoundFormat m_desiredFormat = {};

    omni::extras::DataStreamer m_streamer;
};

} // namespace audio
} // namespace carb