Skip to content

Commit

Permalink
roc_pipeline: Refine configs organization
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed Jan 13, 2019
1 parent 4928b4a commit c2cd1dc
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 121 deletions.
6 changes: 3 additions & 3 deletions src/lib/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ bool config_receiver(pipeline::ReceiverConfig& out, const roc_receiver_config& i
}

if (in.sample_rate) {
out.sample_rate = in.sample_rate;
out.output.sample_rate = in.sample_rate;
}

switch ((unsigned)in.fec_scheme) {
Expand All @@ -137,8 +137,8 @@ bool config_receiver(pipeline::ReceiverConfig& out, const roc_receiver_config& i
out.default_session.fec.n_repair_packets = in.n_repair_packets;
}

out.default_session.resampling = !(in.flags & ROC_FLAG_DISABLE_RESAMPLER);
out.timing = (in.flags & ROC_FLAG_ENABLE_TIMER);
out.output.resampling = !(in.flags & ROC_FLAG_DISABLE_RESAMPLER);
out.output.timing = (in.flags & ROC_FLAG_ENABLE_TIMER);

return true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/lib/src/private.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ struct roc_sender {
roc_context& context;

roc::rtp::FormatMap format_map;

roc::pipeline::SenderConfig config;

roc::pipeline::PortConfig source_port;
roc::pipeline::PortConfig repair_port;

roc::core::UniquePtr<roc::pipeline::Sender> sender;
roc::packet::IWriter* writer;

Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ roc_receiver::roc_receiver(roc_context& ctx, pipeline::ReceiverConfig& cfg)
context.byte_buffer_pool,
context.sample_buffer_pool,
context.allocator)
, num_channels(packet::num_channels(cfg.channels)) {
, num_channels(packet::num_channels(cfg.output.channels)) {
}

roc_receiver* roc_receiver_open(roc_context* context, const roc_receiver_config* config) {
Expand Down
19 changes: 10 additions & 9 deletions src/lib/src/sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ namespace {
bool init_pipeline(roc_sender* sender) {
sender->sender.reset(
new (sender->context.allocator) pipeline::Sender(
sender->config, *sender->writer, *sender->writer, sender->format_map,
sender->context.packet_pool, sender->context.byte_buffer_pool,
sender->context.sample_buffer_pool, sender->context.allocator),
sender->config, sender->source_port, *sender->writer, sender->repair_port,
*sender->writer, sender->format_map, sender->context.packet_pool,
sender->context.byte_buffer_pool, sender->context.sample_buffer_pool,
sender->context.allocator),
sender->context.allocator);

if (!sender->sender) {
Expand All @@ -45,12 +46,12 @@ bool init_port(roc_sender* sender, const pipeline::PortConfig& pconfig) {
case pipeline::Proto_RTP:
case pipeline::Proto_RTP_RSm8_Source:
case pipeline::Proto_RTP_LDPC_Source:
if (sender->config.source_port.protocol != pipeline::Proto_None) {
if (sender->source_port.protocol != pipeline::Proto_None) {
roc_log(LogError, "roc_sender: source port is already connected");
return false;
}

sender->config.source_port = pconfig;
sender->source_port = pconfig;

roc_log(LogInfo, "roc_sender: connected source port to %s %s",
packet::address_to_str(pconfig.address).c_str(),
Expand All @@ -60,7 +61,7 @@ bool init_port(roc_sender* sender, const pipeline::PortConfig& pconfig) {

case pipeline::Proto_RSm8_Repair:
case pipeline::Proto_LDPC_Repair:
if (sender->config.repair_port.protocol != pipeline::Proto_None) {
if (sender->repair_port.protocol != pipeline::Proto_None) {
roc_log(LogError, "roc_sender: repair port is already connected");
return false;
}
Expand All @@ -71,7 +72,7 @@ bool init_port(roc_sender* sender, const pipeline::PortConfig& pconfig) {
return false;
}

sender->config.repair_port = pconfig;
sender->repair_port = pconfig;

roc_log(LogInfo, "roc_sender: connected repair port to %s %s",
packet::address_to_str(pconfig.address).c_str(),
Expand All @@ -85,12 +86,12 @@ bool init_port(roc_sender* sender, const pipeline::PortConfig& pconfig) {
}

bool check_connected(roc_sender* sender) {
if (sender->config.source_port.protocol == pipeline::Proto_None) {
if (sender->source_port.protocol == pipeline::Proto_None) {
roc_log(LogError, "roc_sender: source port is not connected");
return false;
}

if (sender->config.repair_port.protocol == pipeline::Proto_None
if (sender->repair_port.protocol == pipeline::Proto_None
&& sender->config.fec.codec != fec::NoCodec) {
roc_log(LogError, "roc_sender: repair port is not connected");
return false;
Expand Down
52 changes: 27 additions & 25 deletions src/modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ struct PortConfig {
}
};

//! Session parameters.
//! Receiver session parameters.
//! @remarks
//! Defines per-session parameters on the receiver side.
struct SessionConfig {
struct ReceiverSessionConfig {
//! Channel mask.
packet::channel_mask_t channels;

Expand All @@ -107,59 +107,61 @@ struct SessionConfig {
//! Resampler parameters.
audio::ResamplerConfig resampler;

//! Perform resampling to to compensate sender and receiver frequency difference.
bool resampling;

//! Insert weird beeps instead of silence on packet loss.
bool beeping;

SessionConfig()
ReceiverSessionConfig()
: channels(DefaultChannelMask)
, samples_per_packet(DefaultPacketSize)
, latency(DefaultPacketSize * 27)
, watchdog(DefaultSampleRate)
, resampling(false)
, beeping(false) {
, watchdog(DefaultSampleRate) {
latency_monitor.min_latency =
(packet::signed_timestamp_t)latency * DefaultMinLatency;
latency_monitor.max_latency =
(packet::signed_timestamp_t)latency * DefaultMaxLatency;
}
};

//! Receiver parameters.
struct ReceiverConfig {
//! Default parameters for session.
SessionConfig default_session;

//! Receiver output parameters.
//! @remarks
//! Defines common output parameters on the receiver side.
struct ReceiverOutputConfig {
//! Number of samples per second per channel.
size_t sample_rate;

//! Channel mask.
packet::channel_mask_t channels;

//! Perform resampling to compensate sender and receiver frequency difference.
bool resampling;

//! Constrain receiver speed using a CPU timer according to the sample rate.
bool timing;

//! Fill uninitialized data with large values to make them more noticeable.
bool poisoning;

ReceiverConfig()
//! Insert weird beeps instead of silence on packet loss.
bool beeping;

ReceiverOutputConfig()
: sample_rate(DefaultSampleRate)
, channels(DefaultChannelMask)
, resampling(false)
, timing(false)
, poisoning(false) {
, poisoning(false)
, beeping(false) {
}
};

//! Sender parameters.
struct SenderConfig {
//! Parameters for the port to which source packets are sent.
PortConfig source_port;
//! Receiver parameters.
struct ReceiverConfig {
//! Default parameters for receiver session.
ReceiverSessionConfig default_session;

//! Parameters for the port to which repair packets are sent.
PortConfig repair_port;
//! Parameters for receiver output.
ReceiverOutputConfig output;
};

//! Sender parameters.
struct SenderConfig {
//! Resampler parameters.
audio::ResamplerConfig resampler;

Expand Down
15 changes: 7 additions & 8 deletions src/modules/roc_pipeline/receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ Receiver::Receiver(const ReceiverConfig& config,
, byte_buffer_pool_(byte_buffer_pool)
, sample_buffer_pool_(sample_buffer_pool)
, allocator_(allocator)
, ticker_(config.sample_rate)
, ticker_(config.output.sample_rate)
, audio_reader_(NULL)
, config_(config)
, timestamp_(0)
, num_channels_(packet::num_channels(config.channels))
, num_channels_(packet::num_channels(config.output.channels))
, active_cond_(control_mutex_) {
mixer_.reset(new (allocator_) audio::Mixer(sample_buffer_pool), allocator_);
if (!mixer_) {
return;
}
audio::IReader* areader = mixer_.get();

if (config.poisoning) {
if (config.output.poisoning) {
poisoner_.reset(new (allocator_) audio::PoisonReader(*areader), allocator_);
if (!poisoner_) {
return;
Expand Down Expand Up @@ -98,7 +98,7 @@ void Receiver::write(const packet::PacketPtr& packet) {
void Receiver::read(audio::Frame& frame) {
core::Mutex::Lock lock(pipeline_mutex_);

if (config_.timing) {
if (config_.output.timing) {
ticker_.wait(timestamp_);
}

Expand Down Expand Up @@ -212,10 +212,9 @@ bool Receiver::create_session_(const packet::PacketPtr& packet) {

const packet::Address src_address = packet->udp()->src_addr;

core::SharedPtr<ReceiverSession> sess = new (allocator_)
ReceiverSession(config_.default_session, packet->rtp()->payload_type,
config_.sample_rate, config_.poisoning, src_address, format_map_,
packet_pool_, byte_buffer_pool_, sample_buffer_pool_, allocator_);
core::SharedPtr<ReceiverSession> sess = new (allocator_) ReceiverSession(
config_.default_session, config_.output, packet->rtp()->payload_type, src_address,
format_map_, packet_pool_, byte_buffer_pool_, sample_buffer_pool_, allocator_);

if (!sess || !sess->valid()) {
roc_log(LogError, "receiver: can't create session, initialization failed");
Expand Down
54 changes: 28 additions & 26 deletions src/modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
namespace roc {
namespace pipeline {

ReceiverSession::ReceiverSession(const SessionConfig& config,
ReceiverSession::ReceiverSession(const ReceiverSessionConfig& session_config,
const ReceiverOutputConfig& output_config,
const unsigned int payload_type,
const size_t out_sample_rate,
bool poisoning,
const packet::Address& src_address,
const rtp::FormatMap& format_map,
packet::PacketPool& packet_pool,
Expand Down Expand Up @@ -53,23 +52,24 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,

packet::IReader* preader = source_queue_.get();

delayed_reader_.reset(
new (allocator_) packet::DelayedReader(*preader, config.latency), allocator_);
delayed_reader_.reset(new (allocator_)
packet::DelayedReader(*preader, session_config.latency),
allocator_);
if (!delayed_reader_) {
return;
}
preader = delayed_reader_.get();

validator_.reset(new (allocator_)
rtp::Validator(*preader, *format, config.rtp_validator),
rtp::Validator(*preader, *format, session_config.rtp_validator),
allocator_);
if (!validator_) {
return;
}
preader = validator_.get();

#ifdef ROC_TARGET_OPENFEC
if (config.fec.codec != fec::NoCodec) {
if (session_config.fec.codec != fec::NoCodec) {
repair_queue_.reset(new (allocator_) packet::SortedQueue(0), allocator_);
if (!repair_queue_) {
return;
Expand All @@ -79,9 +79,9 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,
}

core::UniquePtr<fec::OFDecoder> fec_decoder(
new (allocator_)
fec::OFDecoder(config.fec, format->size(config.samples_per_packet),
byte_buffer_pool, allocator_),
new (allocator_) fec::OFDecoder(
session_config.fec, format->size(session_config.samples_per_packet),
byte_buffer_pool, allocator_),
allocator_);
if (!fec_decoder || !fec_decoder->valid()) {
return;
Expand All @@ -94,16 +94,16 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,
}

fec_reader_.reset(new (allocator_) fec::Reader(
config.fec, *fec_decoder_, *preader, *repair_queue_,
session_config.fec, *fec_decoder_, *preader, *repair_queue_,
*fec_parser_, packet_pool, allocator_),
allocator_);
if (!fec_reader_ || !fec_reader_->valid()) {
return;
}
preader = fec_reader_.get();

fec_validator_.reset(new (allocator_)
rtp::Validator(*preader, *format, config.rtp_validator),
fec_validator_.reset(new (allocator_) rtp::Validator(
*preader, *format, session_config.rtp_validator),
allocator_);
if (!fec_validator_) {
return;
Expand All @@ -117,29 +117,31 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,
return;
}

depacketizer_.reset(new (allocator_) audio::Depacketizer(
*preader, *decoder_, config.channels, config.beeping),
depacketizer_.reset(new (allocator_) audio::Depacketizer(*preader, *decoder_,
session_config.channels,
output_config.beeping),
allocator_);
if (!depacketizer_) {
return;
}

audio::IReader* areader = depacketizer_.get();

if (config.watchdog.silence_timeout != 0 || config.watchdog.drops_timeout != 0
|| config.watchdog.frame_status_window != 0) {
if (session_config.watchdog.silence_timeout != 0
|| session_config.watchdog.drops_timeout != 0
|| session_config.watchdog.frame_status_window != 0) {
watchdog_.reset(new (allocator_) audio::Watchdog(
*areader, packet::num_channels(config.channels),
config.watchdog, allocator_),
*areader, packet::num_channels(session_config.channels),
session_config.watchdog, allocator_),
allocator_);
if (!watchdog_ || !watchdog_->valid()) {
return;
}
areader = watchdog_.get();
}

if (config.resampling) {
if (poisoning) {
if (output_config.resampling) {
if (output_config.poisoning) {
resampler_poisoner_.reset(new (allocator_) audio::PoisonReader(*areader),
allocator_);
if (!resampler_poisoner_) {
Expand All @@ -148,16 +150,16 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,
areader = resampler_poisoner_.get();
}
resampler_.reset(new (allocator_) audio::ResamplerReader(
*areader, sample_buffer_pool, allocator, config.resampler,
config.channels),
*areader, sample_buffer_pool, allocator,
session_config.resampler, session_config.channels),
allocator_);
if (!resampler_ || !resampler_->valid()) {
return;
}
areader = resampler_.get();
}

if (poisoning) {
if (output_config.poisoning) {
session_poisoner_.reset(new (allocator_) audio::PoisonReader(*areader),
allocator_);
if (!session_poisoner_) {
Expand All @@ -168,8 +170,8 @@ ReceiverSession::ReceiverSession(const SessionConfig& config,

latency_monitor_.reset(new (allocator_) audio::LatencyMonitor(
*source_queue_, *depacketizer_, resampler_.get(),
config.latency_monitor, config.latency,
format->sample_rate, out_sample_rate),
session_config.latency_monitor, session_config.latency,
format->sample_rate, output_config.sample_rate),
allocator_);
if (!latency_monitor_ || !latency_monitor_->valid()) {
return;
Expand Down
Loading

0 comments on commit c2cd1dc

Please sign in to comment.