MythTV master
websocket.cpp
Go to the documentation of this file.
1// Own header
2#include "websocket.h"
3
4// C++
5#include <algorithm>
6#include <chrono>
7#include <cmath>
8
9// MythTV headers
14
16
17// QT headers
18#include <QCryptographicHash>
19#include <QSslCipher>
20#include <QTcpSocket>
21#include <QThread>
22#include <QtGlobal>
23
25//
27
29 m_threadPool("WebSocketServerPool")
30{
31 setObjectName("WebSocketServer");
32 // Number of connections processed concurrently
33 int maxThreads = std::max(QThread::idealThreadCount() * 2, 2); // idealThreadCount can return -1
34 // Don't allow more connections than we can process, it will simply cause
35 // browsers to stall
36 setMaxPendingConnections(maxThreads);
38
39 LOG(VB_HTTP, LOG_NOTICE, QString("WebSocketServer(): Max Thread Count %1")
41}
42
44{
45 m_rwlock.lockForWrite();
46 m_running = false;
47 m_rwlock.unlock();
48
50}
51
53{
54
56 auto *server = qobject_cast<PrivTcpServer *>(QObject::sender());
57 if (server)
58 type = server->GetServerType();
59
61 new WebSocketWorkerThread(*this, socket, type
62#ifndef QT_NO_OPENSSL
64#endif
65 ),
66 QString("WebSocketServer%1").arg(socket));
67}
68
70//
72
74 qintptr sock, PoolServerType type
75#ifndef QT_NO_OPENSSL
76 , const QSslConfiguration& sslConfig
77#endif
78 )
79 :
80 m_webSocketServer(webSocketServer), m_socketFD(sock),
81 m_connectionType(type)
82#ifndef QT_NO_OPENSSL
83 , m_sslConfig(sslConfig)
84#endif
85{
86}
87
89{
92#ifndef QT_NO_OPENSSL
94#endif
95 );
96 worker->Exec();
97
98 worker->deleteLater();
99}
100
102//
104
106 qintptr sock, PoolServerType type
107#ifndef QT_NO_OPENSSL
108 , const QSslConfiguration& sslConfig
109#endif
110 )
111 : m_eventLoop(new QEventLoop()),
112 m_webSocketServer(webSocketServer),
113 m_socketFD(sock), m_connectionType(type),
114 m_heartBeat(new QTimer())
115#ifndef QT_NO_OPENSSL
116 , m_sslConfig(sslConfig)
117#endif
118{
119 setObjectName(QString("WebSocketWorker(%1)")
120 .arg(m_socketFD));
121 LOG(VB_HTTP, LOG_INFO, QString("WebSocketWorker(%1): New connection")
122 .arg(m_socketFD));
123
124 // For now, until it's refactored, register the only extension here
126
127 SetupSocket();
128
129 m_isRunning = true;
130}
131
133{
135
136 // If extensions weren't deregistered then it's our responsibility to
137 // delete them
138 while (!m_extensions.isEmpty())
139 {
140 WebSocketExtension *extension = m_extensions.takeFirst();
141 extension->deleteLater();
142 extension = nullptr;
143 }
144
145 m_eventLoop->deleteLater();
146 m_eventLoop = nullptr;
147 delete m_heartBeat;
148}
149
151{
152 m_eventLoop->exec();
153}
154
156{
157 blockSignals(true);
158 m_isRunning = false;
159 LOG(VB_HTTP, LOG_INFO, "WebSocketWorker::CloseConnection()");
160 if (m_eventLoop)
161 m_eventLoop->exit();
162}
163
165{
167 {
168
169#ifndef QT_NO_OPENSSL
170 auto *pSslSocket = new QSslSocket();
171 if (pSslSocket->setSocketDescriptor(m_socketFD)
172 && gCoreContext->CheckSubnet(pSslSocket))
173 {
174 pSslSocket->setSslConfiguration(m_sslConfig);
175 pSslSocket->startServerEncryption();
176 if (pSslSocket->waitForEncrypted(5000))
177 {
178 LOG(VB_HTTP, LOG_INFO, "SSL Handshake occurred, connection encrypted");
179 LOG(VB_HTTP, LOG_INFO, QString("Using %1 cipher").arg(pSslSocket->sessionCipher().name()));
180 }
181 else
182 {
183 LOG(VB_HTTP, LOG_WARNING, "SSL Handshake FAILED, connection terminated");
184 delete pSslSocket;
185 pSslSocket = nullptr;
186 }
187 }
188 else
189 {
190 delete pSslSocket;
191 pSslSocket = nullptr;
192 }
193
194 if (pSslSocket)
195 m_socket = pSslSocket;
196 else
197 return;
198#else
199 return;
200#endif
201 }
202 else // Plain old unencrypted socket
203 {
204 m_socket = new QTcpSocket();
205 m_socket->setSocketDescriptor(m_socketFD);
207 {
208 delete m_socket;
209 m_socket = nullptr;
210 return;
211 }
212
213 }
214
215 m_socket->setSocketOption(QAbstractSocket::KeepAliveOption, QVariant(1));
216
217 connect(m_socket, &QIODevice::readyRead, this, &WebSocketWorker::doRead);
218 connect(m_socket, &QAbstractSocket::disconnected, this, &WebSocketWorker::CloseConnection);
219
220 // Setup heartbeat
221 m_heartBeat->setInterval(20s);
222 m_heartBeat->setSingleShot(false);
224}
225
227{
228 if (m_socket->error() != QAbstractSocket::UnknownSocketError)
229 {
230 LOG(VB_HTTP, LOG_WARNING, QString("WebSocketWorker(%1): Error %2 (%3)")
231 .arg(m_socketFD)
232 .arg(m_socket->errorString())
233 .arg(m_socket->error()));
234 }
235
236 std::chrono::milliseconds writeTimeout = 5s;
237 // Make sure any data in the buffer is flushed before the socket is closed
238 while (m_webSocketServer.IsRunning() &&
239 m_socket->isValid() &&
240 m_socket->state() == QAbstractSocket::ConnectedState &&
241 m_socket->bytesToWrite() > 0)
242 {
243 LOG(VB_HTTP, LOG_DEBUG, QString("WebSocketWorker(%1): "
244 "Waiting for %2 bytes to be written "
245 "before closing the connection.")
246 .arg(m_socketFD)
247 .arg(m_socket->bytesToWrite()));
248
249 // If the client stops reading for longer than 'writeTimeout' then
250 // stop waiting for them. We can't afford to leave the socket
251 // connected indefinitely, it could be used by another client.
252 //
253 // NOTE: Some clients deliberately stall as a way of 'pausing' A/V
254 // streaming. We should create a new server extension or adjust the
255 // timeout according to the User-Agent, instead of increasing the
256 // standard timeout. However we should ALWAYS have a timeout.
257 if (!m_socket->waitForBytesWritten(writeTimeout.count()))
258 {
259 LOG(VB_GENERAL, LOG_WARNING, QString("WebSocketWorker(%1): "
260 "Timed out waiting to write bytes to "
261 "the socket, waited %2 seconds")
262 .arg(m_socketFD)
263 .arg(writeTimeout.count() / 1000));
264 break;
265 }
266 }
267
268 if (m_socket->bytesToWrite() > 0)
269 {
270 LOG(VB_HTTP, LOG_WARNING, QString("WebSocketWorker(%1): "
271 "Failed to write %2 bytes to "
272 "socket, (%3)")
273 .arg(m_socketFD)
274 .arg(m_socket->bytesToWrite())
275 .arg(m_socket->errorString()));
276 }
277
278 LOG(VB_HTTP, LOG_INFO, QString("WebSocketWorker(%1): Connection %2 closed.")
279 .arg(m_socketFD)
280 .arg(m_socket->socketDescriptor()));
281
282 m_socket->close();
283 m_socket->deleteLater();
284 m_socket = nullptr;
285}
286
288{
289 if (!m_isRunning)
290 return;
291
292 if (!m_webSocketServer.IsRunning() || !m_socket->isOpen())
293 {
295 return;
296 }
297
298 if (m_webSocketMode)
299 {
301 }
302 else
303 {
306 }
307
308 if (!m_webSocketMode)
309 {
310 LOG(VB_HTTP, LOG_WARNING, "WebSocketServer: Timed out waiting for connection upgrade");
312 }
313}
314
315bool WebSocketWorker::ProcessHandshake(QTcpSocket *socket)
316{
317 QByteArray line;
318
319 // Minimum length of the request line is 16
320 // i.e. "GET / HTTP/1.1\r\n"
321 while (socket->bytesAvailable() < 16)
322 {
323 if (!socket->waitForReadyRead(5000)) // 5 second timeout
324 return false;
325 }
326
327 // Set a maximum length for a request line of 255 bytes, it's an
328 // arbitrary limit but a reasonable one for now
329 line = socket->readLine(255).trimmed(); // Strip newline
330 if (line.isEmpty())
331 return false;
332
333 QStringList tokens = QString(line).split(' ', Qt::SkipEmptyParts);
334 if (tokens.length() != 3) // Anything but 3 is invalid - {METHOD} {HOST/PATH} {PROTOCOL}
335 {
336 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Invalid number of tokens in Request line");
337 return false;
338 }
339
340 if (tokens[0] != "GET") // RFC 6455 - Request method MUST be GET
341 {
342 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::ProcessHandshake() - Invalid method: %1").arg(tokens[0]));
343 return false;
344 }
345
346 const QString& path = tokens[1];
347
348 if (path.contains('#')) // RFC 6455 - Fragments MUST NOT be used
349 {
350 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Path contains illegal fragment");
351 return false;
352 }
353
354 if (!tokens[2].startsWith("HTTP/")) // Must be HTTP
355 {
356 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::ProcessHandshake() - Invalid protocol: %1").arg(tokens[2]));
357 return false;
358 }
359
360 QString versionStr = tokens[2].section('/', 1, 1);
361
362 int majorVersion = versionStr.section('.', 0, 0).toInt();
363 int minorVersion = versionStr.section('.', 1, 1).toInt();
364
365 if ((majorVersion < 1) || (minorVersion < 1)) // RFC 6455 - HTTP version MUST be at least 1.1
366 {
367 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::ProcessHandshake() - Invalid HTTP version: %1.%2").arg(majorVersion).arg(minorVersion));
368 return false;
369 }
370
371 // Read Header
372 line = socket->readLine(2000).trimmed(); // 2KB Maximum
373
374 QMap<QString, QString> requestHeaders;
375 while (!line.isEmpty())
376 {
377 QString strLine = QString(line);
378 QString sName = strLine.section( ':', 0, 0 ).trimmed();
379 QString sValue = strLine.section( ':', 1 ).trimmed();
380 sValue.replace(" ",""); // Remove all internal whitespace
381
382 if (!sName.isEmpty() && !sValue.isEmpty())
383 requestHeaders.insert(sName.toLower(), sValue);
384
385 line = socket->readLine(2000).trimmed(); // 2KB Maximum
386 }
387
388 // Dump request header
389 QMap<QString, QString>::iterator it;
390 for ( it = requestHeaders.begin();
391 it != requestHeaders.end();
392 ++it )
393 {
394 LOG(VB_HTTP, LOG_INFO, QString("(Request Header) %1: %2")
395 .arg(it.key(), *it));
396 }
397
398 if (!requestHeaders.contains("connection")) // RFC 6455 - 1.3. Opening Handshake
399 {
400 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Missing 'Connection' header");
401 return false;
402 }
403
404 QStringList connectionValues = requestHeaders["connection"].split(',', Qt::SkipEmptyParts);
405 if (!connectionValues.contains("Upgrade", Qt::CaseInsensitive)) // RFC 6455 - 1.3. Opening Handshake
406 {
407 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Invalid 'Connection' header");
408 return false;
409 }
410
411 if (!requestHeaders.contains("upgrade") ||
412 requestHeaders["upgrade"].toLower() != "websocket") // RFC 6455 - 1.3. Opening Handshake
413 {
414 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Missing or invalid 'Upgrade' header");
415 return false;
416 }
417
418 if (!requestHeaders.contains("sec-websocket-version")) // RFC 6455 - 1.3. Opening Handshake
419 {
420 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Missing 'Sec-WebSocket-Version' header");
421 return false;
422 }
423
424 if (requestHeaders["sec-websocket-version"] != "13") // RFC 6455 - 1.3. Opening Handshake
425 {
426 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::ProcessHandshake() - Unsupported WebSocket protocol "
427 "version. We speak '13' the client speaks '%1'")
428 .arg(requestHeaders["sec-websocket-version"]));
429 return false;
430 }
431
432 if (!requestHeaders.contains("sec-websocket-key") ||
433 QByteArray::fromBase64(requestHeaders["sec-websocket-key"].toLatin1()).length() != 16) // RFC 6455 - 1.3. Opening Handshake
434 {
435 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessHandshake() - Missing or invalid 'sec-websocket-key' header");
436 return false;
437 }
438
439 // If we're running the Autobahn fuzz/unit tester then we need to disable
440 // the heartbeat and 'echo' text/binary frames back to the sender
441 if (requestHeaders.contains("user-agent") &&
442 requestHeaders["user-agent"].contains("AutobahnTestSuite"))
443 m_fuzzTesting = true;
444
445 QString key = requestHeaders["sec-websocket-key"];
446 const QString magicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
447
448 // Response token - Watch very closely, there's a lot happening here
449 QString acceptToken = QCryptographicHash::hash(key.append(magicString).toUtf8(),
450 QCryptographicHash::Sha1).toBase64();
451
452 QMap<QString, QString> responseHeaders;
453 responseHeaders.insert("Connection", "Upgrade");
454 responseHeaders.insert("Upgrade", "websocket");
455 responseHeaders.insert("Sec-WebSocket-Accept", acceptToken);
456
457 socket->write("HTTP/1.1 101 Switching Protocols\r\n");
458
459 QString header("%1: %2\r\n");
460 for (it = responseHeaders.begin(); it != responseHeaders.end(); ++it)
461 {
462 socket->write(header.arg(it.key(), *it).toLatin1());
463 LOG(VB_HTTP, LOG_INFO, QString("(Response Header) %1: %2")
464 .arg(it.key(), *it));
465 }
466
467 socket->write("\r\n");
468 socket->waitForBytesWritten();
469
470 m_webSocketMode = true;
471 // Start the heart beat
472 if (!m_fuzzTesting)
473 m_heartBeat->start();
474
475 return true;
476}
477
478void WebSocketWorker::ProcessFrames(QTcpSocket *socket)
479{
480 while (m_isRunning && socket && socket->bytesAvailable() >= 2) // No header? Return and wait for more
481 {
482 uint8_t headerSize = 2; // Smallest possible header size is 2 bytes, greatest is 14 bytes
483
484 QByteArray header = socket->peek(headerSize); // Read header to establish validity and size of frame
485
486 WebSocketFrame frame;
487 // FIN
488 frame.m_finalFrame = (bool)(header[0] & 0x80);
489 // Reserved bits
490 if (header.at(0) & 0x70)
491 {
492 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessFrames() "
493 "- Invalid data in reserved fields");
494 SendClose(kCloseProtocolError, "Invalid data in reserved fields");
495 return;
496 }
497 // Operation code
498 uint8_t opCode = (header.at(0) & 0xF);
499 if ((opCode > WebSocketFrame::kOpBinaryFrame &&
500 opCode < WebSocketFrame::kOpClose) ||
501 (opCode > WebSocketFrame::kOpPong))
502 {
503 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::ProcessFrames() "
504 "- Invalid OpCode (%1)")
505 .arg(QString::number(opCode, 16)));
506 SendClose(kCloseProtocolError, "Invalid OpCode");
507 return;
508 }
509 frame.m_opCode = (WebSocketFrame::OpCode)opCode;
510 frame.m_isMasked = (((header.at(1) >> 7) & 0xFE) != 0);
511
512 if (frame.m_isMasked)
513 headerSize += 4; // Add 4 bytes for the mask
514
515 frame.m_payloadSize = (header.at(1) & 0x7F);
516 // Handle 16 or 64bit payload size headers
517 if (frame.m_payloadSize >= 126)
518 {
519 uint8_t payloadHeaderSize = 2; // 16bit payload size
520 if (frame.m_payloadSize == 127)
521 payloadHeaderSize = 8; // 64bit payload size
522
523 headerSize += payloadHeaderSize; // Add bytes for extended payload size
524
525 if (socket->bytesAvailable() < headerSize)
526 return; // Return and wait for more
527
528 header = socket->peek(headerSize); // Peek the entire header
529 QByteArray payloadHeader = header.mid(2,payloadHeaderSize);
530 frame.m_payloadSize = 0;
531 for (int i = 0; i < payloadHeaderSize; i++)
532 {
533 frame.m_payloadSize |= ((uint64_t)payloadHeader.at(i) << ((payloadHeaderSize - (i + 1)) * 8));
534 }
535 }
536 else
537 {
538 if (socket->bytesAvailable() < headerSize)
539 return; // Return and wait for more
540 header = socket->peek(headerSize); // Peek the entire header including mask
541 }
542
543 while ((uint64_t)socket->bytesAvailable() < (frame.m_payloadSize + header.length()))
544 {
545 if (!socket->waitForReadyRead(2000)) // Wait 2 seconds for the next chunk of the frame
546 {
547
548 m_errorCount++;
549
550 if (m_errorCount == 5)
551 {
552 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessFrames() - Timed out waiting for rest of frame to arrive.");
554 }
555 return;
556 }
557 }
558
561
562 LOG(VB_HTTP, LOG_DEBUG, QString("Read Header: %1").arg(QString(header.toHex())));
563 LOG(VB_HTTP, LOG_DEBUG, QString("Final Frame: %1").arg(frame.m_finalFrame ? "Yes" : "No"));
564 LOG(VB_HTTP, LOG_DEBUG, QString("Op Code: %1").arg(QString::number(frame.m_opCode)));
565 LOG(VB_HTTP, LOG_DEBUG, QString("Payload Size: %1 Bytes").arg(QString::number(frame.m_payloadSize)));
566 LOG(VB_HTTP, LOG_DEBUG, QString("Total Payload Size: %1 Bytes").arg(QString::number( m_readFrame.m_payloadSize)));
567
568 if (!m_fuzzTesting &&
569 frame.m_payloadSize > std::pow(2,20)) // Set 1MB limit on payload per frame
570 {
571 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessFrames() - Frame payload larger than limit of 1MB");
572 SendClose(kCloseTooLarge, "Frame payload larger than limit of 1MB");
573 return;
574 }
575
576 if (!m_fuzzTesting &&
577 m_readFrame.m_payloadSize > std::pow(2,22)) // Set 4MB limit on total payload
578 {
579 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::ProcessFrames() - Total payload larger than limit of 4MB");
580 SendClose(kCloseTooLarge, "Total payload larger than limit of 4MB");
581 return;
582 }
583
584 socket->read(headerSize); // Discard header from read buffer
585
586 frame.m_payload = socket->read(frame.m_payloadSize);
587
588 // Unmask payload
589 if (frame.m_isMasked)
590 {
591 frame.m_mask = header.right(4);
592 for (uint i = 0; i < frame.m_payloadSize; i++)
593 frame.m_payload[i] = frame.m_payload.at(i) ^ frame.m_mask[i % 4];
594 }
595
599 {
600 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Incomplete multi-part frame? Expected continuation.");
601 SendClose(kCloseProtocolError, "Incomplete multi-part frame? Expected continuation.");
602 return;
603 }
604
605 // Check control frame validity
606 if (frame.m_opCode >= 0x08)
607 {
608 if (!frame.m_finalFrame)
609 {
610 SendClose(kCloseProtocolError, "Control frames MUST NOT be fragmented");
611 return;
612 }
613 if (frame.m_payloadSize > 125)
614 {
615 SendClose(kCloseProtocolError, "Control frames MUST NOT have payload greater than 125 bytes");
616 return;
617 }
618 }
619
620 switch (frame.m_opCode)
621 {
624 {
625 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Received Continuation Frame out of sequence");
626 SendClose(kCloseProtocolError, "Received Continuation Frame out of sequence");
627 return;
628 }
629
630 m_readFrame.m_payload.append(frame.m_payload);
631
633 {
635 frame = m_readFrame;
636 // Fall through to appropriate handler for complete payload
637 }
638 else
639 {
640 break;
641 }
642 [[fallthrough]];
645 HandleDataFrame(frame);
646 break;
648 SendPong(frame.m_payload);
649 break;
651 break;
653 if (!frame.m_finalFrame)
654 SendClose(kCloseProtocolError, "Control frames MUST NOT be fragmented");
655 else
657 break;
658 default:
659 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Received Unknown Frame Type");
660 break;
661 }
662
663 frame.reset();
664 }
665}
666
668{
669}
670
672{
673 if (frame.m_finalFrame)
674 {
675 switch (frame.m_opCode)
676 {
679 {
680 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Message is not valid UTF-8");
681 SendClose(kCloseBadData, "Message is not valid UTF-8");
682 return;
683 }
684 // For Debugging and fuzz testing
685 if (m_fuzzTesting)
686 SendText(frame.m_payload);
687 for (auto *const extension : std::as_const(m_extensions))
688 {
689 if (extension->HandleTextFrame(frame))
690 break;
691 }
692 break;
694 if (m_fuzzTesting)
695 SendBinary(frame.m_payload);
696 for (auto *const extension : std::as_const(m_extensions))
697 {
698 if (extension->HandleBinaryFrame(frame))
699 break;
700 }
701 break;
702 default:
703 break;
704 }
706 }
707 else
708 {
709 // Start of new fragmented frame
716 }
717}
718
719void WebSocketWorker::HandleCloseConnection(const QByteArray &payload)
720{
721 uint16_t code = kCloseNormal;
722
723 if (payload.length() == 1)
724 {
725 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Invalid close payload");
726 SendClose(kCloseProtocolError, "Invalid close payload");
727 return;
728 }
729
730 if (payload.length() >= 2)
731 {
732 code = (payload[0] << 8);
733 code |= payload[1] & 0xFF;
734 }
735
736 if ((code < 1000) ||
737 ((code > 1003) && (code < 1007)) ||
738 ((code > 1011) && (code < 3000)) ||
739 (code > 4999))
740 {
741 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Invalid close code received");
742 SendClose(kCloseProtocolError, "Invalid close code");
743 return;
744 }
745
746 QString closeMessage;
747 if (payload.length() > 2)
748 {
749 QByteArray messageBytes = payload.mid(2);
750 if (!StringUtil::isValidUTF8(messageBytes))
751 {
752 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker - Message is not valid UTF-8");
753 SendClose(kCloseBadData, "Message is not valid UTF-8");
754 return;
755 }
756 closeMessage = QString(messageBytes);
757 }
758
759 LOG(VB_HTTP, LOG_INFO, QString("WebSocketWorker - Received CLOSE frame - [%1] %2")
760 .arg(QString::number(code), closeMessage));
761 SendClose((ErrorCode)code);
762}
763
765 const QByteArray &payload)
766{
767 QByteArray frame;
768
769 int payloadSize = payload.length();
770
771 if (payloadSize >= std::pow(2,64))
772 {
773 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::CreateFrame() - Payload "
774 "exceeds the allowed size for a single frame");
775 return frame;
776 }
777
778 if (type >= 0x08)
779 {
780 //if (!m_finalFrame)
781 // SendClose(kCloseProtocolError, "Control frames MUST NOT be fragmented");
782 if (payloadSize > 125)
783 {
784 LOG(VB_GENERAL, LOG_ERR, "WebSocketWorker::CreateFrame() - Control frames MUST NOT have payload greater than 125bytes");
785 return frame;
786 }
787 }
788
789 uint16_t header = 0;
790 uint16_t extendedHeader = 0; // For payloads > 125 bytes
791 uint64_t superHeader = 0; // For payloads > 65,535 bytes
792
793 // Only support single frames for now
794 header |= (1 << 15); // FIN (1 bit)
795 // RSV 1 to RSV 3 (1 bit each)
796 header |= (type << 8); // OpCode (4 bits)
797 header |= (0 << 7); // Mask (1 bit) (Off)
798 if (payloadSize < 126)
799 {
800 header |= payloadSize;
801 frame.insert(2, payload.constData(), payloadSize);
802 }
803 else if (payloadSize <= 65535)
804 {
805 header |= 126;
806 extendedHeader = payloadSize;
807 frame.insert(4, payload.constData(), payloadSize);
808 }
809 else
810 {
811 header |= 127;
812 superHeader = payloadSize;
813 frame.insert(10, payload.constData(), payloadSize);
814 }
815
816 frame[0] = (uint8_t)((header >> 8) & 0xFF);
817 frame[1] = (uint8_t)(header & 0xFF);
818 if (extendedHeader > 0)
819 {
820 frame[2] = (extendedHeader >> 8) & 0xFF;
821 frame[3] = extendedHeader & 0xFF;
822 }
823 else if (superHeader > 0)
824 {
825 frame[2] = (superHeader >> 56) & 0xFF;
826 frame[3] = (superHeader >> 48) & 0xFF;
827 frame[4] = (superHeader >> 40) & 0xFF;
828 frame[5] = (superHeader >> 32) & 0xFF;
829 frame[6] = (superHeader >> 24) & 0xFF;
830 frame[7] = (superHeader >> 16) & 0xFF;
831 frame[8] = (superHeader >> 8) & 0xFF;
832 frame[9] = (superHeader & 0xFF);
833 }
834
835 LOG(VB_HTTP, LOG_DEBUG, QString("WebSocketWorker::CreateFrame() - Frame size %1").arg(QString::number(frame.length())));
836
837 return frame;
838}
839
840bool WebSocketWorker::SendFrame(const QByteArray &frame)
841{
842 if (!m_socket || !m_socket->isOpen() || !m_socket->isWritable())
843 {
845 return false;
846 }
847
848 LOG(VB_HTTP, LOG_DEBUG, QString("WebSocketWorker::SendFrame() - '%1'...").arg(QString(frame.left(64).toHex())));
849
850 m_socket->write(frame.constData(), frame.length());
851
852 return true;
853}
854
855bool WebSocketWorker::SendText(const QString &message)
856{
857 return SendText(message.trimmed().toUtf8());
858}
859
860bool WebSocketWorker::SendText(const QByteArray& message)
861{
862 if (!StringUtil::isValidUTF8(message))
863 {
864 LOG(VB_GENERAL, LOG_ERR, QString("WebSocketWorker::SendText('%1...') - "
865 "Text contains invalid UTF-8 character codes. Discarded.")
866 .arg(QString(message.left(20))));
867 return false;
868 }
869
870 QByteArray frame = CreateFrame(WebSocketFrame::kOpTextFrame, message);
871
872 return !frame.isEmpty() && SendFrame(frame);
873}
874
875bool WebSocketWorker::SendBinary(const QByteArray& data)
876{
877 QByteArray frame = CreateFrame(WebSocketFrame::kOpBinaryFrame, data);
878
879 return !frame.isEmpty() && SendFrame(frame);
880}
881
882bool WebSocketWorker::SendPing(const QByteArray& payload)
883{
884 QByteArray frame = CreateFrame(WebSocketFrame::kOpPing, payload);
885
886 return !frame.isEmpty() && SendFrame(frame);
887}
888
889bool WebSocketWorker::SendPong(const QByteArray& payload)
890{
891 QByteArray frame = CreateFrame(WebSocketFrame::kOpPong, payload);
892
893 return !frame.isEmpty() && SendFrame(frame);
894}
895
897 const QString &message)
898{
899 LOG(VB_HTTP, LOG_DEBUG, "WebSocketWorker::SendClose()");
900 QByteArray payload;
901 payload.resize(2);
902 payload[0] = (uint8_t)(errCode >> 8);
903 payload[1] = (uint8_t)(errCode & 0xFF);
904
905 if (!message.isEmpty())
906 payload.append(message.toUtf8());
907
908 QByteArray frame = CreateFrame(WebSocketFrame::kOpClose, payload);
909
910 if (!frame.isEmpty() && SendFrame(frame))
911 {
913 return true;
914 }
915
917 return false;
918}
919
921{
922 SendPing(QByteArray("HeartBeat"));
923}
924
926{
927 if (!extension)
928 return;
929
930 connect(extension, &WebSocketExtension::SendTextMessage,
931 this, qOverload<const QString &>(&WebSocketWorker::SendText));
932 connect(extension, &WebSocketExtension::SendBinaryMessage,
934
935 m_extensions.append(extension);
936}
937
939{
940 if (!extension)
941 return;
942
943 for (auto it = m_extensions.begin(); it != m_extensions.end(); ++it)
944 {
945 if ((*it) == extension)
946 {
947 it = m_extensions.erase(it);
948 break;
949 }
950 }
951}
952
953
int maxThreadCount(void) const
void setMaxThreadCount(int maxThreadCount)
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
void Stop(void)
bool CheckSubnet(const QAbstractSocket *socket)
Check if a socket is connected to an approved peer.
void setMaxPendingConnections(int n)
Definition: serverpool.h:94
Base class for extensions.
Definition: websocket.h:138
void SendTextMessage(const QString &)
void SendBinaryMessage(const QByteArray &)
A representation of a single WebSocket frame.
Definition: websocket.h:77
void reset(void)
Definition: websocket.h:90
bool m_fragmented
Definition: websocket.h:120
QByteArray m_mask
Definition: websocket.h:119
QByteArray m_payload
Definition: websocket.h:115
bool m_finalFrame
Definition: websocket.h:114
uint64_t m_payloadSize
Definition: websocket.h:116
OpCode m_opCode
Definition: websocket.h:117
Extension for sending MythEvents over WebSocketServer.
The WebSocket server, which listens for connections.
Definition: websocket.h:39
QReadWriteLock m_rwlock
Definition: websocket.h:58
QSslConfiguration m_sslConfig
Definition: websocket.h:63
~WebSocketServer() override
Definition: websocket.cpp:43
MThreadPool m_threadPool
Definition: websocket.h:59
bool IsRunning(void) const
Definition: websocket.h:46
void newTcpConnection(qintptr socket) override
Definition: websocket.cpp:52
The thread in which WebSocketWorker does it's thing.
Definition: websocket.h:159
WebSocketServer & m_webSocketServer
Definition: websocket.h:172
PoolServerType m_connectionType
Definition: websocket.h:174
WebSocketWorkerThread(WebSocketServer &webSocketServer, qintptr sock, PoolServerType type, const QSslConfiguration &sslConfig)
Definition: websocket.cpp:73
QSslConfiguration m_sslConfig
Definition: websocket.h:176
void run(void) override
Definition: websocket.cpp:88
Performs all the protocol-level work for a single websocket connection.
Definition: websocket.h:199
void HandleCloseConnection(const QByteArray &payload)
Definition: websocket.cpp:719
void SendHeartBeat()
Definition: websocket.cpp:920
bool SendBinary(const QByteArray &data)
Definition: websocket.cpp:875
void HandleDataFrame(const WebSocketFrame &frame)
Definition: websocket.cpp:671
void ProcessFrames(QTcpSocket *socket)
Returns false if an error occurs.
Definition: websocket.cpp:478
bool m_webSocketMode
Definition: websocket.h:281
void CleanupSocket()
Definition: websocket.cpp:226
QSslConfiguration m_sslConfig
Definition: websocket.h:290
QList< WebSocketExtension * > m_extensions
Definition: websocket.h:295
QTimer * m_heartBeat
Definition: websocket.h:287
void CloseConnection()
Definition: websocket.cpp:155
static QByteArray CreateFrame(WebSocketFrame::OpCode type, const QByteArray &payload)
Definition: websocket.cpp:764
void RegisterExtension(WebSocketExtension *extension)
Definition: websocket.cpp:925
WebSocketServer & m_webSocketServer
Definition: websocket.h:275
qintptr m_socketFD
Definition: websocket.h:276
bool SendPong(const QByteArray &payload)
Definition: websocket.cpp:889
uint8_t m_errorCount
Definition: websocket.h:284
QTcpSocket * m_socket
Definition: websocket.h:277
WebSocketFrame m_readFrame
Definition: websocket.h:282
QEventLoop * m_eventLoop
Definition: websocket.h:274
bool SendClose(ErrorCode errCode, const QString &message=QString())
Definition: websocket.cpp:896
void HandleControlFrame(const WebSocketFrame &frame)
Returns false if an error occurs.
Definition: websocket.cpp:667
bool SendText(const QString &message)
Definition: websocket.cpp:855
WebSocketWorker(WebSocketServer &webSocketServer, qintptr sock, PoolServerType type, const QSslConfiguration &sslConfig)
Definition: websocket.cpp:105
void DeregisterExtension(WebSocketExtension *extension)
Definition: websocket.cpp:938
bool SendFrame(const QByteArray &frame)
Definition: websocket.cpp:840
bool m_fuzzTesting
Definition: websocket.h:293
void SetupSocket()
Definition: websocket.cpp:164
bool SendPing(const QByteArray &payload)
Definition: websocket.cpp:882
PoolServerType m_connectionType
Definition: websocket.h:278
bool ProcessHandshake(QTcpSocket *socket)
Definition: websocket.cpp:315
~WebSocketWorker() override
Definition: websocket.cpp:132
unsigned int uint
Definition: compat.h:68
unsigned short uint16_t
Definition: iso6937tables.h:3
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MBASE_PUBLIC bool isValidUTF8(const QByteArray &data)
Definition: stringutil.cpp:47
bool
Definition: pxsup2dast.c:31
PoolServerType
Definition: serverpool.h:30
@ kSSLServer
Definition: serverpool.h:33
@ kTCPServer
Definition: serverpool.h:31