一、功能实现
完成平台基础数据配置,MQ服务,数据流转(网关读取设备数据,自定义报文上传,触发器判断,自定义报文下发,网关写入设备数据)
JSON串转换过程
网关发送编码 {"ts":"2024-09-05T03:03:40.174Z","d":[{"tag":"40105","value":50}]}
IoT接收解码 {"temperature":50}
IoT触发规则(写入设备) {"ts":"2024-09-05T03:03:40.301Z","w":[{"tag":"00105","value":true}]}
网关接收解码 >> {"GateCode":"---","Equips":[{"EquipCode":"---","Tags":[{"TagCode":"00105","TagValue":true}]}]}
*** 麻雀虽小,五章俱全 ***
二、使用技术
HslCommunication:用于跟Modbus交互
LiteDB:非关系型数据库,用于存储设备模型(属性,触发器),存储告警日志,进行分页查询
Microsoft.ClearScript.V8:在js文件里自定义编解码规则,打通数据包格式
DynamicExpresso.Core:规则引擎,评判触发条件
MQTTnet:MQServer/MQClient,用于订阅/发布消息
三、实现效果
【设备模拟器】- Modbus TCP
场景测试(变化上报):
1. 轮询Modbus设备点位 -> 检测到数据变化 -> 发布mq报文(自定义编码)
2. IoT订阅接收报文 -> 自定义解码 -> 规则引擎校验 -> 发布mq报文(自定义编码)
3. IoT订阅接收报文 -> 自定义解码 -> 写入点位值
场景测试(定时上报):
四、核心代码
【设备层】边缘网关
using HslCommunication;
using HslCommunication.Core;
using HslCommunication.Core.Device;
using HslCommunication.ModBus;
using IotDataFlow.Section.gate.model;
using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using Microsoft.ClearScript.V8;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;
namespace IotDataFlow.Section.gate
{
/// <summary>
/// 【设备层】边缘网关
/// </summary>
public class Gate
{
static GateModel GateModel_Time = new GateModel();
static GateModel GateModel_Change = new GateModel();
static Dictionary<string, EquipModel> DicTag = new Dictionary<string, EquipModel>();
static MQClient MqttGate = new MQClient();
static string ScriptPathGate = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}gate{Path.DirectorySeparatorChar}script1.js";
static V8ScriptEngine Engine = new V8ScriptEngine();
static string PubTopic;
/// <summary>
/// 配置基础信息:网关信息
/// </summary>
public static void ConfigBaseInfo()
{
try
{
GateModel_Time.GateCode = "gw1";
GateModel_Time.Equips = new List<EquipModel>()
{
new EquipModel()
{
EquipCode = "JY355",
HostAddress = "127.0.0.1",
PortNumber = 502,
Tags = new List<TagModel>()
{
new TagModel() { TagCode = "40105", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 },
new TagModel() { TagCode = "40106", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 },
new TagModel() { TagCode = "00105", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false },
new TagModel() { TagCode = "00106", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false }
}
}
};
// 添加地址与数据类型对应关系,用于写入时查询
foreach (var equip in GateModel_Time.Equips)
{
foreach (var tag in equip.Tags)
{
if (!DicTag.ContainsKey(tag.TagCode))
{
DicTag.Add(tag.TagCode, equip);
}
}
}
// 变化上传Model
GateModel_Change.GateCode = GateModel_Time.GateCode;
GateModel_Change.Equips = new List<EquipModel>() { new EquipModel() { Tags = new List<TagModel>() { new TagModel() } } };
}
catch (Exception ex)
{
Logger.Error($"Gate,ConfigBaseInfo,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
/// <summary>
/// 运行服务(定时轮询&定时上报)
/// </summary>
public static async Task Run(MqttModel mqModel, string subTopic, string pubTopic)
{
try
{
PubTopic = pubTopic;
var scriptContent = File.ReadAllText(ScriptPathGate);
Engine.Execute(scriptContent);
await MqttGate.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqgate", subTopic, SubCallBack);
Task.Run(() => TskPoll());
Task.Run(() => TskUpload());
}
catch (Exception ex)
{
Logger.Error($"Gate,Run,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
/// <summary>
/// 定时轮询
/// </summary>
static async void TskPoll()
{
while (true)
{
try
{
foreach (var equip in GateModel_Time.Equips)
{
var hsl = GetHslBase(equip);
foreach (var tag in equip.Tags)
{
bool status = false; object result = null; string message = null;
var startaddress = tag.TagCode; ConvertStandardModbusAddress2HSLAddress(ref startaddress);
switch (tag.TagDT)
{
case "uint16":
{
OperateResult<ushort> response = await hsl.ReadUInt16Async(startaddress);
if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); }
}
break;
case "boolean":
{
OperateResult<bool> response = await hsl.ReadBoolAsync(startaddress);
if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); }
}
break;
}
if (!status)
{
Logger.Error($"网关读取Modbus数据异常,address:{tag.TagCode},errmsg:{message}");
}
else
{
// 变化上报
if (!tag.TagValue.ToString().Equals(result.ToString()))
{
tag.TagValue = result;
//
GateModel_Change.Equips[0].EquipCode = equip.EquipCode;
GateModel_Change.Equips[0].Tags[0] = tag;
UploadData("变化上报", GateModel_Change);// {"ts":"2024-09-04T03:13:41.874Z","d":[{"tag":"40105","value":0}]}
}
}
await Task.Delay(100);
}
}
}
catch (Exception ex)
{
Logger.Error($"Gate,TskPoll,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
await Task.Delay(500);
}
}
/// <summary>
/// 定时上报
/// </summary>
static async void TskUpload()
{
while (true)
{
await Task.Delay(60000);
try
{
await UploadData("定时上报", GateModel_Time);// {"ts":"2024-09-03T07:57:58.471Z","d":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}]}
}
catch (Exception ex)
{
Logger.Error($"Gate,TskUpload,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
}
/// <summary>
/// 上报数据
/// </summary>
static async Task UploadData(string note, GateModel gateModel)
{
string dataJson = JsonSerializer.Serialize(gateModel);
var encodeJson = Engine.Invoke("encodeMqttPayload", dataJson);
Logger.Info($"网关自定义编码 >> 备注=【{note}】 Topic主题=【{PubTopic}】 消息=【{encodeJson}】");
await MqttGate.Publish(PubTopic, encodeJson.ToString());
}
/// <summary>
/// 收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg)
{
try
{
string topic = arg.ApplicationMessage.Topic;
string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
Logger.Info($"网关接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】");
// 写入 {"ts":"2024-09-04T03:41:34.747Z","w":[{"tag":"10105"}]}
var decodeJson = Engine.Invoke("decodeMqttPayload", mqttPayload);
Logger.Info($"网关自定义解码 >> {decodeJson}");
var model = JsonSerializer.Deserialize<GateModel>(decodeJson.ToString());
foreach (var equip in model.Equips)
{
foreach (var tag in equip.Tags)
{
var tagCode = tag.TagCode;
var address = tagCode; ConvertStandardModbusAddress2HSLAddress(ref address);
var content = tag.TagValue.ToString(); OperateResult? operateResult = null;
if (DicTag.ContainsKey(tagCode))
{
var tempDevice = DicTag[tagCode];
var hsl = GetHslBase(tempDevice);
var tempTag = tempDevice.Tags.Where(x => x.TagCode == tagCode).First();
if (null != tempTag)
{
switch (tempTag.TagDT)
{
case "uint16":
{
operateResult = await hsl.WriteAsync(address, Convert.ToUInt16(content));
}
break;
case "boolean":
{
operateResult = await hsl.WriteAsync(address, Convert.ToBoolean(content));
}
break;
}
if (null != operateResult && operateResult.IsSuccess)
{
if (operateResult.IsSuccess)
{
Logger.Info($"网关写入数据 >> 状态=【成功】 地址=【{tagCode}】 值=【{content}】");
}
else
{
Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【{operateResult.ToMessageShowString()}】");
}
}
else
{
Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【operateResult==null】");
}
}
else
{
Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【tempTag==null】");
}
}
else
{
Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【DicTag字典查无数据】");
}
}
}
}
catch (Exception ex)
{
Logger.Error($"Gate,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
/// <summary>
/// 获取Hsl进行设备交互的对象
/// </summary>
static DeviceCommunication GetHslBase(EquipModel equip)
{
var hsl = (DeviceCommunication)UidMgr.GetClient(equip.EquipCode);
if (null == hsl)
{
var temp = new ModbusTcpNet(equip.HostAddress, equip.PortNumber);
temp.Station = 1;
temp.AddressStartWithZero = false;
temp.DataFormat = DataFormat.CDAB;
temp.ReceiveTimeOut = 5000;
hsl = temp;
UidMgr.AddClient(equip.EquipCode, hsl);
Logger.Info($"网关初始化设备交互 >> 设备编码=【{equip.EquipCode}】");
}
return hsl;
}
/// <summary>
/// 地址转换
/// </summary>
/// <param name="val"></param>
static void ConvertStandardModbusAddress2HSLAddress(ref string val)
{
if (!val.Contains("x="))
{
int code = 1;
ushort address = Convert.ToUInt16(val);
if (address >= 00001 && address <= 09999)// 00001 ~ 09999
{
code = 1;// 读线圈状态
}
else if (address >= 10001 && address <= 19999)// 10001 ~ 19999
{
code = 2;// 读离散输入状态(只读)
}
else if (address >= 30001 && address <= 39999)// 30001 ~ 39999 04指令
{
code = 4;// 读输入寄存器(只读)
}
else if (address >= 40001 && address <= 49999)// 40001 ~ 49999 03指令
{
code = 3;// 读保存寄存器
}
var temp = Convert.ToUInt16(val.Substring(1));
val = $"x={code};{temp}";
}
}
}
}
【应用层】管理后台
using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using System.Text.Encodings.Web;
using System.Text.Json;
namespace IotDataFlow.Section.iot
{
/// <summary>
/// 【应用层】管理后台
/// </summary>
public class Web
{
/// <summary>
/// 配置基础数据:设备信息(物模型)
/// </summary>
public static void ConfigBaseInfo(string deviceId, string subTopic, string pubTopic)
{
// 设备注册
var insert = false;
var deviceModel = LitedbHelper.GetDeviceModel(deviceId);
if (null == deviceModel)
{
deviceModel = new DeviceModel(); insert = true;
}
deviceModel.SubTopic = subTopic;
deviceModel.PubTopic = pubTopic;
deviceModel.DeviceId = deviceId;
deviceModel.DeviceName = "温湿度计";
deviceModel.Propertys = new Dictionary<string, PropertyModel>()
{
["temperature"] = new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 },
["humidity"]= new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 },
["status"] = new PropertyModel() { DataType = (typeof(string)).Name.ToLower(), DataValue = "offline" },
["switchstate1"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false },
["switchstate2"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false }
};
deviceModel.Triggers = new List<TriggerModel>()
{
new TriggerModel() { TriggerId = 1, Description = "温度超过阈值警报", Condition = "temperature > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = true } } },
new TriggerModel() { TriggerId = 2, Description = "温度正常", Condition = "temperature <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = false } } },
new TriggerModel() { TriggerId = 3, Description = "湿度低于阈值警报", Condition = "humidity > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = true } } },
new TriggerModel() { TriggerId = 4, Description = "湿度正常", Condition = "humidity <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = false } } },
new TriggerModel() { TriggerId = 5, Description = "温度和湿度都正常", Condition = "temperature <= 30 and humidity >= 30", Action = new ActionModel(){ Type = ActionType.WxNotice, Arg = new Dictionary<string, object>() { ["openid"] = "112233" } } },
new TriggerModel() { TriggerId = 6, Description = "继电器1开", Condition = "switchstate1 == true", Action = null },
new TriggerModel() { TriggerId = 7, Description = "继电器2开", Condition = "switchstate2 == true", Action = null }
};
if (insert)
{
LitedbHelper.InsertDeviceModel(deviceModel);
}
else
{
LitedbHelper.UpdateDeviceModel(deviceModel);
}
Logger.Info($"Web基础信息:{ JsonSerializer.Serialize(deviceModel, new JsonSerializerOptions { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping })}");
}
}
}
【服务层】通信服务
using DynamicExpresso;
using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using Microsoft.ClearScript.V8;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;
namespace IotDataFlow.Section.iot
{
/// <summary>
/// 【服务层】通信服务
/// </summary>
public class Service
{
static MQClient mqIot = new MQClient();
static string scriptPathIot = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}iot{Path.DirectorySeparatorChar}script1.js";
static V8ScriptEngine engine = new V8ScriptEngine();
/// <summary>
/// 运行服务:MQ服务
/// </summary>
public static async Task Run(MqttModel mqModel, string subTopic)
{
string scriptContent = File.ReadAllText(scriptPathIot);
engine = new V8ScriptEngine();
engine.Execute(scriptContent);
// 启动mqttserver
await MQServer.Start(mqModel.Port, mqModel.UserName, mqModel.Password);
// 监听订阅信息
await mqIot.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqiot", subTopic, SubCallBack);
}
/// <summary>
/// IOT收到消息事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg)
{
try
{
// mq接收
string topic = arg.ApplicationMessage.Topic;
string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
Logger.Info($"IoT接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
var encodeJson = engine.Invoke("encodeMqttPayload", mqttPayload);// {"temperature":5,"humidity":6}
Logger.Info($"IoT自定义解码 >> {encodeJson}");
// 规则引擎
var deviceId = topic.Split("/").Last();
var deviceModel = LitedbHelper.GetDeviceModel(deviceId);
var propertys = deviceModel.Propertys;
var rules = deviceModel.Triggers;
if (null == deviceModel || null == propertys || null == rules) return;
var datas = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(encodeJson.ToString());
var lstParam = new List<Parameter>();
var lstJosnProperty = new List<string>();
foreach (var data in datas)
{
var propertyKey = data.Key.ToLower();
if (propertys.ContainsKey(propertyKey))
{
// 收集json报文中含有的属性
lstJosnProperty.Add(propertyKey);
// 填充表达式的参数
var propertyModel = propertys[propertyKey];
switch (propertyModel.DataType)
{
case "double": lstParam.Add(new Parameter(data.Key, data.Value.GetDouble())); break;
case "string": lstParam.Add(new Parameter(data.Key, data.Value.GetString())); break;
case "boolean": lstParam.Add(new Parameter(data.Key, data.Value.GetBoolean())); break;
}
}
}
// 评估规则
var interpreter = new Interpreter();
var filteredRules = rules.Where(rule => lstJosnProperty.Any(property => rule.Condition.Contains(property))).ToList();
foreach (var rule in filteredRules)
{
try
{
var condition = rule.Condition.Replace(" and ", " && ").Replace(" or ", " || ");
bool result = (bool)interpreter.Eval(condition, lstParam.ToArray());
if (result)
{
// 添加记录
LitedbHelper.InsertAlarm(new AlarmModel() { Description= rule.Description });
if (null != rule.Action && null != rule.Action.Arg)
{
string argJson = JsonSerializer.Serialize(rule.Action.Arg);
switch (rule.Action.Type)
{
case ActionType.WxNotice:
{
Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【微信通知】 执行参数=【{argJson}】");
}
break;
case ActionType.WriteDevice:
{
var decodeJson = engine.Invoke("decodeMqttPayload", argJson);// {"temperature":5,"humidity":6}
Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【写入设备】 执行参数=【{argJson}】 自定义解码=【{decodeJson}】");
mqIot.Publish(deviceModel.PubTopic, decodeJson.ToString());
}
break;
}
}
else
{
Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【无】");
}
}
}
catch (Exception ex)
{
//Logger.Error($"Service,Condition,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
}
catch (Exception ex)
{
Logger.Error($"Service,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");
}
}
}
}
编解码文件
// =======================================================================================
// ** 脚本名称:script2.js
// ** 输入Json:{"GateCode":"gw1","Equips":[{"EquipCode":"JY355","Tags":[{"TagCode":"40105","TagValue":"5"},{"TagCode":"40106","TagValue":"6"}]},{"DeviceCode":"JY356","Tags":[{"TagCode":"40107","TagValue":"7"},{"TagCode":"40108","TagValue":"8"}]}]}
// ** 输出Json:{"clientid":"gw1","time":"2024-08-30 11:34:35","JY355":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}],"JY356":[{"tag":"40107","value":"7"},{"tag":"40108","value":"8"}]}
// =======================================================================================
function createMqttPayload(dataJson) {
let gate = JSON.parse(dataJson);
let clientid = gate.GateCode;
let device = gate.Devices;
let result = {
clientid: gate.GateCode,
time: new Date(),
};
device.forEach(function (d) {
let equipCode = d.EquipCode;
let tag = d.Tags;
if (!result[equipCode]) {
result[equipCode] = [];
}
tag.forEach(function (t) {
result[equipCode].push({
tag: t.TagCode,
value: t.TagValue
});
});
});
return JSON.stringify(result);
}