Skip to content

Commit

Permalink
Add support for automatic remote object disposal.
Browse files Browse the repository at this point in the history
  • Loading branch information
broneill committed Oct 4, 2024
1 parent f0f0f0c commit 430afa5
Show file tree
Hide file tree
Showing 28 changed files with 1,209 additions and 157 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog
v2.4.0
------
* Added pipe methods for efficiently encoding and decoding complex objects.
* Added support for automatic remote object disposal.
* Optimize reading and writing primitive arrays.

v2.3.3 (2024-04-10)
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/org/cojen/dirmi/AutoDispose.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Cojen.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cojen.dirmi;

import java.lang.annotation.*;

/**
* Designates a remote interface as supporting automatic disposal when the client-side object
* is reclaimed by the garbage collector. Automatic disposal doesn't work for the {@link
* Session#root root} object, and it doesn't work when a reference cycle is formed between
* client-side and remote-side objects.
*
* @author Brian S. O'Neill
* @see Disposer
* @see Session#dispose
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface AutoDispose {
}
1 change: 1 addition & 0 deletions src/main/java/org/cojen/dirmi/Disposer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* @author Brian S O'Neill
* @see SessionAware
* @see Session#dispose
* @see AutoDispose
* @see DisposedException
*/
@Documented
Expand Down
166 changes: 166 additions & 0 deletions src/main/java/org/cojen/dirmi/core/AutoDisposer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2024 Cojen.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cojen.dirmi.core;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

/**
*
*
* @author Brian S. O'Neill
*/
public final class AutoDisposer extends ReferenceQueue<Stub> implements Runnable {
private static AutoDisposer cInstance;

private static AutoDisposer access() {
AutoDisposer instance;
synchronized (AutoDisposer.class) {
instance = cInstance;
if (instance == null) {
cInstance = instance = new AutoDisposer();
}
instance.mRegistered++;
}
return instance;
}

private long mRegistered;

private AutoDisposer() {
Thread t = new Thread(this, getClass().getSimpleName());
t.setDaemon(true);
t.start();
}

public void run() {
// Infinite timeout.
long timeout = 0;

while (true) {
Reference<? extends Stub> ref;
try {
ref = remove(timeout);
} catch (InterruptedException e) {
// Clear the interrupted status.
Thread.interrupted();
ref = null;
}

if (ref == null) {
// Timed out waiting, which implies that the timeout isn't infinite.
synchronized (AutoDisposer.class) {
if (mRegistered == 0) {
// Still idle, so exit.
if (cInstance == this) {
cInstance = null;
}
return;
}
}
continue;
}

long removed = 1;

while (true) {
if (ref instanceof BasicRef br) {
br.removed();
}
ref = poll();
if (ref == null) {
break;
}
removed++;
}

synchronized (AutoDisposer.class) {
if ((mRegistered -= removed) == 0) {
// If still idle after one minute, then exit.
timeout = 60_000;
} else {
// Queue still has registered refs, so use an infinite timeout. There's no
// point in waking up this thread unless it has something to do.
timeout = 0;
}
}
}
}

public static sealed class BasicRef extends WeakReference<StubWrapper> {
private final StubInvoker mInvoker;

public BasicRef(StubWrapper wrapper, StubInvoker invoker) {
super(wrapper, access());
mInvoker = invoker;
}

void removed() {
if (mInvoker.support().session() instanceof CoreSession session) {
// Note: Although the pipe isn't flushed immediately, this operation might
// still block. If it does, then no dispose messages will be sent for any
// sessions until the blocked one automatically disconnects. This can be
// prevented by running a task in a separate thread, but that would end up
// creating a new temporary object. Ideally, the task option should only be
// used when the pipe's output buffer is full.
session.stubDisposeAndNotify(mInvoker, null, false);
}
}
}

public static final class CountedRef extends BasicRef {
private static final VarHandle cRefCountHandle;

static {
try {
var lookup = MethodHandles.lookup();
cRefCountHandle = lookup.findVarHandle(CountedRef.class, "mRefCount", long.class);
} catch (Throwable e) {
throw CoreUtils.rethrow(e);
}
}

private long mRefCount;

public CountedRef(StubWrapper wrapper, StubInvoker invoker) {
super(wrapper, invoker);
mRefCount = 1;
}

@Override
void removed() {
decRefCount(1);
}

public void incRefCount() {
cRefCountHandle.getAndAdd(this, 1L);
}

/**
* Note: Calling this method might block if a notification needs to be written.
*/
public void decRefCount(long amount) {
if (((long) cRefCountHandle.getAndAdd(this, -amount)) <= amount) {
super.removed();
}
}
}
}
22 changes: 11 additions & 11 deletions src/main/java/org/cojen/dirmi/core/ClientSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ void init(long serverId, Class<R> rootType, byte[] bname,

mStubFactoriesByClass.putIfAbsent(rootType, factory);

Stub root = factory.newStub(rootId, stubSupport());
StubInvoker root = factory.newStub(rootId, stubSupport());
mStubs.put(root);
mRoot = (R) root;
mRoot = (R) root.init();

Stub.setRootOrigin(root);
StubInvoker.setRootOrigin(root);
}

@Override
Expand Down Expand Up @@ -133,8 +133,8 @@ boolean close(int reason, CorePipe controlPipe) {
}

private boolean isRootDisposed() {
return mRoot instanceof Stub stub
&& Stub.cSupportHandle.getAcquire(stub) instanceof DisposedStubSupport;
return mRoot instanceof StubInvoker stub
&& StubInvoker.cSupportHandle.getAcquire(stub) instanceof DisposedStubSupport;
}

@SuppressWarnings("unchecked")
Expand All @@ -161,7 +161,7 @@ private boolean reconnectAttempt(Object result) {

mEngine.changeIdentity(this, newSession.id);

var newRoot = (Stub) newSession.mRoot;
var newRoot = (StubInvoker) newSession.mRoot;
Object removed = newSession.mStubs.remove(newRoot);
assert newRoot == removed;
assert newSession.mStubs.size() == 0;
Expand All @@ -176,7 +176,7 @@ private boolean reconnectAttempt(Object result) {

cServerSessionIdHandle.setRelease(this, newSession.mServerSessionId);

var root = (Stub) mRoot;
var root = (StubInvoker) mRoot;
mStubs.changeIdentity(root, newRoot.id);

Map<String, RemoteInfo> typeMap;
Expand Down Expand Up @@ -219,7 +219,7 @@ private boolean reconnectAttempt(Object result) {
return false;
}

Stub.cSupportHandle.setRelease(mRoot, newSupport);
StubInvoker.cSupportHandle.setRelease(mRoot, newSupport);

// For all restorable stubs, update the MethodIdWriter and set a support object that
// allows them to restore on demand.
Expand Down Expand Up @@ -249,20 +249,20 @@ private boolean reconnectAttempt(Object result) {
}

if (writer != null) {
Stub.cWriterHandle.setRelease(stub, writer);
StubInvoker.cWriterHandle.setRelease(stub, writer);
} else {
// Although no remote methods changed, the current StubFactory is preferred.
if (type == null) {
type = RemoteExaminer.remoteType(stub);
}
StubFactory factory = mStubFactoriesByClass.get(type);
if (factory != null) {
Stub.cWriterHandle.setRelease(stub, factory);
StubInvoker.cWriterHandle.setRelease(stub, factory);
}
}

if (stub != mRoot) {
Stub.cSupportHandle.setRelease(stub, restorableSupport);
StubInvoker.cSupportHandle.setRelease(stub, restorableSupport);
}
});

Expand Down
Loading

0 comments on commit 430afa5

Please sign in to comment.