carb/audio/AudioStreamerUtils.h
File members: carb/audio/AudioStreamerUtils.h
// Copyright (c) 2019-2023, NVIDIA CORPORATION. All rights reserved.
//
// NVIDIA CORPORATION and its licensors retain all intellectual property
// and proprietary rights in and to this software, related documentation
// and any modifications thereto. Any use, reproduction, disclosure or
// distribution of this software and related documentation without an express
// license agreement from NVIDIA CORPORATION is strictly prohibited.
//
#pragma once
#include "AudioUtils.h"
#include "IAudioData.h"
#include "IAudioPlayback.h"
#include "IAudioUtils.h"
#include "../Framework.h"
#include "../cpp/Atomic.h"
#include "../events/IEvents.h"
#include "../../omni/extras/DataStreamer.h"
#include <atomic>
#include <string.h>
#if CARB_PLATFORM_WINDOWS
# define strdup _strdup
#endif
namespace carb
{
namespace audio
{
class StreamerWrapper : public Streamer
{
public:
StreamerWrapper()
{
m_refCount = 1;
acquireReference = streamerAcquire;
releaseReference = streamerRelease;
openStream = streamerOpen;
writeStreamData = streamerWriteData;
closeStream = streamerClose;
}
void acquire()
{
m_refCount.fetch_add(1, std::memory_order_relaxed);
}
void release()
{
if (m_refCount.fetch_sub(1, std::memory_order_release) == 1)
{
std::atomic_thread_fence(std::memory_order_acquire);
delete this;
}
}
template <class Rep, class Period>
bool waitForClose(const std::chrono::duration<Rep, Period>& duration) noexcept
{
return m_open.wait_for(true, duration);
}
virtual bool open(SoundFormat* format) = 0;
virtual StreamState writeData(const void* data, size_t bytes) = 0;
virtual void close() = 0;
protected:
virtual ~StreamerWrapper()
{
auto refCount = m_refCount.load(std::memory_order_relaxed);
CARB_UNUSED(refCount);
CARB_ASSERT(refCount == 0,
"deleting the streamer with refcount %zd - was it destroyed by a method other than calling release()?",
refCount);
}
private:
static void CARB_ABI streamerAcquire(Streamer* self)
{
StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
ctxt->acquire();
}
static void CARB_ABI streamerRelease(Streamer* self)
{
StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
ctxt->release();
}
static bool CARB_ABI streamerOpen(Streamer* self, SoundFormat* format)
{
StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
ctxt->m_open = true;
return ctxt->open(format);
}
static StreamState CARB_ABI streamerWriteData(Streamer* self, const void* data, size_t bytes)
{
StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
return ctxt->writeData(data, bytes);
}
static void CARB_ABI streamerClose(Streamer* self)
{
StreamerWrapper* ctxt = static_cast<StreamerWrapper*>(self);
ctxt->close();
ctxt->m_open = false;
ctxt->m_open.notify_all();
}
std::atomic<size_t> m_refCount;
carb::cpp::atomic<bool> m_open{ false };
};
class OutputStreamer : public StreamerWrapper
{
public:
typedef uint32_t Flags;
static constexpr Flags fFlagRealtime = 0x00000001;
static constexpr Flags fFlagFlush = 0x00000002;
OutputStreamer(SampleFormat outputFormat = SampleFormat::eDefault, Flags flags = fFlagRealtime)
{
m_desc.flags = 0;
m_desc.filename = nullptr;
m_desc.inputFormat = SampleFormat::eDefault;
m_desc.outputFormat = outputFormat;
m_desc.frameRate = 0;
m_desc.channels = 0;
m_desc.encoderSettings = nullptr;
m_desc.ext = nullptr;
m_filename = nullptr;
m_encoderSettings = nullptr;
m_stream = nullptr;
m_utils = nullptr;
m_flags = flags;
}
OutputStreamDesc* getDescriptor()
{
return &m_desc;
}
void setFlags(Flags flags)
{
m_flags = flags;
}
Flags getFlags() const
{
return m_flags;
}
void setOutputFormat(SampleFormat format)
{
m_desc.outputFormat = format;
}
void setFilename(const char* filename)
{
char* temp;
temp = strdup(filename);
if (temp == nullptr)
return;
if (m_filename != nullptr)
free(m_filename);
m_filename = temp;
m_desc.filename = m_filename;
}
const char* getFilename() const
{
return m_filename;
}
void setEncoderSettings(const void* settings, size_t sizeInBytes)
{
void* temp;
if (settings == nullptr)
{
if (m_encoderSettings != nullptr)
free(m_encoderSettings);
m_encoderSettings = nullptr;
m_desc.encoderSettings = nullptr;
return;
}
temp = malloc(sizeInBytes);
if (temp == nullptr)
return;
if (m_encoderSettings != nullptr)
free(m_encoderSettings);
memcpy(temp, settings, sizeInBytes);
m_encoderSettings = temp;
m_desc.encoderSettings = m_encoderSettings;
}
bool open(SoundFormat* format) override
{
m_utils = getFramework()->acquireInterface<carb::audio::IAudioUtils>();
CARB_ASSERT(m_utils != nullptr, "the IAudioData interface was not successfully acquired!");
CARB_ASSERT(m_desc.filename != nullptr, "call setFilename() first!");
// update the output stream descriptor with the given format information and flags.
if ((m_flags & fFlagFlush) != 0)
m_desc.flags |= fStreamFlagFlushAfterWrite;
m_desc.channels = format->channels;
m_desc.frameRate = format->frameRate;
m_desc.inputFormat = format->format;
m_stream = m_utils->openOutputStream(getDescriptor());
return m_stream != nullptr;
}
StreamState writeData(const void* data, size_t bytes) override
{
CARB_ASSERT(m_utils != nullptr);
CARB_ASSERT(m_stream != nullptr);
m_utils->writeDataToStream(m_stream, data, bytesToFrames(bytes, m_desc.channels, m_desc.inputFormat));
return (m_flags & fFlagRealtime) != 0 ? StreamState::eNormal : StreamState::eCritical;
}
void close() override
{
CARB_ASSERT(m_utils != nullptr);
if (m_stream == nullptr)
return;
m_utils->closeOutputStream(m_stream);
m_stream = nullptr;
}
protected:
~OutputStreamer() override
{
if (m_stream != nullptr)
m_utils->closeOutputStream(m_stream);
if (m_filename != nullptr)
free(m_filename);
if (m_encoderSettings != nullptr)
free(m_encoderSettings);
}
private:
char* m_filename;
void* m_encoderSettings;
Flags m_flags;
OutputStreamDesc m_desc;
OutputStream* m_stream;
IAudioUtils* m_utils;
};
class NullStreamer : public StreamerWrapper
{
public:
NullStreamer()
{
}
bool open(SoundFormat* format) override
{
CARB_UNUSED(format);
return true;
}
StreamState writeData(const void* data, size_t bytes) override
{
CARB_UNUSED(data, bytes);
return m_state;
}
void close() override
{
}
void setStreamState(StreamState state)
{
m_state = state;
}
protected:
~NullStreamer() override
{
}
StreamState m_state = StreamState::eNormal;
};
constexpr carb::events::EventType kAudioStreamEventOpen = 1;
constexpr carb::events::EventType kAudioStreamEventClose = 2;
constexpr int32_t kEventStreamVersion = 1;
class EventListener : omni::extras::DataListener
{
public:
EventListener(carb::events::IEventStreamPtr p,
std::function<void(const carb::audio::SoundFormat* fmt)> open,
std::function<void(const void* data, size_t bytes)> writeData,
std::function<void()> close)
: omni::extras::DataListener(p)
{
OMNI_ASSERT(open, "this callback is not optional");
OMNI_ASSERT(writeData, "this callback is not optional");
OMNI_ASSERT(close, "this callback is not optional");
m_openCallback = open;
m_writeDataCallback = writeData;
m_closeCallback = close;
}
protected:
void onDataReceived(const void* payload, size_t bytes, omni::extras::DataStreamType type) noexcept override
{
CARB_UNUSED(type);
if (m_open)
{
m_writeDataCallback(payload, bytes);
}
}
void onEventReceived(const carb::events::IEvent* e) noexcept override
{
carb::audio::SoundFormat fmt = {};
auto getIntVal = [this](const carb::dictionary::Item* root, const char* name) -> size_t {
const carb::dictionary::Item* child = m_dict->getItem(root, name);
return (child == nullptr) ? 0 : m_dict->getAsInt64(child);
};
switch (e->type)
{
case kAudioStreamEventOpen:
{
int32_t ver = int32_t(getIntVal(e->payload, "version"));
if (ver != kEventStreamVersion)
{
CARB_LOG_ERROR("EventListener version %" PRId32 " tried to attach to data stream version %" PRId32,
kEventStreamVersion, ver);
disconnect();
return;
}
fmt.channels = getIntVal(e->payload, "channels");
fmt.bitsPerSample = getIntVal(e->payload, "bitsPerSample");
fmt.frameSize = getIntVal(e->payload, "frameSize");
fmt.blockSize = getIntVal(e->payload, "blockSize");
fmt.framesPerBlock = getIntVal(e->payload, "framesPerBlock");
fmt.frameRate = getIntVal(e->payload, "frameRate");
fmt.channelMask = getIntVal(e->payload, "channelMask");
fmt.validBitsPerSample = getIntVal(e->payload, "validBitsPerSample");
fmt.format = SampleFormat(getIntVal(e->payload, "format"));
m_openCallback(&fmt);
m_open = true;
break;
}
case kAudioStreamEventClose:
if (m_open)
{
m_closeCallback();
m_open = false;
}
break;
default:
OMNI_LOG_ERROR("unknown event received %zd", size_t(e->type));
}
}
private:
std::function<void(const carb::audio::SoundFormat* fmt)> m_openCallback;
std::function<void(const void* data, size_t bytes)> m_writeDataCallback;
std::function<void()> m_closeCallback;
bool m_open = false;
};
class EventStreamer : public StreamerWrapper
{
public:
EventStreamer()
{
}
bool isWorking() noexcept
{
return m_streamer.isWorking();
}
void setFormat(const SoundFormat* format) noexcept
{
if (format != nullptr)
{
m_desiredFormat = *format;
}
else
{
m_desiredFormat = {};
}
}
EventListener* createListener(std::function<void(const carb::audio::SoundFormat* fmt)> open,
std::function<void(const void* data, size_t bytes)> writeData,
std::function<void()> close)
{
return new (std::nothrow) EventListener(m_streamer.getEventStream(), open, writeData, close);
}
carb::events::IEventStreamPtr getEventStream() noexcept
{
return m_streamer.getEventStream();
}
void flush() noexcept
{
m_streamer.flush();
}
private:
bool open(carb::audio::SoundFormat* format) noexcept override
{
if (!m_streamer.isWorking())
{
return false;
}
if (m_desiredFormat.channels != 0)
{
format->channels = m_desiredFormat.channels;
format->channelMask = kSpeakerModeDefault;
}
if (m_desiredFormat.frameRate != 0)
{
format->frameRate = m_desiredFormat.frameRate;
}
if (m_desiredFormat.channelMask != kSpeakerModeDefault)
{
format->channelMask = m_desiredFormat.channelMask;
}
if (m_desiredFormat.format != SampleFormat::eDefault)
{
format->format = m_desiredFormat.format;
}
m_streamer.getEventStream()->push(kAudioStreamEventOpen, std::make_pair("version", kEventStreamVersion),
std::make_pair("channels", int64_t(format->channels)),
std::make_pair("bitsPerSample", int64_t(format->bitsPerSample)),
std::make_pair("frameSize", int64_t(format->frameSize)),
std::make_pair("blockSize", int64_t(format->blockSize)),
std::make_pair("framesPerBlock", int64_t(format->framesPerBlock)),
std::make_pair("frameRate", int64_t(format->frameRate)),
std::make_pair("channelMask", int64_t(format->channelMask)),
std::make_pair("validBitsPerSample", int64_t(format->validBitsPerSample)),
std::make_pair("format", int32_t(format->format)));
m_streamer.pumpAsync();
return true;
}
void close() noexcept override
{
if (!m_streamer.isWorking())
{
return;
}
m_streamer.getEventStream()->push(kAudioStreamEventClose);
m_streamer.pumpAsync();
}
StreamState writeData(const void* data, size_t bytes) noexcept override
{
if (!m_streamer.isWorking())
{
return StreamState::eNormal;
}
// just push as bytes here, we'll clean up the type later
m_streamer.pushData(static_cast<const uint8_t*>(data), bytes);
m_streamer.pumpAsync();
return StreamState::eNormal;
}
SoundFormat m_desiredFormat = {};
omni::extras::DataStreamer m_streamer;
};
} // namespace audio
} // namespace carb