diff --git a/.gitignore b/.gitignore index dca56c4..3762b35 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,3 @@ Gemfile.lock Gemfile.bak .bundle -vendor diff --git a/lib/logstash-output-jdbc_jars.rb b/lib/logstash-output-jdbc_jars.rb new file mode 100644 index 0000000..4211f0f --- /dev/null +++ b/lib/logstash-output-jdbc_jars.rb @@ -0,0 +1,5 @@ +# encoding: utf-8 +require 'logstash/environment' + +root_dir = File.expand_path(File.join(File.dirname(__FILE__), "..")) +LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor") diff --git a/lib/logstash/outputs/jdbc.rb b/lib/logstash/outputs/jdbc.rb index 435b906..3002478 100644 --- a/lib/logstash/outputs/jdbc.rb +++ b/lib/logstash/outputs/jdbc.rb @@ -3,6 +3,7 @@ require "logstash/namespace" require "stud/buffer" require "java" +require "logstash-output-jdbc_jars" class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # Adds buffer support @@ -10,15 +11,31 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base config_name "jdbc" - # Driver class - config :driver_class, :validate => :string + # Driver class - No longer required + config :driver_class, :obsolete => true - # connection string + # Where to find the jar + # Defaults to not required, and to the original behaviour + config :driver_jar_path, :validate => :string, :required => false + + # jdbc connection string config :connection_string, :validate => :string, :required => true + # jdbc username - optional, maybe in the connection string + config :username, :validate => :string, :required => false + + # jdbc password - optional, maybe in the connection string + config :password, :validate => :string, :required => false + # [ "insert into table (message) values(?)", "%{message}" ] config :statement, :validate => :array, :required => true + # Number of connections in the pool to maintain + config :max_pool_size, :validate => :number, :default => 5 + + # Connection timeout + config :connection_timeout, :validate => :number, :default => 2800 + # We buffer a certain number of events before flushing that out to SQL. # This setting controls how many events will be buffered before sending a # batch of events. @@ -40,7 +57,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base # Maximum number of repeating (sequential) exceptions, before we stop retrying # If set to < 1, then it will infinitely retry. - config :max_repeat_exceptions, :validate => :number, :default => 5 + config :max_repeat_exceptions, :validate => :number, :default => 4 # The max number of seconds since the last exception, before we consider it # a different cause. @@ -49,34 +66,21 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base public def register - @logger.info("JDBC - Starting up") - if ENV['LOGSTASH_HOME'] - jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar") - else - jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar") - end - - @logger.debug("JDBC - jarpath", path: jarpath) - - jars = Dir[jarpath] - raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty? - - jars.each do |jar| - @logger.debug("JDBC - Loaded jar", :jar => jar) - require jar - end - - import @driver_class + load_jar_files! - driver = Object.const_get(@driver_class[@driver_class.rindex('.') + 1, @driver_class.length]).new - @connection = driver.connect(@connection_string, java.util.Properties.new) + @pool = Java::ComZaxxerHikari::HikariDataSource.new + @pool.setJdbcUrl(@connection_string) + + @pool.setUsername(@username) if @username + @pool.setPassword(@password) if @password - @logger.debug("JDBC - Created connection", :driver => driver, :connection => @connection) + @pool.setMaximumPoolSize(@max_pool_size) + @pool.setConnectionTimeout(@connection_timeout) if (@flush_size > 1000) - @logger.warn("JDBC - Flush size is set to > 1000. May have performance penalties, depending on your SQL engine.") + @logger.warn("JDBC - Flush size is set to > 1000") end @repeat_exception_count = 0 @@ -101,7 +105,9 @@ def receive(event) end def flush(events, teardown=false) - statement = @connection.prepareStatement(@statement[0]) + connection = @pool.getConnection() + + statement = connection.prepareStatement(@statement[0]) events.each do |event| next if @statement.length < 2 @@ -132,6 +138,7 @@ def flush(events, teardown=false) begin @logger.debug("JDBC - Sending SQL", :sql => statement.toString()) statement.executeBatch() + statement.close() rescue => e # Raising an exception will incur a retry from Stud::Buffer. # Since the exceutebatch failed this should mean any events failed to be @@ -140,9 +147,9 @@ def flush(events, teardown=false) if e.getNextException() != nil @logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException()) end + ensure + connection.close(); end - - statement.close() end def on_flush_error(e) @@ -165,8 +172,36 @@ def on_flush_error(e) def teardown buffer_flush(:final => true) - @connection.close() + @pool.close() super end + private + + def load_jar_files! + # Load jar from driver path + unless @driver_jar_path.nil? + raise Exception.new("JDBC - Could not find jar file at given path. Check config.") unless File.exists? @driver_jar_path + require @driver_jar_path + return + end + + # Revert original behaviour of loading from vendor directory + # if no path given + if ENV['LOGSTASH_HOME'] + jarpath = File.join(ENV['LOGSTASH_HOME'], "/vendor/jar/jdbc/*.jar") + else + jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/jdbc/*.jar") + end + + @logger.debug("JDBC - jarpath", path: jarpath) + + jars = Dir[jarpath] + raise Exception.new("JDBC - No jars found in jarpath. Have you read the README?") if jars.empty? + + jars.each do |jar| + @logger.debug("JDBC - Loaded jar", :jar => jar) + require jar + end + end end # class LogStash::Outputs::jdbc diff --git a/logstash-output-jdbc.gemspec b/logstash-output-jdbc.gemspec index ebaf541..27516f7 100644 --- a/logstash-output-jdbc.gemspec +++ b/logstash-output-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-jdbc' - s.version = "0.1.4" + s.version = "0.2.0-rc.1" s.licenses = [ "Apache License (2.0)" ] s.summary = "This plugin allows you to output to SQL, via JDBC" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" @@ -19,6 +19,8 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" + s.add_runtime_dependency 'stud' s.add_runtime_dependency "logstash-codec-plain" + s.add_development_dependency "logstash-devutils" end diff --git a/vendor/jar-dependencies/runtime-jars/HikariCP-2.4.2.jar b/vendor/jar-dependencies/runtime-jars/HikariCP-2.4.2.jar new file mode 100644 index 0000000..1c49ac5 Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/HikariCP-2.4.2.jar differ diff --git a/vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.13.jar b/vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.13.jar new file mode 100644 index 0000000..4dfaaa8 Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.13.jar differ diff --git a/vendor/jar-dependencies/runtime-jars/slf4j-nop-1.7.13.jar b/vendor/jar-dependencies/runtime-jars/slf4j-nop-1.7.13.jar new file mode 100644 index 0000000..15a41eb Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/slf4j-nop-1.7.13.jar differ