From 8933f309c9d5e80c56421d74ffba675e780750c9 Mon Sep 17 00:00:00 2001
From: Michael McMahon
+ *
+ * Since the 23.1 release, Oracle JDBC no longer blocks threads during + * asynchronous calls. The remainder of this JavaDoc describes behavior which + * was present in 21.x releases of Oracle JDBC. If a 23.1 or newer version + * of Oracle JDBC is installed, then Oracle R2DBC will use the + * {@link NoOpAsyncLock} rather than {@link AsyncLockImpl}. + *
* Any time Oracle R2DBC invokes a synchronous API of Oracle JDBC, it will * acquire an instance of this lock before doing so. Synchronous method calls * will block a thread if JDBC has a database call in progress, and this can @@ -90,83 +96,28 @@ * methods. *
*/ -final class AsyncLock { - - /** - * Count that is incremented for invocation of {@link #lock(Runnable)}, and is - * decremented by each invocation of {@link #unlock()}. This lock is unlocked - * when the count is 0. - */ - private final AtomicInteger waitCount = new AtomicInteger(0); +public interface AsyncLock { /** - * Dequeue of {@code Runnable} callbacks enqueued each time an invocation of - * {@link #lock(Runnable)} is not able to acquire this lock. The head of this - * dequeue is dequeued and executed by an invocation of {@link #unlock()}. - */ - private final ConcurrentLinkedDeque- * A {@code Subscriber} that uses this {@link AsyncLock} to ensure that - * threads do not become blocked when contending for this adapter's JDBC - * {@code Connection}. Any time a {@code Subscriber} subscribes to a - * {@code Publisher} that uses the JDBC {@code Connection}, an instance of - * {@code UsingConnectionSubscriber} should be created in order to proxy - * signals between that {@code Publisher} and the downstream - * {@code Subscriber}. - *
- * - *- * {@code UsingConnectionSubscriber} solves a problem with how Oracle JDBC - * implements thread safety. When an asynchronous database call is initiated - * with a {@code Connection}, that {@code Connection} is locked until - * the call completes. When a {@code Connection} is locked, any thread that - * invokes a method of that {@code Connection} or any object created by that - * {@code Connection} will become blocked. This can lead to a deadlock where - * all threads in a pool have become blocked until the database call - * completes, and JDBC can not complete the database call until a thread - * becomes unblocked. - *
- * As a simplified example, consider what would happen with the code below if - * the Executor had a pool of 1 thread: - *
- * List> publishers = new ArrayList<>(); - * executor.execute(() -> { - * try { - * publishers.add(connection.prepareStatement("SELECT 0 FROM dual") - * .unwrap(OraclePreparedStatement.class) - * .executeAsyncOracle()); - * - * publishers.add(connection.prepareStatement("SELECT 1 FROM dual") - * .unwrap(OraclePreparedStatement.class) - * .executeAsyncOracle()); - * } - * catch (SQLException sqlException) { - * sqlException.printStackTrace(); - * } - * }); - *
- * After the first call to {@code executeAsyncOracle}, the connection is - * locked, and so when the second call to {@code executeAsyncOracle} is - * made, the executor thread is blocked. If Oracle JDBC is configured to use - * this same executor, which has a pool of just one thread, then no thread - * is left to handle the response from the database for the first call to - * {@code executeAsyncOracle}. With no thread available to handle the - * response, the call is never completed and the connection is never - * unlocked, so the code above results in a deadlock. - *
- * While the code above presents a somewhat obvious scenario, it is more - * common for deadlocks to occur in less obvious ways. Consider this code - * example which uses Project Reactor and R2DBC: - *
- * Flux.usingWhen( - * connectionFactory.create(), - * connection -> - * Flux.usingWhen( - * Mono.from(connection.beginTransaction()) - * .thenReturn(connection), - * connection -> - * connection.createStatement("INSERT INTO deadlock VALUES(?)") - * .bind(0, 0) - * .execute(), - * Connection::commitTransaction), - * Connection::close) - * .hasElements(); - *
- * The hasElements() operator transforms the sequence into a single boolean - * value. When an {@code onNext} signal delivers this value, the subscriber - * emits a {@code cancel} signal to the upstream publisher as the - * subscriber does not require any additional values. This cancel signal - * triggers a subscription to both the commitTransaction() publisher and to - * the close() publisher. The commitTransaction() publisher subscribed to - * first, and this has the Oracle JDBC connection locked until that - * database call completes. The close() publisher is subscribed to immediately - * afterwards, and this has the thread become blocked. As there is no - * thread left to handle the result of the commit, the connection never - * becomes unlocked. - *
- * - *- * Access to the JDBC Connection must be guarded such that no thread will - * attempt to use it when an asynchronous database call is in-flight. The - * potential for an in-flight call exists whenever there is a pending signal - * from the upstream {@code Publisher}. Instances of - * {@code UsingConnectionSubscriber} acquire this {@link AsyncLock} - * before requesting a signal from the publisher, and release the - * {@code asyncLock} once that signal is received. This ensures that no other - * thread will be able to acquire the {@code asyncLock} when a pending signal - * is potentially pending upon an asynchronous database call. - *
- * An {@code onSubscribe} signal is pending between an invocation of - * {@link Publisher#subscribe(Subscriber)} and an invocation of - * {@link Subscriber#onSubscribe(Subscription)}. Accordingly, the - * {@link AsyncLock} MUST be acquired before invoking - * {@code subscribe} with an instance of {@code UsingConnectionSubscriber}. - * When that instance receives an {@code onSubscribe} signal, it will release - * the {@code asyncLock}. - *
- * An {@code onNext} signal is pending between an invocation of - * {@link Subscription#request(long)} and a number of invocations of - * {@link Subscriber#onNext(Object)} equal to the number of - * values requested. Accordingly, instances of - * {@code UsingConnectionSubscriber} acquire the {@link AsyncLock} before - * emitting a {@code request} signal, and release the {@code asyncLock} when - * a corresponding number of {@code onNext} signals have been received. - *
- * When a {@code cancel} signal is emitted to the upstream {@code Publisher}, - * that publisher will not emit any further signals to the downstream - * {@code Subscriber}. If an instance {@code UsingConnectionSubscriber} - * has acquired the {@link AsyncLock} for a pending {@code onNext} signal, - * then it will defer sending a {@code cancel} signal until the pending - * {@code onNext} signal has been received. Deferring cancellation until the - * the publisher invokes {@code onNext} ensures that the cancellation happens - * after any pending database call, and before any subsequent database calls - * that would obtain additional values for {@code onNext}. - *
- */ - private final class UsingConnectionSubscriber- * Acquires the lock before signalling a {@code request} upstream, - * where the request will increase demand from zero. Increasing demand - * from zero may initiate a database call from JDBC, so the lock must be - * acquired first. - *
- * The lock is released after {@code onNext} signals have decreased demand - * back to zero. Or, a terminal {@code onComplete/onError} signal may have - * the lock released before demand reaches zero. - *
- * If demand is increased from a number greater than zero, this - * indicates that the lock has already been acquired for a previous - * request, and that the lock can not be released until demand - * reaches zero. The request is sent upstream without reacquiring the - * lock in this case. - *
- * If demand is a negative number, this indicates that a terminal signal - * has already been received, either from upstream with - * {@code onComplete/onError}, or from downstream with {@code cancel}. In - * either case, the lock is not acquired and the request is not sent - * upstream; If this subscription is terminated, then there will be no - * future signals to unlock the lock. - *
- */ - @Override - public void request(long n) { - lock(() -> { - long currentDemand = demand.getAndUpdate(current -> - current < 0L - ? current // Leave negative values as is - : (Long.MAX_VALUE - current) < n // Check for overflow - ? Long.MAX_VALUE - : current + n); - - if (currentDemand >= 0) - upstream.request(n); - else //if (currentDemand == TERMINATED) - unlock(); - }); - } - - /** - * {@inheritDoc} - *- * Decrements demand and releases the lock if it has reached zero. When - * demand is zero, there should be no active database calls from JDBC. - *
- * If a {@code cancel} signal has been received from downstream, but has - * not yet been sent upstream, then it will be sent from this method and - * the lock will be released. The upstream publisher should detect the - * cancel signal after it has called {@code onNext} on this subscriber, and - * and so it should cancel any future database calls. - *
- */ - @Override - public void onNext(T value) { - - long currentDemand = demand.getAndUpdate(current -> - current == Long.MAX_VALUE - ? current - : current == CANCEL_PENDING - ? TERMINATED - : current - 1L); - - if (currentDemand == CANCEL_PENDING) { - unlock(); - upstream.cancel(); - } - else if (currentDemand > 0L) { - - if (currentDemand == 1) - unlock(); - - downstream.onNext(value); - } - // else: - // Nothing is sent downstream if this subscription has been cancelled. - - } - - /** - * {@inheritDoc} - *- * Defers sending the {@code cancel} upstream if an {@code onNext} signal - * is pending. If an {@code onNext} signal is pending, then there may be - * a database call in progress, and this subscriber must wait for that call - * to complete before releasing the lock. In this case, the demand is set - * to a negative value, and {@link #onNext(Object)} will detect this and - * send the {@code cancel} signal. - *
- * If no {@code onNext} signal is pending, then the {@code cancel} signal - * is sent upstream immediately. - *
- */ - @Override - public void cancel() { - long currentDemand = demand.getAndUpdate(current -> - current > 0 || current == CANCEL_PENDING - ? CANCEL_PENDING - : TERMINATED); - - if (currentDemand == 0) - upstream.cancel(); - - } - - @Override - public void onError(Throwable error) { - terminate(); - downstream.onError(error); - } - - @Override - public void onComplete() { - terminate(); - downstream.onComplete(); - } - - /** - * Terminates upon receiving {@code onComplete} or {@code onError}. - * Termination has this subscriber release the lock if it is currently - * being held. The {@link #demand} is updated so that no future request - * signals will have this subscriber acquire the lock again. - */ - private void terminate() { - long currentDemand = demand.getAndSet(TERMINATED); - - if (currentDemand > 0 || currentDemand == CANCEL_PENDING) - unlock(); - } - } ++ * A {@code Subscriber} that uses this {@link AsyncLockImpl} to ensure that + * threads do not become blocked when contending for this adapter's JDBC + * {@code Connection}. Any time a {@code Subscriber} subscribes to a + * {@code Publisher} that uses the JDBC {@code Connection}, an instance of + * {@code UsingConnectionSubscriber} should be created in order to proxy + * signals between that {@code Publisher} and the downstream + * {@code Subscriber}. + *
+ * + *+ * {@code UsingConnectionSubscriber} solves a problem with how Oracle JDBC + * implements thread safety. When an asynchronous database call is initiated + * with a {@code Connection}, that {@code Connection} is locked until + * the call completes. When a {@code Connection} is locked, any thread that + * invokes a method of that {@code Connection} or any object created by that + * {@code Connection} will become blocked. This can lead to a deadlock where + * all threads in a pool have become blocked until the database call + * completes, and JDBC can not complete the database call until a thread + * becomes unblocked. + *
+ * As a simplified example, consider what would happen with the code below if + * the Executor had a pool of 1 thread: + *
+ * List> publishers = new ArrayList<>(); + * executor.execute(() -> { + * try { + * publishers.add(connection.prepareStatement("SELECT 0 FROM dual") + * .unwrap(OraclePreparedStatement.class) + * .executeAsyncOracle()); + * + * publishers.add(connection.prepareStatement("SELECT 1 FROM dual") + * .unwrap(OraclePreparedStatement.class) + * .executeAsyncOracle()); + * } + * catch (SQLException sqlException) { + * sqlException.printStackTrace(); + * } + * }); + *
+ * After the first call to {@code executeAsyncOracle}, the connection is + * locked, and so when the second call to {@code executeAsyncOracle} is + * made, the executor thread is blocked. If Oracle JDBC is configured to use + * this same executor, which has a pool of just one thread, then no thread + * is left to handle the response from the database for the first call to + * {@code executeAsyncOracle}. With no thread available to handle the + * response, the call is never completed and the connection is never + * unlocked, so the code above results in a deadlock. + *
+ * While the code above presents a somewhat obvious scenario, it is more + * common for deadlocks to occur in less obvious ways. Consider this code + * example which uses Project Reactor and R2DBC: + *
+ * Flux.usingWhen( + * connectionFactory.create(), + * connection -> + * Flux.usingWhen( + * Mono.from(connection.beginTransaction()) + * .thenReturn(connection), + * connection -> + * connection.createStatement("INSERT INTO deadlock VALUES(?)") + * .bind(0, 0) + * .execute(), + * Connection::commitTransaction), + * Connection::close) + * .hasElements(); + *
+ * The hasElements() operator transforms the sequence into a single boolean + * value. When an {@code onNext} signal delivers this value, the subscriber + * emits a {@code cancel} signal to the upstream publisher as the + * subscriber does not require any additional values. This cancel signal + * triggers a subscription to both the commitTransaction() publisher and to + * the close() publisher. The commitTransaction() publisher subscribed to + * first, and this has the Oracle JDBC connection locked until that + * database call completes. The close() publisher is subscribed to immediately + * afterwards, and this has the thread become blocked. As there is no + * thread left to handle the result of the commit, the connection never + * becomes unlocked. + *
+ * + *+ * Access to the JDBC Connection must be guarded such that no thread will + * attempt to use it when an asynchronous database call is in-flight. The + * potential for an in-flight call exists whenever there is a pending signal + * from the upstream {@code Publisher}. Instances of + * {@code UsingConnectionSubscriber} acquire this {@link AsyncLockImpl} + * before requesting a signal from the publisher, and release the + * {@code asyncLock} once that signal is received. This ensures that no other + * thread will be able to acquire the {@code asyncLock} when a pending signal + * is potentially pending upon an asynchronous database call. + *
+ * An {@code onSubscribe} signal is pending between an invocation of + * {@link Publisher#subscribe(Subscriber)} and an invocation of + * {@link Subscriber#onSubscribe(Subscription)}. Accordingly, the + * {@link AsyncLockImpl} MUST be acquired before invoking + * {@code subscribe} with an instance of {@code UsingConnectionSubscriber}. + * When that instance receives an {@code onSubscribe} signal, it will release + * the {@code asyncLock}. + *
+ * An {@code onNext} signal is pending between an invocation of + * {@link Subscription#request(long)} and a number of invocations of + * {@link Subscriber#onNext(Object)} equal to the number of + * values requested. Accordingly, instances of + * {@code UsingConnectionSubscriber} acquire the {@link AsyncLockImpl} before + * emitting a {@code request} signal, and release the {@code asyncLock} when + * a corresponding number of {@code onNext} signals have been received. + *
+ * When a {@code cancel} signal is emitted to the upstream {@code Publisher}, + * that publisher will not emit any further signals to the downstream + * {@code Subscriber}. If an instance {@code UsingConnectionSubscriber} + * has acquired the {@link AsyncLockImpl} for a pending {@code onNext} signal, + * then it will defer sending a {@code cancel} signal until the pending + * {@code onNext} signal has been received. Deferring cancellation until the + * the publisher invokes {@code onNext} ensures that the cancellation happens + * after any pending database call, and before any subsequent database calls + * that would obtain additional values for {@code onNext}. + *
+ */ + private final class UsingConnectionSubscriber+ * Acquires the lock before signalling a {@code request} upstream, + * where the request will increase demand from zero. Increasing demand + * from zero may initiate a database call from JDBC, so the lock must be + * acquired first. + *
+ * The lock is released after {@code onNext} signals have decreased demand + * back to zero. Or, a terminal {@code onComplete/onError} signal may have + * the lock released before demand reaches zero. + *
+ * If demand is increased from a number greater than zero, this + * indicates that the lock has already been acquired for a previous + * request, and that the lock can not be released until demand + * reaches zero. The request is sent upstream without reacquiring the + * lock in this case. + *
+ * If demand is a negative number, this indicates that a terminal signal + * has already been received, either from upstream with + * {@code onComplete/onError}, or from downstream with {@code cancel}. In + * either case, the lock is not acquired and the request is not sent + * upstream; If this subscription is terminated, then there will be no + * future signals to unlock the lock. + *
+ */ + @Override + public void request(long n) { + lock(() -> { + long currentDemand = demand.getAndUpdate(current -> + current < 0L + ? current // Leave negative values as is + : (Long.MAX_VALUE - current) < n // Check for overflow + ? Long.MAX_VALUE + : current + n); + + if (currentDemand >= 0) + upstream.request(n); + else //if (currentDemand == TERMINATED) + unlock(); + }); + } + + /** + * {@inheritDoc} + *+ * Decrements demand and releases the lock if it has reached zero. When + * demand is zero, there should be no active database calls from JDBC. + *
+ * If a {@code cancel} signal has been received from downstream, but has + * not yet been sent upstream, then it will be sent from this method and + * the lock will be released. The upstream publisher should detect the + * cancel signal after it has called {@code onNext} on this subscriber, and + * and so it should cancel any future database calls. + *
+ */ + @Override + public void onNext(T value) { + + long currentDemand = demand.getAndUpdate(current -> + current == Long.MAX_VALUE + ? current + : current == CANCEL_PENDING + ? TERMINATED + : current - 1L); + + if (currentDemand == CANCEL_PENDING) { + unlock(); + upstream.cancel(); + } + else if (currentDemand > 0L) { + + if (currentDemand == 1) + unlock(); + + downstream.onNext(value); + } + // else: + // Nothing is sent downstream if this subscription has been cancelled. + + } + + /** + * {@inheritDoc} + *+ * Defers sending the {@code cancel} upstream if an {@code onNext} signal + * is pending. If an {@code onNext} signal is pending, then there may be + * a database call in progress, and this subscriber must wait for that call + * to complete before releasing the lock. In this case, the demand is set + * to a negative value, and {@link #onNext(Object)} will detect this and + * send the {@code cancel} signal. + *
+ * If no {@code onNext} signal is pending, then the {@code cancel} signal + * is sent upstream immediately. + *
+ */ + @Override + public void cancel() { + long currentDemand = demand.getAndUpdate(current -> + current > 0 || current == CANCEL_PENDING + ? CANCEL_PENDING + : TERMINATED); + + if (currentDemand == 0) + upstream.cancel(); + + } + + @Override + public void onError(Throwable error) { + terminate(); + downstream.onError(error); + } + + @Override + public void onComplete() { + terminate(); + downstream.onComplete(); + } + + /** + * Terminates upon receiving {@code onComplete} or {@code onError}. + * Termination has this subscriber release the lock if it is currently + * being held. The {@link #demand} is updated so that no future request + * signals will have this subscriber acquire the lock again. + */ + private void terminate() { + long currentDemand = demand.getAndSet(TERMINATED); + + if (currentDemand > 0 || currentDemand == CANCEL_PENDING) + unlock(); + } + } + +} diff --git a/src/main/java/oracle/r2dbc/impl/NoOpAsyncLock.java b/src/main/java/oracle/r2dbc/impl/NoOpAsyncLock.java new file mode 100644 index 0000000..bcf969e --- /dev/null +++ b/src/main/java/oracle/r2dbc/impl/NoOpAsyncLock.java @@ -0,0 +1,61 @@ +/* + Copyright (c) 2020, 2021, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package oracle.r2dbc.impl; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * A no-op implementation of {@link AsyncLock} for use with 23.1 and newer + * versions of Oracle JDBC. All methods are implemented by immediately executing + * operations without acquiring a lock. + */ +final class NoOpAsyncLock implements AsyncLock { + + @Override + public void lock(Runnable callback) { + callback.run(); + } + + @Override + public Publisher
* A instance of this class is obtained by invoking {@link #getInstance()}. A
@@ -124,12 +126,21 @@
final class OracleReactiveJdbcAdapter implements ReactiveJdbcAdapter {
/** Guards access to a JDBC {@code Connection} created by this adapter */
- private final AsyncLock asyncLock = new AsyncLock();
+ private final AsyncLock asyncLock;
/**
* Used to construct the instances of this class.
*/
- private OracleReactiveJdbcAdapter() { }
+ private OracleReactiveJdbcAdapter() {
+ int driverVersion = new oracle.jdbc.OracleDriver().getMajorVersion();
+
+ // Since 23.1, Oracle JDBC no longer blocks threads during asynchronous
+ // calls. Use the no-op implementation of AsyncLock if the driver is 23 or
+ // newer.
+ asyncLock = driverVersion < 23
+ ? new AsyncLockImpl()
+ : new NoOpAsyncLock();
+ }
/**
* Returns an instance of this adapter.
@@ -618,7 +629,7 @@ private static void configureJdbcDefaults(OracleDataSource oracleDataSource) {
// Have the Oracle JDBC Driver implement behavior that the JDBC
// Specification defines as correct. The javadoc for this property lists
- // all of it's effects. One effect is to have ResultSetMetaData describe
+ // its effects. One effect is to have ResultSetMetaData describe
// FLOAT columns as the FLOAT type, rather than the NUMBER type. This
// effect allows the Oracle R2DBC Driver obtain correct metadata for
// FLOAT type columns. The property is deprecated, but the deprecation note
@@ -653,6 +664,16 @@ private static void configureJdbcDefaults(OracleDataSource oracleDataSource) {
// TODO: Disable the result set cache? This is needed to support the
// SERIALIZABLE isolation level, which requires result set caching to be
// disabled.
+
+ // Disable "zero copy IO" by default. This is important when using JSON or
+ // VECTOR binds, which are usually sent with zero copy IO. The 23.4 database
+ // does not fully support zero copy IO with pipelined calls. In particular,
+ // it won't respond if a SQL operation results in an error, and zero copy IO
+ // was used to send bind values. This will likely be resolved in a later
+ // release; Keep an eye on bug #36485816 to see when it's fixed.
+ setPropertyIfAbsent(oracleDataSource,
+ OracleConnection.CONNECTION_PROPERTY_THIN_NET_USE_ZERO_COPY_IO,
+ "false");
}
/**
diff --git a/src/main/java/oracle/r2dbc/impl/ReactiveJdbcAdapter.java b/src/main/java/oracle/r2dbc/impl/ReactiveJdbcAdapter.java
index 2ec7b6a..d9a1350 100755
--- a/src/main/java/oracle/r2dbc/impl/ReactiveJdbcAdapter.java
+++ b/src/main/java/oracle/r2dbc/impl/ReactiveJdbcAdapter.java
@@ -535,7 +535,7 @@ Publisher