登峰小蚁 2020-01-10
1.代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/** One temperature sensor reading.
  *
  * @param id          sensor identifier (first CSV column)
  * @param timestamp   reading timestamp (second CSV column, parsed as Long)
  * @param temperature measured temperature (third CSV column, parsed as Double)
  */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/** Reads sensor readings from a text file and writes them to Redis. */
object RedisSinkTest {

  /** Builds and runs the pipeline: text file -> SensorReading -> Redis sink.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("sensor1.txt")

    // transform
    // Brings the implicit TypeInformation needed by `map` into scope.
    import org.apache.flink.api.scala._
    val dataStream = inputStream.map { line =>
      // Each line is expected to be "id,timestamp,temperature"; a line with
      // fewer than three fields or non-numeric values throws here.
      val arr = line.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    }

    // sink
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))

    env.execute("redis sink test")
  }
}

/** Redis mapper that defines which command is issued when a reading is saved to Redis. */
class MyRedisMapper extends RedisMapper[SensorReading] {

  /** Stores the sensor id and temperature in a hash table: HSET key field value. */
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  /** Returns the sensor id; this plays the role of the hash field. */
  override def getKeyFromData(data: SensorReading): String = {
    data.id
  }

  /** Returns the temperature rendered as a string. */
  override def getValueFromData(data: SensorReading): String = {
    data.temperature.toString
  }
}

2.结果