import java.sql.{DriverManager, PreparedStatement}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * spark-submit --master local[*] --jars /root/sparkjob/mysql-connector-java-5.1.38.jar
 * --class com.zxb.sparkapplication.readwrite.SparkWriteMysql /root/sparkjob/original-scalatest-1.0-SNAPSHOT.jar
 */
object SparkWriteMysql {

  def main(args: Array[String]): Unit = {
    // Note: setMaster here takes precedence over --master on the spark-submit command line
    val conf = new SparkConf().setMaster("local").setAppName("spark write mysql")
    val sc = new SparkContext(conf)

    // MySQL connection settings
    val driverClassName = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://192.168.242.20:3306/test?characterEncoding=utf8&useSSL=false"
    val user = "root"
    val password = "123456"

    // Build the rows to write
    val logBuffer = mutable.ListBuffer[(String, String, String, String, String, String)]()
    // HH is the 24-hour clock; the original pattern's "hh" (12-hour clock) produces ambiguous timestamps
    val ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    for (i <- 1 to 100) {
      logBuffer += (("写" + i, "测试" + i, "localhost" + i,
        LocalDateTime.now().format(ofPattern), "spark",
        LocalDateTime.now().format(ofPattern)))
    }

    // Build the RDD
    val logRDD: RDD[(String, String, String, String, String, String)] = sc.makeRDD(logBuffer)

    // Iterate per partition, so each partition opens exactly one connection
    logRDD.foreachPartition(logData => {
      Class.forName(driverClassName)
      val connection = DriverManager.getConnection(url, user, password)
      val sql = "insert into syslog(action, event, host, insertTime, userName, update_Time) values(?,?,?,?,?,?)"
      val statement: PreparedStatement = connection.prepareStatement(sql)
      try {
        logData.foreach { case (action, event, host, insertTime, userName, update_Time) =>
          statement.setString(1, action)
          statement.setString(2, event)
          statement.setString(3, host)
          statement.setString(4, insertTime)
          statement.setString(5, userName)
          statement.setString(6, update_Time)
          statement.executeUpdate() // one round trip per row
        }
      } finally {
        if (statement != null) statement.close()
        if (connection != null) connection.close()
      }
    })

    sc.stop()
  }
}
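
Calling executeUpdate() for every row costs one round trip to MySQL per record. A common refinement is JDBC batching; the sketch below (reusing the logRDD, url, user, password, and driverClassName defined above, with batchSize as an illustrative choice, not a value from the original) queues rows with addBatch() and flushes them with executeBatch():

// Sketch: batched variant of the foreachPartition body above
logRDD.foreachPartition { logData =>
  Class.forName(driverClassName)
  val connection = DriverManager.getConnection(url, user, password)
  connection.setAutoCommit(false) // commit once per batch instead of once per row
  val statement = connection.prepareStatement(
    "insert into syslog(action, event, host, insertTime, userName, update_Time) values(?,?,?,?,?,?)")
  val batchSize = 500 // illustrative; tune for your row size and network
  try {
    var count = 0
    logData.foreach { case (action, event, host, insertTime, userName, update_Time) =>
      statement.setString(1, action)
      statement.setString(2, event)
      statement.setString(3, host)
      statement.setString(4, insertTime)
      statement.setString(5, userName)
      statement.setString(6, update_Time)
      statement.addBatch() // queue the row instead of executing it immediately
      count += 1
      if (count % batchSize == 0) { // flush a full batch
        statement.executeBatch()
        connection.commit()
      }
    }
    statement.executeBatch() // flush the final partial batch
    connection.commit()
  } finally {
    statement.close()
    connection.close()
  }
}

With MySQL Connector/J, appending rewriteBatchedStatements=true to the JDBC URL additionally lets the driver rewrite each batch into multi-row INSERT statements, which usually cuts insert time further.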