Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
brianquinlan committed Feb 1, 2024
1 parent a6833bc commit 824ffcf
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 220 deletions.
252 changes: 43 additions & 209 deletions pkgs/web_socket_conformance_tests/lib/src/close_local_tests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,243 +49,77 @@ void testLocalClose(
// If reasonBytes is longer than 123 bytes, then throw a "SyntaxError" DOMException.

final channel = await channelFactory(uri);
await expectLater(
() => channel.close(1004), throwsA(isA<XXXWebSocketException>()));
await expectLater(() => channel.close(1004), throwsA(isA<RangeError>()));
});

test('too long close reason', () async {
final channel = await channelFactory(uri);
await expectLater(() => channel.close(3000, 'Boom'.padLeft(1000)),
throwsA(isA<XXXWebSocketException>()));
await expectLater(() => channel.close(3000, 'a'.padLeft(124)),
throwsA(isA<ArgumentError>()));
});

test('with code and reason', () async {
test('close', () async {
final channel = await channelFactory(uri);

await channel.close(3000, 'Client initiated closure');
await channel.close();
final closeCode = await httpServerQueue.next as int?;
final closeReason = await httpServerQueue.next as String?;

expect(closeCode, 3000);
expect(closeReason, 'Client initiated closure');
expect(closeCode, 1005);
expect(closeReason, '');
expect(await channel.events.isEmpty, true);
});
/*
test('cancel', () async {
final channel =
channelFactory(uri.replace(queryParameters: {'sleep': '5'}));
var sinkDoneComplete = false;
var sinkDoneOnError = false;
var streamOnData = false;
var streamOnDone = false;
var streamOnError = false;
channel.sink.done.then((_) {
sinkDoneComplete = true;
}, onError: (_) {
sinkDoneOnError = true;
});
final streamSubscription = channel.stream.listen((_) {
streamOnData = true;
}, onError: (_) {
streamOnError = true;
}, onDone: () {
streamOnDone = true;
});
await expectLater(channel.ready, completes);
// VM: Cancels subscription to the socket, which means that this deadlocks.
await streamSubscription.cancel();
expect(() => channel.stream.listen((_) {}), throwsStateError);
channel.sink.add('add after stream closed');
expect(channel.closeCode, null);
expect(channel.closeReason, null);
expect(sinkDoneComplete, false);
expect(sinkDoneOnError, false);
expect(streamOnData, false);
expect(streamOnDone, false);
expect(streamOnError, false);
await channel.sink.done;
expect(await httpServerQueue.next, 'add after stream closed');
expect(await httpServerQueue.next, null);
expect(await httpServerQueue.next, null);
expect(channel.closeCode, 4123);
expect(channel.closeReason, 'server closed the connection');
// cancelling should close according to lassa!
}, skip: _isVM);
test('cancel - client close', () async {
final channel =
channelFactory(uri.replace(queryParameters: {'sleep': '5'}));
var sinkDoneComplete = false;
var sinkDoneOnError = false;
var streamOnData = false;
var streamOnDone = false;
var streamOnError = false;

channel.sink.done.then((_) {
sinkDoneComplete = true;
}, onError: (_) {
sinkDoneOnError = true;
});
final streamSubscription = channel.stream.listen((_) {
streamOnData = true;
}, onError: (_) {
streamOnError = true;
}, onDone: () {
streamOnDone = true;
});
await expectLater(channel.ready, completes);
await streamSubscription.cancel();
expect(() => channel.stream.listen((_) {}), throwsStateError);
channel.sink.add('add after stream closed');
expect(channel.closeCode, null);
expect(channel.closeReason, null);
test('with code 3000', () async {
final channel = await channelFactory(uri);

expect(sinkDoneComplete, false);
expect(sinkDoneOnError, false);
expect(streamOnData, false);
expect(streamOnDone, false);
expect(streamOnError, false);
await channel.close(3000);
final closeCode = await httpServerQueue.next as int?;
final closeReason = await httpServerQueue.next as String?;

await channel.sink.close(4444, 'client closed the connection');
expect(await httpServerQueue.next, 'add after stream closed');
expect(await httpServerQueue.next, 4444);
expect(await httpServerQueue.next, 'client closed the connection');
expect(channel.closeCode, 4444);
expect(channel.closeReason, 'client closed the connection');
expect(closeCode, 3000);
expect(closeReason, '');
expect(await channel.events.isEmpty, true);
});

test('client initiated', () async {
final channel = channelFactory(uri);
var sinkDoneComplete = false;
var sinkDoneOnError = false;
var streamOnData = false;
var streamOnDone = false;
var streamOnError = false;
channel.sink.done.then((_) {
sinkDoneComplete = true;
}, onError: (_) {
sinkDoneOnError = true;
});
channel.stream.listen((_) {
streamOnData = true;
}, onError: (_) {
streamOnError = true;
}, onDone: () {
streamOnDone = true;
});
await expectLater(channel.ready, completes);
await channel.sink.close(4444, 'client closed the connection');
expect(channel.closeCode, null); // VM 4123
expect(channel.closeReason, null); // VM 'server closed the connection'
expect(await httpServerQueue.next, 4444); // VM 4123
expect(await httpServerQueue.next,
'client closed the connection'); // VM 'server closed the connection'
expect(channel.closeCode, 4123);
expect(channel.closeReason, 'server closed the connection');
expect(() => channel.sink.add('add after connection closed'),
throwsStateError);
expect(sinkDoneComplete, true);
expect(sinkDoneOnError, false);
expect(streamOnData, false);
expect(streamOnDone, true);
expect(streamOnError, false);
}, skip: _isVM);
test('client initiated - slow server', () async {
final channel =
channelFactory(uri.replace(queryParameters: {'sleep': '5'}));
var sinkDoneComplete = false;
var sinkDoneOnError = false;
var streamOnData = false;
var streamOnDone = false;
var streamOnError = false;
channel.sink.done.then((_) {
sinkDoneComplete = true;
}, onError: (_) {
sinkDoneOnError = true;
});
channel.stream.listen((_) {
streamOnData = true;
}, onError: (_) {
streamOnError = true;
}, onDone: () {
streamOnDone = true;
});
await expectLater(channel.ready, completes);
await channel.sink.close(4444, 'client closed the connection');
expect(channel.closeCode, null);
expect(channel.closeReason, null);
test('with code 4999', () async {
final channel = await channelFactory(uri);

expect(await httpServerQueue.next, 4444);
expect(await httpServerQueue.next, 'client closed the connection');
expect(channel.closeCode, 4444); // VM: null - sometimes null
expect(channel.closeReason, 'client closed the connection'); // VM: null
expect(() => channel.sink.add('add after connection closed'),
throwsStateError);
await channel.sink.close();
await channel.close(4999);
final closeCode = await httpServerQueue.next as int?;
final closeReason = await httpServerQueue.next as String?;

expect(sinkDoneComplete, true);
expect(sinkDoneOnError, false);
expect(streamOnData, false);
expect(streamOnDone, true);
expect(streamOnError, false);
expect(closeCode, 4999);
expect(closeReason, '');
expect(await channel.events.isEmpty, true);
});

test('server initiated', () async {
final channel = channelFactory(uri);
test('with code and reason', () async {
final channel = await channelFactory(uri);

var sinkDoneComplete = false;
var sinkDoneOnError = false;
var streamOnData = false;
var streamOnDone = false;
var streamOnError = false;
await channel.close(3000, 'Client initiated closure');
final closeCode = await httpServerQueue.next as int?;
final closeReason = await httpServerQueue.next as String?;

channel.sink.done.then((_) {
sinkDoneComplete = true;
}, onError: (_) {
sinkDoneOnError = true;
});
expect(closeCode, 3000);
expect(closeReason, 'Client initiated closure');
expect(await channel.events.isEmpty, true);
});

final streamListen = channel.stream.listen((_) {
streamOnData = true;
}, onError: (_) {
streamOnError = true;
}).asFuture<void>();
test('close after close', () async {
final channel = await channelFactory(uri);

await expectLater(channel.ready, completes);
await streamListen;
await channel.close(3000, 'Client initiated closure');

expect(channel.closeCode, 4123);
expect(channel.closeReason, 'server closed the connection');
channel.sink.add('add after connection closed');
await channel.sink.close();
expectLater(
() async => await channel.close(3001, 'Client initiated closure'),
throwsA(isA<XXXWebSocketConnectionClosed>()));
final closeCode = await httpServerQueue.next as int?;
final closeReason = await httpServerQueue.next as String?;

expect(sinkDoneComplete, true);
expect(sinkDoneOnError, false);
expect(streamOnData, false);
expect(streamOnError, false);
expect(closeCode, 3000);
expect(closeReason, 'Client initiated closure');
expect(await channel.events.isEmpty, true);
});
*/
});
}
27 changes: 19 additions & 8 deletions pkgs/websocket/lib/iowebsocket.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io' as io;
import 'dart:typed_data';
import 'package:websocket/websocket.dart';
Expand Down Expand Up @@ -79,14 +80,24 @@ class IOWebSocket implements XXXWebSocket {
// endpoint that has already sent a Close frame will continue to process
// data.
@override
Future<void> close([int? code, String? reason]) async {
if (!_events.isClosed) {
unawaited(_events.close());
try {
await _webSocket.close(code, reason);
} on io.WebSocketException catch (e) {
throw XXXWebSocketException(e.message);
}
Future<void> close([int? code, String reason = '']) async {
if (_events.isClosed) {
throw XXXWebSocketConnectionClosed();
}

if (code != null) {
RangeError.checkValueInInterval(code, 3000, 4999, 'code');
}
if (utf8.encode(reason).length > 123) {
throw ArgumentError.value(reason, "reason",
"reason must be <= 123 bytes long when encoded as UTF-8");
}

unawaited(_events.close());
try {
await _webSocket.close(code, reason);
} on io.WebSocketException catch (e) {
throw XXXWebSocketException(e.message);
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkgs/websocket/lib/websocket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ abstract interface class XXXWebSocket {
/// to the peer. If they are omitted, the peer will see a 1005 status code
/// with no reason.
///
/// If [code] is not in the range 3000-4999 then an [ArgumentError]
/// If [code] is not in the range 3000-4999 then an [RangeError]
/// will be thrown.
///
/// If [reason] is longer than 123 bytes when encoded as UTF-8 then
Expand All @@ -109,8 +109,8 @@ abstract interface class XXXWebSocket {
///
/// Throws [XXXWebSocketConnectionClosed] if the connection is already closed
/// (including by the peer). Alternatively, we could just throw the close
/// away.
Future<void> close([int? code, String? reason]);
/// away. Same as the reasoning for [sendText].
Future<void> close([int? code, String reason = '']);

/// Events received from the peer.
///
Expand Down

0 comments on commit 824ffcf

Please sign in to comment.