Events Source Message Queue (#129)

* Events Source Message Queue
 - Fixes problem where events sent close together are silently dropped.

* Update AsyncEventSource.cpp

error
This commit is contained in:
sticilface 2017-03-11 08:57:37 +00:00 committed by Me No Dev
parent fce6aad2c3
commit b9641902bf
2 changed files with 119 additions and 14 deletions

View File

@ -104,9 +104,53 @@ static String generateEventMessage(const char *message, const char *event, uint3
return ev;
}
// Message
AsyncEventSourceMessage::AsyncEventSourceMessage(const char * data, size_t len)
: _status(EVS_MSG_ERROR), _data(nullptr), _len(len), _sent(0)
{
_data = (uint8_t*)malloc(_len+1);
if(_data == NULL){
_len = 0;
_status = EVS_MSG_ERROR;
} else {
_status = EVS_MSG_SENDING;
memcpy(_data, data, len);
_data[_len] = 0;
String temp = data;
}
}
AsyncEventSourceMessage::~AsyncEventSourceMessage() {
if(_data != NULL)
free(_data);
}
void AsyncEventSourceMessage::ack(size_t len, uint32_t time) {
if(_len == len && _sent == _len){
_status = EVS_MSG_SENT;
}
}
size_t AsyncEventSourceMessage::send(AsyncClient *client) {
if(!client->canSend()){
return 0;
}
if(client->space() < _len){
return 0;
}
size_t sent = client->write((const char *)_data, _len);
_sent = sent;
return sent;
}
// Client
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server)
: _messageQueue(LinkedList<AsyncEventSourceMessage *>([](AsyncEventSourceMessage *m){ delete m; }))
{
_client = request->client();
_server = server;
_lastId = 0;
@ -115,19 +159,50 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
_client->setRxTimeout(0);
_client->onError(NULL, NULL);
_client->onAck(NULL, NULL);
_client->onPoll(NULL, NULL);
_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
_client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
_client->onData(NULL, NULL);
_client->onTimeout([](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
_client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);
_client->onTimeout([this](void *r, AsyncClient* c __attribute__((unused)), uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
_client->onDisconnect([this](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); delete c; }, this);
_server->_addClient(this);
delete request;
}
AsyncEventSourceClient::~AsyncEventSourceClient(){
_messageQueue.free();
close();
}
void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage){
if(dataMessage == NULL)
return;
if(!connected()){
delete dataMessage;
return;
}
_messageQueue.add(dataMessage);
if(_client->canSend()) { _runQueue(); }
}
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
if(len && !_messageQueue.isEmpty()){
_messageQueue.front()->ack(len, time);
}
_runQueue();
}
void AsyncEventSourceClient::_onPoll(){
if(_client->canSend() && !_messageQueue.isEmpty()){
_runQueue();
}
}
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
_client->close(true);
}
@ -143,18 +218,22 @@ void AsyncEventSourceClient::close(){
}
void AsyncEventSourceClient::write(const char * message, size_t len){
if(!_client->canSend()){
return;
}
if(_client->space() < len){
return;
}
_client->write(message, len);
_queueMessage(new AsyncEventSourceMessage(message, len));
}
void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
write(ev.c_str(), ev.length());
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
}
void AsyncEventSourceClient::_runQueue(){
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
_messageQueue.remove(_messageQueue.front());
}
if(!_messageQueue.isEmpty()){
_messageQueue.front()->send(_client);
}
}
@ -210,8 +289,9 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id,
String ev = generateEventMessage(message, event, id, reconnect);
for(const auto &c: _clients){
if(c->connected())
if(c->connected()) {
c->write(ev.c_str(), ev.length());
}
}
}
@ -257,3 +337,4 @@ size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len
}
return 0;
}

View File

@ -29,11 +29,33 @@ class AsyncEventSourceResponse;
class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
typedef enum { EVS_MSG_SENDING, EVS_MSG_SENT, EVS_MSG_ERROR } EventSourceMessageStatus;
class AsyncEventSourceMessage {
private:
EventSourceMessageStatus _status;
uint8_t * _data;
size_t _len;
size_t _sent;
//size_t _ack;
//size_t _acked;
public:
AsyncEventSourceMessage(const char * data, size_t len);
~AsyncEventSourceMessage();
void ack(size_t len __attribute__((unused)), uint32_t time __attribute__((unused)));
size_t send(AsyncClient *client __attribute__((unused)));
bool finished(){ return _status != EVS_MSG_SENDING; }
};
class AsyncEventSourceClient {
private:
AsyncClient *_client;
AsyncEventSource *_server;
uint32_t _lastId;
LinkedList<AsyncEventSourceMessage *> _messageQueue;
void _queueMessage(AsyncEventSourceMessage *dataMessage);
void _runQueue();
public:
@ -48,6 +70,8 @@ class AsyncEventSourceClient {
uint32_t lastId() const { return _lastId; }
//system callbacks (do not call)
void _onAck(size_t len, uint32_t time);
void _onPoll();
void _onTimeout(uint32_t time);
void _onDisconnect();
};