|

楼主 |
发表于 2022-4-3 16:30:51
|
显示全部楼层
以下是 MQTT 服务端代码,只有几行。
使用 Node.js 脚本编写,与前端配合得很好,用起来很顺手,比 Java 方便多了。
//mqtt.js
var mosca = require('mosca'); //构建服务器
var request = require('request');
var mysql = require('mysql');
// MySQL connection settings handed to initializeConnection().
// NOTE(review): the credentials are hard-coded in source; consider loading
// them from the environment or a config file before deploying.
var mysqlSettings = {
  host: 'localhost',
  user: 'root',
  password: '111111111111111111',
  port: '3306',
  database: 'hbrely' // the database name mentioned earlier in the post
};
var connection = initializeConnection(mysqlSettings);

// Broker settings. The `http` section configures the HTTP side of the broker
// and is the important part: `http.port` is the port browsers connect to.
// Opening 127.0.0.1:<http.port> in a browser (8090 with the value below; the
// original note said 8888) and getting a response means the setup works.
// NOTE(review): `bundle` and `static` are mosca HTTP options — presumably they
// serve the bundled browser client and static files from './'; confirm in the
// mosca documentation.
var brokerSettings = {
  port: 8083,
  http: {
    port: 8090,
    bundle: true,
    static: './'
  }
};
var MqttServer = new mosca.Server(brokerSettings);

// Registry of online recipients. The event handlers in this file add and
// remove entries; the `published` handler looks recipients up here.
var clientList = [];

// Template for the offline push request body. The `published` handler
// overwrites title/content/cid before each push.
var pushData = {
  title: "eee",
  content: "bbb",
  // Getui (个推) client id: with it a single device can be targeted, which is
  // what makes offline messages possible.
  cid: "d3bb266fc9c7bf5274fffef5508534bf",
  // Alternate cid kept from the original post:
  // cid: "131a25fb6faa1d5af35c3f4510b8ed9f",
  payload: JSON.stringify({receiver: "18031151041", fName: "张磊", fImage: "u1.jpg"}),
};
// Listen for client connections: log the client id with a timestamp, then
// dump the current registry.
MqttServer.on("clientConnected", function (client) {
  var connectedAt = "时间:" + new Date().toLocaleString();
  console.log("客户端上线", client.id, connectedAt);
  // A connect-time registration of client.id into clientList used to live
  // here but is disabled; entries are added by the `subscribed` handler.
  console.log(clientList);
});
// Listen for subscriptions: record the subscribed value in clientList (once)
// so the `published` handler can tell whether a recipient is reachable.
// NOTE(review): mosca documents this event as (topic, client), so the first
// argument is presumably the topic string rather than a client object —
// confirm against the mosca Server docs.
MqttServer.on("subscribed", function (topic) {
  console.log(topic);
  console.log("app运行", topic, "时间:" + new Date().toLocaleString());
  // Only register values that are not already present.
  if (clientList.indexOf(topic) < 0) {
    clientList.push(topic);
  }
  console.log(clientList);
});
// Listen for client disconnections: log the event and drop the client's id
// from clientList if it is registered there.
MqttServer.on("clientDisconnected", function (client) {
  console.log("客户端离线", client.id, "时间:" + new Date().toLocaleString());
  var position = clientList.indexOf(client.id);
  var isRegistered = position >= 0;
  if (isRegistered) {
    clientList.splice(position, 1);
  }
  console.log(clientList);
});
// Listen for unsubscriptions (logged as "client running in the background"):
// remove the unsubscribed value from clientList so later messages for it take
// the offline-push path.
// NOTE(review): mosca documents this event as (topic, client), so the first
// argument is presumably the topic string — confirm against the mosca docs.
MqttServer.on("unsubscribed", function (topic) {
  console.log("客户端背后运行", topic, "时间:" + new Date().toLocaleString());
  var position = clientList.indexOf(topic);
  if (position !== -1) {
    clientList.splice(position, 1);
  }
  console.log(clientList);
});
// Listen for published messages. For every packet that comes from a client:
//   1. parse the JSON payload and store it via saveMessage();
//   2. for messages with type < 10, publish a receipt to the sender's topic
//      (platform + from); notification-type messages get no receipt;
//   3. forward the original payload to "H5_<topic>" or "App_<topic>" when
//      that name is present in clientList, otherwise fall back to an offline
//      push via push().
MqttServer.on("published", function (packet, client) {
  // Packets without an originating client (e.g. the ones this handler
  // publishes itself) are ignored. An offline-push fallback for them existed
  // in the original but was disabled.
  if (client == undefined) {
    return;
  }

  var topic = packet.topic;
  var rawPayload = packet.payload.toString();

  // The payload comes from a remote client, so it may not be valid JSON.
  // Without this guard JSON.parse would throw inside the event handler.
  var message;
  try {
    message = JSON.parse(rawPayload);
  } catch (parseError) {
    console.error("[PUBLISH ERROR] - invalid JSON payload on topic", topic, "-", parseError.message);
    return;
  }
  // Valid JSON that is not an object (null, number, string, ...) cannot carry
  // the fields read below.
  if (message === null || typeof message !== "object") {
    console.error("[PUBLISH ERROR] - payload is not a JSON object on topic", topic);
    return;
  }

  console.log(message);
  saveMessage(message);

  // Receipt back to the sender; notification-type messages (type >= 10) are
  // not acknowledged.
  if (message.type < 10) {
    var receipt = {
      id: message.id,
      state: 1,
      type: -1
    };
    var senderTopic = message.platform + message.from;
    console.log(senderTopic, "时间:" + new Date().toLocaleString());
    MqttServer.publish({
      topic: senderTopic,
      payload: JSON.stringify(receipt),
    });
  }

  console.log(message, ":消息");

  // Pick the recipient's live topic: the H5 registration wins over the App one.
  var h5Topic = "H5_" + topic;
  var appTopic = "App_" + topic;
  var targetTopic;
  if (clientList.indexOf(h5Topic) > -1) {
    targetTopic = h5Topic;
  } else if (clientList.indexOf(appTopic) > -1) {
    targetTopic = appTopic;
  } else {
    // Recipient is not registered on either platform: send an offline push.
    pushData.title = message.name;
    pushData.content = message.message;
    pushData.cid = message.cid;
    push();
    return;
  }

  // Forward the original payload over MQTT.
  MqttServer.publish({
    topic: targetTopic,
    payload: rawPayload,
  });
});
/**
 * Store one chat message in the `hb_im` table.
 *
 * Uses a parameterized `INSERT Ignore` statement executed through the
 * module-level `connection`. The outcome is only logged; nothing is returned
 * and no error is propagated to the caller.
 *
 * @param {Object} data - Message fields; reads id, from, to, message, type,
 *   platform, cid, name and touxiang.
 */
function saveMessage(data) {
  var addSql = 'INSERT Ignore INTO hb_im(fID,from_user,to_user,content,type,platform,cid,name,touxiang) VALUES(?,?,?,?,?,?,?,?,?)';
  // Values bound to the placeholders above, in column order.
  var addSqlParams = [data.id, data.from, data.to, data.message, data.type, data.platform, data.cid, data.name, data.touxiang];
  connection.query(addSql, addSqlParams, function (err, res) {
    if (err) {
      // Report the failure and stop: there is no result to print.
      console.log('[INSERT ERROR] - ', err.message);
      return;
    }
    console.log("数据库增的结果:");
    console.log(res);
  });
}
/**
 * Create a MySQL connection, start connecting, and install an error handler
 * that opens a replacement connection when the link is lost.
 *
 * The replacement is a brand-new object from mysql.createConnection(); the
 * object returned by this call is not updated in place. Callers that keep the
 * returned connection in a variable should pass `onReconnect` and store the
 * replacement there, e.g.
 *   var db = initializeConnection(cfg, function (fresh) { db = fresh; });
 * Without the callback the replacement is created but not reported.
 *
 * @param {Object} config - Options passed to mysql.createConnection().
 * @param {function(Object): void} [onReconnect] - Optional. Called with each
 *   replacement connection created after a PROTOCOL_CONNECTION_LOST error.
 * @returns {Object} The connection created by this call.
 * @throws {Error} Rethrows, from inside the error listener, any other error
 *   whose `fatal` flag is set.
 */
function initializeConnection(config, onReconnect) {
  function addDisconnectHandler(connection) {
    connection.on("error", function (error) {
      // Only genuine Error instances are handled; anything else is ignored.
      if (!(error instanceof Error)) {
        return;
      }
      if (error.code === "PROTOCOL_CONNECTION_LOST") {
        console.error(error.stack);
        console.log("Lost connection. Reconnecting...");
        // Reuse the lost connection's config and keep forwarding the callback
        // so later reconnects are reported too.
        var replacement = initializeConnection(connection.config, onReconnect);
        if (typeof onReconnect === "function") {
          onReconnect(replacement);
        }
      } else if (error.fatal) {
        throw error;
      }
    });
  }

  var connection = mysql.createConnection(config);
  // Install the handler before connecting so early errors are seen.
  addDisconnectHandler(connection);
  connection.connect();
  return connection;
}
/**
 * Send the current contents of the module-level `pushData` object to the
 * UniPush endpoint as a JSON POST.
 *
 * Fire-and-forget: nothing is returned. Transport errors and non-200
 * responses are logged instead of being silently dropped.
 */
function push() {
  request({
    url: "https://www.hbrely.net/baas/Push/UniPush/push",
    method: "POST",
    json: true,
    headers: {
      "content-type": "application/json",
    },
    body: pushData
  }, function (error, response) {
    if (error) {
      // Request never completed, so there is no response to inspect.
      console.error("[PUSH ERROR] - ", error.message || error);
      return;
    }
    if (response.statusCode != 200) {
      console.error("[PUSH ERROR] - unexpected status", response.statusCode);
    }
    // A 200 response needs no further handling.
  });
}
// Print a startup banner when the broker emits its `ready` event.
MqttServer.on('ready', function onBrokerReady() {
  console.log("mqtt is running....");
});
|
|