| 1 |  |  | /* | 
    
    | 2 |  |  |  * Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>. | 
    
    | 3 |  |  |  * All rights reserved. | 
    
    | 4 |  |  |  * | 
    
    | 5 |  |  |  * Redistribution and use in source and binary forms, with or without | 
    
    | 6 |  |  |  * modification, are permitted provided that the following conditions are | 
    
    | 7 |  |  |  * met: | 
    
    | 8 |  |  |  * | 
    
    | 9 |  |  |  *  1. Redistributions of source code must retain the above copyright | 
    
    | 10 |  |  |  *     notice, this list of conditions and the following disclaimer. | 
    
    | 11 |  |  |  * | 
    
    | 12 |  |  |  *  2. Redistributions in binary form must reproduce the above copyright | 
    
    | 13 |  |  |  *     notice, this list of conditions and the following disclaimer in the | 
    
    | 14 |  |  |  *     documentation and/or other materials provided with the distribution. | 
    
    | 15 |  |  |  * | 
    
    | 16 |  |  |  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS | 
    
    | 17 |  |  |  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT | 
    
    | 18 |  |  |  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR | 
    
    | 19 |  |  |  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT | 
    
    | 20 |  |  |  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | 
    
    | 21 |  |  |  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | 
    
    | 22 |  |  |  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | 
    
    | 23 |  |  |  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | 
    
    | 24 |  |  |  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | 
    
    | 25 |  |  |  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | 
    
    | 26 |  |  |  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | 
    
    | 27 |  |  |  */ | 
    
    | 28 |  |  |  | 
    
    | 29 |  |  | /** | 
    
    | 30 |  |  |  * @file threadpool.c | 
    
    | 31 |  |  |  * @brief Threadpool implementation file | 
    
    | 32 |  |  |  */ | 
    
    | 33 |  |  |  | 
    
    | 34 |  |  | #include <stdlib.h> | 
    
    | 35 |  |  | #include <pthread.h> | 
    
    | 36 |  |  | #include <unistd.h> | 
    
    | 37 |  |  |  | 
    
    | 38 |  |  | #include "pool.h" | 
    
    | 39 |  |  | #include "alloc.h" | 
    
    | 40 |  |  |  | 
    
    /* Shutdown modes stored in threadpool_t.shutdown; 0 means the pool is
       not shutting down. */
    typedef enum {
        /* Workers stop as soon as they observe the flag, even if tasks are
           still queued. */
        immediate_shutdown = 1,
        /* Workers keep taking tasks and stop once the queue is empty. */
        graceful_shutdown  = 2
    } threadpool_shutdown_t;
    
    | 45 |  |  |  | 
    
    /**
     *  @struct threadpool_task_t
     *  @brief One unit of work stored in the pool's task queue.
     *
     *  @var function Callback a worker thread invokes to perform the task.
     *  @var argument Pointer handed to the callback unchanged.
     */
    typedef struct {
        void (*function)(void *);   /* what to run */
        void *argument;             /* what to run it on */
    } threadpool_task_t;
    
    | 58 |  |  |  | 
    
    | 59 |  |  | /** | 
    
    | 60 |  |  |  *  @struct threadpool | 
    
    | 61 |  |  |  *  @brief The threadpool struct | 
    
    | 62 |  |  |  * | 
    
    | 63 |  |  |  *  @var attr 		  Attribute variable to give the thread attributes. | 
    
    | 64 |  |  |  *  @var notify       Condition variable to notify worker threads. | 
    
    | 65 |  |  |  *  @var threads      Array containing worker threads ID. | 
    
    | 66 |  |  |  *  @var thread_count Number of threads | 
    
    | 67 |  |  |  *  @var queue        Array containing the task queue. | 
    
    | 68 |  |  |  *  @var queue_size   Size of the task queue. | 
    
    | 69 |  |  |  *  @var head         Index of the first element. | 
    
    | 70 |  |  |  *  @var tail         Index of the next element. | 
    
    | 71 |  |  |  *  @var count        Number of pending tasks | 
    
    | 72 |  |  |  *  @var shutdown     Flag indicating if the pool is shutting down | 
    
    | 73 |  |  |  *  @var started      Number of started threads | 
    
    | 74 |  |  |  */ | 
    
    | 75 |  |  | struct threadpool_t { | 
    
    | 76 |  |  |     pthread_mutex_t lock; | 
    
    | 77 |  |  |     pthread_cond_t notify; | 
    
    | 78 |  |  |     pthread_attr_t attr; | 
    
    | 79 |  |  |     pthread_t *threads; | 
    
    | 80 |  |  |     threadpool_task_t *queue; | 
    
    | 81 |  |  |     int thread_count; | 
    
    | 82 |  |  |     int queue_size; | 
    
    | 83 |  |  |     int head; | 
    
    | 84 |  |  |     int tail; | 
    
    | 85 |  |  |     int count; | 
    
    | 86 |  |  |     int shutdown; | 
    
    | 87 |  |  |     int started; | 
    
    | 88 |  |  | }; | 
    
    | 89 |  |  |  | 
    
    | 90 |  |  | /** | 
    
    | 91 |  |  |  * @function void *threadpool_thread(void *threadpool) | 
    
    | 92 |  |  |  * @brief the worker thread | 
    
    | 93 |  |  |  * @param threadpool the pool which own the thread | 
    
    | 94 |  |  |  */ | 
    
    | 95 |  |  | static void *threadpool_thread(void *threadpool); | 
    
    | 96 |  |  |  | 
    
    | 97 |  |  | int threadpool_free(threadpool_t *pool); | 
    
    | 98 |  |  |  | 
    
    | 99 |  |  | char *threadpool_strerror(int err) { | 
    
    | 100 |  |  |     switch (err) { | 
    
    | 101 |  |  |         case threadpool_thread_failure: | 
    
    | 102 |  |  |             return "Thread failed in threadpool"; | 
    
    | 103 |  |  |         case threadpool_lock_failure: | 
    
    | 104 |  |  |             return "Lock failed in threadpool"; | 
    
    | 105 |  |  |         case threadpool_invalid: | 
    
    | 106 |  |  |             return "Invalid arguments in threadpool"; | 
    
    | 107 |  |  |         case threadpool_queue_full: | 
    
    | 108 |  |  |             return "Queue was full in threadpool"; | 
    
    | 109 |  |  |         case threadpool_shutdown: | 
    
    | 110 |  |  |             return "Threadpool is shutting down"; | 
    
    | 111 |  |  |         default: | 
    
    | 112 |  |  |             return "Unknown error occured in threadpool"; | 
    
    | 113 |  |  |     } | 
    
    | 114 |  |  | } | 
    
    | 115 |  |  |  | 
    
    | 116 |  | 2 | threadpool_t *threadpool_create(int thread_count, int queue_size, | 
    
    | 117 |  |  |         int thread_size, int flags) { | 
    
    | 118 |  | 2 |     threadpool_t *pool; | 
    
    | 119 |  | 2 |     int i; | 
    
    | 120 |  |  |  | 
    
    | 121 |  |  |     /* TODO: Check for negative or otherwise very big input parameters */ | 
    
    | 122 |  |  |  | 
    
    | 123 | ✗✓ | 2 |     if ((pool = (threadpool_t *)WSS_malloc(sizeof(threadpool_t))) == NULL) { | 
    
    | 124 |  |  |         goto err; | 
    
    | 125 |  |  |     } | 
    
    | 126 |  |  |  | 
    
    | 127 |  |  |     /* Initialize */ | 
    
    | 128 |  | 2 |     pool->thread_count = 0; | 
    
    | 129 |  | 2 |     pool->queue_size = queue_size; | 
    
    | 130 |  | 2 |     pool->head = pool->tail = pool->count = 0; | 
    
    | 131 |  | 2 |     pool->shutdown = pool->started = 0; | 
    
    | 132 |  |  |  | 
    
    | 133 |  |  |     /* Allocate attributes */ | 
    
    | 134 |  | 2 |     pthread_attr_init(&pool->attr); | 
    
    | 135 |  | 2 |     pthread_attr_setstacksize(&pool->attr, thread_size); | 
    
    | 136 |  |  |  | 
    
    | 137 |  |  |     /* Allocate thread and task queue */ | 
    
    | 138 |  | 2 |     pool->threads = (pthread_t *) WSS_malloc(sizeof(pthread_t) * thread_count); | 
    
    | 139 |  | 2 |     pool->queue = (threadpool_task_t *) WSS_malloc(sizeof(threadpool_task_t) * queue_size); | 
    
    | 140 |  |  |  | 
    
    | 141 |  |  |     /* Initialize mutex and conditional variable first */ | 
    
    | 142 | ✓✗✓✗ 
 | 4 |     if ( (pthread_mutex_init(&(pool->lock), NULL) != 0) || | 
    
    | 143 |  | 2 |             (pthread_cond_init(&(pool->notify), NULL) != 0) || | 
    
    | 144 | ✓✓ | 2 |             (pool->threads == NULL) || | 
    
    | 145 | ✗✓ | 1 |             (pool->queue == NULL)) { | 
    
    | 146 |  | 1 |         goto err; | 
    
    | 147 |  |  |     } | 
    
    | 148 |  |  |  | 
    
    | 149 |  |  |     /* Start worker threads */ | 
    
    | 150 | ✓✓ | 5 |     for (i = 0; i < thread_count; i++) { | 
    
    | 151 | ✗✓ | 4 |         if (pthread_create(&(pool->threads[i]), &pool->attr, threadpool_thread, | 
    
    | 152 |  |  |                     (void*) pool) != 0) { | 
    
    | 153 |  |  |             threadpool_destroy(pool, 0); | 
    
    | 154 |  |  |             return NULL; | 
    
    | 155 |  |  |         } | 
    
    | 156 |  | 4 |         pool->thread_count++; | 
    
    | 157 |  | 4 |         pool->started++; | 
    
    | 158 |  |  |     } | 
    
    | 159 |  |  |  | 
    
    | 160 |  |  |     return pool; | 
    
    | 161 |  |  |  | 
    
    | 162 |  | 1 | err: | 
    
    | 163 | ✓✗ | 1 |     if (pool) { | 
    
    | 164 |  | 1 |         threadpool_free(pool); | 
    
    | 165 |  |  |     } | 
    
    | 166 |  |  |     return NULL; | 
    
    | 167 |  |  | } | 
    
    | 168 |  |  |  | 
    
    | 169 |  |  | int threadpool_add(threadpool_t *pool, void (*function)(void *), | 
    
    | 170 |  |  |         void *argument, int flags){ | 
    
    | 171 |  |  |     int err = 0; | 
    
    | 172 |  |  |     int next; | 
    
    | 173 |  |  |  | 
    
    | 174 |  |  |     (void) flags; | 
    
    | 175 |  |  |  | 
    
    | 176 |  |  |     if (NULL == pool || NULL == function) { | 
    
    | 177 |  |  |         return threadpool_invalid; | 
    
    | 178 |  |  |     } | 
    
    | 179 |  |  |  | 
    
    | 180 |  |  |     if (pthread_mutex_lock(&(pool->lock)) != 0) { | 
    
    | 181 |  |  |         return threadpool_lock_failure; | 
    
    | 182 |  |  |     } | 
    
    | 183 |  |  |  | 
    
    | 184 |  |  |     next = pool->tail + 1; | 
    
    | 185 |  |  |     next = (next == pool->queue_size) ? 0 : next; | 
    
    | 186 |  |  |  | 
    
    | 187 |  |  |     do { | 
    
    | 188 |  |  |         /* Are we full ? */ | 
    
    | 189 |  |  |         if (pool->count == pool->queue_size) { | 
    
    | 190 |  |  |             err = threadpool_queue_full; | 
    
    | 191 |  |  |             break; | 
    
    | 192 |  |  |         } | 
    
    | 193 |  |  |  | 
    
    | 194 |  |  |         /* Are we shutting down ? */ | 
    
    | 195 |  |  |         if (pool->shutdown) { | 
    
    | 196 |  |  |             err = threadpool_shutdown; | 
    
    | 197 |  |  |             break; | 
    
    | 198 |  |  |         } | 
    
    | 199 |  |  |  | 
    
    | 200 |  |  |         /* Add task to queue */ | 
    
    | 201 |  |  |         pool->queue[pool->tail].function = function; | 
    
    | 202 |  |  |         pool->queue[pool->tail].argument = argument; | 
    
    | 203 |  |  |         pool->tail = next; | 
    
    | 204 |  |  |         pool->count += 1; | 
    
    | 205 |  |  |  | 
    
    | 206 |  |  |         /* pthread_cond_broadcast */ | 
    
    | 207 |  |  |         if (pthread_cond_signal(&(pool->notify)) != 0) { | 
    
    | 208 |  |  |             err = threadpool_lock_failure; | 
    
    | 209 |  |  |             break; | 
    
    | 210 |  |  |         } | 
    
    | 211 |  |  |     } while(0); | 
    
    | 212 |  |  |  | 
    
    | 213 |  |  |     if (pthread_mutex_unlock(&pool->lock) != 0) { | 
    
    | 214 |  |  |         err = threadpool_lock_failure; | 
    
    | 215 |  |  |     } | 
    
    | 216 |  |  |  | 
    
    | 217 |  |  |     return err; | 
    
    | 218 |  |  | } | 
    
    | 219 |  |  |  | 
    
    | 220 |  | 1 | int threadpool_destroy(threadpool_t *pool, int flags) { | 
    
    | 221 |  | 1 |     int i, err = 0; | 
    
    | 222 |  |  |  | 
    
    | 223 | ✓✗ | 1 |     if (NULL == pool) { | 
    
    | 224 |  |  |         return threadpool_invalid; | 
    
    | 225 |  |  |     } | 
    
    | 226 |  |  |  | 
    
    | 227 | ✓✗ | 1 |     if (pthread_mutex_lock(&(pool->lock)) != 0) { | 
    
    | 228 |  |  |         return threadpool_lock_failure; | 
    
    | 229 |  |  |     } | 
    
    | 230 |  |  |  | 
    
    | 231 |  | 1 |     do { | 
    
    | 232 |  |  |         /* Already shutting down */ | 
    
    | 233 | ✓✗ | 1 |         if (pool->shutdown) { | 
    
    | 234 |  |  |             err = threadpool_shutdown; | 
    
    | 235 |  |  |             break; | 
    
    | 236 |  |  |         } | 
    
    | 237 |  |  |  | 
    
    | 238 |  | 2 |         pool->shutdown = (flags & threadpool_graceful) ? | 
    
    | 239 | ✗✓ | 1 |             graceful_shutdown : immediate_shutdown; | 
    
    | 240 |  |  |  | 
    
    | 241 |  |  |         /* Wake up all worker threads */ | 
    
    | 242 | ✓✗✓✗ 
 | 2 |         if ( (pthread_cond_broadcast(&(pool->notify)) != 0) || | 
    
    | 243 |  | 1 |                 (pthread_mutex_unlock(&(pool->lock)) != 0) ) { | 
    
    | 244 |  |  |             err = threadpool_lock_failure; | 
    
    | 245 |  |  |             break; | 
    
    | 246 |  |  |         } | 
    
    | 247 |  |  |  | 
    
    | 248 |  |  |         /* Join all worker thread */ | 
    
    | 249 | ✓✓ | 5 |         for (i = 0; i < pool->thread_count; i++) { | 
    
    | 250 | ✗✓ | 4 |             if (pthread_join(pool->threads[i], NULL) != 0) { | 
    
    | 251 |  |  |                 err = threadpool_thread_failure; | 
    
    | 252 |  |  |             } | 
    
    | 253 |  |  |         } | 
    
    | 254 |  | 1 |     } while(0); | 
    
    | 255 |  |  |  | 
    
    | 256 |  |  |     /* Only if everything went well do we deallocate the pool */ | 
    
    | 257 | ✓✗ | 1 |     if (!err) { | 
    
    | 258 |  | 1 |         threadpool_free(pool); | 
    
    | 259 |  |  |     } | 
    
    | 260 |  |  |     return err; | 
    
    | 261 |  |  | } | 
    
    | 262 |  |  |  | 
    
    | 263 |  | 2 | int threadpool_free(threadpool_t *pool) { | 
    
    | 264 | ✓✗✓✗ 
 | 2 |     if(NULL == pool || pool->started > 0) { | 
    
    | 265 |  |  |         return -1; | 
    
    | 266 |  |  |     } | 
    
    | 267 |  |  |  | 
    
    | 268 |  |  |     /* Did we manage to allocate ? */ | 
    
    | 269 | ✓✓ | 2 |     if (pool->threads) { | 
    
    | 270 |  | 1 |         pthread_attr_destroy(&pool->attr); | 
    
    | 271 |  | 1 |         WSS_free((void **) &pool->threads); | 
    
    | 272 |  | 1 |         WSS_free((void **) &pool->queue); | 
    
    | 273 |  |  |  | 
    
    | 274 |  |  |         /* Because we allocate pool->threads after initializing the | 
    
    | 275 |  |  |            mutex and condition variable, we're sure they're | 
    
    | 276 |  |  |            initialized. Let's lock the mutex just in case. */ | 
    
    | 277 |  | 1 |         pthread_mutex_lock(&(pool->lock)); | 
    
    | 278 |  | 1 |         pthread_mutex_destroy(&(pool->lock)); | 
    
    | 279 |  | 1 |         pthread_cond_destroy(&(pool->notify)); | 
    
    | 280 |  |  |     } | 
    
    | 281 |  |  |  | 
    
    | 282 |  | 2 |     WSS_free((void **) &pool); | 
    
    | 283 |  | 2 |     return 0; | 
    
    | 284 |  |  | } | 
    
    | 285 |  |  |  | 
    
    | 286 |  |  |  | 
    
    | 287 |  | 4 | static void *threadpool_thread(void *arguments) { | 
    
    | 288 |  | 4 |     threadpool_t *pool = (threadpool_t *)arguments; | 
    
    | 289 |  | 4 |     threadpool_task_t task; | 
    
    | 290 |  |  |  | 
    
    | 291 |  |  | #ifdef USE_RPMALLOC | 
    
    | 292 |  | 4 |     rpmalloc_thread_initialize(); | 
    
    | 293 |  |  | #endif | 
    
    | 294 |  |  |  | 
    
    | 295 |  | 4 |     for (;;) { | 
    
    | 296 |  |  |         /* Lock must be taken to wait on conditional variable */ | 
    
    | 297 |  | 4 |         pthread_mutex_lock(&(pool->lock)); | 
    
    | 298 |  |  |  | 
    
    | 299 |  |  |         /* Wait on condition variable, check for spurious wakeups. | 
    
    | 300 |  |  |            When returning from pthread_cond_wait(), we own the lock. */ | 
    
    | 301 | ✓✓ | 6 |         while ( (pool->count == 0) && (!pool->shutdown) ) { | 
    
    | 302 |  | 2 |             pthread_cond_wait(&(pool->notify), &(pool->lock)); | 
    
    | 303 |  |  |         } | 
    
    | 304 |  |  |  | 
    
    | 305 | ✓✗✗✓ 
 | 4 |         if ( (pool->shutdown == immediate_shutdown) || | 
    
    | 306 |  |  |                 ((pool->shutdown == graceful_shutdown) && (pool->count == 0)) ) { | 
    
    | 307 |  |  |             break; | 
    
    | 308 |  |  |         } | 
    
    | 309 |  |  |  | 
    
    | 310 |  |  |         /* Grab our task */ | 
    
    | 311 |  |  |         task.function = pool->queue[pool->head].function; | 
    
    | 312 |  |  |         task.argument = pool->queue[pool->head].argument; | 
    
    | 313 |  |  |         pool->head += 1; | 
    
    | 314 |  |  |         pool->head = (pool->head == pool->queue_size) ? 0 : pool->head; | 
    
    | 315 |  |  |         pool->count -= 1; | 
    
    | 316 |  |  |  | 
    
    | 317 |  |  |         /* Unlock */ | 
    
    | 318 |  |  |         pthread_mutex_unlock(&(pool->lock)); | 
    
    | 319 |  |  |         /* Get to work */ | 
    
    | 320 |  |  |         (*(task.function))(task.argument); | 
    
    | 321 |  |  |     } | 
    
    | 322 |  |  |  | 
    
    | 323 |  | 4 |     pool->started--; | 
    
    | 324 |  |  |  | 
    
    | 325 |  | 4 |     pthread_mutex_unlock(&(pool->lock)); | 
    
    | 326 |  |  |  | 
    
    | 327 |  |  | #ifdef USE_RPMALLOC | 
    
    | 328 |  | 4 |     rpmalloc_thread_finalize(); | 
    
    | 329 |  |  | #endif | 
    
    | 330 |  |  |  | 
    
    | 331 |  | 4 |     pthread_exit(NULL); | 
    
    | 332 |  |  |     return NULL; | 
    
    | 333 |  |  | } |