Skip to content

Commit

Permalink
Track session states (aws#729)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiyvamz authored Nov 17, 2023
1 parent 2f8f5ca commit 6ac6957
Show file tree
Hide file tree
Showing 12 changed files with 458 additions and 44 deletions.
13 changes: 13 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.jdbc.dialect.Dialect;
import software.amazon.jdbc.exceptions.ExceptionHandler;
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;

/**
Expand All @@ -47,6 +48,18 @@ EnumSet<NodeChangeOptions> setCurrentConnection(
@Nullable ConnectionPlugin skipNotificationForThisPlugin)
throws SQLException;

EnumSet<SessionDirtyFlag> getCurrentConnectionState();

void setCurrentConnectionState(SessionDirtyFlag flag);

void resetCurrentConnectionState(SessionDirtyFlag flag);

void resetCurrentConnectionStates();

boolean getAutoCommit();

void setAutoCommit(final boolean autoCommit);

List<HostSpec> getHosts();

HostSpec getInitialConnectionHostSpec();
Expand Down
26 changes: 26 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.hostavailability.HostAvailabilityStrategyFactory;
import software.amazon.jdbc.hostlistprovider.StaticHostListProvider;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
Expand All @@ -68,6 +69,8 @@ public class PluginServiceImpl implements PluginService, CanReleaseResources,
private final ExceptionManager exceptionManager;
protected final DialectProvider dialectProvider;
protected Dialect dialect;
protected EnumSet<SessionDirtyFlag> currentConnectionSessionState = EnumSet.noneOf(SessionDirtyFlag.class);
protected boolean isAutoCommit = false;

public PluginServiceImpl(
@NonNull final ConnectionPluginManager pluginManager,
Expand Down Expand Up @@ -568,4 +571,27 @@ public String getTargetName() {
return this.pluginManager.getDefaultConnProvider().getTargetName();
}

public EnumSet<SessionDirtyFlag> getCurrentConnectionState() {
return this.currentConnectionSessionState.clone();
}

public void setCurrentConnectionState(SessionDirtyFlag flag) {
this.currentConnectionSessionState.add(flag);
}

public void resetCurrentConnectionState(SessionDirtyFlag flag) {
this.currentConnectionSessionState.remove(flag);
}

public void resetCurrentConnectionStates() {
this.currentConnectionSessionState.clear();
}

public boolean getAutoCommit() {
return this.isAutoCommit;
}

public void setAutoCommit(final boolean autoCommit) {
this.isAutoCommit = autoCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import software.amazon.jdbc.hostavailability.HostAvailability;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsHelper;
import software.amazon.jdbc.states.RestoreSessionStateCallable;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.states.SessionStateHelper;
import software.amazon.jdbc.states.SessionStateTransferCallable;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.RdsUrlType;
import software.amazon.jdbc.util.RdsUtils;
Expand Down Expand Up @@ -85,6 +89,10 @@ public class FailoverConnectionPlugin extends AbstractConnectionPlugin {
static final String METHOD_ABORT = "Connection.abort";
static final String METHOD_CLOSE = "Connection.close";
static final String METHOD_IS_CLOSED = "Connection.isClosed";

protected static SessionStateTransferCallable sessionStateTransferCallable;
protected static RestoreSessionStateCallable restoreSessionStateCallable;

private final PluginService pluginService;
protected final Properties properties;
protected boolean enableFailoverSetting;
Expand Down Expand Up @@ -199,6 +207,22 @@ public FailoverConnectionPlugin(final PluginService pluginService, final Propert
this.failoverReaderFailedCounter = telemetryFactory.createCounter("readerFailover.completed.failed.count");
}

public static void setSessionStateTransferFunc(SessionStateTransferCallable callable) {
sessionStateTransferCallable = callable;
}

public static void resetSessionStateTransferFunc() {
sessionStateTransferCallable = null;
}

public static void setRestoreSessionStateFunc(RestoreSessionStateCallable callable) {
restoreSessionStateCallable = callable;
}

public static void resetRestoreSessionStateFunc() {
restoreSessionStateCallable = null;
}

@Override
public Set<String> getSubscribedMethods() {
return subscribedMethods;
Expand Down Expand Up @@ -521,9 +545,10 @@ private boolean shouldAttemptReaderConnection() {
*/
private void switchCurrentConnectionTo(final HostSpec host, final Connection connection) throws SQLException {
Connection currentConnection = this.pluginService.getCurrentConnection();
HostSpec currentHostSpec = this.pluginService.getCurrentHostSpec();

if (currentConnection != connection) {
transferSessionState(currentConnection, connection);
transferSessionState(currentConnection, currentHostSpec, connection, host);
invalidateCurrentConnection();
}

Expand All @@ -535,46 +560,73 @@ private void switchCurrentConnectionTo(final HostSpec host, final Connection con
}

/**
* Transfers basic session state from one connection to another.
* Transfers session state from one connection to another.
*
* @param from The connection to transfer state from
* @param to The connection to transfer state to
* @param src The connection to transfer state from
* @param srcHostSpec The connection {@link HostSpec} to transfer state from
* @param dest The connection to transfer state to
* @param destHostSpec The connection {@link HostSpec} to transfer state to
* @throws SQLException if a database access error occurs, this method is called on a closed connection, this
* method is called during a distributed transaction, or this method is called during a
* transaction
*/
protected void transferSessionState(
final Connection from,
final Connection to) throws SQLException {
final Connection src,
final HostSpec srcHostSpec,
final Connection dest,
final HostSpec destHostSpec) throws SQLException {

if (from == null || to == null) {
if (src == null || dest == null) {
return;
}

to.setReadOnly(from.isReadOnly());
to.setAutoCommit(from.getAutoCommit());
to.setTransactionIsolation(from.getTransactionIsolation());
EnumSet<SessionDirtyFlag> sessionState = this.pluginService.getCurrentConnectionState();

SessionStateTransferCallable callableCopy = sessionStateTransferCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.transferSessionState(sessionState, src, srcHostSpec, dest, destHostSpec);
if (isHandled) {
// Custom function has handled session transfer
return;
}
}

// Otherwise, lets run default logic.
sessionState = this.pluginService.getCurrentConnectionState();
final SessionStateHelper helper = new SessionStateHelper();
helper.transferSessionState(sessionState, src, dest);
}

/**
* Restores partial session state from saved values to a connection.
*
* @param to The connection to transfer state to
* @param dest The connection to transfer state to
* @throws SQLException if a database access error occurs, this method is called on a closed connection, this
* method is called during a distributed transaction, or this method is called during a
* transaction
*/
protected void restoreSessionState(final Connection to) throws SQLException {
if (to == null) {
protected void restoreSessionState(final Connection dest) throws SQLException {
if (dest == null) {
return;
}

if (savedReadOnlyStatus != null) {
to.setReadOnly(savedReadOnlyStatus);
}
if (savedAutoCommitStatus != null) {
to.setAutoCommit(savedAutoCommitStatus);
final RestoreSessionStateCallable callableCopy = restoreSessionStateCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.restoreSessionState(
this.pluginService.getCurrentConnectionState(),
dest,
this.savedReadOnlyStatus,
this.savedAutoCommitStatus
);
if (isHandled) {
// Custom function has handled everything.
return;
}
}

// Otherwise, lets run default logic.
final SessionStateHelper helper = new SessionStateHelper();
helper.restoreSessionState(dest, this.savedReadOnlyStatus, this.savedAutoCommitStatus);
}

private <E extends Exception> void dealWithOriginalException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
import software.amazon.jdbc.states.SessionDirtyFlag;
import software.amazon.jdbc.states.SessionStateHelper;
import software.amazon.jdbc.states.SessionStateTransferCallable;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlState;
import software.amazon.jdbc.util.WrapperUtils;
Expand All @@ -61,6 +64,9 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
static final String METHOD_SET_READ_ONLY = "Connection.setReadOnly";
static final String METHOD_CLEAR_WARNINGS = "Connection.clearWarnings";

protected static SessionStateTransferCallable sessionStateTransferCallable;


private final PluginService pluginService;
private final Properties properties;
private final String readerSelectorStrategy;
Expand Down Expand Up @@ -105,6 +111,14 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
this.readerConnection = readerConnection;
}

public static void setSessionStateTransferFunc(SessionStateTransferCallable callable) {
sessionStateTransferCallable = callable;
}

public static void resetSessionStateTransferFunc() {
sessionStateTransferCallable = null;
}

@Override
public Set<String> getSubscribedMethods() {
return subscribedMethods;
Expand Down Expand Up @@ -408,7 +422,7 @@ private void switchCurrentConnectionTo(
return;
}

transferSessionStateOnReadWriteSplit(newConnection);
transferSessionStateOnReadWriteSplit(newConnection, newConnectionHost);
this.pluginService.setCurrentConnection(newConnection, newConnectionHost);
LOGGER.finest(() -> Messages.get(
"ReadWriteSplittingPlugin.settingCurrentConnection",
Expand All @@ -421,19 +435,41 @@ private void switchCurrentConnectionTo(
* status. This method is only called when setReadOnly is being called; the read-only status
* will be updated when the setReadOnly call continues down the plugin chain
*
* @param to The connection to transfer state to
* @param dest The destination connection to transfer state to
* @param destHostSpec The destination connection {@link HostSpec}
* @throws SQLException if a database access error occurs, this method is called on a closed
* connection, or this method is called during a distributed transaction
*/
protected void transferSessionStateOnReadWriteSplit(
final Connection to) throws SQLException {
final Connection from = this.pluginService.getCurrentConnection();
if (from == null || to == null) {
final Connection dest,
final HostSpec destHostSpec)
throws SQLException {

final Connection src = this.pluginService.getCurrentConnection();
if (src == null || dest == null) {
return;
}

to.setAutoCommit(from.getAutoCommit());
to.setTransactionIsolation(from.getTransactionIsolation());
EnumSet<SessionDirtyFlag> sessionState = this.pluginService.getCurrentConnectionState();

SessionStateTransferCallable callableCopy = sessionStateTransferCallable;
if (callableCopy != null) {
final boolean isHandled = callableCopy.transferSessionState(
sessionState,
src,
this.pluginService.getCurrentHostSpec(),
dest,
destHostSpec);
if (isHandled) {
// Custom function has handled session transfer
return;
}
}

sessionState = this.pluginService.getCurrentConnectionState();
sessionState.remove(SessionDirtyFlag.READONLY); // We don't want to change READONLY flag of the connection
final SessionStateHelper helper = new SessionStateHelper();
helper.transferSessionState(sessionState, src, dest);
}

private synchronized void switchToReaderConnection(final List<HostSpec> hosts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.states;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.EnumSet;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public interface RestoreSessionStateCallable {
/**
* Restores partial session state from saved values to a connection.
*
* @param sessionState Session state flags for from-connection
* @param dest The destination connection to transfer state to
* @param readOnly ReadOnly flag to set to
* @param autoCommit AutoCommit flag to set to
* @return true, if session state is restored successful and no default logic should be executed after.
* False, if default logic should be executed.
*/
boolean restoreSessionState(
final @NonNull EnumSet<SessionDirtyFlag> sessionState,
final @NonNull Connection dest,
final @Nullable Boolean readOnly,
final @Nullable Boolean autoCommit)
throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.states;


import java.util.EnumSet;

public enum SessionDirtyFlag {
READONLY,
AUTO_COMMIT,
TRANSACTION_ISOLATION,
CATALOG,
NETWORK_TIMEOUT,
SCHEMA,
TYPE_MAP,
HOLDABILITY;

public static final EnumSet<SessionDirtyFlag> ALL = EnumSet.allOf(SessionDirtyFlag.class);
}
Loading

0 comments on commit 6ac6957

Please sign in to comment.