YEEHOLIC 2020-06-03
<!--es 相关依赖开始--> <dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> <version>6.3.1</version> </dependency> <dependency> <groupId>net.java.dev.jna</groupId> <artifactId>jna</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>commons-compiler</artifactId> <version>2.7.8</version> </dependency> <!-- es 相关依赖结束 -->
import com.atguigu.gmall.realtime.bean.AlertInfo import io.searchbox.client.config.HttpClientConfig import io.searchbox.client.{JestClient, JestClientFactory} import io.searchbox.core.{Bulk, Index} import org.apache.spark.rdd.RDD /** * Author atguigu * Date 2020/6/3 13:58 */ object ESUtil { val factory = new JestClientFactory // 1.1.1 给工厂设置es的相关参数 val esUrl = "http://hadoop102:8300" //注意换成自己的端口(9200) val config = new HttpClientConfig.Builder(esUrl) .maxTotalConnection(100) // 允许的最多客户端的个数 .connTimeout(10000) // 连接es的超时时间 .readTimeout(10000) // 读取数据的超时时间 .multiThreaded(true) .build() factory.setHttpClientConfig(config) /** * 向es中插入单条数据 * * @param index * @param source * @param id */ def insertSingle(index: String, source: Object, id: String = null): Unit = { val client: JestClient = factory.getObject val action = new Index.Builder(source) .index(index) .`type`("_doc") .id(id) // 如果是传递的null, 则相当于没有传 .build() client.execute(action) client.shutdownClient() // 把客户端还给工厂 } /** * 批量插入 * * @param index * @param sources */ def insertBulk(index: String, sources: Iterator[Object]) = { val client: JestClient = factory.getObject val builder = new Bulk.Builder() .defaultIndex(index) .defaultType("_doc") // 在一个Bulk.Builder中add进去多个Action, 可以一次性交给es完成插入 // Object (id, object) sources.foreach { case (id: String, data) => val action = new Index.Builder(data) .id(id) .build() builder.addAction(action) case data => val action = new Index.Builder(data) .build() builder.addAction(action) } client.execute(builder.build()) client.shutdownClient() } }
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。