From b9d9f30d569ab65bcc4582c51d2425146aab2ed8 Mon Sep 17 00:00:00 2001
From: Kirill Smelkov <kirr@nexedi.com>
Date: Mon, 25 May 2015 21:50:48 +0300
Subject: [PATCH] First draft version of code

This was pre-tested and is known to ingest data ok, modulo some known TODOs.
---
 .gitignore                           |  2 +
 Gemfile                              |  4 ++
 LICENSE.txt                          | 13 ++++
 README.md                            | 49 ++++++++++++++
 example/to_wendelin.conf             | 90 +++++++++++++++++++++++++
 fluent-plugin-wendelin.gemspec       | 17 +++++
 lib/fluent/plugin/out_wendelin.rb    | 88 +++++++++++++++++++++++++
 lib/fluent/plugin/wendelin_client.rb | 98 ++++++++++++++++++++++++++++
 test/ingest-torture.rb               | 20 ++++++
 test/ingest.rb                       | 11 ++++
 test/wendelin_test.rb                | 22 +++++++
 11 files changed, 414 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 Gemfile
 create mode 100644 LICENSE.txt
 create mode 100644 README.md
 create mode 100644 example/to_wendelin.conf
 create mode 100644 fluent-plugin-wendelin.gemspec
 create mode 100644 lib/fluent/plugin/out_wendelin.rb
 create mode 100644 lib/fluent/plugin/wendelin_client.rb
 create mode 100755 test/ingest-torture.rb
 create mode 100755 test/ingest.rb
 create mode 100644 test/wendelin_test.rb

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6d08d63
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/.bundle/
+/Gemfile.lock
diff --git a/Gemfile b/Gemfile
new file mode 100644
index 0000000..109fec1
--- /dev/null
+++ b/Gemfile
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+
+# gem's dependencies are in .gemspec
+gemspec
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..5a55372
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,13 @@
+Copyright (C) 2015 Nexedi SA and Contributors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1184556
--- /dev/null
+++ b/README.md
@@ -0,0 +1,49 @@
+fluent-plugin-wendelin
+======================
+
+This is an output plugin for [Fluentd][] to forward data to the [Wendelin][] system.
+
+[Fluentd]:  http://fluentd.org/
+[Wendelin]: http://wendelin.io/
+
+
+Configuration
+-------------
+
+A sample configuration to setup ingestion of data for a tag to Wendelin:
+
+```apache
+<match your.sensor.tag>
+  @type wendelin
+  @id wendelin_out
+
+  streamtool_uri    <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID>
+  user      <your_wendelin_user>
+  password  <your_wendelin_password>
+
+  # all parameters of BufferedOutput & Output classes are supported too, e.g.
+  # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
+  # their code near:
+  #
+  #   https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
+  #
+  # logging setup description:
+  #
+  #   http://docs.fluentd.org/articles/logging
+
+  buffer_type       memory
+  #buffer_path       "#{ENV['HOME']}/var"
+  flush_interval    5s
+</match>
+```
+
+See *example/to_wendelin.conf* for a fully set up example.
+
+
+TODO
+----
+
+- post content over HTTP as raw data (instead of urlencoded form data).
+- HTTPS certificates are currently not verified.
+- X.509 certificates instead of user / password.
+- cleanup
diff --git a/example/to_wendelin.conf b/example/to_wendelin.conf
new file mode 100644
index 0000000..ff267ff
--- /dev/null
+++ b/example/to_wendelin.conf
@@ -0,0 +1,90 @@
+# Example configuration of Wendelin output for Fluentd
+
+## built-in TCP input, so we can accept messages from other fluentd(s) and so
+## that fluent-cat works:
+## $ echo <json|msgpack> | fluent-cat <tag>
+<source>
+  @type forward
+  @id forward_input
+</source>
+
+
+# HTTP input
+# http://localhost:8888/<tag>?json=<json>
+<source>
+  @type http
+  @id http_input
+  port 8888
+</source>
+
+
+## output tag=sensor.** to Wendelin
+<match <YOUR_SENSOR_TAG>.**>
+  @type wendelin
+  @id wendelin_out
+
+  streamtool_uri    <Wendelin_URL>/erp5/portal_ingestion_policies/<YOUR_INGESTION_POLICY_ID>
+  # TODO ^^^ do not check peer's certificate
+  user      <your_wendelin_user>
+  password  <your_wendelin_password>
+
+  # all parameters of BufferedOutput & Output classes are supported too, e.g.
+  # `buffer_type`, `flush_interval`, `num_threads`, `log_level`, etc - see
+  # their code near:
+  #
+  #   https://github.com/fluent/fluentd/blob/master/lib/fluent/output.rb#L182
+  #
+  # logging setup description:
+  #
+  #   http://docs.fluentd.org/articles/logging
+
+  buffer_type       memory
+  #buffer_path       "#{ENV['HOME']}/var"
+  flush_interval    5s
+</match>
+
+
+
+
+# ---- monitoring & debugging ----
+
+
+# Listen HTTP for monitoring
+# http://localhost:24220/api/plugins
+# http://localhost:24220/api/plugins?type=TYPE
+# http://localhost:24220/api/plugins?tag=MYTAG
+<source>
+  @type monitor_agent
+  @id monitor_agent_input
+
+  port 24220
+</source>
+
+# Listen DRb for debug
+<source>
+  @type debug_agent
+  @id debug_agent_input
+
+  bind 127.0.0.1
+  port 24230
+</source>
+
+
+## match tag=debug.** and dump to console
+<match debug.**>
+  @type stdout
+  @id stdout_output
+</match>
+
+
+## match fluent's internal events
+#<match fluent.**>
+#  @type null
+#</match>
+
+## match not matched logs and write to file
+#<match **>
+#  @type file
+#  path /var/log/fluent/else
+#  compress gz
+#</match>
diff --git a/fluent-plugin-wendelin.gemspec b/fluent-plugin-wendelin.gemspec
new file mode 100644
index 0000000..a0d922e
--- /dev/null
+++ b/fluent-plugin-wendelin.gemspec
@@ -0,0 +1,17 @@
+Gem::Specification.new do |gem|
+    gem.name            = "fluent-plugin-wendelin"
+    gem.version         = "0.1.alpha1"
+    gem.authors         = ["Kirill Smelkov"]
+    gem.email           = ["kirr@nexedi.com"]
+    gem.summary         = %q{Fluentd output to Wendelin}
+    gem.description     = %q{Fluentd output plugin to forward data to Wendelin system}
+    gem.homepage        = "https://lab.nexedi.cn/nexedi/fluent-plugin-wendelin"
+    gem.license         = "Apache-2.0"    # SPDX id of Apache License 2.0 (see LICENSE.txt)
+
+    gem.files           = `git ls-files -z`.split("\x0")   # every file tracked by git
+    gem.require_paths   = ["lib"]
+
+    gem.add_runtime_dependency "fluentd", "~> 0"
+    # ? net/http
+    # ? openssl
+end
diff --git a/lib/fluent/plugin/out_wendelin.rb b/lib/fluent/plugin/out_wendelin.rb
new file mode 100644
index 0000000..3fcfdff
--- /dev/null
+++ b/lib/fluent/plugin/out_wendelin.rb
@@ -0,0 +1,88 @@
+# Fluentd output plugin to ingest data to Wendelin system
+# Copyright (C) 2015  Nexedi SA and Contributors.
+#                     Kirill Smelkov <kirr@nexedi.com>
+#
+#   This program is free software: you can Use, Study, Modify and Redistribute
+#   it under the terms of the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
+# This plugin hooks into Fluentd in a way similar to out_forward and
+# out_secure_forward and outputs event stream to a Wendelin HTTP endpoint.
+
+require 'fluent/output'
+require_relative 'wendelin_client'
+
+
+module Fluent
+
+  # Buffered output plugin, registered as `wendelin`, which hands the content
+  # of every buffer chunk over to WendelinClient#ingest.
+  class WendelinOutput < ObjectBufferedOutput   # XXX verify base class
+    Plugin.register_output('wendelin', self)
+
+    # location of Wendelin's Input Stream Tool, e.g.
+    # http://example.com/erp5/portal_ingestion_policies/example_ingestion
+    config_param    :streamtool_uri,    :string
+
+    # credentials to authenticate this fluentd to wendelin;
+    # when `user` is not set, no credentials are passed to the client.
+    # TODO user/password -> certificate
+    config_param    :user,      :string,  :default => nil
+    config_param    :password,  :string,  :default => nil
+
+
+    # create the Wendelin client from configured URI and credentials.
+    def configure(conf)
+        super
+
+        # credentials stay empty unless a user is configured
+        credentials = @user ? {'user' => @user, 'password' => @password} : {}
+        @wendelin   = WendelinClient.new(@streamtool_uri, credentials)
+    end
+
+    # nothing to do beyond the base class yet
+    def start
+        super
+        # TODO
+    end
+
+    # nothing to do beyond the base class yet
+    def shutdown
+        super
+        # TODO
+    end
+
+
+    # hooked to ObjectBufferedOutput - write out records from a buffer chunk for a tag.
+    #
+    # NOTE this is called from a separate thread (see OutputThread and BufferedOutput)
+    #
+    # NOTE if ingestion fails and raises, the error unrolls to
+    # BufferedOutput#try_flush, which detects errors and retries outputting
+    # logs up to retry maxcount times, and aborts outputting current logs if
+    # all tries fail.  This way, we don't need to code rescue here.
+    #
+    # NOTE there is a single tag, and chunk stores an event stream, usually
+    # [] of (timestamp, record) in msgpack, but in general it could be arbitrary
+    # data - we send it as-is.
+    #
+    # The tag is used as-is for input_stream_ref - it will be
+    # processed/translated further on server by Wendelin.
+    def write_objects(tag, chunk)
+        data_chunk = chunk.read
+        @wendelin.ingest(tag, data_chunk)
+    end
+
+  end
+
+end
diff --git a/lib/fluent/plugin/wendelin_client.rb b/lib/fluent/plugin/wendelin_client.rb
new file mode 100644
index 0000000..39c16bb
--- /dev/null
+++ b/lib/fluent/plugin/wendelin_client.rb
@@ -0,0 +1,98 @@
+# module to ingest data to a Wendelin through HTTP endpoint
+# Copyright (C) 2015  Nexedi SA and Contributors.
+#                     Kirill Smelkov <kirr@nexedi.com>
+#
+#   This program is free software: you can Use, Study, Modify and Redistribute
+#   it under the terms of the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
+require 'net/http'
+require 'openssl'
+
+# show request/response traces or not
+$trace    = false
+
+
+# class representing a Wendelin client
+class WendelinClient
+  # `streamtool_uri` - URI pointing to portal_input_data_stream "mountpoint"
+  # `credentials`    - {'user' => _, 'password' => _}; HTTP basic auth is used
+  #                    only when the 'user' key is present.  TODO certificate
+  def initialize(streamtool_uri, credentials)
+    @streamtool_uri = streamtool_uri
+    @credentials    = credentials
+  end
+
+  # ingest `data_chunk` to a stream referenced as `reference`.
+  # POSTs `data_chunk` as a form field to <streamtool_uri>/ingest?reference=...
+  # Returns nil on a 2XX response; re-raises connection errors and raises via
+  # Net::HTTPResponse#value on any other response code.
+  def ingest(reference, data_chunk)
+      # escape reference, so that e.g. ' ', '&' or '#' in it yield a valid query
+      uri = URI("#{@streamtool_uri}/ingest?reference=#{URI.encode_www_form_component(reference)}")
+      req = Net::HTTP::Post.new(uri)
+      if @credentials.has_key?('user')
+        req.basic_auth @credentials['user'], @credentials['password']
+      end
+
+      # TODO ensure content-type is 'raw', e.g. this way
+      # (but then querystring ?reference=... is lost)
+      # req.body = data_chunk
+      # req.content_type = 'application/octet-stream'
+      req.set_form_data('data_chunk' => data_chunk)
+
+      if $trace
+          puts '>>> REQUEST'
+          puts "method\t=> #{req.method}"
+          puts "path\t=> #{req.path}"
+          puts "uri\t=> #{req.uri}"
+          puts "body\t=> #{req.body}"
+          puts "body_stream\t=> #{req.body_stream}"
+          req.each {|h| puts "#{h}:\t#{req[h]}"}
+          puts
+      end
+
+      begin
+          # TODO keep connection open (so that every new ingest does not do
+          # full connect again)
+          res = Net::HTTP.start(uri.hostname, uri.port,
+                      :use_ssl      => (uri.scheme == 'https'),
+                      # NOTE = "do not check server cert"
+                      # TODO move this out to conf parameters
+                      :verify_mode  => OpenSSL::SSL::VERIFY_NONE
+                ) do |http|
+              http.request(req)
+          end
+      rescue
+          # some http/ssl/other connection error
+          puts "HTTP ERROR:"
+          raise
+      else
+          if $trace
+              puts '>>> RESPONSE'
+              res.each {|h| puts "#{h}:\t#{res[h]}"}
+              puts "code\t=> #{res.code}"
+              puts "msg\t=> #{res.message}"
+              puts "class\t=> #{res.class}"
+              puts "body:", res.body
+          end
+
+          if res.kind_of?(Net::HTTPSuccess)       # res.code is 2XX
+              #puts "ingested ok"
+          else
+              puts "FAIL:"
+              res.value       # raises for a non-2XX response
+          end
+      end
+  end
+end
diff --git a/test/ingest-torture.rb b/test/ingest-torture.rb
new file mode 100755
index 0000000..acc7e0c
--- /dev/null
+++ b/test/ingest-torture.rb
@@ -0,0 +1,20 @@
+#!/usr/bin/env ruby
+# ingest several data streams concurrently forever
+require_relative 'wendelin_test'
+
+def runingest(stream_ref)
+    # Ingest ", <n> (<stream_ref>)" chunks for n = 0, 1, 2, ... without end,
+    # pausing 0.1s after every chunk.
+    (0..Float::INFINITY).each do |seq|
+        chunk = ", #{seq} (#{stream_ref})"
+        #puts "ingest #{stream_ref}, #{chunk}"
+        ingest stream_ref, chunk
+        sleep 0.1
+    end
+end
+
+
+t31 = Thread.new { runingest 'ABC-HELLO-01.cat' }
+t32 = Thread.new { runingest 'ABC-HELLO-01.tac' }
+
+t31.join    # wait forever
diff --git a/test/ingest.rb b/test/ingest.rb
new file mode 100755
index 0000000..e884c19
--- /dev/null
+++ b/test/ingest.rb
@@ -0,0 +1,11 @@
+#!/usr/bin/env ruby
+# manually test erp5_data_streams ingestion via Ruby HTTP lib
+# XXX draft/stub
+
+require_relative 'wendelin_test'
+require 'msgpack'
+
+#puts [1,2,'hello'].to_msgpack
+ingest 'ABC-HELLO-01.cat', [1,2,'hello'].to_msgpack
+#ingest 'ABC-HELLO-01.cat', ', hello'
+#ingest 'ABC-HELLO-01.tac', ', magic'
diff --git a/test/wendelin_test.rb b/test/wendelin_test.rb
new file mode 100644
index 0000000..6222f8a
--- /dev/null
+++ b/test/wendelin_test.rb
@@ -0,0 +1,22 @@
+# simple ingest() for wendelin tests
+# (without need for providing streamtool uri, credentials, etc)
+
+# load wendelin_client.rb from in-tree
+here = File.dirname(__FILE__)
+$LOAD_PATH << File.expand_path(File.join(here, '../lib'))
+require 'fluent/plugin/wendelin_client'
+
+# FIXME hardcoded
+$user     = 'user'
+$password = 'password'
+
+# FIXME hardcoded & shortcut for tests
+$erp5       = "https://example.com/erp5"                # erp5 root
+$streamtool = "#{$erp5}/portal_input_data_streams"      # where Input Stream Tool is mounted
+
+$wendelin   = WendelinClient.new($streamtool, {'user' => $user, 'password' => $password})
+
+
+def ingest(input_stream_ref, data_chunk)   # forwards to WendelinClient#ingest via $wendelin
+    $wendelin.ingest(input_stream_ref, data_chunk)
+end
-- 
2.30.9