Skykey's Home

Skykey的私人博客ᕕ( ᐛ )ᕗ

Qt实现简易心跳包机制

Qt实现简易心跳包机制

因为网络通信的不稳定性,在一些实时性、网络稳定性要求较高的情境下,我们需要一个实时检测客户端与服务端之间的连接状态和通信功能是否运行正常的机制,心跳包机制便是其中之一。接下来我会讲一下在之前的一个小项目中,利用Qt实现的简易心跳包检测机制。

架构

客户端

服务端

心跳包检测的原理是客户端定时向服务端发送心跳包,服务端收到心跳包后立即回复客户端。此间客户端检查心跳包是否发送成功以及是否超时。在一些实时性要求比较高的情境下,检测心跳包超时是有必要的。可以通过记录等待回复的心跳包数来实现检测心跳包超时。

客户端

启动定时器

客户端的核心在于定时发送心跳包,这里我们利用一个TcpHeart类来实现定时发送心跳包。

TcpHeart类声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <QObject>
#include <QTimer>

class TcpHeart : public QObject {
Q_OBJECT
public:
explicit TcpHeart(QObject* parent = 0);
~TcpHeart() noexcept;

public:
void startHeartTimer(); // 启动定时器

signals:
void sigHeartBad(); // 心跳包错误信号
void sigHeartReq(); // 发送心跳包信号

private slots:
void slotTimeOut(); // 定时事件

public slots:
void slotHeartBack(); // 收到服务端心跳包回复

private:
QTimer* m_heart_timer;
int m_count; // 等待回复心跳包累计数
};

TcpHeart核心是利用QTimer实现一个定时任务:启动一个定时器,定时器timeOut消息发出后,检测当前等待回复的心跳包数量。若等待回复的心跳包数量≤3,则可以继续发送心跳包,并增加等待回复的心跳包数量;否则采取心跳包超时处理,停止计时器并发送心跳包错误消息。

定时事件slotTimeOut()实现如下:

1
2
3
4
5
6
7
8
9
10
void TcpHeart::slotTimeOut() {
if (m_count > 2) {
m_count = 0;
m_heart_timer->stop();
emit sigHeartBad();
return;
}
m_count++;
emit sigHeartReq();
}

在收到服务端心跳包回复后,我们需要将等待回复的心跳包计数清零。这里需要直接将计数清零,因为每次收到回复后,超时检测机制都应重新启动。

1
2
3
void TcpHeart::slotHeartBack() {
m_count = 0;
}

TcpHeart的完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
TcpHeart::TcpHeart(QObject *parent)
: QObject(parent) {
m_heart_timer = new QTimer(this);
m_count = 0;

connect(m_heart_timer, &QTimer::timeout, this, &TcpHeart::slotTimeOut);
}

TcpHeart::~TcpHeart() noexcept {
}

void TcpHeart::startHeartTimer() {
m_heart_timer->start(2000);
}

void TcpHeart::slotTimeOut() {
if (m_count > 2) {
m_count = 0;
m_heart_timer->stop();
emit sigHeartBad();
return;
}
m_count++;
emit sigHeartReq();
}

void TcpHeart::slotHeartBack() {
m_count = 0;
}

发送心跳包

正式实现心跳包机制的类是StatusClientStatusClient类声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <QDebug>
#include <QObject>
#include <QTcpSocket>

#include "TcpHeart.h"
#include "json.hpp"

class StatusClient : public QObject {
Q_OBJECT
public:
explicit StatusClient(const QString host, const int port, QObject *parent = nullptr);

public slots:
void connectedToServer();

signals:
void sigHeartBack(); // 心跳包返回
void signalDisconnectedToServer();// 心跳包断连
private slots:
void slotStatusReadyRead(); // 客户端状态信道
void slotWriteHeartSocket();// 写心跳包
void slotHeartBad(); // 心跳包掉线处理

private:
// 客户端状态信道
QTcpSocket *_statusSocket;// 客户端状态信道
TcpHeart *heart;

private:
QString m_host;
int m_port;
};

在我之前的那个小项目中,客户端服务端利用JSON进行通信,定义了一些双方之间通信的一些消息格式,如code字段区分正常消息与心跳包消息。

StatusClient首先与心跳包服务器建立链接,在收到服务器端的”OK”消息后,便利用TcpHeart类开启定时器,定时向服务器发送心跳包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void StatusClient::slotStatusReadyRead() {
// 收到服务端发来的客户端状态查询
QByteArray data;
data = _statusSocket->readAll();

using Json = nlohmann::json;
// 判断是否为状态查询
QString dataStr = QString::fromUtf8(data);
Json json = Json::parse(dataStr.toUtf8(), nullptr, false);
int code = json["code"].get<int>();

if (code == 2) {
QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());

if (msgStr == "OK")// 服务器返回"OK",开启心跳包检测
{
heart = new TcpHeart;
// 开始心跳检测
heart->startHeartTimer();
connect(heart, &TcpHeart::sigHeartReq, this, &StatusClient::slotWriteHeartSocket);// 发送心跳包
connect(this, &StatusClient::sigHeartBack, heart, &TcpHeart::slotHeartBack);

// 处理心跳包异常
connect(heart, &TcpHeart::sigHeartBad, this, &StatusClient::slotHeartBad);
}

...

}
}

向服务端定时发送心跳包消息,若发送失败,进行心跳包掉线处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void StatusClient::slotWriteHeartSocket() {
QByteArray data;
// 构建状态返回
using Json = nlohmann::json;
Json jsonValue;
jsonValue["code"] = 2;
jsonValue["Msg"] = "Heart";
data = QString::fromStdString(jsonValue.dump(2)).toUtf8();

// 状态信道,向服务端发送心跳包
qDebug() << this->thread() << "发送一次心跳包";

// 状态信道
bool ret = _statusSocket->write(data);
_statusSocket->waitForBytesWritten();

if (!ret) {
qDebug() << "发送心跳包失败";
emit signalDisconnectedToServer();
}
}

若从客户端收到心跳包回复,则重启TcpHeart对象的超时检测机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void StatusClient::slotStatusReadyRead() {
// 收到服务端发来的客户端状态查询
QByteArray data;
data = _statusSocket->readAll();

using Json = nlohmann::json;
// 判断是否为状态查询
QString dataStr = QString::fromUtf8(data);
Json json = Json::parse(dataStr.toUtf8(), nullptr, false);
int code = json["code"].get<int>();

if (code == 2) {
QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());

...

// 心跳反馈
if (msgStr == "HEART_BACK") {
qDebug() << "HEART BACK ONCE";
emit sigHeartBack();
}
}
}

心跳包掉线处理

收到TcpHeart的心跳包超时消息后,进行相应的掉线处理:

1
2
3
4
5
6
void StatusClient::slotHeartBad() {
// 断线处理
qDebug() << "心跳包断线";
_statusSocket->disconnectFromHost();
emit signalDisconnectedToServer();
}

完整实现

客户端完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
StatusClient::StatusClient(const QString host, const int port, QObject *parent)
: m_host(host),
m_port(port),
QObject(parent) {
}

void StatusClient::connectedToServer() {
_statusSocket = new QTcpSocket;

connect(_statusSocket, &QTcpSocket::readyRead, this, &StatusClient::slotStatusReadyRead);
connect(_statusSocket, &QTcpSocket::disconnected, this, &StatusClient::signalDisconnectedToServer);

_statusSocket->connectToHost(m_host, m_port);

if (!_statusSocket->waitForConnected()) {
emit signalDisconnectedToServer();
}
}

void StatusClient::slotStatusReadyRead() {
// 收到服务端发来的客户端状态查询
QByteArray data;
data = _statusSocket->readAll();

using Json = nlohmann::json;
// 判断是否为状态查询
QString dataStr = QString::fromUtf8(data);
Json json = Json::parse(dataStr.toUtf8(), nullptr, false);
int code = json["code"].get<int>();

if (code == 2) {
QString msgStr = QString::fromStdString(json["Msg"].get<std::string>());

if (msgStr == "OK")// 服务器返回"OK",开启心跳包检测
{
heart = new TcpHeart;
// 开始心跳检测
heart->startHeartTimer();
connect(heart, &TcpHeart::sigHeartReq, this, &StatusClient::slotWriteHeartSocket);// 发送心跳包
connect(this, &StatusClient::sigHeartBack, heart, &TcpHeart::slotHeartBack);

// 处理心跳包异常
connect(heart, &TcpHeart::sigHeartBad, this, &StatusClient::slotHeartBad);
}

// 心跳反馈
if (msgStr == "HEART_BACK") {
qDebug() << "HEART BACK ONCE";
emit sigHeartBack();
}
}
}

void StatusClient::slotWriteHeartSocket() {
QByteArray data;
// 构建状态返回
using Json = nlohmann::json;
Json jsonValue;
jsonValue["code"] = 2;
jsonValue["Msg"] = "Heart";
data = QString::fromStdString(jsonValue.dump(2)).toUtf8();

// 状态信道,向服务端发送心跳包
qDebug() << this->thread() << "发送一次心跳包";

// 状态信道
bool ret = _statusSocket->write(data);
_statusSocket->waitForBytesWritten();

if (!ret) {
qDebug() << "发送心跳包失败";
emit signalDisconnectedToServer();
}
}

void StatusClient::slotHeartBad() {
// 断线处理
qDebug() << "心跳包断线";
_statusSocket->disconnectFromHost();
emit signalDisconnectedToServer();
}

服务端

服务端StatusServer声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <QDebug>
#include <QObject>
#include <QTcpServer>
#include <QTcpSocket>

#include "json.hpp"

class StatusServer : public QObject {
Q_OBJECT
public:
explicit StatusServer(const int statusPort, QObject *parent = nullptr);

signals:
void serverEstablished();
void serverError();
public slots:
void establishServer();

private slots:
void slotNewStatusConnection();// 状态信道链接
void slotStatusReadyRead();

private:
// 状态信道
QTcpServer *_tcpStatusServer;
QList<QTcpSocket *> _tcpStatusClients;

private:
int m_statusPort;
};

服务端逻辑很简单,启动监听,保存客户端的链接,向客户端发送心跳初始化消息并回复客户端的心跳包。完整实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
StatusServer::StatusServer(const int statusPort, QObject *parent)
: m_statusPort(statusPort),
QObject(parent) {
}

void StatusServer::establishServer() {
_tcpStatusServer = new QTcpServer;
bool ret = _tcpStatusServer->listen(QHostAddress::Any, m_statusPort);

if (!ret) {
emit serverError();
qDebug() << "状态服务器建立失败";
} else
emit serverEstablished();

connect(_tcpStatusServer, &QTcpServer::newConnection, this, &StatusServer::slotNewStatusConnection);
}

void StatusServer::slotNewStatusConnection() {
// 处理状态信道的新连接
QTcpSocket *currentSocket = _tcpStatusServer->nextPendingConnection();
_tcpStatusClients.push_back(currentSocket);

// 向客户端发送初始化消息,启动心跳包
using Json = nlohmann::json;
QByteArray sendData;
Json sendJsonData;
sendJsonData["code"] = 2;
sendJsonData["Msg"] = "OK";
sendData = QString::fromStdString(sendJsonData.dump(2)).toUtf8();
currentSocket->write(sendData);
currentSocket->flush();
currentSocket->waitForBytesWritten();

connect(currentSocket, &QTcpSocket::readyRead, this, &StatusServer::slotStatusReadyRead);
connect(currentSocket, &QTcpSocket::disconnected, [=, this]() {
_tcpStatusClients.removeAll(currentSocket);
});
}

void StatusServer::slotStatusReadyRead() {
qDebug() << "心跳包thread" << this->thread();
// 状态信道,处理客户端发来的心跳包
QTcpSocket *currentSocket = (QTcpSocket *) sender();
QByteArray msgData = currentSocket->readAll();

using Json = nlohmann::json;
Json jsonData = Json::parse(msgData.data(), nullptr, false);
int code = jsonData["code"].get<int>();

if (code == 2) {
qDebug() << currentSocket << "收到一次心跳包";
QString msg = QString::fromStdString(jsonData["Msg"].get<std::string>());
if (msg == "Heart") {
QByteArray sendData;
Json sendJsonData;
sendJsonData["code"] = 2;
sendJsonData["Msg"] = "HEART_BACK";
sendData = QString::fromStdString(sendJsonData.dump(2)).toUtf8();
currentSocket->write(sendData);
currentSocket->flush();
currentSocket->waitForBytesWritten();
}
}
}