Buffers queues should be final now.

git-svn-id: svn://svn.code.sf.net/p/chibios/svn/trunk@8638 35acf78f-673a-0410-8e92-d51de3d6d3f4
This commit is contained in:
Giovanni Di Sirio 2015-12-24 09:10:49 +00:00
parent 2296afaa34
commit b3b4c6ad41
3 changed files with 198 additions and 103 deletions

View File

@ -101,12 +101,6 @@ struct io_buffers_queue {
* @brief Boundary for R/W sequential access.
*/
uint8_t *top;
/**
* @brief Buffer is being accessed.
* @details This flag indicates that the current buffer is being read or
* written by a long, preemptable operation.
*/
bool accessed;
/**
* @brief Data notification callback.
*/
@ -242,7 +236,10 @@ extern "C" {
void ibqPostFullBufferI(input_buffers_queue_t *ibqp, size_t size);
msg_t ibqGetFullBufferTimeout(input_buffers_queue_t *ibqp,
systime_t timeout);
msg_t ibqGetFullBufferTimeoutS(input_buffers_queue_t *ibqp,
systime_t timeout);
void ibqReleaseEmptyBuffer(input_buffers_queue_t *ibqp);
void ibqReleaseEmptyBufferS(input_buffers_queue_t *ibqp);
msg_t ibqGetTimeout(input_buffers_queue_t *ibqp, systime_t timeout);
size_t ibqReadTimeout(input_buffers_queue_t *ibqp, uint8_t *bp,
size_t n, systime_t timeout);
@ -255,7 +252,10 @@ extern "C" {
void obqReleaseEmptyBufferI(output_buffers_queue_t *obqp);
msg_t obqGetEmptyBufferTimeout(output_buffers_queue_t *obqp,
systime_t timeout);
msg_t obqGetEmptyBufferTimeoutS(output_buffers_queue_t *obqp,
systime_t timeout);
void obqPostFullBuffer(output_buffers_queue_t *obqp, size_t size);
void obqPostFullBufferS(output_buffers_queue_t *obqp, size_t size);
msg_t obqPutTimeout(output_buffers_queue_t *obqp, uint8_t b,
systime_t timeout);
size_t obqWriteTimeout(output_buffers_queue_t *obqp, const uint8_t *bp,

View File

@ -74,7 +74,6 @@ void ibqObjectInit(input_buffers_queue_t *ibqp, uint8_t *bp,
ibqp->buffers = bp;
ibqp->ptr = NULL;
ibqp->top = NULL;
ibqp->accessed = false;
ibqp->notify = infy;
ibqp->link = link;
}
@ -174,14 +173,44 @@ void ibqPostFullBufferI(input_buffers_queue_t *ibqp, size_t size) {
*/
msg_t ibqGetFullBufferTimeout(input_buffers_queue_t *ibqp,
systime_t timeout) {
msg_t msg;
osalSysLock();
chSysLock();
msg = ibqGetFullBufferTimeoutS(ibqp, timeout);
chSysUnlock();
return msg;
}
/**
* @brief Gets the next filled buffer from the queue.
* @note The function always acquires the same buffer if called repeatedly.
* @post After calling the function the fields @p ptr and @p top are set
* at beginning and end of the buffer data or @NULL if the queue
* is empty.
*
* @param[in] ibqp pointer to the @p input_buffers_queue_t object
* @param[in] timeout the number of ticks before the operation timeouts,
* the following special values are allowed:
* - @a TIME_IMMEDIATE immediate timeout.
* - @a TIME_INFINITE no timeout.
* .
* @return The operation status.
* @retval MSG_OK if a buffer has been acquired.
* @retval MSG_TIMEOUT if the specified time expired.
* @retval MSG_RESET if the queue has been reset.
*
* @sclass
*/
msg_t ibqGetFullBufferTimeoutS(input_buffers_queue_t *ibqp,
systime_t timeout) {
osalDbgCheckClassS();
while (ibqIsEmptyI(ibqp)) {
msg_t msg = osalThreadEnqueueTimeoutS(&ibqp->waiting, timeout);
if (msg < MSG_OK) {
osalSysUnlock();
return msg;
return msg;
}
}
@ -191,7 +220,6 @@ msg_t ibqGetFullBufferTimeout(input_buffers_queue_t *ibqp,
ibqp->ptr = ibqp->brdptr + sizeof (size_t);
ibqp->top = ibqp->ptr + *((size_t *)ibqp->brdptr);
osalSysUnlock();
return MSG_OK;
}
@ -205,8 +233,22 @@ msg_t ibqGetFullBufferTimeout(input_buffers_queue_t *ibqp,
*/
void ibqReleaseEmptyBuffer(input_buffers_queue_t *ibqp) {
osalSysLock();
chSysLock();
ibqReleaseEmptyBufferS(ibqp);
chSysUnlock();
}
/**
* @brief Releases the buffer back in the queue.
* @note The object callback is called after releasing the buffer.
*
* @param[in] ibqp pointer to the @p input_buffers_queue_t object
*
* @sclass
*/
void ibqReleaseEmptyBufferS(input_buffers_queue_t *ibqp) {
osalDbgCheckClassS();
osalDbgAssert(!ibqIsEmptyI(ibqp), "buffers queue empty");
/* Freeing a buffer slot in the queue.*/
@ -223,8 +265,6 @@ void ibqReleaseEmptyBuffer(input_buffers_queue_t *ibqp) {
if (ibqp->notify != NULL) {
ibqp->notify(ibqp);
}
osalSysUnlock();
}
/**
@ -232,7 +272,6 @@ void ibqReleaseEmptyBuffer(input_buffers_queue_t *ibqp) {
* @details This function reads a byte value from an input queue. If
* the queue is empty then the calling thread is suspended until a
* new buffer arrives in the queue or a timeout occurs.
* @note This function is not reentrant.
*
* @param[in] ibqp pointer to the @p input_buffers_queue_t object
* @param[in] timeout the number of ticks before the operation timeouts,
@ -249,16 +288,13 @@ void ibqReleaseEmptyBuffer(input_buffers_queue_t *ibqp) {
msg_t ibqGetTimeout(input_buffers_queue_t *ibqp, systime_t timeout) {
msg_t msg;
osalDbgAssert(!ibqp->accessed, "queue is being accessed");
/* Marking the queue as being busy becuase by a long operation.*/
ibqp->accessed = true;
osalSysLock();
/* This condition indicates that a new buffer must be acquired.*/
if (ibqp->ptr == NULL) {
msg = ibqGetFullBufferTimeout(ibqp, timeout);
msg = ibqGetFullBufferTimeoutS(ibqp, timeout);
if (msg != MSG_OK) {
ibqp->accessed = false;
osalSysUnlock();
return msg;
}
}
@ -270,10 +306,10 @@ msg_t ibqGetTimeout(input_buffers_queue_t *ibqp, systime_t timeout) {
/* If the current buffer has been fully read then it is returned as
empty in the queue.*/
if (ibqp->ptr >= ibqp->top) {
ibqReleaseEmptyBuffer(ibqp);
ibqReleaseEmptyBufferS(ibqp);
}
ibqp->accessed = false;
osalSysUnlock();
return msg;
}
@ -283,7 +319,6 @@ msg_t ibqGetTimeout(input_buffers_queue_t *ibqp, systime_t timeout) {
* The operation completes when the specified amount of data has been
* transferred or after the specified timeout or if the queue has
* been reset.
* @note This function is not reentrant.
*
* @param[in] ibqp pointer to the @p input_buffers_queue_t object
* @param[out] bp pointer to the data buffer
@ -304,26 +339,27 @@ size_t ibqReadTimeout(input_buffers_queue_t *ibqp, uint8_t *bp,
size_t r = 0;
systime_t deadline;
osalDbgAssert(!ibqp->accessed, "queue is being accessed");
/* Marking the queue as being busy becuase by a long operation.*/
ibqp->accessed = true;
osalSysLock();
/* Time window for the whole operation.*/
deadline = osalOsGetSystemTimeX() + timeout;
while (r < n) {
while (true) {
size_t size;
/* This condition indicates that a new buffer must be acquired.*/
if (ibqp->ptr == NULL) {
msg_t msg;
/* TIME_IMMEDIATE is a special case, never wait.*/
if (timeout == TIME_IMMEDIATE) {
msg = MSG_TIMEOUT;
osalSysUnlock();
return r;
}
else if (timeout == TIME_INFINITE) {
msg = ibqGetFullBufferTimeout(ibqp, timeout);
/* TIME_INFINITE is handled differently, no deadline.*/
if (timeout == TIME_INFINITE) {
msg = ibqGetFullBufferTimeoutS(ibqp, timeout);
}
else {
systime_t next_timeout = deadline - osalOsGetSystemTimeX();
@ -332,14 +368,15 @@ size_t ibqReadTimeout(input_buffers_queue_t *ibqp, uint8_t *bp,
in this case next becomes a very high number because the system
time is an unsigned type.*/
if (next_timeout > timeout) {
msg = MSG_TIMEOUT;
}
else {
msg = ibqGetFullBufferTimeout(ibqp, next_timeout);
osalSysUnlock();
return r;
}
msg = ibqGetFullBufferTimeoutS(ibqp, next_timeout);
}
/* Anything except MSG_OK interrupts the operation.*/
if (msg != MSG_OK) {
ibqp->accessed = false;
osalSysUnlock();
return r;
}
}
@ -350,22 +387,34 @@ size_t ibqReadTimeout(input_buffers_queue_t *ibqp, uint8_t *bp,
size = n - r;
}
/* Copying the chunk into the read buffer.*/
memcpy(bp, ibqp->ptr, size);
/* Updating the pointers and the counter.*/
r += size;
bp += size;
ibqp->ptr += size;
/* Smaller chunks in order to not make the critical zone too long,
this impacts throughput however.*/
if (size > 64) {
/* Giving the compiler a chance to optimize for a fixed size move.*/
memcpy(bp, ibqp->ptr, 64);
bp += 64;
ibqp->ptr += 64;
r += 64;
}
else {
memcpy(bp, ibqp->ptr, size);
bp += size;
ibqp->ptr += size;
r += size;
}
/* Has the current data buffer been finished? if so then release it.*/
if (ibqp->ptr >= ibqp->top) {
ibqReleaseEmptyBuffer(ibqp);
ibqReleaseEmptyBufferS(ibqp);
}
}
ibqp->accessed = false;
return r;
/* Giving a preemption chance.*/
osalSysUnlock();
if (r >= n) {
return r;
}
osalSysLock();
}
}
/**
@ -396,7 +445,6 @@ void obqObjectInit(output_buffers_queue_t *obqp, uint8_t *bp,
obqp->buffers = bp;
obqp->ptr = NULL;
obqp->top = NULL;
obqp->accessed = false;
obqp->notify = onfy;
obqp->link = link;
}
@ -494,14 +542,44 @@ void obqReleaseEmptyBufferI(output_buffers_queue_t *obqp) {
* @api
*/
msg_t obqGetEmptyBufferTimeout(output_buffers_queue_t *obqp,
systime_t timeout) {
systime_t timeout) {
msg_t msg;
osalSysLock();
chSysLock();
msg = obqGetEmptyBufferTimeoutS(obqp, timeout);
chSysUnlock();
return msg;
}
/**
* @brief Gets the next empty buffer from the queue.
* @note The function always acquires the same buffer if called repeatedly.
* @post After calling the function the fields @p ptr and @p top are set
* at beginning and end of the buffer data or @NULL if the queue
* is empty.
*
* @param[in] obqp pointer to the @p output_buffers_queue_t object
* @param[in] timeout the number of ticks before the operation timeouts,
* the following special values are allowed:
* - @a TIME_IMMEDIATE immediate timeout.
* - @a TIME_INFINITE no timeout.
* .
* @return The operation status.
* @retval MSG_OK if a buffer has been acquired.
* @retval MSG_TIMEOUT if the specified time expired.
* @retval MSG_RESET if the queue has been reset.
*
* @sclass
*/
msg_t obqGetEmptyBufferTimeoutS(output_buffers_queue_t *obqp,
systime_t timeout) {
osalDbgCheckClassS();
while (obqIsFullI(obqp)) {
msg_t msg = osalThreadEnqueueTimeoutS(&obqp->waiting, timeout);
if (msg < MSG_OK) {
osalSysUnlock();
return msg;
}
}
@ -512,7 +590,6 @@ msg_t obqGetEmptyBufferTimeout(output_buffers_queue_t *obqp,
obqp->ptr = obqp->bwrptr + sizeof (size_t);
obqp->top = obqp->bwrptr + obqp->bsize;
osalSysUnlock();
return MSG_OK;
}
@ -527,10 +604,24 @@ msg_t obqGetEmptyBufferTimeout(output_buffers_queue_t *obqp,
*/
void obqPostFullBuffer(output_buffers_queue_t *obqp, size_t size) {
osalDbgCheck((size > 0) && (size <= obqp->bsize - sizeof (size_t)));
osalSysLock();
obqPostFullBufferS(obqp, size);
osalSysUnlock();
}
/**
* @brief Posts a new filled buffer to the queue.
* @note The object callback is called after releasing the buffer.
*
* @param[in] obqp pointer to the @p output_buffers_queue_t object
* @param[in] size used size of the buffer, cannot be zero
*
* @sclass
*/
void obqPostFullBufferS(output_buffers_queue_t *obqp, size_t size) {
osalDbgCheckClassS();
osalDbgCheck((size > 0) && (size <= obqp->bsize - sizeof (size_t)));
osalDbgAssert(!obqIsFullI(obqp), "buffers queue full");
/* Writing size field in the buffer.*/
@ -550,8 +641,6 @@ void obqPostFullBuffer(output_buffers_queue_t *obqp, size_t size) {
if (obqp->notify != NULL) {
obqp->notify(obqp);
}
osalSysUnlock();
}
/**
@ -559,7 +648,6 @@ void obqPostFullBuffer(output_buffers_queue_t *obqp, size_t size) {
* @details This function writes a byte value to an output queue. If
* the queue is full then the calling thread is suspended until a
* new buffer is freed in the queue or a timeout occurs.
* @note This function is not reentrant.
*
* @param[in] obqp pointer to the @p output_buffers_queue_t object
* @param[in] timeout the number of ticks before the operation timeouts,
@ -577,15 +665,13 @@ msg_t obqPutTimeout(output_buffers_queue_t *obqp, uint8_t b,
systime_t timeout) {
msg_t msg;
osalDbgAssert(!obqp->accessed, "queue is being accessed");
/* Marking the queue as being busy becuase by a long operation.*/
obqp->accessed = true;
osalSysLock();
/* This condition indicates that a new buffer must be acquired.*/
if (obqp->ptr == NULL) {
msg = obqGetEmptyBufferTimeout(obqp, timeout);
msg = obqGetEmptyBufferTimeoutS(obqp, timeout);
if (msg != MSG_OK) {
osalSysUnlock();
return msg;
}
}
@ -597,10 +683,10 @@ msg_t obqPutTimeout(output_buffers_queue_t *obqp, uint8_t b,
/* If the current buffer has been fully written then it is posted as
full in the queue.*/
if (obqp->ptr >= obqp->top) {
obqPostFullBuffer(obqp, obqp->bsize - sizeof (size_t));
obqPostFullBufferS(obqp, obqp->bsize - sizeof (size_t));
}
obqp->accessed = false;
osalSysUnlock();
return MSG_OK;
}
@ -610,7 +696,6 @@ msg_t obqPutTimeout(output_buffers_queue_t *obqp, uint8_t b,
* operation completes when the specified amount of data has been
* transferred or after the specified timeout or if the queue has
* been reset.
* @note This function is not reentrant.
*
* @param[in] obqp pointer to the @p output_buffers_queue_t object
* @param[in] bp pointer to the data buffer
@ -628,29 +713,30 @@ msg_t obqPutTimeout(output_buffers_queue_t *obqp, uint8_t b,
*/
size_t obqWriteTimeout(output_buffers_queue_t *obqp, const uint8_t *bp,
size_t n, systime_t timeout) {
size_t r = 0;
size_t w = 0;
systime_t deadline;
osalDbgAssert(!obqp->accessed, "queue is being accessed");
/* Marking the queue as being busy becuase by a long operation.*/
obqp->accessed = true;
osalSysLock();
/* Time window for the whole operation.*/
deadline = osalOsGetSystemTimeX() + timeout;
while (r < n) {
while (true) {
size_t size;
/* This condition indicates that a new buffer must be acquired.*/
if (obqp->ptr == NULL) {
msg_t msg;
/* TIME_IMMEDIATE is a special case, never wait.*/
if (timeout == TIME_IMMEDIATE) {
msg = MSG_TIMEOUT;
osalSysUnlock();
return w;
}
else if (timeout == TIME_INFINITE) {
msg = obqGetEmptyBufferTimeout(obqp, timeout);
/* TIME_INFINITE is handled differently, no deadline.*/
if (timeout == TIME_INFINITE) {
msg = obqGetEmptyBufferTimeoutS(obqp, timeout);
}
else {
systime_t next_timeout = deadline - osalOsGetSystemTimeX();
@ -659,40 +745,53 @@ size_t obqWriteTimeout(output_buffers_queue_t *obqp, const uint8_t *bp,
in this case next becomes a very high number because the system
time is an unsigned type.*/
if (next_timeout > timeout) {
msg = MSG_TIMEOUT;
}
else {
msg = obqGetEmptyBufferTimeout(obqp, next_timeout);
osalSysUnlock();
return w;
}
msg = obqGetEmptyBufferTimeoutS(obqp, next_timeout);
}
/* Anything except MSG_OK interrupts the operation.*/
if (msg != MSG_OK) {
obqp->accessed = false;
return r;
osalSysUnlock();
return w;
}
}
/* Size of the space available in the current buffer.*/
size = obqp->top - obqp->ptr;
if (size > n - r) {
size = n - r;
if (size > n - w) {
size = n - w;
}
/* Copying the chunk into the read buffer.*/
memcpy(obqp->ptr, bp, size);
/* Updating the pointers and the counter.*/
r += size;
bp += size;
obqp->ptr += size;
/* Smaller chunks in order to not make the critical zone too long,
this impacts throughput however.*/
if (size > 64) {
/* Giving the compiler a chance to optimize for a fixed size move.*/
memcpy(obqp->ptr, bp, 64);
bp += 64;
obqp->ptr += 64;
w += 64;
}
else {
memcpy(obqp->ptr, bp, size);
bp += size;
obqp->ptr += size;
w += size;
}
/* Has the current data buffer been finished? if so then release it.*/
if (obqp->ptr >= obqp->top) {
obqPostFullBuffer(obqp, obqp->bsize - sizeof (size_t));
obqPostFullBufferS(obqp, obqp->bsize - sizeof (size_t));
}
}
obqp->accessed = false;
return r;
/* Giving a preemption chance.*/
osalSysUnlock();
if (w >= n) {
return w;
}
osalSysLock();
}
}
/**
@ -714,7 +813,7 @@ bool obqTryFlushI(output_buffers_queue_t *obqp) {
/* If queue is empty and there is a buffer partially filled and
it is not being written.*/
if (obqIsEmptyI(obqp) && (obqp->ptr != NULL) && !obqp->accessed) {
if (obqIsEmptyI(obqp) && (obqp->ptr != NULL)) {
size_t size = (size_t)(obqp->ptr - (obqp->bwrptr + sizeof (size_t)));
if (size > 0U) {
@ -740,7 +839,6 @@ bool obqTryFlushI(output_buffers_queue_t *obqp) {
/**
* @brief Flushes the current, partially filled, buffer to the queue.
* @note This function is not reentrant.
*
* @param[in] obqp pointer to the @p output_buffers_queue_t object
*
@ -748,20 +846,17 @@ bool obqTryFlushI(output_buffers_queue_t *obqp) {
*/
void obqFlush(output_buffers_queue_t *obqp) {
osalDbgAssert(!obqp->accessed, "queue is being accessed");
/* Marking the queue as being busy becuase by a long operation.*/
obqp->accessed = true;
osalSysLock();
/* If there is a buffer partially filled and not being written.*/
if (obqp->ptr != NULL) {
size_t size = (size_t)(obqp->ptr - obqp->bwrptr);
if (size > 0U) {
obqPostFullBuffer(obqp, size);
obqPostFullBufferS(obqp, size);
}
}
obqp->accessed = false;
osalSysUnlock();
}
/** @} */

View File

@ -111,7 +111,7 @@ static void cmd_write(BaseSequentialStream *chp, int argc, char *argv[]) {
}
while (chnGetTimeout((BaseChannel *)chp, TIME_IMMEDIATE) == Q_TIMEOUT) {
#if 1
#if 0
/* Writing in stream mode.*/
streamWrite(&SDU2, buf, sizeof buf - 1);
#else