diff --git a/lib/include/srsran/adt/move_callback.h b/lib/include/srsran/adt/move_callback.h index 2e2d91422..8e8538347 100644 --- a/lib/include/srsran/adt/move_callback.h +++ b/lib/include/srsran/adt/move_callback.h @@ -45,11 +45,11 @@ template class oper_table_t { public: - constexpr oper_table_t() = default; - virtual R call(void* src, Args&&... args) const = 0; - virtual void move(void* src, void* dest) const = 0; - virtual void dtor(void* src) const = 0; - virtual bool is_in_small_buffer() const = 0; + constexpr oper_table_t() = default; + virtual R call(void* src, Args... args) const = 0; + virtual void move(void* src, void* dest) const = 0; + virtual void dtor(void* src) const = 0; + virtual bool is_in_small_buffer() const = 0; }; //! specialization of move/call/destroy operations for when the "move_callback" is empty @@ -58,7 +58,7 @@ class empty_table_t : public oper_table_t { public: constexpr empty_table_t() = default; - R call(void* src, Args&&... args) const final + R call(void* src, Args... args) const final { srsran_terminate("ERROR: bad function call (cause: function ptr is empty)"); } @@ -73,7 +73,7 @@ class smallbuffer_table_t : public oper_table_t { public: constexpr smallbuffer_table_t() = default; - R call(void* src, Args&&... args) const final { return (*static_cast(src))(std::forward(args)...); } + R call(void* src, Args... args) const final { return (*static_cast(src))(std::forward(args)...); } void move(void* src, void* dest) const final { ::new (dest) FunT(std::move(*static_cast(src))); @@ -89,7 +89,7 @@ class heap_table_t : public oper_table_t { public: constexpr heap_table_t() = default; - R call(void* src, Args&&... args) const final { return (**static_cast(src))(std::forward(args)...); } + R call(void* src, Args... args) const final { return (**static_cast(src))(std::forward(args)...); } void move(void* src, void* dest) const final { *static_cast(dest) = *static_cast(src); @@ -163,7 +163,7 @@ public: return *this; } - R operator()(Args&&... args) const noexcept { return oper_ptr->call(&buffer, std::forward(args)...); } + R operator()(Args... args) const noexcept { return oper_ptr->call(&buffer, std::forward(args)...); } bool is_empty() const { return oper_ptr == &empty_table; } bool is_in_small_buffer() const { return oper_ptr->is_in_small_buffer(); } diff --git a/lib/include/srsran/common/network_utils.h b/lib/include/srsran/common/network_utils.h index f3dbeab35..d22cbc48a 100644 --- a/lib/include/srsran/common/network_utils.h +++ b/lib/include/srsran/common/network_utils.h @@ -14,20 +14,18 @@ #define SRSRAN_RX_SOCKET_HANDLER_H #include "srsran/common/buffer_pool.h" +#include "srsran/common/multiqueue.h" #include "srsran/common/threads.h" #include -#include #include #include #include #include #include #include -#include #include #include -#include // for the pipe namespace srsran { @@ -97,40 +95,46 @@ bool sctp_init_server(unique_socket* socket, net_utils::socket_type socktype, co * Rx multisocket handler ***************************/ +class socket_manager_itf +{ +public: + /// Callback called when socket fd (passed as argument) has data + using recv_callback_t = srsran::move_callback; + + explicit socket_manager_itf(srslog::basic_logger& logger_) : logger(logger_) {} + socket_manager_itf(socket_manager_itf&&) = delete; + socket_manager_itf(const socket_manager_itf&) = delete; + socket_manager_itf& operator=(const socket_manager_itf&) = delete; + socket_manager_itf& operator=(socket_manager_itf&&) = delete; + virtual ~socket_manager_itf() = default; + + /// Register (fd, callback). callback is called within socket thread when fd has data. + virtual bool add_socket_handler(int fd, recv_callback_t handler) = 0; + + /// remove registered socket fd + virtual bool remove_socket(int fd) = 0; + +protected: + srslog::basic_logger& logger; +}; + /** * Description - Instantiates a thread that will block waiting for IO from multiple sockets, via a select * The user can register their own (socket fd, data handler) in this class via the * add_socket_handler(fd, task) API or its other variants */ -class rx_multisocket_handler final : public thread +class socket_manager final : public thread, public socket_manager_itf { -public: - // polymorphic callback to handle the socket recv - class recv_task - { - public: - virtual ~recv_task() = default; - virtual bool operator()(int fd) = 0; // returns false, if socket needs to be removed - }; - using task_callback_t = std::unique_ptr; - using recvfrom_callback_t = std::function; - using sctp_recv_callback_t = - std::function; + using recv_callback_t = socket_manager_itf::recv_callback_t; - rx_multisocket_handler(); - rx_multisocket_handler(rx_multisocket_handler&&) = delete; - rx_multisocket_handler(const rx_multisocket_handler&) = delete; - rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete; - rx_multisocket_handler& operator=(rx_multisocket_handler&&) = delete; - ~rx_multisocket_handler() final; +public: + socket_manager(); + ~socket_manager() final; void stop(); bool remove_socket_nonblocking(int fd, bool signal_completion = false); - bool remove_socket(int fd); - bool add_socket_handler(int fd, task_callback_t handler); - // convenience methods for recv using buffer pool - bool add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task); - bool add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t task); + bool remove_socket(int fd) final; + bool add_socket_handler(int fd, recv_callback_t handler) final; void run_thread() override; @@ -144,22 +148,43 @@ private: int new_fd = -1; bool signal_rm_complete = false; }; - std::map::iterator - remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd); - - // args - srslog::basic_logger& logger; + std::map::iterator remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd); // state std::mutex socket_mutex; - std::map active_sockets; + std::map active_sockets; bool running = false; - int pipefd[2] = {}; + int pipefd[2] = {-1, -1}; std::vector rem_fd_tmp_list; std::condition_variable rem_cvar; }; -rx_multisocket_handler& get_stack_socket_manager(); +/// Function signature for SDU byte buffers received from SCTP socket +using sctp_recv_callback_t = + srsran::move_callback; + +/// Function signature for SDU byte buffers received from any sockaddr_in-based socket +using recvfrom_callback_t = srsran::move_callback; + +/** + * Helper function that creates a callback that is called when a SCTP socket has data, and does the following tasks: + * 1. receive SDU byte buffer from SCTP socket and associated metadata - sockaddr_in, sctp_sndrcvinfo, flags + * 2. dispatches the received SDU+metadata+rx_callback into the "queue" + * 3. potentially on a separate thread, the SDU+metadata+callback are popped from the queue, and callback is called with + * the SDU+metadata as arguments + * @param logger logger used by recv_callback_t to log any failure/reception of an SDU + * @param queue queue to which the SDU+metadata+callback are going to be dispatched + * @param rx_callback callback that is run when a new SDU arrives, from the thread that calls queue.pop() + * @return callback void(int) that can be registered in socket_manager + */ +socket_manager_itf::recv_callback_t +make_sctp_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, sctp_recv_callback_t rx_callback); + +/** + * Similar to make_sctp_sdu_handler, but for any sockaddr_in-based socket type + */ +socket_manager_itf::recv_callback_t +make_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, recvfrom_callback_t rx_callback); } // namespace srsran diff --git a/lib/src/common/network_utils.cc b/lib/src/common/network_utils.cc index 88d4852e7..1645da978 100644 --- a/lib/src/common/network_utils.cc +++ b/lib/src/common/network_utils.cc @@ -15,6 +15,7 @@ #include #include #include +#include // for the pipe #define rxSockError(fmt, ...) logger.error("RxSockets: " fmt, ##__VA_ARGS__) #define rxSockWarn(fmt, ...) logger.warning("RxSockets: " fmt, ##__VA_ARGS__) @@ -321,110 +322,23 @@ bool sctp_init_server(unique_socket* socket, net_utils::socket_type socktype, co } // namespace net_utils -/*************************************************************** - * Rx Multisocket Task Types - **************************************************************/ - -/** - * Description: Specialization of recv_task for the case the received data is - * in the form of unique_byte_buffer, and a recvfrom(...) call is used - */ -class recvfrom_pdu_task final : public rx_multisocket_handler::recv_task -{ -public: - using callback_t = std::function; - explicit recvfrom_pdu_task(srslog::basic_logger& logger, callback_t func_) : logger(logger), func(std::move(func_)) {} - - bool operator()(int fd) override - { - srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer(); - if (pdu == nullptr) { - logger.error("Unable to allocate byte buffer"); - return true; - } - sockaddr_in from = {}; - socklen_t fromlen = sizeof(from); - - ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen); - if (n_recv == -1 and errno != EAGAIN) { - logger.error("Error reading from socket: %s", strerror(errno)); - return true; - } - if (n_recv == -1 and errno == EAGAIN) { - logger.debug("Socket timeout reached"); - return true; - } - - pdu->N_bytes = static_cast(n_recv); - func(std::move(pdu), from); - return true; - } - -private: - srslog::basic_logger& logger; - callback_t func; -}; - -class sctp_recvmsg_pdu_task final : public rx_multisocket_handler::recv_task -{ -public: - using callback_t = std::function< - void(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)>; - explicit sctp_recvmsg_pdu_task(srslog::basic_logger& logger, callback_t func_) : - logger(logger), func(std::move(func_)) - {} - - bool operator()(int fd) override - { - // inside rx_sockets thread. Read socket - srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer(); - if (pdu == nullptr) { - logger.error("Unable to allocate byte buffer"); - return true; - } - sockaddr_in from = {}; - socklen_t fromlen = sizeof(from); - sctp_sndrcvinfo sri = {}; - int flags = 0; - ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags); - if (n_recv == -1 and errno != EAGAIN) { - logger.error("Error reading from SCTP socket: %s", strerror(errno)); - return true; - } - if (n_recv == -1 and errno == EAGAIN) { - logger.debug("Socket timeout reached"); - return true; - } - - bool ret = true; - pdu->N_bytes = static_cast(n_recv); - // SCTP notifications handled in callback. - func(std::move(pdu), from, sri, flags); - return ret; - } - -private: - srslog::basic_logger& logger; - callback_t func; -}; - /*************************************************************** * Rx Multisocket Handler **************************************************************/ -rx_multisocket_handler::rx_multisocket_handler() : thread("RXsockets"), logger(srslog::fetch_basic_logger("COMN")) +socket_manager::socket_manager() : thread("RXsockets"), socket_manager_itf(srslog::fetch_basic_logger("COMN")) { // register control pipe fd srsran_assert(pipe(pipefd) != -1, "Failed to open control pipe"); start(thread_prio); } -rx_multisocket_handler::~rx_multisocket_handler() +socket_manager::~socket_manager() { stop(); } -void rx_multisocket_handler::stop() +void socket_manager::stop() { if (running) { // close thread @@ -449,27 +363,7 @@ void rx_multisocket_handler::stop() } } -/** - * Convenience method for read PDUs from socket - */ -bool rx_multisocket_handler::add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task) -{ - std::unique_ptr task; - task.reset(new srsran::recvfrom_pdu_task(logger, std::move(pdu_task))); - return add_socket_handler(fd, std::move(task)); -} - -/** - * Convenience method for reading PDUs from SCTP socket - */ -bool rx_multisocket_handler::add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t pdu_task) -{ - srsran::rx_multisocket_handler::task_callback_t task; - task.reset(new srsran::sctp_recvmsg_pdu_task(logger, std::move(pdu_task))); - return add_socket_handler(fd, std::move(task)); -} - -bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler) +bool socket_manager::add_socket_handler(int fd, recv_callback_t handler) { std::lock_guard lock(socket_mutex); if (fd < 0) { @@ -481,7 +375,7 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler) return false; } - active_sockets.insert(std::pair(fd, std::move(handler))); + active_sockets.insert(std::make_pair(fd, std::move(handler))); // this unlocks the reading thread to add new connections ctrl_cmd_t msg; @@ -496,7 +390,7 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler) return true; } -bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_completion) +bool socket_manager::remove_socket_nonblocking(int fd, bool signal_completion) { std::lock_guard lock(socket_mutex); auto it = active_sockets.find(fd); @@ -516,7 +410,7 @@ bool rx_multisocket_handler::remove_socket_nonblocking(int fd, bool signal_compl return true; } -bool rx_multisocket_handler::remove_socket(int fd) +bool socket_manager::remove_socket(int fd) { bool result = remove_socket_nonblocking(fd, true); @@ -531,8 +425,8 @@ bool rx_multisocket_handler::remove_socket(int fd) return result; } -std::map::iterator -rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd) +std::map::iterator +socket_manager::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd) { if (fd < 0) { rxSockError("fd to be removed is not valid"); @@ -547,7 +441,7 @@ rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, return it; } -void rx_multisocket_handler::run_thread() +void socket_manager::run_thread() { running = true; fd_set total_fd_set, read_fd_set; @@ -576,13 +470,13 @@ void rx_multisocket_handler::run_thread() // call read callback for all SCTP/TCP/UDP connections for (auto handler_it = active_sockets.begin(); handler_it != active_sockets.end();) { - int fd = handler_it->first; - recv_task* callback = handler_it->second.get(); + int fd = handler_it->first; + recv_callback_t& callback = handler_it->second; if (not FD_ISSET(fd, &read_fd_set)) { ++handler_it; continue; } - bool socket_valid = callback->operator()(fd); + bool socket_valid = callback(fd); if (not socket_valid) { rxSockInfo("The socket fd=%d has been closed by peer", fd); handler_it = remove_socket_unprotected(fd, &total_fd_set, &max_fd); @@ -626,10 +520,115 @@ void rx_multisocket_handler::run_thread() } } -rx_multisocket_handler& get_stack_socket_manager() +/*************************************************************** + * Rx Multisocket Task Types + **************************************************************/ + +class sctp_recvmsg_pdu_task { - static rx_multisocket_handler handler; - return handler; +public: + using callback_t = sctp_recv_callback_t; + + explicit sctp_recvmsg_pdu_task(srslog::basic_logger& logger, srsran::task_queue_handle& queue_, callback_t func_) : + logger(logger), queue(queue_), func(std::move(func_)) + {} + + bool operator()(int fd) + { + // inside rx_sockets thread. Read socket + srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer(); + if (pdu == nullptr) { + logger.error("Unable to allocate byte buffer"); + return true; + } + sockaddr_in from = {}; + socklen_t fromlen = sizeof(from); + sctp_sndrcvinfo sri = {}; + int flags = 0; + ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags); + if (n_recv == -1 and errno != EAGAIN) { + logger.error("Error reading from SCTP socket: %s", strerror(errno)); + return true; + } + if (n_recv == -1 and errno == EAGAIN) { + logger.debug("Socket timeout reached"); + return true; + } + + bool ret = true; + pdu->N_bytes = static_cast(n_recv); + + // Defer handling of received packet to provided queue + // SCTP notifications handled in callback. + queue.push(std::bind( + [this, from, sri, flags](srsran::unique_byte_buffer_t& sdu) { func(std::move(sdu), from, sri, flags); }, + std::move(pdu))); + return ret; + } + +private: + srslog::basic_logger& logger; + srsran::task_queue_handle& queue; + callback_t func; +}; + +socket_manager_itf::recv_callback_t +make_sctp_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, sctp_recv_callback_t rx_callback) +{ + return socket_manager_itf::recv_callback_t(sctp_recvmsg_pdu_task(logger, queue, std::move(rx_callback))); +} + +/** + * Description: Functor for the case the received data is + * in the form of unique_byte_buffer, and a recvfrom(...) call is used + */ +class recvfrom_pdu_task +{ +public: + using callback_t = recvfrom_callback_t; + explicit recvfrom_pdu_task(srslog::basic_logger& logger, srsran::task_queue_handle& queue_, callback_t func_) : + logger(logger), queue(queue_), func(std::move(func_)) + {} + + bool operator()(int fd) + { + srsran::unique_byte_buffer_t pdu = srsran::make_byte_buffer(); + if (pdu == nullptr) { + logger.error("Unable to allocate byte buffer"); + return true; + } + sockaddr_in from = {}; + socklen_t fromlen = sizeof(from); + + ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen); + if (n_recv == -1 and errno != EAGAIN) { + logger.error("Error reading from socket: %s", strerror(errno)); + return true; + } + if (n_recv == -1 and errno == EAGAIN) { + logger.debug("Socket timeout reached"); + return true; + } + + pdu->N_bytes = static_cast(n_recv); + + // Defer handling of received packet to provided queue + queue.push( + std::bind([this, from](srsran::unique_byte_buffer_t& sdu) { func(std::move(sdu), from); }, std::move(pdu))); + + return true; + } + +private: + srslog::basic_logger& logger; + srsran::task_queue_handle& queue; + callback_t func; +}; + +socket_manager_itf::recv_callback_t +make_sdu_handler(srslog::basic_logger& logger, srsran::task_queue_handle& queue, recvfrom_callback_t rx_callback) +{ + return socket_manager_itf::recv_callback_t(recvfrom_pdu_task(logger, queue, std::move(rx_callback))); } } // namespace srsran diff --git a/lib/test/common/network_utils_test.cc b/lib/test/common/network_utils_test.cc index 9e3c1ec2b..9e0dcbe57 100644 --- a/lib/test/common/network_utils_test.cc +++ b/lib/test/common/network_utils_test.cc @@ -11,15 +11,31 @@ */ #include "srsran/common/network_utils.h" +#include "srsran/common/task_scheduler.h" +#include "srsran/common/test_common.h" +#include #include -#define TESTASSERT(cond) \ - do { \ - if (!(cond)) { \ - std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ - return -1; \ - } \ - } while (0) +struct rx_thread_tester { + srsran::task_scheduler task_sched; + srsran::task_queue_handle task_queue; + std::atomic stop_token; + std::thread t; + + rx_thread_tester() : + task_queue(task_sched.make_task_queue()), t([this]() { + while (not stop_token.load()) { + task_sched.run_pending_tasks(); + std::this_thread::yield(); + } + }) + {} + ~rx_thread_tester() + { + stop_token.store(true); + t.join(); + } +}; int test_socket_handler() { @@ -27,10 +43,10 @@ int test_socket_handler() int counter = 0; - srsran::unique_socket server_socket, client_socket, client_socket2; - srsran::rx_multisocket_handler sockhandler; - int server_port = 36412; - const char* server_addr = "127.0.100.1"; + srsran::unique_socket server_socket, client_socket, client_socket2; + srsran::socket_manager sockhandler; + int server_port = 36412; + const char* server_addr = "127.0.100.1"; using namespace srsran::net_utils; TESTASSERT(sctp_init_server(&server_socket, socket_type::seqpacket, server_addr, server_port)); @@ -50,7 +66,9 @@ int test_socket_handler() counter++; } }; - sockhandler.add_socket_sctp_pdu_handler(server_socket.fd(), pdu_handler); + rx_thread_tester rx_tester; + sockhandler.add_socket_handler(server_socket.fd(), + srsran::make_sctp_sdu_handler(logger, rx_tester.task_queue, pdu_handler)); uint8_t buf[128] = {}; int32_t nof_counts = 5; diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index 609658572..ff6f255c6 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -105,15 +105,13 @@ private: void run_thread() override; void stop_impl(); void tti_clock_impl(); - void handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu, - const sockaddr_in& from, - const sctp_sndrcvinfo& sri, - int flags); // args stack_args_t args = {}; rrc_cfg_t rrc_cfg = {}; + srsran::socket_manager rx_sockets; + srslog::basic_logger& mac_logger; srslog::basic_logger& rlc_logger; srslog::basic_logger& pdcp_logger; diff --git a/srsenb/hdr/stack/upper/s1ap.h b/srsenb/hdr/stack/upper/s1ap.h index c6eed8f6f..20b66e89d 100644 --- a/srsenb/hdr/stack/upper/s1ap.h +++ b/srsenb/hdr/stack/upper/s1ap.h @@ -52,7 +52,9 @@ public: static const uint32_t ts1_reloc_prep_timeout_ms = 10000; static const uint32_t ts1_reloc_overall_timeout_ms = 10000; - s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger); + s1ap(srsran::task_sched_handle task_sched_, + srslog::basic_logger& logger, + srsran::socket_manager_itf* rx_socket_handler); int init(s1ap_args_t args_, rrc_interface_s1ap* rrc_); void stop(); void get_metrics(s1ap_metrics_t& m); @@ -109,11 +111,12 @@ private: static const int NONUE_STREAM_ID = 0; // args - rrc_interface_s1ap* rrc = nullptr; - s1ap_args_t args; - srslog::basic_logger& logger; - srsran::task_sched_handle task_sched; - srsran::task_queue_handle mme_task_queue; + rrc_interface_s1ap* rrc = nullptr; + s1ap_args_t args; + srslog::basic_logger& logger; + srsran::task_sched_handle task_sched; + srsran::task_queue_handle mme_task_queue; + srsran::socket_manager_itf* rx_socket_handler; srsran::unique_socket mme_socket; struct sockaddr_in mme_addr = {}; // MME address diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index dd57f4b98..ea8042db7 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -34,7 +34,7 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) : mac(&task_sched, mac_logger), rlc(rlc_logger), gtpu(&task_sched, gtpu_logger), - s1ap(&task_sched, s1ap_logger), + s1ap(&task_sched, s1ap_logger, &rx_sockets), rrc(&task_sched), mac_pcap(), pending_stack_metrics(64) @@ -166,7 +166,7 @@ void enb_stack_lte::stop() void enb_stack_lte::stop_impl() { - srsran::get_stack_socket_manager().stop(); + rx_sockets.stop(); s1ap.stop(); gtpu.stop(); @@ -225,39 +225,21 @@ void enb_stack_lte::run_thread() } } -void enb_stack_lte::handle_mme_rx_packet(srsran::unique_byte_buffer_t pdu, - const sockaddr_in& from, - const sctp_sndrcvinfo& sri, - int flags) -{ - // Defer the handling of MME packet to eNB stack main thread - auto task_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) { - s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags); - }; - // Defer the handling of MME packet to main stack thread - mme_task_queue.push(std::bind(task_handler, std::move(pdu))); -} - void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd) { auto gtpu_s1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) { - auto task_handler = [this, from](srsran::unique_byte_buffer_t& t) { - gtpu.handle_gtpu_s1u_rx_packet(std::move(t), from); - }; - gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); + gtpu.handle_gtpu_s1u_rx_packet(std::move(pdu), from); }; - srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_s1u_handler); + + rx_sockets.add_socket_handler(fd, srsran::make_sdu_handler(gtpu_logger, gtpu_task_queue, gtpu_s1u_handler)); } void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd) { auto gtpu_m1u_handler = [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from) { - auto task_handler = [this, from](srsran::unique_byte_buffer_t& t) { - gtpu.handle_gtpu_m1u_rx_packet(std::move(t), from); - }; - gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); + gtpu.handle_gtpu_m1u_rx_packet(std::move(pdu), from); }; - srsran::get_stack_socket_manager().add_socket_pdu_handler(fd, gtpu_m1u_handler); + rx_sockets.add_socket_handler(fd, srsran::make_sdu_handler(gtpu_logger, gtpu_task_queue, gtpu_m1u_handler)); } } // namespace srsenb diff --git a/srsenb/src/stack/upper/s1ap.cc b/srsenb/src/stack/upper/s1ap.cc index 8f93c29a1..2ffc62b02 100644 --- a/srsenb/src/stack/upper/s1ap.cc +++ b/srsenb/src/stack/upper/s1ap.cc @@ -209,7 +209,7 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const s1ap_ptr->mme_connect_timer.duration() / 1000); srsran::console("Failed to initiate S1 connection. Attempting reconnection in %d seconds\n", s1ap_ptr->mme_connect_timer.duration() / 1000); - srsran::get_stack_socket_manager().remove_socket(s1ap_ptr->mme_socket.get_socket()); + s1ap_ptr->rx_socket_handler->remove_socket(s1ap_ptr->mme_socket.get_socket()); s1ap_ptr->mme_socket.close(); procInfo("S1AP socket closed."); s1ap_ptr->mme_connect_timer.run(); @@ -221,8 +221,10 @@ void s1ap::s1_setup_proc_t::then(const srsran::proc_state_t& result) const * S1AP class *********************************************************/ -s1ap::s1ap(srsran::task_sched_handle task_sched_, srslog::basic_logger& logger) : - s1setup_proc(this), logger(logger), task_sched(task_sched_) +s1ap::s1ap(srsran::task_sched_handle task_sched_, + srslog::basic_logger& logger, + srsran::socket_manager_itf* rx_socket_handler_) : + s1setup_proc(this), logger(logger), task_sched(task_sched_), rx_socket_handler(rx_socket_handler_) { mme_task_queue = task_sched.make_task_queue(); } @@ -423,24 +425,6 @@ bool s1ap::is_mme_connected() /* S1AP connection helpers ********************************************************************************/ -/// Callback that will run inside the Sockets thread, and is going to be called whenever a SDU is received from the MME -struct sctp_rx_packet_handler { - s1ap* s1ap_ptr; - srsran::task_queue_handle* task_queue; - - sctp_rx_packet_handler(s1ap* ptr, srsran::task_queue_handle& task_queue_) : s1ap_ptr(ptr), task_queue(&task_queue_) {} - - void operator()(srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) - { - // Defer the handling of MME packet to eNB stack main thread - auto packet_handler = [this, from, sri, flags](srsran::unique_byte_buffer_t& t) { - s1ap_ptr->handle_mme_rx_msg(std::move(t), from, sri, flags); - }; - // Defer the handling of MME packet to main stack thread - task_queue->push(std::bind(packet_handler, std::move(pdu))); - } -}; - bool s1ap::connect_mme() { using namespace srsran::net_utils; @@ -458,9 +442,14 @@ bool s1ap::connect_mme() } logger.info("SCTP socket connected with MME. fd=%d", mme_socket.fd()); - // Assign a handler to rx MME packets (going to run in a different thread) - srsran::get_stack_socket_manager().add_socket_sctp_pdu_handler(mme_socket.fd(), - sctp_rx_packet_handler(this, mme_task_queue)); + // Assign a handler to rx MME packets + auto rx_callback = + [this](srsran::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) { + // Defer the handling of MME packet to eNB stack main thread + handle_mme_rx_msg(std::move(pdu), from, sri, flags); + }; + rx_socket_handler->add_socket_handler(mme_socket.fd(), + srsran::make_sctp_sdu_handler(logger, mme_task_queue, rx_callback)); logger.info("SCTP socket established with MME"); return true; @@ -519,13 +508,13 @@ bool s1ap::handle_mme_rx_msg(srsran::unique_byte_buffer_t pdu, if (notification->sn_header.sn_type == SCTP_SHUTDOWN_EVENT) { logger.info("SCTP Association Shutdown. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP Association Shutdown. Association: %d\n", sri.sinfo_assoc_id); - srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); + rx_socket_handler->remove_socket(mme_socket.get_socket()); mme_socket.close(); } else if (notification->sn_header.sn_type == SCTP_PEER_ADDR_CHANGE && notification->sn_paddr_change.spc_state == SCTP_ADDR_UNREACHABLE) { logger.info("SCTP peer addres unreachable. Association: %d", sri.sinfo_assoc_id); srsran::console("SCTP peer address unreachable. Association: %d\n", sri.sinfo_assoc_id); - srsran::get_stack_socket_manager().remove_socket(mme_socket.get_socket()); + rx_socket_handler->remove_socket(mme_socket.get_socket()); mme_socket.close(); } } else if (pdu->N_bytes == 0) { diff --git a/srsenb/test/upper/s1ap_test.cc b/srsenb/test/upper/s1ap_test.cc index 18406c244..b4adf6205 100644 --- a/srsenb/test/upper/s1ap_test.cc +++ b/srsenb/test/upper/s1ap_test.cc @@ -62,6 +62,34 @@ struct mme_dummy { srsran::unique_byte_buffer_t last_sdu; }; +struct dummy_socket_manager : public srsran::socket_manager_itf { + dummy_socket_manager() : srsran::socket_manager_itf(srslog::fetch_basic_logger("TEST")) {} + + /// Register (fd, callback). callback is called within socket thread when fd has data. + bool add_socket_handler(int fd, recv_callback_t handler) final + { + if (s1u_fd > 0) { + return false; + } + s1u_fd = fd; + callback = std::move(handler); + return true; + } + + /// remove registered socket fd + bool remove_socket(int fd) final + { + if (s1u_fd < 0) { + return false; + } + s1u_fd = -1; + return true; + } + + int s1u_fd; + recv_callback_t callback; +}; + struct rrc_tester : public rrc_dummy { void modify_erabs(uint16_t rnti, const asn1::s1ap::erab_modify_request_s& msg, @@ -162,7 +190,8 @@ void test_s1ap_erab_setup(test_event event) { srsran::task_scheduler task_sched; srslog::basic_logger& logger = srslog::fetch_basic_logger("S1AP"); - s1ap s1ap_obj(&task_sched, logger); + dummy_socket_manager rx_sockets; + s1ap s1ap_obj(&task_sched, logger, &rx_sockets); rrc_tester rrc; asn1::s1ap::s1ap_pdu_c s1ap_pdu; srsran::unique_byte_buffer_t sdu;