mqtt node 코드
2023-01-30
아래 코드는 iot 센서 데이터를 모스키토에서 받아와서 다시 oneM2M core 쪽으로 전달하는, 게이트 웨이 역할을 하는 코드 입니다.
publish (발행) 을 1분에 한번씩 계속 해주도록 만들어 두었는데, 분리를 하면 좋을것 같다고 하네요.
제 생각에는 그냥 껏다 켯다 할수 있는 화면 하나만 제공 하거나, 직접 날리는 버튼 하나만 만들어 줄까 하네요.
화면에서 데이터 흘러가는걸 표시해주면 조금 더 좋을것 같기도 하고, 일단 그렇습니다.
import mqtt from "mqtt"; // mqtt 라이브러리
import wait from "waait"; // 지연을 위한 라이브러리
import fetch from "node-fetch"; // http 라이브러리
import { logger } from "./logger.js"
import moment from "moment-timezone";
moment.tz.setDefault("Asia/Seoul");
const config = {
topics: [
'location',
'location_power',
'tc_spreader',
'qc_spreader',
'taco',
'chassis',
'wearable_location',
'wearable_hr',
'wearable_power',
'cctv_veh_container',
'cctv_waiting',
'cctv_gate',
'heartbeat',
'fota',
'device_version',
'iot_backwardZ',
], // topic은 ngl과 협의 필요할 것으로 보입니다.
mqtt_server : {
host : '192.168.44.65', // ngl 서버로 추후 변경 필요 합니다. 지금은 저희 서버의 모스키토 이용중입니다.
port : 1883,
},
message : { // ngl 서버에서 보내주는 sensor 메시지 형태를 맞추어야 될 것 같습니다.
ae : 'S00000000002',
cnt: 'heartbeat',
body : {
"m2m:cin": {
"con": {"battery": 1, "status": "OK"},
"cnf": "application/json",
"or": "iochord"
}
}
},
mobius: {
host: '192.168.44.65',
port: '7579',
cb: 'CB00003',
headers : {
Accept: 'application/json',
'X-M2M-RI': 1,
'X-M2M-Origin': 1,
'Content-Type': 'application/json;ty=4',
}
}
}
/**
* 연결하기
*
* @type {AudioNode | void}
*/
const client = mqtt.connect({
host: config.mqtt_server.host,
port: config.mqtt_server.port,
});
/**
* connect 이벤트
*/
client.on("connect", () => {
logger.info(`--------------------------`)
logger.info(`connected : ${client.connected}`)
logger.info(`--------------------------`)
main();
});
/**
* 연결 실패
*/
client.on("error", (error) => {
logger.error(`--------------------------`)
logger.error(`Can't connect : ${error}`)
logger.error(`--------------------------`)
end();
});
/**
* 연결 종료
*
* @returns {*}
*/
const end = () => {
client.end();
logger.info(`End connection.`)
}
/**
* 메시지 보내기
*/
const publish = () => {
const msg = JSON.stringify(config.message);
config.topics.map(topic => {
client.publish(topic, msg, {
retain: true, // 옵션임
qos: 1 // 옵션임
}, (callbackMsg) => {
const msg = callbackMsg instanceof Object ?
JSON.stringify(callbackMsg) :
callbackMsg;
logger.info(`publish (발행) : ${msg}`)
});
});
}
/**
* 구독
*/
const subscribe = () => {
config.topics.map(topic => {
client.subscribe(topic, {qos:1}, (callbackMsg) => {
const msg = callbackMsg instanceof Object ?
JSON.stringify(callbackMsg) :
callbackMsg;
logger.info(`subscribe (구독) : ${msg}`)
});
})
}
/**
* 메시지 받기
*/
const on = () => {
config.topics.map(topic => {
client.on('message', (topic, message, packet) => {
logger.info(`topic is : ${topic}`)
const msg_obj = JSON.parse(message);
const {ae, cnt, body} = msg_obj;
body["m2m:cin"].con["mqtt_client_time"] = moment().format("YYYY-MM-DD HH:mm:ss");
//send_sensor_data(ae, cnt, JSON.stringify(body));
// cnt 대신 topic 을 넣어도 될 것 같음
send_sensor_data(ae, topic, JSON.stringify(body));
});
})
}
/**
* iiot_core로 cin 호출
*
* @param ae
* @param cnt
* @param body
*/
const send_sensor_data = (ae, cnt, body) => {
const { host, port, cb } = config.mobius;
const url = `http://${host}:${port}/${cb}/${ae}/${cnt}`;
logger.info(`url : ${url}`)
logger.info(`body : ${body}`)
fetch(url, {
method: 'POST',
headers: config.mobius.headers,
body: body,
})
.then((response) => response.json())
.then((data) => {
logger.info(`send_sensor_data : ${JSON.stringify(data)}`)
});
}
const main = async () => {
try {
subscribe() // 구독
await wait(3000);
on()
while(true) {
await wait(3000);
publish() // 발행
await wait(60000);
}
} catch (e) {
end()
}
}