GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: src/event.c Lines: 0 220 0.0 %
Date: 2020-12-10 21:44:00 Branches: 0 120 0.0 %

Line Branch Exec Source
1
#include "event.h"
2
3
#if defined(WSS_EPOLL)
4
#include <sys/epoll.h>
5
#elif defined(WSS_KQUEUE)
6
#include <sys/event.h>
7
#include <sys/time.h>
8
#elif defined(WSS_POLL)
9
10
#ifndef _GNU_SOURCE
11
#define _GNU_SOURCE
12
#endif
13
14
#ifndef POLLRDHUP
15
#define POLLRDHUP  0x2000
16
#endif
17
18
#include <poll.h>
19
#include <sys/time.h>
20
#include <sys/resource.h>
21
#endif
22
23
#include <unistd.h>
24
#include <errno.h>
25
#include <string.h>
26
#include <time.h>
27
#include "server.h"
28
#include "alloc.h"
29
#include "worker.h"
30
#include "socket.h"
31
#include "pool.h"
32
#include "log.h"
33
#include "predict.h"
34
35
/* Pipe whose read end (index 0) is registered with the poll instance; an event
 * on it is only used to interrupt the blocking wait. Both ends stay -1 until
 * the pipe has been created by WSS_poll_init. */
int close_pipefd[2] = {-1, -1};
36
37
/**
38
 * Writes a rearm message on the rearm pipe.
39
 *
40
 * @param 	server	[wss_server_t *] 	"A wss_server_t instance"
41
 * @return 			[void]
42
 */
43
static inline void rearm(wss_server_t *server) {
44
    int n;
45
46
    WSS_log_trace("Notify about rearm");
47
48
    if ( likely(server->rearm_pipefd[0] != -1 && server->rearm_pipefd[1] != -1) ) {
49
        do {
50
            errno = 0;
51
            n = write(server->rearm_pipefd[1], "ARM", 3);
52
            if ( unlikely(n < 0) ) {
53
                if ( unlikely(errno != EINTR) ) {
54
                    WSS_log_fatal("Unable to write rearm message through pipe");
55
                }
56
            }
57
        } while ( unlikely(errno == EINTR) );
58
    }
59
}
60
61
/**
62
 * Reads as much as possible from rearm pipe.
63
 *
64
 * @param 	server	[wss_server_t *] 	"A wss_server_t instance"
65
 * @return 			[void]
66
 */
67
static inline void handle_rearm(wss_server_t *server) {
68
    int n;
69
    char buf[server->config->size_buffer];
70
71
    WSS_log_trace("Handle rearm");
72
73
    if ( likely(server->rearm_pipefd[0] != -1 && server->rearm_pipefd[1] != -1) ) {
74
        do {
75
            n = read(server->rearm_pipefd[0], buf, server->config->size_buffer);
76
            if ( unlikely(n < 0) ) {
77
                if ( likely(errno == EAGAIN || errno == EWOULDBLOCK) ) {
78
                    return;
79
                } else if ( likely(errno == EINTR) ) {
80
                    errno = 0;
81
                    continue;
82
                } else {
83
                    WSS_log_fatal("Unable to read rearm message through pipe");
84
                }
85
            }
86
        } while ( n > 0 );
87
    }
88
}
89
90
/**
91
 * Function that adds task-function and data instance to worker pool.
92
 *
93
 * @param 	server	[wss_server_t *] 	"A wss_server_t instance"
94
 * @param 	func	[void (*)(void *)] 	"A function pointer"
95
 * @param 	args	[void *] 	        "Arguments to be served to the function"
96
 * @return 			[wss_error_t]       "The error status"
97
 */
98
wss_error_t WSS_add_to_threadpool(wss_server_t *server, void (*func)(void *), void *args) {
99
    int err;
100
    struct timespec tim;
101
    unsigned int retries = 0;
102
103
    tim.tv_sec = 0;
104
    tim.tv_nsec = 100000000;
105
106
    do {
107
        if ( unlikely((err = threadpool_add(server->pool, func, args, 0)) != 0) ) {
108
            switch (err) {
109
                case threadpool_invalid:
110
                    WSS_log_fatal("Threadpool was served with invalid data");
111
                    return WSS_THREADPOOL_INVALID_ERROR;
112
                case threadpool_lock_failure:
113
                    WSS_log_fatal("Locking in thread failed");
114
                    return WSS_THREADPOOL_LOCK_ERROR;
115
                case threadpool_queue_full:
116
                    if ( likely(retries < server->config->pool_retries) ) {
117
                        retries += 1;
118
                        WSS_log_trace("Threadpool full, will retry shortly. Retry number: %d", retries);
119
                        nanosleep(&tim, NULL);
120
                        continue;
121
                    }
122
123
                    // Currently we treat a full threadpool as an error, but
124
                    // we could try to handle this by dynamically increasing size
125
                    // of threadpool, and maybe reset thread count to that of the
126
                    // configuration when the hot load is over
127
                    WSS_log_error("Threadpool queue is full");
128
                    return WSS_THREADPOOL_FULL_ERROR;
129
                case threadpool_shutdown:
130
                    WSS_log_error("Threadpool is shutting down");
131
                    return WSS_THREADPOOL_SHUTDOWN_ERROR;
132
                case threadpool_thread_failure:
133
                    WSS_log_fatal("Threadpool thread return an error");
134
                    return WSS_THREADPOOL_THREAD_ERROR;
135
                default:
136
                    WSS_log_fatal("Unknown error occured with threadpool");
137
                    return WSS_THREADPOOL_ERROR;
138
            }
139
        }
140
    } while (0);
141
142
    return WSS_SUCCESS;
143
}
144
145
/******************************************************************************
146
 *                                   KQUEUE                                   *
147
 ******************************************************************************/
148
149
#if defined(WSS_KQUEUE)
150
151
/* Wait limit handed to kevent() by WSS_poll_delegate when
 * server->config->timeout_poll is non-negative. */
static struct timespec timeout;
152
153
/**
 * Function that creates the kqueue instance and registers read filters for
 * the close pipe, the rearm pipe and the servers socket on it.
 *
 * The close pipe is only created when both of its ends are still -1. The rearm
 * pipe is always created. All pipe ends are passed through
 * WSS_socket_non_blocking. When server->config->timeout_poll is non-negative
 * it is converted from milliseconds into the file-level `timeout` timespec.
 *
 * @param 	server	    [wss_server_t *]    "A pointer to a server structure"
 * @return 			    [wss_error_t]       "The error status"
 */
wss_error_t WSS_poll_init(wss_server_t *server) {
    int ret;
    wss_error_t err;
    struct kevent event;

    WSS_log_info("Using KQUEUE");

    WSS_log_trace("Initializing kqueue instance");

    /**
     * Creating kqueue instance.
     */
    if ( unlikely((server->poll_fd = kqueue()) < 0) ) {
        WSS_log_fatal("Unable to create server kqueue structure: %s", strerror(errno));
        server->poll_fd = -1;
        return WSS_POLL_CREATE_ERROR;
    }
    WSS_server_set_max_fd(server, server->poll_fd);

    WSS_log_trace("Initializing kqueue events");

    if ( unlikely(NULL == (server->events = WSS_calloc(server->config->pool_workers, sizeof(event)))) ) {
        WSS_log_fatal("Unable to calloc server kqueue events");
        return WSS_MEMORY_ERROR;
    }

    if ( close_pipefd[0] == -1 && close_pipefd[1] == -1 ) {
        WSS_log_trace("Creating close pipe file descriptors");

        if ( unlikely(pipe(close_pipefd) == -1) ) {
            WSS_log_trace("Unable to create close pipe file descriptors: %s", strerror(errno));
            return WSS_POLL_PIPE_ERROR;
        }

        if ( unlikely((err = WSS_socket_non_blocking(close_pipefd[0])) != WSS_SUCCESS) ) {
            return err;
        }

        if ( unlikely((err = WSS_socket_non_blocking(close_pipefd[1])) != WSS_SUCCESS) ) {
            return err;
        }
    }

    WSS_log_trace("Arms close pipe file descriptor to kqueue instance");

    do {
        errno = 0;
        EV_SET(&event, close_pipefd[0], EVFILT_READ, EV_ADD, 0, 0, ((void *) &close_pipefd[0]));
        ret = kevent(server->poll_fd, &event, 1, NULL, 0, NULL);
    } while ( unlikely(ret < 0 && errno == EINTR) );

    if ( unlikely(ret < 0) ) {
        WSS_log_error("Failed to (re)arm close pipe file descriptor to kqueue: %s", strerror(errno));
        return WSS_POLL_SET_ERROR;
    }

    WSS_server_set_max_fd(server, close_pipefd[0]);

    WSS_log_trace("Creating arm pipe file descriptors");

    if ( unlikely(pipe(server->rearm_pipefd) == -1) ) {
        WSS_log_trace("Unable to create arm pipe file descriptors: %s", strerror(errno));
        return WSS_POLL_PIPE_ERROR;
    }

    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[0])) != WSS_SUCCESS) ) {
        return err;
    }

    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[1])) != WSS_SUCCESS ) ) {
        return err;
    }

    WSS_log_trace("Arms rearm pipe file descriptor to kqueue instance");

    do {
        errno = 0;
        EV_SET(&event, server->rearm_pipefd[0], EVFILT_READ, EV_ADD, 0, 0, ((void *) &server->rearm_pipefd[0]));
        ret = kevent(server->poll_fd, &event, 1, NULL, 0, NULL);
    } while ( unlikely(ret < 0 && errno == EINTR) );

    if ( unlikely(ret < 0) ) {
        WSS_log_error("Failed to (re)arm arm pipe file descriptor to kqueue: %s", strerror(errno));
        return WSS_POLL_SET_ERROR;
    }

    WSS_server_set_max_fd(server, server->rearm_pipefd[0]);

    WSS_log_trace("Arms server file descriptor to kqueue instance");

    do {
        errno = 0;
        EV_SET(&event, server->fd, EVFILT_READ, EV_ADD, 0, 0, ((void *) &server->fd));
        ret = kevent(server->poll_fd, &event, 1, NULL, 0, NULL);
    } while ( unlikely(ret < 0 && errno == EINTR) );

    if ( unlikely(ret < 0) ) {
        WSS_log_error("Failed to (re)arm file descriptor to kqueue: %s", strerror(errno));
        return WSS_POLL_SET_ERROR;
    }

    WSS_server_set_max_fd(server, server->fd);

    // timeout_poll is given in milliseconds; split it into whole seconds and
    // the remaining nanoseconds for kevent()
    if (server->config->timeout_poll >= 0) {
        timeout.tv_sec  = server->config->timeout_poll / 1000;
        timeout.tv_nsec = (server->config->timeout_poll % 1000) * 1000000;
    }

    return WSS_SUCCESS;
}
267
268
/**
 * Function that (re)arms the kqueue instance with a one-shot write filter
 * (EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT) for the clients filedescriptor.
 * A registration interrupted by a signal is retried.
 *
 * NOTE(review): the udata stored with the filter is the address of the
 * by-value parameter `fd`, which does not outlive this call. Consumers of the
 * event should identify the descriptor through the event's ident field.
 *
 * @param 	server      [wss_server_t *]	"A pointer to a server structure"
 * @param 	fd	        [int]	            "The clients file descriptor"
 * @return 			    [wss_error_t]       "The error status"
 */
wss_error_t WSS_poll_set_write(wss_server_t *server, int fd) {
    struct kevent change;
    int rc;

    WSS_log_trace("Rearms session %d for write events on kqueue", fd);

    for (;;) {
        errno = 0;
        EV_SET(&change, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT, 0, 0, ((void *) &fd));
        rc = kevent(server->poll_fd, &change, 1, NULL, 0, NULL);

        if ( likely(errno != EINTR) ) {
            break;
        }
    }

    if ( unlikely(rc < 0) ) {
        WSS_log_error("Failed to (re)arm file descriptor to kqueue: %s", strerror(errno));
        return WSS_POLL_SET_ERROR;
    }

    WSS_server_set_max_fd(server, fd);

    return WSS_SUCCESS;
}
297
298
/**
 * Function that (re)arms the kqueue instance with a one-shot read filter
 * (EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT) for the clients filedescriptor.
 * A registration interrupted by a signal is retried.
 *
 * NOTE(review): the udata stored with the filter is the address of the
 * by-value parameter `fd`, which does not outlive this call. Consumers of the
 * event should identify the descriptor through the event's ident field.
 *
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
 * @param 	fd	        [int]	            "The clients file descriptor"
 * @return 			    [wss_error_t]       "The error status"
 */
wss_error_t WSS_poll_set_read(wss_server_t *server, int fd) {
    struct kevent change;
    int rc;

    WSS_log_trace("Rearms session %d for read events on kqueue", fd);

    for (;;) {
        errno = 0;
        EV_SET(&change, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT, 0, 0, ((void *) &fd));
        rc = kevent(server->poll_fd, &change, 1, NULL, 0, NULL);

        if ( likely(errno != EINTR) ) {
            break;
        }
    }

    if ( unlikely(rc < 0) ) {
        WSS_log_error("Failed to (re)arm file descriptor to kqueue: %s", strerror(errno));
        return WSS_POLL_SET_ERROR;
    }

    WSS_server_set_max_fd(server, fd);

    return WSS_SUCCESS;
}
327
328
/**
 * Function removes the client filedescriptor from the kqueue instance by
 * deleting both its read and its write filter.
 *
 * A filter that is not registered (kevent fails with ENOENT) is not treated
 * as an error: the filters are armed as one-shot elsewhere in this file, so
 * at any time one or both of them may already be gone. Any other failure
 * aborts the removal and is reported.
 *
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
 * @param 	fd	        [int]	            "The clients file descriptor"
 * @return 			    [wss_error_t]       "The error status"
 */
wss_error_t WSS_poll_remove(wss_server_t *server, int fd) {
    static const int filters[] = { EVFILT_READ, EVFILT_WRITE };
    size_t i;
    int ret;
    struct kevent event;

    WSS_log_trace("Removes session %d from kqueue", fd);

    for (i = 0; i < sizeof(filters)/sizeof(filters[0]); i++) {
        do {
            errno = 0;
            EV_SET(&event, fd, filters[i], EV_DELETE, 0, 0, NULL);
            ret = kevent(server->poll_fd, &event, 1, NULL, 0, NULL);
        } while ( unlikely(ret < 0 && errno == EINTR) );

        // ENOENT only tells us that this filter was not registered
        if ( unlikely(ret < 0 && errno != ENOENT) ) {
            WSS_log_error("Failed to delete file descriptor from kqueue: %s", strerror(errno));
            return WSS_POLL_REMOVE_ERROR;
        }
    }

    return WSS_SUCCESS;
}
365
366
/**
 * Function that waits for kqueue events and hands each of them to the worker
 * pool as a job.
 *
 * The wait uses the file-level `timeout` when server->config->timeout_poll is
 * non-negative and blocks indefinitely otherwise; a wait interrupted by a
 * signal is restarted. For every returned event:
 *  - EV_EOF/EV_ERROR on the server socket is fatal, on any other descriptor a
 *    CLOSING job is queued,
 *  - an event on the server socket queues a CONNECTING job,
 *  - an event on the close pipe is ignored (it only interrupts the wait),
 *  - an event on the rearm pipe drains that pipe,
 *  - any other read/write event queues a READING/WRITING job.
 *
 * The descriptor is taken from the event's ident field rather than from
 * udata, because udata may hold a pointer that is no longer valid.
 *
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
 * @return 			    [wss_error_t]       "The error status"
 */
wss_error_t WSS_poll_delegate(wss_server_t *server) {
    int i, n, fd;
    wss_error_t err;
    struct kevent *events = (struct kevent *) server->events;

    if (server->ssl_ctx != NULL) {
        WSS_log_trace("Listening for HTTPS kqueue events");
    } else {
        WSS_log_trace("Listening for HTTP kqueue events");
    }

    do {
        errno = 0;

        if (server->config->timeout_poll >= 0) {
            n = kevent(server->poll_fd, NULL, 0, events, server->config->pool_workers, &timeout);
        } else {
            n = kevent(server->poll_fd, NULL, 0, events, server->config->pool_workers, NULL);
        }

        if ( unlikely(n < 0) ) {
            if ( unlikely(errno != EINTR) ) {
                return WSS_POLL_WAIT_ERROR;
            }
        }
    } while ( unlikely(n < 0 && errno == EINTR) );

    if (server->ssl_ctx != NULL) {
        WSS_log_trace("Received %d HTTPS events", n);
    } else {
        WSS_log_trace("Received %d HTTP events", n);
    }

    for (i = 0; i < n; i++) {
        // For read and write filters the ident is the file descriptor
        fd = (int) events[i].ident;

        if (events[i].flags & (EV_EOF | EV_ERROR)) {
            if ( unlikely(fd == server->fd) ) {
                WSS_log_fatal("A server error occured upon kqueue");
                return WSS_POLL_WAIT_ERROR;
            }

            WSS_log_trace("Session %d disconnecting", fd);

            wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
            if ( unlikely(NULL == args) ) {
                WSS_log_fatal("Failed allocating threadpool argument");
                return WSS_MEMORY_ERROR;
            }
            args->server = server;
            args->fd = fd;
            args->state = CLOSING;

            err = WSS_add_to_threadpool(server, &WSS_work, (void *)args);
            if ( unlikely(err != WSS_SUCCESS) ) {
                WSS_log_fatal("Failed adding disconnect job to worker pool");
                WSS_free((void **)&args);
                return err;
            }
        } else if ( fd == server->fd ) {
            WSS_log_trace("New session connecting");

            wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
            if ( unlikely(NULL == args) ) {
                WSS_log_fatal("Failed allocating threadpool argument");
                return WSS_MEMORY_ERROR;
            }
            args->server = server;
            args->state = CONNECTING;

            /**
             * If new session has connected
             */
            err = WSS_add_to_threadpool(server, &WSS_work, (void *)args);
            if ( unlikely(err != WSS_SUCCESS) ) {
                WSS_log_fatal("Failed adding connect job to worker pool");
                // The job was never queued, so the arguments are still ours
                WSS_free((void **)&args);
                return err;
            }
        } else if ( unlikely(fd == close_pipefd[0]) ) {
            // Pipe file descriptor is used to interrupt blocking wait
            continue;
        } else if ( likely(fd == server->rearm_pipefd[0]) ) {
            handle_rearm(server);
            continue;
        } else {
            if (events[i].filter == EVFILT_READ) {
                WSS_log_trace("Session %d begins to read", fd);

                /**
                 * If new reads are ready
                 */
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
                if ( unlikely(NULL == args) ) {
                    WSS_log_fatal("Failed allocating threadpool argument");
                    return WSS_MEMORY_ERROR;
                }
                args->server = server;
                args->fd = fd;
                args->state = READING;

                err = WSS_add_to_threadpool(server, &WSS_work, (void *)args);
                if ( unlikely(err != WSS_SUCCESS) ) {
                    WSS_log_fatal("Failed adding read job to worker pool");
                    WSS_free((void **)&args);
                    return err;
                }
            }

            if (events[i].filter == EVFILT_WRITE) {
                WSS_log_trace("Session %d begins to write", fd);

                /**
                 * If new writes are ready
                 */
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
                if ( unlikely(NULL == args) ) {
                    WSS_log_fatal("Failed allocating threadpool argument");
                    return WSS_MEMORY_ERROR;
                }
                args->server = server;
                args->fd = fd;
                args->state = WRITING;

                err = WSS_add_to_threadpool(server, &WSS_work, (void *)args);
                if ( unlikely(err != WSS_SUCCESS) ) {
                    WSS_log_fatal("Failed adding write job to worker pool");
                    WSS_free((void **)&args);
                    return err;
                }
            }
        }
    }

    return WSS_SUCCESS;
}
506
507
/******************************************************************************
508
 *                                   EPOLL                                    *
509
 ******************************************************************************/
510
511
#elif defined(WSS_EPOLL)
512
static inline wss_error_t WSS_poll_add(int poll_fd, int fd, uint32_t events) {
513
    struct epoll_event event;
514
    int ret;
515
516
    WSS_log_trace("Modifying filedescriptor on epoll eventlist");
517
518
    do {
519
        memset(&event, 0, sizeof(event));
520
521
        event.events = events;
522
        event.data.fd = fd;
523
524
        if ( unlikely((ret = epoll_ctl(poll_fd, EPOLL_CTL_MOD, fd, &event)) < 0) ) {
525
            if ( likely(errno == ENOENT) ) {
526
                WSS_log_trace("Adding file descriptor to epoll eventlist");
527
                errno = 0;
528
529
                memset(&event, 0, sizeof(event));
530
531
                event.events = events;
532
                event.data.fd = fd;
533
534
                ret = epoll_ctl(poll_fd, EPOLL_CTL_ADD, fd, &event);
535
            }
536
        }
537
    } while ( unlikely(errno == EINTR) );
538
539
    if ( unlikely(ret < 0) ) {
540
        WSS_log_error("Failed to (re)arm file descriptor to epoll: %s", strerror(errno));
541
        return WSS_POLL_SET_ERROR;
542
    }
543
544
    return WSS_SUCCESS;
545
}
546
547
/**
548
 * Function that creates poll instance and adding the filedescriptor of the
549
 * servers socket to it.
550
 *
551
 * @param 	server	    [wss_server_t *]    "A pointer to a server structure"
552
 * @return 			    [wss_error_t]       "The error status"
553
 */
554
wss_error_t WSS_poll_init(wss_server_t *server) {
555
    wss_error_t err;
556
    struct epoll_event event;
557
558
    memset(&event, 0, sizeof(event));
559
560
    WSS_log_info("Using EPOLL");
561
562
    WSS_log_trace("Initializing epoll instance");
563
564
    /**
565
     * Creating epoll instance.
566
     */
567
    if ( unlikely((server->poll_fd = epoll_create1(0)) < 0) ) {
568
        WSS_log_fatal("Unable to create server epoll structure: %s", strerror(errno));
569
        server->poll_fd = -1;
570
        return WSS_POLL_CREATE_ERROR;
571
    }
572
    WSS_server_set_max_fd(server, server->poll_fd);
573
574
    WSS_log_trace("Initializing epoll events");
575
576
    if ( unlikely(NULL == (server->events = WSS_calloc(server->config->pool_workers, sizeof(event)))) ) {
577
        WSS_log_fatal("Unable to calloc server epoll events");
578
        return WSS_MEMORY_ERROR;
579
    }
580
581
    if ( close_pipefd[0] == -1 && close_pipefd[1] == -1 ) {
582
        WSS_log_trace("Creating close pipe file descriptors");
583
584
        if ( unlikely(pipe(close_pipefd) == -1) ) {
585
            WSS_log_trace("Unable to create close pipe file descriptors: %s", strerror(errno));
586
            return WSS_POLL_PIPE_ERROR;
587
        }
588
589
        if ( unlikely((err = WSS_socket_non_blocking(close_pipefd[0])) != WSS_SUCCESS) ) {
590
            return err;
591
        }
592
593
        if ( unlikely((err = WSS_socket_non_blocking(close_pipefd[1])) != WSS_SUCCESS) ) {
594
            return err;
595
        }
596
    }
597
598
    WSS_log_trace("Arms close pipe file descriptor to epoll instance");
599
600
    if ( unlikely((err = WSS_poll_add(server->poll_fd, close_pipefd[0], EPOLLIN | EPOLLET | EPOLLRDHUP)) != WSS_SUCCESS) ) {
601
        return err;
602
    }
603
604
    WSS_server_set_max_fd(server, close_pipefd[0]);
605
606
    WSS_log_trace("Creating arm pipe file descriptors");
607
608
    if ( unlikely(pipe(server->rearm_pipefd) == -1) ) {
609
        WSS_log_trace("Unable to create arm pipe file descriptors: %s", strerror(errno));
610
        return WSS_POLL_PIPE_ERROR;
611
    }
612
613
    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[0])) != WSS_SUCCESS ) ) {
614
        return err;
615
    }
616
617
    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[1])) != WSS_SUCCESS ) ) {
618
        return err;
619
    }
620
621
    WSS_log_trace("Arms arm pipe file descriptor to poll instance");
622
623
    if ( unlikely((err = WSS_poll_add(server->poll_fd, server->rearm_pipefd[0], EPOLLIN | EPOLLET | EPOLLRDHUP)) != WSS_SUCCESS) ) {
624
        return err;
625
    }
626
627
    WSS_server_set_max_fd(server, server->rearm_pipefd[0]);
628
629
    WSS_log_trace("Arms server file descriptor to epoll instance");
630
631
    if ( unlikely((err = WSS_poll_add(server->poll_fd, server->fd, EPOLLIN | EPOLLET | EPOLLRDHUP)) != WSS_SUCCESS) ) {
632
        return err;
633
    }
634
635
    WSS_server_set_max_fd(server, server->fd);
636
637
    return WSS_SUCCESS;
638
}
639
640
/**
641
 * Function that rearms the poll instance for write events with the clients
642
 * filedescriptor
643
 *
644
 * @param 	server      [wss_server_t *server]      "A pointer to a server structure"
645
 * @param 	fd	        [int]	                    "The clients file descriptor"
646
 * @return 			    [wss_error_t]               "The error status"
647
 */
648
wss_error_t WSS_poll_set_write(wss_server_t *server, int fd) {
649
    wss_error_t err;
650
651
    WSS_log_trace("Rearms session %d for write epoll events", fd);
652
653
    if ( unlikely((err = WSS_poll_add(server->poll_fd, fd, EPOLLOUT | EPOLLET | EPOLLONESHOT)) != WSS_SUCCESS) ) {
654
        return err;
655
    }
656
657
    WSS_server_set_max_fd(server, fd);
658
659
    return WSS_SUCCESS;
660
}
661
662
/**
663
 * Function that rearms the poll instance for read events with the clients
664
 * filedescriptor
665
 *
666
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
667
 * @param 	fd	        [int]	            "The clients file descriptor"
668
 * @return 			    [wss_error_t]       "The error status"
669
 */
670
wss_error_t WSS_poll_set_read(wss_server_t *server, int fd) {
671
    wss_error_t err;
672
673
    WSS_log_trace("Rearms session %d for read epoll events", fd);
674
675
    if ( unlikely((err = WSS_poll_add(server->poll_fd, fd, EPOLLIN | EPOLLET | EPOLLONESHOT | EPOLLRDHUP)) != WSS_SUCCESS) ) {
676
        return err;
677
    }
678
679
    WSS_server_set_max_fd(server, fd);
680
681
    return WSS_SUCCESS;
682
}
683
684
/**
685
 * Function removes the client filedescriptor from the poll instance
686
 *
687
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
688
 * @param 	fd	        [int]	            "The clients file descriptor"
689
 * @return 			    [wss_error_t]       "The error status"
690
 */
691
wss_error_t WSS_poll_remove(wss_server_t *server, int fd) {
692
    struct epoll_event event;
693
    int ret;
694
695
    WSS_log_trace("Removing session %d from epoll events", fd);
696
697
    do {
698
        memset(&event, 0, sizeof(event));
699
700
        event.events = EPOLLIN | EPOLLOUT;
701
        event.data.fd = fd;
702
703
        ret = epoll_ctl(server->poll_fd, EPOLL_CTL_DEL, fd, &event);
704
    } while ( unlikely(errno == EINTR) );
705
706
    if ( unlikely(ret < 0) ) {
707
        WSS_log_error("Failed to remove session %d from epoll: %s", fd, strerror(errno));
708
        return WSS_POLL_REMOVE_ERROR;
709
    }
710
711
    return WSS_SUCCESS;
712
}
713
714
/**
715
 * Function that listens for new events on the servers file descriptor
716
 *
717
 * @param 	server	    [wss_server_t *]    "A pointer to a server structure"
718
 * @return 			    [wss_error_t]       "The error status"
719
 */
720
wss_error_t WSS_poll_delegate(wss_server_t *server) {
721
    int i, n;
722
    wss_error_t err;
723
    struct epoll_event *events = server->events;
724
725
    if (server->ssl_ctx != NULL) {
726
        WSS_log_trace("Listening for HTTPS epoll events");
727
    } else {
728
        WSS_log_trace("Listening for HTTP epoll events");
729
    }
730
731
    do {
732
        errno = 0;
733
        n = epoll_wait(server->poll_fd, events, server->config->pool_workers, server->config->timeout_poll);
734
        if ( unlikely(n < 0) ) {
735
            if ( unlikely(errno != EINTR) ) {
736
                return WSS_POLL_WAIT_ERROR;
737
            }
738
        }
739
    } while ( unlikely(errno == EINTR) );
740
741
    if (server->ssl_ctx != NULL) {
742
        WSS_log_trace("Received %d HTTPS events", n);
743
    } else {
744
        WSS_log_trace("Received %d HTTP events", n);
745
    }
746
747
    for (i = 0; i < n; i++) {
748
        if ( unlikely((events[i].events & EPOLLHUP) ||
749
                    (events[i].events & EPOLLERR) ||
750
                    (events[i].events & EPOLLRDHUP)) ) {
751
            if ( unlikely(events[i].data.fd == server->fd) ) {
752
                WSS_log_fatal("A server error occured upon epoll");
753
                return WSS_POLL_WAIT_ERROR;
754
            }
755
756
            WSS_log_trace("Session %d disconnecting", events[i].data.fd);
757
758
            wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
759
            if ( unlikely(NULL == args) ) {
760
                WSS_log_fatal("Failed allocating threadpool argument");
761
                return WSS_MEMORY_ERROR;
762
            }
763
            args->server = server;
764
            args->fd = events[i].data.fd;
765
            args->state = CLOSING;
766
767
            if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
768
                WSS_log_fatal("Failed adding disconnect job to worker pool");
769
                WSS_free((void **)&args);
770
                return err;
771
            }
772
        } else if ( events[i].data.fd == server->fd ) {
773
            WSS_log_trace("New session connecting");
774
775
            wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
776
            if ( unlikely(NULL == args) ) {
777
                WSS_log_fatal("Failed allocating threadpool argument");
778
                return WSS_MEMORY_ERROR;
779
            }
780
            args->server = server;
781
            args->state = CONNECTING;
782
783
            /**
784
             * If new session has connected
785
             */
786
            if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
787
                WSS_log_fatal("Failed adding connect job to worker pool");
788
                return err;
789
            }
790
        } else if ( unlikely(events[i].data.fd == close_pipefd[0]) ) {
791
            // Pipe file descriptor is used to interrupt blocking wait
792
            continue;
793
        } else if ( likely(events[i].data.fd == server->rearm_pipefd[0]) ) {
794
            handle_rearm(server);
795
            continue;
796
        } else {
797
            if ( events[i].events & EPOLLIN ) {
798
                WSS_log_trace("Session %d begins to read", events[i].data.fd);
799
800
                /**
801
                 * If new reads are ready
802
                 */
803
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
804
                if ( unlikely(NULL == args) ) {
805
                    WSS_log_fatal("Failed allocating threadpool argument");
806
                    return WSS_MEMORY_ERROR;
807
                }
808
                args->server = server;
809
                args->fd = events[i].data.fd;
810
                args->state = READING;
811
812
                if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
813
                    WSS_log_fatal("Failed adding read job to worker pool");
814
                    WSS_free((void **)&args);
815
                    return err;
816
                }
817
            }
818
819
            if ( events[i].events & EPOLLOUT ) {
820
                WSS_log_trace("Session %d begins to write", events[i].data.fd);
821
822
                /**
823
                 * If new writes are ready
824
                 */
825
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
826
                if ( unlikely(NULL == args) ) {
827
                    WSS_log_fatal("Failed allocating threadpool argument");
828
                    return WSS_MEMORY_ERROR;
829
                }
830
                args->server = server;
831
                args->fd = events[i].data.fd;
832
                args->state = WRITING;
833
834
                if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
835
                    WSS_log_fatal("Failed adding write job to worker pool");
836
                    WSS_free((void **)&args);
837
                    return err;
838
                }
839
            }
840
        }
841
    }
842
843
    return WSS_SUCCESS;
844
}
845
846
/******************************************************************************
847
 *                                    POLL                                    *
848
 ******************************************************************************/
849
850
#elif defined(WSS_POLL)
851
/* Number of slots in the fd-indexed pollfd table. Assigned from the
 * RLIMIT_NOFILE soft limit in WSS_poll_init and used as the upper bound when
 * validating file descriptors before they index the table. */
static unsigned int MAXEVENTS;
852
853
/**
854
 * Function that creates poll instance and adding the filedescriptor of the
855
 * servers socket to it.
856
 *
857
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
858
 * @return 			    [wss_error_t]       "The error status"
859
 */
860
wss_error_t WSS_poll_init(wss_server_t *server) {
861
    unsigned int i;
862
    wss_error_t err;
863
    struct rlimit limits;
864
    struct pollfd event;
865
    struct pollfd *events;
866
867
    if ( unlikely(getrlimit(RLIMIT_NOFILE, &limits) < 0) ) {
868
        return WSS_RLIMIT_ERROR;
869
    }
870
871
    MAXEVENTS = limits.rlim_cur;
872
873
    WSS_log_info("Using POLL");
874
875
    WSS_log_trace("Initializing poll events");
876
877
    if ( unlikely(NULL == (events = WSS_calloc(MAXEVENTS, sizeof(event)))) ) {
878
        WSS_log_fatal("Unable to calloc server poll events");
879
        return WSS_MEMORY_ERROR;
880
    }
881
882
    for (i = 0; i < MAXEVENTS; i++) {
883
        events[i].fd = -1;
884
    }
885
886
    server->events = events;
887
888
    if ( close_pipefd[0] == -1 && close_pipefd[1] == -1 ) {
889
        WSS_log_trace("Creating close pipe file descriptors");
890
891
        if ( unlikely(pipe(close_pipefd) == -1) ) {
892
            WSS_log_trace("Unable to create close pipe file descriptors: %s", strerror(errno));
893
            return WSS_POLL_PIPE_ERROR;
894
        }
895
896
        if ( (err = WSS_socket_non_blocking(close_pipefd[0])) != WSS_SUCCESS ) {
897
            return err;
898
        }
899
900
        if ( (err = WSS_socket_non_blocking(close_pipefd[1])) != WSS_SUCCESS ) {
901
            return err;
902
        }
903
    }
904
905
    WSS_log_trace("Arms close pipe file descriptor to poll instance");
906
907
    if ((err = WSS_poll_set_read(server, close_pipefd[0])) != WSS_SUCCESS) {
908
        return err;
909
    }
910
911
    WSS_log_trace("Creating arm pipe file descriptors");
912
913
    if ( unlikely(pipe(server->rearm_pipefd) == -1) ) {
914
        WSS_log_trace("Unable to create arm pipe file descriptors: %s", strerror(errno));
915
        return WSS_POLL_PIPE_ERROR;
916
    }
917
918
    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[0])) != WSS_SUCCESS) ) {
919
        return err;
920
    }
921
922
    if ( unlikely((err = WSS_socket_non_blocking(server->rearm_pipefd[1])) != WSS_SUCCESS) ) {
923
        return err;
924
    }
925
926
    WSS_log_trace("Arms arm pipe file descriptor to poll instance");
927
928
    if ( unlikely((err = WSS_poll_set_read(server, server->rearm_pipefd[0])) != WSS_SUCCESS)) {
929
        return err;
930
    }
931
932
    WSS_log_trace("Arms server file descriptor to poll instance");
933
934
    return WSS_poll_set_read(server, server->fd);
935
936
}
937
938
/**
939
 * Function that rearms the poll instance for write events with the clients
940
 * filedescriptor
941
 *
942
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
943
 * @param 	fd	        [int]	            "The clients file descriptor"
944
 * @return 			    [wss_error_t]       "The error status"
945
 */
946
wss_error_t WSS_poll_set_write(wss_server_t *server, int fd) {
947
    struct pollfd *events = (struct pollfd *) server->events;
948
949
    if ( unlikely((unsigned int)fd >= MAXEVENTS) ) {
950
        return WSS_POLL_SET_ERROR;
951
    }
952
953
    WSS_log_trace("Session %d (re)armed for write events on poll", fd);
954
955
    WSS_server_set_max_fd(server, fd);
956
957
    events[fd].fd = fd;
958
    events[fd].events = POLLOUT;
959
960
    if (fd != server->rearm_pipefd[0] && fd != close_pipefd[0] && fd != server->fd) {
961
        rearm(server);
962
    }
963
964
    return WSS_SUCCESS;
965
}
966
967
/**
968
 * Function that rearms the poll instance for read events with the clients
969
 * filedescriptor
970
 *
971
 * @param 	server      [wss_server_t *]    "A pointer to a server structure"
972
 * @param 	fd	        [int]	            "The clients file descriptor"
973
 * @return 			    [wss_error_t]       "The error status"
974
 */
975
wss_error_t WSS_poll_set_read(wss_server_t *server, int fd) {
976
    struct pollfd *events = (struct pollfd *) server->events;
977
978
    if ( unlikely((unsigned int)fd >= MAXEVENTS) ) {
979
        return WSS_POLL_SET_ERROR;
980
    }
981
982
    WSS_log_trace("Session %d (re)armed for read events on poll", fd);
983
984
    WSS_server_set_max_fd(server, fd);
985
986
    events[fd].fd = fd;
987
    events[fd].events = POLLPRI | POLLIN | POLLRDHUP;
988
989
    if (fd != server->rearm_pipefd[0] && fd != close_pipefd[0] && fd != server->fd) {
990
        rearm(server);
991
    }
992
993
    return WSS_SUCCESS;
994
}
995
996
/**
997
 * Function removes the client filedescriptor from the poll instance
998
 *
999
 * @param 	server      [wss_server_t *]	"A pointer to a server structure"
1000
 * @param 	fd	        [int]	            "The clients file descriptor"
1001
 * @return 			    [wss_error_t]       "The error status"
1002
 */
1003
wss_error_t WSS_poll_remove(wss_server_t *server, int fd) {
1004
    struct pollfd *events = (struct pollfd *) server->events;
1005
1006
    if ( unlikely((unsigned int)fd >= MAXEVENTS) ) {
1007
        return WSS_POLL_REMOVE_ERROR;
1008
    }
1009
1010
    WSS_log_trace("Removing session %d from poll", fd);
1011
1012
    events[fd].fd = -1;
1013
    events[fd].events = 0;
1014
1015
    if (fd != server->rearm_pipefd[0] && fd != close_pipefd[0] && fd != server->fd) {
1016
        rearm(server);
1017
    }
1018
1019
    return WSS_SUCCESS;
1020
}
1021
1022
/**
1023
 * Function that listens for new events on the servers file descriptor
1024
 *
1025
 * @param 	server	    [wss_server_t *]    "A pointer to a server structure"
1026
 * @return 			    [wss_error_t]       "The error status"
1027
 */
1028
wss_error_t WSS_poll_delegate(wss_server_t *server) {
1029
    int n, i;
1030
    int start = 0;
1031
    int fd;
1032
    wss_error_t err;
1033
    int end = server->max_fd;
1034
    struct pollfd *events = (struct pollfd *) server->events;
1035
1036
    while ( likely(start < end && events[start].fd == -1) ) {
1037
        ++start;
1038
    }
1039
1040
    while ( likely(start < end && events[end].fd == -1) ) {
1041
        --end;
1042
    }
1043
1044
    if ( unlikely(start == end && events[start].fd == -1) ) {
1045
        return WSS_POLL_WAIT_ERROR;
1046
    }
1047
1048
    if (server->ssl_ctx != NULL) {
1049
        WSS_log_trace("Listening for HTTPS poll events");
1050
    } else {
1051
        WSS_log_trace("Listening for HTTP poll events");
1052
    }
1053
1054
    do {
1055
        errno = 0;
1056
1057
        n = poll(&events[start], end-start+1, server->config->timeout_poll);
1058
1059
        if ( unlikely(n < 0) ) {
1060
            if ( unlikely(errno != EINTR) ) {
1061
                return WSS_POLL_WAIT_ERROR;
1062
            }
1063
        }
1064
    } while ( unlikely(errno == EINTR) );
1065
1066
    if (server->ssl_ctx != NULL) {
1067
        WSS_log_trace("Received %d HTTPS events", n);
1068
    } else {
1069
        WSS_log_trace("Received %d HTTP events", n);
1070
    }
1071
1072
    for (i = start; likely(i <= end); ++i) {
1073
        if ( likely(events[i].fd > 0 && events[i].revents) ) {
1074
            fd = events[i].fd;
1075
            if ( unlikely(events[i].revents & (POLLHUP | POLLERR | POLLRDHUP | POLLNVAL)) ) {
1076
                if ( unlikely(fd == server->fd) ) {
1077
                    WSS_log_fatal("A server error occured upon poll");
1078
                    return WSS_POLL_WAIT_ERROR;
1079
                }
1080
1081
                WSS_poll_remove(server, fd);
1082
1083
                WSS_log_trace("Session %d is disconnecting", fd);
1084
1085
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
1086
                if ( unlikely(NULL == args) ) {
1087
                    WSS_log_fatal("Failed allocating threadpool argument");
1088
                    return WSS_MEMORY_ERROR;
1089
                }
1090
                args->server = server;
1091
                args->fd = fd;
1092
                args->state = CLOSING;
1093
1094
                if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
1095
                    WSS_log_fatal("Failed adding disconnect job to worker pool");
1096
                    WSS_free((void **)&args);
1097
                    return err;
1098
                }
1099
            } else if (fd == server->fd) {
1100
                WSS_log_trace("New session connecting");
1101
1102
                wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
1103
                if ( unlikely(NULL == args) ) {
1104
                    WSS_log_fatal("Failed allocating threadpool argument");
1105
                    return WSS_MEMORY_ERROR;
1106
                }
1107
                args->server = server;
1108
                args->state = CONNECTING;
1109
1110
                /**
1111
                 * If new session has connected
1112
                 */
1113
                if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
1114
                    WSS_log_fatal("Failed adding connect job to worker pool");
1115
                    return err;
1116
                }
1117
            } else if ( unlikely(fd == close_pipefd[0]) ) {
1118
                continue;
1119
            } else if ( likely(fd == server->rearm_pipefd[0]) ) {
1120
                handle_rearm(server);
1121
                continue;
1122
            } else {
1123
                WSS_poll_remove(server, fd);
1124
1125
                if (events[i].revents & (POLLIN | POLLPRI)) {
1126
                    WSS_log_trace("Session %d begins to read", fd);
1127
1128
                    /**
1129
                     * If new reads are ready
1130
                     */
1131
                    wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
1132
                    if ( unlikely(NULL == args) ) {
1133
                        WSS_log_fatal("Failed allocating threadpool argument");
1134
                        return WSS_MEMORY_ERROR;
1135
                    }
1136
                    args->server = server;
1137
                    args->fd = fd;
1138
                    args->state = READING;
1139
1140
                    if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
1141
                        WSS_log_fatal("Failed adding read job to worker pool");
1142
                        WSS_free((void **)&args);
1143
                        return err;
1144
                    }
1145
                }
1146
1147
                if (events[i].revents & POLLOUT) {
1148
                    WSS_log_trace("Session %d begins to write", fd);
1149
1150
                    /**
1151
                     * If new writes are ready
1152
                     */
1153
                    wss_thread_args_t *args = (wss_thread_args_t *) WSS_malloc(sizeof(wss_thread_args_t));
1154
                    if ( unlikely(NULL == args) ) {
1155
                        WSS_log_fatal("Failed allocating threadpool argument");
1156
                        return WSS_MEMORY_ERROR;
1157
                    }
1158
                    args->server = server;
1159
                    args->fd = fd;
1160
                    args->state = WRITING;
1161
1162
                    if ( unlikely((err = WSS_add_to_threadpool(server, &WSS_work, (void *)args)) != WSS_SUCCESS) ) {
1163
                        WSS_log_fatal("Failed adding write job to worker pool");
1164
                        WSS_free((void **)&args);
1165
                        return err;
1166
                    }
1167
                }
1168
            }
1169
        }
1170
    }
1171
1172
    return WSS_SUCCESS;
1173
}
1174
#endif
1175
1176
/**
1177
 * Function that cleanup poll function when closing
1178
 *
1179
 * @param 	server	    [wss_server_t *]	"A pointer to a server structure"
1180
 * @return 			    [wss_error_t]       "The error status"
1181
 */
1182
wss_error_t WSS_poll_close(wss_server_t *server) {
1183
    if ( likely(server->rearm_pipefd[0] != -1) ) {
1184
        close(server->rearm_pipefd[0]);
1185
        server->rearm_pipefd[0] = -1;
1186
    }
1187
1188
    if ( likely(server->rearm_pipefd[1] != -1) ) {
1189
        close(server->rearm_pipefd[1]);
1190
        server->rearm_pipefd[1] = -1;
1191
    }
1192
1193
    if ( likely(close_pipefd[0] != -1) ) {
1194
        close(close_pipefd[0]);
1195
        close_pipefd[0] = -1;
1196
    }
1197
1198
    if ( likely(close_pipefd[1] != -1) ) {
1199
        close(close_pipefd[1]);
1200
        close_pipefd[1] = -1;
1201
    }
1202
1203
    return WSS_SUCCESS;
1204
}