GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: src/worker.c Lines: 0 886 0.0 %
Date: 2020-12-10 21:44:00 Branches: 0 368 0.0 %

Line Branch Exec Source
1
2
#include <errno.h> 				/* errno */
3
#include <stdio.h> 				/* printf, fflush, fprintf, fopen, fclose */
4
#include <string.h>             /* strerror, memset, strncpy, memcpy, strlen */
5
#include <stdlib.h>             /* atoi, malloc, free, realloc */
6
#include <unistd.h>             /* close, read, write */
7
#include <math.h> 				/* log10 */
8
#include <time.h>
9
#include <locale.h>
10
11
#include <sys/types.h>          /* socket, setsockopt, accept, send, recv */
12
#include <sys/stat.h> 			/* stat */
13
#include <sys/socket.h>         /* socket, setsockopt, inet_ntoa, accept */
14
#include <sys/select.h>         /* socket, setsockopt, inet_ntoa, accept */
15
#include <netinet/in.h>         /* sockaddr_in, inet_ntoa */
16
#include <arpa/inet.h>          /* htonl, htons, inet_ntoa */
17
18
#include <pthread.h> 			/* pthread_create, pthread_t, pthread_attr_t
19
                                   pthread_mutex_init */
20
21
#include "alloc.h"
22
#include "worker.h"
23
#include "server.h"
24
#include "event.h"
25
#include "log.h"
26
#include "session.h"
27
#include "socket.h"
28
#include "error.h"
29
#include "header.h"
30
#include "frame.h"
31
#include "message.h"
32
#include "config.h"
33
#include "httpstatuscodes.h"
34
#include "str.h"
35
#include "utf8.h"
36
#include "error.h"
37
#include "predict.h"
38
#include "ssl.h"
39
40
/**
41
 * Function that generates a handshake response, used to authorize a websocket
42
 * session.
43
 *
44
 * @param   header  [wss_header_t*]         "The http header obtained from the session"
45
 * @param   code    [enum HttpStatus_Code]  "The http status code to return"
46
 * @return          [wss_message_t *]       "A message structure that can be passed through ringbuffer"
47
 */
48
static wss_message_t *handshake_response(wss_header_t *header, enum HttpStatus_Code code) {
49
    wss_message_t *msg;
50
    int length;
51
    size_t j;
52
    size_t offset = 0;
53
    char *message;
54
    char *version = "HTTP/1.1";
55
    const char *reason = HttpStatus_reasonPhrase(code);
56
    int line = 0, headers = 0;
57
    char sha1Key[SHA_DIGEST_LENGTH];
58
    int magic_length = strlen(MAGIC_WEBSOCKET_KEY);
59
    int key_length = strlen(header->ws_key) + magic_length;
60
    char key[key_length];
61
    char *acceptKey;
62
    size_t acceptKeyLength;
63
64
    if ( likely(NULL != header->version) ) {
65
        version = header->version;
66
    }
67
68
    line += strlen(HTTP_STATUS_LINE)*sizeof(char)-6*sizeof(char);
69
    line += strlen(version)*sizeof(char);
70
    line += (log10(code)+1)*sizeof(char);
71
    line += strlen(reason)*sizeof(char);
72
73
    // Generate accept key
74
    memset(key, '\0', key_length);
75
    memset(sha1Key, '\0', SHA_DIGEST_LENGTH);
76
    memcpy(key+(key_length-magic_length), MAGIC_WEBSOCKET_KEY, magic_length);
77
    memcpy(key, header->ws_key, (key_length-magic_length));
78
79
    acceptKeyLength = WSS_base64_encode_sha1(key, key_length, &acceptKey);
80
    if (acceptKeyLength == 0) {
81
        WSS_free((void **)&acceptKey);
82
        return NULL;
83
    }
84
85
    if ( NULL != header->ws_extensions ) {
86
        headers += strlen(HTTP_HANDSHAKE_EXTENSIONS)*sizeof(char);
87
        for (j = 0; likely(j < header->ws_extensions_count); j++) {
88
            headers += (strlen(header->ws_extensions[j]->name))*sizeof(char);
89
            if ( likely(NULL != header->ws_extensions[j]->accepted) ) {
90
                headers += strlen(header->ws_extensions[j]->accepted)*sizeof(char);
91
            }
92
            headers += 2*sizeof(char);
93
        }
94
        headers += (header->ws_extensions_count)*sizeof(char);
95
    }
96
97
    if ( NULL != header->ws_protocol ) {
98
        headers += strlen(HTTP_HANDSHAKE_SUBPROTOCOL)*sizeof(char);
99
        headers += strlen(header->ws_protocol->name)*sizeof(char);
100
        headers += 2*sizeof(char);
101
    }
102
103
    headers += strlen(HTTP_HANDSHAKE_ACCEPT)*sizeof(char);
104
    headers += acceptKeyLength*sizeof(char);
105
    headers += 2*sizeof(char);
106
    headers += strlen(HTTP_HANDSHAKE_HEADERS)*sizeof(char)-2*sizeof(char);
107
    headers += strlen(WSS_SERVER_VERSION)*sizeof(char);
108
109
    length = line + headers;
110
    if ( unlikely(NULL == (message = (char *) WSS_malloc((length+1)*sizeof(char)))) ) {
111
        WSS_free((void **)&acceptKey);
112
        return NULL;
113
    }
114
115
    sprintf(message+offset, HTTP_STATUS_LINE, version, code, reason);
116
    offset += line;
117
    if ( NULL != header->ws_extensions ) {
118
        sprintf(message+offset, HTTP_HANDSHAKE_EXTENSIONS);
119
        offset += strlen(HTTP_HANDSHAKE_EXTENSIONS);
120
        for (j = 0; likely(j < header->ws_extensions_count); j++) {
121
            memcpy(message+offset, header->ws_extensions[j]->name, strlen(header->ws_extensions[j]->name));
122
            offset += strlen(header->ws_extensions[j]->name);
123
124
            if ( likely(NULL != header->ws_extensions[j]->accepted) ) {
125
                message[offset] = ';';
126
                offset += 1;
127
128
                memcpy(message+offset, header->ws_extensions[j]->accepted, strlen(header->ws_extensions[j]->accepted));
129
                offset += strlen(header->ws_extensions[j]->accepted);
130
            }
131
132
            if ( likely(j+1 != header->ws_extensions_count) ) {
133
                message[offset] = ',';
134
                offset += 1;
135
            }
136
        }
137
        sprintf(message+offset, "\r\n");
138
        offset += 2;
139
    }
140
    if (NULL != header->ws_protocol) {
141
        sprintf(message+offset, HTTP_HANDSHAKE_SUBPROTOCOL);
142
        offset += strlen(HTTP_HANDSHAKE_SUBPROTOCOL);
143
        memcpy(message+offset, header->ws_protocol->name, strlen(header->ws_protocol->name));
144
        offset += strlen(header->ws_protocol->name);
145
        sprintf(message+offset, "\r\n");
146
        offset += 2;
147
    }
148
    sprintf(message+offset, HTTP_HANDSHAKE_ACCEPT);
149
    offset += strlen(HTTP_HANDSHAKE_ACCEPT);
150
    memcpy(message+offset, acceptKey, acceptKeyLength);
151
    offset += acceptKeyLength;
152
    sprintf(message+offset, "\r\n");
153
    offset += 2;
154
    sprintf(message+offset, HTTP_HANDSHAKE_HEADERS, WSS_SERVER_VERSION);
155
    WSS_free((void **) &acceptKey);
156
157
    if ( unlikely(NULL == (msg = (wss_message_t *) WSS_malloc(sizeof(wss_message_t)))) ) {
158
        WSS_free((void **) &message);
159
        return NULL;
160
    }
161
162
    WSS_log_debug("Handshake Response: \n%s", message);
163
164
    msg->msg = message;
165
    msg->length = length;
166
167
    return msg;
168
}
169
170
/**
171
 * Function that generates a favicon response, used to present a favicon on the
172
 * HTTP landing page of the WSS server.
173
 *
174
 * @param   header  [wss_header_t *]        "The http header obtained from the session"
175
 * @param   code    [enum HttpStatus_Code]  "The http status code to return"
176
 * @param   config  [wss_config_t *]        "The configuration of the server"
177
 * @return          [message_t *]           "A message structure that can be passed through ringbuffer"
178
 */
179
static wss_message_t *favicon_response(wss_header_t *header, enum HttpStatus_Code code, wss_config_t *config) {
180
    time_t now;
181
    struct tm tm;
182
    char *ico;
183
    char *etag;
184
    char *message;
185
    wss_message_t *msg;
186
    size_t length;
187
    struct stat filestat;
188
    char sha1Key[SHA_DIGEST_LENGTH];
189
    char date[GMT_FORMAT_LENGTH];
190
    char modified[GMT_FORMAT_LENGTH];
191
    char savedlocale[256];
192
    char *version = "HTTP/1.1";
193
    const char *reason  = HttpStatus_reasonPhrase(code);
194
    int icon = 0, line = 0, headers = 0;
195
196
    // Get GMT current time
197
    strncpy(savedlocale, setlocale(LC_ALL, NULL), 255);
198
    setlocale(LC_ALL, "C");
199
    now = time(0);
200
    tm = *gmtime(&now);
201
    memset(date, '\0', GMT_FORMAT_LENGTH);
202
    memset(modified, '\0', GMT_FORMAT_LENGTH);
203
    strftime(date, sizeof date, "%a, %d %b %Y %H:%M:%S %Z", &tm);
204
    setlocale(LC_ALL, savedlocale);
205
206
    if ( likely(NULL != header->version) ) {
207
        version = header->version;
208
    }
209
210
    line += strlen(HTTP_STATUS_LINE)*sizeof(char)-6*sizeof(char);
211
    line += strlen(version)*sizeof(char);
212
    line += (log10(code)+1)*sizeof(char);
213
    line += strlen(reason)*sizeof(char);
214
215
    // Read the content of the favicon file
216
    if (NULL != config->favicon && access(config->favicon, F_OK) != -1) {
217
        // Get last modified
218
        stat(config->favicon, &filestat);
219
        setlocale(LC_ALL, "C");
220
        tm = *gmtime(&filestat.st_mtime);
221
        strftime(modified, sizeof modified, "%a, %d %b %Y %H:%M:%S %Z", &tm);
222
        setlocale(LC_ALL, savedlocale);
223
224
        icon = strload(config->favicon, &ico);
225
    } else {
226
        return NULL;
227
    }
228
229
    // Generate etag from favicon content
230
    WSS_sha1(ico, icon, (char **)&sha1Key);
231
232
    // Convert etag binary values to hex values
233
    if ( unlikely(NULL == (etag = bin2hex((const unsigned char *)sha1Key, SHA_DIGEST_LENGTH))) ) {
234
        WSS_free((void **) &ico);
235
        return NULL;
236
    }
237
238
    headers += strlen(HTTP_ICO_HEADERS)*sizeof(char)-10*sizeof(char);
239
    headers += (log10(icon)+1);
240
    headers += strlen(date)*sizeof(char);
241
    headers += strlen(WSS_SERVER_VERSION)*sizeof(char);
242
    headers += strlen(etag)*sizeof(char);
243
    headers += strlen(modified)*sizeof(char);
244
245
    length = line + headers + icon + 1;
246
    if ( unlikely(NULL == (message = (char *) WSS_malloc(length*sizeof(char)))) ) {
247
        WSS_free((void **) &etag);
248
        WSS_free((void **) &ico);
249
        return NULL;
250
    }
251
252
    sprintf(message, HTTP_STATUS_LINE, version, code, reason);
253
    sprintf(message+line, HTTP_ICO_HEADERS, icon, date, WSS_SERVER_VERSION, etag, modified);
254
    memcpy(message+(line+headers), ico, icon);
255
256
    msg = (wss_message_t *) WSS_malloc(sizeof(wss_message_t));
257
    msg->msg = message;
258
    msg->length = length;
259
260
    WSS_free((void **) &etag);
261
    WSS_free((void **) &ico);
262
263
    return msg;
264
}
265
266
/**
267
 * Function that generates an HTTP upgrade response, used to tell the connecting
268
 * client that an upgrade is needed in order to use the server.
269
 *
270
 * @param   header  [wss_header_t *]        "The http header obtained from the session"
271
 * @param   code    [enum HttpStatus_Code]  "The http status code to return"
272
 * @param   exp     [char *]                "An explanation of what caused this response"
273
 * @return          [message_t *]           "A message structure that can be passed through ringbuffer"
274
 */
275
static wss_message_t *upgrade_response(wss_header_t *header, enum HttpStatus_Code code, char *exp) {
276
    wss_message_t *msg;
277
    int length;
278
    char *message;
279
    char *version = "HTTP/1.1";
280
    const char *reason = HttpStatus_reasonPhrase(code);
281
    int body = 0, line = 0, headers = 0;
282
283
    if ( likely(NULL != header->version) ) {
284
        version = header->version;
285
    }
286
287
    line += strlen(HTTP_STATUS_LINE)*sizeof(char)-6*sizeof(char);
288
    line += strlen(version)*sizeof(char);
289
    line += (log10(code)+1)*sizeof(char);
290
    line += strlen(reason)*sizeof(char);
291
292
    body += strlen(exp)*sizeof(char);
293
294
    headers += strlen(HTTP_UPGRADE_HEADERS)*sizeof(char)-4*sizeof(char);
295
    headers += strlen(WSS_SERVER_VERSION)*sizeof(char);
296
    headers += (log10(body)+1)*sizeof(char);
297
298
    length = line + headers + body + 1;
299
    if ( unlikely(NULL == (message = (char *) WSS_malloc(length*sizeof(char)))) ) {
300
        return NULL;
301
    }
302
303
    sprintf(message, HTTP_STATUS_LINE, version, code, reason);
304
    sprintf(message+line, HTTP_UPGRADE_HEADERS, body, WSS_SERVER_VERSION);
305
    sprintf(message+line+headers, "%s", exp);
306
307
    if ( unlikely(NULL == (msg = (wss_message_t *) WSS_malloc(sizeof(wss_message_t)))) ) {
308
        WSS_free((void **) &message);
309
        return NULL;
310
    }
311
    msg->msg = message;
312
    msg->length = length;
313
314
    return msg;
315
}
316
317
/**
318
 * Function that generates a HTTP error response, used to tell the connecting
319
 * client that an error occured.
320
 *
321
 * @param   header  [wss_header_t *]        "The http header obtained from the session"
322
 * @param   code    [enum HttpStatus_Code]  "The http status code to return"
323
 * @param   exp     [char *]                "An explanation of what caused this response"
324
 * @return          [message_t *]           "A message structure that can be passed through ringbuffer"
325
 */
326
static wss_message_t *http_response(wss_header_t *header, enum HttpStatus_Code code, char *exp) {
327
    time_t now;
328
    struct tm tm;
329
    wss_message_t *msg;
330
    int length;
331
    char *message;
332
    char date[GMT_FORMAT_LENGTH];
333
    char savedlocale[256];
334
    char *version = "HTTP/1.1";
335
    const char *reason  = HttpStatus_reasonPhrase(code);
336
    int body = 0, line = 0, headers = 0, support = 0;
337
338
    savedlocale[255] = '\0';
339
340
    // Get GMT current time
341
    strncpy(savedlocale, setlocale(LC_ALL, NULL), 255);
342
    setlocale(LC_ALL, "C");
343
    now = time(0);
344
    tm = *gmtime(&now);
345
    strftime(date, sizeof date, "%a, %d %b %Y %H:%M:%S %Z", &tm);
346
    setlocale(LC_ALL, savedlocale);
347
348
    if ( likely(NULL != header->version) ) {
349
        version = header->version;
350
    }
351
352
    line += strlen(HTTP_STATUS_LINE)*sizeof(char)-6*sizeof(char);
353
    line += strlen(version)*sizeof(char);
354
    line += (log10(code)+1)*sizeof(char);
355
    line += strlen(reason)*sizeof(char);
356
357
    body += strlen(HTTP_BODY)*sizeof(char)-8*sizeof(char);
358
    body += (log10(code)+1)*sizeof(char);
359
    body += strlen(reason)*sizeof(char);
360
    body += strlen(reason)*sizeof(char);
361
    body += strlen(exp)*sizeof(char);
362
363
    if (code == HttpStatus_NotImplemented) {
364
        support = strlen(HTTP_WS_VERSION_HEADER)*sizeof(char)-6*sizeof(char);
365
    }
366
    headers += strlen(HTTP_HTML_HEADERS)*sizeof(char)-6*sizeof(char);
367
    headers += strlen(WSS_SERVER_VERSION)*sizeof(char);
368
    headers += strlen(date)*sizeof(char);
369
    headers += (log10(body)+1)*sizeof(char);
370
371
    length = line + support + headers + body + 1;
372
    if ( unlikely(NULL == (message = (char *) WSS_malloc(length*sizeof(char)))) ) {
373
        return NULL;
374
    }
375
376
    sprintf(message, HTTP_STATUS_LINE, version, code, reason);
377
    if (code == HttpStatus_NotImplemented) {
378
        sprintf(message+support, HTTP_WS_VERSION_HEADER, RFC6455, HYBI10, HYBI07);
379
    }
380
    sprintf(message+support+line, HTTP_HTML_HEADERS, body, date, WSS_SERVER_VERSION);
381
    sprintf(message+support+line+headers, HTTP_BODY, code, reason, reason, exp);
382
383
    if ( unlikely(NULL == (msg = (wss_message_t *) WSS_malloc(sizeof(wss_message_t)))) ) {
384
        WSS_free((void **) &message);
385
        return NULL;
386
    }
387
    msg->msg = message;
388
    msg->length = length;
389
390
    return msg;
391
}
392
393
/**
394
 * Function that disconnects a session and freeing any allocated memory used by
395
 * the session.
396
 *
397
 * @param 	server	[wss_server_t *] 	"The server structure"
398
 * @param 	session	[wss_session_t *] 	"The session structure"
399
 * @return          [void]
400
 */
401
void WSS_disconnect(wss_server_t *server, wss_session_t *session) {
402
    int i;
403
    wss_error_t err;
404
    bool dc;
405
406
    // If we are already closing
407
    WSS_session_is_disconnecting(session, &dc);
408
    if (dc) {
409
        return;
410
    }
411
412
    WSS_session_jobs_wait(session);
413
414
    session->state = CLOSING;
415
416
    WSS_log_trace("Informing subprotocol of client with file descriptor %d disconnecting", session->fd);
417
418
    if ( NULL != session->header && session->header->ws_protocol != NULL ) {
419
        WSS_log_trace("Informing subprotocol about session close");
420
        session->header->ws_protocol->close(session->fd);
421
    }
422
423
    WSS_log_trace("Removing poll filedescriptor from eventlist");
424
425
    WSS_poll_remove(server, session->fd);
426
427
    WSS_log_trace("Deleting client session");
428
429
    if (NULL == server->ssl_ctx) {
430
        WSS_log_info("Session %d disconnected from ip: %s:%d using HTTP request", session->fd, session->ip, session->port);
431
    } else {
432
        WSS_log_info("Session %d disconnected from ip: %s:%d using HTTPS request", session->fd, session->ip, session->port);
433
    }
434
435
    for (i = 0; i < session->jobs; i++) {
436
        pthread_mutex_unlock(&session->lock);
437
    }
438
439
    if ( unlikely(WSS_SUCCESS != (err = WSS_session_delete(session))) ) {
440
        switch (err) {
441
            case WSS_SSL_SHUTDOWN_READ_ERROR:
442
                session->event = READ;
443
                return;
444
            case WSS_SSL_SHUTDOWN_WRITE_ERROR:
445
                session->event = WRITE;
446
                return;
447
            default:
448
                break;
449
        }
450
        WSS_log_error("Unable to delete client session, received error code: %d", err);
451
        return;
452
    }
453
}
454
455
/**
456
 * Function that handles new connections. This function creates a new session and
457
 * associates the sessions filedescriptor to the epoll instance such that we can
458
 * start communicating with the session.
459
 *
460
 * @param 	server	[wss_server_t *] 	"The server structure"
461
 * @param 	session	[wss_session_t *] 	"The session structure"
462
 * @return          [void]
463
 */
464
void WSS_connect(wss_server_t *server) {
465
    int client_fd;
466
    struct sockaddr_in client;
467
    size_t ringbuf_obj_size;
468
    socklen_t client_size;
469
    wss_session_t *session;
470
    ringbuf_t *ringbuf;
471
    size_t workers = server->config->pool_workers+1;
472
473
    client_size	= sizeof(client);
474
    memset((char *) &client, '\0', sizeof(client));
475
476
    while (1) {
477
        if ( (client_fd = accept(server->fd, (struct sockaddr *) &client,
478
                        &client_size)) < 0 ) {
479
            if ( likely(EAGAIN == errno || EWOULDBLOCK == errno) ) {
480
                break;
481
            }
482
483
            WSS_log_fatal("Accept failed: %s", strerror(errno));
484
            break;
485
        }
486
487
        WSS_log_trace("Received incoming connection");
488
489
        WSS_socket_non_blocking(client_fd);
490
491
        WSS_log_trace("Client filedescriptor was set to non-blocking");
492
493
        if ( unlikely(NULL == (session = WSS_session_add(client_fd,
494
                        inet_ntoa(client.sin_addr), ntohs(client.sin_port)))) ) {
495
            continue;
496
        }
497
498
        WSS_session_jobs_inc(session);
499
        pthread_mutex_lock(&session->lock);
500
501
        session->state = CONNECTING;
502
        WSS_log_trace("Created client session: %d", client_fd);
503
504
        // Creating ringbuffer for session
505
        ringbuf_get_sizes(0, workers, &ringbuf_obj_size, NULL);
506
        if ( unlikely(NULL == (ringbuf = WSS_malloc(ringbuf_obj_size))) ) {
507
            WSS_log_fatal("Failed to allocate memory for ringbuffer");
508
            WSS_disconnect(server, session);
509
            return;
510
        }
511
512
        if ( unlikely(NULL == (session->messages = WSS_malloc(server->config->size_ringbuffer*sizeof(wss_message_t *)))) ) {
513
            WSS_log_fatal("Failed to allocate memory for ringbuffer messages");
514
            WSS_free((void **)&ringbuf);
515
            WSS_disconnect(server, session);
516
            return;
517
        }
518
        session->messages_count = server->config->size_ringbuffer;
519
520
        ringbuf_setup(ringbuf, 0, workers, server->config->size_ringbuffer);
521
        session->ringbuf = ringbuf;
522
523
        if (NULL == server->ssl_ctx) {
524
            WSS_log_trace("User connected from ip: %s:%d using HTTP request", session->ip, session->port);
525
        } else {
526
            WSS_log_trace("User connected from ip: %s:%d using HTTPS request", session->ip, session->port);
527
        }
528
529
        if (NULL != server->ssl_ctx) {
530
            if (! WSS_session_ssl(server, session)) {
531
                WSS_free((void **)&ringbuf);
532
                return;
533
            }
534
535
            WSS_ssl_handshake(server, session);
536
        } else {
537
            session->state = IDLE;
538
539
            clock_gettime(CLOCK_MONOTONIC, &session->alive);
540
541
            WSS_poll_set_read(server, session->fd);
542
543
            WSS_log_info("Client with session %d connected", session->fd);
544
545
        }
546
547
        WSS_session_jobs_dec(session);
548
        pthread_mutex_unlock(&session->lock);
549
    }
550
}
551
552
/**
553
 * Puts a message in the client sessions writing buffer.
554
 *
555
 * @param 	session     [wss_session_t *] 	"The client session"
556
 * @param 	message     [wss_message_t *] 	"The message to send"
557
 * @return              [void]
558
 */
559
static wss_error_t write_internal(wss_session_t *session, wss_message_t *mes) {
560
    ssize_t off;
561
    ringbuf_worker_t *w = NULL;
562
563
    WSS_log_trace("Putting message into ringbuffer");
564
565
    if ( unlikely(-1 == (off = ringbuf_acquire(session->ringbuf, &w, 1))) ) {
566
        WSS_free((void **) &mes->msg);
567
        WSS_free((void **) &mes);
568
569
        WSS_log_error("Failed to acquire space in ringbuffer");
570
571
        return WSS_RINGBUFFER_ERROR;
572
    }
573
    session->messages[off] = mes;
574
    ringbuf_produce(session->ringbuf, &w);
575
576
    return WSS_SUCCESS;
577
}
578
579
/**
580
 * Performs the actual IO read operation using either the read or SSL_read
581
 * system calls.
582
 *
583
 * @param 	server      [wss_server_t *] 	"A server instance"
584
 * @param 	session     [wss_session_t *] 	"The client session"
585
 * @param 	buffer      [char *] 		    "The buffer to put the data into"
586
 * @return              [int]               "The amount of bytes read or -1 for error or -2 for wait for IO"
587
 */
588
static int read_internal(wss_server_t *server, wss_session_t *session, char *buffer) {
589
    int n;
590
591
    if ( NULL != session->ssl ) {
592
        n = WSS_ssl_read(server, session, buffer);
593
    } else {
594
        do {
595
            n = read(session->fd, buffer, server->config->size_buffer);
596
            if (n == -1) {
597
                if ( unlikely(errno == EINTR) ) {
598
                    errno = 0;
599
                    continue;
600
                } else if ( unlikely(errno != EAGAIN && errno != EWOULDBLOCK) ) {
601
                    WSS_log_error("Read failed: %s", strerror(errno));
602
                    session->closing = true;
603
                } else {
604
                    n = 0;
605
                }
606
            }
607
        } while ( unlikely(0) );
608
    }
609
610
    return n;
611
}
612
613
/**
614
 * Performs a websocket handshake with the client session
615
 *
616
 * @param 	server      [wss_server_t *] 	"A server instance"
617
 * @param 	session     [wss_session_t *] 	"The client session"
618
 * @return              [void]
619
 */
620
static void handshake(wss_server_t *server, wss_session_t *session) {
621
    int n;
622
    wss_header_t *header;
623
    wss_message_t *message;
624
    enum HttpStatus_Code code;
625
    char *buffer;
626
627
    if ( unlikely(NULL == (buffer = WSS_malloc(server->config->size_buffer)))) {
628
        WSS_log_fatal("Unable to allocate buffer");
629
        return;
630
    }
631
632
    WSS_log_trace("Preparing client header");
633
634
    if ( unlikely(NULL != session->header) ) {
635
        header = session->header;
636
        session->header = NULL;
637
    } else {
638
        if ( unlikely(NULL == (header = WSS_malloc(sizeof(wss_header_t)))) ) {
639
            WSS_log_fatal("Unable to allocate header");
640
            return;
641
        }
642
643
        header->content             = NULL;
644
        header->method              = NULL;
645
        header->version             = NULL;
646
        header->path                = NULL;
647
        header->host                = NULL;
648
        header->payload             = NULL;
649
        header->length              = 0;
650
        header->ws_version          = 0;
651
        header->ws_type             = UNKNOWN;
652
        header->ws_protocol         = NULL;
653
        header->ws_upgrade          = NULL;
654
        header->ws_connection       = NULL;
655
        header->ws_extensions       = NULL;
656
        header->ws_extensions_count = 0;
657
        header->ws_origin           = NULL;
658
        header->ws_key              = NULL;
659
        header->ws_key1             = NULL;
660
        header->ws_key2             = NULL;
661
        header->ws_key3             = NULL;
662
    }
663
664
    WSS_log_trace("Reading headers");
665
666
    // Continue reading until we get no bytes back
667
    do {
668
        n = read_internal(server, session, buffer);
669
670
        switch (n) {
671
            // Wait for IO for either read or write on the filedescriptor
672
            case -2:
673
                session->header = header;
674
675
                session->event = WRITE;
676
677
                return;
678
            // An error occured, notify client by writing back to it
679
            case -1:
680
                WSS_log_trace("Rejecting HTTP request due to being unable to read from client");
681
682
                message = http_response(header, HttpStatus_InternalServerError, "Unable to read from client");
683
                WSS_free_header(header);
684
685
                if ( likely(NULL != message && WSS_SUCCESS == write_internal(session, message)) ) {
686
                    session->state = WRITING;
687
                    session->event = READ;
688
                    WSS_write(server, session);
689
                }
690
691
                return;
692
            case 0:
693
                break;
694
            default:
695
                // Reallocate space for the header and copy buffer into it
696
                if ( unlikely(NULL == (header->content = WSS_realloc((void **) &header->content, header->length*sizeof(char), (header->length+n+1)*sizeof(char)))) ) {
697
                    WSS_log_fatal("Unable to realloc header content");
698
                    return;
699
                }
700
                memcpy(header->content+header->length, buffer, n);
701
                header->length += n;
702
                memset(buffer, '\0', server->config->size_buffer);
703
704
                // Check if payload from client is too large for the server to handle.
705
                // If so write error back to the client
706
                if ( unlikely(header->length > (server->config->size_header+server->config->size_uri+server->config->size_payload)) ) {
707
                    WSS_log_trace("Rejecting HTTP request as client payload is too large for the server to handle");
708
709
                    message = http_response(header, HttpStatus_PayloadTooLarge,
710
                            "The given payload is too large for the server to handle");
711
                    WSS_free_header(header);
712
713
                    if ( likely(NULL != message && WSS_SUCCESS == write_internal(session, message)) ) {
714
                        session->state = WRITING;
715
                        session->event = READ;
716
                        WSS_write(server, session);
717
                    }
718
719
                    return;
720
                }
721
                break;
722
        }
723
    } while ( likely(n != 0) );
724
725
    WSS_free((void **)&buffer);
726
727
    WSS_log_debug("Client header: \n%s", header->content);
728
729
    WSS_log_trace("Starting parsing header received from client");
730
731
    // Parsing HTTP header
732
    // If header could not be parsed correctly, notify client with response.
733
    if ( (code = WSS_parse_header(session->fd, header, server->config)) != HttpStatus_OK ) {
734
        WSS_log_trace("Rejecting HTTP request due to header not being correct");
735
736
        message = http_response(header, code, (char *)HttpStatus_reasonPhrase(code));
737
        WSS_free_header(header);
738
739
        if ( likely(NULL != message && WSS_SUCCESS == write_internal(session, message)) ) {
740
            session->state = WRITING;
741
            session->event = READ;
742
            WSS_write(server, session);
743
        }
744
745
        return;
746
    }
747
748
    // Serve favicon
749
    if (strlen(header->path) == 12 && strncmp(header->path, "/favicon.ico", 12) == 0) {
750
        message = favicon_response(header, code, server->config);
751
        if (NULL != message) {
752
            WSS_free_header(header);
753
754
            WSS_log_trace("Serving a favicon to the client");
755
756
            // Find and serve favicon.
757
            if ( likely(WSS_SUCCESS == write_internal(session, message)) ) {
758
                session->state = WRITING;
759
                session->event = READ;
760
                WSS_write(server, session);
761
            }
762
        } else {
763
            WSS_log_trace("Rejecting HTTP request due to favicon not being available");
764
765
            // Else notify client that favicon could not be found
766
            code = HttpStatus_NotFound;
767
            message = http_response(header, code, (char *)HttpStatus_reasonPhrase(code));
768
            WSS_free_header(header);
769
770
            if ( likely(NULL != message && WSS_SUCCESS == write_internal(session, message)) ) {
771
                session->state = WRITING;
772
                session->event = READ;
773
                WSS_write(server, session);
774
            }
775
        }
776
777
        return;
778
    }
779
780
    WSS_log_trace("Header successfully parsed");
781
782
    // Create Upgrade HTTP header based on clients header
783
    code = WSS_upgrade_header(header, server->config, server->re);
784
    switch (code) {
785
        case HttpStatus_UpgradeRequired:
786
            WSS_log_trace("Rejecting HTTP request as the service requires use of the Websocket protocol.");
787
            message = upgrade_response(header, code,
788
                    "This service requires use of the Websocket protocol.");
789
            break;
790
        case HttpStatus_NotImplemented:
791
            WSS_log_trace("Rejecting HTTP request as Websocket protocol is not yet implemented");
792
            message = http_response(header, code,
793
                    "Websocket protocol is not yet implemented");
794
            break;
795
        case HttpStatus_SwitchingProtocols:
796
            message = handshake_response(header, code);
797
            break;
798
        case HttpStatus_NotFound:
799
            WSS_log_trace("Rejecting HTTP request as the page requested was not found.");
800
            message = http_response(header, code,
801
                    (char *)HttpStatus_reasonPhrase(code));
802
            break;
803
        case HttpStatus_Forbidden:
804
            WSS_log_trace("Rejecting HTTP request as the origin is not allowed to establish a websocket connection.");
805
            message = http_response(header, code,
806
                    "The origin is not allowed to establish a websocket connection.");
807
            break;
808
        default:
809
            WSS_log_trace("Rejecting HTTP request as server was unable to parse http header as websocket request");
810
            message = http_response(header, code,
811
                    (char *)HttpStatus_reasonPhrase(code));
812
            break;
813
    }
814
815
    // If code status isnt switching protocols, we notify client with a HTTP error
816
    if (code != HttpStatus_SwitchingProtocols) {
817
        WSS_log_trace("Unable to establish websocket connection");
818
819
        if ( likely(WSS_SUCCESS == write_internal(session, message)) ) {
820
            session->state = WRITING;
821
            session->event = READ;
822
            WSS_write(server, session);
823
        }
824
825
        WSS_free_header(header);
826
827
        return;
828
    }
829
830
    // Use echo protocol if none was chosen
831
    if (NULL == header->ws_protocol) {
832
        header->ws_protocol = WSS_find_subprotocol("echo");
833
    }
834
835
    // Notify websocket protocol of the connection
836
    header->ws_protocol->connect(session->fd, session->ip, session->port, header->path, header->cookies);
837
838
    // Set session as fully handshaked
839
    session->handshaked = true;
840
    session->header = header;
841
842
    if ( likely(WSS_SUCCESS == write_internal(session, message)) ) {
843
        session->state = WRITING;
844
        session->event = READ;
845
        WSS_write(server, session);
846
    }
847
}
848
849
/**
850
 * Function that reads information from a session.
851
 *
852
 * @param 	server	[wss_server_t *] 	"The server structure"
853
 * @param 	session	[wss_session_t *] 	"The session structure"
854
 * @return          [void]
855
 */
856
void WSS_read(wss_server_t *server, wss_session_t *session) {
857
    int n;
858
    size_t len;
859
    uint16_t code;
860
    wss_frame_t *frame;
861
    char *msg;
862
    bool closing = false;
863
    size_t offset = 0, prev_offset = 0;
864
    wss_frame_t **frames = NULL;
865
    char *payload = NULL;
866
    size_t payload_length = 0;
867
    size_t frames_length = 0;
868
    char *message;
869
    size_t message_length;
870
    wss_message_t *m;
871
    size_t i, j, k;
872
    size_t msg_length = 0;
873
    size_t msg_offset = 0;
874
    size_t starting_frame = 0;
875
    bool fragmented = false;
876
    char *buffer;
877
878
    // If no initial header has been seen for the session, the websocket
879
    // handshake is yet to be made.
880
    if ( unlikely(! session->handshaked) ){
881
        WSS_log_trace("Doing websocket handshake");
882
        handshake(server, session);
883
        return;
884
    }
885
886
    WSS_log_trace("Starting initial steps to read from client");
887
888
    if ( unlikely(NULL == (buffer = WSS_malloc(server->config->size_buffer)))) {
889
        WSS_log_error("Unable to allocate buffer");
890
        session->closing = true;
891
        return;
892
    }
893
894
    // Use earlier payload
895
    payload = session->payload;
896
    payload_length = session->payload_length;
897
    offset = session->offset;
898
    frames = session->frames;
899
    frames_length = session->frames_length;
900
    session->payload = NULL;
901
    session->payload_length = 0;
902
    session->offset = 0;
903
    session->frames = NULL;
904
    session->frames_length = 0;
905
906
    // If handshake has been made, we can read the websocket frames from
907
    // the connection
908
    do {
909
        n = read_internal(server, session, buffer);
910
911
        switch (n) {
912
            // Wait for IO for either read or write on the filedescriptor
913
            case -2:
914
                WSS_log_trace("Detected that server needs further IO to complete the reading");
915
                session->payload = payload;
916
                session->payload_length = payload_length;
917
918
                session->event = WRITE;
919
920
                return;
921
            // An error occured
922
            case -1:
923
                WSS_free((void **) &payload);
924
925
                frame = WSS_closing_frame(CLOSE_UNEXPECTED, NULL);
926
                message_length = WSS_stringify_frame(frame, &message);
927
                WSS_free_frame(frame);
928
929
                if ( unlikely(NULL == (m = WSS_malloc(sizeof(wss_message_t)))) ) {
930
                    WSS_log_error("Unable to allocate the message structure");
931
                    WSS_free((void **) &message);
932
                    session->closing = true;
933
                    return;
934
                }
935
                m->msg = message;
936
                m->length = message_length;
937
                m->framed = true;
938
939
                if ( likely(WSS_SUCCESS == write_internal(session, m)) ) {
940
                    session->state = WRITING;
941
                    session->event = READ;
942
                    WSS_write(server, session);
943
                }
944
                return;
945
            // No new data received
946
            case 0:
947
                break;
948
            // Empty buffer into payload
949
            default:
950
                if ( unlikely(NULL == (payload = WSS_realloc((void **) &payload, payload_length*sizeof(char), (payload_length+n+1)*sizeof(char)))) ) {
951
                    WSS_log_error("Unable to reallocate the payload");
952
                    session->closing = true;
953
                    return;
954
                }
955
956
                memcpy(payload+payload_length, buffer, n);
957
                payload_length += n;
958
                memset(buffer, '\0', server->config->size_buffer);
959
960
        }
961
    } while ( likely(n != 0) );
962
963
    // Release memory used for buffer
964
    WSS_free((void **) &buffer);
965
966
    WSS_log_trace("Payload from client was read. Continues flow by parsing frames.");
967
968
    // Parse the payload into websocket frames
969
    do {
970
        prev_offset = offset;
971
972
        if ( unlikely(NULL == (frame = WSS_parse_frame(payload, payload_length, &offset))) ) {
973
            WSS_log_trace("Unable to parse frame");
974
            WSS_free((void **) &payload);
975
            session->closing = true;
976
            return;
977
        }
978
979
        // Check if we were forced to read beyond the payload to create a full frame
980
        if ( unlikely(offset > payload_length) ) {
981
            WSS_log_trace("Detected that data was missing in order to complete frame, will wait for more");
982
983
            WSS_free_frame(frame);
984
985
            session->payload = payload;
986
            session->payload_length = payload_length;
987
            session->offset = prev_offset;
988
            session->frames = frames;
989
            session->frames_length = frames_length;
990
            session->event = READ;
991
992
            return;
993
        }
994
995
        // If no extension is negotiated, the rsv bits must not be used
996
        if ( unlikely(NULL == session->header->ws_extensions && (frame->rsv1 || frame->rsv2 || frame->rsv3)) ) {
997
            WSS_log_trace("Protocol Error: rsv bits must not be set without using extensions");
998
            WSS_free_frame(frame);
999
            frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1000
        } else
1001
1002
        // If opcode is unknown
1003
        if ( unlikely((frame->opcode >= 0x3 && frame->opcode <= 0x7) ||
1004
                (frame->opcode >= 0xB && frame->opcode <= 0xF)) ) {
1005
            WSS_log_trace("Type Error: Unknown upcode");
1006
            WSS_free_frame(frame);
1007
            frame = WSS_closing_frame(CLOSE_TYPE, NULL);
1008
        } else
1009
1010
        // Server expects all received data to be masked
1011
        if ( unlikely(! frame->mask) ) {
1012
            WSS_log_trace("Protocol Error: Client message should always be masked");
1013
            WSS_free_frame(frame);
1014
            frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1015
        } else
1016
1017
        // Control frames cannot be fragmented
1018
        if ( unlikely(! frame->fin && frame->opcode >= 0x8 && frame->opcode <= 0xA) ) {
1019
            WSS_log_trace("Protocol Error: Control frames cannot be fragmented");
1020
            WSS_free_frame(frame);
1021
            frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1022
        } else
1023
1024
        // Check that frame is not too large
1025
        if ( unlikely(frame->payloadLength > server->config->size_frame) ) {
1026
            WSS_log_trace("Protocol Error: Control frames cannot be fragmented");
1027
            WSS_free_frame(frame);
1028
            frame = WSS_closing_frame(CLOSE_BIG, NULL);
1029
        } else
1030
1031
        // Control frames cannot have a payload length larger than 125 bytes
1032
        if ( unlikely(frame->opcode >= 0x8 && frame->opcode <= 0xA && frame->payloadLength > 125) ) {
1033
            WSS_log_trace("Protocol Error: Control frames cannot have payload larger than 125 bytes");
1034
            WSS_free_frame(frame);
1035
            frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1036
        } else
1037
1038
        // In HYBI10 specification the most significant bit must not be set
1039
        if ( unlikely((session->header->ws_type == HYBI10 || session->header->ws_type == HYBI07) && frame->payloadLength & ((uint64_t)1 << (sizeof(uint64_t)*8-1))) ) {
1040
            WSS_log_trace("Protocol Error: Frame payload length must not use MSB");
1041
            WSS_free_frame(frame);
1042
            frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1043
        } else
1044
1045
        // Close frame
1046
        if ( unlikely(frame->opcode == CLOSE_FRAME) ) {
1047
            // A code of 2 byte must be present if there is any application data for closing frame
1048
            if ( unlikely(frame->applicationDataLength > 0 && frame->applicationDataLength < 2) ) {
1049
                WSS_log_trace("Protocol Error: Closing frame with payload too small bytewise error code");
1050
                WSS_free_frame(frame);
1051
                frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1052
            } else
1053
1054
1055
            // The payload after the code, must be valid UTF8
1056
            if ( unlikely(frame->applicationDataLength >= 2 && ! utf8_check(frame->payload+2, frame->applicationDataLength-2)) ) {
1057
                WSS_log_trace("Protocol Error: Payload of error frame must be valid UTF8.");
1058
                WSS_free_frame(frame);
1059
                frame = WSS_closing_frame(CLOSE_UTF8, NULL);
1060
            } else
1061
1062
            // Check status code is within valid range
1063
            if (frame->applicationDataLength >= 2) {
1064
                // Copy code
1065
                memcpy(&code, frame->payload, sizeof(uint16_t));
1066
                code = ntohs(code);
1067
1068
                WSS_log_debug("Closing frame code: %d", code);
1069
1070
                // Current rfc6455 codes
1071
                if ( unlikely(code < 1000 || (code >= 1004 && code <= 1006) || (code >= 1015 && code < 3000) || code >= 5000) ) {
1072
                    WSS_log_trace("Protocol Error: Closing frame has invalid error code");
1073
                    WSS_free_frame(frame);
1074
                    frame = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1075
                }
1076
            }
1077
        } else
1078
1079
        // Pong
1080
        if ( unlikely(frame->opcode == PONG_FRAME) ) {
1081
            WSS_log_trace("Pong received");
1082
1083
            WSS_free_frame(frame);
1084
1085
            continue;
1086
        } else
1087
1088
        // Ping
1089
        if ( unlikely(frame->opcode == PING_FRAME) ) {
1090
            WSS_log_trace("Ping received");
1091
            frame = WSS_pong_frame(frame);
1092
        }
1093
1094
        if ( unlikely(NULL == (frames = WSS_realloc((void **) &frames, frames_length*sizeof(wss_frame_t *),
1095
                        (frames_length+1)*sizeof(wss_frame_t *)))) ) {
1096
            WSS_log_error("Unable to reallocate frames");
1097
            session->closing = true;
1098
            return;
1099
        }
1100
        frames[frames_length] = frame;
1101
        frames_length += 1;
1102
1103
        // Check if frame count is exceeded
1104
        if ( unlikely(frames_length > server->config->max_frames) ) {
1105
            WSS_free_frame(frame);
1106
            frame = WSS_closing_frame(CLOSE_BIG, NULL);
1107
            frames[frames_length-1] = frame;
1108
        }
1109
1110
        // Close
1111
        if ( unlikely(frame->opcode == CLOSE_FRAME) ) {
1112
            closing = true;
1113
            WSS_log_trace("Stopping frame validation as closing frame was parsed");
1114
            break;
1115
        }
1116
    } while ( likely(offset < payload_length) );
1117
1118
    WSS_free((void **) &payload);
1119
1120
    WSS_log_trace("A total of %lu frames was parsed.", frames_length);
1121
1122
    WSS_log_trace("Starting frame validation");
1123
1124
    // Validating frames.
1125
    for (i = 0; likely(i < frames_length); i++) {
1126
        // If we are not processing a fragmented set of frames, expect the
1127
        // opcode different from the continuation frame.
1128
        if ( unlikely(! fragmented && (frames[i]->opcode == CONTINUATION_FRAME)) ) {
1129
            WSS_log_trace("Protocol Error: continuation opcode used in non-fragmented message");
1130
            for (j = i; likely(j < frames_length); j++) {
1131
                WSS_free_frame(frames[j]);
1132
            }
1133
            frames[i] = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1134
            frames_length = i+1;
1135
            closing = true;
1136
            break;
1137
        }
1138
1139
        // If we receive control frame within fragmented frames.
1140
        if ( unlikely(fragmented && (frames[i]->opcode >= 0x8 && frames[i]->opcode <= 0xA)) ) {
1141
            WSS_log_trace("Received control frame within fragmented message");
1142
1143
            frame = frames[i];
1144
            // If we received a closing frame substitue the fragment with the
1145
            // closing frame and end validation
1146
            if (frames[i]->opcode == CLOSE_FRAME) {
1147
                WSS_log_trace("Stopping further validation of fragmented message as a closing frame was detected");
1148
1149
                for (j = starting_frame; likely(j < frames_length); j++) {
1150
                    if ( unlikely(j != i) ) {
1151
                        WSS_free_frame(frames[j]);
1152
                    } else {
1153
                        frames[j] = NULL;
1154
                    }
1155
                }
1156
                frames[starting_frame] = frame;
1157
                frames_length = starting_frame+1;
1158
                fragmented = false;
1159
                closing = true;
1160
                break;
1161
            // Else rearrange the frames, such that control frame is first
1162
            } else {
1163
                WSS_log_trace("Rearranging frames such that control frame will be written before fragmented frames");
1164
                for (j = i; likely(j > starting_frame); j--) {
1165
                    frames[j] = frames[j-1];
1166
                }
1167
                frames[starting_frame] = frame;
1168
                starting_frame++;
1169
            }
1170
        } else
1171
1172
        // If we are processing a fragmented set of frames, expect the opcode
1173
        // to be a contination frame.
1174
        if ( unlikely(fragmented && frames[i]->opcode != CONTINUATION_FRAME) ) {
1175
            WSS_log_trace("Protocol Error: during fragmented message received other opcode than continuation");
1176
            for (j = starting_frame; likely(j < frames_length); j++) {
1177
                WSS_free_frame(frames[j]);
1178
            }
1179
            frames[starting_frame] = WSS_closing_frame(CLOSE_PROTOCOL, NULL);
1180
            frames_length = starting_frame+1;
1181
            fragmented = false;
1182
            closing = true;
1183
            break;
1184
        }
1185
1186
        // If message consists of single frame or we have the starting frame of
1187
        // a multiframe message, store the starting index
1188
        if ( frames[i]->fin && frames[i]->opcode != CONTINUATION_FRAME ) {
1189
            starting_frame = i;
1190
        } else if ( ! frames[i]->fin && frames[i]->opcode != CONTINUATION_FRAME ) {
1191
            starting_frame = i;
1192
            fragmented = true;
1193
        }
1194
1195
        if (frames[i]->fin) {
1196
            fragmented = false;
1197
        }
1198
    }
1199
1200
    // If fragmented is still true, we did not receive the whole message, and
1201
    // we hence want to wait until we get the rest.
1202
    if ( unlikely(fragmented) ) {
1203
        WSS_log_trace("Detected missing frames in fragmented message, will wait for further IO");
1204
1205
        session->frames = frames;
1206
        session->frames_length = frames_length;
1207
        session->event = READ;
1208
1209
        return;
1210
    }
1211
1212
    WSS_log_trace("Frames was validated");
1213
1214
    session->state = IDLE;
1215
1216
    WSS_log_trace("Sending frames to receivers");
1217
1218
    // Sending message to subprotocol and sending it to the filedescriptors
1219
    // returned by the subprotocol.
1220
    for (i = 0; likely(i < frames_length); i++) {
1221
        // If message consists of single frame or we have the starting frame of
1222
        // a multiframe message, store the starting index
1223
        if ( (frames[i]->fin && !(frames[i]->opcode == 0x0)) ||
1224
             (! frames[i]->fin && frames[i]->opcode != 0x0) ) {
1225
            starting_frame = i;
1226
        }
1227
1228
        if (frames[i]->fin) {
1229
            len = i-starting_frame+1;
1230
1231
            WSS_log_trace("Applying %d extensions on input", session->header->ws_extensions_count);
1232
1233
            // Apply extensions to collection of frames (message)
1234
            for (j = 0; likely(j < session->header->ws_extensions_count); j++) {
1235
                // Apply extension perframe
1236
                for (k = 0; likely(k < len); k++) {
1237
                    session->header->ws_extensions[j]->ext->inframe(
1238
                            session->fd,
1239
                            (void *)frames[k+starting_frame]);
1240
                }
1241
1242
                // Apply extension for set of frames
1243
                session->header->ws_extensions[j]->ext->inframes(
1244
                        session->fd,
1245
                        frames+starting_frame,
1246
                        len);
1247
            }
1248
1249
            WSS_log_trace("Assembling message");
1250
1251
            for (j = starting_frame; likely(j <= i); j++) {
1252
                msg_length += frames[j]->applicationDataLength;
1253
            }
1254
1255
            if ( unlikely(NULL == (msg = WSS_malloc((msg_length+1)*sizeof(char)))) ) {
1256
                WSS_log_error("Unable to allocate message");
1257
1258
                for (k = 0; likely(k < frames_length); k++) {
1259
                    WSS_free_frame(frames[k]);
1260
                }
1261
                WSS_free((void **) &frames);
1262
1263
                session->closing = true;
1264
1265
                return;
1266
            }
1267
1268
            for (j = starting_frame; likely(j <= i); j++) {
1269
                memcpy(msg+msg_offset, frames[j]->payload+frames[j]->extensionDataLength, frames[j]->applicationDataLength);
1270
                msg_offset += frames[j]->applicationDataLength;
1271
            }
1272
1273
            WSS_log_debug("Unmasked message (%d bytes): %s\n", msg_length, msg);
1274
1275
            // Check utf8 for text frames
1276
            if ( unlikely(frames[starting_frame]->opcode == TEXT_FRAME && ! utf8_check(msg, msg_length)) ) {
1277
                WSS_log_trace("UTF8 Error: the text was not UTF8 encoded correctly");
1278
1279
                for (j = starting_frame; likely(j < frames_length); j++) {
1280
                    WSS_free_frame(frames[j]);
1281
                }
1282
                frames[starting_frame] = WSS_closing_frame(CLOSE_UTF8, NULL);
1283
                frames_length = starting_frame+1;
1284
                i = starting_frame;
1285
                len = 1;
1286
                WSS_free((void **) &msg);
1287
                msg_length = 0;
1288
                closing = true;
1289
            }
1290
1291
            if ( likely(frames[starting_frame]->opcode == TEXT_FRAME || frames[starting_frame]->opcode == BINARY_FRAME) ) {
1292
                WSS_log_trace("Notifying subprotocol of message");
1293
1294
                // Use subprotocol
1295
                session->header->ws_protocol->message(session->fd, frames[starting_frame]->opcode, msg, msg_length);
1296
            } else {
1297
                WSS_log_trace("Writing control frame message");
1298
1299
                WSS_session_jobs_inc(session);
1300
                WSS_message_send_frames((void *)server, (void *)session, &frames[starting_frame], len);
1301
            }
1302
1303
            WSS_free((void **) &msg);
1304
            msg_length = 0;
1305
            msg_offset = 0;
1306
        }
1307
    }
1308
1309
    if ( likely(frames_length > 0) ) {
1310
        for (i = 0; likely(i < frames_length); i++) {
1311
            WSS_free_frame(frames[i]);
1312
        }
1313
        WSS_free((void **) &frames);
1314
    }
1315
1316
    if (! closing && session->event == NONE) {
1317
        WSS_log_trace("Set epoll file descriptor to read mode after finishing read");
1318
        session->event = READ;
1319
    }
1320
}
1321
1322
/**
1323
 * Function that writes information to a session and decides wether event poll
1324
 * should be rearmed and whether a session lock should be performed.
1325
 *
1326
 * @param 	server	[wss_server_t *] 	"The server structure"
1327
 * @param 	session	[wss_session_t *] 	"The session structure"
1328
 * @return          [void]
1329
 */
1330
void WSS_write(wss_server_t *server, wss_session_t *session) {
1331
    int n;
1332
    wss_message_t *message;
1333
    unsigned int i;
1334
    unsigned int message_length;
1335
    unsigned int bytes_sent;
1336
    size_t len, off;
1337
    bool closing = false;
1338
1339
    WSS_log_trace("Performing write by popping messages from ringbuffer");
1340
1341
    while ( likely(0 != (len = ringbuf_consume(session->ringbuf, &off))) ) {
1342
        for (i = 0; likely(i < len); i++) {
1343
            if ( unlikely(session->handshaked && closing) ) {
1344
                WSS_log_trace("No further messages are necessary as client connection is closing");
1345
                break;
1346
            }
1347
1348
            bytes_sent = session->written;
1349
            session->written = 0;
1350
            message = session->messages[off+i];
1351
            message_length = message->length;
1352
1353
            while ( likely(bytes_sent < message_length) ) {
1354
                // Check if message contains closing byte
1355
                if ( unlikely(message->framed && bytes_sent == 0 &&
1356
                     ((message->msg[0] & 0xF) & 0x8) == (message->msg[0] & 0xF)) ) {
1357
                    closing = true;
1358
                }
1359
1360
                if (NULL != session->ssl) {
1361
                    if (! WSS_ssl_write_partial(session, i, message, &bytes_sent)) {
1362
                        return;
1363
                    }
1364
                } else {
1365
                    do {
1366
                        n = write(session->fd, message->msg+bytes_sent, message_length-bytes_sent);
1367
                        if (unlikely(n == -1)) {
1368
                            if ( unlikely(errno == EINTR) ) {
1369
                                errno = 0;
1370
                                continue;
1371
                            } else if ( unlikely(errno != EAGAIN && errno != EWOULDBLOCK) ) {
1372
                                WSS_log_error("Write failed: %s", strerror(errno));
1373
                                session->closing = true;
1374
                                return;
1375
                            }
1376
1377
                            session->written = bytes_sent;
1378
                            ringbuf_release(session->ringbuf, i);
1379
1380
                            session->event = WRITE;
1381
1382
                            return;
1383
                        } else {
1384
                            bytes_sent += n;
1385
                        }
1386
                    } while ( unlikely(0) );
1387
                }
1388
            }
1389
1390
            if ( likely(session->messages != NULL) ) {
1391
                if ( likely(session->messages[off+i] != NULL) ) {
1392
                    if ( likely(session->messages[off+i]->msg != NULL) ) {
1393
                        WSS_free((void **)&session->messages[off+i]->msg);
1394
                    }
1395
                    WSS_free((void **)&session->messages[off+i]);
1396
                }
1397
            }
1398
        }
1399
1400
        ringbuf_release(session->ringbuf, len);
1401
    }
1402
1403
    WSS_log_trace("Done writing to filedescriptors");
1404
1405
    session->state = IDLE;
1406
1407
    if (closing) {
1408
        WSS_log_trace("Closing connection, since closing frame has been sent");
1409
1410
        session->closing = true;
1411
1412
        return;
1413
    }
1414
}
1415
1416
/**
1417
 * Function that performs and distributes the IO work.
1418
 *
1419
 * @param 	args	[void *] 	"Is a args_t structure holding server_t, filedescriptor, and the state"
1420
 * @return          [void]
1421
 */
1422
void WSS_work(void *args) {
1423
    wss_session_t *session;
1424
    long unsigned int ms;
1425
    struct timespec now;
1426
    wss_thread_args_t *arguments = (wss_thread_args_t *) args;
1427
    wss_server_t *server = (wss_server_t *) arguments->server;
1428
    wss_session_state_t session_state = arguments->state;
1429
    int fd = arguments->fd;
1430
1431
    // Free arguments structure as this won't be needed no more
1432
    WSS_free((void **) &arguments);
1433
1434
    if ( unlikely(session_state == CONNECTING) ) {
1435
        WSS_log_trace("Handling connect event");
1436
        WSS_connect(server);
1437
        return;
1438
    }
1439
1440
    if ( unlikely(NULL == (session = WSS_session_find(fd))) ) {
1441
        WSS_log_trace("Unable to find client with session %d", fd);
1442
        return;
1443
    }
1444
1445
    WSS_log_trace("Incrementing session jobs");
1446
1447
    WSS_session_jobs_inc(session);
1448
    pthread_mutex_lock(&session->lock);
1449
1450
    session->event = NONE;
1451
1452
    WSS_log_trace("Checking session state");
1453
1454
    // We first check if session was in the middle of performing an IO task but
1455
    // needed to wait for further IO
1456
    switch (session->state) {
1457
        case WRITING:
1458
            clock_gettime(CLOCK_MONOTONIC, &now);
1459
            ms = (((now.tv_sec - session->alive.tv_sec)*1000)+(now.tv_nsec/1000000)) - (session->alive.tv_nsec/1000000);
1460
            if ( unlikely(server->config->timeout_write >= 0 && ms >= (long unsigned int)server->config->timeout_write) ) {
1461
                WSS_log_trace("Write timeout detected for session %d", fd);
1462
                session->closing = true;
1463
                break;
1464
            }
1465
1466
            session->event = READ;
1467
            WSS_write(server, session);
1468
            break;
1469
        case CLOSING:
1470
            session->closing = true;
1471
            break;
1472
        case CONNECTING:
1473
            if (NULL != server->ssl_ctx) {
1474
                WSS_ssl_handshake(server, session);
1475
                WSS_session_jobs_dec(session);
1476
                pthread_mutex_unlock(&session->lock);
1477
                return;
1478
            }
1479
1480
            WSS_read(server, session);
1481
            break;
1482
        case READING:
1483
            clock_gettime(CLOCK_MONOTONIC, &now);
1484
            ms = (((now.tv_sec - session->alive.tv_sec)*1000)+(now.tv_nsec/1000000)) - (session->alive.tv_nsec/1000000);
1485
            if ( unlikely(server->config->timeout_read >= 0 && ms >= (long unsigned int)server->config->timeout_read) ) {
1486
                WSS_log_trace("Read timeout detected for session %d", fd);
1487
                session->closing = true;
1488
                break;
1489
            }
1490
1491
            WSS_read(server, session);
1492
            break;
1493
        case IDLE:
1494
            session->state = session_state;
1495
1496
            switch (session_state) {
1497
                case WRITING:
1498
                    WSS_log_trace("Handling write event");
1499
                    session->event = READ;
1500
                    WSS_write(server, session);
1501
                    break;
1502
                case CLOSING:
1503
                    session->closing = true;
1504
                    WSS_log_trace("Handling close event");
1505
                    break;
1506
                case READING:
1507
                    WSS_log_trace("Handling read event");
1508
                    WSS_read(server, session);
1509
                    break;
1510
                default:
1511
                    WSS_log_error("State that should not be possible at this point was encountered %s", state);
1512
                    break;
1513
            }
1514
    }
1515
1516
    WSS_session_jobs_dec(session);
1517
    pthread_mutex_unlock(&session->lock);
1518
1519
    if (session->closing) {
1520
        session->event = NONE;
1521
        WSS_disconnect(server, session);
1522
    }
1523
1524
    switch (session->event) {
1525
        case WRITE:
1526
            clock_gettime(CLOCK_MONOTONIC, &session->alive);
1527
            WSS_poll_set_write(server, session->fd);
1528
            break;
1529
        case READ:
1530
            clock_gettime(CLOCK_MONOTONIC, &session->alive);
1531
            WSS_poll_set_read(server, session->fd);
1532
            break;
1533
        case NONE:
1534
            break;
1535
    }
1536
}