- Replace loggers is network utils.

- Adapt tests that use network utils and callers.
This commit is contained in:
faluco 2021-02-01 17:23:01 +01:00 committed by faluco
parent e9ed6f31ba
commit 1a5799a6ca
4 changed files with 77 additions and 71 deletions

View File

@ -125,7 +125,7 @@ public:
using sctp_recv_callback_t = using sctp_recv_callback_t =
std::function<void(srslte::unique_byte_buffer_t, const sockaddr_in&, const sctp_sndrcvinfo&, int)>; std::function<void(srslte::unique_byte_buffer_t, const sockaddr_in&, const sctp_sndrcvinfo&, int)>;
rx_multisocket_handler(std::string name_, srslte::log_ref log_, int thread_prio = 65); rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio = 65);
rx_multisocket_handler(rx_multisocket_handler&&) = delete; rx_multisocket_handler(rx_multisocket_handler&&) = delete;
rx_multisocket_handler(const rx_multisocket_handler&) = delete; rx_multisocket_handler(const rx_multisocket_handler&) = delete;
rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete; rx_multisocket_handler& operator=(const rx_multisocket_handler&) = delete;
@ -152,7 +152,7 @@ private:
// args // args
std::string name; std::string name;
srslte::log_ref log_h; srslog::basic_logger& logger;
srslte::byte_buffer_pool* pool = nullptr; srslte::byte_buffer_pool* pool = nullptr;
// state // state

View File

@ -16,10 +16,10 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/types.h> #include <sys/types.h>
#define rxSockError(fmt, ...) log_h->error("%s: " fmt, name.c_str(), ##__VA_ARGS__) #define rxSockError(fmt, ...) logger.error("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockWarn(fmt, ...) log_h->warning("%s: " fmt, name.c_str(), ##__VA_ARGS__) #define rxSockWarn(fmt, ...) logger.warning("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockInfo(fmt, ...) log_h->info("%s: " fmt, name.c_str(), ##__VA_ARGS__) #define rxSockInfo(fmt, ...) logger.info("%s: " fmt, name.c_str(), ##__VA_ARGS__)
#define rxSockDebug(fmt, ...) log_h->debug("%s: " fmt, name.c_str(), ##__VA_ARGS__) #define rxSockDebug(fmt, ...) logger.debug("%s: " fmt, name.c_str(), ##__VA_ARGS__)
namespace srslte { namespace srslte {
@ -104,7 +104,7 @@ int open_socket(net_utils::addr_family ip_type, net_utils::socket_type socket_ty
{ {
int fd = socket((int)ip_type, (int)socket_type, (int)protocol); int fd = socket((int)ip_type, (int)socket_type, (int)protocol);
if (fd == -1) { if (fd == -1) {
srslte::logmap::get(LOGSERVICE)->error("Failed to open %s socket.\n", net_utils::protocol_to_string(protocol)); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to open %s socket.", net_utils::protocol_to_string(protocol));
perror("Could not create socket\n"); perror("Could not create socket\n");
return -1; return -1;
} }
@ -118,7 +118,7 @@ int open_socket(net_utils::addr_family ip_type, net_utils::socket_type socket_ty
evnts.sctp_shutdown_event = 1; evnts.sctp_shutdown_event = 1;
evnts.sctp_address_event = 1; evnts.sctp_address_event = 1;
if (setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) { if (setsockopt(fd, IPPROTO_SCTP, SCTP_EVENTS, &evnts, sizeof(evnts)) != 0) {
srslte::logmap::get(LOGSERVICE)->error("Failed to subscribe to SCTP_SHUTDOWN event: %s\n", strerror(errno)); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to subscribe to SCTP_SHUTDOWN event: %s", strerror(errno));
perror("Could not regiester socket to SCTP events\n"); perror("Could not regiester socket to SCTP events\n");
} }
@ -138,9 +138,9 @@ int open_socket(net_utils::addr_family ip_type, net_utils::socket_type socket_ty
rto_opts.srto_max = 6000; // 6 seconds rto_opts.srto_max = 6000; // 6 seconds
srslte::logmap::get(LOGSERVICE) srslog::fetch_basic_logger(LOGSERVICE)
->debug( .debug(
"Setting RTO_INFO options on SCTP socket. Association %d, Initial RTO %d, Minimum RTO %d, Maximum RTO %d\n", "Setting RTO_INFO options on SCTP socket. Association %d, Initial RTO %d, Minimum RTO %d, Maximum RTO %d",
rto_opts.srto_assoc_id, rto_opts.srto_assoc_id,
rto_opts.srto_initial, rto_opts.srto_initial,
rto_opts.srto_min, rto_opts.srto_min,
@ -161,8 +161,8 @@ int open_socket(net_utils::addr_family ip_type, net_utils::socket_type socket_ty
init_opts.sinit_max_attempts = 3; init_opts.sinit_max_attempts = 3;
init_opts.sinit_max_init_timeo = 5000; // 5 seconds init_opts.sinit_max_init_timeo = 5000; // 5 seconds
srslte::logmap::get(LOGSERVICE) srslog::fetch_basic_logger(LOGSERVICE)
->debug("Setting SCTP_INITMSG options on SCTP socket. Max attempts %d, Max init attempts timeout %d\n", .debug("Setting SCTP_INITMSG options on SCTP socket. Max attempts %d, Max init attempts timeout %d",
init_opts.sinit_max_attempts, init_opts.sinit_max_attempts,
init_opts.sinit_max_init_timeo); init_opts.sinit_max_init_timeo);
if (setsockopt(fd, SOL_SCTP, SCTP_INITMSG, &init_opts, init_sz) < 0) { if (setsockopt(fd, SOL_SCTP, SCTP_INITMSG, &init_opts, init_sz) < 0) {
@ -177,13 +177,13 @@ int open_socket(net_utils::addr_family ip_type, net_utils::socket_type socket_ty
bool bind_addr(int fd, const sockaddr_in& addr_in) bool bind_addr(int fd, const sockaddr_in& addr_in)
{ {
if (fd < 0) { if (fd < 0) {
srslte::logmap::get(LOGSERVICE)->error("Trying to bind to a closed socket\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Trying to bind to a closed socket");
return false; return false;
} }
if (bind(fd, (struct sockaddr*)&addr_in, sizeof(addr_in)) != 0) { if (bind(fd, (struct sockaddr*)&addr_in, sizeof(addr_in)) != 0) {
srslte::logmap::get(LOGSERVICE) srslog::fetch_basic_logger(LOGSERVICE)
->error("Failed to bind on address %s: %s errno %d\n", get_ip(addr_in).c_str(), strerror(errno), errno); .error("Failed to bind on address %s: %s errno %d", get_ip(addr_in).c_str(), strerror(errno), errno);
perror("bind()"); perror("bind()");
return false; return false;
} }
@ -194,7 +194,8 @@ bool bind_addr(int fd, const char* bind_addr_str, int port, sockaddr_in* addr_re
{ {
sockaddr_in addr_tmp{}; sockaddr_in addr_tmp{};
if (not net_utils::set_sockaddr(&addr_tmp, bind_addr_str, port)) { if (not net_utils::set_sockaddr(&addr_tmp, bind_addr_str, port)) {
srslte::logmap::get(LOGSERVICE)->error("Failed to convert IP address (%s) to sockaddr_in struct\n", bind_addr_str); srslog::fetch_basic_logger(LOGSERVICE)
.error("Failed to convert IP address (%s) to sockaddr_in struct", bind_addr_str);
return false; return false;
} }
bind_addr(fd, addr_tmp); bind_addr(fd, addr_tmp);
@ -207,20 +208,20 @@ bool bind_addr(int fd, const char* bind_addr_str, int port, sockaddr_in* addr_re
bool connect_to(int fd, const char* dest_addr_str, int dest_port, sockaddr_in* dest_sockaddr) bool connect_to(int fd, const char* dest_addr_str, int dest_port, sockaddr_in* dest_sockaddr)
{ {
if (fd < 0) { if (fd < 0) {
srslte::logmap::get(LOGSERVICE)->error("tried to connect to remote address with an invalid socket.\n"); srslog::fetch_basic_logger(LOGSERVICE).error("tried to connect to remote address with an invalid socket.");
return false; return false;
} }
sockaddr_in sockaddr_tmp{}; sockaddr_in sockaddr_tmp{};
if (not net_utils::set_sockaddr(&sockaddr_tmp, dest_addr_str, dest_port)) { if (not net_utils::set_sockaddr(&sockaddr_tmp, dest_addr_str, dest_port)) {
srslte::logmap::get(LOGSERVICE) srslog::fetch_basic_logger(LOGSERVICE)
->error("Error converting IP address (%s) to sockaddr_in structure\n", dest_addr_str); .error("Error converting IP address (%s) to sockaddr_in structure", dest_addr_str);
return false; return false;
} }
if (dest_sockaddr != nullptr) { if (dest_sockaddr != nullptr) {
*dest_sockaddr = sockaddr_tmp; *dest_sockaddr = sockaddr_tmp;
} }
if (connect(fd, (const struct sockaddr*)&sockaddr_tmp, sizeof(sockaddr_tmp)) == -1) { if (connect(fd, (const struct sockaddr*)&sockaddr_tmp, sizeof(sockaddr_tmp)) == -1) {
srslte::logmap::get(LOGSERVICE)->info("Failed to establish socket connection to %s\n", dest_addr_str); srslog::fetch_basic_logger(LOGSERVICE).info("Failed to establish socket connection to %s", dest_addr_str);
perror("connect()"); perror("connect()");
return false; return false;
} }
@ -285,7 +286,7 @@ bool socket_handler_t::open_socket(net_utils::addr_family ip_type,
net_utils::protocol_type protocol) net_utils::protocol_type protocol)
{ {
if (sockfd >= 0) { if (sockfd >= 0) {
srslte::logmap::get(LOGSERVICE)->error("Socket is already open.\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Socket is already open.");
return false; return false;
} }
sockfd = net_utils::open_socket(ip_type, socket_type, protocol); sockfd = net_utils::open_socket(ip_type, socket_type, protocol);
@ -322,7 +323,7 @@ bool sctp_init_server(socket_handler_t* socket, net_utils::socket_type socktype,
} }
// Listen for connections // Listen for connections
if (listen(socket->fd(), SOMAXCONN) != 0) { if (listen(socket->fd(), SOMAXCONN) != 0) {
srslte::logmap::get(LOGSERVICE)->error("Failed to listen to incoming SCTP connections\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to listen to incoming SCTP connections");
return false; return false;
} }
return true; return true;
@ -343,7 +344,7 @@ bool tcp_make_server(socket_handler_t* socket, const char* bind_addr_str, int po
} }
// Listen for connections // Listen for connections
if (listen(socket->fd(), nof_connections) != 0) { if (listen(socket->fd(), nof_connections) != 0) {
srslte::logmap::get(LOGSERVICE)->error("Failed to listen to incoming TCP connections\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to listen to incoming TCP connections");
return false; return false;
} }
return true; return true;
@ -354,7 +355,7 @@ int tcp_accept(socket_handler_t* socket, sockaddr_in* destaddr)
socklen_t clilen = sizeof(destaddr); socklen_t clilen = sizeof(destaddr);
int connfd = accept(socket->fd(), (struct sockaddr*)&destaddr, &clilen); int connfd = accept(socket->fd(), (struct sockaddr*)&destaddr, &clilen);
if (connfd < 0) { if (connfd < 0) {
srslte::logmap::get(LOGSERVICE)->error("Failed to accept connection\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to accept connection");
perror("accept"); perror("accept");
return -1; return -1;
} }
@ -365,12 +366,12 @@ int tcp_read(int remotefd, void* buf, size_t nbytes)
{ {
int n = ::read(remotefd, buf, nbytes); int n = ::read(remotefd, buf, nbytes);
if (n == 0) { if (n == 0) {
srslte::logmap::get(LOGSERVICE)->info("TCP connection closed\n"); srslog::fetch_basic_logger(LOGSERVICE).info("TCP connection closed");
close(remotefd); close(remotefd);
return 0; return 0;
} }
if (n == -1) { if (n == -1) {
srslte::logmap::get(LOGSERVICE)->error("Failed to read from TCP socket."); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to read from TCP socket.");
perror("TCP read"); perror("TCP read");
} }
return n; return n;
@ -384,7 +385,7 @@ int tcp_send(int remotefd, const void* buf, size_t nbytes)
while (nbytes_remaining > 0) { while (nbytes_remaining > 0) {
ssize_t i = ::send(remotefd, ptr, nbytes_remaining, 0); ssize_t i = ::send(remotefd, ptr, nbytes_remaining, 0);
if (i < 1) { if (i < 1) {
srslte::logmap::get(LOGSERVICE)->error("Failed to send data to TCP socket\n"); srslog::fetch_basic_logger(LOGSERVICE).error("Failed to send data to TCP socket");
perror("Error calling send()\n"); perror("Error calling send()\n");
return i; return i;
} }
@ -408,8 +409,8 @@ class recvfrom_pdu_task final : public rx_multisocket_handler::recv_task
{ {
public: public:
using callback_t = std::function<void(srslte::unique_byte_buffer_t pdu, const sockaddr_in& from)>; using callback_t = std::function<void(srslte::unique_byte_buffer_t pdu, const sockaddr_in& from)>;
explicit recvfrom_pdu_task(srslte::byte_buffer_pool* pool_, srslte::log_ref log_, callback_t func_) : explicit recvfrom_pdu_task(srslte::byte_buffer_pool* pool_, srslog::basic_logger& logger, callback_t func_) :
pool(pool_), log_h(log_), func(std::move(func_)) pool(pool_), logger(logger), func(std::move(func_))
{} {}
bool operator()(int fd) override bool operator()(int fd) override
@ -420,11 +421,11 @@ public:
ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen); ssize_t n_recv = recvfrom(fd, pdu->msg, pdu->get_tailroom(), 0, (struct sockaddr*)&from, &fromlen);
if (n_recv == -1 and errno != EAGAIN) { if (n_recv == -1 and errno != EAGAIN) {
log_h->error("Error reading from socket: %s\n", strerror(errno)); logger.error("Error reading from socket: %s", strerror(errno));
return true; return true;
} }
if (n_recv == -1 and errno == EAGAIN) { if (n_recv == -1 and errno == EAGAIN) {
log_h->debug("Socket timeout reached\n"); logger.debug("Socket timeout reached");
return true; return true;
} }
@ -435,7 +436,7 @@ public:
private: private:
srslte::byte_buffer_pool* pool = nullptr; srslte::byte_buffer_pool* pool = nullptr;
srslte::log_ref log_h; srslog::basic_logger& logger;
callback_t func; callback_t func;
}; };
@ -444,8 +445,8 @@ class sctp_recvmsg_pdu_task final : public rx_multisocket_handler::recv_task
public: public:
using callback_t = std::function< using callback_t = std::function<
void(srslte::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)>; void(srslte::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags)>;
explicit sctp_recvmsg_pdu_task(srslte::byte_buffer_pool* pool_, srslte::log_ref log_, callback_t func_) : explicit sctp_recvmsg_pdu_task(srslte::byte_buffer_pool* pool_, srslog::basic_logger& logger, callback_t func_) :
pool(pool_), log_h(log_), func(std::move(func_)) pool(pool_), logger(logger), func(std::move(func_))
{} {}
bool operator()(int fd) override bool operator()(int fd) override
@ -458,11 +459,11 @@ public:
int flags = 0; int flags = 0;
ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags); ssize_t n_recv = sctp_recvmsg(fd, pdu->msg, pdu->get_tailroom(), (struct sockaddr*)&from, &fromlen, &sri, &flags);
if (n_recv == -1 and errno != EAGAIN) { if (n_recv == -1 and errno != EAGAIN) {
log_h->error("Error reading from SCTP socket: %s\n", strerror(errno)); logger.error("Error reading from SCTP socket: %s", strerror(errno));
return true; return true;
} }
if (n_recv == -1 and errno == EAGAIN) { if (n_recv == -1 and errno == EAGAIN) {
log_h->debug("Socket timeout reached\n"); logger.debug("Socket timeout reached");
return true; return true;
} }
@ -482,7 +483,7 @@ public:
private: private:
srslte::byte_buffer_pool* pool = nullptr; srslte::byte_buffer_pool* pool = nullptr;
srslte::log_ref log_h; srslog::basic_logger& logger;
callback_t func; callback_t func;
}; };
@ -490,13 +491,13 @@ private:
* Rx Multisocket Handler * Rx Multisocket Handler
**************************************************************/ **************************************************************/
rx_multisocket_handler::rx_multisocket_handler(std::string name_, srslte::log_ref log_, int thread_prio) : rx_multisocket_handler::rx_multisocket_handler(std::string name_, srslog::basic_logger& logger, int thread_prio) :
thread(name_), name(std::move(name_)), log_h(log_) thread(name_), name(std::move(name_)), logger(logger)
{ {
pool = srslte::byte_buffer_pool::get_instance(); pool = srslte::byte_buffer_pool::get_instance();
// register control pipe fd // register control pipe fd
if (pipe(pipefd) == -1) { if (pipe(pipefd) == -1) {
rxSockInfo("Failed to open control pipe\n"); rxSockInfo("Failed to open control pipe");
return; return;
} }
start(thread_prio); start(thread_prio);
@ -516,10 +517,10 @@ void rx_multisocket_handler::stop()
ctrl_cmd_t msg{}; ctrl_cmd_t msg{};
msg.cmd = ctrl_cmd_t::cmd_id_t::EXIT; msg.cmd = ctrl_cmd_t::cmd_id_t::EXIT;
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
rxSockError("while writing to control pipe\n"); rxSockError("while writing to control pipe");
} }
} }
rxSockDebug("Closing rx socket handler thread\n"); rxSockDebug("Closing rx socket handler thread");
wait_thread_finish(); wait_thread_finish();
} }
@ -528,7 +529,7 @@ void rx_multisocket_handler::stop()
close(pipefd[1]); close(pipefd[1]);
pipefd[0] = -1; pipefd[0] = -1;
pipefd[1] = -1; pipefd[1] = -1;
rxSockDebug("closed.\n"); rxSockDebug("closed.");
} }
} }
@ -538,7 +539,7 @@ void rx_multisocket_handler::stop()
bool rx_multisocket_handler::add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task) bool rx_multisocket_handler::add_socket_pdu_handler(int fd, recvfrom_callback_t pdu_task)
{ {
std::unique_ptr<srslte::rx_multisocket_handler::recv_task> task; std::unique_ptr<srslte::rx_multisocket_handler::recv_task> task;
task.reset(new srslte::recvfrom_pdu_task(pool, log_h, std::move(pdu_task))); task.reset(new srslte::recvfrom_pdu_task(pool, logger, std::move(pdu_task)));
return add_socket_handler(fd, std::move(task)); return add_socket_handler(fd, std::move(task));
} }
@ -548,7 +549,7 @@ bool rx_multisocket_handler::add_socket_pdu_handler(int fd, recvfrom_callback_t
bool rx_multisocket_handler::add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t pdu_task) bool rx_multisocket_handler::add_socket_sctp_pdu_handler(int fd, sctp_recv_callback_t pdu_task)
{ {
srslte::rx_multisocket_handler::task_callback_t task; srslte::rx_multisocket_handler::task_callback_t task;
task.reset(new srslte::sctp_recvmsg_pdu_task(pool, log_h, std::move(pdu_task))); task.reset(new srslte::sctp_recvmsg_pdu_task(pool, logger, std::move(pdu_task)));
return add_socket_handler(fd, std::move(task)); return add_socket_handler(fd, std::move(task));
} }
@ -556,11 +557,11 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
{ {
std::lock_guard<std::mutex> lock(socket_mutex); std::lock_guard<std::mutex> lock(socket_mutex);
if (fd < 0) { if (fd < 0) {
rxSockError("Provided SCTP socket must be already open\n"); rxSockError("Provided SCTP socket must be already open");
return false; return false;
} }
if (active_sockets.count(fd) > 0) { if (active_sockets.count(fd) > 0) {
rxSockError("Tried to register fd=%d, but this fd already exists\n", fd); rxSockError("Tried to register fd=%d, but this fd already exists", fd);
return false; return false;
} }
@ -571,11 +572,11 @@ bool rx_multisocket_handler::add_socket_handler(int fd, task_callback_t handler)
msg.cmd = ctrl_cmd_t::cmd_id_t::NEW_FD; msg.cmd = ctrl_cmd_t::cmd_id_t::NEW_FD;
msg.new_fd = fd; msg.new_fd = fd;
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
rxSockError("while writing to control pipe\n"); rxSockError("while writing to control pipe");
return false; return false;
} }
rxSockDebug("socket fd=%d has been registered.\n", fd); rxSockDebug("socket fd=%d has been registered.", fd);
return true; return true;
} }
@ -584,7 +585,7 @@ bool rx_multisocket_handler::remove_socket(int fd)
std::lock_guard<std::mutex> lock(socket_mutex); std::lock_guard<std::mutex> lock(socket_mutex);
auto it = active_sockets.find(fd); auto it = active_sockets.find(fd);
if (it == active_sockets.end()) { if (it == active_sockets.end()) {
rxSockError("The socket fd=%d to be removed does not exist\n", fd); rxSockError("The socket fd=%d to be removed does not exist", fd);
return false; return false;
} }
@ -592,7 +593,7 @@ bool rx_multisocket_handler::remove_socket(int fd)
msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD; msg.cmd = ctrl_cmd_t::cmd_id_t::RM_FD;
msg.new_fd = fd; msg.new_fd = fd;
if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) { if (write(pipefd[1], &msg, sizeof(msg)) != sizeof(msg)) {
rxSockError("while writing to control pipe\n"); rxSockError("while writing to control pipe");
return false; return false;
} }
return true; return true;
@ -601,14 +602,14 @@ bool rx_multisocket_handler::remove_socket(int fd)
bool rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd) bool rx_multisocket_handler::remove_socket_unprotected(int fd, fd_set* total_fd_set, int* max_fd)
{ {
if (fd < 0) { if (fd < 0) {
rxSockError("fd to be removed is not valid\n"); rxSockError("fd to be removed is not valid");
return false; return false;
} }
active_sockets.erase(fd); active_sockets.erase(fd);
FD_CLR(fd, total_fd_set); FD_CLR(fd, total_fd_set);
// assumes ordering // assumes ordering
*max_fd = (active_sockets.empty()) ? pipefd[0] : std::max(pipefd[0], active_sockets.rbegin()->first); *max_fd = (active_sockets.empty()) ? pipefd[0] : std::max(pipefd[0], active_sockets.rbegin()->first);
rxSockDebug("Socket fd=%d has been successfully removed\n", fd); rxSockDebug("Socket fd=%d has been successfully removed", fd);
return true; return true;
} }
@ -632,7 +633,7 @@ void rx_multisocket_handler::run_thread()
continue; continue;
} }
if (n == 0) { if (n == 0) {
rxSockDebug("No data from select.\n"); rxSockDebug("No data from select.");
continue; continue;
} }
@ -648,7 +649,7 @@ void rx_multisocket_handler::run_thread()
} }
bool socket_valid = callback->operator()(fd); bool socket_valid = callback->operator()(fd);
if (not socket_valid) { if (not socket_valid) {
rxSockInfo("The socket fd=%d has been closed by peer\n", fd); rxSockInfo("The socket fd=%d has been closed by peer", fd);
remove_socket_unprotected(fd, &total_fd_set, &max_fd); remove_socket_unprotected(fd, &total_fd_set, &max_fd);
} }
} }
@ -658,7 +659,7 @@ void rx_multisocket_handler::run_thread()
ctrl_cmd_t msg; ctrl_cmd_t msg;
ssize_t nrd = read(pipefd[0], &msg, sizeof(msg)); ssize_t nrd = read(pipefd[0], &msg, sizeof(msg));
if (nrd <= 0) { if (nrd <= 0) {
rxSockError("Unable to read control message.\n"); rxSockError("Unable to read control message.");
continue; continue;
} }
switch (msg.cmd) { switch (msg.cmd) {
@ -670,15 +671,15 @@ void rx_multisocket_handler::run_thread()
FD_SET(msg.new_fd, &total_fd_set); FD_SET(msg.new_fd, &total_fd_set);
max_fd = std::max(max_fd, msg.new_fd); max_fd = std::max(max_fd, msg.new_fd);
} else { } else {
rxSockError("added fd is not valid\n"); rxSockError("added fd is not valid");
} }
break; break;
case ctrl_cmd_t::cmd_id_t::RM_FD: case ctrl_cmd_t::cmd_id_t::RM_FD:
remove_socket_unprotected(msg.new_fd, &total_fd_set, &max_fd); remove_socket_unprotected(msg.new_fd, &total_fd_set, &max_fd);
rxSockDebug("Socket fd=%d has been successfully removed\n", msg.new_fd); rxSockDebug("Socket fd=%d has been successfully removed", msg.new_fd);
break; break;
default: default:
rxSockError("ctrl message command %d is not valid\n", (int)msg.cmd); rxSockError("ctrl message command %d is not valid", (int)msg.cmd);
} }
} }
} }

View File

@ -24,20 +24,18 @@
int test_socket_handler() int test_socket_handler()
{ {
srslte::log_ref log("S1AP"); auto& logger = srslog::fetch_basic_logger("S1AP", false);
log->set_level(srslte::LOG_LEVEL_DEBUG);
log->set_hex_limit(128);
int counter = 0; int counter = 0;
srslte::socket_handler_t server_socket, client_socket, client_socket2; srslte::socket_handler_t server_socket, client_socket, client_socket2;
srslte::rx_multisocket_handler sockhandler("RXSOCKETS", log); srslte::rx_multisocket_handler sockhandler("RXSOCKETS", logger);
int server_port = 36412; int server_port = 36412;
const char* server_addr = "127.0.100.1"; const char* server_addr = "127.0.100.1";
using namespace srslte::net_utils; using namespace srslte::net_utils;
TESTASSERT(sctp_init_server(&server_socket, socket_type::seqpacket, server_addr, server_port)); TESTASSERT(sctp_init_server(&server_socket, socket_type::seqpacket, server_addr, server_port));
log->info("Listening from fd=%d\n", server_socket.fd()); logger.info("Listening from fd=%d", server_socket.fd());
TESTASSERT(sctp_init_client(&client_socket, socket_type::seqpacket, "127.0.0.1")); TESTASSERT(sctp_init_client(&client_socket, socket_type::seqpacket, "127.0.0.1"));
TESTASSERT(sctp_init_client(&client_socket2, socket_type::seqpacket, "127.0.0.2")); TESTASSERT(sctp_init_client(&client_socket2, socket_type::seqpacket, "127.0.0.2"));
@ -46,10 +44,10 @@ int test_socket_handler()
// register server Rx handler // register server Rx handler
auto pdu_handler = auto pdu_handler =
[log, [&logger,
&counter](srslte::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) { &counter](srslte::unique_byte_buffer_t pdu, const sockaddr_in& from, const sctp_sndrcvinfo& sri, int flags) {
if (pdu->N_bytes > 0) { if (pdu->N_bytes > 0) {
log->info_hex(pdu->msg, pdu->N_bytes, "Received msg from %s:", get_ip(from).c_str()); logger.info(pdu->msg, pdu->N_bytes, "Received msg from %s:", get_ip(from).c_str());
counter++; counter++;
} }
}; };
@ -80,7 +78,7 @@ int test_socket_handler()
0); 0);
TESTASSERT(n_sent >= 0); TESTASSERT(n_sent >= 0);
usleep(1000); usleep(1000);
log->info("Message %d sent.\n", i); logger.info("Message %d sent.", i);
} }
uint32_t time_elapsed = 0; uint32_t time_elapsed = 0;
@ -98,6 +96,13 @@ int test_socket_handler()
int main() int main()
{ {
auto& logger = srslog::fetch_basic_logger("S1AP", false);
logger.set_level(srslog::basic_levels::debug);
logger.set_hex_dump_max_size(128);
srslog::init();
TESTASSERT(test_socket_handler() == 0); TESTASSERT(test_socket_handler() == 0);
return 0; return 0;
} }

View File

@ -116,7 +116,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
} }
// Init Rx socket handler // Init Rx socket handler
rx_sockets.reset(new srslte::rx_multisocket_handler("ENBSOCKETS", stack_log)); rx_sockets.reset(new srslte::rx_multisocket_handler("ENBSOCKETS", stack_logger));
// add sync queue // add sync queue
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);