Skip to content

Commit

Permalink
Merge pull request #136 from DUNE-DAQ/eflumerf/TCMakersToDataSubscriber
Browse files Browse the repository at this point in the history
Have the standalone TC makers feed into the DataSubscriberModule.
  • Loading branch information
eflumerf authored Oct 2, 2024
2 parents fa393f4 + 4a12ff8 commit fb9db65
Showing 1 changed file with 96 additions and 85 deletions.
181 changes: 96 additions & 85 deletions src/MLTApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
// #include "confmodel/ReadoutGroup.hpp"
// #include "confmodel/ReadoutInterface.hpp"
// #include "confmodel/DetectorStream.hpp"
#include "confmodel/DetectorToDaqConnection.hpp"
#include "confmodel/DetectorStream.hpp"
#include "confmodel/DetectorToDaqConnection.hpp"

#include "confmodel/ResourceSet.hpp"
#include "confmodel/Service.hpp"
Expand All @@ -27,26 +27,26 @@
#include "appmodel/NetworkConnectionRule.hpp"
#include "appmodel/QueueConnectionRule.hpp"

#include "appmodel/QueueDescriptor.hpp"
#include "appmodel/NetworkConnectionDescriptor.hpp"
#include "appmodel/QueueDescriptor.hpp"

#include "appmodel/SourceIDConf.hpp"

#include "appmodel/DataSubscriberModule.hpp"
#include "appmodel/DataReaderConf.hpp"
#include "appmodel/DataRecorderConf.hpp"
#include "appmodel/DataSubscriberModule.hpp"

#include "appmodel/DataHandlerModule.hpp"
#include "appmodel/DataHandlerConf.hpp"
#include "appmodel/DataHandlerModule.hpp"
#include "appmodel/TCDataProcessor.hpp"

#include "appmodel/MLTModule.hpp"
#include "appmodel/MLTConf.hpp"
#include "appmodel/MLTModule.hpp"

#include "appmodel/FakeHSIApplication.hpp"
#include "appmodel/MLTApplication.hpp"
#include "appmodel/ReadoutApplication.hpp"
#include "appmodel/TriggerApplication.hpp"
#include "appmodel/FakeHSIApplication.hpp"
#include "appmodel/appmodelIssues.hpp"

#include "appmodel/StandaloneTCMakerConf.hpp"
Expand Down Expand Up @@ -81,9 +81,9 @@ static ModuleFactory::Registrator __reg__("MLTApplication",
*/
conffwk::ConfigObject
create_mlt_network_connection(std::string uid,
const NetworkConnectionDescriptor* ntDesc,
conffwk::Configuration* confdb,
const std::string& dbfile)
const NetworkConnectionDescriptor* ntDesc,
conffwk::Configuration* confdb,
const std::string& dbfile)
{
auto ntServiceObj = ntDesc->get_associated_service()->config_object();
conffwk::ConfigObject ntObj;
Expand All @@ -97,13 +97,13 @@ create_mlt_network_connection(std::string uid,

std::vector<const confmodel::DaqModule*>
MLTApplication::generate_modules(conffwk::Configuration* confdb,
const std::string& dbfile,
const confmodel::Session* session) const
const std::string& dbfile,
const confmodel::Session* session) const
{
std::vector<const confmodel::DaqModule*> modules;

//auto mlt_conf = get_mlt_conf();
//auto mlt_class = mlt_conf->get_template_for();
// auto mlt_conf = get_mlt_conf();
// auto mlt_class = mlt_conf->get_template_for();

auto tch_conf = get_trigger_inputs_handler();
auto tch_class = tch_conf->get_template_for();
Expand All @@ -126,17 +126,16 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
auto data_type = rule->get_descriptor()->get_data_type();
if (destination_class == tch_class) {
tc_inputq_desc = rule->get_descriptor();
}
else if (destination_class == mlt_class) {
} else if (destination_class == mlt_class) {
td_outputq_desc = rule->get_descriptor();
}
}

if (tc_inputq_desc == nullptr) {
throw (BadConf(ERS_HERE, "No TC input queue descriptor given"));
throw(BadConf(ERS_HERE, "No TC input queue descriptor given"));
}
if (td_outputq_desc == nullptr) {
throw (BadConf(ERS_HERE, "No TD output-input queue descriptor given"));
throw(BadConf(ERS_HERE, "No TD output-input queue descriptor given"));
}

// Create queues
Expand All @@ -156,7 +155,6 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
output_queue_obj.set_by_val<std::string>("queue_type", td_outputq_desc->get_queue_type());
output_queue_obj.set_by_val<uint32_t>("capacity", td_outputq_desc->get_capacity());


// Net descriptors
const NetworkConnectionDescriptor* req_net_desc = nullptr;
const NetworkConnectionDescriptor* tc_net_desc = nullptr;
Expand All @@ -178,7 +176,7 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
tc_net_desc = rule->get_descriptor();
}
if (data_type == "TimeSync") {
timesync_net_desc = rule->get_descriptor();
timesync_net_desc = rule->get_descriptor();
}
if (data_type == "DataRequest") {
req_net_desc = rule->get_descriptor();
Expand All @@ -205,54 +203,60 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
create_mlt_network_connection(ti_net_desc->get_uid_base(), ti_net_desc, confdb, dbfile);

conffwk::ConfigObject tc_net_obj =
create_mlt_network_connection(tc_net_desc->get_uid_base()+".*", tc_net_desc, confdb, dbfile);
create_mlt_network_connection(tc_net_desc->get_uid_base() + ".*", tc_net_desc, confdb, dbfile);

// Network connection for output TriggerDecision
// Network connection for output TriggerDecision
conffwk::ConfigObject td_net_obj =
create_mlt_network_connection(td_net_desc->get_uid_base(), td_net_desc, confdb, dbfile);

// Network conection for the input Data Requests
conffwk::ConfigObject dr_net_obj =
create_mlt_network_connection(req_net_desc->get_uid_base()+UID(), req_net_desc, confdb, dbfile);
create_mlt_network_connection(req_net_desc->get_uid_base() + UID(), req_net_desc, confdb, dbfile);

conffwk::ConfigObject timesync_net_obj;
if (timesync_net_desc != nullptr) {
timesync_net_obj = create_mlt_network_connection(timesync_net_desc->get_uid_base()+".*", timesync_net_desc, confdb, dbfile);
timesync_net_obj =
create_mlt_network_connection(timesync_net_desc->get_uid_base() + ".*", timesync_net_desc, confdb, dbfile);
}


/**************************************************************
/**************************************************************
* Instantiate standalone TC generator modules (e.g. random TC generator)
**************************************************************/

auto standalone_TC_maker_confs = get_standalone_candidate_maker_confs();
for (auto gen_conf : standalone_TC_maker_confs) {
conffwk::ConfigObject gen_obj;
confdb->create(dbfile, gen_conf->get_template_for(), gen_conf->UID(), gen_obj);
gen_obj.set_obj("configuration", &(gen_conf->config_object()));
if (gen_conf->get_timestamp_method() == "kTimeSync" && !timesync_net_obj.is_null()) {
gen_obj.set_objs("inputs", {&timesync_net_obj});
}
gen_obj.set_objs("outputs", {&input_queue_obj});
modules.push_back(confdb->get<StandaloneTCMakerModule>(gen_conf->UID()));
}
auto standalone_TC_maker_confs = get_standalone_candidate_maker_confs();
std::vector<conffwk::ConfigObject> generated_tc_conns;
generated_tc_conns.reserve(standalone_TC_maker_confs.size());
for (auto gen_conf : standalone_TC_maker_confs) {
conffwk::ConfigObject gen_obj;
confdb->create(dbfile, gen_conf->get_template_for(), gen_conf->UID(), gen_obj);
gen_obj.set_obj("configuration", &(gen_conf->config_object()));
if (gen_conf->get_timestamp_method() == "kTimeSync" && !timesync_net_obj.is_null()) {
gen_obj.set_objs("inputs", { &timesync_net_obj });
}

auto tc_net_gen =
create_mlt_network_connection(tc_net_desc->get_uid_base() + gen_conf->UID(), tc_net_desc, confdb, dbfile);
generated_tc_conns.push_back(tc_net_gen);

gen_obj.set_objs("outputs", { &generated_tc_conns.back() });
modules.push_back(confdb->get<StandaloneTCMakerModule>(gen_conf->UID()));
}

/**************************************************************
* Create the Data Reader
**************************************************************/
auto rdr_conf = get_data_subscriber();
if (rdr_conf == nullptr) {
throw (BadConf(ERS_HERE, "No DataReaderModule configuration given"));
throw(BadConf(ERS_HERE, "No DataReaderModule configuration given"));
}

std::string reader_uid("data-reader-"+UID());
std::string reader_uid("data-reader-" + UID());
std::string reader_class = rdr_conf->get_template_for();
conffwk::ConfigObject reader_obj;
TLOG_DEBUG(7) << "creating OKS configuration object for Data subscriber class " << reader_class;
TLOG_DEBUG(7) << "creating OKS configuration object for Data subscriber class " << reader_class;
confdb->create(dbfile, reader_class, reader_uid, reader_obj);
reader_obj.set_objs("inputs", {&tc_net_obj} );
reader_obj.set_objs("outputs", {&input_queue_obj} );
reader_obj.set_objs("inputs", { &tc_net_obj });
reader_obj.set_objs("outputs", { &input_queue_obj });
reader_obj.set_obj("configuration", &rdr_conf->config_object());

modules.push_back(confdb->get<DataSubscriberModule>(reader_uid));
Expand All @@ -268,23 +272,24 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
for (auto app : apps) {
auto ro_app = app->cast<appmodel::ReadoutApplication>();
if (ro_app != nullptr) {
auto resources = ro_app->get_contains();
// Interate over all the readout groups
for (auto d2d_conn_res : resources) {
if (d2d_conn_res->disabled(*session)) {
TLOG_DEBUG(7) << "Ignoring disabled Detector2DaqConnection " << d2d_conn_res->UID();
continue;
auto resources = ro_app->get_contains();
// Interate over all the readout groups
for (auto d2d_conn_res : resources) {
if (d2d_conn_res->disabled(*session)) {
TLOG_DEBUG(7) << "Ignoring disabled Detector2DaqConnection " << d2d_conn_res->UID();
continue;
}

auto d2d_conn = d2d_conn_res->cast<confmodel::DetectorToDaqConnection>();
if (d2d_conn == nullptr) {
throw(BadConf(ERS_HERE, "MLTApplication's detectordaq connections list contains something other than DetectorToDaqConnection"));
throw(BadConf(
ERS_HERE,
"MLTApplication's detectordaq connections list contains something other than DetectorToDaqConnection"));
}
if (d2d_conn->get_contains().empty()) {
throw(BadConf(ERS_HERE, "DetectorToDaqConnection does not contain interfaces"));
}


// Interate over all the streams
for (auto stream : d2d_conn->get_streams()) {
if (stream == nullptr) {
Expand All @@ -307,47 +312,53 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
}
}
if (ro_app->get_tp_generation_enabled()) {
for (auto sid: ro_app->get_tp_source_ids()) {
sourceIds.push_back(&(sid->config_object()));
}
//conffwk::ConfigObject* tpSourceIdConf = new conffwk::ConfigObject();
//confdb->create(dbfile, "SourceIDConf", ro_app->UID()+"-"+ std::to_string(ro_app->get_tp_source_id()), *tpSourceIdConf);
//tpSourceIdConf->set_by_val<uint32_t>("sid", ro_app->get_tp_source_id());
//tpSourceIdConf->set_by_val<std::string>("subsystem", "Trigger");
//sourceIds.push_back(tpSourceIdConf);
for (auto sid : ro_app->get_tp_source_ids()) {
sourceIds.push_back(&(sid->config_object()));
}
// conffwk::ConfigObject* tpSourceIdConf = new conffwk::ConfigObject();
// confdb->create(dbfile, "SourceIDConf", ro_app->UID()+"-"+ std::to_string(ro_app->get_tp_source_id()),
// *tpSourceIdConf); tpSourceIdConf->set_by_val<uint32_t>("sid", ro_app->get_tp_source_id());
// tpSourceIdConf->set_by_val<std::string>("subsystem", "Trigger");
// sourceIds.push_back(tpSourceIdConf);
}
}

// SmartDaqApplication now has source_id member, might want to use that but make sure that it's actually a data source somehow...
// SmartDaqApplication now has source_id member, might want to use that but make sure that it's actually a data
// source somehow...
auto trg_app = app->cast<appmodel::TriggerApplication>();
if(trg_app != nullptr && trg_app->get_source_id() != nullptr) {
conffwk::ConfigObject* tcSourceIdConf = new conffwk::ConfigObject();
confdb->create(dbfile, "SourceIDConf", trg_app->UID()+"-"+ std::to_string(trg_app->get_source_id()->get_sid()), *tcSourceIdConf);
tcSourceIdConf->set_by_val<uint32_t>("sid", trg_app->get_source_id()->get_sid());
tcSourceIdConf->set_by_val<std::string>("subsystem", trg_app->get_source_id()->get_subsystem());
sourceIds.push_back(tcSourceIdConf);
if (trg_app != nullptr && trg_app->get_source_id() != nullptr) {
conffwk::ConfigObject* tcSourceIdConf = new conffwk::ConfigObject();
confdb->create(dbfile,
"SourceIDConf",
trg_app->UID() + "-" + std::to_string(trg_app->get_source_id()->get_sid()),
*tcSourceIdConf);
tcSourceIdConf->set_by_val<uint32_t>("sid", trg_app->get_source_id()->get_sid());
tcSourceIdConf->set_by_val<std::string>("subsystem", trg_app->get_source_id()->get_subsystem());
sourceIds.push_back(tcSourceIdConf);
}

// FIXME: add here same logics for HSI application(s)
//
auto hsi_app = app->cast<appmodel::FakeHSIApplication>();
if(hsi_app != nullptr && hsi_app->get_source_id() != nullptr) {
conffwk::ConfigObject* hsEventSourceIdConf = new conffwk::ConfigObject();
confdb->create(dbfile, "SourceIDConf", hsi_app->UID()+"-"+ std::to_string(hsi_app->get_source_id()->get_sid()), *hsEventSourceIdConf);
hsEventSourceIdConf->set_by_val<uint32_t>("sid", hsi_app->get_source_id()->get_sid());
hsEventSourceIdConf->set_by_val<std::string>("subsystem", hsi_app->get_source_id()->get_subsystem());
sourceIds.push_back(hsEventSourceIdConf);
if (hsi_app != nullptr && hsi_app->get_source_id() != nullptr) {
conffwk::ConfigObject* hsEventSourceIdConf = new conffwk::ConfigObject();
confdb->create(dbfile,
"SourceIDConf",
hsi_app->UID() + "-" + std::to_string(hsi_app->get_source_id()->get_sid()),
*hsEventSourceIdConf);
hsEventSourceIdConf->set_by_val<uint32_t>("sid", hsi_app->get_source_id()->get_sid());
hsEventSourceIdConf->set_by_val<std::string>("subsystem", hsi_app->get_source_id()->get_subsystem());
sourceIds.push_back(hsEventSourceIdConf);
}

}

// Get mandatory links
std::vector<const conffwk::ConfigObject*> mandatory_sids;
const TCDataProcessor* tc_dp = tch_conf->get_data_processor()->cast<TCDataProcessor>();
if (tc_dp != nullptr) {
for (auto m: tc_dp->get_mandatory_links()) {
mandatory_sids.push_back(&m->config_object());
}
for (auto m : tc_dp->get_mandatory_links()) {
mandatory_sids.push_back(&m->config_object());
}
}

/**************************************************************
Expand All @@ -360,29 +371,29 @@ MLTApplication::generate_modules(conffwk::Configuration* confdb,
throw(BadConf(ERS_HERE, "No source_id associated with this TriggerApplication!"));
}
uint32_t source_id = get_source_id()->get_sid();
std::string ti_uid(handler_name + "-"+ std::to_string(source_id));
std::string ti_uid(handler_name + "-" + std::to_string(source_id));
confdb->create(dbfile, tch_class, ti_uid, ti_obj);
ti_obj.set_by_val<uint32_t>("source_id", source_id);
ti_obj.set_by_val<uint32_t>("detector_id", 1); //1 == kDAQ
ti_obj.set_by_val<uint32_t>("detector_id", 1); // 1 == kDAQ
ti_obj.set_obj("module_configuration", &tch_conf_obj);
ti_obj.set_objs("enabled_source_ids", sourceIds);
ti_obj.set_objs("mandatory_source_ids", mandatory_sids);
ti_obj.set_objs("inputs", {&input_queue_obj, &dr_net_obj});
ti_obj.set_objs("outputs", {&output_queue_obj});
ti_obj.set_objs("inputs", { &input_queue_obj, &dr_net_obj });
ti_obj.set_objs("outputs", { &output_queue_obj });

// Add to our list of modules to return
modules.push_back(confdb->get<DataHandlerModule>(ti_uid));
modules.push_back(confdb->get<DataHandlerModule>(ti_uid));

/**************************************************************
* Instantiate the MLTModule module
**************************************************************/

conffwk::ConfigObject mlt_obj;
confdb->create(dbfile, mlt_conf->get_template_for(), mlt_conf->UID(), mlt_obj);
mlt_obj.set_obj("configuration", &(mlt_conf->config_object()));
mlt_obj.set_objs("inputs", {&output_queue_obj, &ti_net_obj});
mlt_obj.set_objs("outputs", {&td_net_obj});
modules.push_back(confdb->get<MLTModule>(mlt_conf->UID()));
conffwk::ConfigObject mlt_obj;
confdb->create(dbfile, mlt_conf->get_template_for(), mlt_conf->UID(), mlt_obj);
mlt_obj.set_obj("configuration", &(mlt_conf->config_object()));
mlt_obj.set_objs("inputs", { &output_queue_obj, &ti_net_obj });
mlt_obj.set_objs("outputs", { &td_net_obj });
modules.push_back(confdb->get<MLTModule>(mlt_conf->UID()));

return modules;
}

0 comments on commit fb9db65

Please sign in to comment.