powrexly 2019-03-31
Connection and Timeout in RabbitMQ NodeJS
In our simple script, we wrote sendToQueue.js in NodeJS to send the content of a JSON file to RabbitMQ. I think we just followed the example from RabbitMQ and did it like this:
// Usage: node sendToQueue.js meeting-to-update-stage.json
//
// Reads the file named on the command line, treats every non-empty line as
// one message, and publishes each message to a topic exchange using the
// amqplib callback API.
{
  const amqp = require('amqplib/callback_api');
  const fs = require('fs');

  // Environment variables are always strings, so `MQ_DURABLE || true` would
  // be truthy even for "false". Durable stays the default; only the literal
  // string "false" turns it off.
  const durable = process.env.MQ_DURABLE !== 'false';
  const exchangeType = 'topic';
  const exchange = 'xxxxxx_events';
  // Placeholder URL: substitute the real user, password, host and vhost.
  const amqpHost = 'amqp://username:password@hostname/name?heartbeat=30';
  const objectFile = process.argv[2];

  // One message per line of the input file.
  const objects = fs.readFileSync(objectFile).toString().split('\n');
  objects.forEach((msg) => {
    console.log(`parsed from file - ${JSON.stringify(msg)}\n`);
  });

  try {
    console.log(`sending messages to rabbit...`);
    amqp.connect(amqpHost, function (connectError, conn) {
      // Errors are delivered through the callback argument; the surrounding
      // try/catch cannot see anything that happens asynchronously.
      if (connectError) {
        console.log(`error: ${connectError}`);
        process.exitCode = 1;
        return;
      }

      conn.createChannel(function (channelError, ch) {
        if (channelError) {
          console.log(`error: ${channelError}`);
          process.exitCode = 1;
          return;
        }

        ch.assertExchange(exchange, exchangeType, { durable: durable });
        objects.forEach((msg) => {
          // Skip blank lines (e.g. the trailing newline at end of file).
          if (msg !== '') {
            ch.publish(exchange, '', Buffer.from(msg));
          }
        });
      });

      // NOTE: a fixed 500 ms delay does not guarantee that every publish has
      // been flushed to the broker. It is enough for a single message, but
      // with a large number of messages the connection can be closed before
      // all of them have been sent.
      setTimeout(function () {
        conn.close();
        process.exit(0);
      }, 500);
    });
  } catch (error) {
    console.log(`error: ${error}`);
  }
}
The issue is setTimeout(xxx, 500). In the example, it only sends out 1 message, so waiting 500 ms is good enough.
But in my case, it is a loop over hundreds or thousands of messages. It works when the number is 100, but it does not work when the number is 1000.
Here is the promise-based example which, I think, fixes the timeout connection issue.
// Promise-based variant: reads the file named on the command line, publishes
// every non-empty line to a topic exchange, and closes the connection only
// after the broker has confirmed all published messages.
{
  const amqp = require('amqplib');
  const fs = require('fs');

  // Environment variables are always strings, so `MQ_DURABLE || true` would
  // be truthy even for "false". Durable stays the default; only the literal
  // string "false" turns it off.
  const durable = process.env.MQ_DURABLE !== 'false';
  const exchangeType = 'topic';
  const exchange = 'xxxxxx_events';
  // Placeholder URL: substitute the real user, password, host and vhost.
  const amqpHost = 'amqp://username:password@hostname/name?heartbeat=30';
  const objectFile = process.argv[2];

  // One message per line of the input file.
  const objects = fs.readFileSync(objectFile).toString().split('\n');

  (async () => {
    let conn;
    try {
      conn = await amqp.connect(amqpHost);
      // A confirm channel lets us wait until the broker has acknowledged
      // every message instead of guessing with a timer.
      const ch = await conn.createConfirmChannel();
      await ch.assertExchange(exchange, exchangeType, { durable: durable });
      console.log("Loading csv file contents, number of records=" + objects.length);

      for (const msg of objects) {
        // Skip blank lines (e.g. the trailing newline at end of file).
        if (msg === '') {
          continue;
        }
        try {
          console.log(`msg=${JSON.stringify(msg)}\n`);
          // publish() is synchronous and returns a boolean, not a promise,
          // so there is nothing to await here.
          ch.publish(exchange, '', Buffer.from(msg));
        } catch (error) {
          console.log("****ERROR processing msg:" + error);
        }
      }

      // Resolves once all messages published on this channel are confirmed;
      // rejects if the broker refused any of them.
      await ch.waitForConfirms();
      await ch.close();
    } catch (error) {
      console.log(error);
      process.exitCode = 1;
    } finally {
      // Always release the connection, even when a step above failed.
      if (conn) {
        try {
          await conn.close();
        } catch (closeError) {
          console.log(closeError);
        }
      }
    }
  })();
}
Then it works perfectly.
References:
https://github.com/squaremo/amqp.node/issues/404
https://zhuanlan.zhihu.com/p/28276010
https://www.cnblogs.com/duhuo/p/6306535.html
https://stackoverflow.com/questions/41212249/node-wait-for-loop-to-finish
https://stackoverflow.com/questions/18574298/wait-until-node-amqp-has-sent-a-message