atbetaflight/lib/main/dyad/dyad.c

1157 lines
29 KiB
C

/**
* Copyright (c) 2016 rxi
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the MIT license. See LICENSE for details.
*/
#ifdef _WIN32
#define _WIN32_WINNT 0x501
#ifndef _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_WARNINGS
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#else
#define _POSIX_C_SOURCE 200809L
#ifdef __APPLE__
#define _DARWIN_UNLIMITED_SELECT
#endif
#include <unistd.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <signal.h>
#include <errno.h>
#include <limits.h>
#include "dyad.h"
#define DYAD_VERSION "0.2.1"
#ifdef _WIN32
#define close(a) closesocket(a)
#define getsockopt(a,b,c,d,e) getsockopt((a),(b),(c),(char*)(d),(e))
#define setsockopt(a,b,c,d,e) setsockopt((a),(b),(c),(char*)(d),(e))
#define select(a,b,c,d,e) select((int)(a),(b),(c),(d),(e))
#define bind(a,b,c) bind((a),(b),(int)(c))
#define connect(a,b,c) connect((a),(b),(int)(c))
#undef errno
#define errno WSAGetLastError()
#undef EWOULDBLOCK
#define EWOULDBLOCK WSAEWOULDBLOCK
/* Windows stand-in for inet_ntop(): copies the raw address in `src` into a
 * zeroed socket-address union of family `af` and lets WSAAddressToStringA()
 * format it into `dst`. Returns `dst` on success or NULL when the conversion
 * call reports an error. */
const char *inet_ntop(int af, const void *src, char *dst, socklen_t size) {
  union {
    struct sockaddr sa;
    struct sockaddr_in sai;
    struct sockaddr_in6 sai6;
  } addr;

  memset(&addr, 0, sizeof(addr));
  addr.sa.sa_family = af;

  /* Anything that is not AF_INET6 is treated as an IPv4 address */
  if (af != AF_INET6) {
    memcpy(&addr.sai.sin_addr, src, sizeof(addr.sai.sin_addr));
  } else {
    memcpy(&addr.sai6.sin6_addr, src, sizeof(addr.sai6.sin6_addr));
  }

  if (WSAAddressToStringA(&addr.sa, sizeof(addr), 0, dst,
                          (LPDWORD) &size) != 0) {
    return NULL;
  }
  return dst;
}
#endif
#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif
/*===========================================================================*/
/* Memory */
/*===========================================================================*/
static void panic(const char *fmt, ...);
/* realloc() wrapper used for every allocation in this file. A failed
 * allocation of a non-zero size is fatal: panic() is called and does not
 * return. Returns the (possibly moved) block. */
static void *dyad_realloc(void *ptr, int n) {
  void *mem = realloc(ptr, n);
  if (mem == NULL && n != 0) {
    panic("out of memory");
  }
  return mem;
}
/* Releases memory obtained through dyad_realloc(); a NULL pointer is
 * accepted, as with free(). */
static void dyad_free(void *ptr) {
  free(ptr);
}
/*===========================================================================*/
/* Vec (dynamic array) */
/*===========================================================================*/
/* Makes sure the vector described by data/length/capacity has room for one
 * more element of `memsz` bytes. When it is full the capacity becomes 1 (for
 * an empty vector) or doubles, and the storage is reallocated to match. */
static void vec_expand(char **data, int *length, int *capacity, int memsz) {
  if (*length + 1 <= *capacity) {
    return; /* Still room for another element */
  }
  *capacity = (*capacity == 0) ? 1 : (*capacity << 1);
  *data = dyad_realloc(*data, *capacity * memsz);
}
/* Removes `count` elements starting at index `start` by sliding the tail of
 * the array down over them. Only the bytes are moved here; the length is not
 * modified by this function and `capacity` is unused. */
static void vec_splice(
  char **data, int *length, int *capacity, int memsz, int start, int count
) {
  char *base = *data;
  int tail = *length - start - count; /* Elements after the removed range */
  (void) capacity;
  memmove(base + start * memsz, base + (start + count) * memsz, tail * memsz);
}
/* Vec(T): anonymous struct type holding a growable array of T together with
 * its current element count (`length`) and allocated slots (`capacity`). */
#define Vec(T)\
struct { T *data; int length, capacity; }
/* Expands a Vec pointer into the (data, length, capacity, element size)
 * argument list expected by the vec_expand()/vec_splice() functions. */
#define vec_unpack(v)\
(char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)
/* Zeroes the whole Vec: NULL data pointer, zero length and capacity. */
#define vec_init(v)\
memset((v), 0, sizeof(*(v)))
/* Frees the Vec's storage; the struct's fields are left untouched. */
#define vec_deinit(v)\
dyad_free((v)->data)
/* Drops all elements by resetting the length; storage is kept. */
#define vec_clear(v)\
((v)->length = 0)
/* Appends `val`, growing the storage first if required. `v` is evaluated
 * several times, so it must be free of side effects. */
#define vec_push(v, val)\
( vec_expand(vec_unpack(v)),\
(v)->data[(v)->length++] = (val) )
/* Removes `count` elements starting at `start`: calls the vec_splice()
 * function to move the tail down, then reduces the length by `count`. */
#define vec_splice(v, start, count)\
( vec_splice(vec_unpack(v), start, count),\
(v)->length -= (count) )
/*===========================================================================*/
/* SelectSet */
/*===========================================================================*/
/* A wrapper around the three fd_sets used for select(). The fd_sets' allocated
* memory is automatically expanded to accommodate fds as they are added.
*
* On Windows fd_sets are implemented as arrays; the FD_xxx macros are not used
* by the wrapper and instead the fd_set struct is manipulated directly. The
* wrapper should perform better than the normal FD_xxx macros, given that we
* don't bother with the linear search which FD_SET would perform to check for
* duplicates.
*
* On non-Windows platforms the sets are assumed to be bit arrays. The FD_xxx
* macros are not used in case their implementation attempts to do bounds
* checking; instead we manipulate the fd_sets' bits directly.
*/
/* Indices into SelectSet.fds, one per fd_set handed to select().
 * SELECT_MAX is the number of sets and is used as the array size. */
enum {
SELECT_READ,
SELECT_WRITE,
SELECT_EXCEPT,
SELECT_MAX
};
/* The three dynamically sized fd_sets passed to select().
 *   capacity - allocation size of each set, counted in units of
 *              sizeof(fd_set); 0 means nothing has been allocated yet
 *   maxfd    - highest fd added since the last select_zero() (only updated
 *              on non-Windows builds)
 *   fds      - the read/write/except sets, indexed by the SELECT_xxx enum */
typedef struct {
int capacity;
dyad_Socket maxfd;
fd_set *fds[SELECT_MAX];
} SelectSet;
/* Number of bits in one `unsigned`, the word size used when the fd_sets are
 * manipulated as raw bit arrays. */
#define DYAD_UNSIGNED_BIT (sizeof(unsigned) * CHAR_BIT)
/* Frees all three fd_sets and returns the SelectSet to its unallocated
 * state (NULL sets, zero capacity). */
static void select_deinit(SelectSet *s) {
  int set = 0;
  while (set < SELECT_MAX) {
    dyad_free(s->fds[set]);
    s->fds[set] = NULL;
    set++;
  }
  s->capacity = 0;
}
/* Doubles the capacity of every fd_set (or allocates a single fd_set's worth
 * when empty) and zero-fills the newly added portion of each set. */
static void select_grow(SelectSet *s) {
  const int prev = s->capacity;
  const int next = prev ? prev << 1 : 1;
  int set;
  s->capacity = next;
  for (set = 0; set < SELECT_MAX; set++) {
    fd_set *grown = dyad_realloc(s->fds[set], next * sizeof(fd_set));
    /* Only the freshly added tail needs clearing */
    memset(grown + prev, 0, (next - prev) * sizeof(fd_set));
    s->fds[set] = grown;
  }
}
/* Empties all three sets ready for a new round of select_add() calls.
 * Nothing is done (maxfd included) while the set is still unallocated. */
static void select_zero(SelectSet *s) {
  int set;
  if (!s->capacity) {
    return;
  }
  s->maxfd = 0;
  for (set = 0; set < SELECT_MAX; set++) {
#if _WIN32
    /* Array-based fd_set: resetting the count is enough */
    s->fds[set]->fd_count = 0;
#else
    /* Bit-array fd_set: clear every allocated bit */
    memset(s->fds[set], 0, s->capacity * sizeof(fd_set));
#endif
  }
}
/* Adds `fd` to the set selected by `set` (one of the SELECT_xxx indices),
 * growing the underlying storage as needed.
 *
 * Windows: the fd is appended to the fd_array; no duplicate check is made.
 * Elsewhere: the fd's bit is set in the bit array and maxfd is raised if
 * required. Negative fds are ignored as they cannot be represented. */
static void select_add(SelectSet *s, int set, dyad_Socket fd) {
#ifdef _WIN32
  fd_set *f;
  if (s->capacity == 0) select_grow(s);
  while ((unsigned) (s->capacity * FD_SETSIZE) < s->fds[set]->fd_count + 1) {
    select_grow(s);
  }
  f = s->fds[set];
  f->fd_array[f->fd_count++] = fd;
#else
  unsigned *p;
  if (fd < 0) {
    return;
  }
  /* The sets hold `capacity * FD_SETSIZE` bits, i.e. valid fds are
   * 0 .. capacity * FD_SETSIZE - 1, so we must also grow when `fd` is equal
   * to the current bit count (this covers fd 0 on an unallocated set too) */
  while (s->capacity * FD_SETSIZE <= fd) {
    select_grow(s);
  }
  p = (unsigned*) s->fds[set];
  /* Unsigned constant: shifting a signed 1 into the top bit is undefined */
  p[fd / DYAD_UNSIGNED_BIT] |= 1u << (fd % DYAD_UNSIGNED_BIT);
  if (fd > s->maxfd) s->maxfd = fd;
#endif
}
/* Returns 1 if `fd` is present in the set selected by `set` (one of the
 * SELECT_xxx indices), otherwise 0.
 *
 * Windows: linear search of the fd_array.
 * Elsewhere: tests the fd's bit; an unallocated set, a negative fd or an fd
 * above maxfd is reported as absent without touching the bit array. */
static int select_has(SelectSet *s, int set, dyad_Socket fd) {
#ifdef _WIN32
  unsigned i;
  fd_set *f;
  if (s->capacity == 0) return 0;
  f = s->fds[set];
  for (i = 0; i < f->fd_count; i++) {
    if (f->fd_array[i] == fd) {
      return 1;
    }
  }
  return 0;
#else
  unsigned *p;
  if (s->capacity == 0 || fd < 0 || s->maxfd < fd) return 0;
  p = (unsigned*) s->fds[set];
  /* Unsigned constant: shifting a signed 1 into the top bit is undefined */
  return (p[fd / DYAD_UNSIGNED_BIT] & (1u << (fd % DYAD_UNSIGNED_BIT))) != 0;
#endif
}
/*===========================================================================*/
/* Core */
/*===========================================================================*/
/* One registered event handler: `callback` is invoked for events whose type
 * equals `event`, with `udata` copied into the event before the call. */
typedef struct {
int event;
dyad_Callback callback;
void *udata;
} Listener;
/* A single stream (socket plus its buffers and listeners).
 *   state, flags      - DYAD_STATE_xxx value and DYAD_FLAG_xxx bitmask
 *   sockfd            - underlying socket, INVALID_SOCKET when none
 *   address, port     - textual address (heap allocated, may be NULL) and
 *                       port filled in by stream_initAddress()
 *   bytesSent/Received- running byte counters
 *   lastActivity      - dyad_getTime() timestamp of the last send/receive
 *   timeout           - inactivity limit in seconds; 0 disables the check
 *   listeners         - registered event handlers
 *   lineBuffer        - received bytes not yet emitted as LINE events
 *   writeBuffer       - bytes queued for sending
 *   next              - next stream in the global dyad_streams list */
struct dyad_Stream {
int state, flags;
dyad_Socket sockfd;
char *address;
int port;
int bytesSent, bytesReceived;
double lastActivity, timeout;
Vec(Listener) listeners;
Vec(char) lineBuffer;
Vec(char) writeBuffer;
dyad_Stream *next;
};
/* Set once the write buffer has drained and the READY event was emitted */
#define DYAD_FLAG_READY (1 << 0)
/* Set when data was queued by dyad_write()/dyad_vwritef(); cleared on flush */
#define DYAD_FLAG_WRITTEN (1 << 1)
/* Head of the singly linked list of all streams; new streams are prepended */
static dyad_Stream *dyad_streams;
/* Number of streams currently in the list */
static int dyad_streamCount;
/* Holds the formatted message passed to the panic callback / printed */
static char dyad_panicMsgBuffer[128];
/* Optional user handler invoked by panic() before the process exits */
static dyad_PanicCallback panicCallback;
/* fd_sets reused by every dyad_update() call */
static SelectSet dyad_selectSet;
/* Timeout, in seconds, handed to select() by dyad_update() */
static double dyad_updateTimeout = 1;
/* Seconds between TICK events */
static double dyad_tickInterval = 1;
/* Time the next tick is measured against; 0 until the first update */
static double dyad_lastTick = 0;
/* Reports an unrecoverable error and terminates the process.
 *
 * The printf-style message is formatted into dyad_panicMsgBuffer (truncated
 * if it does not fit), then passed to the user's panic callback if one is
 * set, or printed to stdout otherwise. Finally exit(EXIT_FAILURE) is called,
 * so this function never returns. */
static void panic(const char *fmt, ...) {
  va_list args;
  va_start(args, fmt);
  /* Bounded formatting: the message buffer has a fixed size and the
   * formatted text may be arbitrarily long */
  vsnprintf(dyad_panicMsgBuffer, sizeof(dyad_panicMsgBuffer), fmt, args);
  va_end(args);
  if (panicCallback) {
    panicCallback(dyad_panicMsgBuffer);
  } else {
    printf("dyad panic: %s\n", dyad_panicMsgBuffer);
  }
  exit(EXIT_FAILURE);
}
/* Returns a zero-filled event whose `type` field is set to `type`. */
static dyad_Event createEvent(int type) {
  dyad_Event ev;
  memset(&ev, 0, sizeof(ev));
  ev.type = type;
  return ev;
}
static void stream_destroy(dyad_Stream *stream);
/* Walks the stream list and destroys every stream in the CLOSED state. */
static void destroyClosedStreams(void) {
  dyad_Stream *cur = dyad_streams;
  while (cur != NULL) {
    /* Grab the successor first: `cur` may be freed below */
    dyad_Stream *following = cur->next;
    if (cur->state == DYAD_STATE_CLOSED) {
      stream_destroy(cur);
    }
    cur = following;
  }
}
static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e);
/* Emits a TICK event on every stream for each tick interval that has elapsed
 * since the last recorded tick. The tick clock starts at the first call. */
static void updateTickTimer(void) {
  if (dyad_lastTick == 0) {
    dyad_lastTick = dyad_getTime();
  }
  /* One iteration per elapsed interval; the clock is re-read each time */
  for (; dyad_lastTick < dyad_getTime(); dyad_lastTick += dyad_tickInterval) {
    dyad_Stream *s;
    dyad_Event e = createEvent(DYAD_EVENT_TICK);
    e.msg = "a tick has occured";
    for (s = dyad_streams; s; s = s->next) {
      stream_emitEvent(s, &e);
    }
  }
}
/* Emits a TIMEOUT event on, and then closes, every stream that has a
 * non-zero timeout and has been inactive for longer than that timeout. */
static void updateStreamTimeouts(void) {
  const double now = dyad_getTime();
  dyad_Event e = createEvent(DYAD_EVENT_TIMEOUT);
  dyad_Stream *s;
  e.msg = "stream timed out";
  for (s = dyad_streams; s != NULL; s = s->next) {
    if (!s->timeout) {
      continue; /* Timeout disabled for this stream */
    }
    if (now - s->lastActivity > s->timeout) {
      stream_emitEvent(s, &e);
      dyad_close(s);
    }
  }
}
/*===========================================================================*/
/* Stream */
/*===========================================================================*/
/* Closes the stream's socket (if any), emits the DESTROY event, unlinks the
 * stream from the global list and frees everything it owns, including the
 * stream itself. The stream must be in the list. */
static void stream_destroy(dyad_Stream *stream) {
  dyad_Event e;
  dyad_Stream **link;

  if (stream->sockfd != INVALID_SOCKET) {
    close(stream->sockfd);
  }

  /* Let listeners know before anything is torn down */
  e = createEvent(DYAD_EVENT_DESTROY);
  e.msg = "the stream has been destroyed";
  stream_emitEvent(stream, &e);

  /* Find the pointer that refers to this stream and bypass it */
  for (link = &dyad_streams; *link != stream; link = &(*link)->next) {
  }
  *link = stream->next;
  dyad_streamCount--;

  /* Release owned memory */
  vec_deinit(&stream->listeners);
  vec_deinit(&stream->lineBuffer);
  vec_deinit(&stream->writeBuffer);
  dyad_free(stream->address);
  dyad_free(stream);
}
/* Delivers event `e` to every listener of `stream` registered for e->type.
 *
 * e->stream is set to `stream` and e->udata to each listener's udata before
 * its callback runs. Callbacks may add or remove listeners on the stream
 * while the event is being emitted. */
static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e) {
  int i;
  e->stream = stream;
  for (i = 0; i < stream->listeners.length; i++) {
    /* Work on a copy: the callback may shift or reallocate the listener
     * array, which would invalidate a pointer into it */
    Listener current = stream->listeners.data[i];
    if (current.event != e->type) {
      continue;
    }
    e->udata = current.udata;
    current.callback(e);
    /* If slot `i` no longer holds the listener we just called then it (or
     * one before it) was removed and a different listener has moved into
     * this slot; step back so that listener is not skipped */
    if (i >= stream->listeners.length ||
        stream->listeners.data[i].event != current.event ||
        stream->listeners.data[i].callback != current.callback ||
        stream->listeners.data[i].udata != current.udata
    ) {
      i--;
    }
  }
}
/* Emits an ERROR event carrying `msg` and then closes the stream. When `err`
 * is non-zero the message is extended with strerror(err) in parentheses. */
static void stream_error(dyad_Stream *stream, const char *msg, int err) {
  char buf[256];
  dyad_Event e = createEvent(DYAD_EVENT_ERROR);
  e.msg = msg;
  if (err) {
    /* Field widths keep the combined text within `buf` */
    sprintf(buf, "%.160s (%.80s)", msg, strerror(err));
    e.msg = buf;
  }
  stream_emitEvent(stream, &e);
  dyad_close(stream);
}
/* Refreshes stream->address and stream->port from the socket. The peer
 * address is used when available, otherwise the local address; if neither
 * can be obtained the address is left NULL and the port is not modified. */
static void stream_initAddress(dyad_Stream *stream) {
  union {
    struct sockaddr sa;
    struct sockaddr_storage sas;
    struct sockaddr_in sai;
    struct sockaddr_in6 sai6;
  } addr;
  socklen_t size;

  memset(&addr, 0, sizeof(addr));
  size = sizeof(addr);

  /* Discard any previous address string */
  dyad_free(stream->address);
  stream->address = NULL;

  if (getpeername(stream->sockfd, &addr.sa, &size) == -1 &&
      getsockname(stream->sockfd, &addr.sa, &size) == -1
  ) {
    return;
  }

  if (addr.sas.ss_family != AF_INET6) {
    /* Everything that is not IPv6 is formatted as IPv4 */
    stream->address = dyad_realloc(NULL, INET_ADDRSTRLEN);
    inet_ntop(AF_INET, &addr.sai.sin_addr, stream->address, INET_ADDRSTRLEN);
    stream->port = ntohs(addr.sai.sin_port);
  } else {
    stream->address = dyad_realloc(NULL, INET6_ADDRSTRLEN);
    inet_ntop(AF_INET6, &addr.sai6.sin6_addr, stream->address,
              INET6_ADDRSTRLEN);
    stream->port = ntohs(addr.sai6.sin6_port);
  }
}
/* Switches the stream's socket to non-blocking mode when `opt` is non-zero,
 * or back to blocking mode when it is zero. Failures are not reported. */
static void stream_setSocketNonBlocking(dyad_Stream *stream, int opt) {
#ifdef _WIN32
  u_long mode = opt;
  ioctlsocket(stream->sockfd, FIONBIO, &mode);
#else
  int flags = fcntl(stream->sockfd, F_GETFL);
  if (opt) {
    flags |= O_NONBLOCK;
  } else {
    flags &= ~O_NONBLOCK;
  }
  fcntl(stream->sockfd, F_SETFL, flags);
#endif
}
/* Attaches `sockfd` to the stream, makes it non-blocking and refreshes the
 * stream's address and port from it. */
static void stream_setSocket(dyad_Stream *stream, dyad_Socket sockfd) {
  stream->sockfd = sockfd;
  stream_setSocketNonBlocking(stream, 1);
  stream_initAddress(stream);
}
/* Creates a new socket for the stream with the given domain/type/protocol
 * and attaches it. Returns 0 on success; on failure an ERROR event is
 * emitted (which also closes the stream) and -1 is returned. */
static int stream_initSocket(
  dyad_Stream *stream, int domain, int type, int protocol
) {
  dyad_Socket fd = socket(domain, type, protocol);
  stream->sockfd = fd;
  if (fd == INVALID_SOCKET) {
    stream_error(stream, "could not create socket", errno);
    return -1;
  }
  stream_setSocket(stream, fd);
  return 0;
}
/* Returns 1 if at least one listener is registered on the stream for
 * `event`, otherwise 0. */
static int stream_hasListenerForEvent(dyad_Stream *stream, int event) {
  int remaining = stream->listeners.length;
  while (remaining-- > 0) {
    if (stream->listeners.data[remaining].event == event) {
      return 1;
    }
  }
  return 0;
}
/* Drains the stream's socket: repeatedly receives a chunk, emits a DATA
 * event for it and, if any LINE listener exists, splits the accumulated
 * bytes on '\n' and emits a LINE event per complete line (a trailing '\r'
 * is stripped). Returns when no more data is available, when the peer
 * disconnects or an error occurs (the stream is closed), or as soon as a
 * handler leaves the stream in a state other than CONNECTED. */
static void stream_handleReceivedData(dyad_Stream *stream) {
  for (;;) {
    dyad_Event dataEvent;
    char chunk[8192];
    /* One byte is reserved for the terminator appended below */
    int received = recv(stream->sockfd, chunk, sizeof(chunk) - 1, 0);

    if (received <= 0) {
      /* Zero means the peer disconnected; a negative result is only benign
       * when the socket simply has nothing more to give us right now */
      if (received == 0 || errno != EWOULDBLOCK) {
        dyad_close(stream);
      }
      return;
    }
    chunk[received] = 0;

    /* Book-keeping */
    stream->bytesReceived += received;
    stream->lastActivity = dyad_getTime();

    /* Hand the raw chunk to DATA listeners */
    dataEvent = createEvent(DYAD_EVENT_DATA);
    dataEvent.msg = "received data";
    dataEvent.data = chunk;
    dataEvent.size = received;
    stream_emitEvent(stream, &dataEvent);

    /* A DATA handler may have closed or ended the stream */
    if (stream->state != DYAD_STATE_CONNECTED) {
      return;
    }

    /* Line splitting is only done when somebody is listening for it */
    if (!stream_hasListenerForEvent(stream, DYAD_EVENT_LINE)) {
      continue;
    }

    {
      int i;
      int lineStart = 0;
      char *buf;

      /* Append the chunk to whatever partial line is already buffered */
      for (i = 0; i < received; i++) {
        vec_push(&stream->lineBuffer, chunk[i]);
      }

      buf = stream->lineBuffer.data;
      for (i = 0; i < stream->lineBuffer.length; i++) {
        dyad_Event lineEvent;
        if (buf[i] != '\n') {
          continue;
        }
        /* Terminate the line in place and describe it in the event */
        buf[i] = '\0';
        lineEvent = createEvent(DYAD_EVENT_LINE);
        lineEvent.msg = "received line";
        lineEvent.data = &buf[lineStart];
        lineEvent.size = i - lineStart;
        /* Drop the '\r' of a "\r\n" line ending */
        if (lineEvent.size > 0 && lineEvent.data[lineEvent.size - 1] == '\r') {
          lineEvent.data[--lineEvent.size] = '\0';
        }
        stream_emitEvent(stream, &lineEvent);
        lineStart = i + 1;
        /* A LINE handler may have closed or ended the stream */
        if (stream->state != DYAD_STATE_CONNECTED) {
          return;
        }
      }

      /* Discard the consumed lines, keeping any unfinished remainder */
      if (lineStart == stream->lineBuffer.length) {
        vec_clear(&stream->lineBuffer);
      } else {
        vec_splice(&stream->lineBuffer, 0, lineStart);
      }
    }
  }
}
/* Accepts every connection waiting on a listening stream. For each one a new
 * CONNECTED stream is created and announced through an ACCEPT event on the
 * listening stream. Returns when accept() reports that nothing is pending,
 * or after an accept failure has been reported on the new stream. */
static void stream_acceptPendingConnections(dyad_Stream *stream) {
  for (;;) {
    dyad_Stream *client;
    dyad_Event acceptEvent;
    int acceptErr = 0;
    dyad_Socket clientFd = accept(stream->sockfd, NULL, NULL);

    if (clientFd == INVALID_SOCKET) {
      acceptErr = errno;
      if (acceptErr == EWOULDBLOCK) {
        return; /* Queue is empty */
      }
    }

    /* Wrap the accepted socket in a new stream */
    client = dyad_newStream();
    client->state = DYAD_STATE_CONNECTED;
    stream_setSocket(client, clientFd);

    /* Tell the listening stream's handlers about it */
    acceptEvent = createEvent(DYAD_EVENT_ACCEPT);
    acceptEvent.msg = "accepted connection";
    acceptEvent.remote = client;
    stream_emitEvent(stream, &acceptEvent);

    /* When accept() failed the stream is still created and announced, but
     * it is shut straight away with an error */
    if (client->sockfd == INVALID_SOCKET) {
      stream_error(client, "failed to create socket on accept", acceptErr);
      return;
    }
  }
}
/* Tries to send the stream's queued data with a single send() call.
 *
 * Returns 0 when nothing more can be done for now (the socket would block,
 * the stream was closed because of an error, or a CLOSING stream finished
 * draining and was closed). Returns 1 otherwise; when the buffer is empty at
 * that point the READY flag is set and a READY event has been emitted. */
static int stream_flushWriteBuffer(dyad_Stream *stream) {
  dyad_Event e;

  stream->flags &= ~DYAD_FLAG_WRITTEN;

  if (stream->writeBuffer.length > 0) {
    int sent = send(stream->sockfd, stream->writeBuffer.data,
                    stream->writeBuffer.length, 0);
    if (sent <= 0) {
      /* Anything other than "would block" is treated as a disconnect */
      if (errno != EWOULDBLOCK) {
        dyad_close(stream);
      }
      return 0;
    }
    /* Drop the bytes that went out */
    if (sent == stream->writeBuffer.length) {
      vec_clear(&stream->writeBuffer);
    } else {
      vec_splice(&stream->writeBuffer, 0, sent);
    }
    stream->bytesSent += sent;
    stream->lastActivity = dyad_getTime();
  }

  if (stream->writeBuffer.length != 0) {
    /* Data remains queued; the caller may try again immediately */
    return 1;
  }

  /* Buffer drained: a stream that was only waiting for this can now close */
  if (stream->state == DYAD_STATE_CLOSING) {
    dyad_close(stream);
    return 0;
  }

  /* Otherwise announce that the stream can take more data */
  stream->flags |= DYAD_FLAG_READY;
  e = createEvent(DYAD_EVENT_READY);
  e.msg = "stream is ready for more data";
  stream_emitEvent(stream, &e);
  return 1;
}
/*===========================================================================*/
/* API */
/*===========================================================================*/
/*---------------------------------------------------------------------------*/
/* Core */
/*---------------------------------------------------------------------------*/
/* Runs one iteration of the event loop.
 *
 * In order: destroys streams closed since the last call, emits due TICK
 * events, closes timed-out streams, waits in select() for up to the
 * configured update timeout, then services every stream according to its
 * state (receive, flush, complete/fail a connect, accept connections).
 * Data queued by handlers during this call is flushed before returning. */
void dyad_update(void) {
  dyad_Stream *stream;
  struct timeval tv;
  int ready;

  destroyClosedStreams();
  updateTickTimer();
  updateStreamTimeouts();

  /* Create fd sets for select() */
  select_zero(&dyad_selectSet);
  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {
      case DYAD_STATE_CONNECTED:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        /* Only wait for writability when there is something to write or the
         * READY event has not been emitted yet */
        if (!(stream->flags & DYAD_FLAG_READY) ||
            stream->writeBuffer.length != 0
        ) {
          select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        }
        break;
      case DYAD_STATE_CLOSING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        break;
      case DYAD_STATE_CONNECTING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        select_add(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd);
        break;
      case DYAD_STATE_LISTENING:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        break;
    }
    stream = stream->next;
  }

  /* Init timeout value and do select */
#ifdef _MSC_VER
#pragma warning(push)
  /* Disable double to long implicit conversion warning,
   * because the type of timeval's fields don't agree across platforms */
#pragma warning(disable: 4244)
#endif
  tv.tv_sec = dyad_updateTimeout;
  tv.tv_usec = (dyad_updateTimeout - tv.tv_sec) * 1e6;
#ifdef _MSC_VER
#pragma warning(pop)
#endif
  ready = select(dyad_selectSet.maxfd + 1,
                 dyad_selectSet.fds[SELECT_READ],
                 dyad_selectSet.fds[SELECT_WRITE],
                 dyad_selectSet.fds[SELECT_EXCEPT],
                 &tv);
  if (ready < 0) {
    /* select() failed (e.g. it was interrupted by a signal): the sets still
     * hold everything we asked about rather than what is ready, so clear
     * them to stop every stream from being treated as ready -- in
     * particular a CONNECTING stream would wrongly be marked connected */
    select_zero(&dyad_selectSet);
  }

  /* Handle streams */
  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {
      case DYAD_STATE_CONNECTED:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_handleReceivedData(stream);
          if (stream->state == DYAD_STATE_CLOSED) {
            break;
          }
        }
        /* Fall through */
      case DYAD_STATE_CLOSING:
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          stream_flushWriteBuffer(stream);
        }
        break;
      case DYAD_STATE_CONNECTING:
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          /* Check socket for error */
          int optval = 0;
          socklen_t optlen = sizeof(optval);
          dyad_Event e;
          getsockopt(stream->sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
          if (optval != 0) goto connectFailed;
          /* Handle successful connection */
          stream->state = DYAD_STATE_CONNECTED;
          stream->lastActivity = dyad_getTime();
          stream_initAddress(stream);
          /* Emit connect event */
          e = createEvent(DYAD_EVENT_CONNECT);
          e.msg = "connected to server";
          stream_emitEvent(stream, &e);
        } else if (
          select_has(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd)
        ) {
          /* Handle failed connection */
connectFailed:
          stream_error(stream, "could not connect to server", 0);
        }
        break;
      case DYAD_STATE_LISTENING:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_acceptPendingConnections(stream);
        }
        break;
    }
    /* If data was just now written to the stream we should immediately try
     * to send it */
    if (
      stream->flags & DYAD_FLAG_WRITTEN &&
      stream->state != DYAD_STATE_CLOSED
    ) {
      stream_flushWriteBuffer(stream);
    }
    stream = stream->next;
  }
}
/* One-time library set-up. On Windows this starts Winsock 2.2 and panics if
 * that fails; elsewhere it makes the process ignore SIGPIPE so that writing
 * to a closed socket does not raise the signal. */
void dyad_init(void) {
#ifdef _WIN32
  WSADATA dat;
  int err = WSAStartup(MAKEWORD(2, 2), &dat);
  if (err != 0) {
    panic("WSAStartup failed (%d)", err);
  }
#else
  signal(SIGPIPE, SIG_IGN);
#endif
}
/* Closes and destroys every remaining stream, frees the select sets and, on
 * Windows, shuts Winsock down. */
void dyad_shutdown(void) {
  /* The list head is re-read for each call: both calls emit events and the
   * list shrinks as streams are destroyed */
  for (; dyad_streams != NULL; ) {
    dyad_close(dyad_streams);
    stream_destroy(dyad_streams);
  }
  select_deinit(&dyad_selectSet);
#ifdef _WIN32
  WSACleanup();
#endif
}
/* Returns the library version as a static string. */
const char *dyad_getVersion(void) {
  const char *version = DYAD_VERSION;
  return version;
}
/* Returns the current system time in seconds as a double, taken from
 * GetSystemTimeAsFileTime() on Windows and gettimeofday() elsewhere. */
double dyad_getTime(void) {
#ifdef _WIN32
  FILETIME ft;
  GetSystemTimeAsFileTime(&ft);
  /* FILETIME counts 100ns units split over two 32-bit halves */
  return (ft.dwHighDateTime * 4294967296.0 / 1e7) + ft.dwLowDateTime / 1e7;
#else
  struct timeval now;
  gettimeofday(&now, NULL);
  return now.tv_sec + now.tv_usec / 1e6;
#endif
}
/* Returns the number of streams that currently exist (created and not yet
 * destroyed). */
int dyad_getStreamCount(void) {
  int count = dyad_streamCount;
  return count;
}
/* Sets the time, in seconds, between TICK events. */
void dyad_setTickInterval(double seconds) {
  dyad_tickInterval = seconds;
}
/* Sets the timeout, in seconds, that dyad_update() passes to select(). */
void dyad_setUpdateTimeout(double seconds) {
  dyad_updateTimeout = seconds;
}
/* Installs `func` as the handler called with the message when the library
 * panics. Returns the handler that was installed before (may be NULL). */
dyad_PanicCallback dyad_atPanic(dyad_PanicCallback func) {
  dyad_PanicCallback previous = panicCallback;
  panicCallback = func;
  return previous;
}
/*---------------------------------------------------------------------------*/
/* Stream */
/*---------------------------------------------------------------------------*/
/* Allocates a new stream in the CLOSED state with no socket, records the
 * current time as its last activity and puts it at the front of the global
 * stream list. Allocation failure is fatal. */
dyad_Stream *dyad_newStream(void) {
  dyad_Stream *s = dyad_realloc(NULL, sizeof(*s));

  memset(s, 0, sizeof(*s));
  s->state = DYAD_STATE_CLOSED;
  s->sockfd = INVALID_SOCKET;
  s->lastActivity = dyad_getTime();

  /* Prepend to the list of live streams */
  s->next = dyad_streams;
  dyad_streams = s;
  dyad_streamCount++;

  return s;
}
/* Registers `callback` to be called for `event` on the stream; `udata` is
 * handed back to the callback through the event. */
void dyad_addListener(
  dyad_Stream *stream, int event, dyad_Callback callback, void *udata
) {
  Listener entry;
  entry.event = event;
  entry.callback = callback;
  entry.udata = udata;
  vec_push(&stream->listeners, entry);
}
/* Removes every listener on the stream that matches all of `event`,
 * `callback` and `udata`. */
void dyad_removeListener(
  dyad_Stream *stream, int event, dyad_Callback callback, void *udata
) {
  int i;
  /* Back to front, so removals do not disturb the entries still to visit */
  for (i = stream->listeners.length - 1; i >= 0; i--) {
    Listener *x = &stream->listeners.data[i];
    if (x->event != event || x->callback != callback || x->udata != udata) {
      continue;
    }
    vec_splice(&stream->listeners, i, 1);
  }
}
/* Removes all listeners registered for `event` on the stream; passing
 * DYAD_EVENT_NULL removes the listeners for every event. */
void dyad_removeAllListeners(dyad_Stream *stream, int event) {
  int i;
  if (event == DYAD_EVENT_NULL) {
    vec_clear(&stream->listeners);
    return;
  }
  /* Back to front, so removals do not disturb the entries still to visit */
  for (i = stream->listeners.length - 1; i >= 0; i--) {
    if (stream->listeners.data[i].event == event) {
      vec_splice(&stream->listeners, i, 1);
    }
  }
}
/* Closes the stream immediately: marks it CLOSED, closes its socket, emits
 * the CLOSE event and discards any buffered data. Does nothing if the stream
 * is already closed. The stream object itself is not freed here. */
void dyad_close(dyad_Stream *stream) {
  dyad_Event e;

  if (stream->state == DYAD_STATE_CLOSED) {
    return;
  }
  /* Mark first so that re-entrant calls from handlers are no-ops */
  stream->state = DYAD_STATE_CLOSED;

  if (stream->sockfd != INVALID_SOCKET) {
    close(stream->sockfd);
    stream->sockfd = INVALID_SOCKET;
  }

  e = createEvent(DYAD_EVENT_CLOSE);
  e.msg = "stream closed";
  stream_emitEvent(stream, &e);

  /* Buffered input and unsent output are dropped */
  vec_clear(&stream->lineBuffer);
  vec_clear(&stream->writeBuffer);
}
/* Closes the stream gracefully: if data is still queued for sending the
 * stream moves to the CLOSING state, otherwise it is closed at once. Does
 * nothing if the stream is already closed. */
void dyad_end(dyad_Stream *stream) {
  if (stream->state == DYAD_STATE_CLOSED) {
    return;
  }
  if (stream->writeBuffer.length <= 0) {
    dyad_close(stream);
    return;
  }
  stream->state = DYAD_STATE_CLOSING;
}
/* Starts listening for TCP connections on `host`:`port` with the given
 * accept backlog. `host` is passed to getaddrinfo() with AI_PASSIVE and may
 * be NULL. Returns 0 on success after emitting a LISTEN event; on failure an
 * ERROR event is emitted (closing the stream) and -1 is returned. */
int dyad_listenEx(
  dyad_Stream *stream, const char *host, int port, int backlog
) {
  struct addrinfo hints, *ai = NULL;
  int err, optval;
  int result = -1;
  char service[64];
  dyad_Event e;

  /* Resolve the address to bind to */
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  sprintf(service, "%d", port);
  err = getaddrinfo(host, service, &hints, &ai);
  if (err) {
    stream_error(stream, "could not get addrinfo", errno);
    goto done;
  }

  /* Create the socket; the helper reports its own errors */
  err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
                          ai->ai_protocol);
  if (err) {
    goto done;
  }

  /* SO_REUSEADDR lets the socket be bound straight away instead of waiting
   * for a previously closed socket on the same port to time out */
  optval = 1;
  setsockopt(stream->sockfd, SOL_SOCKET, SO_REUSEADDR,
             &optval, sizeof(optval));

  err = bind(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
  if (err) {
    stream_error(stream, "could not bind socket", errno);
    goto done;
  }
  err = listen(stream->sockfd, backlog);
  if (err) {
    stream_error(stream, "socket failed on listen", errno);
    goto done;
  }

  stream->state = DYAD_STATE_LISTENING;
  stream->port = port;
  stream_initAddress(stream);

  e = createEvent(DYAD_EVENT_LISTEN);
  e.msg = "socket is listening";
  stream_emitEvent(stream, &e);
  result = 0;

done:
  if (ai) freeaddrinfo(ai);
  return result;
}
/* Listens on `port` on the default (NULL) host with a backlog of 511.
 * Returns whatever dyad_listenEx() returns: 0 on success, -1 on failure. */
int dyad_listen(dyad_Stream *stream, int port) {
  const int backlog = 511;
  return dyad_listenEx(stream, NULL, port, backlog);
}
/* Begins a non-blocking TCP connection to `host`:`port` and puts the stream
 * in the CONNECTING state; completion is detected later by dyad_update().
 * Returns 0 once the attempt has been started, or -1 if the host could not
 * be resolved or the socket could not be created (an ERROR event is emitted
 * in both cases). */
int dyad_connect(dyad_Stream *stream, const char *host, int port) {
  struct addrinfo hints, *ai = NULL;
  int err;
  int result = -1;
  char service[64];

  /* Resolve host */
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  sprintf(service, "%d", port);
  err = getaddrinfo(host, service, &hints, &ai);
  if (err) {
    stream_error(stream, "could not resolve host", 0);
    goto done;
  }

  /* Create the socket; the helper reports its own errors */
  err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
                          ai->ai_protocol);
  if (err) {
    goto done;
  }

  /* The socket is non-blocking, so the result of connect() is not examined
   * here; the outcome is picked up through select() */
  connect(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
  stream->state = DYAD_STATE_CONNECTING;
  result = 0;

done:
  if (ai) freeaddrinfo(ai);
  return result;
}
/* Queues `size` bytes from `data` on the stream's write buffer and marks the
 * stream as written so the data is flushed by dyad_update(). A zero or
 * negative `size` queues nothing (the written flag is still set). */
void dyad_write(dyad_Stream *stream, const void *data, int size) {
  const char *p = data;
  int i;
  /* Counting up to `size` means a negative size is a no-op rather than a
   * runaway read past the end of `data` */
  for (i = 0; i < size; i++) {
    vec_push(&stream->writeBuffer, p[i]);
  }
  stream->flags |= DYAD_FLAG_WRITTEN;
}
/* Formats `fmt` with `args` and queues the result on the stream's write
 * buffer. Supported specifiers (no flags, widths or precisions):
 *   %r  FILE*        the rest of the file is copied ("(null)" if NULL)
 *   %c  int          a single character
 *   %s  char*        a string ("(null)" if NULL)
 *   %b  char*, int   a binary buffer and its length in bytes
 *   %f %g            double
 *   %d %i            int
 *   %x %X            unsigned
 *   %p               pointer
 * Any other character following '%' is written as-is (so "%%" yields '%').
 * A lone '%' at the very end of the format is ignored. */
void dyad_vwritef(dyad_Stream *stream, const char *fmt, va_list args) {
  char buf[512];
  char *str;
  char f[] = "%_"; /* Scratch format: the '_' is replaced by the specifier */
  FILE *fp;
  int c;
  while (*fmt) {
    if (*fmt == '%') {
      fmt++;
      if (*fmt == '\0') {
        /* Format ends with a bare '%': stop here, otherwise the increment
         * at the bottom of the loop would step past the terminator */
        break;
      }
      switch (*fmt) {
        case 'r':
          fp = va_arg(args, FILE*);
          if (fp == NULL) {
            str = "(null)";
            goto writeStr;
          }
          while ((c = fgetc(fp)) != EOF) {
            vec_push(&stream->writeBuffer, c);
          }
          break;
        case 'c':
          vec_push(&stream->writeBuffer, va_arg(args, int));
          break;
        case 's':
          str = va_arg(args, char*);
          if (str == NULL) str = "(null)";
        writeStr:
          while (*str) {
            vec_push(&stream->writeBuffer, *str++);
          }
          break;
        case 'b':
          str = va_arg(args, char*);
          c = va_arg(args, int);
          /* `> 0` so that a negative length writes nothing */
          while (c-- > 0) {
            vec_push(&stream->writeBuffer, *str++);
          }
          break;
        default:
          /* Numeric specifiers are delegated to snprintf() one at a time */
          f[1] = *fmt;
          switch (*fmt) {
            case 'f':
            case 'g':
              snprintf(buf, sizeof(buf), f, va_arg(args, double));
              break;
            case 'd':
            case 'i':
              snprintf(buf, sizeof(buf), f, va_arg(args, int));
              break;
            case 'x':
            case 'X':
              snprintf(buf, sizeof(buf), f, va_arg(args, unsigned));
              break;
            case 'p':
              snprintf(buf, sizeof(buf), f, va_arg(args, void*));
              break;
            default:
              /* Unknown specifier: emit the character itself */
              buf[0] = *fmt;
              buf[1] = '\0';
          }
          str = buf;
          goto writeStr;
      }
    } else {
      vec_push(&stream->writeBuffer, *fmt);
    }
    fmt++;
  }
  stream->flags |= DYAD_FLAG_WRITTEN;
}
/* Variadic front end for dyad_vwritef(): formats `fmt` with the following
 * arguments and queues the result on the stream. */
void dyad_writef(dyad_Stream *stream, const char *fmt, ...) {
  va_list ap;
  va_start(ap, fmt);
  dyad_vwritef(stream, fmt, ap);
  va_end(ap);
}
/* Sets the stream's inactivity timeout in seconds; 0 disables it. */
void dyad_setTimeout(dyad_Stream *stream, double seconds) {
  stream->timeout = seconds;
}
/* Enables (non-zero `opt`) or disables (zero) TCP_NODELAY on the stream's
 * socket. The result of setsockopt() is not checked. */
void dyad_setNoDelay(dyad_Stream *stream, int opt) {
  opt = (opt != 0); /* Normalise to 0 or 1 */
  setsockopt(stream->sockfd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
}
/* Returns the stream's current DYAD_STATE_xxx value. */
int dyad_getState(dyad_Stream *stream) {
  int state = stream->state;
  return state;
}
/* Returns the stream's address string, or an empty string when no address
 * has been recorded. The string is owned by the stream. */
const char *dyad_getAddress(dyad_Stream *stream) {
  if (stream->address) {
    return stream->address;
  }
  return "";
}
/* Returns the port recorded for the stream. */
int dyad_getPort(dyad_Stream *stream) {
  int port = stream->port;
  return port;
}
/* Returns the total number of bytes sent on the stream so far. */
int dyad_getBytesSent(dyad_Stream *stream) {
  int total = stream->bytesSent;
  return total;
}
/* Returns the total number of bytes received on the stream so far. */
int dyad_getBytesReceived(dyad_Stream *stream) {
  int total = stream->bytesReceived;
  return total;
}
/* Returns the stream's underlying socket handle (INVALID_SOCKET if the
 * stream has none). */
dyad_Socket dyad_getSocket(dyad_Stream *stream) {
  dyad_Socket fd = stream->sockfd;
  return fd;
}