diff --git a/lib/include/srslte/common/network_utils.h b/lib/include/srslte/common/network_utils.h new file mode 100644 index 000000000..84f7e122e --- /dev/null +++ b/lib/include/srslte/common/network_utils.h @@ -0,0 +1,144 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#ifndef SRSLTE_RX_SOCKET_HANDLER_H +#define SRSLTE_RX_SOCKET_HANDLER_H + +#include "srslte/common/buffer_pool.h" +#include "srslte/common/threads.h" + +#include +#include +#include +#include +#include +#include +#include +#include // for the pipe + +namespace srslte { + +class rx_sctp_socket_ref; + +/** + * @brief handles the lifetime of a SCTP socket and provides convenience methods for listening/connecting, and read/send + */ +class sctp_socket +{ +public: + sctp_socket(); + sctp_socket(sctp_socket&&) noexcept; + sctp_socket(const sctp_socket&) = delete; + ~sctp_socket(); + sctp_socket& operator=(sctp_socket&&) noexcept; + sctp_socket& operator=(const sctp_socket&) = delete; + + void reset(); + int listen_addr(const char* bind_addr_str, int port); + int connect_addr(const char* bind_addr_str, const char* dest_addr_str, int dest_port); + + int read(void* buf, + ssize_t nbytes, + struct sockaddr_in* from = nullptr, + socklen_t fromlen = sizeof(sockaddr_in), + struct sctp_sndrcvinfo* sinfo = nullptr, + int msg_flags = 0); + int send(void* buf, ssize_t nbytes, uint32_t ppid, uint32_t stream_id); + + const struct sockaddr_in& get_sockaddr_in() const { return addr_in; } + int fd() const { return sockfd; } + operator rx_sctp_socket_ref(); ///< cast to rx_sctp_socket_ref is safe + +private: + int create_socket(); + int bind_addr(const char* bind_addr_str, int port = 0); + + int sockfd = -1; + struct sockaddr_in addr_in; + struct sockaddr_in dest_addr; +}; + +/** + * @brief The rx_sctp_socket_ref class is a safe inteface/handler for receiving SCTP packets + * it basically forbids the user from trying to reset the socket while it is still + * registered to the rx_multisocket_handler for instance. + */ +class rx_sctp_socket_ref +{ +public: + rx_sctp_socket_ref(sctp_socket* sock_) : sock(sock_) {} + int read(void* buf, + ssize_t nbytes, + struct sockaddr_in* from = nullptr, + socklen_t fromlen = sizeof(sockaddr_in), + struct sctp_sndrcvinfo* sinfo = nullptr, + int msg_flags = 0) + { + return sock->read(buf, nbytes, from, fromlen, sinfo, msg_flags); + } + int fd() const { return sock->fd(); } + +private: + sctp_socket* sock = nullptr; +}; + +class rx_multisocket_handler final : public thread +{ +public: + using callback_t = std::function; + + rx_multisocket_handler(std::string name_, srslte::log* log_); + rx_multisocket_handler(rx_multisocket_handler&&) = delete; + rx_multisocket_handler(const rx_multisocket_handler&) = delete; + rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete; + rx_multisocket_handler& operator=(const rx_multisocket_handler&&) = delete; + ~rx_multisocket_handler(); + + bool register_sctp_socket(rx_sctp_socket_ref sock, callback_t recv_handler_); + + void run_thread() override; + +private: + const static int THREAD_PRIO = 65; + // used to unlock select + struct ctrl_cmd_t { + enum class cmd_id_t { EXIT, NEW_FD }; + cmd_id_t cmd = cmd_id_t::EXIT; + int new_fd = -1; + }; + struct sctp_handler_t { + callback_t callback; + rx_sctp_socket_ref sctp_ptr; + }; + // args + std::string name; + srslte::log* log_h = nullptr; + + // state + std::mutex socket_mutex; + std::map active_sctp_sockets; + bool running = false; + int pipefd[2] = {}; +}; + +} // namespace srslte + +#endif // SRSLTE_RX_SOCKET_HANDLER_H diff --git a/lib/include/srslte/common/rx_socket_handler.h b/lib/include/srslte/common/rx_socket_handler.h deleted file mode 100644 index 2facb5b45..000000000 --- a/lib/include/srslte/common/rx_socket_handler.h +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright 2013-2019 Software Radio Systems Limited - * - * This file is part of srsLTE. - * - * srsLTE is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * srsLTE is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * A copy of the GNU Affero General Public License can be found in - * the LICENSE file in the top-level directory of this distribution - * and at http://www.gnu.org/licenses/. - * - */ - -#ifndef SRSLTE_RX_SOCKET_HANDLER_H -#define SRSLTE_RX_SOCKET_HANDLER_H - -#include "srslte/common/buffer_pool.h" -#include "srslte/common/threads.h" - -#include -#include -#include -#include -#include -#include -#include -#include // for the pipe - -namespace srslte { - -class rx_socket_handler final : public thread -{ -public: - struct sctp_packet_t { - ssize_t rd_sz = 0; - srslte::unique_byte_buffer_t buf; - struct sockaddr_in from; - socklen_t fromlen = sizeof(from); - struct sctp_sndrcvinfo sinfo; - int msg_flags = 0; - sctp_packet_t() : from{}, sinfo{} {} - }; - using sctp_callback_t = std::function; - - rx_socket_handler(std::string name_, srslte::log* log_) : thread(name_), log_h(log_), name(std::move(name_)) - { - pool = byte_buffer_pool::get_instance(); - - // register control pipe fd - if (pipe(pipefd) == -1) { - log_h->error("%s: Failed to open control pipe\n", name.c_str()); - return; - } - start(THREAD_PRIO); - } - rx_socket_handler(rx_socket_handler&&) = delete; - rx_socket_handler(const rx_socket_handler&) = delete; - rx_socket_handler& operator=(const rx_socket_handler&) = delete; - rx_socket_handler& operator=(const rx_socket_handler&&) = delete; - - ~rx_socket_handler() - { - if (running) { - std::lock_guard lock(socket_mutex); - ctrl_msg_t msg{}; - msg.cmd = ctrl_msg_t::cmd_t::EXIT; - log_h->debug("%s: Closing socket handler\n", name.c_str()); - if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { - log_h->error("%s: while writing to control pipe\n", name.c_str()); - } - } - - // close thread - wait_thread_finish(); - - close(pipefd[0]); - close(pipefd[1]); - - // close all sockets - for (auto& handler_pair : active_sctp_sockets) { - if (close(handler_pair.first) == -1) { - log_h->error("Failed to close socket fd=%d\n", handler_pair.first); - } - } - - log_h->debug("%s: closed.\n", name.c_str()); - } - - void register_sctp_socket(int fd_, sctp_callback_t recv_handler_) - { - std::lock_guard lock(socket_mutex); - if (active_sctp_sockets.count(fd_) > 0) { - log_h->error("SOCKET: Tried to register fd=%d, but this fd already exists\n", fd_); - return; - } - if (fd_ < 0) { - log_h->error("%s: Provided fd=%d can\'t be negative\n", name.c_str(), fd_); - return; - } - - active_sctp_sockets.insert(std::make_pair(fd_, std::move(recv_handler_))); - - ctrl_msg_t msg; - msg.cmd = ctrl_msg_t::cmd_t::NEW_FD; - msg.new_fd = fd_; - if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { - log_h->error("%s: while writing to control pipe\n", name.c_str()); - } - - log_h->debug("%s: socket fd=%d has been registered.\n", name.c_str(), fd_); - } - - void run_thread() override - { - srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, true); - const uint32_t sz = pdu->get_tailroom(); - running = true; - fd_set total_fd_set, read_fd_set; - FD_ZERO(&total_fd_set); - int max_fd = 0; - - FD_SET(pipefd[0], &total_fd_set); - max_fd = std::max(pipefd[0], max_fd); - - while (running) { - memcpy(&read_fd_set, &total_fd_set, sizeof(total_fd_set)); - int n = select(max_fd + 1, &read_fd_set, nullptr, nullptr, nullptr); - - std::lock_guard lock(socket_mutex); - - // handle select return - if (n == -1) { - log_h->error("%s: Error from select", name.c_str()); - continue; - } - if (n == 0) { - log_h->debug("%s: No data from select.\n", name.c_str()); - continue; - } - - // handle sctp messages - for (auto& handler_pair : active_sctp_sockets) { - if (not FD_ISSET(handler_pair.first, &read_fd_set)) { - continue; - } - sctp_packet_t packet; - packet.rd_sz = sctp_recvmsg(handler_pair.first, - pdu->msg, - sz, - (struct sockaddr*)&packet.from, - &packet.fromlen, - &packet.sinfo, - &packet.msg_flags); - if (packet.rd_sz > 0) { - pdu->N_bytes = static_cast(packet.rd_sz); - packet.buf = std::move(pdu); - pdu = srslte::allocate_unique_buffer(*pool, true); - handler_pair.second(std::move(packet)); - } else { - log_h->error("%s: Unable to read from sctp socket fd=%d\n", name.c_str(), handler_pair.first); - perror(name.c_str()); - } - } - - // TODO: For UDP as well - - // handle ctrl messages - if (FD_ISSET(pipefd[0], &read_fd_set)) { - ctrl_msg_t msg; - ssize_t nrd = read(pipefd[0], &msg, sizeof(msg)); - if (nrd < 0) { - log_h->error("%s: unable to read control message.\n", name.c_str()); - continue; - } - switch (msg.cmd) { - case ctrl_msg_t::cmd_t::EXIT: - running = false; - return; - case ctrl_msg_t::cmd_t::NEW_FD: - if (msg.new_fd >= 0) { - FD_SET(msg.new_fd, &total_fd_set); - max_fd = std::max(max_fd, msg.new_fd); - } else { - log_h->error("%s: added fd is not valid\n", name.c_str()); - } - break; - default: - log_h->error("%s: ctrl message command %d is not valid\n", name.c_str(), (int)msg.cmd); - } - } - } - } - -private: - const static int THREAD_PRIO = 65; - std::string name; - srslte::log* log_h = nullptr; - srslte::byte_buffer_pool* pool = nullptr; - std::mutex socket_mutex; - std::map active_sctp_sockets; - bool running = false; - int pipefd[2] = {}; - struct ctrl_msg_t { - enum class cmd_t { EXIT, NEW_FD }; - cmd_t cmd = cmd_t::EXIT; - int new_fd = -1; - }; -}; - -} // namespace srslte - -#endif // SRSLTE_RX_SOCKET_HANDLER_H diff --git a/lib/src/common/network_utils.cc b/lib/src/common/network_utils.cc new file mode 100644 index 000000000..3325ff747 --- /dev/null +++ b/lib/src/common/network_utils.cc @@ -0,0 +1,324 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "srslte/common/network_utils.h" + +#include +#include + +#define rxSockError(fmt, ...) log_h->error("%s: " fmt, name.c_str(), ##__VA_ARGS__) +#define rxSockInfo(fmt, ...) log_h->info("%s: " fmt, name.c_str(), ##__VA_ARGS__) +#define rxSockDebug(fmt, ...) log_h->debug("%s: " fmt, name.c_str(), ##__VA_ARGS__) + +namespace srslte { + +sctp_socket::sctp_socket() +{ + bzero(&addr_in, sizeof(addr_in)); + bzero(&dest_addr, sizeof(dest_addr)); +} + +sctp_socket::sctp_socket(sctp_socket&& other) noexcept +{ + sockfd = other.sockfd; + memcpy(&addr_in, &other.addr_in, sizeof(addr_in)); + // reset other without calling close + other.sockfd = -1; + bzero(&other.addr_in, sizeof(other.addr_in)); +} + +sctp_socket::~sctp_socket() +{ + reset(); +} + +sctp_socket& sctp_socket::operator=(sctp_socket&& other) noexcept +{ + if (this == &other) { + return *this; + } + sockfd = other.sockfd; + memcpy(&addr_in, &other.addr_in, sizeof(addr_in)); + other.sockfd = -1; + bzero(&other.addr_in, sizeof(other.addr_in)); + return *this; +} + +void sctp_socket::reset() +{ + if (sockfd >= 0) { + close(sockfd); + } + bzero(&addr_in, sizeof(addr_in)); + bzero(&dest_addr, sizeof(dest_addr)); +} + +int sctp_socket::listen_addr(const char* bind_addr_str, int port) +{ + if (sockfd < 0) { + if (create_socket()) { + return -1; + } + } + + // Sets the data_io_event to be able to use sendrecv_info + // Subscribes to the SCTP_SHUTDOWN event, to handle graceful shutdown + struct sctp_event_subscribe evnts; + bzero(&evnts, sizeof(evnts)); + evnts.sctp_data_io_event = 1; + evnts.sctp_shutdown_event = 1; + if (setsockopt(sockfd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts))) { + perror("setsockopt"); + reset(); + return -1; + } + + // bind addr + if (bind_addr(bind_addr_str, port)) { + reset(); + return -1; + } + + // Listen for connections + if (listen(sockfd, SOMAXCONN)) { + perror("listen"); + reset(); + return -1; + } + + return 0; +} + +int sctp_socket::connect_addr(const char* bind_addr_str, const char* dest_addr_str, int dest_port) +{ + if (sockfd < 0) { + if (bind_addr(bind_addr_str, 0)) { + return -1; + } + } + + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(dest_port); + if (inet_pton(AF_INET, dest_addr_str, &(dest_addr.sin_addr)) != 1) { + perror("inet_pton()"); + return -1; + } + if (connect(sockfd, (struct sockaddr*)&dest_addr, sizeof(dest_addr)) == -1) { + perror("connect()"); + return -1; + } + + return 0; +} + +int sctp_socket::read(void* buf, + ssize_t nbytes, + struct sockaddr_in* from, + socklen_t fromlen, + struct sctp_sndrcvinfo* sinfo, + int msg_flags) +{ + int rd_sz = sctp_recvmsg(sockfd, buf, nbytes, (struct sockaddr*)from, &fromlen, sinfo, &msg_flags); + if (rd_sz <= 0) { + perror("sctp read"); + } + return rd_sz; +} + +int sctp_socket::send(void* buf, ssize_t nbytes, uint32_t ppid, uint32_t stream_id) +{ + return sctp_sendmsg( + sockfd, buf, nbytes, (struct sockaddr*)&dest_addr, sizeof(dest_addr), htonl(ppid), 0, stream_id, 0, 0); +} + +sctp_socket::operator rx_sctp_socket_ref() +{ + return rx_sctp_socket_ref(this); +} + +// Private Methods + +int sctp_socket::bind_addr(const char* bind_addr_str, int port) +{ + if (sockfd < 0) { + if (create_socket()) { + return -1; + } + } + + addr_in.sin_family = AF_INET; + if (inet_pton(AF_INET, bind_addr_str, &(addr_in.sin_addr)) != 1) { + perror("inet_pton"); + return -1; + } + addr_in.sin_port = (port != 0) ? htons(port) : 0; + if (bind(sockfd, (struct sockaddr*)&addr_in, sizeof(addr_in))) { + perror("bind()"); + return -1; + } + return 0; +} + +int sctp_socket::create_socket() +{ + sockfd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP); + if (sockfd == -1) { + perror("Could not create SCTP socket\n"); + return -1; + } + return 0; +} + +/*************************************************************** + * Rx Multisocket Handler + **************************************************************/ + +rx_multisocket_handler::rx_multisocket_handler(std::string name_, srslte::log* log_) : + thread(name_), + name(std::move(name_)), + log_h(log_) +{ + // register control pipe fd + if (pipe(pipefd) == -1) { + rxSockInfo("Failed to open control pipe\n"); + return; + } + start(THREAD_PRIO); +} + +rx_multisocket_handler::~rx_multisocket_handler() +{ + if (running) { + std::lock_guard lock(socket_mutex); + ctrl_cmd_t msg{}; + msg.cmd = ctrl_cmd_t::cmd_id_t::EXIT; + rxSockDebug("Closing socket handler\n"); + if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { + rxSockError("while writing to control pipe\n"); + } + } + + // close thread + wait_thread_finish(); + + close(pipefd[0]); + close(pipefd[1]); + + // close all sockets + for (auto& handler_pair : active_sctp_sockets) { + if (close(handler_pair.first) == -1) { + rxSockError("Failed to close socket fd=%d\n", handler_pair.first); + } + } + + rxSockDebug("closed.\n"); +} + +bool rx_multisocket_handler::register_sctp_socket(rx_sctp_socket_ref sock, callback_t recv_handler_) +{ + std::lock_guard lock(socket_mutex); + if (sock.fd() < 0) { + rxSockError("Provided SCTP socket must be already open\n"); + return false; + } + if (active_sctp_sockets.count(sock.fd()) > 0) { + rxSockError("Tried to register fd=%d, but this fd already exists\n", sock.fd()); + return false; + } + + active_sctp_sockets.insert(std::make_pair(sock.fd(), sctp_handler_t{std::move(recv_handler_), sock})); + + // this unlocks the reading thread to add new connections + ctrl_cmd_t msg; + msg.cmd = ctrl_cmd_t::cmd_id_t::NEW_FD; + msg.new_fd = sock.fd(); + if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { + rxSockError("while writing to control pipe\n"); + } + + rxSockDebug("socket fd=%d has been registered.\n", sock.fd()); + return true; +} + +void rx_multisocket_handler::run_thread() +{ + running = true; + fd_set total_fd_set, read_fd_set; + FD_ZERO(&total_fd_set); + int max_fd = 0; + + FD_SET(pipefd[0], &total_fd_set); + max_fd = std::max(pipefd[0], max_fd); + + while (running) { + memcpy(&read_fd_set, &total_fd_set, sizeof(total_fd_set)); + int n = select(max_fd + 1, &read_fd_set, nullptr, nullptr, nullptr); + + // handle select return + if (n == -1) { + rxSockError("Error from select()"); + continue; + } + if (n == 0) { + rxSockDebug("No data from select.\n"); + continue; + } + + std::lock_guard lock(socket_mutex); + + // call read callback for all SCTP connections + for (auto& handler_pair : active_sctp_sockets) { + if (not FD_ISSET(handler_pair.first, &read_fd_set)) { + continue; + } + handler_pair.second.callback(handler_pair.second.sctp_ptr); + } + + // TODO: For TCP and UDP + + // handle ctrl messages + if (FD_ISSET(pipefd[0], &read_fd_set)) { + ctrl_cmd_t msg; + ssize_t nrd = read(pipefd[0], &msg, sizeof(msg)); + if (nrd <= 0) { + rxSockError("Unable to read control message.\n"); + continue; + } + switch (msg.cmd) { + case ctrl_cmd_t::cmd_id_t::EXIT: + running = false; + return; + case ctrl_cmd_t::cmd_id_t::NEW_FD: + if (msg.new_fd >= 0) { + FD_SET(msg.new_fd, &total_fd_set); + max_fd = std::max(max_fd, msg.new_fd); + } else { + rxSockError("added fd is not valid\n"); + } + break; + default: + rxSockError("ctrl message command %d is not valid\n", (int)msg.cmd); + } + } + } +} + +} // namespace srslte diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index f63c510c5..02953a5eb 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -79,5 +79,5 @@ add_test(queue_test queue_test) add_executable(timer_test timer_test.cc) target_link_libraries(timer_test srslte_common) -add_executable(rx_socket_handler_test rx_socket_handler_test.cc) -target_link_libraries(rx_socket_handler_test srslte_common ${CMAKE_THREAD_LIBS_INIT} ${SCTP_LIBRARIES}) \ No newline at end of file +add_executable(network_utils_test network_utils_test.cc) +target_link_libraries(network_utils_test srslte_common ${CMAKE_THREAD_LIBS_INIT} ${SCTP_LIBRARIES}) diff --git a/lib/test/common/network_utils_test.cc b/lib/test/common/network_utils_test.cc new file mode 100644 index 000000000..ee1a3f8e7 --- /dev/null +++ b/lib/test/common/network_utils_test.cc @@ -0,0 +1,91 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "srslte/common/log_filter.h" +#include "srslte/common/network_utils.h" +#include + +#define TESTASSERT(cond) \ + do { \ + if (!(cond)) { \ + std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ + return -1; \ + } \ + } while (0) + +int test_socket_handler() +{ + srslte::log_filter log("S1AP"); + log.set_level(srslte::LOG_LEVEL_DEBUG); + log.set_hex_limit(128); + + int counter = 0; + srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance(); + + srslte::sctp_socket server_sock, client_sock; + srslte::rx_multisocket_handler sockhandler("RXSOCKETS", &log); + + TESTASSERT(server_sock.listen_addr("127.0.100.1", 36412) == 0); + log.info("Listening from fd=%d\n", server_sock.fd()); + + TESTASSERT(client_sock.connect_addr("127.0.0.1", "127.0.100.1", 36412) == 0); + + sockhandler.register_sctp_socket(server_sock, [pool, &log, &counter](srslte::rx_sctp_socket_ref sock) { + srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, true); + int rd_sz = sock.read(pdu->msg, pdu->get_tailroom()); + if (rd_sz > 0) { + pdu->N_bytes = rd_sz; + log.info_hex(pdu->msg, pdu->N_bytes, "Received msg:"); + counter++; + } + }); + + int PPID = 18; + const int NONUE_STREAM_ID = 0; + + uint8_t buf[128] = {}; + int32_t nof_counts = 5; + for (int32_t i = 0; i < nof_counts; ++i) { + buf[i] = i; + ssize_t n_sent = client_sock.send(buf, i + 1, PPID, NONUE_STREAM_ID); + TESTASSERT(n_sent >= 0); + usleep(1000); + log.info("Message %d sent.\n", i); + } + + uint32_t time_elapsed = 0; + while (counter != nof_counts) { + usleep(100); + time_elapsed += 100; + if (time_elapsed > 3000000) { + // too much time has passed + return -1; + } + } + + return 0; +} + +int main() +{ + TESTASSERT(test_socket_handler() == 0); + return 0; +} diff --git a/lib/test/common/rx_socket_handler_test.cc b/lib/test/common/rx_socket_handler_test.cc deleted file mode 100644 index 61c2cbc52..000000000 --- a/lib/test/common/rx_socket_handler_test.cc +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2013-2019 Software Radio Systems Limited - * - * This file is part of srsLTE. - * - * srsLTE is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * srsLTE is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * A copy of the GNU Affero General Public License can be found in - * the LICENSE file in the top-level directory of this distribution - * and at http://www.gnu.org/licenses/. - * - */ - -#include "srslte/common/log_filter.h" -#include "srslte/common/rx_socket_handler.h" -#include - -#define TESTASSERT(cond) \ - do { \ - if (!(cond)) { \ - std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ - return -1; \ - } \ - } while (0) - -struct sockaddr_in local_addr; -struct sockaddr_in listen_addr; - -int socket_listen(srslte::log* log_h, const char* listen_addr_str, int listen_port) -{ - /*This function sets up the SCTP socket for eNBs to connect to*/ - int sock_fd, err; - struct sctp_event_subscribe evnts; - - sock_fd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP); - if (sock_fd == -1) { - log_h->error("Could not create SCTP socket\n"); - return -1; - } - - // Sets the data_io_event to be able to use sendrecv_info - // Subscribes to the SCTP_SHUTDOWN event, to handle graceful shutdown - bzero(&evnts, sizeof(evnts)); - evnts.sctp_data_io_event = 1; - evnts.sctp_shutdown_event = 1; - if (setsockopt(sock_fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts))) { - close(sock_fd); - log_h->console("Subscribing to sctp_data_io_events failed\n"); - return -1; - } - - // S1-MME bind - bzero(&listen_addr, sizeof(listen_addr)); - listen_addr.sin_family = AF_INET; - inet_pton(AF_INET, listen_addr_str, &(listen_addr.sin_addr)); - listen_addr.sin_port = htons(listen_port); - err = bind(sock_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)); - if (err != 0) { - close(sock_fd); - log_h->error("Error binding SCTP socket\n"); - return -1; - } - - // Listen for connections - err = listen(sock_fd, SOMAXCONN); - if (err != 0) { - close(sock_fd); - log_h->error("Error in SCTP socket listen\n"); - return -1; - } - - return sock_fd; -} - -int create_fd(srslte::log* log_h, const char* bind_addr_str = "127.0.0.1") -{ - int socket_fd = -1; - const int ADDR_FAMILY = AF_INET; - - if ((socket_fd = socket(ADDR_FAMILY, SOCK_STREAM, IPPROTO_SCTP)) == -1) { - log_h->error("Failed to create S1AP socket\n"); - perror("socket()"); - goto exit_fail; - } - - // Bind to the local address - memset(&local_addr, 0, sizeof(struct sockaddr_in)); - local_addr.sin_family = ADDR_FAMILY; - local_addr.sin_port = 0; // Any local port will do - if (inet_pton(AF_INET, bind_addr_str, &(local_addr.sin_addr)) != 1) { - log_h->error("Error converting IP address (%s) to sockaddr_in structure\n", bind_addr_str); - goto exit_fail; - } - if (bind(socket_fd, (struct sockaddr*)&local_addr, sizeof(local_addr)) != 0) { - log_h->error("Failed to bind on S1-C address %s: %s errno %d\n", bind_addr_str, strerror(errno), errno); - perror("bind()"); - goto exit_fail; - } - - if (connect(socket_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) == -1) { - log_h->error("Failed to establish socket connection to Remote\n"); - goto exit_fail; - } - - log_h->info("Connected to remote\n"); - return socket_fd; - -exit_fail: - if (socket_fd >= 0) { - close(socket_fd); - socket_fd = -1; - } - return -1; -} - -int test_socket_handler() -{ - srslte::log_filter log("S1AP"); - log.set_level(srslte::LOG_LEVEL_DEBUG); - log.set_hex_limit(128); - - int counter = 0; - srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance(); - srslte::rx_socket_handler sockhandler("RXSOCKETS", &log); - - int listen_fd = socket_listen(&log, "127.0.100.1", 36412); - log.info("Listening from %d\n", listen_fd); - - int fd = create_fd(&log, "127.0.0.1"); - TESTASSERT(fd >= 0); - - int PPID = 18; - const int NONUE_STREAM_ID = 0; - - sockhandler.register_sctp_socket(listen_fd, [&counter, &log](srslte::rx_socket_handler::sctp_packet_t&& packet) { - log.info("Received %ld bytes\n", packet.rd_sz); - if (packet.buf != nullptr) { - log.info_hex(packet.buf->msg, packet.buf->N_bytes, "Received msg:"); - counter++; - } - }); - - srslte::unique_byte_buffer_t buf = srslte::allocate_unique_buffer(*pool, true); - for (uint32_t i = 0; i < 5; ++i) { - buf->N_bytes = i + 1; - buf->msg[i] = i; - ssize_t n_sent = sctp_sendmsg(fd, - buf->msg, - buf->N_bytes, - (struct sockaddr*)&listen_addr, - sizeof(struct sockaddr_in), - htonl(PPID), - 0, - NONUE_STREAM_ID, - 0, - 0); - TESTASSERT(n_sent == buf->N_bytes); - usleep(1000); - log.info("Message %d sent.\n", i); - } - - sleep(1); - - return 0; -} - -int main() -{ - TESTASSERT(test_socket_handler() == 0); - return 0; -}