k8s与日志--采用golang实现Fluent Bit的output插件

tmj 2019-06-27

采用golang实现Fluent Bit的output插件

前言

目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大数据用到比较多的flume。而Fluent Bit是一款用c语言编写的高性能的日志收集组件,整个架构源于fluentd。官方比较数据如下:

FluentdFluent Bit
ScopeContainers / ServersContainers / Servers
LanguageC & RubyC
Memory~40MB~450KB
PerformanceHigh PerformanceHigh Performance
DependenciesBuilt as a Ruby Gem, it requires a certain number of gems.Zero dependencies, unless some special plugin requires them.
PluginsMore than 650 plugins availableAround 35 plugins available
LicenseApache License v2.0Apache License v2.0

通过数据可以看出,fluent bit 占用资源更少。适合采用fluent bit + fluentd 的方案,实现日志中心化收集的方案。fluent bit主要负责采集,fluentd负责处理和传送。

扩展output插件

fluent bit 本身是C语言编写,扩展插件有一定的难度。可能官方考虑到这一点,实现了fluent-bit-go,可以实现采用go语言来编写插件,目前只支持output的编写。
fluent-bit-go其实就是利用cgo,封装了c接口。代码比较简单,主要分析其中一个关键文件

package output

/*
#include <stdlib.h>
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"

// Define constants matching Fluent Bit core
const FLB_ERROR               =  C.FLB_ERROR
const FLB_OK                  =  C.FLB_OK
const FLB_RETRY               =  C.FLB_RETRY

const FLB_PROXY_OUTPUT_PLUGIN =  C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG        =  C.FLB_PROXY_GOLANG

// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    p._type = FLB_PROXY_OUTPUT_PLUGIN
    p.proxy = FLB_PROXY_GOLANG
    p.flags = 0
    p.name  = C.CString(name)
    p.description = C.CString(desc)
    return 0
}

// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    fmt.Printf("[flbgo] unregistering %v\n", p)
    C.free(unsafe.Pointer(p.name))
    C.free(unsafe.Pointer(p.description))
}

func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
    _key := C.CString(key)
    return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}

主要是定义了一些编写插件需要用到的变量和方法,例如FLBPluginRegister注册组件,FLBPluginConfigKey获取配置文件设定参数等。
PS
实际上用golang调用fluent-bit-go,再加一些实际的业务逻辑实现,最终编译成一个c-share的.so动态链接库。

定制fluent-bit-kafka-ouput插件

实际上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是实际项目中,并不满足我们的需求,必须定制化。
当然接下来的代码主要是作为一个demo,讲清楚如何编写一个output插件。

代码编写和分析

先上代码:

package main

import (
    "C"
    "fmt"
    "io"
    "log"
    "reflect"
    "strconv"
    "strings"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    "github.com/fluent/fluent-bit-go/output"
    "github.com/ugorji/go/codec"
)

var (
    brokers    []string
    producer   sarama.SyncProducer
    timeout    = 0 * time.Minute
    topic      string
    module     string
    messageKey string
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
        brokers = strings.Split(bs, ",")
    } else {
        log.Printf("you must set brokers")
        return output.FLB_ERROR
    }

    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
        topic = tp
    } else {
        log.Printf("you must set topics")
        return output.FLB_ERROR
    }

    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
        module = mo
    } else {
        log.Printf("you must set module")
        return output.FLB_ERROR
    }

    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
        messageKey = key
    } else {
        log.Printf("you must set message_key")
        return output.FLB_ERROR
    }

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
        if acks, err := strconv.Atoi(required_acks); err == nil {
            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
        }
    }

    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
        if codec, err := strconv.Atoi(compression_codec); err == nil {
            config.Producer.Compression = sarama.CompressionCodec(codec)
        }
    }

    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
        if max_retry, err := strconv.Atoi(max_retry); err == nil {
            config.Producer.Retry.Max = max_retry
        }
    }

    if timeout == 0 {
        timeout = 5 * time.Minute
    }
    // If Kafka is not running on init, wait to connect
    deadline := time.Now().Add(timeout)
    for tries := 0; time.Now().Before(deadline); tries++ {
        var err error
        if producer == nil {
            producer, err = sarama.NewSyncProducer(brokers, config)
        }
        if err == nil {
            return output.FLB_OK
        }
        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
        time.Sleep(time.Second * 30)
    }

    log.Printf("Kafka failed to respond after %s", timeout)
    return output.FLB_ERROR

}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    var h codec.MsgpackHandle

    var b []byte
    var m interface{}
    var err error

    b = C.GoBytes(data, length)
    dec := codec.NewDecoderBytes(b, &h)

    // Iterate the original MessagePack array
    var msgs []*sarama.ProducerMessage
    for {
        // decode the msgpack data
        err = dec.Decode(&m)
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Printf("Failed to decode msgpack data: %v\n", err)
            return output.FLB_ERROR
        }

        // Get a slice and their two entries: timestamp and map
        slice := reflect.ValueOf(m)
        data := slice.Index(1)

        // Convert slice data to a real map and iterate
        mapData := data.Interface().(map[interface{}]interface{})
        flattenData, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            break
        }

        message := ""
        host := ""

        for k, v := range flattenData {
            value := ""
            switch t := v.(type) {
            case string:
                value = t
            case []byte:
                value = string(t)
            default:
                value = fmt.Sprintf("%v", v)
            }

            if k == "pod_name" {
                host = value
            }

            if k == messageKey {
                message = value
            }

        }
        if message == "" || host == "" {
            break
        }

        m := &sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
            Value: sarama.ByteEncoder(message),
        }
        msgs = append(msgs, m)

    }

    err = producer.SendMessages(msgs)
    if err != nil {
        log.Printf("FAILED to send kafka message: %s\n", err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    producer.Close()
    return output.FLB_OK
}

func main() {
}
  • FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。
  • FLBPluginRegister 注册插件
  • FLBPluginInit 插件初始化
  • FLBPluginFlush flush到数据到output
  • FLBPluginConfigKey 获取配置文件中参数

PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。

编译和执行

编译的时候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

执行的时候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so

总结

采用类似的编写结构,就可以定制化自己的输出插件了。

相关推荐