PoppyEvan 2016-06-28
http://www.iteblog.com/archives/1677
Apache Kafka 0.10.0.0 stable release and an introduction to its new features
In Kafka 0.9.0.0, developers using the new consumer had almost no way to control how many messages a call to poll() returned. The good news is that this release introduces the max.poll.records parameter, which lets developers cap the number of records returned per call.
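As a quick illustration, here is a minimal sketch of bounding a single poll() with the new parameter; the topic name and the cap of 100 are illustrative, not from the original post:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// New in 0.10.0.0: cap how many records a single poll() may return.
props.put("max.poll.records", "100");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("records in this batch: " + records.count()); // at most 100
consumer.close();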
New Consumer API
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
poll

public ConsumerRecords<K,V> poll(long timeout)

Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have subscribed to any topics or partitions before polling for data.

On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions.

Specified by:
poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
Returns:
map of topic to records since the last fetch for the subscribed list of topics and partitions
Throws:
InvalidOffsetException - if the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured
WakeupException - if wakeup() is called before or while this function is called
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the configured groupId
KafkaException - for any other unrecoverable errors (e.g. invalid groupId or session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
See Also:
poll(long)
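The WakeupException above is the intended hook for stopping a poll loop from another thread. A minimal sketch, assuming a consumer configured as in the example that follows (the shutdown-hook placement is an illustration, not from the original post):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// wakeup() is the one consumer method that is safe to call from another
// thread; it makes a blocking poll() throw WakeupException.
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // process records ...
    }
} catch (WakeupException e) {
    // expected during shutdown; fall through to close()
} finally {
    consumer.close();
}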
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");      // commit offsets manually after the batch is processed
props.put("auto.commit.interval.ms", "1000");  // ignored while enable.auto.commit is false
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);   // application-specific persistence, not defined in this excerpt
        consumer.commitSync();  // commit only after the batch has been stored
        buffer.clear();
    }
}
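Because enable.auto.commit is set to false here, offsets are committed via commitSync() only after insertIntoDb() has succeeded. If the process fails between inserting the batch and committing, the batch is simply consumed again on restart, so this pattern trades possible duplicates for at-least-once delivery.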
SimpleConsumer demo:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Reading the Data
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See code in previous section
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();

    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
Note that the 'readOffset' asks the last read message what the next offset would be. That way, when the block of messages has been processed, we know where to ask Kafka to start the next fetch.

Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block; thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned (a sketch of such a retry follows below).

Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.
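As a rough sketch of that fetchSize retry, under the same SimpleConsumer setup as above (the doubling strategy and loop structure are assumptions, not from the wiki):

int fetchSize = 100000;
FetchResponse fetchResponse;
while (true) {
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    fetchResponse = consumer.fetch(req);
    // Stop when we either hit an error (handled as in the previous section)
    // or the broker returns at least one message.
    if (fetchResponse.hasError()
            || fetchResponse.messageSet(a_topic, a_partition).iterator().hasNext()) {
        break;
    }
    fetchSize *= 2;  // the batch didn't fit; ask for a bigger chunk and retry
}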