From 39de2efa6948c86a06bb7566d8bb232fdb741ae4 Mon Sep 17 00:00:00 2001 From: Francisco Date: Thu, 1 Apr 2021 12:07:42 +0100 Subject: [PATCH] multisocket handler - use blocking socket remove method --- lib/include/srsran/common/network_utils.h | 10 ++++-- lib/src/common/network_utils.cc | 28 ++++++++++++--- srsenb/hdr/stack/upper/s1ap.h | 2 +- srsenb/src/stack/upper/s1ap.cc | 43 ++++++++++++----------- 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/lib/include/srsran/common/network_utils.h b/lib/include/srsran/common/network_utils.h index 53d4bda29..3c7446047 100644 --- a/lib/include/srsran/common/network_utils.h +++ b/lib/include/srsran/common/network_utils.h @@ -122,9 +122,10 @@ public: rx_multisocket_handler(const rx_multisocket_handler&) = delete; rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete; rx_multisocket_handler& operator=(rx_multisocket_handler&&) = delete; - ~rx_multisocket_handler(); + ~rx_multisocket_handler() final; void stop(); + bool remove_socket_nonblocking(int fd, bool signal_completion = false); bool remove_socket(int fd); bool add_socket_handler(int fd, task_callback_t handler); // convenience methods for recv using buffer pool @@ -137,8 +138,9 @@ private: // used to unlock select struct ctrl_cmd_t { enum class cmd_id_t { EXIT, NEW_FD, RM_FD }; - cmd_id_t cmd = cmd_id_t::EXIT; - int new_fd = -1; + cmd_id_t cmd = cmd_id_t::EXIT; + int new_fd = -1; + bool signal_rm_complete = false; }; std::map::iterator remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd); @@ -152,6 +154,8 @@ private: std::map active_sockets; bool running = false; int pipefd[2] = {}; + std::vector rem_fd_tmp_list; + std::condition_variable rem_cvar; }; } // namespace srsran diff --git a/lib/src/common/network_utils.cc b/lib/src/common/network_utils.cc index dcb6db9a1..d55fc8720 100644 --- a/lib/src/common/network_utils.cc +++ b/lib/src/common/network_utils.cc @@ -500,18 +500,19 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler) return true; } -bool rx_multisocket_handler::remove_socket(int fd) +bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_completion) { std::lock_guard lock(socket_mutex); auto it = active_sockets.find(fd); if (it == active_sockets.end()) { - rxSockError("The socket fd=%d to be removed does not exist", fd); + rxSockWarn("The socket fd=%d to be removed does not exist", fd); return false; } ctrl_cmd_t msg; - msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD; - msg.new_fd = fd; + msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD; + msg.new_fd = fd; + msg.signal_rm_complete = signal_completion; if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { rxSockError("while writing to control pipe"); return false; @@ -519,6 +520,21 @@ bool rx_multisocket_handler::remove_socket(int fd) return true; } +bool rx_multisocket_handler::remove_socket(int fd) +{ + bool result = remove_socket_nonblocking(fd, true); + + // block waiting for socket removal + if (result) { + std::unique_lock lock(socket_mutex); + while (std::count(rem_fd_tmp_list.begin(), rem_fd_tmp_list.end(), fd) == 0) { + rem_cvar.wait(lock); + } + rem_fd_tmp_list.erase(std::find(rem_fd_tmp_list.begin(), rem_fd_tmp_list.end(), fd)); + } + return result; +} + std::map::iterator rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd) { @@ -601,6 +617,10 @@ void rx_multisocket_handler::run_thread() break; case ctrl_cmd_t::cmd_id_t::RM_FD: remove_socket_unprotected(msg.new_fd, &total_fd_set, &max_fd); + if (msg.signal_rm_complete) { + rem_fd_tmp_list.push_back(msg.new_fd); + rem_cvar.notify_one(); + } rxSockDebug("Socket fd=%d has been successfully removed", msg.new_fd); break; default: diff --git a/srsenb/hdr/stack/upper/s1ap.h b/srsenb/hdr/stack/upper/s1ap.h index 82e12253e..53edebddd 100644 --- a/srsenb/hdr/stack/upper/s1ap.h +++ b/srsenb/hdr/stack/upper/s1ap.h @@ -115,7 +115,7 @@ private: srsenb::stack_interface_s1ap_lte* stack = nullptr; srsran::task_sched_handle task_sched; - srsran::unique_socket s1ap_socket; + srsran::unique_socket mme_socket; struct sockaddr_in mme_addr = {}; // MME address bool mme_connected = false; bool running = false; diff --git a/srsenb/src/stack/upper/s1ap.cc b/srsenb/src/stack/upper/s1ap.cc index 1100c205f..a19533798 100644 --- a/srsenb/src/stack/upper/s1ap.cc +++ b/srsenb/src/stack/upper/s1ap.cc @@ -209,10 +209,10 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const s1ap_ptr->mme_connect_timer.duration() / 1000); srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n", s1ap_ptr->mme_connect_timer.duration() / 1000); - s1ap_ptr->mme_connect_timer.run(); - s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->s1ap_socket.get_socket()); - s1ap_ptr->s1ap_socket.close(); + s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->mme_socket.get_socket()); + s1ap_ptr->mme_socket.close(); procInfo("S1AP socket closed."); + s1ap_ptr->mme_connect_timer.run(); // Try again with in 10 seconds } } @@ -236,9 +236,10 @@ int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interf // Setup MME reconnection timer mme_connect_timer = task_sched.get_unique_timer(); auto mme_connect_run = [this](uint32_t tid) { - if (not s1setup_proc.launch()) { + if (s1setup_proc.is_busy()) { logger.error("Failed to initiate S1Setup procedure."); } + s1setup_proc.launch(); }; mme_connect_timer.set(10000, mme_connect_run); // Setup S1Setup timeout @@ -263,7 +264,7 @@ int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interf void s1ap::stop() { running = false; - s1ap_socket.close(); + mme_socket.close(); } void s1ap::get_metrics(s1ap_metrics_t& m) @@ -423,23 +424,23 @@ bool s1ap::is_mme_connected() bool s1ap::connect_mme() { + using namespace srsran::net_utils; logger.info("Connecting to MME %s:%d", args.mme_addr.c_str(), int(MME_PORT)); // Init SCTP socket and bind it - if (not srsran::net_utils::sctp_init_client( - &s1ap_socket, srsran::net_utils::socket_type::seqpacket, args.s1c_bind_addr.c_str())) { + if (not sctp_init_client(&mme_socket, socket_type::seqpacket, args.s1c_bind_addr.c_str())) { return false; } - logger.info("SCTP socket opened. fd=%d", s1ap_socket.fd()); + logger.info("SCTP socket opened. fd=%d", mme_socket.fd()); // Connect to the MME address - if (not s1ap_socket.connect_to(args.mme_addr.c_str(), MME_PORT, &mme_addr)) { + if (not mme_socket.connect_to(args.mme_addr.c_str(), MME_PORT, &mme_addr)) { return false; } - logger.info("SCTP socket connected with MME. fd=%d", s1ap_socket.fd()); + logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd()); // Assign a handler to rx MME packets (going to run in a different thread) - stack->add_mme_socket(s1ap_socket.fd()); + stack->add_mme_socket(mme_socket.fd()); logger.info("SCTP socket established with MME"); return true; @@ -498,26 +499,28 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu, if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) { logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id); - stack->remove_mme_socket(s1ap_socket.get_socket()); - s1ap_socket.close(); + stack->remove_mme_socket(mme_socket.get_socket()); + mme_socket.close(); } else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE && notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) { logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id); - stack->remove_mme_socket(s1ap_socket.get_socket()); - s1ap_socket.close(); + stack->remove_mme_socket(mme_socket.get_socket()); + mme_socket.close(); } } else if (pdu->N_bytes == 0) { logger.error("SCTP return 0 bytes. Closing socket"); - s1ap_socket.close(); + mme_socket.close(); } // Restart MME connection procedure if we lost connection - if (not s1ap_socket.is_open()) { + if (not mme_socket.is_open()) { mme_connected = false; - if (not s1setup_proc.launch()) { - logger.error("Failed to initiate MME connection procedure."); + if (s1setup_proc.is_busy()) { + logger.error("Failed to initiate MME connection procedure, as it is already running."); + return false; } + s1setup_proc.launch(); return false; } @@ -1596,7 +1599,7 @@ bool s1ap::sctp_send_s1ap_pdu(const asn1::s1ap::s1ap_pdu_c& tx_pdu, uint32_t rnt } uint16_t streamid = rnti == SRSRAN_INVALID_RNTI ? NONUE_STREAM_ID : users.find_ue_rnti(rnti)->stream_id; - ssize_t n_sent = sctp_sendmsg(s1ap_socket.fd(), + ssize_t n_sent = sctp_sendmsg(mme_socket.fd(), buf->msg, buf->N_bytes, (struct sockaddr*)&mme_addr,