
WIP
theangryangel committed Nov 17, 2015
1 parent a6c669c commit 9804850
Showing 3 changed files with 66 additions and 29 deletions.
49 changes: 34 additions & 15 deletions README.md
@@ -6,30 +6,49 @@ See below for tested adapters, and example configurations.

This has not yet been extensively tested with all JDBC drivers and may not yet work for you.

If you do find this works for a JDBC driver not listed, let me know and provide a small example configuration.

This plugin does not bundle any JDBC jar files, and expects them to be in a
particular location. Please ensure you read the installation steps below.

## Headlines
- Support for connection pooling added in 0.2.0 [unreleased until #21 is resolved]
- Support for unsafe statement handling (allowing dynamic queries) in 0.2.0 [unreleased until #21 is resolved]

## Versions
- See master branch for logstash v2+
- See v1.5 branch for logstash v1.5
- See v1.4 branch for logstash 1.4

## Installation
- Run `bin/plugin install logstash-output-jdbc` in your logstash installation directory
- Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
- Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- Configure
- Now either:
  - Use driver_path in your configuration to specify the path to your JDBC jar file (see the sketch after this list)
- Or:
  - Create the directory vendor/jar/jdbc in your logstash installation (`mkdir -p vendor/jar/jdbc/`)
  - Add JDBC jar files to vendor/jar/jdbc in your logstash installation
- And then configure (examples below)
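
For example, a minimal driver_path based output block might look like the sketch below. The jar path is an illustrative assumption - point it at wherever your driver jar actually lives - and the SQLite connection details are taken from the example further down.

```
output {
  jdbc {
    # Assumed location of the driver jar - adjust to your installation
    driver_path => "/opt/jdbc/sqlite-jdbc.jar"
    connection_string => "jdbc:sqlite:test.db"
    statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
  }
}
```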

## Configuration options
* driver_class, string, JDBC driver class to load
* connection_string, string, JDBC connection string
* statement, array, an array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared; all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (e.g. "@timestamp" or "host") or a formatted string (e.g. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it is a formatted string then it will be passed in verbatim.
* flush_size, number, default = 1000, number of entries to buffer before sending to SQL
* idle_flush_time, number, default = 1, number of idle seconds before sending data to SQL, even if the flush_size has not been reached. If you modify this value you should also consider altering max_repeat_exceptions_time
* max_repeat_exceptions, number, default = 5, number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop
* max_repeat_exceptions_time, number, default = 30, maximum number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should also consider this value

| Option | Type | Description | Required? |
| ------ | ---- | ----------- | --------- |
| driver_path | String | File path to jar file containing your JDBC driver. This is optional, and all JDBC jars may be placed in $LOGSTASH_HOME/vendor/jar/jdbc instead. | No |
| connection_string | String | JDBC connection URL | Yes |
| username | String | JDBC username. This is optional, as for many drivers it may be included in the connection string | No |
| password | String | JDBC password. This is optional, as for many drivers it may be included in the connection string | No |
| statement | Array | An array of strings representing the SQL statement to run. Index 0 is the SQL statement that is prepared; all other array entries are passed in as parameters (in order). A parameter may either be a property of the event (e.g. "@timestamp" or "host") or a formatted string (e.g. "%{host} - %{message}" or "%{message}"). If a key is passed then it will be automatically converted as required for insertion into SQL. If it is a formatted string then it will be passed in verbatim. | Yes |
| unsafe_statement | Boolean | If true, the statement is evaluated for event fields - this allows you to use dynamic table names, etc. **This is highly dangerous** and you should **not** use this unless you are 100% sure that the field(s) you are passing in are 100% safe. Failure to do so may result in SQL injection. Please be aware that there is also a potential performance penalty, as each event must be evaluated and inserted into SQL one at a time, whereas when this is false multiple events are inserted at once. Example statement: [ "insert into %{table_name_field} (column) values(?)", "fieldname" ] | No |
| max_pool_size | Number | Maximum number of connections to open to the SQL server at any one time | No |
| connection_timeout | Number | Number of seconds before a SQL connection is closed | No |
| flush_size | Number | Maximum number of entries to buffer before sending to SQL, if this is reached before idle_flush_time elapses | No |
| idle_flush_time | Number | Number of idle seconds before sending data to SQL - even if the flush_size has not yet been reached | No |
| max_repeat_exceptions | Number | Number of times the same exception can repeat before we stop logstash. Set to a value less than 1 if you never want it to stop | No |
| max_repeat_exceptions_time | Number | Maximum number of seconds between exceptions before they're considered "different" exceptions. If you modify idle_flush_time you should also consider this value | No |
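
As an illustration of how statement parameters and unsafe_statement interact, the sketch below inserts into a table whose name comes from an event field, following the example statement in the table above. The connection string is borrowed from the PostgreSQL example below, and table_name_field is a hypothetical event field - only enable unsafe_statement if you fully trust its contents.

```
output {
  jdbc {
    connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
    # The statement is evaluated against the event, so %{table_name_field}
    # is substituted before the remaining parameters are bound as usual.
    unsafe_statement => true
    statement => [ "INSERT INTO %{table_name_field} (host, message) VALUES(?, ?)", "host", "message" ]
  }
}
```

Because each event is evaluated and executed individually in this mode, expect lower throughput than with a static prepared statement.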

## Example configurations
If you have a working sample configuration for a DB that's not listed, pull requests are welcome.

### SQLite3
* Tested using https://bitbucket.org/xerial/sqlite-jdbc
* SQLite setup - `echo "CREATE table log (host text, timestamp datetime, message text);" | sqlite3 test.db`
@@ -42,7 +61,6 @@ output {
stdout { }
jdbc {
driver_class => 'org.sqlite.JDBC'
connection_string => 'jdbc:sqlite:test.db'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
@@ -58,7 +76,6 @@ input
}
output {
jdbc {
driver_class => 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
connection_string => "jdbc:sqlserver://server:1433;databaseName=databasename;user=username;password=password;autoReconnect=true;"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, ?, ?)", "host", "@timestamp", "message" ]
}
@@ -74,7 +91,6 @@ input
}
output {
jdbc {
driver_class => 'org.postgresql.Driver'
connection_string => 'jdbc:postgresql://hostname:5432/database?user=username&password=password'
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
@@ -92,7 +108,6 @@ input
}
output {
jdbc {
driver_class => "oracle.jdbc.driver.OracleDriver"
connection_string => "jdbc:oracle:thin:USER/PASS@HOST:PORT:SID"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
@@ -110,9 +125,13 @@ input
}
output {
jdbc {
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
}
}
```

### MariaDB
This is reportedly working, according to [@db2882](https://github.com/db2882) in issue #20.
No confirmed example configuration has been provided yet.
If you have a working sample, pull requests are welcome.
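
The sketch below is untested; it is simply the MySQL example above adapted to the MariaDB Connector/J URL scheme (`jdbc:mariadb://...`), with hostname, database, and credentials as placeholders.

```
input
{
  stdin { }
}
output {
  jdbc {
    connection_string => "jdbc:mariadb://HOSTNAME/DATABASE?user=USER&password=PASSWORD"
    statement => [ "INSERT INTO log (host, timestamp, message) VALUES(?, CAST (? AS timestamp), ?)", "host", "@timestamp", "message" ]
  }
}
```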
41 changes: 29 additions & 12 deletions lib/logstash/outputs/jdbc.rb
@@ -12,7 +12,7 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
config_name "jdbc"

# Driver class - No longer required
config :driver_class, :obsolete => true
config :driver_class, :obsolete => "driver_class is no longer required and can be removed from your configuration"

# Where to find the jar
# Defaults to not required, and to the original behaviour
@@ -104,7 +104,7 @@ def register
end

def receive(event)
return unless output?(event)
return unless output?(event) or event.cancelled?
return unless @statement.length > 0

buffer_receive(event)
@@ -173,28 +173,24 @@ def load_jar_files!

def safe_flush(events, teardown=false)
connection = @pool.getConnection()

statement = connection.prepareStatement(@statement[0])

events.each do |event|
next if event.cancelled?
next if @statement.length < 2
statement = add_statement_event_params(statement, event)

statement.addBatch()
end

begin
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
statement.executeBatch()
statement.close()
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# Since the executeBatch failed, any events that failed to be inserted
# will be re-run. We're going to log it for the lols anyway.
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
if e.getNextException() != nil
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
end
log_jdbc_exception(e)
ensure
connection.close();
end
@@ -204,13 +200,25 @@ def unsafe_flush(events, teardown=false)
connection = @pool.getConnection()

events.each do |event|
next if event.cancelled?

statement = connection.prepareStatement(event.sprintf(@statement[0]))

statement = add_statement_event_params(statement, event) if @statement.length > 1

statement.execute()
statement.close()
connection.close()
begin
statement.execute()

# cancel the event, since we may end up outputting the same event multiple times
# if an exception happens later down the line
event.cancel
rescue => e
# Raising an exception will incur a retry from Stud::Buffer.
# We log for the lols.
log_jdbc_exception(e)
ensure
statement.close()
connection.close()
end
end
end

@@ -237,4 +245,13 @@ def add_statement_event_params(statement, event)

statement
end

def log_jdbc_exception(e)
ce = e
loop do
@logger.error("JDBC Exception encountered: Will automatically retry.", :exception => ce)
ce = ce.getNextException()
break if ce == nil
end
end
end # class LogStash::Outputs::Jdbc
5 changes: 3 additions & 2 deletions logstash-output-jdbc.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-jdbc'
s.version = "0.2.0.rc2"
s.version = "0.2.0.rc3"
s.licenses = [ "Apache License (2.0)" ]
s.summary = "This plugin allows you to output to SQL, via JDBC"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -10,7 +10,8 @@ Gem::Specification.new do |s|
s.require_paths = [ "lib" ]

# Files
s.files = `git ls-files`.split($\)
s.files = Dir.glob("{lib,vendor,spec}/**/*") + %w(LICENSE.txt README.md)

# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

