added a class to handle multiple sockets via a select

This commit is contained in:
Francisco Paisana 2019-11-11 19:50:16 +00:00
parent 98ac39e617
commit 131b1a7e41
4 changed files with 413 additions and 7 deletions

View File

@ -0,0 +1,220 @@
/*
* Copyright 2013-2019 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#ifndef SRSLTE_RX_SOCKET_HANDLER_H
#define SRSLTE_RX_SOCKET_HANDLER_H
#include "srslte/common/buffer_pool.h"
#include "srslte/common/threads.h"
#include <functional>
#include <map>
#include <mutex>
#include <netinet/sctp.h>
#include <queue>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h> // for the pipe
namespace srslte {
class rx_socket_handler final : public thread
{
public:
struct sctp_packet_t {
ssize_t rd_sz = 0;
srslte::unique_byte_buffer_t buf;
struct sockaddr_in from;
socklen_t fromlen = sizeof(from);
struct sctp_sndrcvinfo sinfo;
int msg_flags = 0;
sctp_packet_t() : from{}, sinfo{} {}
};
using sctp_callback_t = std::function<void(sctp_packet_t&&)>;
rx_socket_handler(std::string name_, srslte::log* log_) : thread(name_), log_h(log_), name(std::move(name_))
{
pool = byte_buffer_pool::get_instance();
// register control pipe fd
if (pipe(pipefd) == -1) {
log_h->error("%s: Failed to open control pipe\n", name.c_str());
return;
}
start(THREAD_PRIO);
}
rx_socket_handler(rx_socket_handler&&) = delete;
rx_socket_handler(const rx_socket_handler&) = delete;
rx_socket_handler& operator=(const rx_socket_handler&) = delete;
rx_socket_handler& operator=(const rx_socket_handler&&) = delete;
~rx_socket_handler()
{
if (running) {
std::lock_guard<std::mutex> lock(socket_mutex);
ctrl_msg_t msg{};
msg.cmd = ctrl_msg_t::cmd_t::EXIT;
log_h->debug("%s: Closing socket handler\n", name.c_str());
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
log_h->error("%s: while writing to control pipe\n", name.c_str());
}
}
// close thread
wait_thread_finish();
close(pipefd[0]);
close(pipefd[1]);
// close all sockets
for (auto& handler_pair : active_sctp_sockets) {
if (close(handler_pair.first) == -1) {
log_h->error("Failed to close socket fd=%d\n", handler_pair.first);
}
}
log_h->debug("%s: closed.\n", name.c_str());
}
void register_sctp_socket(int fd_, sctp_callback_t recv_handler_)
{
std::lock_guard<std::mutex> lock(socket_mutex);
if (active_sctp_sockets.count(fd_) > 0) {
log_h->error("SOCKET: Tried to register fd=%d, but this fd already exists\n", fd_);
return;
}
if (fd_ < 0) {
log_h->error("%s: Provided fd=%d can\'t be negative\n", name.c_str(), fd_);
return;
}
active_sctp_sockets.insert(std::make_pair(fd_, std::move(recv_handler_)));
ctrl_msg_t msg;
msg.cmd = ctrl_msg_t::cmd_t::NEW_FD;
msg.new_fd = fd_;
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
log_h->error("%s: while writing to control pipe\n", name.c_str());
}
log_h->debug("%s: socket fd=%d has been registered.\n", name.c_str(), fd_);
}
void run_thread() override
{
srslte::unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, true);
const uint32_t sz = pdu->get_tailroom();
running = true;
fd_set total_fd_set, read_fd_set;
FD_ZERO(&total_fd_set);
int max_fd = 0;
FD_SET(pipefd[0], &total_fd_set);
max_fd = std::max(pipefd[0], max_fd);
while (running) {
memcpy(&read_fd_set, &total_fd_set, sizeof(total_fd_set));
int n = select(max_fd + 1, &read_fd_set, nullptr, nullptr, nullptr);
std::lock_guard<std::mutex> lock(socket_mutex);
// handle select return
if (n == -1) {
log_h->error("%s: Error from select", name.c_str());
continue;
}
if (n == 0) {
log_h->debug("%s: No data from select.\n", name.c_str());
continue;
}
// handle sctp messages
for (auto& handler_pair : active_sctp_sockets) {
if (not FD_ISSET(handler_pair.first, &read_fd_set)) {
continue;
}
sctp_packet_t packet;
packet.rd_sz = sctp_recvmsg(handler_pair.first,
pdu->msg,
sz,
(struct sockaddr*)&packet.from,
&packet.fromlen,
&packet.sinfo,
&packet.msg_flags);
if (packet.rd_sz > 0) {
pdu->N_bytes = static_cast<uint32_t>(packet.rd_sz);
packet.buf = std::move(pdu);
pdu = srslte::allocate_unique_buffer(*pool, true);
handler_pair.second(std::move(packet));
} else {
log_h->error("%s: Unable to read from sctp socket fd=%d\n", name.c_str(), handler_pair.first);
perror(name.c_str());
}
}
// TODO: For UDP as well
// handle ctrl messages
if (FD_ISSET(pipefd[0], &read_fd_set)) {
ctrl_msg_t msg;
ssize_t nrd = read(pipefd[0], &msg, sizeof(msg));
if (nrd < 0) {
log_h->error("%s: unable to read control message.\n", name.c_str());
continue;
}
switch (msg.cmd) {
case ctrl_msg_t::cmd_t::EXIT:
running = false;
return;
case ctrl_msg_t::cmd_t::NEW_FD:
if (msg.new_fd >= 0) {
FD_SET(msg.new_fd, &total_fd_set);
max_fd = std::max(max_fd, msg.new_fd);
} else {
log_h->error("%s: added fd is not valid\n", name.c_str());
}
break;
default:
log_h->error("%s: ctrl message command %d is not valid\n", name.c_str(), (int)msg.cmd);
}
}
}
}
private:
const static int THREAD_PRIO = 65;
std::string name;
srslte::log* log_h = nullptr;
srslte::byte_buffer_pool* pool = nullptr;
std::mutex socket_mutex;
std::map<int, sctp_callback_t> active_sctp_sockets;
bool running = false;
int pipefd[2] = {};
struct ctrl_msg_t {
enum class cmd_t { EXIT, NEW_FD };
cmd_t cmd = cmd_t::EXIT;
int new_fd = -1;
};
};
} // namespace srslte
#endif // SRSLTE_RX_SOCKET_HANDLER_H

View File

@ -50,14 +50,18 @@ class thread
{
public:
thread(const std::string& name_) : _thread(0), name(name_) {}
thread(const thread&) = delete;
thread(thread&&) noexcept = default;
thread& operator=(const thread&) = delete;
thread& operator=(thread&&) noexcept = default;
bool start(int prio = -1) { return threads_new_rt_prio(&_thread, thread_function_entry, this, prio); }
bool start_cpu(int prio, int cpu) {
return threads_new_rt_cpu(&_thread, thread_function_entry, this, cpu, prio);
thread(const thread&) = delete;
thread(thread&& other) noexcept
{
_thread = other._thread;
name = std::move(other.name);
other._thread = 0;
other.name = "";
}
thread& operator=(const thread&) = delete;
thread& operator=(thread&&) noexcept = delete;
bool start(int prio = -1) { return threads_new_rt_prio(&_thread, thread_function_entry, this, prio); }
bool start_cpu(int prio, int cpu) { return threads_new_rt_cpu(&_thread, thread_function_entry, this, cpu, prio); }
bool start_cpu_mask(int prio, int mask)
{
return threads_new_rt_mask(&_thread, thread_function_entry, this, mask, prio);

View File

@ -78,3 +78,6 @@ add_test(queue_test queue_test)
add_executable(timer_test timer_test.cc)
target_link_libraries(timer_test srslte_common)
add_executable(rx_socket_handler_test rx_socket_handler_test.cc)
target_link_libraries(rx_socket_handler_test srslte_common ${CMAKE_THREAD_LIBS_INIT} ${SCTP_LIBRARIES})

View File

@ -0,0 +1,179 @@
/*
* Copyright 2013-2019 Software Radio Systems Limited
*
* This file is part of srsLTE.
*
* srsLTE is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* srsLTE is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* A copy of the GNU Affero General Public License can be found in
* the LICENSE file in the top-level directory of this distribution
* and at http://www.gnu.org/licenses/.
*
*/
#include "srslte/common/log_filter.h"
#include "srslte/common/rx_socket_handler.h"
#include <iostream>
#define TESTASSERT(cond) \
do { \
if (!(cond)) { \
std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \
return -1; \
} \
} while (0)
struct sockaddr_in local_addr;
struct sockaddr_in listen_addr;
int socket_listen(srslte::log* log_h, const char* listen_addr_str, int listen_port)
{
/*This function sets up the SCTP socket for eNBs to connect to*/
int sock_fd, err;
struct sctp_event_subscribe evnts;
sock_fd = socket(AF_INET, SOCK_SEQPACKET, IPPROTO_SCTP);
if (sock_fd == -1) {
log_h->error("Could not create SCTP socket\n");
return -1;
}
// Sets the data_io_event to be able to use sendrecv_info
// Subscribes to the SCTP_SHUTDOWN event, to handle graceful shutdown
bzero(&evnts, sizeof(evnts));
evnts.sctp_data_io_event = 1;
evnts.sctp_shutdown_event = 1;
if (setsockopt(sock_fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts))) {
close(sock_fd);
log_h->console("Subscribing to sctp_data_io_events failed\n");
return -1;
}
// S1-MME bind
bzero(&listen_addr, sizeof(listen_addr));
listen_addr.sin_family = AF_INET;
inet_pton(AF_INET, listen_addr_str, &(listen_addr.sin_addr));
listen_addr.sin_port = htons(listen_port);
err = bind(sock_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr));
if (err != 0) {
close(sock_fd);
log_h->error("Error binding SCTP socket\n");
return -1;
}
// Listen for connections
err = listen(sock_fd, SOMAXCONN);
if (err != 0) {
close(sock_fd);
log_h->error("Error in SCTP socket listen\n");
return -1;
}
return sock_fd;
}
int create_fd(srslte::log* log_h, const char* bind_addr_str = "127.0.0.1")
{
int socket_fd = -1;
const int ADDR_FAMILY = AF_INET;
if ((socket_fd = socket(ADDR_FAMILY, SOCK_STREAM, IPPROTO_SCTP)) == -1) {
log_h->error("Failed to create S1AP socket\n");
perror("socket()");
goto exit_fail;
}
// Bind to the local address
memset(&local_addr, 0, sizeof(struct sockaddr_in));
local_addr.sin_family = ADDR_FAMILY;
local_addr.sin_port = 0; // Any local port will do
if (inet_pton(AF_INET, bind_addr_str, &(local_addr.sin_addr)) != 1) {
log_h->error("Error converting IP address (%s) to sockaddr_in structure\n", bind_addr_str);
goto exit_fail;
}
if (bind(socket_fd, (struct sockaddr*)&local_addr, sizeof(local_addr)) != 0) {
log_h->error("Failed to bind on S1-C address %s: %s errno %d\n", bind_addr_str, strerror(errno), errno);
perror("bind()");
goto exit_fail;
}
if (connect(socket_fd, (struct sockaddr*)&listen_addr, sizeof(listen_addr)) == -1) {
log_h->error("Failed to establish socket connection to Remote\n");
goto exit_fail;
}
log_h->info("Connected to remote\n");
return socket_fd;
exit_fail:
if (socket_fd >= 0) {
close(socket_fd);
socket_fd = -1;
}
return -1;
}
int test_socket_handler()
{
srslte::log_filter log("S1AP");
log.set_level(srslte::LOG_LEVEL_DEBUG);
log.set_hex_limit(128);
int counter = 0;
srslte::byte_buffer_pool* pool = srslte::byte_buffer_pool::get_instance();
srslte::rx_socket_handler sockhandler("RXSOCKETS", &log);
int listen_fd = socket_listen(&log, "127.0.100.1", 36412);
log.info("Listening from %d\n", listen_fd);
int fd = create_fd(&log, "127.0.0.1");
TESTASSERT(fd >= 0);
int PPID = 18;
const int NONUE_STREAM_ID = 0;
sockhandler.register_sctp_socket(listen_fd, [&counter, &log](srslte::rx_socket_handler::sctp_packet_t&& packet) {
log.info("Received %ld bytes\n", packet.rd_sz);
if (packet.buf != nullptr) {
log.info_hex(packet.buf->msg, packet.buf->N_bytes, "Received msg:");
counter++;
}
});
srslte::unique_byte_buffer_t buf = srslte::allocate_unique_buffer(*pool, true);
for (uint32_t i = 0; i < 5; ++i) {
buf->N_bytes = i + 1;
buf->msg[i] = i;
ssize_t n_sent = sctp_sendmsg(fd,
buf->msg,
buf->N_bytes,
(struct sockaddr*)&listen_addr,
sizeof(struct sockaddr_in),
htonl(PPID),
0,
NONUE_STREAM_ID,
0,
0);
TESTASSERT(n_sent == buf->N_bytes);
usleep(1000);
log.info("Message %d sent.\n", i);
}
sleep(1);
return 0;
}
int main()
{
TESTASSERT(test_socket_handler() == 0);
return 0;
}