MrZhangAdd 2012-06-18
Kafka (2): Install on Ubuntu and Try More of the Java Client
1. Try to set this up on Windows.
Download and install this file:
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi
Unzip Kafka to the working directory:
D:\tool\kafka-0.7.0
> sbt update
> sbt package
sbt is installed on Windows, but it is still hard to install Kafka on Windows.
2. Try to set it up on Ubuntu 12.04.
> wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar zxvf kafka-0.7.0-incubating-src.tar.gz
> mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
> cd /opt/tools/kafka-0.7.0
> ./sbt update
> ./sbt package
Start the servers (ZooKeeper first, then the Kafka broker):
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
3. Fix the Java Client Problem
Error message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)
Solution:
server.properties:
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x
#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181
# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000
zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000
We need to use the real IP address here in the configuration.
The Java client sample code is under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples
The classes are as follows:
package com.sillycat.magicneptune.example;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
publicclassTestProducerMain{
publicstaticvoidmain(String[]args){
Propertiesprops2=newProperties();
props2.put("zk.connect","192.168.56.101:2181");
props2.put("serializer.class","kafka.serializer.StringEncoder");
//Thisisaddedbymyselfforchangingthedefaulttimeout6000.
props2.put("zk.connectiontimeout.ms","15000");
ProducerConfigconfig=newProducerConfig(props2);
Producer<String,String>producer=newProducer<String,String>(config);
//ThemessageissenttoarandomlyselectedpartitionregisteredinZK
ProducerData<String,String>data=newProducerData<String,String>(
"test","test-message,itisoknow.adsfasdf1111222");
producer.send(data);
producer.close();
}
}
package com.sillycat.magicneptune.example;
import java.net.InetAddress;
import java.net.UnknownHostException;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
publicclassTestConsumerMain{
publicstaticvoidmain(String[]args){
try{
System.out.println(InetAddress.getLocalHost().getHostAddress());
}catch(UnknownHostExceptione){
e.printStackTrace();
}
SimpleConsumerconsumer=newSimpleConsumer("192.168.56.101",9092,10000,
1024000);
longoffset=0;
while(true){
//createafetchrequestfortopictest,partition0,current
//offset,andfetchsizeof1MB
FetchRequestfetchRequest=newFetchRequest("test",0,offset,
1000000);
//getthemessagesetfromtheconsumerandprintthemout
ByteBufferMessageSetmessages=consumer.fetch(fetchRequest);
for(MessageAndOffsetmsg:messages){
System.out.println(ExampleUtils.getMessage(msg.message())+"offset="+offset);
//advancetheoffsetafterconsumingeachmessage
offset=msg.offset();
}
}
//consumer.close();
}
}
package com.sillycat.magicneptune.example;
import java.nio.ByteBuffer;
import kafka.message.Message;
publicclassExampleUtils
{
publicstaticStringgetMessage(Messagemessage)
{
ByteBufferbuffer=message.payload();
byte[]bytes=newbyte[buffer.remaining()];
buffer.get(bytes);
returnnewString(bytes);
}
}
References:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html