diff --git a/.gitignore b/.gitignore index ae3fdc2..70d4d2f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ *.o *.a mkmf.log +*.gem diff --git a/fluent-plugin-azureeventhubs.gemspec b/fluent-plugin-azureeventhubs.gemspec index 732003d..f381e8a 100644 --- a/fluent-plugin-azureeventhubs.gemspec +++ b/fluent-plugin-azureeventhubs.gemspec @@ -4,9 +4,9 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |spec| spec.name = "fluent-plugin-azureeventhubs" - spec.version = "0.0.5" - spec.authors = ["Hidemasa Togashi", "Justin Seely"] - spec.email = ["togachiro@gmail.com"] + spec.version = "0.0.6" + spec.authors = ["Hidemasa Togashi", "Toddy Mladenov", "Justin Seely"] + spec.email = ["togachiro@gmail.com", "toddysm@gmail.com"] spec.summary = "Fluentd output plugin for Azure Event Hubs" spec.description = "Fluentd output plugin for Azure Event Hubs" spec.homepage = "https://github.com/htgc/fluent-plugin-azureeventhubs" @@ -19,4 +19,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", "~> 1.7" spec.add_development_dependency "rake", "~> 10.0" + spec.add_dependency "fluentd", [">= 0.14.15", "< 2"] end diff --git a/lib/fluent/plugin/out_azureeventhubs_buffered.rb b/lib/fluent/plugin/out_azureeventhubs_buffered.rb index 488de16..cb52369 100644 --- a/lib/fluent/plugin/out_azureeventhubs_buffered.rb +++ b/lib/fluent/plugin/out_azureeventhubs_buffered.rb @@ -1,22 +1,32 @@ -#module Fluent +module Fluent::Plugin - class AzureEventHubsOutputBuffered < Fluent::BufferedOutput + class AzureEventHubsOutputBuffered < Output Fluent::Plugin.register_output('azureeventhubs_buffered', self) + helpers :compat_parameters, :inject + + DEFAULT_BUFFER_TYPE = "memory" + config_param :connection_string, :string config_param :hub_name, :string config_param :include_tag, :bool, :default => false config_param :include_time, :bool, :default => false config_param :tag_time_name, :string, :default => 'time' config_param :expiry_interval, :integer, :default => 3600 # 60min - config_param :type, :string, :default => 'https' # https / amqps (Not Implemented) + config_param :type, :string, :default => 'https' # https / amqps (Not Implemented) config_param :proxy_addr, :string, :default => '' config_param :proxy_port, :integer,:default => 3128 config_param :open_timeout, :integer,:default => 60 config_param :read_timeout, :integer,:default => 60 config_param :message_properties, :hash, :default => nil + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ['tag'] + end + def configure(conf) + compat_parameters_convert(conf, :buffer, :inject) super case @type when 'amqps' @@ -25,12 +35,18 @@ def configure(conf) require_relative 'azureeventhubs/http' @sender = AzureEventHubsHttpSender.new(@connection_string, @hub_name, @expiry_interval,@proxy_addr,@proxy_port,@open_timeout,@read_timeout) end + raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag end def format(tag, time, record) + record = inject_values_to_record(tag, time, record) [tag, time, record].to_msgpack end + def formatted_to_msgpack_binary? + true + end + def write(chunk) chunk.msgpack_each { |tag, time, record| p record.to_s @@ -44,5 +60,4 @@ def write(chunk) } end end -#end - +end