const kafka = require('kafka-node');
let conn = {'kafkaHost':'10.10.0.21:9092'}; //ip和端口
let consumers = [
{
'type': 'consumer',
'options': {'autoCommit': true},
'name':'common',
'topic':[
//修改topic
{'topic': 'data_output', 'partition': 0}
]
}
];
let MQ = function(){
}
MQ.prototype.AddConsumer = function (conn, topics, options, handler){
let client = new kafka.KafkaClient(conn);
let consumer = new kafka.Consumer(client, topics, options);
if(!!handler){
consumer.on('message', handler);
}
consumer.on('error', function(err){
console.error('consumer error ',err.stack);
});
}
var mq = new MQ();
mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
console.log(message.value);
});