Skip to content

Commit

Permalink
*Make some fixes for retries and upload
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Nov 11, 2024
1 parent 7da8798 commit b624d01
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
1 change: 0 additions & 1 deletion lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def close
@logger.error("Error stopping ingestor: #{e.message}")
@logger.error(e.backtrace.join("\n"))
end

@logger.info("Kusto output plugin Closed")
end

Expand Down
61 changes: 30 additions & 31 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class Ingestor
FIELD_REF = /%\{[^}]+\}/

def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL)
@retry_count = 3
@retry_delay = 10
@workers_pool = threadpool
@logger = logger
#Validate and assign
Expand Down Expand Up @@ -85,51 +87,48 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
end
# retry_policy = Java::com.azure.storage.common.policy
# duration = Java::java.time.Duration.ofSeconds(5)

# fixed_delay_options = Java::com.azure.core.http.policy.FixedDelayOptions.new(1,duration)
# retry_options = Java::com.azure.core.http.policy.RetryOptions.new(fixed_delay_options)
# req_retry_options = Java::com.azure.storage.common.policy.RequestRetryOptions.fromRetryOptions(retry_options, Java::java.time.Duration.ofSeconds(10), "")

# queued_ingest_client = @kusto_client.to_java(Java::com.microsoft.azure.kusto.ingest.QueuedIngestClientImpl)
# queued_ingest_client.setQueueRequestOptions(req_retry_options)
@logger.debug('Kusto resources are ready.')
end

def upload_async(data)
if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
@logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")
end
exception = nil
@workers_pool.post do
LogStash::Util.set_thread_name("Kusto to ingest data")
begin
upload(data)
rescue => e
@logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace)
exception = e
end
LogStash::Util.set_thread_name("Kusto to ingest data #{JRuby.reference(Thread.current).native_thread.id}")
upload(data)
end
# Wait for the task to complete and check for exceptions
@workers_pool.shutdown
@workers_pool.wait_for_termination

raise exception if exception
rescue Exception => e
@logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
end

def upload(data)
@logger.debug("Sending data to Kusto")
if data.size > 0
begin
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
rescue => e
@logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
begin
@logger.debug("Sending data to Kusto")
if data.size > 0
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
else
@logger.warn("Data is empty and is not ingested.")
end
@logger.debug("Data sent to Kusto.")
rescue => e
if tries < @retry_count
tries += 1
logger.warn("Uploading failed, retrying (##{tries} of #{@retry_count})", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
sleep @retry_delay
retry
else
logger.error("Failed to upload file (retried #{@retry_count} times).", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
end
else
@logger.warn("Data is empty and is not ingested.")
end

@logger.debug("Data sent to Kusto.")
rescue => e
@logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
end

def stop
Expand Down

0 comments on commit b624d01

Please sign in to comment.