Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect Cassandra client and reinit session when NoHostAvailableException #60

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.commonjava.indy.service.tracking.data.cassandra;

import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.commonjava.indy.service.tracking.exception.ContentException;
Expand Down Expand Up @@ -130,6 +131,37 @@ public void init()
logger.info( "-- Cassandra Folo Records Keyspace and Tables created" );
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if the session need to be a new one or not, for example in tracking service cluster.connect() may try to reconnect. But you can test locally if you have the instance for test before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or I see, there is client.init(). so let's have a try.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sswguo Yeah, the session refetch will call through this class init method. I've tested locally, it could reconnect immediately after re-startup if relocated db server to a different IP, but still need to validate in the real devel Cluster env, I'll merge this to have a try.

}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
public boolean recordArtifact( TrackedContentEntry entry ) throws ContentException, IndyWorkflowException
{

Expand All @@ -139,7 +171,7 @@ public boolean recordArtifact( TrackedContentEntry entry ) throws ContentExcepti
String effect = entry.getEffect().toString();

BoundStatement bind = getTrackingRecord.bind( buildId, storeKey, path, effect );
ResultSet trackingRecord = session.execute( bind );
ResultSet trackingRecord = executeSession( bind );
Row one = trackingRecord.one();

if ( one != null )
Expand All @@ -161,7 +193,7 @@ public void delete( TrackingKey key )
{
logger.info( "Delete tracking records, tracking_id: {}", key.getId() );
BoundStatement bind = deleteTrackingRecordsByTrackingKey.bind( key.getId() );
session.execute( bind );
executeSession( bind );
}

public void replaceTrackingRecord( TrackedContent record )
Expand All @@ -172,7 +204,7 @@ public void replaceTrackingRecord( TrackedContent record )
public boolean hasRecord( TrackingKey key )
{
BoundStatement bind = isTrackingRecordExist.bind( key );
ResultSet result = session.execute( bind );
ResultSet result = executeSession( bind );
Row row = result.one();
boolean exists = false;
if ( row != null )
Expand Down Expand Up @@ -303,15 +335,15 @@ private TrackedContent transformDtxTrackingRecordToTrackingContent( TrackingKey
private List<DtxTrackingRecord> getLegacyDtxTrackingRecordsFromDb( TrackingKey trackingKey )
{
BoundStatement bind = getLegacyTrackingRecordsByTrackingKey.bind( trackingKey.getId() );
ResultSet execute = session.execute( bind );
ResultSet execute = executeSession( bind );
List<Row> rows = execute.all();
return fetchRecordsFromRows( rows );
}

private List<DtxTrackingRecord> getDtxTrackingRecordsFromDb( TrackingKey trackingKey )
{
BoundStatement bind = getTrackingRecordsByTrackingKey.bind( trackingKey.getId() );
ResultSet execute = session.execute( bind );
ResultSet execute = executeSession( bind );
List<Row> rows = execute.all();
return fetchRecordsFromRows( rows );
}
Expand Down Expand Up @@ -385,7 +417,7 @@ private Set<TrackingKey> getTrackingKeys()

private Set<TrackingKey> getTrackingKeys( BoundStatement statement )
{
ResultSet resultSet = session.execute( statement );
ResultSet resultSet = executeSession( statement );
List<Row> all = resultSet.all();
Iterator<Row> iterator = all.iterator();

Expand Down
Loading