Web Sockets memory improvements : shared message buffer, direct buffer access (#144)

* Add support for a MultiMessage where the buffer is shared between WS messages to multiple clients.
Add ability to create a WS buffer directly and write to it, saving duplication of message in RAM, then send it.  example below is an arduinoJson message.

```cpp
void sendDataWs(AsyncWebSocketClient * client)
{
    DynamicJsonBuffer jsonBuffer;
    JsonObject& root = jsonBuffer.createObject();
    root["a"] = "abc";
    root["b"] = "abcd";
    root["c"] = "abcde";
    root["d"] = "abcdef";
    root["e"] = "abcdefg";
    size_t len = root.measureLength();
    AsyncWebSocketMessageBuffer * buffer = ws.makeBuffer(len); //  creates a buffer (len + 1) for you.
    if (buffer) {
        root.printTo((char *)buffer->get(), len + 1);
        if (client) {
            client->text(buffer);
        } else {
            ws.textAll(buffer);
        }
    }
}
```

* Add example to readme.
This commit is contained in:
sticilface 2017-03-14 13:55:39 +00:00 committed by Me No Dev
parent b9641902bf
commit 1b35f15ec2
3 changed files with 436 additions and 107 deletions

View File

@ -790,6 +790,32 @@ const uint8_t flash_binary[] PROGMEM = { 0x01, 0x02, 0x03, 0x04 };
client->binary(flash_binary, 4);
```
### Direct access to web socket message buffer
When sending a web socket message using the above methods a buffer is created. Under certain circumstances you might want to manipulate or populate this buffer directly from your application, for example to prevent unnecessary duplications of the data. This example below shows how to create a buffer and print data to it from an ArduinoJson object then send it.
```cpp
void sendDataWs(AsyncWebSocketClient * client)
{
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
root["a"] = "abc";
root["b"] = "abcd";
root["c"] = "abcde";
root["d"] = "abcdef";
root["e"] = "abcdefg";
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer * buffer = ws.makeBuffer(len); // creates a buffer (len + 1) for you.
if (buffer) {
root.printTo((char *)buffer->get(), len + 1);
if (client) {
client->text(buffer);
} else {
ws.textAll(buffer);
}
}
}
```
## Async Event Source Plugin
The server includes EventSource (Server-Sent Events) plugin which can be used to send short text events to the browser.
Difference between EventSource and WebSockets is that EventSource is single direction, text-only protocol.

View File

@ -119,6 +119,121 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool
}
/*
* AsyncWebSocketMessageBuffer
*/
AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer()
:_data(nullptr)
,_len(0)
,_lock(false)
,_count(0)
{
}
AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(uint8_t * data, size_t size)
:_data(nullptr)
,_len(size)
,_lock(false)
,_count(0)
{
if (!data) {
return;
}
_data = new uint8_t[size + 1];
if (_data) {
memcpy(_data, data, _len);
_data[_len] = 0;
}
}
AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(size_t size)
:_data(nullptr)
,_len(size)
,_lock(false)
,_count(0)
{
_data = new uint8_t[_len + 1];
if (_data) {
_data[_len] = 0;
}
}
AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer & copy)
:_data(nullptr)
,_len(0)
,_lock(false)
,_count(0)
{
_len = copy._len;
_lock = copy._lock;
_count = 0;
if (_len) {
_data = new uint8_t[_len + 1];
_data[_len] = 0;
}
if (_data) {
memcpy(_data, copy._data, _len);
_data[_len] = 0;
}
}
AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer && copy)
:_data(nullptr)
,_len(0)
,_lock(false)
,_count(0)
{
_len = copy._len;
_lock = copy._lock;
_count = 0;
if (copy._data) {
_data = copy._data;
copy._data = nullptr;
}
}
AsyncWebSocketMessageBuffer::~AsyncWebSocketMessageBuffer()
{
if (_data) {
delete[] _data;
}
}
bool AsyncWebSocketMessageBuffer::reserve(size_t size)
{
_len = size;
if (_data) {
delete[] _data;
_data = nullptr;
}
_data = new uint8_t[size];
if (_data) {
_data[_len] = 0;
return true;
} else {
return false;
}
}
/*
@ -167,69 +282,146 @@ class AsyncWebSocketControl {
* Basic Buffered Message
*/
class AsyncWebSocketBasicMessage: public AsyncWebSocketMessage {
private:
uint8_t * _data;
size_t _len;
size_t _sent;
size_t _ack;
size_t _acked;
public:
AsyncWebSocketBasicMessage(const char * data, size_t len, uint8_t opcode=WS_TEXT, bool mask=false)
:_len(len)
,_sent(0)
,_ack(0)
,_acked(0)
{
_opcode = opcode & 0x07;
_mask = mask;
_data = (uint8_t*)malloc(_len+1);
if(_data == NULL){
_len = 0;
_status = WS_MSG_ERROR;
} else {
_status = WS_MSG_SENDING;
memcpy(_data, data, _len);
_data[_len] = 0;
}
}
virtual ~AsyncWebSocketBasicMessage() override {
if(_data != NULL)
free(_data);
}
virtual bool betweenFrames() const override { return _acked == _ack; }
virtual void ack(size_t len, uint32_t time) override {
_acked += len;
if(_sent == _len && _acked == _ack){
_status = WS_MSG_SENT;
}
}
virtual size_t send(AsyncClient *client) override {
if(_status != WS_MSG_SENDING)
return 0;
if(_acked < _ack){
return 0;
}
if(_sent == _len){
if(_acked == _ack)
_status = WS_MSG_SENT;
return 0;
}
size_t window = webSocketSendFrameWindow(client);
size_t toSend = _len - _sent;
if(window < toSend) toSend = window;
bool final = ((toSend + _sent) == _len);
size_t sent = webSocketSendFrame(client, final, (_sent == 0)?_opcode:WS_CONTINUATION, _mask, (uint8_t*)(_data+_sent), toSend);
_sent += sent;
uint8_t headLen = ((sent < 126)?2:4)+(_mask*4);
_ack += sent + headLen;
return sent;
}
};
AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(const char * data, size_t len, uint8_t opcode, bool mask)
:_len(len)
,_sent(0)
,_ack(0)
,_acked(0)
{
_opcode = opcode & 0x07;
_mask = mask;
_data = (uint8_t*)malloc(_len+1);
if(_data == NULL){
_len = 0;
_status = WS_MSG_ERROR;
} else {
_status = WS_MSG_SENDING;
memcpy(_data, data, _len);
_data[_len] = 0;
}
}
AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(uint8_t opcode, bool mask)
:_len(0)
,_sent(0)
,_ack(0)
,_acked(0)
{
_opcode = opcode & 0x07;
_mask = mask;
}
AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() {
if(_data != NULL)
free(_data);
}
void AsyncWebSocketBasicMessage::ack(size_t len, uint32_t time) {
_acked += len;
if(_sent == _len && _acked == _ack){
_status = WS_MSG_SENT;
}
}
size_t AsyncWebSocketBasicMessage::send(AsyncClient *client) {
if(_status != WS_MSG_SENDING)
return 0;
if(_acked < _ack){
return 0;
}
if(_sent == _len){
if(_acked == _ack)
_status = WS_MSG_SENT;
return 0;
}
size_t window = webSocketSendFrameWindow(client);
size_t toSend = _len - _sent;
if(window < toSend) toSend = window;
bool final = ((toSend + _sent) == _len);
size_t sent = webSocketSendFrame(client, final, (_sent == 0)?_opcode:WS_CONTINUATION, _mask, (uint8_t*)(_data+_sent), toSend);
_sent += sent;
uint8_t headLen = ((sent < 126)?2:4)+(_mask*4);
_ack += sent + headLen;
return sent;
}
// bool AsyncWebSocketBasicMessage::reserve(size_t size) {
// if (size) {
// _data = (uint8_t*)malloc(size +1);
// if (_data) {
// memset(_data, 0, size);
// _len = size;
// _status = WS_MSG_SENDING;
// return true;
// }
// }
// return false;
// }
/*
* AsyncWebSocketMultiMessage Message
*/
AsyncWebSocketMultiMessage::AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode, bool mask)
:_len(0)
,_sent(0)
,_ack(0)
,_acked(0)
,_WSbuffer(nullptr)
{
_opcode = opcode & 0x07;
_mask = mask;
if (buffer) {
_WSbuffer = buffer;
(*_WSbuffer)++;
_data = buffer->get();
_len = buffer->length();
_status = WS_MSG_SENDING;
} else {
_status = WS_MSG_ERROR;
}
}
AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() {
if (_WSbuffer) {
(*_WSbuffer)--; // decreases the counter.
}
}
void AsyncWebSocketMultiMessage::ack(size_t len, uint32_t time) {
_acked += len;
if(_sent == _len && _acked == _ack){
_status = WS_MSG_SENT;
}
}
size_t AsyncWebSocketMultiMessage::send(AsyncClient *client) {
if(_status != WS_MSG_SENDING)
return 0;
if(_acked < _ack){
return 0;
}
if(_sent == _len){
if(_acked == _ack)
_status = WS_MSG_SENT;
return 0;
}
size_t window = webSocketSendFrameWindow(client);
size_t toSend = _len - _sent;
if(window < toSend) toSend = window;
bool final = ((toSend + _sent) == _len);
size_t sent = webSocketSendFrame(client, final, (_sent == 0)?_opcode:WS_CONTINUATION, _mask, (uint8_t*)(_data+_sent), toSend);
_sent += sent;
uint8_t headLen = ((sent < 126)?2:4)+(_mask*4);
_ack += sent + headLen;
return sent;
}
/*
@ -285,6 +477,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){
if(len && !_messageQueue.isEmpty()){
_messageQueue.front()->ack(len, time);
}
_server->_cleanBuffers();
_runQueue();
}
@ -531,9 +724,13 @@ void AsyncWebSocketClient::text(const __FlashStringHelper *data){
message[b] = pgm_read_byte(p++);
message[n] = 0;
text(message, n);
free(message);
free(message);
}
}
void AsyncWebSocketClient::text(AsyncWebSocketMessageBuffer * buffer)
{
_queueMessage(new AsyncWebSocketMultiMessage(buffer));
}
void AsyncWebSocketClient::binary(const char * message, size_t len){
_queueMessage(new AsyncWebSocketBasicMessage(message, len, WS_BINARY));
@ -559,6 +756,11 @@ void AsyncWebSocketClient::binary(const __FlashStringHelper *data, size_t len){
binary(message, len);
free(message);
}
}
void AsyncWebSocketClient::binary(AsyncWebSocketMessageBuffer * buffer)
{
_queueMessage(new AsyncWebSocketMultiMessage(buffer, WS_BINARY));
}
IPAddress AsyncWebSocketClient::remoteIP() {
@ -586,6 +788,7 @@ AsyncWebSocket::AsyncWebSocket(const String& url)
,_clients(LinkedList<AsyncWebSocketClient *>([](AsyncWebSocketClient *c){ delete c; }))
,_cNextId(1)
,_enabled(true)
,_buffers(LinkedList<AsyncWebSocketMessageBuffer *>([](AsyncWebSocketMessageBuffer *b){ delete b; }))
{
_eventHandler = NULL;
}
@ -657,11 +860,21 @@ void AsyncWebSocket::text(uint32_t id, const char * message, size_t len){
c->text(message, len);
}
void AsyncWebSocket::textAll(const char * message, size_t len){
void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer * buffer){
if (!buffer) return;
buffer->lock();
for(const auto& c: _clients){
if(c->status() == WS_CONNECTED)
c->text(message, len);
c->text(buffer);
}
buffer->unlock();
_cleanBuffers();
}
void AsyncWebSocket::textAll(const char * message, size_t len){
AsyncWebSocketMessageBuffer * WSBuffer = makeBuffer((uint8_t *)message, len);
textAll(WSBuffer);
}
void AsyncWebSocket::binary(uint32_t id, const char * message, size_t len){
@ -671,10 +884,20 @@ void AsyncWebSocket::binary(uint32_t id, const char * message, size_t len){
}
void AsyncWebSocket::binaryAll(const char * message, size_t len){
for(const auto& c: _clients){
AsyncWebSocketMessageBuffer * buffer = makeBuffer((uint8_t *)message, len);
binaryAll(buffer);
}
void AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer * buffer)
{
if (!buffer) return;
buffer->lock();
for(const auto& c: _clients){
if(c->status() == WS_CONNECTED)
c->binary(message, len);
c->binary(buffer);
}
buffer->unlock();
_cleanBuffers();
}
void AsyncWebSocket::message(uint32_t id, AsyncWebSocketMessage *message){
@ -683,11 +906,12 @@ void AsyncWebSocket::message(uint32_t id, AsyncWebSocketMessage *message){
c->message(message);
}
void AsyncWebSocket::messageAll(AsyncWebSocketMessage *message){
void AsyncWebSocket::messageAll(AsyncWebSocketMultiMessage *message){
for(const auto& c: _clients){
if(c->status() == WS_CONNECTED)
c->message(message);
}
_cleanBuffers();
}
size_t AsyncWebSocket::printf(uint32_t id, const char *format, ...){
@ -705,27 +929,20 @@ size_t AsyncWebSocket::printf(uint32_t id, const char *format, ...){
size_t AsyncWebSocket::printfAll(const char *format, ...) {
va_list arg;
va_start(arg, format);
char* temp = new char[64];
if(!temp){
return 0;
size_t len = vsnprintf(nullptr, 0, format, arg);
va_end(arg);
AsyncWebSocketMessageBuffer * buffer = makeBuffer(len + 1);
if (!buffer) {
return 0;
}
char* buffer = temp;
size_t len = vsnprintf(temp, 64, format, arg);
va_start(arg, format);
vsnprintf( (char *)buffer->get(), len + 1, format, arg);
va_end(arg);
if (len > 63) {
buffer = new char[len + 1];
if (!buffer) {
return 0;
}
va_start(arg, format);
vsnprintf(buffer, len + 1, format, arg);
va_end(arg);
}
textAll(buffer, len);
if (buffer != temp) {
delete[] buffer;
}
delete[] temp;
textAll(buffer);
return len;
}
@ -744,27 +961,17 @@ size_t AsyncWebSocket::printf_P(uint32_t id, PGM_P formatP, ...){
size_t AsyncWebSocket::printfAll_P(PGM_P formatP, ...) {
va_list arg;
va_start(arg, formatP);
char* temp = new char[64];
if(!temp){
size_t len = vsnprintf_P(nullptr, 0, formatP, arg);
va_end(arg);
AsyncWebSocketMessageBuffer * buffer = makeBuffer(len + 1);
if (!buffer) {
return 0;
}
char* buffer = temp;
size_t len = vsnprintf_P(temp, 64, formatP, arg);
va_start(arg, formatP);
vsnprintf_P( (char *)buffer->get(), len + 1, formatP, arg);
va_end(arg);
if (len > 63) {
buffer = new char[len + 1];
if (!buffer) {
return 0;
}
va_start(arg, formatP);
vsnprintf_P(buffer, len + 1, formatP, arg);
va_end(arg);
}
textAll(buffer, len);
if (buffer != temp) {
delete[] buffer;
}
delete[] temp;
textAll(buffer);
return len;
}
@ -869,9 +1076,6 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request){
request->send(400);
return;
}
if((_username != "" && _password != "") && !request->authenticate(_username.c_str(), _password.c_str()))
return request->requestAuthentication();
AsyncWebHeader* version = request->getHeader(WS_STR_VERSION);
if(version->value().toInt() != 13){
AsyncWebServerResponse *response = request->beginResponse(400);
@ -889,6 +1093,34 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request){
request->send(response);
}
AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(size_t size)
{
AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(size);
if (buffer) {
_buffers.add(buffer);
}
return buffer;
}
AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(uint8_t * data, size_t size)
{
AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(data, size);
if (buffer) {
_buffers.add(buffer);
}
return buffer;
}
void AsyncWebSocket::_cleanBuffers()
{
for(const auto& c: _buffers){
if(c->canDelete())
_buffers.remove(c);
}
}
/*
* Response to Web Socket request - sends the authorization and detaches the TCP Client from the web server

View File

@ -46,6 +46,34 @@ typedef enum { WS_CONTINUATION, WS_TEXT, WS_BINARY, WS_DISCONNECT = 0x08, WS_PIN
typedef enum { WS_MSG_SENDING, WS_MSG_SENT, WS_MSG_ERROR } AwsMessageStatus;
typedef enum { WS_EVT_CONNECT, WS_EVT_DISCONNECT, WS_EVT_PONG, WS_EVT_ERROR, WS_EVT_DATA } AwsEventType;
class AsyncWebSocketMessageBuffer {
private:
uint8_t * _data;
size_t _len;
bool _lock;
uint32_t _count;
public:
AsyncWebSocketMessageBuffer();
AsyncWebSocketMessageBuffer(size_t size);
AsyncWebSocketMessageBuffer(uint8_t * data, size_t size);
AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer &);
AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBuffer &&);
~AsyncWebSocketMessageBuffer();
void operator ++(int i) { _count++; }
void operator --(int i) { if (_count > 0) { _count--; } ; }
bool reserve(size_t size);
void lock() { _lock = true; }
void unlock() { _lock = false; }
uint8_t * get() { return _data; }
size_t length() { return _len; }
uint32_t count() { return _count; }
bool canDelete() { return (!_count && !_lock); }
friend AsyncWebSocket;
};
class AsyncWebSocketMessage {
protected:
uint8_t _opcode;
@ -60,6 +88,38 @@ class AsyncWebSocketMessage {
virtual bool betweenFrames() const { return false; }
};
class AsyncWebSocketBasicMessage: public AsyncWebSocketMessage {
private:
uint8_t * _data;
size_t _len;
size_t _sent;
size_t _ack;
size_t _acked;
public:
AsyncWebSocketBasicMessage(const char * data, size_t len, uint8_t opcode=WS_TEXT, bool mask=false);
AsyncWebSocketBasicMessage(uint8_t opcode=WS_TEXT, bool mask=false);
virtual ~AsyncWebSocketBasicMessage() override;
virtual bool betweenFrames() const override { return _acked == _ack; }
virtual void ack(size_t len, uint32_t time) override ;
virtual size_t send(AsyncClient *client) override ;
};
class AsyncWebSocketMultiMessage: public AsyncWebSocketMessage {
private:
uint8_t * _data;
size_t _len;
size_t _sent;
size_t _ack;
size_t _acked;
AsyncWebSocketMessageBuffer * _WSbuffer;
public:
AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuffer * buffer, uint8_t opcode=WS_TEXT, bool mask=false);
virtual ~AsyncWebSocketMultiMessage() override;
virtual bool betweenFrames() const override { return _acked == _ack; }
virtual void ack(size_t len, uint32_t time) override ;
virtual size_t send(AsyncClient *client) override ;
};
class AsyncWebSocketClient {
private:
AsyncClient *_client;
@ -116,6 +176,7 @@ class AsyncWebSocketClient {
void text(char * message);
void text(const String &message);
void text(const __FlashStringHelper *data);
void text(AsyncWebSocketMessageBuffer *buffer);
void binary(const char * message, size_t len);
void binary(const char * message);
@ -123,6 +184,7 @@ class AsyncWebSocketClient {
void binary(char * message);
void binary(const String &message);
void binary(const __FlashStringHelper *data, size_t len);
void binary(AsyncWebSocketMessageBuffer *buffer);
//system callbacks (do not call)
void _onAck(size_t len, uint32_t time);
@ -158,7 +220,7 @@ class AsyncWebSocket: public AsyncWebHandler {
void closeAll(uint16_t code=0, const char * message=NULL);
void ping(uint32_t id, uint8_t *data=NULL, size_t len=0);
void pingAll(uint8_t *data=NULL, size_t len=0);
void pingAll(uint8_t *data=NULL, size_t len=0); // done
void text(uint32_t id, const char * message, size_t len);
void text(uint32_t id, const char * message);
@ -172,7 +234,8 @@ class AsyncWebSocket: public AsyncWebHandler {
void textAll(uint8_t * message, size_t len);
void textAll(char * message);
void textAll(const String &message);
void textAll(const __FlashStringHelper *message);
void textAll(const __FlashStringHelper *message); // need to convert
void textAll(AsyncWebSocketMessageBuffer * buffer);
void binary(uint32_t id, const char * message, size_t len);
void binary(uint32_t id, const char * message);
@ -187,9 +250,10 @@ class AsyncWebSocket: public AsyncWebHandler {
void binaryAll(char * message);
void binaryAll(const String &message);
void binaryAll(const __FlashStringHelper *message, size_t len);
void binaryAll(AsyncWebSocketMessageBuffer * buffer);
void message(uint32_t id, AsyncWebSocketMessage *message);
void messageAll(AsyncWebSocketMessage *message);
void messageAll(AsyncWebSocketMultiMessage *message);
size_t printf(uint32_t id, const char *format, ...) __attribute__ ((format (printf, 3, 4)));
size_t printfAll(const char *format, ...) __attribute__ ((format (printf, 2, 3)));
@ -208,6 +272,13 @@ class AsyncWebSocket: public AsyncWebHandler {
void _handleEvent(AsyncWebSocketClient * client, AwsEventType type, void * arg, uint8_t *data, size_t len);
virtual bool canHandle(AsyncWebServerRequest *request) override final;
virtual void handleRequest(AsyncWebServerRequest *request) override final;
// messagebuffer functions/objects.
AsyncWebSocketMessageBuffer * makeBuffer(size_t size = 0);
AsyncWebSocketMessageBuffer * makeBuffer(uint8_t * data, size_t size);
LinkedList<AsyncWebSocketMessageBuffer *> _buffers;
void _cleanBuffers();
};
//WebServer response to authenticate the socket and detach the tcp client from the web server request