Flume cluster setup: two sinks with load balancing
The three servers are:
192.168.134.131 master
192.168.134.132 datanodea
192.168.134.133 datanodeb
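For the host names used throughout the configurations (master, datanodea, datanodeb) to resolve, each of the three machines presumably carries the same mapping in /etc/hosts (DNS would work just as well); a minimal sketch:

# /etc/hosts on every node
192.168.134.131 master
192.168.134.132 datanodea
192.168.134.133 datanodeb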
The cluster layout looks like this:
                 Master
                   |
        =========================
        ||                     ||
    DataNodeA             DataNodeB
This is the master configuration:
#agent
agent.channels = channel
agent.sources = source
agent.sinks = node1 node2

agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = node1 node2
agent.sinkgroups.g1.processor.type = load_balance
agent.sinkgroups.g1.processor.backoff = true
agent.sinkgroups.g1.processor.selector = round_robin
agent.sinkgroups.g1.processor.selector.maxTimeOut = 10000

#channel
agent.channels.channel.type = memory
agent.channels.channel.capacity = 1000000
agent.channels.channel.transactionCapacity = 1000000
agent.channels.channel.keep-alive = 10

#source
agent.sources.source.channels = channel
agent.sources.source.type = avro
agent.sources.source.bind = master
agent.sources.source.port = 41414
agent.sources.source.threads = 5

#sink
#node1
agent.sinks.node1.channel = channel
agent.sinks.node1.type = avro
agent.sinks.node1.hostname = datanodea
agent.sinks.node1.port = 41414

#node2
agent.sinks.node2.channel = channel
agent.sinks.node2.type = avro
agent.sinks.node2.hostname = datanodeb
agent.sinks.node2.port = 41414
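Assuming the properties above are saved as /opt/flume/conf/flume-master.conf (the path and file name are illustrative), the master agent can be started with the standard flume-ng launcher; note that -n must match the agent name used in the properties, which is "agent" here:

flume-ng agent -n agent -c /opt/flume/conf -f /opt/flume/conf/flume-master.conf -Dflume.root.logger=INFO,console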
This is the datanodea configuration:
#agent
agent.channels = ch1 ch2
agent.sources = source
agent.sinks = elasticsearch file
agent.sources.source.selector.type = replicating

#channel
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.channels.ch1.transactionCapacity = 1000000
agent.channels.ch1.keep-alive = 10
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000000
agent.channels.ch2.transactionCapacity = 1000000
agent.channels.ch2.keep-alive = 10

#source
agent.sources.source.channels = ch1 ch2
agent.sources.source.type = avro
agent.sources.source.bind = datanodea
agent.sources.source.port = 41414
agent.sources.source.threads = 5

#sink
agent.sinks.file.channel = ch1
agent.sinks.file.type = file_roll
agent.sinks.file.sink.directory = /opt/flume/data
agent.sinks.file.sink.serializer = TEXT
agent.sinks.elasticsearch.channel = ch2
agent.sinks.elasticsearch.type = elasticsearch
agent.sinks.elasticsearch.hostNames = master:9300
agent.sinks.elasticsearch.indexName = flume_index
agent.sinks.elasticsearch.indexType = flume_type
agent.sinks.elasticsearch.clusterName = elasticsearch
agent.sinks.elasticsearch.batchSize = 1
agent.sinks.elasticsearch.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
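The DataNode agents are started the same way as the master; it is usually easiest to bring them up first, since the master's avro sinks connect to their avro sources. A sketch for datanodea, again with an illustrative file name (datanodeb, whose configuration follows, only differs in the config file used):

flume-ng agent -n agent -c /opt/flume/conf -f /opt/flume/conf/flume-datanodea.conf -Dflume.root.logger=INFO,console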
This is the datanodeb configuration:
#agent
agent.channels = ch1 ch2
agent.sources = source
agent.sinks = elasticsearch file
agent.sources.source.selector.type = replicating

#channel
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000000
agent.channels.ch1.transactionCapacity = 1000000
agent.channels.ch1.keep-alive = 10
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000000
agent.channels.ch2.transactionCapacity = 1000000
agent.channels.ch2.keep-alive = 10

#source
agent.sources.source.channels = ch1 ch2
agent.sources.source.type = avro
agent.sources.source.bind = datanodeb
agent.sources.source.port = 41414
agent.sources.source.threads = 5

#sink
agent.sinks.file.channel = ch1
agent.sinks.file.type = file_roll
agent.sinks.file.sink.directory = /opt/flume/data
agent.sinks.file.sink.serializer = TEXT
agent.sinks.elasticsearch.channel = ch2
agent.sinks.elasticsearch.type = elasticsearch
agent.sinks.elasticsearch.hostNames = master:9300
agent.sinks.elasticsearch.indexName = flume_index
agent.sinks.elasticsearch.indexType = flume_type
agent.sinks.elasticsearch.clusterName = elasticsearch
agent.sinks.elasticsearch.batchSize = 1
agent.sinks.elasticsearch.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
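Once all three agents are running, a simple end-to-end check is to push a file into the master's avro source with the avro-client that ships with Flume. The events should then appear both under /opt/flume/data on whichever DataNode the load balancer picked, and in Elasticsearch (the elasticsearch sink appends the current date to flume_index, hence the wildcard query). File paths below are illustrative:

echo "hello flume" > /tmp/test.log
flume-ng avro-client -H master -p 41414 -F /tmp/test.log

# on the DataNode that received the event: files written by the file_roll sink
ls /opt/flume/data

# query Elasticsearch over HTTP (port 9200, not the 9300 transport port the sink uses)
curl 'http://master:9200/flume_index*/_search?pretty'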