From 131b1a7e41e20f37f4ce3a695e5982c02390872d Mon Sep 17 00:00:00 2001 From: Francisco Paisana Date: Mon, 11 Nov 2019 19:50:16 +0000 Subject: [PATCH] added a class to handle multiple sockets via a select --- lib/include/srslte/common/rx_socket_handler.h | 220 ++++++++++++++++++ lib/include/srslte/common/threads.h | 18 +- lib/test/common/CMakeLists.txt | 3 + lib/test/common/rx_socket_handler_test.cc | 179 ++++++++++++++ 4 files changed, 413 insertions(+), 7 deletions(-) create mode 100644 lib/include/srslte/common/rx_socket_handler.h create mode 100644 lib/test/common/rx_socket_handler_test.cc diff --git a/lib/include/srslte/common/rx_socket_handler.h b/lib/include/srslte/common/rx_socket_handler.h new file mode 100644 index 000000000..2facb5b45 --- /dev/null +++ b/lib/include/srslte/common/rx_socket_handler.h @@ -0,0 +1,220 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#ifndef SRSLTE_RX_SOCKET_HANDLER_H +#define SRSLTE_RX_SOCKET_HANDLER_H + +#include "srslte/common/buffer_pool.h" +#include "srslte/common/threads.h" + +#include +#include +#include +#include +#include +#include +#include +#include // for the pipe + +namespace srslte { + +class rx_socket_handler final : public thread +{ +public: + struct sctp_packet_t { + ssize_t rd_sz = 0; + srslte::unique_byte_buffer_t buf; + struct sockaddr_in from; + socklen_t fromlen = sizeof(from); + struct sctp_sndrcvinfo sinfo; + int msg_flags = 0; + sctp_packet_t() : from{}, sinfo{} {} + }; + using sctp_callback_t = std::function; + + rx_socket_handler(std::string name_, srslte::log* log_) : thread(name_), log_h(log_), name(std::move(name_)) + { + pool = byte_buffer_pool::get_instance(); + + // register control pipe fd + if (pipe(pipefd) == -1) { + log_h->error("%s: Failed to open control pipe\n", name.c_str()); + return; + } + start(THREAD_PRIO); + } + rx_socket_handler(rx_socket_handler&&) = delete; + rx_socket_handler(const rx_socket_handler&) = delete; + rx_socket_handler& operator=(const rx_socket_handler&) = delete; + rx_socket_handler& operator=(const rx_socket_handler&&) = delete; + + ~rx_socket_handler() + { + if (running) { + std::lock_guard lock(socket_mutex); + ctrl_msg_t msg{}; + msg.cmd = ctrl_msg_t::cmd_t::EXIT; + log_h->debug("%s: Closing socket handler\n", name.c_str()); + if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { + log_h->error("%s: while writing to control pipe\n", name.c_str()); + } + } + + // close thread + wait_thread_finish(); + + close(pipefd[0]); + close(pipefd[1]); + + // close all sockets + for (auto& handler_pair : active_sctp_sockets) { + if (close(handler_pair.first) == -1) { + log_h->error("Failed to close socket fd=%d\n", handler_pair.first); + } + } + + log_h->debug("%s: closed.\n", name.c_str()); + } + + void register_sctp_socket(int fd_, sctp_callback_t recv_handler_) + { + std::lock_guard lock(socket_mutex); + if (active_sctp_sockets.count(fd_) > 0) { + log_h->error("SOCKET: Tried to register fd=%d, but this fd already exists\n", fd_); + return; + } + if (fd_ < 0) { + log_h->error("%s: Provided fd=%d can\'t be negative\n", name.c_str(), fd_); + return; + } + + active_sctp_sockets.insert(std::make_pair(fd_, std::move(recv_handler_))); + + ctrl_msg_t msg; + msg.cmd = ctrl_msg_t::cmd_t::NEW_FD; + msg.new_fd = fd_; + if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { + log_h->error("%s: while writing to control pipe\n", name.c_str()); + } + + log_h->debug("%s: socket fd=%d has been registered.\n", name.c_str(), fd_); + } + + void run_thread() override + { + srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, true); + const uint32_t sz = pdu->get_tailroom(); + running = true; + fd_set total_fd_set, read_fd_set; + FD_ZERO(&total_fd_set); + int max_fd = 0; + + FD_SET(pipefd[0], &total_fd_set); + max_fd = std::max(pipefd[0], max_fd); + + while (running) { + memcpy(&read_fd_set, &total_fd_set, sizeof(total_fd_set)); + int n = select(max_fd + 1, &read_fd_set, nullptr, nullptr, nullptr); + + std::lock_guard lock(socket_mutex); + + // handle select return + if (n == -1) { + log_h->error("%s: Error from select", name.c_str()); + continue; + } + if (n == 0) { + log_h->debug("%s: No data from select.\n", name.c_str()); + continue; + } + + // handle sctp messages + for (auto& handler_pair : active_sctp_sockets) { + if (not FD_ISSET(handler_pair.first, &read_fd_set)) { + continue; + } + sctp_packet_t packet; + packet.rd_sz = sctp_recvmsg(handler_pair.first, + pdu->msg, + sz, + (struct sockaddr*)&packet.from, + &packet.fromlen, + &packet.sinfo, + &packet.msg_flags); + if (packet.rd_sz > 0) { + pdu->N_bytes = static_cast(packet.rd_sz); + packet.buf = std::move(pdu); + pdu = srslte::allocate_unique_buffer(*pool, true); + handler_pair.second(std::move(packet)); + } else { + log_h->error("%s: Unable to read from sctp socket fd=%d\n", name.c_str(), handler_pair.first); + perror(name.c_str()); + } + } + + // TODO: For UDP as well + + // handle ctrl messages + if (FD_ISSET(pipefd[0], &read_fd_set)) { + ctrl_msg_t msg; + ssize_t nrd = read(pipefd[0], &msg, sizeof(msg)); + if (nrd < 0) { + log_h->error("%s: unable to read control message.\n", name.c_str()); + continue; + } + switch (msg.cmd) { + case ctrl_msg_t::cmd_t::EXIT: + running = false; + return; + case ctrl_msg_t::cmd_t::NEW_FD: + if (msg.new_fd >= 0) { + FD_SET(msg.new_fd, &total_fd_set); + max_fd = std::max(max_fd, msg.new_fd); + } else { + log_h->error("%s: added fd is not valid\n", name.c_str()); + } + break; + default: + log_h->error("%s: ctrl message command %d is not valid\n", name.c_str(), (int)msg.cmd); + } + } + } + } + +private: + const static int THREAD_PRIO = 65; + std::string name; + srslte::log* log_h = nullptr; + srslte::byte_buffer_pool* pool = nullptr; + std::mutex socket_mutex; + std::map active_sctp_sockets; + bool running = false; + int pipefd[2] = {}; + struct ctrl_msg_t { + enum class cmd_t { EXIT, NEW_FD }; + cmd_t cmd = cmd_t::EXIT; + int new_fd = -1; + }; +}; + +} // namespace srslte + +#endif // SRSLTE_RX_SOCKET_HANDLER_H diff --git a/lib/include/srslte/common/threads.h b/lib/include/srslte/common/threads.h index 7dfa1e93b..b289dca57 100644 --- a/lib/include/srslte/common/threads.h +++ b/lib/include/srslte/common/threads.h @@ -50,14 +50,18 @@ class thread { public: thread(const std::string& name_) : _thread(0), name(name_) {} - thread(const thread&) = delete; - thread(thread&&) noexcept = default; - thread& operator=(const thread&) = delete; - thread& operator=(thread&&) noexcept = default; - bool start(int prio = -1) { return threads_new_rt_prio(&_thread, thread_function_entry, this, prio); } - bool start_cpu(int prio, int cpu) { - return threads_new_rt_cpu(&_thread, thread_function_entry, this, cpu, prio); + thread(const thread&) = delete; + thread(thread&& other) noexcept + { + _thread = other._thread; + name = std::move(other.name); + other._thread = 0; + other.name = ""; } + thread& operator=(const thread&) = delete; + thread& operator=(thread&&) noexcept = delete; + bool start(int prio = -1) { return threads_new_rt_prio(&_thread, thread_function_entry, this, prio); } + bool start_cpu(int prio, int cpu) { return threads_new_rt_cpu(&_thread, thread_function_entry, this, cpu, prio); } bool start_cpu_mask(int prio, int mask) { return threads_new_rt_mask(&_thread, thread_function_entry, this, mask, prio); diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index e179b8f7e..f63c510c5 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -78,3 +78,6 @@ add_test(queue_test queue_test) add_executable(timer_test timer_test.cc) target_link_libraries(timer_test srslte_common) + +add_executable(rx_socket_handler_test rx_socket_handler_test.cc) +target_link_libraries(rx_socket_handler_test srslte_common ${CMAKE_THREAD_LIBS_INIT} ${SCTP_LIBRARIES}) \ No newline at end of file diff --git a/lib/test/common/rx_socket_handler_test.cc b/lib/test/common/rx_socket_handler_test.cc new file mode 100644 index 000000000..61c2cbc52 --- /dev/null +++ b/lib/test/common/rx_socket_handler_test.cc @@ -0,0 +1,179 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "srslte/common/log_filter.h" +#include "srslte/common/rx_socket_handler.h" +#include + +#define TESTASSERT(cond) \ + do { \ + if (!(cond)) { \ + std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ + return -1; \ + } \ + } while (0) + +struct sockaddr_in local_addr; +struct sockaddr_in listen_addr; + +int socket_listen(srslte::log* log_h, const char* listen_addr_str, int listen_port) +{ + /*This function sets up the SCTP socket for eNBs to connect to*/ + int sock_fd, err; + struct sctp_event_subscribe evnts; + + sock_fd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP); + if (sock_fd == -1) { + log_h->error("Could not create SCTP socket\n"); + return -1; + } + + // Sets the data_io_event to be able to use sendrecv_info + // Subscribes to the SCTP_SHUTDOWN event, to handle graceful shutdown + bzero(&evnts, sizeof(evnts)); + evnts.sctp_data_io_event = 1; + evnts.sctp_shutdown_event = 1; + if (setsockopt(sock_fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts))) { + close(sock_fd); + log_h->console("Subscribing to sctp_data_io_events failed\n"); + return -1; + } + + // S1-MME bind + bzero(&listen_addr, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + inet_pton(AF_INET, listen_addr_str, &(listen_addr.sin_addr)); + listen_addr.sin_port = htons(listen_port); + err = bind(sock_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)); + if (err != 0) { + close(sock_fd); + log_h->error("Error binding SCTP socket\n"); + return -1; + } + + // Listen for connections + err = listen(sock_fd, SOMAXCONN); + if (err != 0) { + close(sock_fd); + log_h->error("Error in SCTP socket listen\n"); + return -1; + } + + return sock_fd; +} + +int create_fd(srslte::log* log_h, const char* bind_addr_str = "127.0.0.1") +{ + int socket_fd = -1; + const int ADDR_FAMILY = AF_INET; + + if ((socket_fd = socket(ADDR_FAMILY, SOCK_STREAM, IPPROTO_SCTP)) == -1) { + log_h->error("Failed to create S1AP socket\n"); + perror("socket()"); + goto exit_fail; + } + + // Bind to the local address + memset(&local_addr, 0, sizeof(struct sockaddr_in)); + local_addr.sin_family = ADDR_FAMILY; + local_addr.sin_port = 0; // Any local port will do + if (inet_pton(AF_INET, bind_addr_str, &(local_addr.sin_addr)) != 1) { + log_h->error("Error converting IP address (%s) to sockaddr_in structure\n", bind_addr_str); + goto exit_fail; + } + if (bind(socket_fd, (struct sockaddr*)&local_addr, sizeof(local_addr)) != 0) { + log_h->error("Failed to bind on S1-C address %s: %s errno %d\n", bind_addr_str, strerror(errno), errno); + perror("bind()"); + goto exit_fail; + } + + if (connect(socket_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) == -1) { + log_h->error("Failed to establish socket connection to Remote\n"); + goto exit_fail; + } + + log_h->info("Connected to remote\n"); + return socket_fd; + +exit_fail: + if (socket_fd >= 0) { + close(socket_fd); + socket_fd = -1; + } + return -1; +} + +int test_socket_handler() +{ + srslte::log_filter log("S1AP"); + log.set_level(srslte::LOG_LEVEL_DEBUG); + log.set_hex_limit(128); + + int counter = 0; + srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance(); + srslte::rx_socket_handler sockhandler("RXSOCKETS", &log); + + int listen_fd = socket_listen(&log, "127.0.100.1", 36412); + log.info("Listening from %d\n", listen_fd); + + int fd = create_fd(&log, "127.0.0.1"); + TESTASSERT(fd >= 0); + + int PPID = 18; + const int NONUE_STREAM_ID = 0; + + sockhandler.register_sctp_socket(listen_fd, [&counter, &log](srslte::rx_socket_handler::sctp_packet_t&& packet) { + log.info("Received %ld bytes\n", packet.rd_sz); + if (packet.buf != nullptr) { + log.info_hex(packet.buf->msg, packet.buf->N_bytes, "Received msg:"); + counter++; + } + }); + + srslte::unique_byte_buffer_t buf = srslte::allocate_unique_buffer(*pool, true); + for (uint32_t i = 0; i < 5; ++i) { + buf->N_bytes = i + 1; + buf->msg[i] = i; + ssize_t n_sent = sctp_sendmsg(fd, + buf->msg, + buf->N_bytes, + (struct sockaddr*)&listen_addr, + sizeof(struct sockaddr_in), + htonl(PPID), + 0, + NONUE_STREAM_ID, + 0, + 0); + TESTASSERT(n_sent == buf->N_bytes); + usleep(1000); + log.info("Message %d sent.\n", i); + } + + sleep(1); + + return 0; +} + +int main() +{ + TESTASSERT(test_socket_handler() == 0); + return 0; +}