GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: src/pool.c Lines: 63 107 58.9 %
Date: 2020-12-10 21:44:00 Branches: 28 68 41.2 %

Line Branch Exec Source
1
/*
2
 * Copyright (c) 2013, Mathias Brossard <mathias@brossard.org>.
3
 * All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions are
7
 * met:
8
 *
9
 *  1. Redistributions of source code must retain the above copyright
10
 *     notice, this list of conditions and the following disclaimer.
11
 *
12
 *  2. Redistributions in binary form must reproduce the above copyright
13
 *     notice, this list of conditions and the following disclaimer in the
14
 *     documentation and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
 */
28
29
/**
30
 * @file threadpool.c
31
 * @brief Threadpool implementation file
32
 */
33
34
#include <stdlib.h>
35
#include <pthread.h>
36
#include <unistd.h>
37
38
#include "pool.h"
39
#include "alloc.h"
40
41
typedef enum {
42
    immediate_shutdown = 1,
43
    graceful_shutdown  = 2
44
} threadpool_shutdown_t;
45
46
/**
47
 *  @struct threadpool_task
48
 *  @brief the work struct
49
 *
50
 *  @var function Pointer to the function that will perform the task.
51
 *  @var argument Argument to be passed to the function.
52
 */
53
54
typedef struct {
55
    void (*function)(void *);
56
    void *argument;
57
} threadpool_task_t;
58
59
/**
60
 *  @struct threadpool
61
 *  @brief The threadpool struct
62
 *
63
 *  @var attr 		  Attribute variable to give the thread attributes.
64
 *  @var notify       Condition variable to notify worker threads.
65
 *  @var threads      Array containing worker threads ID.
66
 *  @var thread_count Number of threads
67
 *  @var queue        Array containing the task queue.
68
 *  @var queue_size   Size of the task queue.
69
 *  @var head         Index of the first element.
70
 *  @var tail         Index of the next element.
71
 *  @var count        Number of pending tasks
72
 *  @var shutdown     Flag indicating if the pool is shutting down
73
 *  @var started      Number of started threads
74
 */
75
struct threadpool_t {
76
    pthread_mutex_t lock;
77
    pthread_cond_t notify;
78
    pthread_attr_t attr;
79
    pthread_t *threads;
80
    threadpool_task_t *queue;
81
    int thread_count;
82
    int queue_size;
83
    int head;
84
    int tail;
85
    int count;
86
    int shutdown;
87
    int started;
88
};
89
90
/**
91
 * @function void *threadpool_thread(void *threadpool)
92
 * @brief the worker thread
93
 * @param threadpool the pool which own the thread
94
 */
95
static void *threadpool_thread(void *threadpool);
96
97
int threadpool_free(threadpool_t *pool);
98
99
char *threadpool_strerror(int err) {
100
    switch (err) {
101
        case threadpool_thread_failure:
102
            return "Thread failed in threadpool";
103
        case threadpool_lock_failure:
104
            return "Lock failed in threadpool";
105
        case threadpool_invalid:
106
            return "Invalid arguments in threadpool";
107
        case threadpool_queue_full:
108
            return "Queue was full in threadpool";
109
        case threadpool_shutdown:
110
            return "Threadpool is shutting down";
111
        default:
112
            return "Unknown error occured in threadpool";
113
    }
114
}
115
116
2
threadpool_t *threadpool_create(int thread_count, int queue_size,
117
        int thread_size, int flags) {
118
2
    threadpool_t *pool;
119
2
    int i;
120
121
    /* TODO: Check for negative or otherwise very big input parameters */
122
123
2
    if ((pool = (threadpool_t *)WSS_malloc(sizeof(threadpool_t))) == NULL) {
124
        goto err;
125
    }
126
127
    /* Initialize */
128
2
    pool->thread_count = 0;
129
2
    pool->queue_size = queue_size;
130
2
    pool->head = pool->tail = pool->count = 0;
131
2
    pool->shutdown = pool->started = 0;
132
133
    /* Allocate attributes */
134
2
    pthread_attr_init(&pool->attr);
135
2
    pthread_attr_setstacksize(&pool->attr, thread_size);
136
137
    /* Allocate thread and task queue */
138
2
    pool->threads = (pthread_t *) WSS_malloc(sizeof(pthread_t) * thread_count);
139
2
    pool->queue = (threadpool_task_t *) WSS_malloc(sizeof(threadpool_task_t) * queue_size);
140
141
    /* Initialize mutex and conditional variable first */
142

4
    if ( (pthread_mutex_init(&(pool->lock), NULL) != 0) ||
143
2
            (pthread_cond_init(&(pool->notify), NULL) != 0) ||
144
2
            (pool->threads == NULL) ||
145
1
            (pool->queue == NULL)) {
146
1
        goto err;
147
    }
148
149
    /* Start worker threads */
150
5
    for (i = 0; i < thread_count; i++) {
151
4
        if (pthread_create(&(pool->threads[i]), &pool->attr, threadpool_thread,
152
                    (void*) pool) != 0) {
153
            threadpool_destroy(pool, 0);
154
            return NULL;
155
        }
156
4
        pool->thread_count++;
157
4
        pool->started++;
158
    }
159
160
    return pool;
161
162
1
err:
163
1
    if (pool) {
164
1
        threadpool_free(pool);
165
    }
166
    return NULL;
167
}
168
169
int threadpool_add(threadpool_t *pool, void (*function)(void *),
170
        void *argument, int flags){
171
    int err = 0;
172
    int next;
173
174
    (void) flags;
175
176
    if (NULL == pool || NULL == function) {
177
        return threadpool_invalid;
178
    }
179
180
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
181
        return threadpool_lock_failure;
182
    }
183
184
    next = pool->tail + 1;
185
    next = (next == pool->queue_size) ? 0 : next;
186
187
    do {
188
        /* Are we full ? */
189
        if (pool->count == pool->queue_size) {
190
            err = threadpool_queue_full;
191
            break;
192
        }
193
194
        /* Are we shutting down ? */
195
        if (pool->shutdown) {
196
            err = threadpool_shutdown;
197
            break;
198
        }
199
200
        /* Add task to queue */
201
        pool->queue[pool->tail].function = function;
202
        pool->queue[pool->tail].argument = argument;
203
        pool->tail = next;
204
        pool->count += 1;
205
206
        /* pthread_cond_broadcast */
207
        if (pthread_cond_signal(&(pool->notify)) != 0) {
208
            err = threadpool_lock_failure;
209
            break;
210
        }
211
    } while(0);
212
213
    if (pthread_mutex_unlock(&pool->lock) != 0) {
214
        err = threadpool_lock_failure;
215
    }
216
217
    return err;
218
}
219
220
1
int threadpool_destroy(threadpool_t *pool, int flags) {
221
1
    int i, err = 0;
222
223
1
    if (NULL == pool) {
224
        return threadpool_invalid;
225
    }
226
227
1
    if (pthread_mutex_lock(&(pool->lock)) != 0) {
228
        return threadpool_lock_failure;
229
    }
230
231
1
    do {
232
        /* Already shutting down */
233
1
        if (pool->shutdown) {
234
            err = threadpool_shutdown;
235
            break;
236
        }
237
238
2
        pool->shutdown = (flags & threadpool_graceful) ?
239
1
            graceful_shutdown : immediate_shutdown;
240
241
        /* Wake up all worker threads */
242

2
        if ( (pthread_cond_broadcast(&(pool->notify)) != 0) ||
243
1
                (pthread_mutex_unlock(&(pool->lock)) != 0) ) {
244
            err = threadpool_lock_failure;
245
            break;
246
        }
247
248
        /* Join all worker thread */
249
5
        for (i = 0; i < pool->thread_count; i++) {
250
4
            if (pthread_join(pool->threads[i], NULL) != 0) {
251
                err = threadpool_thread_failure;
252
            }
253
        }
254
1
    } while(0);
255
256
    /* Only if everything went well do we deallocate the pool */
257
1
    if (!err) {
258
1
        threadpool_free(pool);
259
    }
260
    return err;
261
}
262
263
2
int threadpool_free(threadpool_t *pool) {
264

2
    if(NULL == pool || pool->started > 0) {
265
        return -1;
266
    }
267
268
    /* Did we manage to allocate ? */
269
2
    if (pool->threads) {
270
1
        pthread_attr_destroy(&pool->attr);
271
1
        WSS_free((void **) &pool->threads);
272
1
        WSS_free((void **) &pool->queue);
273
274
        /* Because we allocate pool->threads after initializing the
275
           mutex and condition variable, we're sure they're
276
           initialized. Let's lock the mutex just in case. */
277
1
        pthread_mutex_lock(&(pool->lock));
278
1
        pthread_mutex_destroy(&(pool->lock));
279
1
        pthread_cond_destroy(&(pool->notify));
280
    }
281
282
2
    WSS_free((void **) &pool);
283
2
    return 0;
284
}
285
286
287
4
static void *threadpool_thread(void *arguments) {
288
4
    threadpool_t *pool = (threadpool_t *)arguments;
289
4
    threadpool_task_t task;
290
291
#ifdef USE_RPMALLOC
292
4
    rpmalloc_thread_initialize();
293
#endif
294
295
4
    for (;;) {
296
        /* Lock must be taken to wait on conditional variable */
297
4
        pthread_mutex_lock(&(pool->lock));
298
299
        /* Wait on condition variable, check for spurious wakeups.
300
           When returning from pthread_cond_wait(), we own the lock. */
301
6
        while ( (pool->count == 0) && (!pool->shutdown) ) {
302
2
            pthread_cond_wait(&(pool->notify), &(pool->lock));
303
        }
304
305

4
        if ( (pool->shutdown == immediate_shutdown) ||
306
                ((pool->shutdown == graceful_shutdown) && (pool->count == 0)) ) {
307
            break;
308
        }
309
310
        /* Grab our task */
311
        task.function = pool->queue[pool->head].function;
312
        task.argument = pool->queue[pool->head].argument;
313
        pool->head += 1;
314
        pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;
315
        pool->count -= 1;
316
317
        /* Unlock */
318
        pthread_mutex_unlock(&(pool->lock));
319
        /* Get to work */
320
        (*(task.function))(task.argument);
321
    }
322
323
4
    pool->started--;
324
325
4
    pthread_mutex_unlock(&(pool->lock));
326
327
#ifdef USE_RPMALLOC
328
4
    rpmalloc_thread_finalize();
329
#endif
330
331
4
    pthread_exit(NULL);
332
    return NULL;
333
}