-
Notifications
You must be signed in to change notification settings - Fork 362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a WebSocket implementation to package:cupertino_http #1153
Changes from 9 commits
084456f
3a43cf1
cc7214d
f44d5e3
5873182
98c0ae7
34d3908
6f32092
e1ada6d
5605c0e
1aa6aa8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file | ||
// for details. All rights reserved. Use of this source code is governed by a | ||
// BSD-style license that can be found in the LICENSE file. | ||
|
||
import 'package:cupertino_http/cupertino_http.dart'; | ||
import 'package:test/test.dart'; | ||
import 'package:web_socket_conformance_tests/web_socket_conformance_tests.dart'; | ||
|
||
void main() { | ||
testAll(CupertinoWebSocket.connect); | ||
|
||
group('defaultSessionConfiguration', () { | ||
testAll( | ||
CupertinoWebSocket.connect, | ||
); | ||
}); | ||
group('fromSessionConfiguration', () { | ||
final config = URLSessionConfiguration.ephemeralSessionConfiguration(); | ||
testAll((uri, {protocols}) => | ||
CupertinoWebSocket.connect(uri, protocols: protocols, config: config)); | ||
}); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file | ||
// for details. All rights reserved. Use of this source code is governed by a | ||
// BSD-style license that can be found in the LICENSE file. | ||
|
||
import 'dart:async'; | ||
import 'dart:convert'; | ||
import 'dart:typed_data'; | ||
|
||
import 'package:web_socket/web_socket.dart'; | ||
|
||
import 'cupertino_api.dart'; | ||
|
||
/// An error occurred while connecting to the peer. | ||
class ConnectionException extends WebSocketException { | ||
final Error error; | ||
|
||
ConnectionException(super.message, this.error); | ||
|
||
@override | ||
String toString() => 'CupertinoErrorWebSocketException: $message $error'; | ||
} | ||
|
||
/// A [WebSocket] implemented using the | ||
/// [NSURLSessionWebSocketTask API](https://developer.apple.com/documentation/foundation/nsurlsessionwebsockettask). | ||
class CupertinoWebSocket implements WebSocket { | ||
/// Create a new WebSocket connection using the | ||
/// [NSURLSessionWebSocketTask API](https://developer.apple.com/documentation/foundation/nsurlsessionwebsockettask). | ||
/// | ||
/// The URL supplied in [url] must use the scheme ws or wss. | ||
/// | ||
/// If provided, the [protocols] argument indicates that subprotocols that | ||
/// the peer is able to select. See | ||
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9). | ||
static Future<CupertinoWebSocket> connect(Uri url, | ||
{Iterable<String>? protocols, URLSessionConfiguration? config}) async { | ||
if (!url.isScheme('ws') && !url.isScheme('wss')) { | ||
throw ArgumentError.value( | ||
url, 'url', 'only ws: and wss: schemes are supported'); | ||
} | ||
|
||
final readyCompleter = Completer<CupertinoWebSocket>(); | ||
late CupertinoWebSocket webSocket; | ||
|
||
final session = URLSession.sessionWithConfiguration( | ||
config ?? URLSessionConfiguration.defaultSessionConfiguration(), | ||
// In a successful flow, the callbacks will be made in this order: | ||
// onWebSocketTaskOpened(...) // Good connect. | ||
// <receive/send messages to the peer> | ||
// onWebSocketTaskClosed(...) // Optional: peer sent Close frame. | ||
// onComplete(..., error=null) // Disconnected. | ||
// | ||
// In a failure to connect to the peer, the flow will be: | ||
// onComplete(session, task, error=error): | ||
// | ||
// `onComplete` can also be called at any point if the peer is | ||
// disconnected without Close frames being exchanged. | ||
onWebSocketTaskOpened: (session, task, protocol) { | ||
webSocket = CupertinoWebSocket._(task, protocol ?? ''); | ||
readyCompleter.complete(webSocket); | ||
}, onWebSocketTaskClosed: (session, task, closeCode, reason) { | ||
assert(readyCompleter.isCompleted); | ||
webSocket._connectionClosed(closeCode, reason); | ||
}, onComplete: (session, task, error) { | ||
if (!readyCompleter.isCompleted) { | ||
// `onWebSocketTaskOpened should have been called and completed | ||
// `readyCompleter`. So either there was a error creating the connection | ||
// or a logic error. | ||
if (error == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be an assert. You can pass a string message as the second arg: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left it this way because Let me know if you'd prefer that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, right. That makes sense. |
||
throw AssertionError( | ||
'expected an error or "onWebSocketTaskOpened" to be called ' | ||
'first'); | ||
} | ||
readyCompleter.completeError( | ||
ConnectionException('connection ended unexpectedly', error)); | ||
} else { | ||
// There are three possibilities here: | ||
// 1. the peer sent a close Frame, `onWebSocketTaskClosed` was already | ||
// called and `_connectionClosed` is a no-op. | ||
// 2. we sent a close Frame (through `close()`) and `_connectionClosed` | ||
// is a no-op. | ||
// 3. an error occured (e.g. network failure) and `_connectionClosed` | ||
// will signal that and close `event`. | ||
webSocket._connectionClosed( | ||
1006, Data.fromList('abnormal close'.codeUnits)); | ||
} | ||
}); | ||
|
||
session.webSocketTaskWithURL(url, protocols: protocols).resume(); | ||
return readyCompleter.future; | ||
} | ||
|
||
final URLSessionWebSocketTask _task; | ||
final String _protocol; | ||
final _events = StreamController<WebSocketEvent>(); | ||
|
||
CupertinoWebSocket._(this._task, this._protocol) { | ||
_scheduleReceive(); | ||
} | ||
|
||
/// Handle an incoming message from the peer and schedule receiving the next | ||
/// message. | ||
void _handleMessage(URLSessionWebSocketMessage value) { | ||
late WebSocketEvent event; | ||
switch (value.type) { | ||
case URLSessionWebSocketMessageType.urlSessionWebSocketMessageTypeString: | ||
event = TextDataReceived(value.string!); | ||
break; | ||
case URLSessionWebSocketMessageType.urlSessionWebSocketMessageTypeData: | ||
event = BinaryDataReceived(value.data!.bytes); | ||
break; | ||
} | ||
_events.add(event); | ||
_scheduleReceive(); | ||
} | ||
|
||
void _scheduleReceive() { | ||
unawaited(_task | ||
.receiveMessage() | ||
.then(_handleMessage, onError: _closeConnectionWithError)); | ||
} | ||
|
||
/// Close the WebSocket connection due to an error and send the | ||
/// [CloseReceived] event. | ||
void _closeConnectionWithError(Object e) { | ||
if (e is Error) { | ||
if (e.domain == 'NSPOSIXErrorDomain' && e.code == 57) { | ||
// Socket is not connected. | ||
// onWebSocketTaskClosed/onComplete will be invoked and may indicate a | ||
// close code. | ||
return; | ||
} | ||
var (int code, String? reason) = switch ([e.domain, e.code]) { | ||
['NSPOSIXErrorDomain', 100] => (1002, e.localizedDescription), | ||
_ => (1006, e.localizedDescription) | ||
}; | ||
_task.cancel(); | ||
_connectionClosed( | ||
code, reason == null ? null : Data.fromList(reason.codeUnits)); | ||
} else { | ||
throw StateError('unexpected error: $e'); | ||
} | ||
} | ||
|
||
void _connectionClosed(int? closeCode, Data? reason) { | ||
if (!_events.isClosed) { | ||
final closeReason = reason == null ? '' : utf8.decode(reason.bytes); | ||
|
||
_events | ||
..add(CloseReceived(closeCode, closeReason)) | ||
..close(); | ||
} | ||
} | ||
|
||
@override | ||
void sendBytes(Uint8List b) { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
_task | ||
.sendMessage(URLSessionWebSocketMessage.fromData(Data.fromList(b))) | ||
.then((_) => _, onError: _closeConnectionWithError); | ||
} | ||
|
||
@override | ||
void sendText(String s) { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
_task | ||
.sendMessage(URLSessionWebSocketMessage.fromString(s)) | ||
.then((_) => _, onError: _closeConnectionWithError); | ||
} | ||
|
||
@override | ||
Future<void> close([int? code, String? reason]) async { | ||
if (_events.isClosed) { | ||
throw StateError('WebSocket is closed'); | ||
} | ||
|
||
if (code != null) { | ||
RangeError.checkValueInInterval(code, 3000, 4999, 'code'); | ||
} | ||
if (reason != null && utf8.encode(reason).length > 123) { | ||
throw ArgumentError.value(reason, 'reason', | ||
'reason must be <= 123 bytes long when encoded as UTF-8'); | ||
} | ||
|
||
if (!_events.isClosed) { | ||
unawaited(_events.close()); | ||
if (code != null) { | ||
reason = reason ?? ''; | ||
_task.cancelWithCloseCode(code, Data.fromList(reason.codeUnits)); | ||
} else { | ||
_task.cancel(); | ||
} | ||
} | ||
} | ||
|
||
@override | ||
Stream<WebSocketEvent> get events => _events.stream; | ||
|
||
@override | ||
String get protocol => _protocol; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This flow is kinda confusing. Can you add some comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments and reordered the callbacks into their expected order.