Add onConnect callback to EventSources

usage:
  AsyncEventSource events("/events");
  events.onConnect([](AsyncEventSourceClient *client){
    client->send("Hello!",NULL,0,1000);
  });
  server.addHandler(&events);
  ...
  events.send("boot finished","system");//send "system" event
This commit is contained in:
Me No Dev 2016-06-29 19:29:39 +03:00
parent 49b6046125
commit 0d02922e52
2 changed files with 128 additions and 108 deletions

View File

@ -20,109 +20,7 @@
#include "Arduino.h"
#include "AsyncEventSource.h"
// Client
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){
_client = request->client();
_server = server;
next = NULL;
//_client->onError([](void *r, AsyncClient* c, int8_t error){ ((AsyncEventSourceClient*)(r))->_onError(error); }, this);
//_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
//_client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
_client->onError(NULL, NULL);
_client->onAck(NULL, NULL);
_client->onPoll(NULL, NULL);
_client->onData(NULL, NULL);
_client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
_client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); }, this);
_server->_addClient(this);
delete request;
}
AsyncEventSourceClient::~AsyncEventSourceClient(){
close();
}
//void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){}
//void AsyncEventSourceClient::_onPoll(){}
//void AsyncEventSourceClient::_onError(int8_t){}
void AsyncEventSourceClient::_onTimeout(uint32_t time){
_client->close(true);
}
void AsyncEventSourceClient::_onDisconnect(){
AsyncClient* cl = _client;
_client = NULL;
cl->free();
delete cl;
_server->_handleDisconnect(this);
}
void AsyncEventSourceClient::close(){
if(_client != NULL)
_client->close(true);
}
void AsyncEventSourceClient::send(const char * message, size_t len){
if(!_client->canSend()){
return;
}
if(_client->space() < len){
return;
}
_client->write(message, len);
}
// Handler
AsyncEventSource::AsyncEventSource(String url):_url(url){}
AsyncEventSource::~AsyncEventSource(){
close();
}
void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
if(_clients == NULL){
_clients = client;
return;
}
AsyncEventSourceClient * c = _clients;
while(c->next != NULL) c = c->next;
c->next = client;
}
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
if(_clients == NULL){
return;
}
if(_clients == client){
_clients = client->next;
delete client;
return;
}
AsyncEventSourceClient * c = _clients;
while(c->next != NULL && c->next != client) c = c->next;
if(c->next == NULL){
return;
}
c->next = client->next;
delete client;
}
void AsyncEventSource::close(){
AsyncEventSourceClient * c = _clients;
while(c != NULL){
if(c->connected())
c->close();
c = c->next;
}
}
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
if(_clients == NULL)
return;
static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = "";
if(reconnect){
@ -151,7 +49,6 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id,
char * nextN = strchr(lineStart, '\n');
char * nextR = strchr(lineStart, '\r');
if(nextN == NULL && nextR == NULL){
//last line
size_t llen = ((char *)message + messageLen) - lineStart;
char * ldata = (char *)malloc(llen+1);
if(ldata != NULL){
@ -204,15 +101,134 @@ void AsyncEventSource::send(const char *message, const char *event, uint32_t id,
} while(lineStart < ((char *)message + messageLen));
}
//os_printf("EVENT_SOURCE:\n%s", ev.c_str());
return ev;
}
// Client
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server){
_client = request->client();
_server = server;
next = NULL;
//_client->onError([](void *r, AsyncClient* c, int8_t error){ ((AsyncEventSourceClient*)(r))->_onError(error); }, this);
//_client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this);
//_client->onPoll([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onPoll(); }, this);
_client->onError(NULL, NULL);
_client->onAck(NULL, NULL);
_client->onPoll(NULL, NULL);
_client->onData(NULL, NULL);
_client->onTimeout([](void *r, AsyncClient* c, uint32_t time){ ((AsyncEventSourceClient*)(r))->_onTimeout(time); }, this);
_client->onDisconnect([](void *r, AsyncClient* c){ ((AsyncEventSourceClient*)(r))->_onDisconnect(); }, this);
_server->_addClient(this);
delete request;
}
AsyncEventSourceClient::~AsyncEventSourceClient(){
close();
}
//void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){}
//void AsyncEventSourceClient::_onPoll(){}
//void AsyncEventSourceClient::_onError(int8_t){}
void AsyncEventSourceClient::_onTimeout(uint32_t time){
_client->close(true);
}
void AsyncEventSourceClient::_onDisconnect(){
AsyncClient* cl = _client;
_client = NULL;
cl->free();
delete cl;
_server->_handleDisconnect(this);
}
void AsyncEventSourceClient::close(){
if(_client != NULL)
_client->close(true);
}
void AsyncEventSourceClient::write(const char * message, size_t len){
if(!_client->canSend()){
return;
}
if(_client->space() < len){
return;
}
_client->write(message, len);
}
void AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
String ev = generateEventMessage(message, event, id, reconnect);
write(ev.c_str(), ev.length());
}
// Handler
AsyncEventSource::AsyncEventSource(String url)
: _url(url)
, _clients(NULL)
, _connectcb(NULL)
{}
AsyncEventSource::~AsyncEventSource(){
close();
}
void AsyncEventSource::onConnect(ArEventHandlerFunction cb){
_connectcb = cb;
}
void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
if(_clients == NULL){
_clients = client;
return;
}
AsyncEventSourceClient * c = _clients;
while(c->next != NULL) c = c->next;
c->next = client;
if(_connectcb)
_connectcb(client);
}
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
if(_clients == NULL){
return;
}
if(_clients == client){
_clients = client->next;
delete client;
return;
}
AsyncEventSourceClient * c = _clients;
while(c->next != NULL && c->next != client) c = c->next;
if(c->next == NULL){
return;
}
c->next = client->next;
delete client;
}
void AsyncEventSource::close(){
AsyncEventSourceClient * c = _clients;
while(c != NULL){
if(c->connected())
c->send(ev.c_str(), ev.length());
c->close();
c = c->next;
}
}
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
if(_clients == NULL)
return;
String ev = generateEventMessage(message, event, id, reconnect);
AsyncEventSourceClient * c = _clients;
while(c != NULL){
if(c->connected())
c->write(ev.c_str(), ev.length());
c = c->next;
}
ev = String();
}
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){

View File

@ -27,6 +27,7 @@
class AsyncEventSource;
class AsyncEventSourceResponse;
class AsyncEventSourceClient;
typedef std::function<void(AsyncEventSourceClient *client)> ArEventHandlerFunction;
class AsyncEventSourceClient {
private:
@ -41,7 +42,8 @@ class AsyncEventSourceClient {
AsyncClient* client(){ return _client; }
void close();
void send(const char * message, size_t len);
void write(const char * message, size_t len);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
bool connected(){ return (_client != NULL) && _client->connected(); }
//system callbacks (do not call)
@ -56,6 +58,7 @@ class AsyncEventSource: public AsyncWebHandler {
private:
String _url;
AsyncEventSourceClient * _clients;
ArEventHandlerFunction _connectcb;
uint32_t _cNextId;
public:
AsyncEventSource(String url);
@ -63,6 +66,7 @@ class AsyncEventSource: public AsyncWebHandler {
const char * url(){ return _url.c_str(); }
void close();
void onConnect(ArEventHandlerFunction cb);
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
//system callbacks (do not call)