spark streaming 与 kafka 集成测试

Tinazhou 2016-11-22

版本:
kafka:2.11
spark:2.0.2
测试过程:
1、开发spark streaming程序,读取kafka队列数据,并进行处理;
2、启动spark、zookeeper及kafka;
3、启动log4j输出到kafka的程序,先用kafka receive console程序验证其正确性;
4、启动spark streaming程序,观察执行效果,启动命令如下:
spark-submit --class com.itown.bigdata.kafka.KafkaReader /usr/hadoop/jar/sparkApp-0.0.1-SNAPSHOT-jar-with-dependencies.jar
 
开发过程:
1、java类:
注意:
 
package com.itown.bigdata.kafka;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
 
import scala.Tuple2;
 
import com.google.common.collect.Lists;
 
public class KafkaReader {
static final Pattern SPACE = Pattern.compile(" ");
 
public static void main(String[] args) {
// 每个话题的分片数
int numThreads = 2;
SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
new Duration(10000));
 
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
 
Collection<String> topics = Arrays.asList("test", "test2");
 
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics,
kafkaParams));
 
JavaDStream<String> words = stream
.flatMap(new FlatMapFunction<ConsumerRecord<String, String>, String>() {
public Iterator<String> call(
ConsumerRecord<String, String> t) throws Exception {
System.out.println(">>>" + t.value());
 
return Lists.newArrayList(SPACE.split(t.value()))
.iterator();
}
});
 
// 对其中的单词进行统计
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
 
// 打印结果
wordCounts.print();
 
try {
jssc.start();
jssc.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、maven pom.xml
注意:
1)spark-streaming-kafka的引用部分
2)打包同时将依赖包也打了进来
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
 
<groupId>com.itown.bigdata</groupId>
<artifactId>sparkApp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
 
<name>sparkApp</name>
<url>http://maven.apache.org</url>
 
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
 
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
 
<plugin>
<artifactId> maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
 
</plugins>
</build>
 
<repositories>
<repository>
<id>central</id>
<name>central</name>
<url>http://central.maven.org/maven2/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</project>
 
 
 
 
 

相关推荐

ganyouxianjava / 0评论 2012-05-31