Commit a26227dc authored by Martin Manchev's avatar Martin Manchev

Changed plugin file ...

parent 66303c4e
package main package main
import "github.com/fluent/fluent-bit-go/output"
import ( import (
"bytes"
"C" "C"
"bytes"
"fmt" "fmt"
"time" "net/http"
"os" "regexp"
"strconv"
"unsafe" "unsafe"
"net/http"
"crypto/tls"
"strings"
"github.com/fluent/fluent-bit-go/output"
) )
// configuration parameters
var user string
var password string
var uri string
var reference string
//export FLBPluginRegister //export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int { func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(def, "fluentbit_wendelin", "Fluentbit output plugin for wendelin") return output.FLBPluginRegister(ctx, "wendelin_out", "Wendelin Out GO!")
} }
//export FLBPluginInit //export FLBPluginInit
// (fluentbit will call this) // (fluentbit will call this)
// plugin (context) pointer to fluentbit context (state/ c code) // ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(plugin unsafe.Pointer) int { func FLBPluginInit(ctx unsafe.Pointer) int {
streamtool_uri := output.FLBPluginConfigKey(plugin, "streamtool_uri") // Example to retrieve an optional configuration parameter
user := output.FLBPluginConfigKey(plugin, "user") // param := output.FLBPluginConfigKey(ctx, "param")
password := output.FLBPluginConfigKey(plugin, "password") user = output.FLBPluginConfigKey(ctx, "User")
buffer_type := output.FLBPluginConfigKey(plugin, "buffer_type") password = output.FLBPluginConfigKey(ctx, "Password")
flush_interval := output.FLBPluginConfigKey(plugin, "flush_interval") uri = output.FLBPluginConfigKey(ctx, "Uri")
disable_retry_limit := output.FLBPluginConfigKey(plugin, "disable_retry_limit") reference = output.FLBPluginConfigKey(ctx, "Reference")
reference := output.FLBPluginConfigKey(plugin, "reference")
dict := map[string]string{ //fmt.Printf("[flb-go] plugin parameter = '%s'\n", param)
"streamtool_uri": streamtool_uri, fmt.Printf("[flb-go user] plugin parameter = '%s'\n", user)
"user": user, fmt.Printf("[flb-go password] plugin parameter = '%s'\n", password)
"password": password, fmt.Printf("[flb-go uri] plugin parameter = '%s'\n", uri)
"buffer_type": buffer_type, fmt.Printf("[flb-go reference] plugin parameter = '%s'\n", reference)
"flush_interval": flush_interval,
"disable_retry_limit": disable_retry_limit,
"reference": reference,
}
output.FLBPluginSetContext(plugin, dict)
return output.FLB_OK return output.FLB_OK
} }
//export FLBPluginFlushCtx //export FLBPluginFlush
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
var ret int
var ts interface{} request_string := uri + "/ingest?reference=" + reference
var record map[interface{}]interface{} var b []byte
// Create Fluent Bit decoder b = C.GoBytes(data, C.int(length))
dec := output.NewDecoder(data, int(length))
dict := output.FLBPluginGetContext(ctx).(map[string]string) hc := http.Client{}
// Iterate Records
var result string req, err := http.NewRequest("POST", request_string, bytes.NewBuffer(b))
result = ""
var is_end bool = false if err != nil {
for { return output.FLB_ERROR
// Extract Record
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}
var timestamp time.Time
switch t := ts.(type) {
case output.FLBTime:
timestamp = ts.(output.FLBTime).Time
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
fmt.Println("time provided invalid, defaulting to now.")
timestamp = time.Now()
}
// Print record keys and values
result = result + C.GoString(tag) + ":" +timestamp.String()
for _, v := range record {
result += "["
var output_string string = ""
for _, s := range v.([]uint8) {
output_string = output_string + string(s)
}
if strings.Contains(output_string, "fluentbit_end") {
is_end = true
}
result = result + output_string + "]"
}
result += "\n"
} }
// Return options: req.Header.Set("Content-Type", "application/octet-stream")
// req.SetBasicAuth(user, password)
// output.FLB_OK = data have been processed.
// output.FLB_ERROR = unrecoverable error, do not try this again. resp, err := hc.Do(req)
// output.FLB_RETRY = retry to flush later.
//body result if err != nil {
// content type "application/octet-stream" return output.FLB_ERROR
var b = []byte(result) }
uri := fmt.Sprintf("%s/ingest?reference=%s", dict["streamtool_uri"], dict["reference"])
client := &http.Client{ /*
Transport: &http.Transport{ * Only allow the following HTTP status:
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, *
}, * - 200: OK
} * - 201: Created
req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) * - 202: Accepted
if err != nil { * - 203: no authorative resp
fmt.Fprintf(os.Stderr, "Got error %s", err.Error()) * - 204: No Content
return output.FLB_RETRY * - 205: Reset content
} */
req.SetBasicAuth(dict["user"], dict["password"]) re := regexp.MustCompile("[0-9]+") // get only the status code
req.Header.Set("Content-Type", "application/octet-stream") status_code := re.FindAllString(resp.Status, -1)
rsp, err := client.Do(req) resp_status, err := strconv.Atoi(status_code[0])
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "got error %s", err.Error()) fmt.Println(err)
return output.FLB_RETRY return output.FLB_RETRY
} }
if rsp.StatusCode != 204 {
fmt.Fprintf(os.Stderr, "status code %d", rsp.StatusCode) fmt.Println(resp.Status)
return output.FLB_RETRY fmt.Println(err)
}
if is_end { if resp_status < 200 && resp_status > 205 {
os.Exit(0) return output.FLB_RETRY
} }
return output.FLB_OK
defer resp.Body.Close()
/*
* Return options:
*
* - output.FLB_OK = data have been processed.
* - output.FLB_ERROR = unrecoverable error, do not try this again.
* - output.FLB_RETRY = retry to flush later.
*/
return output.FLB_OK
} }
//export FLBPluginExit //export FLBPluginExit
...@@ -131,3 +110,4 @@ func FLBPluginExit() int { ...@@ -131,3 +110,4 @@ func FLBPluginExit() int {
func main() { func main() {
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment