diff --git a/lib/include/srslte/common/basic_pnf.h b/lib/include/srslte/common/basic_pnf.h index c4c28ceb8..91ac341a2 100644 --- a/lib/include/srslte/common/basic_pnf.h +++ b/lib/include/srslte/common/basic_pnf.h @@ -24,6 +24,10 @@ #include "basic_vnf_api.h" #include "common.h" +#include "srslte/common/block_queue.h" +#include "srslte/common/buffer_pool.h" +#include "srslte/common/choice_type.h" +#include "srslte/common/logmap.h" #include #include #include @@ -56,6 +60,11 @@ struct pnf_metrics_t { class srslte_basic_pnf { + using msg_header_t = basic_vnf_api::msg_header_t; + const static size_t buffer_size = + srslte::static_max::value; + using msg_buffer_t = std::array; + public: srslte_basic_pnf(const std::string& type_, const std::string& vnf_p5_addr, @@ -72,7 +81,11 @@ public: num_sf(num_sf_), tb_len(tb_len_), rand_gen(RAND_SEED), - rand_dist(MIN_TB_LEN, MAX_TB_LEN){}; + rand_dist(MIN_TB_LEN, MAX_TB_LEN) + { + + log_h->set_level(srslte::LOG_LEVEL_INFO); + } ~srslte_basic_pnf() { stop(); }; @@ -106,10 +119,10 @@ public: running = true; if (type == "gnb") { - rx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::rx_thread_function, this)); - tx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::tx_thread_function, this)); + rx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::dl_handler_thread, this)); + tx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::ul_handler_thread, this)); } else { - tx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::tx_thread_function_ue, this)); + tx_thread = std::unique_ptr(new std::thread(&srslte_basic_pnf::ue_dl_handler_thread, this)); } return true; @@ -141,8 +154,12 @@ public: return tmp; } + void connect_out_rf_queue(srslte::block_queue* rf_queue_) { rf_out_queue = rf_queue_; } + srslte::block_queue* get_in_rf_queue() { return &rf_in_queue; } + private: - void rx_thread_function() + //! Waits for DL Config or Tx Request Msg from VNF and forwards to RF + void dl_handler_thread() { pthread_setname_np(pthread_self(), rx_thread_name.c_str()); @@ -152,25 +169,26 @@ private: fd.fd = sockfd; fd.events = POLLIN; - const uint32_t max_basic_api_pdu = sizeof(basic_vnf_api::dl_conf_msg_t) + 32; // larger than biggest message - std::unique_ptr > rx_buffer = - std::unique_ptr >(new std::array); + // const uint32_t max_basic_api_pdu = sizeof(basic_vnf_api::dl_conf_msg_t) + 32; // larger than biggest message + // std::unique_ptr > rx_buffer = + // std::unique_ptr >(new std::array); + std::unique_ptr rx_buffer{new msg_buffer_t{}}; while (running) { // receive response int ret = poll(&fd, 1, RX_TIMEOUT_MS); switch (ret) { case -1: - printf("Error occured.\n"); + printf("Error occurred.\n"); running = false; break; case 0: // Timeout printf("Error: Didn't receive response after %dms\n", RX_TIMEOUT_MS); - running = false; + // running = false; break; default: - int recv_ret = recv(sockfd, rx_buffer->data(), rx_buffer->size(), 0); + int recv_ret = recv(sockfd, rx_buffer->data(), sizeof(*rx_buffer), 0); handle_msg(rx_buffer->data(), recv_ret); break; } @@ -185,7 +203,7 @@ private: } }; - void tx_thread_function() + void ul_handler_thread() { pthread_setname_np(pthread_self(), tx_thread_name.c_str()); @@ -199,7 +217,8 @@ private: std::unique_ptr > rx_buffer = std::unique_ptr >(new std::array); - int32_t sf_counter = 0; + int32_t sf_counter = 0; + bool using_rf_queue = false; while (running && (num_sf > 0 ? sf_counter < num_sf : true)) { { std::lock_guard lock(mutex); @@ -213,9 +232,14 @@ private: // Send request send_sf_ind(tti); - // provide UL data every 2nd TTI - if (tti % 2 == 0) { + if (not rf_in_queue.empty()) { + using_rf_queue = true; send_rx_data_ind(tti); + } else if (not using_rf_queue) { + // provide UL data every 2nd TTI + if (tti % 2 == 0) { + send_rx_data_ind(tti); + } } sf_counter++; @@ -227,7 +251,7 @@ private: printf("Leaving Tx thread after %d subframes\n", sf_counter); }; - void tx_thread_function_ue() + void ue_dl_handler_thread() { pthread_setname_np(pthread_self(), tx_thread_name.c_str()); @@ -241,7 +265,8 @@ private: std::unique_ptr > rx_buffer = std::unique_ptr >(new std::array); - int32_t sf_counter = 0; + int32_t sf_counter = 0; + bool using_rf_queue = false; while (running && (num_sf > 0 ? sf_counter < num_sf : true)) { { std::lock_guard lock(mutex); @@ -255,11 +280,17 @@ private: // Send SF indication send_sf_ind(tti); - // provide DL grant every even TTI, and UL grant every odd - if (tti % 2 == 0) { - send_dl_ind(tti); - } else { - send_ul_ind(tti); + if (not rf_in_queue.empty()) { + using_rf_queue = true; + srslte::unique_byte_buffer_t tb = rf_in_queue.wait_pop(); + send_dl_ind(tti, std::move(tb)); + } else if (not using_rf_queue) { + // provide DL grant every even TTI, and UL grant every odd + if (tti % 2 == 0) { + send_dl_ind(tti); + } else { + send_ul_ind(tti); + } } sf_counter++; @@ -291,7 +322,7 @@ private: { basic_vnf_api::msg_header_t* header = (basic_vnf_api::msg_header_t*)buffer; - // printf("Received %s (%d B) in TTI\n", msg_type_text[header->type], len); + log_h->info("Received %s (%d B) in TTI\n", basic_vnf_api::msg_type_text[header->type], len); switch (header->type) { case basic_vnf_api::SF_IND: @@ -330,7 +361,7 @@ private: if (msg->tti != tti) { metrics.num_timing_errors++; // printf("Received TX request for TTI=%d but current TTI is %d\n", msg->tti, tti.load()); - return -1; + // return -1; } for (uint32_t i = 0; i < msg->nof_pdus; ++i) { @@ -339,6 +370,15 @@ private: metrics.num_pdus += msg->nof_pdus; + if (rf_out_queue != nullptr) { + for (uint32_t i = 0; i < msg->nof_pdus; ++i) { + srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool); + pdu->N_bytes = msg->pdus[i].length; + memcpy(pdu->msg, msg->pdus[i].data, msg->pdus[i].length); + rf_out_queue->push(std::move(pdu)); + } + } + return 0; } @@ -377,7 +417,7 @@ private: } } - void send_dl_ind(uint32_t tti_) + void send_dl_ind(uint32_t tti_, srslte::unique_byte_buffer_t tb = {}) { #if PING_REQUEST_PDU static uint8_t tv[] = { @@ -403,6 +443,8 @@ private: 0x3f, }; #endif // PING_REQUEST_PDU + uint8_t* data = tb != nullptr ? tb->msg : tv; + uint32_t N_bytes = tb != nullptr ? tb->N_bytes : sizeof(tv); basic_vnf_api::dl_ind_msg_t dl_ind = {}; @@ -415,16 +457,18 @@ private: dl_ind.pdus[0].type = basic_vnf_api::PDSCH; dl_ind.pdus[0].length = tb_len > 0 ? tb_len : rand_dist(rand_gen); - if (dl_ind.pdus[0].length >= sizeof(tv)) { + if (dl_ind.pdus[0].length >= N_bytes) { // copy TV - memcpy(dl_ind.pdus[0].data, tv, sizeof(tv)); + memcpy(dl_ind.pdus[0].data, data, N_bytes); // set remaining bytes to zero - memset(dl_ind.pdus[0].data + sizeof(tv), 0xaa, dl_ind.pdus[0].length - sizeof(tv)); + memset(dl_ind.pdus[0].data + N_bytes, 0xaa, dl_ind.pdus[0].length - N_bytes); } else { // just fill with dummy bytes memset(dl_ind.pdus[0].data, 0xab, dl_ind.pdus[0].length); } + log_h->info_hex(dl_ind.pdus[0].data, N_bytes, "Sending to UE a TB (%d bytes)\n", N_bytes); + int n = 0; if ((n = sendto(sockfd, &dl_ind, sizeof(dl_ind), 0, (struct sockaddr*)&servaddr, sizeof(servaddr))) < 0) { printf("sendto failed, ret=%d\n", n); @@ -452,6 +496,8 @@ private: std::unique_ptr tx_thread, rx_thread; std::string tx_thread_name = "TX_PNF", rx_thread_name = "RX_PNF"; bool running = false; + srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance(); + srslte::log_ref log_h{"PNF"}; std::mutex mutex; std::atomic tti; @@ -474,6 +520,9 @@ private: // For random number generation std::mt19937 rand_gen; std::uniform_int_distribution rand_dist; + + srslte::block_queue* rf_out_queue = nullptr; + srslte::block_queue rf_in_queue; }; } // namespace srslte diff --git a/lib/include/srslte/common/basic_vnf.h b/lib/include/srslte/common/basic_vnf.h index 5c82504f6..4dd2df289 100644 --- a/lib/include/srslte/common/basic_vnf.h +++ b/lib/include/srslte/common/basic_vnf.h @@ -24,7 +24,7 @@ #include "basic_vnf_api.h" #include "common.h" -#include "srslte/common/log_filter.h" +#include "srslte/common/logmap.h" #include "srslte/common/threads.h" #include "srslte/interfaces/gnb_interfaces.h" #include "srslte/interfaces/ue_nr_interfaces.h" @@ -69,11 +69,11 @@ private: // helpers uint32_t calc_full_msg_len(const basic_vnf_api::tx_request_msg_t& msg); - srslte::logger* m_logger = nullptr; - std::unique_ptr m_log = nullptr; - srsenb::stack_interface_phy_nr* m_gnb_stack = nullptr; - srsue::stack_interface_phy_nr* m_ue_stack = nullptr; - srslte::byte_buffer_pool* m_pool = nullptr; + srslte::logger* m_logger = nullptr; + srslte::log_ref log_h; + srsenb::stack_interface_phy_nr* m_gnb_stack = nullptr; + srsue::stack_interface_phy_nr* m_ue_stack = nullptr; + srslte::byte_buffer_pool* m_pool = nullptr; std::unique_ptr m_tx_req_msg; diff --git a/lib/src/common/basic_vnf.cc b/lib/src/common/basic_vnf.cc index c41644e8e..8ca0b46eb 100644 --- a/lib/src/common/basic_vnf.cc +++ b/lib/src/common/basic_vnf.cc @@ -41,12 +41,11 @@ srslte_basic_vnf::srslte_basic_vnf(const vnf_args_t& args_, srslte::logger* logg m_logger(logger_), thread("BASIC_VNF_P7"), m_tx_req_msg(new basic_vnf_api::tx_request_msg_t), - m_log(new srslte::log_filter), + log_h("VNF"), m_pool(srslte::byte_buffer_pool::get_instance()) { - m_log->init("VNF ", m_logger); - m_log->set_level(m_args.log_level); - m_log->set_hex_limit(m_args.log_hex_limit); + log_h->set_level(m_args.log_level); + log_h->set_hex_limit(m_args.log_hex_limit); if (m_args.type == "gnb" || m_args.type == "ue") { if (m_args.type == "gnb") { @@ -55,10 +54,10 @@ srslte_basic_vnf::srslte_basic_vnf(const vnf_args_t& args_, srslte::logger* logg m_ue_stack = (srsue::stack_interface_phy_nr*)stack_; } - m_log->info("Initializing VNF for gNB\n"); + log_h->info("Initializing VNF for gNB\n"); start(); } else { - m_log->error("Unknown VNF type. Exiting\n."); + log_h->error("Unknown VNF type. Exiting\n."); } } @@ -108,7 +107,7 @@ void srslte_basic_vnf::run_thread() running = true; - m_log->info("Started VNF handler listening on %s:%d\n", m_args.bind_addr.c_str(), m_args.bind_port); + log_h->info("Started VNF handler listening on %s:%d\n", m_args.bind_addr.c_str(), m_args.bind_port); while (running) { int ret = poll(&fd, 1, RX_TIMEOUT_MS); @@ -128,14 +127,14 @@ void srslte_basic_vnf::run_thread() break; } } - m_log->info("VNF thread stopped\n"); + log_h->info("VNF thread stopped\n"); } int srslte_basic_vnf::handle_msg(const uint8_t* buffer, const uint32_t len) { basic_vnf_api::msg_header_t* header = (basic_vnf_api::msg_header_t*)buffer; - m_log->info("Received %s (%d B)\n", basic_vnf_api::msg_type_text[header->type], len); + log_h->info("Received %s (%d B)\n", basic_vnf_api::msg_type_text[header->type], len); switch (header->type) { case basic_vnf_api::SF_IND: @@ -163,7 +162,7 @@ int srslte_basic_vnf::handle_msg(const uint8_t* buffer, const uint32_t len) int srslte_basic_vnf::handle_sf_ind(basic_vnf_api::sf_ind_msg_t* msg) { int ret = SRSLTE_SUCCESS; - m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); + log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); // store Rx timestamp last_sf_indication_time = msg->t1; @@ -182,7 +181,7 @@ int srslte_basic_vnf::handle_sf_ind(basic_vnf_api::sf_ind_msg_t* msg) int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg) { int ret = SRSLTE_ERROR; - m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); + log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); uint32_t cc_idx = 0; @@ -191,7 +190,7 @@ int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg) dl_grant.tti = msg->tti; if (msg->nof_pdus > SRSLTE_MAX_TB) { - m_log->error("Too many TBs (%d > %d)\n", msg->nof_pdus, SRSLTE_MAX_TB); + log_h->error("Too many TBs (%d > %d)\n", msg->nof_pdus, SRSLTE_MAX_TB); goto exit; } @@ -204,7 +203,7 @@ int srslte_basic_vnf::handle_dl_ind(basic_vnf_api::dl_ind_msg_t* msg) m_ue_stack->tb_decoded(cc_idx, dl_grant); } } else { - m_log->error("TB too big to fit into buffer (%d > %d)\n", msg->pdus[i].length, dl_grant.tb[i]->get_tailroom()); + log_h->error("TB too big to fit into buffer (%d > %d)\n", msg->pdus[i].length, dl_grant.tb[i]->get_tailroom()); } } @@ -217,10 +216,10 @@ exit: int srslte_basic_vnf::handle_ul_ind(basic_vnf_api::ul_ind_msg_t* msg) { - m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); + log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->tti); if (msg->pdus.type != basic_vnf_api::PUSCH) { - m_log->error("Received UL indication for wrong PDU type\n"); + log_h->error("Received UL indication for wrong PDU type\n"); return SRSLTE_ERROR; } @@ -238,10 +237,10 @@ int srslte_basic_vnf::handle_ul_ind(basic_vnf_api::ul_ind_msg_t* msg) int srslte_basic_vnf::handle_rx_data_ind(basic_vnf_api::rx_data_ind_msg_t* msg) { - m_log->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->sfn); + log_h->info("Received %s for TTI=%d\n", basic_vnf_api::msg_type_text[msg->header.type], msg->sfn); if (msg->nof_pdus != 1 || msg->pdus[0].type != basic_vnf_api::PUSCH) { - m_log->error("Received UL indication for wrong PDU type\n"); + log_h->error("Received UL indication for wrong PDU type\n"); return SRSLTE_ERROR; } @@ -277,10 +276,10 @@ int srslte_basic_vnf::dl_config_request(const srsenb::phy_interface_stack_nr::dl uint32_t len = sizeof(dl_conf); // Send it to PNF - m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[dl_conf.header.type], len); + log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[dl_conf.header.type], len); int n = 0; if ((n = sendto(sockfd, &dl_conf, len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) < 0) { - m_log->error("sendto failed, ret=%d\n", n); + log_h->error("sendto failed, ret=%d\n", n); } return 0; @@ -304,7 +303,7 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request // copy data from TB0 memcpy(m_tx_req_msg->pdus[0].data, request.data, request.tb_len); } else { - m_log->error("Trying to send %d B PDU. Maximum size is %d B\n", request.tb_len, MAX_PDU_SIZE); + log_h->error("Trying to send %d B PDU. Maximum size is %d B\n", request.tb_len, MAX_PDU_SIZE); } // calculate actual length of @@ -314,11 +313,11 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request m_tx_req_msg->header.msg_len = len - sizeof(basic_vnf_api::msg_header_t); // Send it to PNF - m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len); + log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len); int n = 0; if ((n = sendto(sockfd, m_tx_req_msg.get(), len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) < 0) { - m_log->error("sendto failed, ret=%d\n", n); + log_h->error("sendto failed, ret=%d\n", n); } return 0; @@ -327,7 +326,7 @@ int srslte_basic_vnf::tx_request(const srsue::phy_interface_stack_nr::tx_request int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_request_t& request) { if (request.nof_pdus > MAX_NUM_PDUS) { - m_log->error("Trying to send %d PDUs but only %d supported\n", request.nof_pdus, MAX_NUM_PDUS); + log_h->error("Trying to send %d PDUs but only %d supported\n", request.nof_pdus, MAX_NUM_PDUS); return SRSLTE_ERROR; } @@ -346,7 +345,7 @@ int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_reques // copy data from TB0 memcpy(m_tx_req_msg->pdus[i].data, request.pdus[i].data[0], m_tx_req_msg->pdus[i].length); } else { - m_log->error("Trying to send %d B PDU. Maximum size is %d B\n", request.pdus[i].length, MAX_PDU_SIZE); + log_h->error("Trying to send %d B PDU. Maximum size is %d B\n", request.pdus[i].length, MAX_PDU_SIZE); } } @@ -357,11 +356,21 @@ int srslte_basic_vnf::tx_request(const srsenb::phy_interface_stack_nr::tx_reques m_tx_req_msg->header.msg_len = len - sizeof(basic_vnf_api::msg_header_t); // Send it to PNF - m_log->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len); + log_h->info("Sending %s (%d B)\n", basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], len); + if (log_h->get_level() == LOG_LEVEL_DEBUG) { + for (uint32_t i = 0; i < m_tx_req_msg->nof_pdus; ++i) { + log_h->debug_hex(m_tx_req_msg->pdus[i].data, + m_tx_req_msg->pdus[i].length, + "Sending PDU %s:%d (%d bytes)\n", + basic_vnf_api::msg_type_text[m_tx_req_msg->header.type], + m_tx_req_msg->pdus[i].index, + m_tx_req_msg->pdus[i].length); + } + } int n = 0; if ((n = sendto(sockfd, m_tx_req_msg.get(), len, MSG_CONFIRM, (struct sockaddr*)&client_addr, sizeof(client_addr))) < 0) { - m_log->error("sendto failed, ret=%d\n", n); + log_h->error("sendto failed, ret=%d\n", n); } return 0; diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index 646076e50..d44412b4c 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -100,9 +100,9 @@ add_test(expected_test expected_test) if(ENABLE_5GNR) add_executable(pnf_dummy pnf_dummy.cc) - target_link_libraries(pnf_dummy ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) + target_link_libraries(pnf_dummy srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) add_executable(pnf_bridge pnf_bridge.cc) - target_link_libraries(pnf_bridge ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) + target_link_libraries(pnf_bridge srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) endif() diff --git a/lib/test/common/pnf_bridge.cc b/lib/test/common/pnf_bridge.cc index bfb6dec1c..f468932ee 100644 --- a/lib/test/common/pnf_bridge.cc +++ b/lib/test/common/pnf_bridge.cc @@ -100,6 +100,8 @@ int main(int argc, char** argv) srslte::srslte_basic_pnf gnb_pnf( "gnb", args.gnb_vnf_addr, args.gnb_vnf_port, args.sf_interval, args.num_sf, args.tb_len); + gnb_pnf.connect_out_rf_queue(ue_pnf.get_in_rf_queue()); + ue_pnf.start(); gnb_pnf.start(); diff --git a/srsenb/src/enb.cc b/srsenb/src/enb.cc index c09a47eb4..66fdded28 100644 --- a/srsenb/src/enb.cc +++ b/srsenb/src/enb.cc @@ -127,7 +127,9 @@ int enb::init(const all_args_t& args_, srslte::logger* logger_) // TODO: where do we put this? srsenb::nr_phy_cfg_t nr_phy_cfg = {}; - args.phy.vnf_args.type = "gnb"; + args.phy.vnf_args.type = "gnb"; + args.phy.vnf_args.log_level = args.phy.log.phy_level; + args.phy.vnf_args.log_hex_limit = args.phy.log.phy_hex_limit; if (nr_phy->init(args.phy, nr_phy_cfg, nr_stack.get())) { log->console("Error initializing PHY.\n"); return SRSLTE_ERROR;