农村外出务工男 2015-05-28
SparkStreaming(1)SparkStreamingConceptandZookeeper/KafkaonLocal
IwasusingSparkformorethan1yearnow,from0.7to0.9onproduction.RecentlyIcamebacktoSparkandconsideringupgradetheversionto1.3.1.Therearealotofnewthingsandgoodideaafter0.9.
1.Introduction
StandaloneCluster
mastermachineisasinglepoint.
https://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper
Wehaveoptions#1usezookeepertomanageseveralmasters
spark-env.sh,
spark.deploy.recoveryMode,defaultvalueisNONE,shouldbechangedtoZOOKEEPER
spark.deploy.zookeeper.url,eg,192.168.1.100:2181,192.168.1.102:2181
spark.deploy.zookeeper.dir,eg,/spark
Forsparkjob,standaloneclusterwillhaveallthejarsandfilesintheworkingdirectory,weneedtosetspark.worker.cleanup.appDataTtltocleanthem.ButYARNclusterwillautomaticallydothat.
ClusterJobSchedule
standalonecluster-FIFO,spark.cores.maxandspark.deploy.defaultCoresandotherstosethowmuchresourceoneapplicationcanuse.
mesos
YARN-—num-executor,—executor-memoryandetc.
SparkStreaming
sourcefromkafka,flume,twitter,zeromq,kinesis
originalDStreamtime1time2time3time4time5
windowedDStreamwindowtime1windowtime2
checkpoint
ssc.checkpoint(hdfsPath),usuallycheckpointtimewillbe5-10timessliding
dstream.checkpoint(checkpointInterval)
receivethestreaminginparallel,
valnumstreams=5
valkafkaStreams=(1tonumStreams).map{i=>KafkaUtils.createStream(…)}
valunifiedStream=streamingContext.union(kafkaStreams)
RecoverytheTaskfromCheckpoint
deffunctionToCreateContext():StreamingContext={
valssc=newStreamingContext(...)
vallines=sac.socketTextStream(...)
...
ssc.checkpoint(checkpointDirectory)
ssc
}
valcontext=StreamingContext.getOrCreate()checkpointDirectory,functionToCreateContext_)
context....
context.start()
context.awaitTermination()
2.Zookeeper
Installzookeeper
>wgethttp://apache.mesi.com.ar/zookeeper/stable/zookeeper-3.4.6.tar.gz
Unzipthat,Placeitintheworkingdirectory,addthebintothepath.
Setuptheconfiguration
>cpconf/zoo_sample.cfgconf/zoo.cfg
StarttheServer
>zkServer.shstartzoo.cfg
Checkstatus
>zkServer.shstatus
Or
>jps
2194
2294QuorumPeerMain
2330Jps
Connectfromclient
>zkCli.sh-serverlocalhost:2181
zookeeper>help
zookeeper>quit
3.Kafka
Downloadthebinarywithversion8.2.1
>wgethttp://psg.mtu.edu/pub/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
PlacethatintheworkingdirectoryandAddthattopath
Commandtostartkafka
>kafka-server-start.shconfig/server.properties
Createatopic
>bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest
Createdtopic"test".
Listthetopic
>bin/kafka-topics.sh--list--zookeeperlocalhost:2181
test
Producersendingsomemessages
>bin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest
StartaConsumer
>bin/kafka-console-consumer.sh--zookeeperlocalhost:2181--topictest--from-beginning
References:
http://uohzoaix.github.io/studies/categories/#spark
sparkstreaming
http://dataunion.org/15193.html
http://dataunion.org/6308.html
http://colobu.com/2015/01/05/kafka-spark-streaming-integration-summary/