PoppyEvan 2016-06-28
http://www.iteblog.com/archives/1677
Apache Kafka 0.10.0.0 Stable Release and Its New Features
In Kafka 0.9.0.0, developers had almost no way to control how many records a poll() call on the new consumer would return. Happily, this release introduces the max.poll.records configuration parameter, which lets developers cap the number of records returned per poll.
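As a quick sketch (the broker address, group id, and topic name here are placeholders, not from the original post), capping the batch size looks like this:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// New in 0.10.0.0: a single poll() will return at most this many records.
props.put("max.poll.records", "100");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo")); // placeholder topic
ConsumerRecords<String, String> records = consumer.poll(100); // at most 100 records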
New Consumer API
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
poll
ConsumerRecords<K,V> poll(long timeout)
See Also:
KafkaConsumer.poll(long)

poll
public ConsumerRecords<K,V> poll(long timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have subscribed to any topics or partitions before polling for data.
On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions.
Specified by:
poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
Returns:
map of topic to records since the last fetch for the subscribed list of topics and partitions
Throws:
InvalidOffsetException - if the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured
WakeupException - if wakeup() is called before or while this function is called
AuthorizationException - if caller does not have Read access to any of the subscribed topics or to the configured groupId
KafkaException - for any other unrecoverable errors (e.g. invalid groupId or session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
See Also:
poll(long)
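Among the exceptions listed above, WakeupException deserves a note: consumer.wakeup() is the supported way to break a blocking poll() from another thread. A minimal shutdown sketch (the shutdown-hook wiring is our own illustration, not from the javadoc):

// Sketch: break a blocking poll() from another thread for a clean shutdown.
// Assumes 'consumer' is the KafkaConsumer driving the poll loop below.
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // process records here ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected: wakeup() was called; fall through and close.
} finally {
    consumer.close();
}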
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Disable auto-commit so offsets are committed only after the batch is stored.
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000"); // ignored while auto-commit is disabled
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer); // placeholder for the application's storage logic
        consumer.commitSync();
        buffer.clear();
    }
}
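commitSync() with no arguments marks the whole returned batch as consumed. For finer control, the KafkaConsumer javadoc also shows committing an explicit offset per partition; note that the committed offset must point at the next message to read, hence lastOffset + 1:

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Commit the offset of the next record we expect to consume.
            consumer.commitSync(Collections.singletonMap(
                    partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}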
SimpleConsumer demo:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Reading the Data
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
// See code in previous section
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}

Note that 'readOffset' asks the last read message what the next offset would be. This way, when the block of messages is processed, we know where to ask Kafka to start the next fetch.

Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since, if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and it might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.

Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.
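The full wiki example also answers where the very first readOffset comes from: it issues an OffsetRequest through the old kafka.javaapi SimpleConsumer. Roughly, as a sketch of the helper from the linked page (pass kafka.api.OffsetRequest.EarliestTime() or LatestTime() as whichTime):

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Ask for a single offset at or before 'whichTime'.
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        System.out.println("Error fetching offset data. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}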
http://zqhxuyuan.github.io/2016/02/20/Kafka-Consumer-New/
http://blog.csdn.net/xianzhen376/article/details/51167742