multisocket handler - use blocking socket remove method

This commit is contained in:
Francisco 2021-04-01 12:07:42 +01:00 committed by Francisco Paisana
parent bf96d897ee
commit 39de2efa69
4 changed files with 55 additions and 28 deletions

View File

@ -122,9 +122,10 @@ public:
rx_multisocket_handler(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(rx_multisocket_handler&&) = delete;
~rx_multisocket_handler();
~rx_multisocket_handler() final;
void stop();
bool remove_socket_nonblocking(int fd, bool signal_completion = false);
bool remove_socket(int fd);
bool add_socket_handler(int fd, task_callback_t handler);
// convenience methods for recv using buffer pool
@ -137,8 +138,9 @@ private:
// used to unlock select
struct ctrl_cmd_t {
enum class cmd_id_t { EXIT, NEW_FD, RM_FD };
cmd_id_t cmd = cmd_id_t::EXIT;
int new_fd = -1;
cmd_id_t cmd = cmd_id_t::EXIT;
int new_fd = -1;
bool signal_rm_complete = false;
};
std::map<int, rx_multisocket_handler::task_callback_t>::iterator
remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd);
@ -152,6 +154,8 @@ private:
std::map<int, task_callback_t> active_sockets;
bool running = false;
int pipefd[2] = {};
std::vector<int> rem_fd_tmp_list;
std::condition_variable rem_cvar;
};
} // namespace srsran

View File

@ -500,18 +500,19 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
return true;
}
bool rx_multisocket_handler::remove_socket(int fd)
bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_completion)
{
std::lock_guard<std::mutex> lock(socket_mutex);
auto it = active_sockets.find(fd);
if (it == active_sockets.end()) {
rxSockError("The socket fd=%d to be removed does not exist", fd);
rxSockWarn("The socket fd=%d to be removed does not exist", fd);
return false;
}
ctrl_cmd_t msg;
msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD;
msg.new_fd = fd;
msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD;
msg.new_fd = fd;
msg.signal_rm_complete = signal_completion;
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
rxSockError("while writing to control pipe");
return false;
@ -519,6 +520,21 @@ bool rx_multisocket_handler::remove_socket(int fd)
return true;
}
bool rx_multisocket_handler::remove_socket(int fd)
{
bool result = remove_socket_nonblocking(fd, true);
// block waiting for socket removal
if (result) {
std::unique_lock<std::mutex> lock(socket_mutex);
while (std::count(rem_fd_tmp_list.begin(), rem_fd_tmp_list.end(), fd) == 0) {
rem_cvar.wait(lock);
}
rem_fd_tmp_list.erase(std::find(rem_fd_tmp_list.begin(), rem_fd_tmp_list.end(), fd));
}
return result;
}
std::map<int, rx_multisocket_handler::task_callback_t>::iterator
rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd)
{
@ -601,6 +617,10 @@ void rx_multisocket_handler::run_thread()
break;
case ctrl_cmd_t::cmd_id_t::RM_FD:
remove_socket_unprotected(msg.new_fd, &total_fd_set, &max_fd);
if (msg.signal_rm_complete) {
rem_fd_tmp_list.push_back(msg.new_fd);
rem_cvar.notify_one();
}
rxSockDebug("Socket fd=%d has been successfully removed", msg.new_fd);
break;
default:

View File

@ -115,7 +115,7 @@ private:
srsenb::stack_interface_s1ap_lte* stack = nullptr;
srsran::task_sched_handle task_sched;
srsran::unique_socket s1ap_socket;
srsran::unique_socket mme_socket;
struct sockaddr_in mme_addr = {}; // MME address
bool mme_connected = false;
bool running = false;

View File

@ -209,10 +209,10 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const
s1ap_ptr->mme_connect_timer.duration() / 1000);
srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n",
s1ap_ptr->mme_connect_timer.duration() / 1000);
s1ap_ptr->mme_connect_timer.run();
s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->s1ap_socket.get_socket());
s1ap_ptr->s1ap_socket.close();
s1ap_ptr->stack->remove_mme_socket(s1ap_ptr->mme_socket.get_socket());
s1ap_ptr->mme_socket.close();
procInfo("S1AP socket closed.");
s1ap_ptr->mme_connect_timer.run();
// Try again with in 10 seconds
}
}
@ -236,9 +236,10 @@ int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interf
// Setup MME reconnection timer
mme_connect_timer = task_sched.get_unique_timer();
auto mme_connect_run = [this](uint32_t tid) {
if (not s1setup_proc.launch()) {
if (s1setup_proc.is_busy()) {
logger.error("Failed to initiate S1Setup procedure.");
}
s1setup_proc.launch();
};
mme_connect_timer.set(10000, mme_connect_run);
// Setup S1Setup timeout
@ -263,7 +264,7 @@ int s1ap::init(s1ap_args_t args_, rrc_interface_s1ap* rrc_, srsenb::stack_interf
void s1ap::stop()
{
running = false;
s1ap_socket.close();
mme_socket.close();
}
void s1ap::get_metrics(s1ap_metrics_t& m)
@ -423,23 +424,23 @@ bool s1ap::is_mme_connected()
bool s1ap::connect_mme()
{
using namespace srsran::net_utils;
logger.info("Connecting to MME %s:%d", args.mme_addr.c_str(), int(MME_PORT));
// Init SCTP socket and bind it
if (not srsran::net_utils::sctp_init_client(
&s1ap_socket, srsran::net_utils::socket_type::seqpacket, args.s1c_bind_addr.c_str())) {
if (not sctp_init_client(&mme_socket, socket_type::seqpacket, args.s1c_bind_addr.c_str())) {
return false;
}
logger.info("SCTP socket opened. fd=%d", s1ap_socket.fd());
logger.info("SCTP socket opened. fd=%d", mme_socket.fd());
// Connect to the MME address
if (not s1ap_socket.connect_to(args.mme_addr.c_str(), MME_PORT, &mme_addr)) {
if (not mme_socket.connect_to(args.mme_addr.c_str(), MME_PORT, &mme_addr)) {
return false;
}
logger.info("SCTP socket connected with MME. fd=%d", s1ap_socket.fd());
logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd());
// Assign a handler to rx MME packets (going to run in a different thread)
stack->add_mme_socket(s1ap_socket.fd());
stack->add_mme_socket(mme_socket.fd());
logger.info("SCTP socket established with MME");
return true;
@ -498,26 +499,28 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu,
if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) {
logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id);
stack->remove_mme_socket(s1ap_socket.get_socket());
s1ap_socket.close();
stack->remove_mme_socket(mme_socket.get_socket());
mme_socket.close();
} else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE &&
notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) {
logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id);
srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id);
stack->remove_mme_socket(s1ap_socket.get_socket());
s1ap_socket.close();
stack->remove_mme_socket(mme_socket.get_socket());
mme_socket.close();
}
} else if (pdu->N_bytes == 0) {
logger.error("SCTP return 0 bytes. Closing socket");
s1ap_socket.close();
mme_socket.close();
}
// Restart MME connection procedure if we lost connection
if (not s1ap_socket.is_open()) {
if (not mme_socket.is_open()) {
mme_connected = false;
if (not s1setup_proc.launch()) {
logger.error("Failed to initiate MME connection procedure.");
if (s1setup_proc.is_busy()) {
logger.error("Failed to initiate MME connection procedure, as it is already running.");
return false;
}
s1setup_proc.launch();
return false;
}
@ -1596,7 +1599,7 @@ bool s1ap::sctp_send_s1ap_pdu(const asn1::s1ap::s1ap_pdu_c& tx_pdu, uint32_t rnt
}
uint16_t streamid = rnti == SRSRAN_INVALID_RNTI ? NONUE_STREAM_ID : users.find_ue_rnti(rnti)->stream_id;
ssize_t n_sent = sctp_sendmsg(s1ap_socket.fd(),
ssize_t n_sent = sctp_sendmsg(mme_socket.fd(),
buf->msg,
buf->N_bytes,
(struct sockaddr*)&mme_addr,