aboutsummaryrefslogtreecommitdiffstats
path: root/rb
diff options
context:
space:
mode:
authorMichał Łyszczek <michal.lyszczek@bofc.pl>2018-01-15 11:00:54 +0100
committerMichał Łyszczek <michal.lyszczek@bofc.pl>2018-01-15 11:00:54 +0100
commit49de165820fb41afc50fe69ef5d3bc0e391ecc52 (patch)
tree1255885b89fb75ab173a56b48452891136dafa0a /rb
parent54a6fe365c52d7e0305e0e84a2089c3e1d913438 (diff)
downloadlibrb-49de165820fb41afc50fe69ef5d3bc0e391ecc52.tar.gz
librb-49de165820fb41afc50fe69ef5d3bc0e391ecc52.tar.bz2
librb-49de165820fb41afc50fe69ef5d3bc0e391ecc52.zip
fix: deadlocks and crashes in some ultra-rare situations
due to pthread nature, with very bad luck, calling rb_destroy could cause deadlock or application crash. This was fixed introducing stop function, that can be called before destroy, to force all threads to exit from locked rb functions
Diffstat (limited to 'rb')
-rw-r--r--rb/rb.c243
-rw-r--r--rb/tests.c17
2 files changed, 102 insertions, 158 deletions
diff --git a/rb/rb.c b/rb/rb.c
index 398c44b..11c5b93 100644
--- a/rb/rb.c
+++ b/rb/rb.c
@@ -66,12 +66,9 @@ struct rb
pthread_mutex_t lock; /* mutex for concurrent access */
pthread_cond_t wait_data; /* ca, will block if buffer is empty */
pthread_cond_t wait_room; /* ca, will block if buffer is full */
- pthread_cond_t exit_cond; /* signal cond when thread exits recv/send */
-
- int tinsend; /* number of threads inside send function */
- int tinread; /* number of threads inside recv function */
+ pthread_t stop_thread; /* thread to force thread to exit send/recv */
+ int stopped_all; /* when set no threads are in send/recv */
#endif
-
};
@@ -495,41 +492,19 @@ long rb_sendt
/* ==========================================================================
- This function is called only when threads are enabled. When rb_destroy
- is called, it will call this function in separate detached thread. This
- function will cleanup all resources once it makes sure all threads doing
- some operations on rb object are finished. First check is to count
- threads in read or send, and if that count is zero it proceeds to next
- check.
-
- Next check: every thread that exits read/send functions, will signal
- conditional variable rb->exit_cond. If function don't get any such
- signal withing 30 seconds, it assumes all threads exited and it is safe
- to free all memory. We need to do this, as there is small chance that
- some thread will enter read/send and then immediately cotext switch will
- occur, before thread can increment thred counter leading to memory
- dealocation (as thread counter is 0), and right after thread wakes back
- up, it will operate on unallocated memory.
-
- When that condition is met, function frees all resources.
-
- Bugs:
-
- When calling rb_destroy on threaded object, and then exiting main thread
- within 30 seconds, sanitizing tools will report memory leak. This is
- false positive, as rb_destroy_async didn't reach free() code yet, and
- while it is detached, it is impossible to check if function finished.
-
+ This function simply signals all conditional variables to force any
+ locked thread to exit from read/send functions
========================================================================== */
-static void *rb_destroy_async(void *arg)
+static void *rb_stop_thread(void *arg)
{
- int i; /* counter for exit signal */
- struct rb *rb; /* ring buffer object */
+ struct rb *rb; /* ring buffer object */
+ int stopped; /* copy of rb->stopped_all */
/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
rb = arg;
+ stopped = 0;
pthread_mutex_lock(&rb->lock);
rb->force_exit = 1;
@@ -539,78 +514,15 @@ static void *rb_destroy_async(void *arg)
* Send cond signal, until all threads exits read/send functions.
*/
- for (;;)
+ while (stopped != 1)
{
pthread_mutex_lock(&rb->lock);
-
- if (rb->tinread == 0)
- {
- pthread_mutex_unlock(&rb->lock);
- break;
- }
-
pthread_cond_signal(&rb->wait_data);
- pthread_mutex_unlock(&rb->lock);
- }
-
- for (;;)
- {
- pthread_mutex_lock(&rb->lock);
-
- if (rb->tinsend == 0)
- {
- pthread_mutex_unlock(&rb->lock);
- break;
- }
-
pthread_cond_signal(&rb->wait_room);
+ stopped = rb->stopped_all;
pthread_mutex_unlock(&rb->lock);
}
- for (i = 0; i != 5;)
- {
- struct timespec ts;
- /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
-
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += 1;
-
- pthread_mutex_lock(&rb->lock);
-
- if (pthread_cond_timedwait(&rb->exit_cond, &rb->lock, &ts) == ETIMEDOUT)
- {
- /*
- * we increment counter only when no exit_cond signal has been
- * received
- */
-
- ++i;
- }
- else
- {
- /*
- * signal received, set counter to 0
- */
-
- i = 0;
- }
-
- pthread_mutex_unlock(&rb->lock);
- }
-
- pthread_mutex_lock(&rb->lock);
- pthread_cond_destroy(&rb->wait_data);
- pthread_cond_destroy(&rb->wait_room);
- pthread_cond_destroy(&rb->exit_cond);
- memset(&rb->wait_data, 0, sizeof(rb->wait_data));
- memset(&rb->wait_room, 0, sizeof(rb->wait_room));
- pthread_mutex_unlock(&rb->lock);
- pthread_mutex_destroy(&rb->lock);
-
- memset(&rb->lock, 0, sizeof(rb->lock));
- free(rb->buffer);
- free(rb);
-
return NULL;
}
@@ -703,21 +615,13 @@ struct rb *rb_new
* Multithreaded environment
*/
- rb->tinread = 0;
- rb->tinsend = 0;
-
+ rb->stopped_all = -1;
if (pthread_mutex_init(&rb->lock, NULL))
{
e = errno;
goto error;
}
- if (pthread_cond_init(&rb->exit_cond, NULL))
- {
- e = errno;
- goto error;
- }
-
if (pthread_cond_init(&rb->wait_data, NULL))
{
e = errno;
@@ -736,7 +640,6 @@ error:
pthread_mutex_destroy(&rb->lock);
pthread_cond_destroy(&rb->wait_data);
pthread_cond_destroy(&rb->wait_room);
- pthread_cond_destroy(&rb->exit_cond);
free(rb->buffer);
free(rb);
@@ -776,7 +679,7 @@ long rb_read
Same as rb_read but also accepts flags
========================================================================== */
-
+#include <stdio.h>
long rb_recv
(
struct rb *rb, /* rb object */
@@ -785,22 +688,27 @@ long rb_recv
unsigned long flags /* operation flags */
)
{
- if (rb == NULL || rb->force_exit || buffer == NULL || rb->buffer == NULL)
+ if (rb == NULL || buffer == NULL || rb->buffer == NULL)
{
errno = EINVAL;
return -1;
}
+ pthread_mutex_lock(&rb->lock);
+ if (rb->force_exit)
+ {
+ pthread_mutex_unlock(&rb->lock);
+ errno = ECANCELED;
+ return -1;
+ }
+ pthread_mutex_unlock(&rb->lock);
+
#if ENABLE_THREADS
if (rb->flags & O_NONTHREAD)
{
return rb_recvs(rb, buffer, count, flags);
}
- pthread_mutex_lock(&rb->lock);
- rb->tinread++;
- pthread_mutex_unlock(&rb->lock);
-
if (flags & MSG_PEEK)
{
/*
@@ -812,18 +720,10 @@ long rb_recv
pthread_mutex_lock(&rb->lock);
count = rb_recvs(rb, buffer, count, flags);
pthread_mutex_unlock(&rb->lock);
+ return count;
}
- else
- {
- count = rb_recvt(rb, buffer, count, flags);
- }
-
- pthread_mutex_lock(&rb->lock);
- rb->tinread--;
- pthread_cond_signal(&rb->exit_cond);
- pthread_mutex_unlock(&rb->lock);
- return count;
+ return rb_recvt(rb, buffer, count, flags);
#else
return rb_recvs(rb, buffer, count, flags);
#endif
@@ -869,30 +769,28 @@ long rb_send
unsigned long flags /* operation flags */
)
{
- if (rb == NULL || rb->force_exit || buffer == NULL || rb->buffer == NULL)
+ if (rb == NULL || buffer == NULL || rb->buffer == NULL)
{
errno = EINVAL;
return -1;
}
+ pthread_mutex_lock(&rb->lock);
+ if (rb->force_exit)
+ {
+ pthread_mutex_unlock(&rb->lock);
+ errno = ECANCELED;
+ return -1;
+ }
+ pthread_mutex_unlock(&rb->lock);
+
#if ENABLE_THREADS
if (rb->flags & O_NONTHREAD)
{
return rb_sends(rb, buffer, count, flags);
}
- pthread_mutex_lock(&rb->lock);
- rb->tinsend++;
- pthread_mutex_unlock(&rb->lock);
-
- count = rb_sendt(rb, buffer, count, flags);
-
- pthread_mutex_lock(&rb->lock);
- rb->tinsend--;
- pthread_cond_signal(&rb->exit_cond);
- pthread_mutex_unlock(&rb->lock);
-
- return count;
+ return rb_sendt(rb, buffer, count, flags);
#else
return rb_sends(rb, buffer, count, flags);
#endif
@@ -943,15 +841,8 @@ int rb_clear
/* ==========================================================================
- Frees resources allocated by rb_new. Also it stops any blocked rb_read
- or rb_write functions so they can return.
-
- Below only applies to library working in threaded environment When
- rb_read and rb_write work in another threads and you want to join
- them after stopping rb, you should call this function before joining
- threads. If you do it otherwise, threads calling rb_write or
- rb_read can be in locked state, waiting for resources, and threads
- might never return to call this function. You have been warned
+ Frees resources allocated by rb_new. Due to pthread nature this function
+ should be called *only* when no other threads are working on rb object.
========================================================================== */
@@ -960,10 +851,6 @@ int rb_destroy
struct rb *rb /* rb object */
)
{
-#if ENABLE_THREADS
- pthread_t destroy_thread;
-#endif
-
if (rb == NULL)
{
errno = EINVAL;
@@ -978,15 +865,63 @@ int rb_destroy
return 0;
}
- pthread_create(&destroy_thread, NULL, rb_destroy_async, rb);
- pthread_detach(destroy_thread);
+ /*
+ * check if user called rb_stop, if not (rb->stopped will be -1), we trust
+ * caller made sure all threads are stopped before calling destroy.
+ */
+
+ pthread_mutex_lock(&rb->lock);
+ if (rb->stopped_all == 0)
+ {
+ rb->stopped_all = 1;
+ pthread_mutex_unlock(&rb->lock);
+ pthread_join(rb->stop_thread, NULL);
+ }
+ else
+ {
+ pthread_mutex_unlock(&rb->lock);
+ }
+
+ pthread_cond_destroy(&rb->wait_data);
+ pthread_cond_destroy(&rb->wait_room);
+ pthread_mutex_destroy(&rb->lock);
+#endif
-#else
free(rb->buffer);
free(rb);
-#endif
+ return 0;
+}
+
+
+/* ==========================================================================
+ Simply starts rb_stop_thread that will force all threads to exit any
+ rb_* public functions.
+ ========================================================================== */
+
+
+int rb_stop
+(
+ struct rb *rb /* rb object */
+)
+{
+#if ENABLE_THREADS
+ if (rb == NULL || rb->flags & O_NONTHREAD)
+ {
+ errno = EINVAL;
+ return -1;
+ }
+
+ rb->stopped_all = 0;
+ if (pthread_create(&rb->stop_thread, NULL, rb_stop_thread, rb) != 0)
+ {
+ return -1;
+ }
return 0;
+#else
+ errno = ENOSYS;
+ return -1;
+#endif
}
diff --git a/rb/tests.c b/rb/tests.c
index 1f779dd..1c115c3 100644
--- a/rb/tests.c
+++ b/rb/tests.c
@@ -132,9 +132,11 @@ static void multi_producers_consumers(void)
pthread_t *prod;
struct rb *rb;
unsigned int i, r;
+ int count;
multi_index = 0;
multi_index_count = 0;
+ count = 0;
memset(data, 0, sizeof(data));
cons = malloc(t_num_consumers * sizeof(*cons));
prod = malloc(t_num_producers * sizeof(*prod));
@@ -156,10 +158,14 @@ static void multi_producers_consumers(void)
/*
* wait until all indexes has been consumed
*/
- while (multi_index_count < sizeof(data)/sizeof(*data))
+ while (count < sizeof(data)/sizeof(*data))
{
int buf[16];
+ pthread_mutex_lock(&multi_mutex_count);
+ count = multi_index_count;
+ pthread_mutex_unlock(&multi_mutex_count);
+
/*
* while waiting, we randomly peek into rb, and to make sure,
* peeking won't make a difference
@@ -168,7 +174,7 @@ static void multi_producers_consumers(void)
rb_recv(rb, buf, rand() % 16, MSG_PEEK);
}
- rb_destroy(rb);
+ rb_stop(rb);
for (i = 0; i != t_num_consumers; ++i)
{
@@ -180,6 +186,8 @@ static void multi_producers_consumers(void)
pthread_join(prod[i], NULL);
}
+ rb_destroy(rb);
+
for (r = 0, i = 0; i < sizeof(data)/sizeof(*data); ++i)
{
r += (data[i] != 1);
@@ -370,8 +378,8 @@ int main(void)
unsigned int t_writelen_max = 128;
unsigned int t_objsize_max = 128;
- unsigned int t_num_producers_max = 8;
- unsigned int t_num_consumers_max = 8;
+ unsigned int t_num_producers_max = 68;
+ unsigned int t_num_consumers_max = 68;
srand(time(NULL));
@@ -387,6 +395,7 @@ int main(void)
}
#endif
+ mt_return();
for (t_rblen = 2; t_rblen < t_rblen_max; t_rblen *= 2)
{
for (t_readlen = 2; t_readlen < t_readlen_max;