Connection and Timeout in RabbitMQ NodeJS

powrexly 2019-03-31

Connection and Timeout in RabbitMQ NodeJS

In our simple script, we are writing a sendToQueue.js in NodeJS to send JSON file content out to RabbitMQ. I think we just followed the example in the RabbitMQ documentation and did it like this:

// Usage: node sendToQueue.js meeting-to-update-stage.json

{
  // Callback-API version: reads the file named on the command line, treats
  // every non-empty line as one message, and publishes each line to a topic
  // exchange.
  const amqp = require('amqplib/callback_api');
  const fs = require('fs');

  // Environment variables are always strings, so `MQ_DURABLE || true` could
  // never yield false. Durable stays the default; only the literal string
  // "false" turns it off.
  const durable = process.env.MQ_DURABLE !== 'false';
  const exchangeType = 'topic';
  const exchange = 'xxxxxx_events';
  // username, password and hostname are placeholders for the real broker.
  const amqpHost = 'amqp://username:password@hostname/name?heartbeat=30';
  const objectFile = process.argv[2];

  // One message per line of the input file.
  const objects = fs.readFileSync(objectFile).toString().split('\n');

  objects.forEach((msg) => {
    console.log(`parsed from file - ${JSON.stringify(msg)}\n`);
  });

  try {
    console.log(`sending messages to rabbit...`);

    amqp.connect(amqpHost, function (err, conn) {
      if (err) {
        // Without a connection there is nothing to publish on or to close.
        console.log(`error: ${err}`);
        return;
      }

      conn.createChannel(function (err, ch) {
        if (err) {
          console.log(`error: ${err}`);
          return;
        }

        ch.assertExchange(exchange, exchangeType, { durable: durable });

        objects.forEach((msg) => {
          // Skip blank lines (e.g. the trailing newline at the end of the file).
          if (msg !== null && msg !== '') {
            ch.publish(exchange, '', Buffer.from(msg));
          }
        });
      });

      // Fixed grace period before closing, as in the single-message tutorial.
      // NOTE: this is a guess, not a guarantee -- with a large batch the
      // connection can be closed before every message has been written out.
      setTimeout(function () {
        conn.close();
        process.exit(0);
      }, 500);
    });
  } catch (error) {
    // Only catches synchronous failures; errors inside the callbacks above
    // are reported where they occur.
    console.log(`error: ${error}`);
  }
}

TheissueissetTimeout(xxx,500);intheexample,itonlysendout1message,wait500msisgoodenough.

But in my case, it is a loop with hundreds or thousands of messages. It works when the number is 100, but it does not work when the number is 1000.

Here is the promise example; I think it fixed the timeout connection issue.

{
  // Promise-API version: reads the file named on the command line, publishes
  // every non-empty line to a topic exchange, and closes the channel and
  // connection only after all messages have been handed to the socket --
  // no fixed timeout involved.
  const amqp = require('amqplib');
  const fs = require('fs');
  const { once } = require('events');

  // Environment variables are always strings, so `MQ_DURABLE || true` could
  // never yield false. Durable stays the default; only the literal string
  // "false" turns it off.
  const durable = process.env.MQ_DURABLE !== 'false';
  const exchangeType = 'topic';
  const exchange = 'xxxxxx_events';
  // username, password and hostname are placeholders for the real broker.
  const amqpHost = 'amqp://username:password@hostname/name?heartbeat=30';
  const objectFile = process.argv[2];

  // One message per line of the input file.
  const objects = fs.readFileSync(objectFile).toString().split('\n');

  (async () => {
    let conn;
    try {
      conn = await amqp.connect(amqpHost);
      const ch = await conn.createChannel();
      await ch.assertExchange(exchange, exchangeType, { durable: durable });

      console.log(`Loading csv file contents, number of records = ${objects.length}`);

      // A plain loop instead of `await objects.map(async ...)`: awaiting an
      // array of promises does not wait for any of them.
      for (const msg of objects) {
        // Skip blank lines (e.g. the trailing newline at the end of the file).
        if (msg === '') {
          continue;
        }

        try {
          console.log(`msg = ${JSON.stringify(msg)}\n`);

          // publish() returns a boolean, not a promise: false means the
          // channel's write buffer is full, so wait for 'drain' before
          // sending more. once() also rejects if the channel emits 'error'.
          const hasRoom = ch.publish(exchange, '', Buffer.from(msg));
          if (!hasRoom) {
            await once(ch, 'drain');
          }
        } catch (error) {
          console.log(`**** ERROR processing msg: ${error}`);
        }
      }

      // The close request is queued behind the published messages, so
      // awaiting it means they have all been written out first.
      await ch.close();
    } catch (error) {
      console.log(error);
    } finally {
      // Always release the connection, even when publishing failed part-way.
      if (conn) {
        try {
          await conn.close();
        } catch (closeError) {
          console.log(closeError);
        }
      }
    }
  })();
}

Then it works perfectly.

References:

https://github.com/squaremo/amqp.node/issues/404

https://zhuanlan.zhihu.com/p/28276010

https://www.cnblogs.com/duhuo/p/6306535.html

https://stackoverflow.com/questions/41212249/node-wait-for-loop-to-finish

https://stackoverflow.com/questions/18574298/wait-until-node-amqp-has-sent-a-message

相关推荐