Qt实现简易心跳包机制
因为网络通信的不稳定性,在一些实时性、网络稳定性要求较高的情境下,我们需要一个实时检测客户端与服务端之间的连接状态和通信功能是否运行正常的机制,心跳包机制便是其中之一。接下来我会讲一下在之前的一个小项目中,利用Qt实现的简易心跳包检测机制。
架构
心跳包检测的原理是客户端定时向服务端发送心跳包,服务端收到心跳包后立即回复客户端。此间客户端检查心跳包是否发送成功以及是否超时。在一些实时性要求比较高的情境下,检测心跳包超时是有必要的。可以通过记录等待回复的心跳包数来实现检测心跳包超时。
客户端
启动定时器
客户端的核心在于定时发送心跳包,这里我们利用一个TcpHeart类来实现定时发送心跳包。
TcpHeart类声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include <QObject> #include <QTimer>
class TcpHeart : public QObject { Q_OBJECT public: explicit TcpHeart(QObject* parent = 0); ~TcpHeart() noexcept;
public: void startHeartTimer();
signals: void sigHeartBad(); void sigHeartReq();
private slots: void slotTimeOut();
public slots: void slotHeartBack();
private: QTimer* m_heart_timer; int m_count; };
|
TcpHeart核心是利用QTimer实现一个定时任务:启动一个定时器,定时器timeOut消息发出后,检测当前等待回复的心跳包数量。若等待回复的心跳包数量≤3,则可以继续发送心跳包,并增加等待回复的心跳包数量;否则采取心跳包超时处理,停止计时器并发送心跳包错误消息。
定时事件slotTimeOut()
实现如下:
1 2 3 4 5 6 7 8 9 10
| void TcpHeart::slotTimeOut() { if (m_count > 2) { m_count = 0; m_heart_timer->stop(); emit sigHeartBad(); return; } m_count++; emit sigHeartReq(); }
|
在收到服务端心跳包回复后,我们需要将等待回复的心跳包计数清零。这里需要直接将计数清零,因为每次收到回复后,超时检测机制都应重新启动。
1 2 3
| void TcpHeart::slotHeartBack() { m_count = 0; }
|
TcpHeart的完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| TcpHeart::TcpHeart(QObject *parent) : QObject(parent) { m_heart_timer = new QTimer(this); m_count = 0;
connect(m_heart_timer, &QTimer::timeout, this, &TcpHeart::slotTimeOut); }
TcpHeart::~TcpHeart() noexcept { }
void TcpHeart::startHeartTimer() { m_heart_timer->start(2000); }
void TcpHeart::slotTimeOut() { if (m_count > 2) { m_count = 0; m_heart_timer->stop(); emit sigHeartBad(); return; } m_count++; emit sigHeartReq(); }
void TcpHeart::slotHeartBack() { m_count = 0; }
|
发送心跳包
正式实现心跳包机制的类是StatusClient。StatusClient类声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| #include <QDebug> #include <QObject> #include <QTcpSocket>
#include "TcpHeart.h" #include "json.hpp"
class StatusClient : public QObject { Q_OBJECT public: explicit StatusClient(const QString host, const int port, QObject *parent = nullptr);
public slots: void connectedToServer();
signals: void sigHeartBack(); void signalDisconnectedToServer(); private slots: void slotStatusReadyRead(); void slotWriteHeartSocket(); void slotHeartBad();
private: QTcpSocket *_statusSocket; TcpHeart *heart;
private: QString m_host; int m_port; };
|
在我之前的那个小项目中,客户端服务端利用JSON进行通信,定义了一些双方之间通信的一些消息格式,如code字段区分正常消息与心跳包消息。
StatusClient首先与心跳包服务器建立链接,在收到服务器端的”OK”消息后,便利用TcpHeart类开启定时器,定时向服务器发送心跳包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void StatusClient::slotStatusReadyRead() { QByteArray data; data = _statusSocket->readAll();
using Json = nlohmann::json; QString dataStr = QString::fromUtf8(data); Json json = Json::parse(dataStr.toUtf8(), nullptr, false); int code = json["code"].get<int>();
if (code == 2) { QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());
if (msgStr == "OK") { heart = new TcpHeart; heart->startHeartTimer(); connect(heart, &TcpHeart::sigHeartReq, this, &StatusClient::slotWriteHeartSocket); connect(this, &StatusClient::sigHeartBack, heart, &TcpHeart::slotHeartBack);
connect(heart, &TcpHeart::sigHeartBad, this, &StatusClient::slotHeartBad); } ... } }
|
向服务端定时发送心跳包消息,若发送失败,进行心跳包掉线处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| void StatusClient::slotWriteHeartSocket() { QByteArray data; using Json = nlohmann::json; Json jsonValue; jsonValue["code"] = 2; jsonValue["Msg"] = "Heart"; data = QString::fromStdString(jsonValue.dump(2)).toUtf8();
qDebug() << this->thread() << "发送一次心跳包";
bool ret = _statusSocket->write(data); _statusSocket->waitForBytesWritten();
if (!ret) { qDebug() << "发送心跳包失败"; emit signalDisconnectedToServer(); } }
|
若从客户端收到心跳包回复,则重启TcpHeart对象的超时检测机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void StatusClient::slotStatusReadyRead() { QByteArray data; data = _statusSocket->readAll();
using Json = nlohmann::json; QString dataStr = QString::fromUtf8(data); Json json = Json::parse(dataStr.toUtf8(), nullptr, false); int code = json["code"].get<int>();
if (code == 2) { QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());
...
if (msgStr == "HEART_BACK") { qDebug() << "HEART BACK ONCE"; emit sigHeartBack(); } } }
|
心跳包掉线处理
收到TcpHeart的心跳包超时消息后,进行相应的掉线处理:
1 2 3 4 5 6
| void StatusClient::slotHeartBad() { qDebug() << "心跳包断线"; _statusSocket->disconnectFromHost(); emit signalDisconnectedToServer(); }
|
完整实现
客户端完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| StatusClient::StatusClient(const QString host, const int port, QObject *parent) : m_host(host), m_port(port), QObject(parent) { }
void StatusClient::connectedToServer() { _statusSocket = new QTcpSocket;
connect(_statusSocket, &QTcpSocket::readyRead, this, &StatusClient::slotStatusReadyRead); connect(_statusSocket, &QTcpSocket::disconnected, this, &StatusClient::signalDisconnectedToServer);
_statusSocket->connectToHost(m_host, m_port);
if (!_statusSocket->waitForConnected()) { emit signalDisconnectedToServer(); } }
void StatusClient::slotStatusReadyRead() { QByteArray data; data = _statusSocket->readAll();
using Json = nlohmann::json; QString dataStr = QString::fromUtf8(data); Json json = Json::parse(dataStr.toUtf8(), nullptr, false); int code = json["code"].get<int>();
if (code == 2) { QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());
if (msgStr == "OK") { heart = new TcpHeart; heart->startHeartTimer(); connect(heart, &TcpHeart::sigHeartReq, this, &StatusClient::slotWriteHeartSocket); connect(this, &StatusClient::sigHeartBack, heart, &TcpHeart::slotHeartBack);
connect(heart, &TcpHeart::sigHeartBad, this, &StatusClient::slotHeartBad); }
if (msgStr == "HEART_BACK") { qDebug() << "HEART BACK ONCE"; emit sigHeartBack(); } } }
void StatusClient::slotWriteHeartSocket() { QByteArray data; using Json = nlohmann::json; Json jsonValue; jsonValue["code"] = 2; jsonValue["Msg"] = "Heart"; data = QString::fromStdString(jsonValue.dump(2)).toUtf8();
qDebug() << this->thread() << "发送一次心跳包";
bool ret = _statusSocket->write(data); _statusSocket->waitForBytesWritten();
if (!ret) { qDebug() << "发送心跳包失败"; emit signalDisconnectedToServer(); } }
void StatusClient::slotHeartBad() { qDebug() << "心跳包断线"; _statusSocket->disconnectFromHost(); emit signalDisconnectedToServer(); }
|
服务端
服务端StatusServer声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <QDebug> #include <QObject> #include <QTcpServer> #include <QTcpSocket>
#include "json.hpp"
class StatusServer : public QObject { Q_OBJECT public: explicit StatusServer(const int statusPort, QObject *parent = nullptr);
signals: void serverEstablished(); void serverError(); public slots: void establishServer();
private slots: void slotNewStatusConnection(); void slotStatusReadyRead();
private: QTcpServer *_tcpStatusServer; QList<QTcpSocket *> _tcpStatusClients;
private: int m_statusPort; };
|
服务端逻辑很简单,启动监听,保存客户端的链接,向客户端发送心跳初始化消息并回复客户端的心跳包。完整实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| StatusServer::StatusServer(const int statusPort, QObject *parent) : m_statusPort(statusPort), QObject(parent) { }
void StatusServer::establishServer() { _tcpStatusServer = new QTcpServer; bool ret = _tcpStatusServer->listen(QHostAddress::Any, m_statusPort);
if (!ret) { emit serverError(); qDebug() << "状态服务器建立失败"; } else emit serverEstablished();
connect(_tcpStatusServer, &QTcpServer::newConnection, this, &StatusServer::slotNewStatusConnection); }
void StatusServer::slotNewStatusConnection() { QTcpSocket *currentSocket = _tcpStatusServer->nextPendingConnection(); _tcpStatusClients.push_back(currentSocket);
using Json = nlohmann::json; QByteArray sendData; Json sendJsonData; sendJsonData["code"] = 2; sendJsonData["Msg"] = "OK"; sendData = QString::fromStdString(sendJsonData.dump(2)).toUtf8(); currentSocket->write(sendData); currentSocket->flush(); currentSocket->waitForBytesWritten();
connect(currentSocket, &QTcpSocket::readyRead, this, &StatusServer::slotStatusReadyRead); connect(currentSocket, &QTcpSocket::disconnected, [=, this]() { _tcpStatusClients.removeAll(currentSocket); }); }
void StatusServer::slotStatusReadyRead() { qDebug() << "心跳包thread" << this->thread(); QTcpSocket *currentSocket = (QTcpSocket *) sender(); QByteArray msgData = currentSocket->readAll();
using Json = nlohmann::json; Json jsonData = Json::parse(msgData.data(), nullptr, false); int code = jsonData["code"].get<int>();
if (code == 2) { qDebug() << currentSocket << "收到一次心跳包"; QString msg = QString::fromStdString(jsonData["Msg"].get<std::string>()); if (msg == "Heart") { QByteArray sendData; Json sendJsonData; sendJsonData["code"] = 2; sendJsonData["Msg"] = "HEART_BACK"; sendData = QString::fromStdString(sendJsonData.dump(2)).toUtf8(); currentSocket->write(sendData); currentSocket->flush(); currentSocket->waitForBytesWritten(); } } }
|