const { Kafka } = require('kafkajs');
var kafka, producer, kafkajson;
fs.readFile(nwPath.resolve(nwDir, '../kafka-topic.json'), function (err, data) {
if (err) {
layer.open({
content: '缺少kafka-topic.json文件!'
, btn: '关闭'
});
// updater();
throw err;
}
if (data[0] === 0xEF && data[1] === 0xBB && data[2] === 0xBF) {
//解决记事本保存的utf8会带dom,导致无法JSON.parse
data = data.slice(3);
}
kafkajson = JSON.parse(data.toString());
kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout: 30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
brokers: [kafkajson.messageServerUri]
})
//接收
const consumer = kafka.consumer({ groupId: 'test-group' })
consumer.connect().then(e => {
return consumer;
}).then(e => {
return consumer.subscribe({ topic: kafkajson.receiveTopic, fromBeginning: true });
}).then(e => {
consumer.run({
eachMessage: ({ topic, partition, message }) => {
kafkasend(message.value.toString())
},
})
}).catch(e => {
console.log("CATCH:",e);
})
//发送
producer = kafka.producer()
// kafkasend("hello")
})
function kafkasend(params) {
console.log("发送kafka");
producer.connect().then(e => {
return producer
}).then(e => {
return producer.send({
topic: kafkajson.sendTopic,
messages: params,
})
}).then(e => {
producer.disconnect()
console.log('hello');
location.reload();//刷新页面
}).catch(e => {
console.log("发送错误");
})
}
Comments | NOTHING