
Task doesn't fail after read.timeout.ms is exceeded #534

Open
jpmarques66 opened this issue May 25, 2021 · 5 comments
jpmarques66 commented May 25, 2021

TL;DR

Elasticsearch becomes unavailable in the middle of a connection, causing read.timeout.ms to be exceeded. I expected the task to fail, but it stays in the RUNNING state.

Description

Hello! I'm trying to set up this Elasticsearch connector to interact with tenant-owned systems, so I need it to be as resilient as possible to failures in the tenants' systems.

While testing for Elasticsearch unavailability, I killed its only master node and got this error:

[2021-05-25 16:34:00,818] WARN Bulk request 58 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient)
java.net.SocketTimeoutException: 3,000 milliseconds timeout on connection http-outgoing-22 [ACTIVE]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2021-05-25 16:39:05,899] INFO [Consumer clientId=connector-consumer-simple-elasticsearch-connector-0, groupId=connect-simple-elasticsearch-connector] Member connector-consumer-simple-elasticsearch-connector-0-a0b0c673-c1d6-4ec8-b9c6-138bec30adcc sending LeaveGroup request to coordinator kafka-0.kafka-headless.kafka.svc.cluster.local:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

The error itself seems OK, but the task doesn't fail as I expected:

❯ curl -X GET localhost:8083/connectors/simple-elasticsearch-connector/status | jq
{
  "name": "simple-elasticsearch-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.244.1.103:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.244.1.103:8083"
    }
  ],
  "type": "sink"
}

My connector configuration:

curl -X PUT localhost:8083/connectors/simple-elasticsearch-connector/config -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elastic-elasticsearch-coordinating-only.elasticsearch.svc:9200",
    "tasks.max": "1",
    "topics": "test-helm-repo-dfb79",
    "name": "simple-elasticsearch-connector",
    "type.name": "_doc",
    "schema.ignore": "true",
    "key.ignore":"true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms":"routeTS",  
    "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",  
    "transforms.routeTS.topic.format":"${timestamp}-logs",
    "errors.log.enable": "true",
    "read.timeout.ms": "3000"
}'
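
For reference, the 3,000 ms SocketTimeoutException in the log above lines up with read.timeout.ms=3000, which presumably maps to the socket (read) timeout of the underlying Elasticsearch low-level REST client. Below is a minimal sketch of how such a timeout is typically configured on that client; it only illustrates the mechanism, is not the connector's actual code, and uses a made-up host name.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

public class TimeoutSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical host; in the connector this would come from connection.url.
        RestClientBuilder builder = RestClient.builder(
                new HttpHost("elasticsearch.example.svc", 9200, "http"))
            .setRequestConfigCallback(requestConfig -> requestConfig
                // Roughly what a 3000 ms read timeout means: if no bytes arrive
                // on the connection for 3 s, the async client raises the
                // SocketTimeoutException seen in the log above.
                .setSocketTimeout(3000)
                .setConnectTimeout(1000));
        RestClient client = builder.build();
        // ... the client would now be used to send bulk requests ...
        client.close();
    }
}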

Versions

Docker image: confluentinc/cp-kafka-connect:6.1.0
kafka-connect-elasticsearch: confluentinc/kafka-connect-elasticsearch:11.0.4

Is this supposed to happen? Thanks in advance!!! <3

@thomas-tomlinson

I'm experiencing the same condition as well. I was able to reproduce it with TRACE-level logging, which shows some interesting behavior in the retry portion. I have max.retries set to 19 and didn't set retry.backoff.ms, so it defaults to 100. I have a reverse load balancer on the Kafka Connect hosts that connects to Elasticsearch, so after indexing a few documents I simply shut it off on the node hosting the single task I have configured.

This is what the log shows:

[2021-07-15 07:26:52,555] DEBUG Putting 1 records to Elasticsearch. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)
[2021-07-15 07:26:52,555] TRACE Writing record from topic=trans_2021_07 partition=0 offset=496188 to Elasticsearch. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)
[2021-07-15 07:26:52,556] TRACE Adding record from topic=trans_2021_07 partition=0 offset=496188 to bulk processor. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)
[2021-07-15 07:26:52,564] WARN Bulk request 4 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient)
[2021-07-15 07:26:52,575] TRACE Try retrying bulk request (attempt 1 of 19) (io.confluent.connect.elasticsearch.RetryUtil)
[2021-07-15 07:26:53,638] DEBUG Putting 0 records to Elasticsearch. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)
[2021-07-15 07:26:53,639] DEBUG Flushing data to Elasticsearch with the following offsets: {ES_SINK_TRANS-0=OffsetAndMetadata{offset=496189, leaderEpoch=null, metadata=''}} (io.confluent.connect.elasticsearch.ElasticsearchSinkTask)

So it looks like the retry process is entered, but the subsequent behavior doesn't make much sense: there is only a single retry, then 0 records are put to Elasticsearch, and then nothing. There are actually more records waiting, but nothing happens.

@thomas-tomlinson

I've tried all manner of configuration options (increasing linger.ms, max.in.flight.requests, etc.) and I simply cannot get the BulkProcessor portion of this code to retry. I've gone as far as adding a couple of debug statements to the code (inside ElasticsearchClient.java and RetryUtil.java) to try to understand what's going on here. It looks like we're running into an issue with the native Elasticsearch Java client, with an open issue there: #71159. I then see the PR that was made as a result, #513, which was the attempt to make this work outside of the native client. The question is: has anyone actually seen this retry behavior work outside of the initial index check? From what I can tell, the retry method implemented in the sink code here should be working, but it falls victim to whatever async thread condition seems to show up randomly in the Elasticsearch native client's BulkProcessor.

Are there other recovery options we could use within the Kafka Connect framework, such as inducing a retryable task-level failure? That would at least allow some automated response when a task enters this state, instead of requiring manual intervention to restart the task based on outside monitoring (we currently have to monitor the consumer group offsets for this instead of relying on the status of the connector and its tasks).
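
For context, the Connect framework does distinguish the two failure modes mentioned above: a RetriableException thrown from SinkTask.put() makes the framework redeliver the same batch later, while an unhandled ConnectException fails the task (which then shows up as FAILED on the status endpoint). A minimal sketch of that pattern follows; writeToElasticsearch() and isFatal() are hypothetical stand-ins for the connector's real write path and exhausted-retries check, not its actual code.

import java.util.Collection;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class FailFastSinkTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            writeToElasticsearch(records);  // hypothetical write helper
        } catch (Exception e) {
            if (isFatal(e)) {
                // An unhandled ConnectException transitions the task to FAILED,
                // which is what the REST status endpoint would then report.
                throw new ConnectException("Giving up after exhausting retries", e);
            }
            // A RetriableException makes the framework retry the same batch
            // later instead of silently dropping it.
            throw new RetriableException("Transient Elasticsearch failure, will retry", e);
        }
    }

    // Hypothetical helpers; the connector's real logic lives in ElasticsearchClient/RetryUtil.
    protected abstract void writeToElasticsearch(Collection<SinkRecord> records);
    protected abstract boolean isFatal(Exception e);
}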

tporeba commented Aug 26, 2021

@thomas-tomlinson I am experiencing similar issues with retries hanging. I noticed there is some active development going on in #575 and I hope it will fix the issue.

@thomas-tomlinson

I built the sink connector from the 11.0.x branch (which has the #575 PR merged) and my initial testing shows retry behavior that works during a BulkProcessor request. I'm inducing the failure by stopping the local reverse proxy that the Connect node is configured to use for the search cluster address. Here's the logging on my test node that shows the desired retry behavior:

[2021-08-26 12:32:52,066] WARN Failed to execute bulk request due to java.net.ConnectException: Connection refused. Retrying attempt (1/1001) after backoff of 887 ms (io.confluent.connect.elasticsearch.RetryUtil)
[2021-08-26 12:32:52,957] WARN Failed to execute bulk request due to java.net.ConnectException: Connection refused. Retrying attempt (2/1001) after backoff of 3919 ms (io.confluent.connect.elasticsearch.RetryUtil)
[2021-08-26 12:32:56,879] WARN Failed to execute bulk request due to java.net.ConnectException: Connection refused. Retrying attempt (3/1001) after backoff of 4254 ms (io.confluent.connect.elasticsearch.RetryUtil)
[2021-08-26 12:33:01,137] WARN Failed to execute bulk request due to java.net.ConnectException: Connection refused. Retrying attempt (4/1001) after backoff of 49 ms (io.confluent.connect.elasticsearch.RetryUtil)
[2021-08-26 12:33:01,190] WARN Failed to execute bulk request due to java.net.ConnectException: Connection refused. Retrying attempt (5/1001) after backoff of 7975 ms (io.confluent.connect.elasticsearch.RetryUtil)
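
The irregular wait times above (887 ms, 3919 ms, 49 ms, 7975 ms) are consistent with exponential backoff plus jitter. As a rough sketch of that pattern only, not the connector's actual RetryUtil implementation, and with made-up parameter names:

import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // Pick a random wait between 0 and retryBackoffMs * 2^attempt, capped so the
    // exponent cannot overflow or grow unbounded.
    static long jitteredBackoffMs(int attempt, long retryBackoffMs, long maxBackoffMs) {
        long exponential = retryBackoffMs * (1L << Math.min(attempt, 32));
        long cap = Math.min(exponential, maxBackoffMs);
        return ThreadLocalRandom.current().nextLong(cap + 1);
    }

    public static void main(String[] args) {
        // With a base of 100 ms, later attempts can land anywhere in a wide range,
        // which is why the logged delays above look random.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.printf("attempt %d -> wait %d ms%n",
                    attempt, jitteredBackoffMs(attempt, 100, 10_000));
        }
    }
}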

yeikel commented Dec 10, 2023

Did this ever get resolved? I am also seeing this, along with #739, and I am using the latest version of the connector.
