diff --git a/.pubnub.yml b/.pubnub.yml index 9e826f2c..165a74d4 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,24 @@ --- changelog: + - + changes: + - + text: "Refactors networking module to allow additional flexibility." + type: feature + - + text: "Adds supervisor module that allows reconnection, retry and other additional, cross module functionalities." + type: feature + - + text: "Adds meta parameter to publish call and makes publish using GET instead of POST." + type: feature + - + text: "Exposes `batch`, `objects` and other APIs from the PubNub class." + type: bug + - + text: "Fixes a typo in BatchHistory where timetoken was returned null." + type: bug + date: Aug 31, 20 + version: v2.0.0 - changes: - @@ -284,4 +303,4 @@ supported-platforms: platforms: - "Dart SDK >=2.6.0 <3.0.0" version: "PubNub Dart SDK" -version: "1.4.4" +version: "2.0.0" diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c4f2e1a..31c12e9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## [v2.0.0](https://github.com/pubnub/dart/releases/tag/v2.0.0) +August 31 2020 + +[Full Changelog](https://github.com/pubnub/dart/compare/v1.4.4...v2.0.0) + +- 🌟️ Refactors networking module to allow additional flexibility. +- 🌟️ Adds supervisor module that allows reconnection, retry and other additional, cross module functionalities. +- 🌟️ Adds meta parameter to publish call and makes publish using GET instead of POST. +- πŸ› Exposes `batch`, `objects` and other APIs from the PubNub class. Fixed the following issues reported by [@devopsokdone](https://github.com/devopsokdone): [#11](https://github.com/pubnub/dart/issues/11). +- πŸ› Fixes a typo in BatchHistory where timetoken was returned null. Fixed the following issues reported by [@devopsokdone](https://github.com/devopsokdone): [#13](https://github.com/pubnub/dart/issues/13). + ## [v1.4.4](https://github.com/pubnub/dart/releases/tag/v1.4.4) August 19 2020 diff --git a/analysis_options.yaml b/analysis_options.yaml index e6de7709..39cc08c2 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -2,4 +2,4 @@ include: package:pedantic/analysis_options.yaml analyzer: errors: - todo: ignore \ No newline at end of file + # todo: ignore \ No newline at end of file diff --git a/example/example.dart b/example/example.dart index 31b1e7cb..2dc55712 100644 --- a/example/example.dart +++ b/example/example.dart @@ -17,7 +17,7 @@ void main() async { await pubnub.publish('test', {'message': 'My message!'}); // Unsubscribe - await subscription.unsubscribe(); + await subscription.dispose(); // Channel abstraction for easier usage var channel = pubnub.channel('test'); diff --git a/example/logging.dart b/example/logging.dart index a2f7676a..1c232eef 100644 --- a/example/logging.dart +++ b/example/logging.dart @@ -3,10 +3,10 @@ import 'package:pubnub/pubnub.dart'; // This is the same example as the `example.dart` file, but shows how to enable extra logging void main() async { // Create a root logger - var logger = StreamLogger.root('root', logLevel: Level.warning); + var logger = StreamLogger.root('root', logLevel: Level.all); // Subscribe to messages with a default printer - logger.stream.listen( + var sub = logger.stream.listen( LogRecord.createPrinter(r'[$time] (${level.name}) $scope: $message')); var pubnub = PubNub( @@ -17,13 +17,11 @@ void main() async { var _ = await provideLogger(logger, () async { var subscription = await pubnub.subscribe(channels: {'test'}); - subscription.messages.take(1).listen((message) { - print(message); - }); + await pubnub.publish('test', {'message': 'My message'}); - await pubnub.publish('test', {'message': 'My message!'}); + print(await subscription.messages.first); - await subscription.unsubscribe(); + await subscription.dispose(); }); // You can change the log level as well! @@ -42,4 +40,8 @@ void main() async { print('Messages on test channel: $count'); }); + + await sub.cancel(); + await logger.dispose(); + print('disposed!'); } diff --git a/example/supervisor.dart b/example/supervisor.dart new file mode 100644 index 00000000..52d404db --- /dev/null +++ b/example/supervisor.dart @@ -0,0 +1,50 @@ +import 'dart:io'; + +import 'package:pubnub/pubnub.dart'; +import 'package:pubnub/networking.dart'; + +void main() async { + var logger = StreamLogger.root('root', logLevel: Level.warning); + + // Subscribe to messages with a default printer + logger.stream.listen( + LogRecord.createPrinter(r'[$time] (${level.name}) $scope $message')); + + await provideLogger(logger, () async { + // Create PubNub instance with default keyset. + var pubnub = PubNub( + networking: NetworkingModule( + retryPolicy: RetryPolicy.exponential(maxRetries: 10)), + defaultKeyset: + Keyset(subscribeKey: 'demo', publishKey: 'demo', uuid: UUID('demo')), + ); + + print( + 'Network reconnection test. Please wait few seconds for further instructions...'); + + var sub = await pubnub.subscribe(channels: {'test2'}); + + await Future.delayed(Duration(seconds: 5)); + + print('Subscribed. Disconnect your network for few seconds.'); + + await Future.delayed(Duration(seconds: 5)); + + var f = pubnub.publish('test2', {'myMessage': 'it works!'}); + + print( + 'Now reconnect your network again! If everything goes well, you should see the message. You will see few diagnostic log lines in the meantime.'); + + await f; + + var message = await sub.messages.first; + + print(message.payload); + + await sub.dispose(); + + print('Done!'); + + exit(0); + }); +} diff --git a/lib/networking.dart b/lib/networking.dart new file mode 100644 index 00000000..99677154 --- /dev/null +++ b/lib/networking.dart @@ -0,0 +1,2 @@ +export 'src/net/net.dart'; +export 'src/net/meta/meta.dart'; diff --git a/lib/pubnub.dart b/lib/pubnub.dart index 5ac8ac6f..e5508102 100644 --- a/lib/pubnub.dart +++ b/lib/pubnub.dart @@ -1,18 +1,25 @@ /// PubNub is an SDK that allows you to communicate with /// PubNub Data Streaming Network in a fast and easy manner. +/// +/// The best starting point to take a look around is the [PubNub] class that combines all available features. library pubnub; export './src/core/core.dart'; export './src/dx/_utils/ensure.dart' show InvariantException; -export './src/crypto/crypto.dart' show PubNubCryptoModule, CryptoConfiguration; +export './src/crypto/crypto.dart' show CryptoModule, CryptoConfiguration; + export './src/crypto/encryption_mode.dart' show EncryptionMode, EncryptionModeExtension; +export './src/dx/files/files.dart' show FileDx; +export './src/dx/batch/batch.dart' show BatchDx; +export './src/dx/objects/objects.dart' show ObjectsDx; +export './src/dx/channel/channel_group.dart' show ChannelGroup, ChannelGroupDx; + export './src/dx/channel/channel.dart' show Channel; -export './src/dx/channel/channel_group.dart' show ChannelGroup; export './src/dx/push/push.dart' show Device; export './src/dx/_endpoints/publish.dart' show PublishResult; -export './src/dx/_endpoints/file.dart' +export 'src/dx/_endpoints/files.dart' show PublishFileMessageResult, ListFilesResult, @@ -51,11 +58,13 @@ export './src/dx/_endpoints/message_action.dart' DeleteMessageActionResult; export './src/dx/subscribe/subscription.dart' show Subscription; +export './src/dx/subscribe/extensions/keyset.dart' + show SubscribeKeysetExtension, PresenceKeysetExtension; export './src/dx/subscribe/envelope.dart' show Envelope, PresenceAction, PresenceEvent, MessageType; export './src/dx/channel/channel_history.dart' show PaginatedChannelHistory, ChannelHistory; -export './src/dx/file/file.dart'; +export 'src/dx/files/files.dart' show FileInfo, FileMessage; export './src/dx/channel/message.dart' show Message; export './src/dx/_endpoints/objects/objects_types.dart'; diff --git a/lib/src/core/core.dart b/lib/src/core/core.dart index 250a87b1..ab7806bc 100644 --- a/lib/src/core/core.dart +++ b/lib/src/core/core.dart @@ -1,14 +1,16 @@ import 'package:meta/meta.dart'; import 'net/net.dart'; -import 'parse.dart'; +import 'parser.dart'; import 'crypto/crypto.dart'; +import 'supervisor/supervisor.dart'; import 'keyset.dart'; export 'net/net.dart'; -export 'parse.dart'; +export 'parser.dart'; export 'logging/logging.dart'; export 'crypto/crypto.dart'; +export 'supervisor/supervisor.dart'; export 'keyset.dart'; export 'endpoint.dart'; export 'uuid.dart'; @@ -19,19 +21,24 @@ class Core { /// Allows to have multiple [Keyset] associated with one instance of [PubNub]. KeysetStore keysets = KeysetStore(); - NetworkModule networking; - ParserModule parser; - CryptoModule crypto; + INetworkingModule networking; + IParserModule parser; + ICryptoModule crypto; + SupervisorModule supervisor = SupervisorModule(); - static String version = '1.4.4'; + static String version = '2.0.0'; Core( {Keyset defaultKeyset, @required this.networking, @required this.parser, - this.crypto}) { + @required this.crypto}) { if (defaultKeyset != null) { keysets.add(defaultKeyset, name: 'default', useAsDefault: true); } + + networking.register(this); + parser.register(this); + crypto.register(this); } } diff --git a/lib/src/core/crypto/crypto.dart b/lib/src/core/crypto/crypto.dart index c3285448..63dcaa55 100644 --- a/lib/src/core/crypto/crypto.dart +++ b/lib/src/core/crypto/crypto.dart @@ -1,3 +1,4 @@ +import '../core.dart'; import '../exceptions.dart'; import 'cipher_key.dart'; @@ -7,7 +8,9 @@ class CryptoException extends PubNubException { CryptoException([String message]) : super(message); } -abstract class CryptoModule { +abstract class ICryptoModule { + void register(Core core); + String encrypt(CipherKey key, String input); dynamic decrypt(CipherKey key, String input); diff --git a/lib/src/core/exceptions.dart b/lib/src/core/exceptions.dart index a5d63cb7..09b287d5 100644 --- a/lib/src/core/exceptions.dart +++ b/lib/src/core/exceptions.dart @@ -49,3 +49,9 @@ class NotImplementedException extends PubNubException { class PublishException extends PubNubException { PublishException(String message) : super(message); } + +class MaximumRetriesException extends PubNubException { + static final String _message = 'Maximum number of retries has been reached.'; + + MaximumRetriesException() : super(_message); +} diff --git a/lib/src/core/keyset.dart b/lib/src/core/keyset.dart index 0ffd2fe2..8817b7c4 100644 --- a/lib/src/core/keyset.dart +++ b/lib/src/core/keyset.dart @@ -93,6 +93,12 @@ class KeysetStore { } } } + + Keyset obtain(Keyset keyset, String using) { + keyset ??= get(using, defaultIfNameIsNull: true); + + return keyset; + } } /// Represents a configuration for a given subscribe key. diff --git a/lib/src/core/logging/logging.dart b/lib/src/core/logging/logging.dart index 67dba1dd..07688922 100644 --- a/lib/src/core/logging/logging.dart +++ b/lib/src/core/logging/logging.dart @@ -5,9 +5,9 @@ import 'dummy_logger.dart'; final _pubnubLoggerModuleKey = #pubnub.logging; /// Provides a [logger] to the code inside [body]. -Future provideLogger(ILogger logger, R Function() body) async { - var result = - await runZoned(body, zoneValues: {_pubnubLoggerModuleKey: logger}); +Future provideLogger(ILogger logger, Future Function() body) async { + var result = await runZoned>(body, + zoneValues: {_pubnubLoggerModuleKey: logger}); return result; } diff --git a/lib/src/net/exceptions.dart b/lib/src/core/net/exceptions.dart similarity index 62% rename from lib/src/net/exceptions.dart rename to lib/src/core/net/exceptions.dart index b2b40dc3..8ba1d4ac 100644 --- a/lib/src/net/exceptions.dart +++ b/lib/src/core/net/exceptions.dart @@ -1,21 +1,28 @@ -import 'package:pubnub/src/core/exceptions.dart'; +import 'package:pubnub/pubnub.dart'; class PubNubRequestTimeoutException extends PubNubException { - PubNubRequestTimeoutException() : super('request timed out'); + dynamic additionalData; + + PubNubRequestTimeoutException([this.additionalData]) + : super('request timed out'); } class PubNubRequestCancelException extends PubNubException { dynamic additionalData; + PubNubRequestCancelException([this.additionalData]) : super('request cancelled'); } class PubNubRequestOtherException extends PubNubException { dynamic additionalData; + PubNubRequestOtherException([this.additionalData]) : super('request failed'); } class PubNubRequestFailureException extends PubNubException { - dynamic responseData; - PubNubRequestFailureException(this.responseData) : super('request failed'); + IResponse response; + + PubNubRequestFailureException(this.response) + : super('request returned non-success status code'); } diff --git a/lib/src/core/net/net.dart b/lib/src/core/net/net.dart index 1af516ea..5667e707 100644 --- a/lib/src/core/net/net.dart +++ b/lib/src/core/net/net.dart @@ -1,10 +1,19 @@ -import 'request.dart'; +import '../core.dart'; +import 'request_handler.dart'; -export 'request.dart' show Request, RequestHandler; +export 'request.dart' show Request; export 'request_type.dart' show RequestType, RequestTypeExtension; +export 'request_handler.dart' show IRequestHandler; +export 'response.dart' show IResponse; +export 'exceptions.dart' + show + PubNubRequestCancelException, + PubNubRequestFailureException, + PubNubRequestOtherException, + PubNubRequestTimeoutException; -abstract class NetworkModule { - Future handle(Request request); +abstract class INetworkingModule { + void register(Core core); - Future handleCustomRequest(Request request); + Future handler(); } diff --git a/lib/src/core/net/request.dart b/lib/src/core/net/request.dart index c899f30a..acd6fd02 100644 --- a/lib/src/core/net/request.dart +++ b/lib/src/core/net/request.dart @@ -1,60 +1,22 @@ -import '../core.dart'; import 'request_type.dart'; -typedef SignFunction = String Function( - RequestType type, - List pathSegments, - Map queryParameters, - Map headers, - String body); - class Request { - static Map defaultQueryParameters = { - 'pnsdk': 'PubNub-Dart/${Core.version}' - }; - static final Map defaultHeaders = { - 'Content-Type': 'application/json' - }; - RequestType type; - List pathSegments; - - Map queryParameters; + Uri uri; Map headers; dynamic body; - Uri url; - - Request(this.type, this.pathSegments, - {Map queryParameters, - Map headers, - dynamic body, - SignFunction signWith, - this.url}) { - pathSegments = pathSegments; - this.queryParameters = { - ...(queryParameters ?? {}), - ...defaultQueryParameters - }; - this.headers = {...(headers ?? {}), ...defaultHeaders}; - this.body = body; - if (signWith != null) { - this.queryParameters['signature'] = signWith( - type, pathSegments, this.queryParameters, this.headers, this.body); - } - } + Request.get({this.uri, this.headers, this.body}) : type = RequestType.get; + Request.post({this.uri, this.headers, this.body}) : type = RequestType.post; + Request.patch({this.uri, this.headers, this.body}) : type = RequestType.patch; + Request.delete({this.uri, this.headers, this.body}) + : type = RequestType.delete; + Request.subscribe({this.uri, this.headers, this.body}) + : type = RequestType.subscribe; + Request.file({this.uri, this.headers, this.body}) : type = RequestType.file; @override String toString() { - return '${type.method} - $pathSegments?$queryParameters'; + return 'Request { [${type}] ${uri} }'; } } - -abstract class RequestHandler { - Future text(); - Future>> headers(); - Future response(); - - bool isCancelled; - void cancel([dynamic reason]); -} diff --git a/lib/src/core/net/request_handler.dart b/lib/src/core/net/request_handler.dart new file mode 100644 index 00000000..2b8d3f7c --- /dev/null +++ b/lib/src/core/net/request_handler.dart @@ -0,0 +1,32 @@ +import '../core.dart'; +import 'request.dart'; +import 'response.dart'; + +abstract class IRequestHandler { + static final Uri defaultUri = Uri( + scheme: 'https', + host: 'ps.pndsn.com', + queryParameters: {'pnsdk': 'PubNub-Dart/${Core.version}'}); + + static final Map defaultHeaders = { + 'Content-Type': 'application/json' + }; + + Future response(Request request); + + Uri prepareUri(Uri requestUri) { + return defaultUri.replace( + scheme: requestUri.hasScheme ? requestUri.scheme : defaultUri.scheme, + host: requestUri.host != '' ? requestUri.host : null, + path: requestUri.path != '' ? requestUri.path : null, + port: requestUri.hasPort ? requestUri.port : defaultUri.port, + queryParameters: { + ...defaultUri.queryParameters, + ...requestUri.queryParameters + }, + userInfo: requestUri.userInfo); + } + + bool isCancelled; + void cancel([dynamic reason]); +} diff --git a/lib/src/core/net/request_type.dart b/lib/src/core/net/request_type.dart index 9a32d871..b49363c8 100644 --- a/lib/src/core/net/request_type.dart +++ b/lib/src/core/net/request_type.dart @@ -1,4 +1,31 @@ -enum RequestType { get, post, patch, subscribe, delete } +enum RequestType { get, post, patch, subscribe, delete, file } + +const _sendTimeoutRequestDefault = { + RequestType.get: 10000, + RequestType.post: 10000, + RequestType.delete: 10000, + RequestType.patch: 10000, + RequestType.subscribe: 300000, + RequestType.file: 30000, +}; + +const _receiveTimeoutRequestDefault = { + RequestType.get: 10000, + RequestType.post: 10000, + RequestType.delete: 10000, + RequestType.patch: 10000, + RequestType.subscribe: 300000, + RequestType.file: 30000, +}; + +const _connectTimeoutRequestDefault = { + RequestType.get: 10000, + RequestType.post: 10000, + RequestType.delete: 10000, + RequestType.patch: 10000, + RequestType.subscribe: 300000, + RequestType.file: 30000, +}; extension RequestTypeExtension on RequestType { static const methods = { @@ -6,8 +33,13 @@ extension RequestTypeExtension on RequestType { RequestType.post: 'POST', RequestType.patch: 'PATCH', RequestType.subscribe: 'GET', - RequestType.delete: 'DELETE' + RequestType.delete: 'DELETE', + RequestType.file: 'POST', }; String get method => methods[this]; + + int get sendTimeout => _sendTimeoutRequestDefault[this]; + int get receiveTimeout => _receiveTimeoutRequestDefault[this]; + int get connectTimeout => _connectTimeoutRequestDefault[this]; } diff --git a/lib/src/core/net/response.dart b/lib/src/core/net/response.dart new file mode 100644 index 00000000..8746ff59 --- /dev/null +++ b/lib/src/core/net/response.dart @@ -0,0 +1,7 @@ +abstract class IResponse { + int get statusCode; + Map> get headers; + + String get text; + List get byteList; +} diff --git a/lib/src/core/parse.dart b/lib/src/core/parse.dart deleted file mode 100644 index 5bb262b1..00000000 --- a/lib/src/core/parse.dart +++ /dev/null @@ -1,10 +0,0 @@ -import 'exceptions.dart'; - -class ParserException extends PubNubException { - ParserException([String message]) : super(message); -} - -abstract class ParserModule { - Future decode(String input); - Future encode(dynamic input); -} diff --git a/lib/src/core/parser.dart b/lib/src/core/parser.dart new file mode 100644 index 00000000..6ec75f55 --- /dev/null +++ b/lib/src/core/parser.dart @@ -0,0 +1,15 @@ +import 'core.dart'; +import 'exceptions.dart'; + +class ParserException extends PubNubException { + dynamic originalException; + + ParserException([String message, this.originalException]) : super(message); +} + +abstract class IParserModule { + void register(Core core); + + Future decode(String input, {String type}); + Future encode(T input, {String type}); +} diff --git a/lib/src/core/supervisor/event.dart b/lib/src/core/supervisor/event.dart new file mode 100644 index 00000000..c979bbb8 --- /dev/null +++ b/lib/src/core/supervisor/event.dart @@ -0,0 +1,5 @@ +abstract class SupervisorEvent {} + +class NetworkIsDownEvent extends SupervisorEvent {} + +class NetworkIsUpEvent extends SupervisorEvent {} diff --git a/lib/src/core/supervisor/fiber.dart b/lib/src/core/supervisor/fiber.dart new file mode 100644 index 00000000..42307252 --- /dev/null +++ b/lib/src/core/supervisor/fiber.dart @@ -0,0 +1,86 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; +import 'package:pedantic/pedantic.dart'; + +import '../core.dart'; + +final _logger = injectLogger('pubnub.core.fiber'); + +abstract class Resolution { + const Resolution(); + + factory Resolution.fail() => FailResolution(); + factory Resolution.delay(Duration delay) => DelayResolution(delay); + factory Resolution.retry() => RetryResolution(); + factory Resolution.networkStatus(bool isUp) => NetworkStatusResolution(isUp); +} + +class FailResolution extends Resolution {} + +class DelayResolution extends Resolution { + final Duration delay; + + const DelayResolution(this.delay); +} + +class RetryResolution extends Resolution {} + +class NetworkStatusResolution extends Resolution { + final bool isUp; + + const NetworkStatusResolution(this.isUp); +} + +typedef FiberAction = Future Function(); + +class Fiber { + static int _id = 0; + + final int id; + final Core _core; + final FiberAction action; + + final _completer = Completer(); + Future get future => _completer.future; + + int tries = 0; + + Fiber(this._core, {@required this.action}) : id = _id++; + + Future run() async { + tries += 1; + + try { + var result = await action(); + + _completer.complete(result); + + _core.supervisor.notify(NetworkIsUpEvent()); + } catch (exception, stackTrace) { + _logger.warning('An exception has occured while running a fiber.'); + var diagnostic = _core.supervisor.runDiagnostics(this, exception); + + if (diagnostic == null) { + return _completer.completeError(exception, stackTrace); + } + + _logger.silly('Possible reason found: $diagnostic'); + + var resolutions = _core.supervisor.runStrategies(this, diagnostic); + + for (var resolution in resolutions) { + if (resolution is FailResolution) { + _completer.completeError(exception); + } else if (resolution is DelayResolution) { + await Future.delayed(resolution.delay); + } else if (resolution is RetryResolution) { + unawaited(Future.microtask(run)); + } else if (resolution is NetworkStatusResolution) { + _core.supervisor.notify( + resolution.isUp ? NetworkIsUpEvent() : NetworkIsDownEvent()); + } + } + } + } +} diff --git a/lib/src/core/supervisor/supervisor.dart b/lib/src/core/supervisor/supervisor.dart new file mode 100644 index 00000000..536c9a0a --- /dev/null +++ b/lib/src/core/supervisor/supervisor.dart @@ -0,0 +1,64 @@ +import 'dart:async'; + +import '../core.dart'; +import 'event.dart'; +import 'fiber.dart'; + +export 'event.dart'; +export 'fiber.dart'; + +final _logger = injectLogger('pubnub.core.supervisor'); + +abstract class Diagnostic { + const Diagnostic(); +} + +abstract class Strategy { + List resolve(Fiber fiber, Diagnostic diagnostic); +} + +typedef DiagnosticHandler = Diagnostic Function(dynamic exception); + +class SupervisorModule { + final Set _handlers = {}; + final Set _strategies = {}; + + final StreamController _events = + StreamController.broadcast(); + + bool _isNetworkUp = true; + + Stream get events => _events.stream; + + void registerDiagnostic(DiagnosticHandler handler) { + _handlers.add(handler); + } + + void registerStrategy(Strategy strategy) { + _strategies.add(strategy); + } + + void notify(SupervisorEvent event) { + if (_isNetworkUp && event is NetworkIsDownEvent) { + _isNetworkUp = false; + _logger.warning('Detected that network is down.'); + _events.add(event); + } else if (_isNetworkUp == false && event is NetworkIsUpEvent) { + _isNetworkUp = true; + _logger.warning('Detected that network is up.'); + _events.add(event); + } + } + + Diagnostic runDiagnostics(Fiber fiber, Exception exception) { + return _handlers + .map((handler) => handler(exception)) + .firstWhere((diagnostic) => diagnostic != null, orElse: () => null); + } + + List runStrategies(Fiber fiber, Diagnostic diagnostic) { + return _strategies + .map((strategy) => strategy.resolve(fiber, diagnostic)) + .firstWhere((resolutions) => resolutions != null, orElse: () => []); + } +} diff --git a/lib/src/crypto/crypto.dart b/lib/src/crypto/crypto.dart index f5291f00..2372880b 100644 --- a/lib/src/crypto/crypto.dart +++ b/lib/src/crypto/crypto.dart @@ -1,6 +1,6 @@ import 'package:encrypt/encrypt.dart' as crypto; import 'package:crypto/crypto.dart' show sha256; -import 'dart:convert' show json, base64, utf8; +import 'dart:convert' show base64, utf8; import 'dart:typed_data' show Uint8List; import '../core/core.dart'; @@ -17,10 +17,10 @@ class CryptoConfiguration { this.useRandomInitializationVector = false}); } -class PubNubCryptoModule implements CryptoModule { +class CryptoModule implements ICryptoModule { final CryptoConfiguration defaultConfiguration; - PubNubCryptoModule({this.defaultConfiguration = const CryptoConfiguration()}); + CryptoModule({this.defaultConfiguration = const CryptoConfiguration()}); crypto.Key _getKey(CipherKey cipherKey, CryptoConfiguration configuration) { if (configuration.encryptKey) { @@ -105,4 +105,7 @@ class PubNubCryptoModule implements CryptoModule { throw CryptoException('Error while encrypting file data \n${e.message}'); } } + + @override + void register(Core core) {} } diff --git a/lib/src/default.dart b/lib/src/default.dart index f389a173..3207be89 100644 --- a/lib/src/default.dart +++ b/lib/src/default.dart @@ -15,8 +15,7 @@ import 'dx/message_action/message_action.dart'; import 'dx/pam/pam.dart'; import 'dx/push/push.dart'; import 'dx/presence/presence.dart'; -import 'dx/file/file.dart'; -import 'dx/file/fileManager.dart'; +import 'dx/files/files.dart'; import 'dx/objects/objects_types.dart'; import 'dx/objects/objects.dart'; @@ -66,18 +65,18 @@ class PubNub extends Core PubNub( {Keyset defaultKeyset, - NetworkModule networking, - ParserModule parser, - CryptoModule crypto}) + INetworkingModule networking, + IParserModule parser, + ICryptoModule crypto}) : super( defaultKeyset: defaultKeyset, - networking: networking ?? PubNubNetworkingModule(), - parser: parser ?? PubNubParserModule(), - crypto: crypto ?? PubNubCryptoModule()) { + networking: networking ?? NetworkingModule(), + parser: parser ?? ParserModule(), + crypto: crypto ?? CryptoModule()) { batch = BatchDx(this); channelGroups = ChannelGroupDx(this); objects = ObjectsDx(this); - files = FileDx(this, PubNubFileManager()); + files = FileDx(this); } /// Returns a representation of a channel. diff --git a/lib/src/dx/_endpoints/channel_group.dart b/lib/src/dx/_endpoints/channel_group.dart index 4d3550cf..8807d116 100644 --- a/lib/src/dx/_endpoints/channel_group.dart +++ b/lib/src/dx/_endpoints/channel_group.dart @@ -20,8 +20,8 @@ class ChannelGroupListChannelsParams extends Parameters { var queryParameters = {'auth': keyset.authKey, 'uuid': keyset.uuid.value}; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -65,8 +65,8 @@ class ChannelGroupChangeChannelsParams extends Parameters { if (remove != null) 'remove': remove.join(',') }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -94,8 +94,8 @@ class ChannelGroupDeleteParams extends Parameters { var queryParameters = {'auth': keyset.authKey, 'uuid': keyset.uuid.value}; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/file.dart b/lib/src/dx/_endpoints/files.dart similarity index 78% rename from lib/src/dx/_endpoints/file.dart rename to lib/src/dx/_endpoints/files.dart index 64b084f9..ba50b182 100644 --- a/lib/src/dx/_endpoints/file.dart +++ b/lib/src/dx/_endpoints/files.dart @@ -5,9 +5,9 @@ typedef decryptFunction = List Function(CipherKey key, List data); class GenerateFileUploadUrlParams extends Parameters { Keyset keyset; String channel; - String fileName; + String payload; - GenerateFileUploadUrlParams(this.keyset, this.channel, this.fileName); + GenerateFileUploadUrlParams(this.keyset, this.channel, this.payload); @override Request toRequest() { @@ -19,40 +19,42 @@ class GenerateFileUploadUrlParams extends Parameters { channel, 'generate-upload-url' ]; - return Request(RequestType.post, pathSegments, body: fileName); + return Request.post(uri: Uri(pathSegments: pathSegments), body: payload); } } -class GenerateFileUploadUrlBody { - String fileName; - - GenerateFileUploadUrlBody(this.fileName); - - Map toJson() => {'name': fileName}; -} - class GenerateFileUploadUrlResult extends Result { - Map data; + String fileId; + String fileName; - Map fileUploadRequest; + Uri uploadUri; + Map formFields; GenerateFileUploadUrlResult._(); factory GenerateFileUploadUrlResult.fromJson(dynamic object) => GenerateFileUploadUrlResult._() - ..data = object['data'] - ..fileUploadRequest = object['file_upload_request']; + ..fileId = object['data']['id'] + ..fileName = object['data']['name'] + ..uploadUri = Uri.parse(object['file_upload_request']['url']) + ..formFields = + (object['file_upload_request']['form_fields'] as List) + .fold({}, (previousValue, element) { + previousValue[element['name']] = element['value']; + return previousValue; + }); } class FileUploadParams extends Parameters { Uri requestUrl; - dynamic formData; + + Map formData; FileUploadParams(this.requestUrl, this.formData); @override Request toRequest() { - return Request(RequestType.post, [], body: formData, url: requestUrl); + return Request.file(uri: requestUrl, body: formData); } } @@ -98,8 +100,8 @@ class PublishFileMessageParams extends Parameters { if (ttl != null) 'ttl': ttl.toString(), if (meta != null) 'meta': meta }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -120,13 +122,13 @@ class PublishFileMessageResult extends Result { } class DownloadFileParams extends Parameters { - Uri url; + Uri uri; - DownloadFileParams(this.url); + DownloadFileParams(this.uri); @override Request toRequest() { - return Request(RequestType.get, [], url: url); + return Request.get(uri: uri); } } @@ -139,9 +141,10 @@ class DownloadFileResult extends Result { {CipherKey cipherKey, Function decryptFunction}) { if (cipherKey != null) { return DownloadFileResult._() - ..fileContent = decryptFunction(cipherKey, object.data as List); + ..fileContent = + decryptFunction(cipherKey, object.byteList as List); } - return DownloadFileResult._()..fileContent = object.data; + return DownloadFileResult._()..fileContent = object.byteList; } } @@ -170,8 +173,8 @@ class ListFilesParams extends Parameters { if (next != null) 'next': next }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -234,7 +237,7 @@ class DeleteFileParams extends Parameters { fileId, fileName ]; - return Request(RequestType.delete, pathSegments); + return Request.delete(uri: Uri(pathSegments: pathSegments)); } } diff --git a/lib/src/dx/_endpoints/history.dart b/lib/src/dx/_endpoints/history.dart index 1c3fde3e..678ee042 100644 --- a/lib/src/dx/_endpoints/history.dart +++ b/lib/src/dx/_endpoints/history.dart @@ -46,8 +46,8 @@ class FetchHistoryParams extends Parameters { if (keyset.uuid != null) 'uuid': '${keyset.uuid}' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -119,8 +119,8 @@ class BatchHistoryParams extends Parameters { if (keyset.uuid != null) 'uuid': '${keyset.uuid}' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -136,7 +136,7 @@ class BatchHistoryResultEntry { factory BatchHistoryResultEntry.fromJson(Map object, {CipherKey cipherKey, Function decryptFunction}) { return BatchHistoryResultEntry._() - ..timetoken = Timetoken(object['timestamp'] as int) + ..timetoken = Timetoken(int.tryParse(object['timetoken'])) ..uuid = object['uuid'] ..messageType = (object['message_type'] is int) ? fromInt(object['message_type']) @@ -218,8 +218,8 @@ class CountMessagesParams extends Parameters { 'timetoken': '$timetoken', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -262,8 +262,8 @@ class DeleteMessagesParams extends Parameters { if (end != null) 'end': '$end', }; - return Request(RequestType.delete, pathSegments, - queryParameters: queryParameters); + return Request.delete( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/message_action.dart b/lib/src/dx/_endpoints/message_action.dart index d2880a22..fadd3cd7 100644 --- a/lib/src/dx/_endpoints/message_action.dart +++ b/lib/src/dx/_endpoints/message_action.dart @@ -28,8 +28,8 @@ class FetchMessageActionsParams extends Parameters { if (keyset.authKey != null) 'auth': '${keyset.authKey}', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -125,8 +125,9 @@ class AddMessageActionParams extends Parameters { if (keyset.uuid != null) 'uuid': '${keyset.uuid}' }; - return Request(RequestType.post, pathSegments, - queryParameters: queryParameters, body: messageAction); + return Request.post( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters), + body: messageAction); } } @@ -179,8 +180,8 @@ class DeleteMessageActionParams extends Parameters { if (keyset.uuid != null) 'uuid': '${keyset.uuid}' }; - return Request(RequestType.delete, pathSegments, - queryParameters: queryParameters); + return Request.delete( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/objects/channel_metadata.dart b/lib/src/dx/_endpoints/objects/channel_metadata.dart index cf0d5316..8a6db9b2 100644 --- a/lib/src/dx/_endpoints/objects/channel_metadata.dart +++ b/lib/src/dx/_endpoints/objects/channel_metadata.dart @@ -33,8 +33,8 @@ class GetAllChannelMetadataParams extends Parameters { if (sort != null && sort.isNotEmpty) 'sort': sort.join(','), if (keyset.authKey != null) 'auth': '${keyset.authKey}' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -110,8 +110,8 @@ class GetChannelMetadataParams extends Parameters { if (keyset.authKey != null) 'auth': keyset.authKey }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -152,8 +152,11 @@ class SetChannelMetadataParams extends Parameters { if (keyset.authKey != null) 'auth': keyset.authKey, }; - return Request(RequestType.patch, pathSegments, - queryParameters: queryParameters.isNotEmpty ? queryParameters : null, + return Request.patch( + uri: Uri( + pathSegments: pathSegments, + queryParameters: + queryParameters.isNotEmpty ? queryParameters : null), body: channelMetadata); } } @@ -189,8 +192,8 @@ class RemoveChannelMetadataParams extends Parameters { if (keyset.authKey != null) 'auth': keyset.authKey, }; - return Request(RequestType.delete, pathSegments, - queryParameters: queryParameters); + return Request.delete( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/objects/membership_metadata.dart b/lib/src/dx/_endpoints/objects/membership_metadata.dart index 475ee396..3eb65fc9 100644 --- a/lib/src/dx/_endpoints/objects/membership_metadata.dart +++ b/lib/src/dx/_endpoints/objects/membership_metadata.dart @@ -48,8 +48,8 @@ class GetMembershipsMetadataParams extends Parameters { if (sort != null && sort.isNotEmpty) 'sort': sort.join(',') }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -140,8 +140,9 @@ class ManageMembershipsParams extends Parameters { if (sort != null && sort.isNotEmpty) 'sort': sort.join(',') }; - return Request(RequestType.patch, pathSegments, - queryParameters: queryParameters, body: membershipMetadata); + return Request.patch( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters), + body: membershipMetadata); } } @@ -188,11 +189,11 @@ class GetChannelMembersParams extends Parameters { if (sort != null && sort.isNotEmpty) 'sort': sort.join(',') }; - return Request( - RequestType.get, - pathSegments, + return Request.get( + uri: Uri( + pathSegments: pathSegments, queryParameters: queryParameters, - ); + )); } } @@ -283,10 +284,8 @@ class ManageChannelMembersParams extends Parameters { if (filter != null && filter.isNotEmpty) 'filter': filter, if (sort != null && sort.isNotEmpty) 'sort': sort.join(',') }; - return Request( - RequestType.patch, - pathSegments, - queryParameters: queryParameters, + return Request.patch( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters), body: membersMetadata, ); } diff --git a/lib/src/dx/_endpoints/objects/uuid_metadata.dart b/lib/src/dx/_endpoints/objects/uuid_metadata.dart index 37a06e98..989be2a7 100644 --- a/lib/src/dx/_endpoints/objects/uuid_metadata.dart +++ b/lib/src/dx/_endpoints/objects/uuid_metadata.dart @@ -32,8 +32,8 @@ class GetAllUuidMetadataParams extends Parameters { if (sort != null && sort.isNotEmpty) 'sort': sort.join(','), if (keyset.authKey != null) 'auth': '${keyset.authKey}' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -113,8 +113,8 @@ class GetUuidMetadataParams extends Parameters { if (include != null && include.isNotEmpty) 'include': include.join(','), if (keyset.authKey != null) 'auth': '${keyset.authKey}' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -153,8 +153,9 @@ class SetUuidMetadataParams extends Parameters { if (include != null && include.isNotEmpty) 'include': include.join(','), if (keyset.authKey != null) 'auth': keyset.authKey }; - return Request(RequestType.patch, pathSegments, - queryParameters: queryParameters, body: uuidMetadata); + return Request.patch( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters), + body: uuidMetadata); } } @@ -186,8 +187,8 @@ class RemoveUuidMetadataParams extends Parameters { uuid ?? '${keyset.uuid}' ]; var queryParameters = {if (keyset.authKey != null) 'auth': keyset.authKey}; - return Request(RequestType.delete, pathSegments, - queryParameters: queryParameters); + return Request.delete( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/pam.dart b/lib/src/dx/_endpoints/pam.dart index faafa018..ece24b13 100644 --- a/lib/src/dx/_endpoints/pam.dart +++ b/lib/src/dx/_endpoints/pam.dart @@ -17,10 +17,11 @@ class PamGrantTokenParams extends Parameters { 'timestamp': timestamp, }; - return Request(RequestType.post, pathSegments, - queryParameters: queryParameters, - body: payload, - signWith: (t, p, q, h, b) => computeV2Signature(keyset, t, p, q, b)); + return Request.post( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters), + body: payload); + // TODO: fix me + // signWith: (t, p, q, h, b) => computeV2Signature(keyset, t, p, q, b)); } } @@ -78,9 +79,10 @@ class PamGrantParams extends Parameters { if (read != null) 'r': read ? '1' : '0', if (write != null) 'w': write ? '1' : '0', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters, - signWith: (t, p, q, h, b) => computeSignature(keyset, p, q)); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); + //TODO: fix me + // signWith: (t, p, q, h, b) => computeSignature(keyset, p, q)); } } diff --git a/lib/src/dx/_endpoints/presence.dart b/lib/src/dx/_endpoints/presence.dart index cc269ac7..7c375183 100644 --- a/lib/src/dx/_endpoints/presence.dart +++ b/lib/src/dx/_endpoints/presence.dart @@ -31,8 +31,8 @@ class HeartbeatParams extends Parameters { if (state != null) 'state': '$state' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -71,8 +71,8 @@ class SetUserStateParams extends Parameters { 'state': '$state', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -113,8 +113,8 @@ class GetUserStateParams extends Parameters { if (keyset.authKey != null) 'auth': '${keyset.authKey}', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -156,8 +156,8 @@ class LeaveParams extends Parameters { if (keyset.uuid != null) 'uuid': '${keyset.uuid.value}', }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -212,8 +212,8 @@ class HereNowParams extends Parameters { if (stateInfo == StateInfo.all) 'state': '1' }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -297,8 +297,8 @@ class WhereNowParams extends Parameters { if (keyset.authKey != null) 'auth': keyset.authKey }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/publish.dart b/lib/src/dx/_endpoints/publish.dart index fcdfdf2d..83758dbc 100644 --- a/lib/src/dx/_endpoints/publish.dart +++ b/lib/src/dx/_endpoints/publish.dart @@ -5,6 +5,7 @@ class PublishParams extends Parameters { String channel; String message; + String meta; bool storeMessage; int ttl; @@ -19,7 +20,8 @@ class PublishParams extends Parameters { keyset.subscribeKey, '0', channel, - '0' + '0', + message ]; var queryParameters = { @@ -27,14 +29,17 @@ class PublishParams extends Parameters { 'store': '1' else if (storeMessage == false) 'store': '0', + if (meta != null) 'meta': meta, if (keyset.authKey != null) 'auth': keyset.authKey, if (keyset.uuid != null) 'uuid': keyset.uuid.value, if (ttl != null) 'ttl': ttl.toString() }; - return Request(RequestType.post, pathSegments, - queryParameters: queryParameters.isNotEmpty ? queryParameters : null, - body: message); + return Request.get( + uri: Uri( + pathSegments: pathSegments, + queryParameters: + queryParameters.isNotEmpty ? queryParameters : null)); } } diff --git a/lib/src/dx/_endpoints/push.dart b/lib/src/dx/_endpoints/push.dart index 04a023b0..56269701 100644 --- a/lib/src/dx/_endpoints/push.dart +++ b/lib/src/dx/_endpoints/push.dart @@ -70,8 +70,8 @@ class ListPushChannelsParams extends Parameters { queryParameters['topic'] = topic; queryParameters.remove('type'); } - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -123,8 +123,8 @@ class AddPushChannelsParams extends Parameters { queryParameters['topic'] = topic; queryParameters.remove('type'); } - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -177,8 +177,8 @@ class RemovePushChannelsParams extends Parameters { queryParameters['topic'] = topic; queryParameters.remove('type'); } - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } @@ -231,8 +231,8 @@ class RemoveDeviceParams extends Parameters { queryParameters['topic'] = topic; queryParameters.remove('type'); } - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/signal.dart b/lib/src/dx/_endpoints/signal.dart index d8bc4f4c..f608b9b5 100644 --- a/lib/src/dx/_endpoints/signal.dart +++ b/lib/src/dx/_endpoints/signal.dart @@ -25,8 +25,8 @@ class SignalParams extends Parameters { if (keyset.uuid != null) 'uuid': keyset.uuid.value, }; - return Request(RequestType.get, pathSegments, - queryParameters: queryParameters); + return Request.get( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/subscribe.dart b/lib/src/dx/_endpoints/subscribe.dart index ca422895..e0c12fae 100644 --- a/lib/src/dx/_endpoints/subscribe.dart +++ b/lib/src/dx/_endpoints/subscribe.dart @@ -48,9 +48,8 @@ class SubscribeParams extends Parameters { 'filter-expr': keyset.filterExpression }; - return Request(RequestType.subscribe, pathSegments, - queryParameters: queryParameters, - headers: {'Content-Type': 'application/json'}); + return Request.subscribe( + uri: Uri(pathSegments: pathSegments, queryParameters: queryParameters)); } } diff --git a/lib/src/dx/_endpoints/time.dart b/lib/src/dx/_endpoints/time.dart index 1c3ba8f9..eb188130 100644 --- a/lib/src/dx/_endpoints/time.dart +++ b/lib/src/dx/_endpoints/time.dart @@ -3,6 +3,6 @@ import 'package:pubnub/src/core/core.dart'; class TimeParams extends Parameters { @override Request toRequest() { - return Request(RequestType.get, ['time', '0']); + return Request.get(uri: Uri(pathSegments: ['time', '0'])); } } diff --git a/lib/src/dx/_utils/custom_flow.dart b/lib/src/dx/_utils/custom_flow.dart deleted file mode 100644 index e13ac1c1..00000000 --- a/lib/src/dx/_utils/custom_flow.dart +++ /dev/null @@ -1,39 +0,0 @@ -import 'dart:convert' show utf8; -import 'package:xml/xml.dart' show XmlDocument; -import 'package:meta/meta.dart'; - -import 'package:pubnub/src/core/core.dart'; -import 'package:pubnub/src/dx/_utils/utils.dart'; -import 'package:pubnub/src/net/exceptions.dart'; - -Future customFlow

( - {@required ILogger logger, - @required Core core, - @required P params, - @required Serialize serialize}) async { - var request = params.toRequest(); - - try { - var handler = await core.networking.handleCustomRequest(request); - - var response = await handler.response(); - var headers = await handler.headers(); - - var result = serialize(response, headers); - - return result; - } on PubNubRequestFailureException catch (exception) { - var responseData = exception.responseData; - if (responseData.data != null) { - var details = utf8.decode(exception.responseData.data); - var messageNode = - XmlDocument.parse(details).rootElement.getElement('Message'); - if (messageNode != null) { - details = messageNode.text; - } - throw PubNubException( - '${responseData.statusCode}\n${responseData.statusMessage}\n${exception.message}\n$details'); - } - throw PubNubException('${responseData.statusCode}'); - } -} diff --git a/lib/src/dx/_utils/default_flow.dart b/lib/src/dx/_utils/default_flow.dart index d33d16e6..da80da0e 100644 --- a/lib/src/dx/_utils/default_flow.dart +++ b/lib/src/dx/_utils/default_flow.dart @@ -2,30 +2,48 @@ import 'package:meta/meta.dart'; import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; -import 'package:pubnub/src/net/exceptions.dart'; typedef Serialize = R Function(dynamic object, [Map> headers]); Future defaultFlow

( - {@required ILogger logger, - @required Core core, + {@required Core core, @required P params, + bool deserialize = true, + @required Serialize serialize}) async { + var fiber = Fiber(core, + action: () => _defaultFlow( + core: core, + params: params, + deserialize: deserialize, + serialize: serialize)); + + await fiber.run(); + + return fiber.future; +} + +Future _defaultFlow

( + {@required Core core, + @required P params, + bool deserialize = true, @required Serialize serialize}) async { var request = params.toRequest(); try { - var handler = await core.networking.handle(request); + var handler = await core.networking.handler(); + var response = await handler.response(request); - var response = await handler.text(); - var headers = await handler.headers(); + if (deserialize) { + var result = await core.parser.decode(response.text); - var object = await core.parser.decode(response); - var result = serialize(object, headers); - - return result; + return serialize(result, response.headers); + } else { + return serialize(response); + } } on PubNubRequestFailureException catch (exception) { - var error = await core.parser.decode(exception.responseData); + var error = await core.parser.decode(exception.response.text, + type: request.type == RequestType.file ? 'xml' : 'json'); if (error is Map) { error = DefaultResult.fromJson(error); diff --git a/lib/src/dx/_utils/exceptions.dart b/lib/src/dx/_utils/exceptions.dart index 2ec4f413..0ffddfa3 100644 --- a/lib/src/dx/_utils/exceptions.dart +++ b/lib/src/dx/_utils/exceptions.dart @@ -1,11 +1,18 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; +import 'package:xml/xml.dart'; PubNubException getExceptionFromAny(dynamic error) { if (error is DefaultResult) { return getExceptionFromDefaultResult(error); } + if (error is XmlDocument) { + var details = error.rootElement.getElement('Message')?.text; + + return PubNubException( + 'Request to third party service failed. Details: $details'); + } if (error is List) { if (error.isEmpty) { return UnknownException(); diff --git a/lib/src/dx/_utils/utils.dart b/lib/src/dx/_utils/utils.dart index bec9756b..e7203df4 100644 --- a/lib/src/dx/_utils/utils.dart +++ b/lib/src/dx/_utils/utils.dart @@ -1,5 +1,4 @@ export './default_flow.dart'; -export './custom_flow.dart'; export './default_result.dart'; export './ensure.dart'; export './exceptions.dart'; diff --git a/lib/src/dx/batch/batch.dart b/lib/src/dx/batch/batch.dart index 7935e983..29001f4c 100644 --- a/lib/src/dx/batch/batch.dart +++ b/lib/src/dx/batch/batch.dart @@ -3,8 +3,6 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/history.dart'; -final _logger = injectLogger('dx.batch.history'); - class BatchDx { final Core _core; @@ -32,7 +30,6 @@ class BatchDx { BatchHistoryResult loopResult; do { loopResult = await defaultFlow( - logger: _logger, core: _core, params: BatchHistoryParams(keyset, channels, max: count, @@ -95,7 +92,6 @@ class BatchDx { } return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => CountMessagesResult.fromJson(object)); diff --git a/lib/src/dx/channel/channel.dart b/lib/src/dx/channel/channel.dart index 437a8d3e..94db06bf 100644 --- a/lib/src/dx/channel/channel.dart +++ b/lib/src/dx/channel/channel.dart @@ -33,12 +33,18 @@ class Channel { /// saving using [storeMessage] flag - `true` to save and `false` to discard. /// Leave this option unset if you want to use the default. /// + /// [meta] parameter is for providing additional information with message + /// that can be used for stream filtering + /// * Inorder to make stream filtering work, Provide valid `Object` as meta. + /// * Invalid type (e.g String) data won't be passed to server. + /// /// You can set a per-message time to live in storage using [ttl] option. /// If set to `0`, message won't expire. /// If unset, expiration will fall back to default. - Future publish(dynamic message, {bool storeMessage, int ttl}) { + Future publish(dynamic message, + {bool storeMessage, int ttl, dynamic meta}) { return _core.publish(name, message, - storeMessage: storeMessage, ttl: ttl, keyset: _keyset); + storeMessage: storeMessage, ttl: ttl, keyset: _keyset, meta: meta); } /// Returns [PaginatedChannelHistory]. Most useful in infinite list type scenario. diff --git a/lib/src/dx/channel/channel_group.dart b/lib/src/dx/channel/channel_group.dart index f4a497c8..1dec25d0 100644 --- a/lib/src/dx/channel/channel_group.dart +++ b/lib/src/dx/channel/channel_group.dart @@ -5,8 +5,6 @@ import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/channel_group.dart'; -final _log = injectLogger('dx.channel_group'); - /// Representation of a channel group. class ChannelGroup { final PubNub _core; @@ -63,7 +61,6 @@ class ChannelGroupDx { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _log, core: _core, params: ChannelGroupListChannelsParams(keyset, group), serialize: (object, [_]) => @@ -77,7 +74,6 @@ class ChannelGroupDx { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _log, core: _core, params: ChannelGroupChangeChannelsParams(keyset, group, add: channels), serialize: (object, [_]) => @@ -91,7 +87,6 @@ class ChannelGroupDx { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _log, core: _core, params: ChannelGroupChangeChannelsParams(keyset, group, remove: channels), @@ -104,7 +99,6 @@ class ChannelGroupDx { {Keyset keyset, String using}) { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _log, core: _core, params: ChannelGroupDeleteParams(keyset, group), serialize: (object, [_]) => ChannelGroupDeleteResult.fromJson(object)); diff --git a/lib/src/dx/channel/channel_history.dart b/lib/src/dx/channel/channel_history.dart index 8791592d..05dbbf1c 100644 --- a/lib/src/dx/channel/channel_history.dart +++ b/lib/src/dx/channel/channel_history.dart @@ -6,8 +6,6 @@ import 'package:pubnub/src/dx/_endpoints/history.dart'; import 'channel.dart'; import 'message.dart'; -final _logger = injectLogger('dx.channel.history'); - enum ChannelHistoryOrder { ascending, descending } extension ChannelHistoryOrderExtension on ChannelHistoryOrder { @@ -52,7 +50,6 @@ class ChannelHistory { /// [to] parameter is disregarded. Future count() async { var result = await defaultFlow( - logger: _logger, core: _core, params: CountMessagesParams(_keyset, channels: {_channel.name}, timetoken: from ?? Timetoken(1)), @@ -69,7 +66,6 @@ class ChannelHistory { /// * if both [to] and [from] are defined, then it will work on messages that were sent between [from] and [to]. Future delete() async { await defaultFlow( - logger: _logger, core: _core, params: DeleteMessagesParams(_keyset, _channel.name, end: from, start: to), @@ -89,7 +85,6 @@ class ChannelHistory { do { var result = await defaultFlow( - logger: _logger, core: _core, params: FetchHistoryParams(_keyset, _channel.name, reverse: true, @@ -157,7 +152,6 @@ class PaginatedChannelHistory { /// Fetches more messages and stores them in [messages]. Future more() async { var result = await defaultFlow( - logger: _logger, core: _core, params: FetchHistoryParams(_keyset, _channel.name, reverse: _order.choose(ascending: true, descending: false), diff --git a/lib/src/dx/file/fileManager.dart b/lib/src/dx/file/fileManager.dart deleted file mode 100644 index 590b2f9a..00000000 --- a/lib/src/dx/file/fileManager.dart +++ /dev/null @@ -1,27 +0,0 @@ -import 'dart:io'; - -import 'package:dio/dio.dart'; - -abstract class FileManager { - List read(File file); - MultipartFile createMultipartFile(List bytes, {String fileName}); - FormData createFormData(Map form); -} - -class PubNubFileManager implements FileManager { - PubNubFileManager(); - @override - List read(File file) { - return file.readAsBytesSync(); - } - - @override - MultipartFile createMultipartFile(List bytes, {String fileName}) { - return MultipartFile.fromBytes(bytes, filename: fileName); - } - - @override - FormData createFormData(Map form) { - return FormData.fromMap(form); - } -} diff --git a/lib/src/dx/file/schema.dart b/lib/src/dx/file/schema.dart deleted file mode 100644 index 3a2692b9..00000000 --- a/lib/src/dx/file/schema.dart +++ /dev/null @@ -1,9 +0,0 @@ -class FileMessage { - Map file; - dynamic message; - - FileMessage(this.file, {this.message}); - - Map toJson() => - {'message': message, 'file': file}; -} diff --git a/lib/src/dx/file/extensions/keyset.dart b/lib/src/dx/files/extensions/keyset.dart similarity index 100% rename from lib/src/dx/file/extensions/keyset.dart rename to lib/src/dx/files/extensions/keyset.dart diff --git a/lib/src/dx/file/file.dart b/lib/src/dx/files/files.dart similarity index 73% rename from lib/src/dx/file/file.dart rename to lib/src/dx/files/files.dart index f531d152..95a705a7 100644 --- a/lib/src/dx/file/file.dart +++ b/lib/src/dx/files/files.dart @@ -1,48 +1,41 @@ -import 'dart:io'; import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; -import 'package:pubnub/src/dx/_endpoints/file.dart'; -import 'package:pubnub/src/dx/file/fileManager.dart'; +import 'package:pubnub/src/dx/_endpoints/files.dart'; + import 'schema.dart'; import 'extensions/keyset.dart'; -export 'extensions/keyset.dart'; -export 'schema.dart'; -final _logger = injectLogger('dx.file'); +export 'schema.dart'; +export 'extensions/keyset.dart'; class FileDx { final Core _core; - final FileManager _fileManager; - FileDx(this._core, this._fileManager); + FileDx(this._core); - /// This method allows to send [file] to [channel] - /// If file upload operation , It also publish [fileMessage] along with file data `fileId` and `fileName` + /// This method allows to send a [file] to a [channel] with an optional [fileMessage]. /// - /// Provide [cipherKey] to encrypt file content & fileEvent message if you want to override default `cipherKey` of `Keyset` - /// * It gives priority of [cipherKey] provided in method argument over `keyset`'s `cipherKey` + /// > Ensure that your [Keyset] has a `publishKey` defined. /// - /// It retries for publishing [fileMessage] till default value of PubNub configuration value - /// [fileMessagePublishRetryLimit] which is configurable + /// If you provide a [cipherKey], the file will be encrypted with it. + /// If its missing, then a `cipherKey` from [Keyset] will be used. + /// If no `cipherKey` is provided, then the file won't be encrypted. /// - /// * If all retry exhaused for publish file Message then response's `fileInfo` - /// field will give file's id and name + /// If the upload was successful, but publishing the file event to the [channel] wasn't, + /// this method will retry publishing up to a value configured in `fileMessagePublishRetryLimit`. + /// In case that is unsuccessful, you can retry publishing the file event manually by passing [FileInfo] + /// to the [publishFileMessage] method. /// - /// If [fileMessage] is null then only file information (fileId, fileName) will be published to [channel] - /// * Additional Publish File Message options - /// You can set a per-message time to live in storage using [fileMessageTtl] option. - /// If set to `0`, message won't expire. - /// If unset, expiration will fall back to default. - /// You can override the default account configuration on message - /// saving using [storeFileMessage] flag - `true` to save and `false` to discard. - /// Leave this option unset if you want to use the default. - /// Provide [fileMessageMeta] for additional information + /// #### Additional file event options + /// * You can set a per-message time to live in storage using [fileMessageTtl] option. + /// If set to `0`, message won't expire. If unset, expiration will fall back to default. /// - /// If [keyset] is not provided, then it tries to obtain a keyset [using] name. - /// If that fails, then it uses the default keyset. - /// If that fails as well, then it will throw [InvariantException]. - /// Ensure that you provide [publishKey] as it is required for publishing message + /// * You can override the default account configuration on message + /// saving using [storeFileMessage] flag - `true` to save and `false` to discard. + /// Leave this option unset if you want to use the default. + /// + /// * Provide [fileMessageMeta] for additional information Future sendFile( - String channel, File file, String fileName, + String channel, String fileName, List file, {CipherKey cipherKey, dynamic fileMessage, bool storeFileMessage, @@ -52,40 +45,46 @@ class FileDx { String using}) async { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); - Ensure(keyset.publishKey).isNotNull('publish key for file upload message'); - var requestPayload = - await _core.parser.encode(GenerateFileUploadUrlBody(fileName)); - var fileUploadDetails = await defaultFlow( - logger: _logger, core: _core, params: GenerateFileUploadUrlParams(keyset, channel, requestPayload), serialize: (object, [_]) => GenerateFileUploadUrlResult.fromJson(object)); - var uri = Uri.parse(fileUploadDetails.fileUploadRequest['url']); - var form_fields = fileUploadDetails.fileUploadRequest['form_fields']; - var form = {}; - form_fields.forEach((m) => form[m['key']] = m['value']); + if (keyset.cipherKey != null || cipherKey != null) { - form['file'] = _fileManager.createMultipartFile(_core.crypto - .encryptFileData( - cipherKey ?? keyset.cipherKey, _fileManager.read(file))); - } else { - form['file'] = _fileManager.createMultipartFile(_fileManager.read(file), - fileName: fileName); + file = _core.crypto.encryptFileData(cipherKey ?? keyset.cipherKey, file); } - var fileInfo = fileUploadDetails.data.map((k, v) => MapEntry('$k', '$v')); - fileInfo['url'] = getFileUrl(channel, '${fileUploadDetails.data['id']}', - '${fileUploadDetails.data['name']}') - .toString(); + + // TODO: Decide what to do here + // form['file'] = _fileManager.createMultipartFile(_fileManager.read(file), + // fileName: fileName); + + var fileInfo = FileInfo( + uploadDetails.fileId, + uploadDetails.fileName, + getFileUrl(channel, uploadDetails.fileId, uploadDetails.fileName) + .toString(), + ); + var publishMessage = FileMessage(fileInfo, message: fileMessage); - var publishFileResult = PublishFileMessageResult(); + var retryCount = keyset.fileMessagePublishRetryLimit; - var s3Response = await customFlow( - logger: _logger, + + var s3Response = await defaultFlow( core: _core, - params: FileUploadParams(uri, _fileManager.createFormData(form)), + params: FileUploadParams(uploadDetails.uploadUri, + {...uploadDetails.formFields, 'file': file}), + deserialize: false, serialize: (object, [_]) => FileUploadResult.fromJson(object)); + + var publishFileResult = PublishFileMessageResult(); + if (s3Response.statusCode == 204) { do { try { @@ -147,7 +146,6 @@ class FileDx { } if (meta != null) meta = await _core.parser.encode(meta); return defaultFlow( - logger: _logger, core: _core, params: PublishFileMessageParams(keyset, channel, messagePayload, storeMessage: storeMessage, ttl: ttl, meta: meta), @@ -171,10 +169,10 @@ class FileDx { if (keyset.cipherKey != null || cipherKey != null) { decrypter = _core.crypto.decryptFileData; } - return customFlow( - logger: _logger, + return defaultFlow( core: _core, params: DownloadFileParams(getFileUrl(channel, fileId, fileName)), + deserialize: false, serialize: (object, [_]) => DownloadFileResult.fromJson(object, cipherKey: cipherKey ?? keyset.cipherKey, decryptFunction: decrypter)); @@ -194,7 +192,6 @@ class FileDx { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _logger, core: _core, params: ListFilesParams(keyset, channel, limit: limit, next: next), serialize: (object, [_]) => ListFilesResult.fromJson(object)); @@ -210,7 +207,6 @@ class FileDx { {Keyset keyset, String using}) async { keyset ??= _core.keysets.get(using, defaultIfNameIsNull: true); return defaultFlow( - logger: _logger, core: _core, params: DeleteFileParams(keyset, channel, fileId, fileName), serialize: (object, [_]) => DeleteFileResult.fromJson(object)); @@ -238,9 +234,7 @@ class FileDx { 'files', fileId, fileName - ], queryParameters: { - 'pnsdk': 'PubNub-Dart/${Core.version}' - }); + ]); } /// This method helps to encrypt the file content in bytes format diff --git a/lib/src/dx/files/schema.dart b/lib/src/dx/files/schema.dart new file mode 100644 index 00000000..05ef5057 --- /dev/null +++ b/lib/src/dx/files/schema.dart @@ -0,0 +1,21 @@ +class FileInfo { + String id; + String name; + String url; + + FileInfo(this.id, this.name, [this.url]); + + Map toJson() { + return {'id': id, 'name': name}; + } +} + +class FileMessage { + FileInfo file; + dynamic message; + + FileMessage(this.file, {this.message}); + + Map toJson() => + {'message': message, 'file': file.toJson()}; +} diff --git a/lib/src/dx/message_action/message_action.dart b/lib/src/dx/message_action/message_action.dart index bf88c30c..8f7adda9 100644 --- a/lib/src/dx/message_action/message_action.dart +++ b/lib/src/dx/message_action/message_action.dart @@ -3,8 +3,6 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/message_action.dart'; -final _logger = injectLogger('dx.message_action'); - mixin MessageActionDx on Core { /// Returns all message actions of a given [channel]. /// @@ -33,7 +31,6 @@ mixin MessageActionDx on Core { var loopResult; do { loopResult = await defaultFlow( - logger: _logger, core: this, params: FetchMessageActionsParams(keyset, channel, start: from, end: to, limit: limit), @@ -85,7 +82,6 @@ mixin MessageActionDx on Core { keyset, channel, timetoken, addMessageActionBody); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => AddMessageActionResult.fromJson(object)); @@ -114,7 +110,6 @@ mixin MessageActionDx on Core { keyset, channel, messageTimetoken, actionTimetoken); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => DeleteMessageActionResult.fromJson(object)); diff --git a/lib/src/dx/objects/objects.dart b/lib/src/dx/objects/objects.dart index 1dcb1fb3..0c9543fe 100644 --- a/lib/src/dx/objects/objects.dart +++ b/lib/src/dx/objects/objects.dart @@ -5,8 +5,6 @@ import 'package:pubnub/src/dx/_endpoints/objects/membership_metadata.dart'; import '../_utils/utils.dart'; import 'schema.dart'; -final _logger = injectLogger('dx.objects'); - class ObjectsDx { final Core _core; @@ -61,7 +59,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => GetAllUuidMetadataResult.fromJson(object)); @@ -96,7 +93,6 @@ class ObjectsDx { var params = GetUuidMetadataParams(keyset, uuid: uuid, include: include); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => GetUuidMetadataResult.fromJson(object)); @@ -143,7 +139,6 @@ class ObjectsDx { SetUuidMetadataParams(keyset, payload, uuid: uuid, include: include); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => SetUuidMetadataResult.fromJson(object)); @@ -166,7 +161,6 @@ class ObjectsDx { var params = RemoveUuidMetadataParams(keyset, uuid: uuid); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => RemoveUuidMetadataResult.fromJson(object)); @@ -224,7 +218,6 @@ class ObjectsDx { return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => @@ -252,7 +245,6 @@ class ObjectsDx { var params = GetChannelMetadataParams(keyset, channelId, include: include); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => GetChannelMetadataResult.fromJson(object)); @@ -290,7 +282,6 @@ class ObjectsDx { SetChannelMetadataParams(keyset, channelId, payload, include: include); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => SetChannelMetadataResult.fromJson(object)); @@ -308,7 +299,6 @@ class ObjectsDx { return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => @@ -383,7 +373,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => MembershipsResult.fromJson(object)); @@ -476,7 +465,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => MembershipsResult.fromJson(object)); @@ -565,7 +553,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => MembershipsResult.fromJson(object)); @@ -651,7 +638,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => MembershipsResult.fromJson(object)); @@ -717,7 +703,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => ChannelMembersResult.fromJson(object)); @@ -798,7 +783,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => ChannelMembersResult.fromJson(object)); @@ -873,7 +857,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => ChannelMembersResult.fromJson(object)); @@ -950,7 +933,6 @@ class ObjectsDx { sort: sort); return defaultFlow( - logger: _logger, core: _core, params: params, serialize: (object, [_]) => ChannelMembersResult.fromJson(object)); diff --git a/lib/src/dx/pam/pam.dart b/lib/src/dx/pam/pam.dart index 72d41915..573df219 100644 --- a/lib/src/dx/pam/pam.dart +++ b/lib/src/dx/pam/pam.dart @@ -10,7 +10,7 @@ export 'token.dart' show Token; export 'resource.dart' show Resource, ResourceType, ResourceTypeExtension; export 'token_request.dart' show TokenRequest; -final _logger = injectLogger('dx.pam'); +final _logger = injectLogger('pubnub.dx.pam'); mixin PamDx on Core { /// Use this method to modify permissions for provided [authKeys]. @@ -47,7 +47,6 @@ mixin PamDx on Core { delete: delete); var result = await defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => PamGrantResult.fromJson(object)); diff --git a/lib/src/dx/pam/token_request.dart b/lib/src/dx/pam/token_request.dart index 6ac0f68e..dbaf5a47 100644 --- a/lib/src/dx/pam/token_request.dart +++ b/lib/src/dx/pam/token_request.dart @@ -7,8 +7,6 @@ import 'package:pubnub/src/dx/_utils/utils.dart'; import 'resource.dart'; import 'token.dart'; -final _logger = injectLogger('dx.pam'); - class TokenRequest { final Core _core; final Keyset _keyset; @@ -72,7 +70,6 @@ class TokenRequest { var payload = json.encode(data); return defaultFlow( - logger: _logger, core: _core, params: PamGrantTokenParams( _keyset, payload, '${Time().now().millisecondsSinceEpoch ~/ 1000}'), diff --git a/lib/src/dx/presence/presence.dart b/lib/src/dx/presence/presence.dart index eb5301ce..693f7bcc 100644 --- a/lib/src/dx/presence/presence.dart +++ b/lib/src/dx/presence/presence.dart @@ -4,8 +4,6 @@ import 'package:pubnub/src/dx/_utils/utils.dart'; export '../_endpoints/presence.dart' show StateInfo; -final _logger = injectLogger('dx.presence'); - mixin PresenceDx on Core { /// Gets the occupancy information from a list of [channels] and/or [channelGroups]. /// @@ -26,7 +24,6 @@ mixin PresenceDx on Core { channels: channels, channelGroups: channelGroups, stateInfo: stateInfo); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => HereNowResult.fromJson(object, diff --git a/lib/src/dx/publish/publish.dart b/lib/src/dx/publish/publish.dart index 980c9857..9051abee 100644 --- a/lib/src/dx/publish/publish.dart +++ b/lib/src/dx/publish/publish.dart @@ -3,7 +3,7 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_endpoints/publish.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; -final _logger = injectLogger('dx.publish'); +final _logger = injectLogger('pubnub.dx.publish'); mixin PublishDx on Core { /// Publishes [message] to a [channel]. @@ -16,11 +16,20 @@ mixin PublishDx on Core { /// If set to `0`, message won't expire. /// If unset, expiration will fall back to default. /// + /// [meta] parameter is for providing additional information with message + /// that can be used for stream filtering + /// * Inorder to make stream filtering work, Provide valid `Object` as meta. + /// * Invalid type (e.g String) data won't be passed to server. + /// /// If [keyset] is not provided, then it tries to obtain a keyset [using] name. /// If that fails, then it uses the default keyset. /// If that fails as well, then it will throw [InvariantException]. Future publish(String channel, dynamic message, - {Keyset keyset, String using, bool storeMessage, int ttl}) async { + {Keyset keyset, + String using, + dynamic meta, + bool storeMessage, + int ttl}) async { Ensure(channel).isNotEmpty('channel name'); Ensure(message).isNotNull('message'); @@ -39,10 +48,13 @@ mixin PublishDx on Core { var params = PublishParams(keyset, channel, payload, storeMessage: storeMessage, ttl: ttl); + if (meta != null && !(meta is String)) { + params..meta = await super.parser.encode(meta); + } + _logger.verbose('Publishing a message to a channel $channel'); - return defaultFlow( - logger: _logger, + return await defaultFlow( core: this, params: params, serialize: (object, [_]) => PublishResult.fromJson(object)); diff --git a/lib/src/dx/push/push.dart b/lib/src/dx/push/push.dart index 6c01e43a..5c6ca7e5 100644 --- a/lib/src/dx/push/push.dart +++ b/lib/src/dx/push/push.dart @@ -4,8 +4,6 @@ import 'package:pubnub/src/default.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/push.dart'; -final _logger = injectLogger('dx.push'); - // Managing device registrations for Push Notification Service mixin PushNotificationDx on Core { /// It returns list of all channels to which device [deviceId] is registered @@ -37,7 +35,6 @@ mixin PushNotificationDx on Core { var params = ListPushChannelsParams(keyset, deviceId, gateway, topic: topic, environment: environment); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => ListPushChannelsResult.fromJson(object)); @@ -72,7 +69,6 @@ mixin PushNotificationDx on Core { var params = AddPushChannelsParams(keyset, deviceId, gateway, channels, topic: topic, environment: environment); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => AddPushChannelsResult.fromJson(object)); @@ -107,7 +103,6 @@ mixin PushNotificationDx on Core { var params = RemovePushChannelsParams(keyset, deviceId, gateway, channels, topic: topic, environment: environment); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => RemovePushChannelsResult.fromJson(object)); @@ -141,7 +136,6 @@ mixin PushNotificationDx on Core { var params = RemoveDeviceParams(keyset, deviceId, gateway, topic: topic, environment: environment); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => RemoveDeviceResult.fromJson(object)); diff --git a/lib/src/dx/signal/signal.dart b/lib/src/dx/signal/signal.dart index 1c35e009..890acde9 100644 --- a/lib/src/dx/signal/signal.dart +++ b/lib/src/dx/signal/signal.dart @@ -3,8 +3,6 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/signal.dart'; -final _logger = injectLogger('dx.signal'); - mixin SignalDx on Core { /// Publishes signal [message] to a [channel]. Future signal(String channel, dynamic message, @@ -15,7 +13,6 @@ mixin SignalDx on Core { var params = SignalParams(keyset, channel, payload); return defaultFlow( - logger: _logger, core: this, params: params, serialize: (object, [_]) => SignalResult.fromJson(object)); diff --git a/lib/src/dx/subscribe/extensions/keyset.dart b/lib/src/dx/subscribe/extensions/keyset.dart index 3b51d2cf..cd97f601 100644 --- a/lib/src/dx/subscribe/extensions/keyset.dart +++ b/lib/src/dx/subscribe/extensions/keyset.dart @@ -4,6 +4,8 @@ import 'package:pubnub/src/dx/subscribe/subscription.dart'; extension SubscribeKeysetExtension on Keyset { String get filterExpression => settings['#filterExpression']; + + /// Set filter expression for Subscribe message filtering. set filterExpression(String value) => settings['#filterExpression'] = value; SubscriptionManager get subscriptionManager => diff --git a/lib/src/dx/subscribe/manager/manager.dart b/lib/src/dx/subscribe/manager/manager.dart index fab503b7..71c6b9bc 100644 --- a/lib/src/dx/subscribe/manager/manager.dart +++ b/lib/src/dx/subscribe/manager/manager.dart @@ -6,7 +6,26 @@ import 'package:pubnub/src/state_machine/state_machine.dart'; import 'exceptions.dart'; -final _logger = injectLogger('dx.subscribe.manager'); +final _logger = injectLogger('pubnub.dx.subscribe.manager'); + +class SubscribeFiber implements Fiber { + @override + int tries; + + SubscribeFiber(this.tries); + + @override + final action = null; + + @override + Future get future => Future.value(null); + + @override + int get id => -1; + + @override + Future run() async {} +} class SubscriptionManager { Core core; @@ -22,26 +41,31 @@ class SubscriptionManager { SubscriptionManager(this.core, this.keyset) { _RequestMachine = Blueprint>() + ..define('send', from: ['initialized'], to: 'pending') ..define('reject', from: ['pending'], to: 'rejected') ..define('resolve', from: ['pending'], to: 'resolved') ..define('timeout', from: ['pending'], to: 'rejected') ..when('resolved', 'enters').exit(withPayload: true) ..when('rejected', 'enters').exit(withPayload: true) - ..when(null).send('timeout', - payload: SubscribeTimeoutException(), after: Duration(seconds: 270)) - ..when('pending', 'enters').callback((ctx) async { - SubscribeParams params = ctx.payload; - - try { - var handler = await core.networking.handle(params.toRequest()); - - ctx.update({'handler': handler}); + ..when('initialized', 'enters').callback((ctx) { + core.networking.handler().then((handler) { + if (ctx.machine.state == null) { + handler.cancel(); + return; + } - var result = await handler.text(); - ctx.machine.send('resolve', result); - } catch (error) { + ctx.update({'handler': handler, 'params': ctx.payload}); + ctx.machine.send('send', handler); + }); + }) + ..when('pending', 'enters').callback((ctx) { + (ctx.payload as IRequestHandler) + .response(ctx.context['params'].toRequest()) + .then((response) { + ctx.machine.send('resolve', response); + }).catchError((error) { ctx.machine.send('reject', error); - } + }); }) ..when(null, 'enters').callback((ctx) { if (ctx.context != null) { @@ -60,7 +84,7 @@ class SubscriptionManager { }) ..when('state.idle', 'enters').callback((ctx) { _logger.silly( - 'Entering ${ctx.entering} from ${ctx.exiting} because of ${ctx.event}'); + 'TOP: Entering ${ctx.entering} from ${ctx.exiting} because of ${ctx.event} with ${ctx.payload}'); if (ctx.event == 'update') { var newContext = {...ctx.context, ...ctx.payload}; @@ -71,13 +95,14 @@ class SubscriptionManager { if ((newContext['channels'].length > 0 || newContext['channelGroups'].length > 0)) { if (_messagesController == null || _messagesController.isClosed) { - _logger.silly('Creating the controller...'); + _logger.silly('TOP: Creating the controller...'); _messagesController = StreamController.broadcast(); } + ctx.machine.send('fetch'); } else { if (_messagesController != null) { - _logger.silly('Disposing the controller...'); + _logger.silly('TOP: Disposing the controller...'); _messagesController.close(); _messagesController = null; } @@ -88,34 +113,81 @@ class SubscriptionManager { }); _SubscriptionMachine - ..when('state.fetching').machine('request', _RequestMachine, - onBuild: (m, sm) { - _logger.silly('Entering state.fetching'); - var params = SubscribeParams(keyset, m.context['timetoken'].value, - region: m.context['region'], - channels: m.context['channels'], - channelGroups: m.context['channelGroups']); - - sm.enter('pending', params); - }, onExit: (ctx, m, sm) async { - switch (ctx.exiting) { - case 'resolved': - var object = await core.parser.decode(ctx.payload); - var result = SubscribeResult.fromJson(object); - - await _messagesController - .addStream(Stream.fromIterable(result.messages)); - - m.send('update', - {'timetoken': result.timetoken, 'region': result.region}); - break; - case 'rejected': - if (ctx.payload is SubscribeTimeoutException) { - m.send('update', {}); - } - break; - } - }); + ..when('state.fetching').machine( + 'request', + _RequestMachine, + onParentEnter: (m, sm) { + Future.delayed(Duration(seconds: 270), () { + sm.send('timeout', SubscribeTimeoutException()); + }); + }, + onParentExit: (m, sm) { + _logger.silly('SUB: Parent is exiting when my state is ${sm.state}'); + if (sm.state == 'pending' || sm.state == 'initialized') { + _logger.silly('SUB: Cancelling pending request.'); + if (sm.context != null) sm.context['handler']?.cancel(); + } + }, + onBuild: (m, sm) { + _logger.silly('TOP: Entering state.fetching'); + var params = SubscribeParams(keyset, m.context['timetoken'].value, + region: m.context['region'], + channels: m.context['channels'], + channelGroups: m.context['channelGroups']); + + sm.enter('initialized', params); + }, + onEnter: (ctx, m, sm) { + _logger.silly( + 'SUB: Submachine entered ${ctx.exiting} from ${ctx.entering} because ${ctx.event} (with ${ctx.payload}).'); + }, + onExit: (ctx, m, sm) async { + switch (ctx.exiting) { + case 'resolved': + _logger.silly( + 'SUB: Submachine exited ${ctx.exiting} to ${ctx.entering} because ${ctx.event} (with ${ctx.payload}).'); + IResponse response = ctx.payload; + var object = await core.parser.decode(await response.text); + var result = SubscribeResult.fromJson(object); + + await _messagesController + .addStream(Stream.fromIterable(result.messages)); + + m.send('update', { + 'timetoken': result.timetoken, + 'region': result.region, + 'retry': 1 + }); + break; + case 'rejected': + var fiber = SubscribeFiber(m.context['retry'] ?? 1); + _logger.warning( + 'An exception has occured while running a subscribe loop (retry #${fiber.tries}).'); + var diagnostic = + core.supervisor.runDiagnostics(fiber, ctx.payload); + + if (diagnostic == null) { + return _messagesController.addError(ctx.payload); + } + + _logger.silly('Possible reason found: $diagnostic'); + + var resolutions = + core.supervisor.runStrategies(fiber, diagnostic); + + for (var resolution in resolutions) { + if (resolution is FailResolution) { + _messagesController.addError(ctx.payload); + } else if (resolution is DelayResolution) { + await Future.delayed(resolution.delay); + } else if (resolution is RetryResolution) { + m.send('update', {'retry': fiber.tries + 1}); + } + } + break; + } + }, + ); machine = _SubscriptionMachine.build(); @@ -125,6 +197,12 @@ class SubscriptionManager { 'timetoken': Timetoken(0), 'region': null, }); + + core.supervisor.events + .where((event) => event is NetworkIsDownEvent) + .listen((_event) { + machine.send('update', {'retry': (machine.context['retry'] ?? 1) + 1}); + }); } Future update( @@ -134,7 +212,12 @@ class SubscriptionManager { machine.send('fail', SubscribeOutdatedException()); } - machine.send('update', {...cb(machine.context), 'completer': completer}); + machine.send('update', { + ...cb( + machine.context, + ), + 'completer': completer, + }); return completer.future; } diff --git a/lib/src/dx/subscribe/subscribe.dart b/lib/src/dx/subscribe/subscribe.dart index ba3b8fcf..a359124a 100644 --- a/lib/src/dx/subscribe/subscribe.dart +++ b/lib/src/dx/subscribe/subscribe.dart @@ -9,8 +9,6 @@ import 'package:pubnub/src/dx/_endpoints/presence.dart'; export 'extensions/keyset.dart'; -final _logger = injectLogger('dx.subscribe'); - mixin SubscribeDx on Core { /// Subscribes to [channels] and [channelGroups]. Returns [Subscription]. Subscription subscription( @@ -70,7 +68,6 @@ mixin SubscribeDx on Core { Ensure(keyset).isNotNull('keyset'); return defaultFlow( - logger: _logger, core: this, params: LeaveParams(keyset, channels: channels, channelGroups: channelGroups), @@ -89,7 +86,6 @@ mixin SubscribeDx on Core { Ensure(keyset).isNotNull('keyset'); return defaultFlow( - logger: _logger, core: this, params: HeartbeatParams(keyset, channels: channels, diff --git a/lib/src/dx/subscribe/subscription.dart b/lib/src/dx/subscribe/subscription.dart index e22410a5..d24b3148 100644 --- a/lib/src/dx/subscribe/subscription.dart +++ b/lib/src/dx/subscribe/subscription.dart @@ -6,7 +6,7 @@ import 'package:pubnub/src/dx/_utils/disposable.dart'; import 'envelope.dart'; import 'extensions/keyset.dart'; -final _logger = injectLogger('dx.subscribe.subscription'); +final _logger = injectLogger('pubnub.dx.subscribe.subscription'); class Subscription extends Disposable { final Core _core; @@ -85,8 +85,9 @@ class Subscription extends Disposable { (presenceChannels.contains(envelope['c']) || presenceChannelGroups.contains(envelope['b']))); }).asyncMap((envelope) async { + _logger.silly('Processing envelope: $envelope'); if ((envelope['e'] == null || envelope['e'] == 4) && - !envelope['b'].endsWith('-pnpres') && + !envelope['c'].endsWith('-pnpres') && _keyset.cipherKey != null) { envelope['d'] = await _core.parser .decode(_core.crypto.decrypt(_keyset.cipherKey, envelope['d'])); diff --git a/lib/src/dx/time.dart b/lib/src/dx/time.dart index d9d14215..994ae57b 100644 --- a/lib/src/dx/time.dart +++ b/lib/src/dx/time.dart @@ -3,13 +3,10 @@ import 'package:pubnub/src/core/core.dart'; import 'package:pubnub/src/dx/_utils/utils.dart'; import 'package:pubnub/src/dx/_endpoints/time.dart'; -final _logger = injectLogger('pubnub.dx.time'); - mixin TimeDx on Core { /// Get current timetoken value from the PubNub network. Future time() async { return defaultFlow( - logger: _logger, core: this, params: TimeParams(), serialize: (object, [_]) => Timetoken(object[0] as int)); diff --git a/lib/src/logging/logging.dart b/lib/src/logging/logging.dart index e4865f6d..6a953be1 100644 --- a/lib/src/logging/logging.dart +++ b/lib/src/logging/logging.dart @@ -61,6 +61,12 @@ class LogRecord { customPrint(message); }; } + + /// Function that can be passed into `listen` method to print a [LogRecord]. + /// + /// Prints `[$time] (${level.name}) $message`. + static void Function(LogRecord) defaultPrinter = + LogRecord.createPrinter(r'[$time] (${level.name}) $message'); } /// A logger implementation that contains a stream of [LogRecord] records. @@ -220,4 +226,18 @@ class StreamLogger extends ILogger { _sink.add(record); } } + + bool _isDisposed = false; + + Future dispose() async { + if (!_isDisposed) { + await _streamController.close(); + + for (var child in _children.values) { + await child.dispose(); + } + + _isDisposed = true; + } + } } diff --git a/lib/src/net/meta/diagnostics.dart b/lib/src/net/meta/diagnostics.dart new file mode 100644 index 00000000..35e77953 --- /dev/null +++ b/lib/src/net/meta/diagnostics.dart @@ -0,0 +1,32 @@ +import 'package:pubnub/pubnub.dart'; + +class HostIsDownDiagnostic extends Diagnostic { + final String host; + final int port; + + const HostIsDownDiagnostic(this.host, this.port); +} + +class HostLookupFailedDiagnostic extends Diagnostic { + final String host; + + const HostLookupFailedDiagnostic(this.host); +} + +class TimeoutDiagnostic extends Diagnostic { + final int timeout; + + const TimeoutDiagnostic(this.timeout); +} + +final Map netDiagnosticsMap = { + RegExp(r'SocketException: Connection failed \(OS Error: Host is down, errno = 64\), address = ([a-zA-Z0-9\-\.]+), port = ([0-9]+)'): + (match) => + HostIsDownDiagnostic(match.group(1), int.parse(match.group(2))), + RegExp(r"SocketException: Failed host lookup: '([a-zA-Z0-9\-\.]+)' \(OS Error: nodename nor servname provided, or not known, errno = 8\)"): + (match) => HostLookupFailedDiagnostic(match.group(1)), + RegExp(r"Failed host lookup: '([a-zA-Z0-9\-\.]+)'"): (match) => + HostLookupFailedDiagnostic(match.group(1)), + RegExp(r'Connecting timed out \[([0-9]+)ms\]'): (match) => + TimeoutDiagnostic(int.parse(match.group(1))), +}; diff --git a/lib/src/net/meta/meta.dart b/lib/src/net/meta/meta.dart new file mode 100644 index 00000000..1e957773 --- /dev/null +++ b/lib/src/net/meta/meta.dart @@ -0,0 +1,23 @@ +import 'package:pubnub/pubnub.dart'; + +import 'diagnostics.dart'; + +export 'strategy.dart'; +export 'retry_policy.dart'; + +Diagnostic getNetworkDiagnostic(dynamic exception) { + if (exception is PubNubRequestOtherException || + exception is PubNubRequestTimeoutException) { + var otherException = exception.additionalData; + + if (otherException?.message != null) { + return netDiagnosticsMap.entries.map((entry) { + return entry.key.hasMatch(otherException.message) + ? entry.value(entry.key.matchAsPrefix(otherException.message)) + : null; + }).firstWhere((element) => element != null, orElse: () => null); + } + } + + return null; +} diff --git a/lib/src/net/meta/retry_policy.dart b/lib/src/net/meta/retry_policy.dart new file mode 100644 index 00000000..6015a8b3 --- /dev/null +++ b/lib/src/net/meta/retry_policy.dart @@ -0,0 +1,38 @@ +import 'dart:math'; + +import 'package:pubnub/pubnub.dart'; + +abstract class RetryPolicy { + final int maxRetries; + + const RetryPolicy(this.maxRetries); + + Duration getDelay(Fiber fiber); + + factory RetryPolicy.linear({int backoff, int maxRetries = 5}) => + LinearRetryPolicy(backoff: backoff, maxRetries: maxRetries); + factory RetryPolicy.exponential({int maxRetries = 5}) => + ExponentialRetryPolicy(maxRetries: maxRetries); +} + +class LinearRetryPolicy extends RetryPolicy { + final int backoff; + + const LinearRetryPolicy({this.backoff, int maxRetries}) : super(maxRetries); + + @override + Duration getDelay(Fiber fiber) { + return Duration( + milliseconds: (fiber.tries * backoff) + Random().nextInt(1000)); + } +} + +class ExponentialRetryPolicy extends RetryPolicy { + const ExponentialRetryPolicy({int maxRetries}) : super(maxRetries); + + @override + Duration getDelay(Fiber fiber) { + return Duration( + milliseconds: pow(2, fiber.tries - 1) * 1000 + Random().nextInt(1000)); + } +} diff --git a/lib/src/net/meta/strategy.dart b/lib/src/net/meta/strategy.dart new file mode 100644 index 00000000..71ce7c77 --- /dev/null +++ b/lib/src/net/meta/strategy.dart @@ -0,0 +1,35 @@ +import 'package:pubnub/pubnub.dart'; + +import 'diagnostics.dart'; +import 'retry_policy.dart'; + +class NetworkingStrategy extends Strategy { + RetryPolicy retryPolicy; + + NetworkingStrategy({this.retryPolicy}); + + @override + List resolve(Fiber fiber, Diagnostic diagnostic) { + if (retryPolicy == null) { + return [Resolution.fail()]; + } + + if (fiber.tries >= retryPolicy?.maxRetries) { + return [Resolution.fail()]; + } + + if (diagnostic is HostIsDownDiagnostic || + diagnostic is HostLookupFailedDiagnostic || + diagnostic is TimeoutDiagnostic) { + // Host is down. We should retry after some delay. + + return [ + Resolution.networkStatus(false), + Resolution.delay(retryPolicy.getDelay(fiber)), + Resolution.retry() + ]; + } + + return null; + } +} diff --git a/lib/src/net/net.dart b/lib/src/net/net.dart index d91a2722..9b91ab19 100644 --- a/lib/src/net/net.dart +++ b/lib/src/net/net.dart @@ -2,193 +2,32 @@ import 'dart:async'; import 'package:dio/dio.dart'; import 'package:pool/pool.dart'; +import 'package:pubnub/pubnub.dart'; -import 'package:pubnub/src/core/core.dart'; -import 'exceptions.dart'; +import 'meta/meta.dart'; +import 'request_handler.dart'; -final _logger = injectLogger('pubnub.networking'); - -class PubNubRequestHandler extends RequestHandler { - static int _idCounter = 0; - - Request request; - Dio _client; - - final CancelToken _cancelToken = CancelToken(); - final Completer _contents = Completer(); - PoolResource _resource; - - final int _id; - - PubNubRequestHandler(this.request, Dio client, PoolResource resource) - : _id = PubNubRequestHandler._idCounter++ { - _client = client; - _resource = resource; - _initialize(); - } - - void _initialize() async { - var uri = Uri( - pathSegments: request.pathSegments, - queryParameters: request.queryParameters); - _logger.info('($_id) Starting request to ${uri}...'); - try { - var response = await _client.requestUri(uri, - data: request.body, - options: - Options(method: request.type.method, headers: request.headers), - cancelToken: _cancelToken); - - _logger.info('($_id) Request succeed! (${response.request.uri})'); - _contents.complete(response); - } on DioError catch (e) { - _logger.info('($_id) Request failed ($e, ${e.message})'); - switch (e.type) { - case DioErrorType.CANCEL: - _contents.completeError(PubNubRequestCancelException(e.error)); - break; - case DioErrorType.CONNECT_TIMEOUT: - case DioErrorType.RECEIVE_TIMEOUT: - case DioErrorType.SEND_TIMEOUT: - _contents.completeError(PubNubRequestTimeoutException()); - break; - case DioErrorType.RESPONSE: - _contents - .completeError(PubNubRequestFailureException(e.response.data)); - break; - case DioErrorType.DEFAULT: - default: - _contents.completeError(PubNubRequestOtherException()); - break; - } - } finally { - _resource.release(); - } - } - - @override - Future response() async { - return await _contents.future; - } - - @override - Future text() async { - return (await _contents.future).data as String; - } - - @override - Future>> headers() async { - return (await _contents.future).headers.map; - } - - @override - bool get isCancelled => _cancelToken.isCancelled; - - @override - void cancel([dynamic reason]) { - if (!_cancelToken.isCancelled) { - _cancelToken.cancel(reason); - } - } -} - -class PubNubNetworkingModule implements NetworkModule { - static final Uri origin = Uri(scheme: 'https', host: 'ps.pndsn.com'); +class NetworkingModule implements INetworkingModule { + static int _requestCounter = 0; + final RetryPolicy retryPolicy; final Pool _pool = Pool(10); - final Dio _client = Dio(BaseOptions(baseUrl: '${origin.toString()}/')); + final Dio _client = Dio(); - @override - Future handle(Request request) async { - var resource = await _pool.request(); - return PubNubRequestHandler(request, _client, resource); - } + NetworkingModule({this.retryPolicy}); @override - Future handleCustomRequest(Request request) async { - var resource = await _pool.request(); - return CustomRequestHandler(request, Dio(), resource); - } -} - -class CustomRequestHandler extends RequestHandler { - static int _idCounter = 0; - - Request request; - Dio _client; - - final CancelToken _cancelToken = CancelToken(); - final Completer _contents = Completer(); - PoolResource _resource; - - final int _id; - - CustomRequestHandler(this.request, Dio client, PoolResource resource) - : _id = CustomRequestHandler._idCounter++ { - _client = client; - _resource = resource; - _initialize(); - } - - void _initialize() async { - _logger.info('($_id) Starting request to ${request.url}...'); - try { - var response = await _client.requestUri(request.url, - data: request.body, - options: Options( - method: request.type.method, - headers: request.headers, - responseType: ResponseType.bytes), - cancelToken: _cancelToken); + Future handler() async { + var requestId = _requestCounter++; - _logger.info('($_id) Request succeed! (${response.request.uri})'); - _contents.complete(response); - } on DioError catch (e) { - _logger.info('($_id) Request failed ($e, ${e.message})'); - switch (e.type) { - case DioErrorType.CANCEL: - _contents.completeError(PubNubRequestCancelException(e.error)); - break; - case DioErrorType.CONNECT_TIMEOUT: - case DioErrorType.RECEIVE_TIMEOUT: - case DioErrorType.SEND_TIMEOUT: - _contents.completeError(PubNubRequestTimeoutException()); - break; - case DioErrorType.RESPONSE: - _contents.completeError(PubNubRequestFailureException(e.response)); - break; - case DioErrorType.DEFAULT: - default: - _contents.completeError(PubNubRequestOtherException()); - break; - } - } finally { - _resource.release(); - } + return RequestHandler(requestId, + client: _client, resource: _pool.request()); } @override - Future response() { - return _contents.future.then((value) => value); - } - - @override - Future text() async { - return (await _contents.future).data; - } - - @override - Future>> headers() async { - return (await _contents.future).headers.map; - } - - @override - bool get isCancelled => _cancelToken.isCancelled; - - @override - void cancel([dynamic reason]) { - if (!_cancelToken.isCancelled) { - _cancelToken.cancel(reason); - } + void register(Core core) { + core.supervisor.registerDiagnostic(getNetworkDiagnostic); + core.supervisor + .registerStrategy(NetworkingStrategy(retryPolicy: retryPolicy)); } } diff --git a/lib/src/net/request_handler.dart b/lib/src/net/request_handler.dart new file mode 100644 index 00000000..2926064b --- /dev/null +++ b/lib/src/net/request_handler.dart @@ -0,0 +1,108 @@ +import 'package:dio/dio.dart' as dio; +import 'package:pool/pool.dart' show PoolResource; +import 'package:pubnub/pubnub.dart'; + +import 'response.dart'; + +final _logger = injectLogger('pubnub.networking.request_handler'); + +class RequestHandler extends IRequestHandler { + final int _id; + final dio.Dio _client; + final Future _resourceP; + PoolResource _resource; + final dio.CancelToken _cancelToken = dio.CancelToken(); + + bool _isReleased = false; + + RequestHandler(this._id, {dio.Dio client, Future resource}) + : _client = client, + _resourceP = resource; + + @override + Future response(Request request) async { + _logger.info('($_id) Awaiting for resource...'); + _resource = await _resourceP; + _logger.info('($_id) Resource obtained.'); + + var uri = prepareUri(request.uri); + var body = request.body; + + _logger.info('($_id) Starting request to ${uri}...'); + + if (request.type == RequestType.file) { + body = + dio.FormData.fromMap((body as Map).map((key, value) { + if (value is List) { + return MapEntry(key, dio.MultipartFile.fromBytes(value)); + } else { + return MapEntry(key, value); + } + })); + } + + try { + var response = await _client.requestUri>( + uri, + data: body, + options: dio.RequestOptions( + method: request.type.method, + headers: request.headers, + responseType: dio.ResponseType.bytes, + receiveTimeout: request.type.receiveTimeout, + sendTimeout: request.type.sendTimeout, + ), + cancelToken: _cancelToken, + ); + + _logger.info('(${_id}) Request succeed!'); + + return Response(response); + } on dio.DioError catch (e) { + _logger.info('($_id) Request failed ($e, ${e.message})'); + switch (e.type) { + case dio.DioErrorType.CANCEL: + throw PubNubRequestCancelException(e.error); + break; + case dio.DioErrorType.CONNECT_TIMEOUT: + case dio.DioErrorType.RECEIVE_TIMEOUT: + case dio.DioErrorType.SEND_TIMEOUT: + throw PubNubRequestTimeoutException(e); + break; + case dio.DioErrorType.RESPONSE: + var response = dio.Response>( + data: e.response.data, + headers: e.response.headers, + statusCode: e.response.statusCode); + + throw PubNubRequestFailureException(Response(response)); + break; + case dio.DioErrorType.DEFAULT: + default: + throw PubNubRequestOtherException(e.error); + break; + } + } catch (e) { + _logger.fatal('($_id) Request failed ($e)'); + throw PubNubRequestOtherException(e); + } finally { + if (!_isReleased) { + _isReleased = true; + _resource?.release(); + _logger.info('($_id) Resource released...'); + } + } + } + + @override + bool get isCancelled => _cancelToken.isCancelled; + + @override + void cancel([dynamic reason]) { + if (!_cancelToken.isCancelled && !_isReleased) { + _cancelToken.cancel(reason); + _resource?.release(); + _logger.info('($_id) Request cancelled and resource released.'); + } + } +} diff --git a/lib/src/net/response.dart b/lib/src/net/response.dart new file mode 100644 index 00000000..f099eb42 --- /dev/null +++ b/lib/src/net/response.dart @@ -0,0 +1,25 @@ +import 'dart:convert'; + +import 'package:dio/dio.dart' as dio; +import 'package:pubnub/src/core/core.dart'; + +class Response extends IResponse { + @override + final Map> headers; + + @override + final int statusCode; + + final List _data; + + Response(dio.Response> response) + : headers = response.headers.map, + statusCode = response.statusCode, + _data = response.data; + + @override + String get text => utf8.decode(_data); + + @override + List get byteList => _data; +} diff --git a/lib/src/parser/parser.dart b/lib/src/parser/parser.dart index a666d8f3..b4593cc0 100644 --- a/lib/src/parser/parser.dart +++ b/lib/src/parser/parser.dart @@ -1,24 +1,77 @@ import 'dart:convert'; +import 'package:xml/xml.dart' show XmlDocument; -import '../core/parse.dart'; +import 'package:pubnub/src/core/core.dart'; -class PubNubParserModule implements ParserModule { +import '../core/parser.dart'; + +abstract class Parser { + Future decode(String input); + Future encode(T input); + + const Parser(); +} + +class _JsonParser extends Parser { + const _JsonParser(); + + @override + Future decode(String input) async { + return json.decode(input); + } + + @override + Future encode(input) async { + return json.encode(input); + } +} + +class _XmlParser extends Parser { + const _XmlParser(); + + @override + Future decode(String input) async { + return XmlDocument.parse(input); + } + + @override + Future encode(XmlDocument input) async { + return input.toXmlString(); + } +} + +const Map _parserMap = { + 'json': _JsonParser(), + 'xml': _XmlParser(), +}; + +class ParserModule implements IParserModule { @override - Future decode(String input) async { + Future decode(String input, {String type = 'json'}) async { + if (!_parserMap.containsKey(type)) { + throw ParserException('Unsupported format $type.'); + } + try { - return json.decode(input); + return await _parserMap[type].decode(input); } catch (e) { - throw ParserException('Cannot decode string as JSON'); + throw ParserException('Cannot decode input string as $type.', e); } } @override - Future encode(dynamic input) async { + Future encode(T input, {String type = 'json'}) async { + if (!_parserMap.containsKey(type)) { + throw ParserException('Unsupported format $type.'); + } + try { - return json.encode(input); - } on JsonUnsupportedObjectError catch (error) { - throw ParserException( - 'Cannot encode object ${error.unsupportedObject} as JSON String'); + return await _parserMap[type].encode(input); + } catch (e) { + throw ParserException('Cannot encode input object as $type.', e); } } + + @override + void register(Core core) {} } diff --git a/lib/src/state_machine/blueprint.dart b/lib/src/state_machine/blueprint.dart index 4c04c9e4..d2dccd58 100644 --- a/lib/src/state_machine/blueprint.dart +++ b/lib/src/state_machine/blueprint.dart @@ -46,10 +46,16 @@ class BlueprintFactory { String name, Blueprint blueprint, {MachineCallbackWithCtx onEnter, MachineCallbackWithCtx onExit, - MachineCallback onBuild}) => + MachineCallback onBuild, + MachineCallback onParentEnter, + MachineCallback onParentExit}) => _addEffect(MachineEffect( name, blueprint, - onEnter: onEnter, onExit: onExit, onBuild: onBuild)); + onEnter: onEnter, + onExit: onExit, + onBuild: onBuild, + onParentEnter: onParentEnter, + onParentExit: onParentExit)); } class Blueprint { diff --git a/lib/src/state_machine/effects/machine.dart b/lib/src/state_machine/effects/machine.dart index c08c543b..31b85ade 100644 --- a/lib/src/state_machine/effects/machine.dart +++ b/lib/src/state_machine/effects/machine.dart @@ -15,12 +15,19 @@ class MachineEffect String name; Blueprint blueprint; + MachineCallback onParentExit; + MachineCallback onParentEnter; + MachineCallbackWithCtx onExit; MachineCallbackWithCtx onEnter; MachineCallback onBuild; MachineEffect(this.name, this.blueprint, - {this.onExit, this.onEnter, this.onBuild}); + {this.onExit, + this.onEnter, + this.onBuild, + this.onParentEnter, + this.onParentExit}); @override void execute( @@ -36,6 +43,7 @@ class MachineEffect machine.register(name, submachine); + if (onParentEnter != null) onParentEnter(machine, submachine); if (onBuild != null) onBuild(machine, submachine); submachine.when(null, 'exits', @@ -49,6 +57,7 @@ class MachineEffect })); } else if (edge == 'exits') { var submachine = machine.get(name); + if (onParentExit != null) onParentExit(machine, submachine); if (submachine.state != null) { submachine.exit(); diff --git a/lib/src/state_machine/state_machine.dart b/lib/src/state_machine/state_machine.dart index 21241980..5b949ccf 100644 --- a/lib/src/state_machine/state_machine.dart +++ b/lib/src/state_machine/state_machine.dart @@ -82,7 +82,10 @@ class StateMachine { void enter(State state, [dynamic payload]) => _transition(state, '_enter', payload); - void exit([dynamic payload]) => _transition(null, '_exit', payload); + void exit([dynamic payload]) { + _transition(null, '_exit', payload); + _transitionsController.close(); + } bool send(String event, [dynamic payload]) { var legalStates = _defs[event]; diff --git a/pubspec.lock b/pubspec.lock index 426d3b96..cbc75142 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -254,7 +254,7 @@ packages: source: hosted version: "1.7.0" pedantic: - dependency: "direct dev" + dependency: "direct main" description: name: pedantic url: "https://pub.dartlang.org" diff --git a/pubspec.yaml b/pubspec.yaml index e37acc0f..4ae2eb58 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: pubnub description: PubNub SDK v5 for Dart lang (with Flutter support) that allows you to create real-time applications -version: 1.4.4 +version: 2.0.0 homepage: https://www.pubnub.com/docs environment: @@ -14,8 +14,8 @@ dependencies: cbor: ^3.1.0 encrypt: ^4.0.2 convert: ^2.0.0 + pedantic: ^1.9.0 xml: ^4.3.0 dev_dependencies: - pedantic: 1.9.0 test: 1.11.1 test_coverage: 0.4.1 diff --git a/test/core/logging_test.dart b/test/core/logging_test.dart index 5fca2001..421035e0 100644 --- a/test/core/logging_test.dart +++ b/test/core/logging_test.dart @@ -21,7 +21,7 @@ void main() { test('should return the logger from Zone.current', () async { var logger = FakeLogger(); - await provideLogger(logger, () { + await provideLogger(logger, () async { var logger = injectLogger('test.logger'); logger.info('test'); diff --git a/test/crypto/crypto_test.dart b/test/crypto/crypto_test.dart index 874330e4..7909246f 100644 --- a/test/crypto/crypto_test.dart +++ b/test/crypto/crypto_test.dart @@ -4,13 +4,13 @@ import 'package:pubnub/src/crypto/crypto.dart'; import 'package:test/test.dart'; void main() { - PubNubCryptoModule crypto; + CryptoModule crypto; CipherKey key; group('Crypto [PubNubCryptoModule]', () { setUp(() { key = CipherKey.fromUtf8('thecustomsecretkey'); - crypto = PubNubCryptoModule(); + crypto = CryptoModule(); }); test('should work in two ways', () async { diff --git a/test/dx/channel_test.dart b/test/dx/channel_test.dart index cd8fd1ec..3eb5a4d5 100644 --- a/test/dx/channel_test.dart +++ b/test/dx/channel_test.dart @@ -46,7 +46,8 @@ void main() { #keyset: keyset, #using: null, #storeMessage: null, - #ttl: 60 + #ttl: 60, + #meta: null })); }); }); diff --git a/test/dx/file_test.dart b/test/dx/file_test.dart index 85cca6ce..112a0c9f 100644 --- a/test/dx/file_test.dart +++ b/test/dx/file_test.dart @@ -1,26 +1,21 @@ -import 'package:test/test.dart'; import 'dart:convert'; -import 'dart:io'; - -import 'package:dio/src/form_data.dart'; -import 'package:dio/src/multipart_file.dart'; - -import 'package:pubnub/src/dx/file/file.dart'; -import 'package:pubnub/src/dx/file/fileManager.dart'; +import 'package:test/test.dart'; import 'package:pubnub/pubnub.dart'; -import 'package:pubnub/src/dx/_endpoints/file.dart'; -import 'package:pubnub/src/core/core.dart'; +import 'package:pubnub/src/dx/files/files.dart'; +import 'package:pubnub/src/dx/_endpoints/files.dart'; + import '../net/fake_net.dart'; -part './fixtures/file.dart'; + +part 'fixtures/files.dart'; void main() { PubNub pubnub; + var keyset = Keyset(subscribeKey: 'test', publishKey: 'test'); group('DX [file]', () { setUp(() { pubnub = PubNub(networking: FakeNetworkingModule()) - ..keysets.add(Keyset(subscribeKey: 'test', publishKey: 'test'), - name: 'default', useAsDefault: true); + ..keysets.add(keyset, name: 'default', useAsDefault: true); }); test('#listFiles', () async { @@ -43,7 +38,7 @@ void main() { method: 'GET', ).then(status: 200, body: _publishFileMessageSuccessResponse); var message = - FileMessage({'id': 'some', 'name': 'cat_file.jpg'}, message: 'msg'); + FileMessage(FileInfo('some', 'cat_file.jpg'), message: 'msg'); var result = await pubnub.files.publishFileMessage('channel', message); expect(result, isA()); }); @@ -54,7 +49,7 @@ void main() { method: 'GET', ).then(status: 200, body: _publishFileMessageSuccessResponse); var message = - FileMessage({'id': 'some', 'name': 'cat_file.jpg'}, message: 'msg'); + FileMessage(FileInfo('some', 'cat_file.jpg'), message: 'msg'); var result = await pubnub.files.publishFileMessage('channel', message, cipherKey: CipherKey.fromUtf8('cipherKey')); expect(result, isA()); @@ -72,7 +67,7 @@ void main() { method: 'GET', ).then(status: 200, body: _publishFileMessageSuccessResponse); var message = - FileMessage({'id': 'some', 'name': 'cat_file.jpg'}, message: 'msg'); + FileMessage(FileInfo('some', 'cat_file.jpg'), message: 'msg'); var result = await pubnub.files.publishFileMessage('channel', message); expect(result, isA()); }); @@ -89,7 +84,7 @@ void main() { method: 'GET', ).then(status: 200, body: _publishFileMessageSuccessResponse); var message = - FileMessage({'id': 'some', 'name': 'cat_file.jpg'}, message: 'msg'); + FileMessage(FileInfo('some', 'cat_file.jpg'), message: 'msg'); var result = await pubnub.files.publishFileMessage('channel', message, cipherKey: CipherKey.fromUtf8('cipherKey')); expect(result, isA()); @@ -101,7 +96,7 @@ void main() { method: 'GET', ).then(status: 200, body: _publishFileMessageFailureResponse); var message = - FileMessage({'id': 'some', 'name': 'cat_file.jpg'}, message: 'msg'); + FileMessage(FileInfo('some', 'cat_file.jpg'), message: 'msg'); var result = await pubnub.files.publishFileMessage('channel', message); expect(result, isA()); expect(result.isError, equals(true)); @@ -119,43 +114,47 @@ void main() { expect(result, isA()); }); test('#SendFile', () async { - var keyset = Keyset(subscribeKey: 'test', publishKey: 'test'); - var pubnub = FakePubNub() - ..keysets.add(keyset, name: 'default', useAsDefault: true); when( path: _generateFileUploadUrl, method: 'POST', body: '{"name":"cat_file.jpg"}') .then(status: 200, body: _generateFileUploadUrlResponse); - when(path: 'https://pubnub-test-config.s3.amazonaws.com', method: 'POST') - .then(status: 200, statusCode: 204, body: ''); + + when( + path: 'https://pubnub-test-config.s3.amazonaws.com', + method: 'POST', + body: _sendFileResponse) + .then(status: 204, body: ''); + when( path: _publishFileMessageUrl2, method: 'GET', ).then(status: 200, body: _publishFileMessageSuccessResponse); + var result = await pubnub.files.sendFile( - 'channel', File('cat_file.jpg'), 'cat_file.jpg', + 'channel', 'cat_file.jpg', [0, 1, 2, 3], fileMessage: 'msg', keyset: keyset); expect(result, isA()); }); test('#SendFile #FileMessagePublish retry', () async { var keyset = Keyset(subscribeKey: 'test', publishKey: 'test'); - var pubnub = FakePubNub() - ..keysets.add(keyset, name: 'default', useAsDefault: true); when( path: _generateFileUploadUrl, method: 'POST', body: '{"name":"cat_file.jpg"}') .then(status: 200, body: _generateFileUploadUrlResponse); - when(path: 'https://pubnub-test-config.s3.amazonaws.com', method: 'POST') - .then(status: 200, statusCode: 204, body: ''); + when( + path: 'https://pubnub-test-config.s3.amazonaws.com', + method: 'POST', + body: _sendFileResponse) + .then(status: 204, body: ''); when( path: _publishFileMessageUrl2, method: 'GET', ).then(status: 400, body: _publishFileMessageFailureResponse); var result = await pubnub.files.sendFile( - 'channel', File('cat_file.jpg'), 'cat_file.jpg', + 'channel', 'cat_file.jpg', [0, 1, 2, 3], fileMessage: 'msg', keyset: keyset); expect(result, isA()); expect(result.fileInfo, isNotNull); @@ -163,9 +162,10 @@ void main() { test('#Download File', () async { when(path: _downloadFileUrl, method: 'GET') - .then(status: 200, data: [01, 02]); + .then(status: 200, body: [01, 02]); var result = await pubnub.files.downloadFile( 'channel', '5a3eb38c-483a-4b25-ac01-c4e20deba6d6', 'cat_file.jpg'); + expect(result, isA()); }); test('#getFileUrl', () async { diff --git a/test/dx/fixtures/file.dart b/test/dx/fixtures/files.dart similarity index 73% rename from test/dx/fixtures/file.dart rename to test/dx/fixtures/files.dart index a889e9c9..ed86fa13 100644 --- a/test/dx/fixtures/file.dart +++ b/test/dx/fixtures/files.dart @@ -1,50 +1,5 @@ part of '../file_test.dart'; -class FakeFileManager implements FileManager { - @override - FormData createFormData(Map form) { - return null; - } - - @override - MultipartFile createMultipartFile(List bytes, {String fileName}) { - return null; - } - - @override - List read(File file) { - return null; - } -} - -class FakeParser implements ParserModule { - @override - Future decode(String input) async { - return json.decode(input); - } - - @override - Future encode(dynamic input) async { - return json.encode(input); - } -} - -class FakePubNub extends Core { - List invocations = []; - - FileDx files; - - FakePubNub() - : super(networking: FakeNetworkingModule(), parser: FakeParser()) { - files = FileDx(this, FakeFileManager()); - } - - @override - void noSuchMethod(Invocation invocation) { - invocations.add(invocation); - } -} - final _listFilesSuccessResponse = '''{ "status": 200, "data": [ @@ -120,20 +75,35 @@ final _generateFileUploadUrlResponse = ''' } '''; -var _publishFileMessageUrl1 = +final _sendFileResponse = ''' +{ + "tagging": "ObjectTTL1000", + "key": "file-upload/5a3eb38c-483a-4b25-ac01-c4e20deba6d6/test_image.jpg", + "Content-Type": "binary/octet-stream", + "X-Amz-Credential": "xxx/20200403/us-west-2/s3/aws4_request", + "X-Amz-Security-Token": "lgnwegn2mg202j4g0g2mg04g02gj2", + "X-Amz-Algorithm": "AWS4-HMAC-SHA256", + "X-Amz-Date": "20200403T212950Z", + "Policy": "CnsgImV4cGlyYXRpb24iOiAiMjAyMC0wNC0wM1QyMToy...", + "X-Amz-Signature": "1fbaad6738c6cd4c7eec2afe4cb2553a1e9cd2be690fdc2ecdc6e26f60a3781a", + "file": [0, 1, 2, 3] +} +'''; + +final _publishFileMessageUrl1 = 'v1/files/publish-file/test/test/0/channel/0/%7B%22message%22:%22msg%22,%22file%22:%7B%22id%22:%22some%22,%22name%22:%22cat_file.jpg%22%7D%7D?pnsdk=PubNub-Dart%2F${PubNub.version}'; -var _publishFileMessageUrlEncryption = +final _publishFileMessageUrlEncryption = 'v1/files/publish-file/test/test/0/channel/0/%22X3LuZh36Z3vi4HFJSxdqD7XN%2FTsyUiPBmDfVaRipvaYs8wQE6OOloLTjGSTnZXIb0knFDIr8jPniWrnUYtdoTQ==%22?pnsdk=PubNub-Dart%2F${PubNub.version}'; -var _publishFileMessageUrl2 = +final _publishFileMessageUrl2 = 'v1/files/publish-file/test/test/0/channel/0/%7B%22message%22:%22msg%22,%22file%22:%7B%22id%22:%225a3eb38c-483a-4b25-ac01-c4e20deba6d6%22,%22name%22:%22cat_file.jpg%22,%22url%22:%22https:%2F%2Fps.pndsn.com%2Fv1%2Ffiles%2Ftest%2Fchannels%2Fchannel%2Ffiles%2F5a3eb38c-483a-4b25-ac01-c4e20deba6d6%2Fcat_file.jpg%3Fpnsdk=PubNub-Dart%252F1.4.2%22%7D%7D?pnsdk=PubNub-Dart%2F${PubNub.version}'; -var _generateFileUploadUrl = +final _generateFileUploadUrl = 'v1/files/test/channels/channel/generate-upload-url?pnsdk=PubNub-Dart%2F${PubNub.version}'; -var _downloadFileUrl = +final _downloadFileUrl = 'https://ps.pndsn.com/v1/files/test/channels/channel/files/5a3eb38c-483a-4b25-ac01-c4e20deba6d6/cat_file.jpg?pnsdk=PubNub-Dart%2F${PubNub.version}'; -var _getFileUrl = - 'https://ps.pndsn.com/v1/files/test/channels/channel/files/fileId/fileName?pnsdk=PubNub-Dart%2F${PubNub.version}'; +final _getFileUrl = + 'https://ps.pndsn.com/v1/files/test/channels/channel/files/fileId/fileName'; diff --git a/test/dx/fixtures/history.dart b/test/dx/fixtures/history.dart index 398add89..cf0449e6 100644 --- a/test/dx/fixtures/history.dart +++ b/test/dx/fixtures/history.dart @@ -4,7 +4,7 @@ final _batchFetchMessagesSuccessResponse = '''{ "error":false, "status":200, "error_message":"", - "channels":{"test-1":[{"message": 42, "timestamp": 1231231231231}], "test-2": [{"message": 10, "timestamp": 1231231231231}]} + "channels":{"test-1":[{"message": 42, "timetoken": "1231231231231"}], "test-2": [{"message": 10, "timetoken": "1231231231231"}]} }'''; final _batchCountMessagesSuccessResponse = '''{ @@ -22,7 +22,7 @@ final _batchFetchMessagesWithActionSuccessResponse = '''{ "demo-channel": [ { "message": "Hi", - "timetoken": 15610547826970040, + "timetoken": "15610547826970040", "actions": { "receipt": { "read": [ @@ -36,7 +36,7 @@ final _batchFetchMessagesWithActionSuccessResponse = '''{ }, { "message": "Hello", - "timetoken": 15610547826970000, + "timetoken": "15610547826970000", "actions": { "reaction": { "smiley_face": [ @@ -70,7 +70,7 @@ final _batchFetchMessagesWithActionsWithMore = '''{ "demo-channel": [ { "message": "Hi", - "timetoken": 15610547826970040, + "timetoken": "15610547826970040", "actions": { "receipt": { "read": [ @@ -84,7 +84,7 @@ final _batchFetchMessagesWithActionsWithMore = '''{ }, { "message": "Hello", - "timetoken": 15610547826970000, + "timetoken": "15610547826970000", "actions": { "reaction": { "smiley_face": [ diff --git a/test/dx/fixtures/pam.dart b/test/dx/fixtures/pam.dart index 8193096f..4be0edbb 100644 --- a/test/dx/fixtures/pam.dart +++ b/test/dx/fixtures/pam.dart @@ -1,9 +1,10 @@ part of '../pam_test.dart'; final _grantRequest = MockRequest('GET', - 'v2/auth/grant/sub-key/test?auth=authKey&channel=my_channel&ttl=1440×tamp=1234567890&m=0&r=1&w=0&pnsdk=PubNub-Dart%2F${PubNub.version}&signature=7IQCgpg73TUef0vywNJLvK27qrYKxKgvWUueR_Kej9U%3D'); + 'v2/auth/grant/sub-key/test?auth=authKey&channel=my_channel&ttl=1440×tamp=1234567890&m=0&r=1&w=0&pnsdk=PubNub-Dart%2F${PubNub.version}'); -final _grantSuccessResponse = MockResponse(200, {}, ''' +final _grantSuccessResponse = + MockResponse(statusCode: 200, headers: {}, body: ''' { "status": 200, "message": "Success", @@ -27,11 +28,12 @@ final _grantSuccessResponse = MockResponse(200, {}, ''' final _grantTokenRequest = MockRequest( 'POST', - 'v3/pam/test/grant?timestamp=1234567890&pnsdk=PubNub-Dart%2F${PubNub.version}&signature=v2.FL8sKKLo_xIlZnTV47foJdbUYUIWCtvYP4IqJzKVnKU', + 'v3/pam/test/grant?timestamp=1234567890&pnsdk=PubNub-Dart%2F${PubNub.version}', {}, '{"ttl":1440,"permissions":{"resources":{"channels":{"inbox-jay":3},"groups":{},"users":{},"spaces":{}},"patterns":{"channels":{},"groups":{},"users":{},"spaces":{}},"meta":{"user-id":"jay@example.com","contains-unicode":"The δΎ† test."}}}'); -final _grantTokenSuccessResponse = MockResponse(200, {}, '''{ +final _grantTokenSuccessResponse = + MockResponse(statusCode: 200, headers: {}, body: '''{ "status": 200, "data": { "message": "Success", @@ -40,7 +42,8 @@ final _grantTokenSuccessResponse = MockResponse(200, {}, '''{ "service": "Access Manager" }'''); -final _grantTokenFailureResponse = MockResponse(400, {}, ''' +final _grantTokenFailureResponse = + MockResponse(statusCode: 400, headers: {}, body: ''' { "status": 400, "error": { diff --git a/test/dx/history_test.dart b/test/dx/history_test.dart index 5daca097..4573aaee 100644 --- a/test/dx/history_test.dart +++ b/test/dx/history_test.dart @@ -93,6 +93,8 @@ void main() { isA>()); expect(result.channels['demo-channel'][0].actions['receipt']['read'], isA()); + expect(result.channels['demo-channel'][0].timetoken, + equals(Timetoken(15610547826970040))); }); test('.batch#fetchMessages with more messages', () async { diff --git a/test/dx/publish_test.dart b/test/dx/publish_test.dart index f4571c1c..620b351a 100644 --- a/test/dx/publish_test.dart +++ b/test/dx/publish_test.dart @@ -29,11 +29,10 @@ void main() { test('publish returns PublishResult with correct data', () async { when( - path: - 'publish/demo/demo/0/test/0?pnsdk=PubNub-Dart%2F${PubNub.version}', - method: 'POST', - body: '{"hello":"world"}', - ).then(status: 200, body: _publishSuccessResponse); + path: + 'publish/demo/demo/0/test/0/%7B%22hello%22:%22world%22%7D?pnsdk=PubNub-Dart%2F${PubNub.version}', + method: 'GET') + .then(status: 200, body: _publishSuccessResponse); var response = await pubnub.publish('test', {'hello': 'world'}); @@ -44,13 +43,37 @@ void main() { test('publish throws an exception when non-200 status code', () async { when( path: - 'publish/demo/demo/0/test/0?pnsdk=PubNub-Dart%2F${PubNub.version}', - method: 'POST', - body: '{"hello":"world"}', + 'publish/demo/demo/0/test/0/%7B%22hello%22:%22world%22%7D?pnsdk=PubNub-Dart%2F${PubNub.version}', + method: 'GET', ).then(status: 400, body: _publishFailureResponse); expect(pubnub.publish('test', {'hello': 'world'}), throwsA(TypeMatcher())); }); + + test('#publish with meta', () async { + when( + path: + 'publish/demo/demo/0/test/0/%7B%22hello%22:%22world%22%7D?meta=%7B%22hello%22%3A%22world%22%7D&pnsdk=PubNub-Dart%2F${PubNub.version}', + method: 'GET', + ).then(status: 200, body: _publishSuccessResponse); + var response = await pubnub + .publish('test', {'hello': 'world'}, meta: {'hello': 'world'}); + expect(response.description, equals('Sent')); + }); + + test('#publish with string meta', () async { + when( + path: + 'publish/demo/demo/0/test/0/%7B%22hello%22:%22world%22%7D?pnsdk=PubNub-Dart%2F${PubNub.version}', + method: 'GET', + ).then(status: 200, body: _publishSuccessResponse); + + var response = + await pubnub.publish('test', {'hello': 'world'}, meta: 'meta_sample'); + + expect(response.isError, equals(false)); + expect(response.description, equals('Sent')); + }); }); } diff --git a/test/dx/subscribe_test.dart b/test/dx/subscribe_test.dart index bb299eba..bbd93479 100644 --- a/test/dx/subscribe_test.dart +++ b/test/dx/subscribe_test.dart @@ -1,7 +1,5 @@ import 'package:test/test.dart'; void main() { - group('DX [subscribe]', () { - // TODO: write subscribe tests - }); + group('DX [subscribe]', () {}); } diff --git a/test/logging/logging_test.dart b/test/logging/logging_test.dart index c790749b..475ee4e2 100644 --- a/test/logging/logging_test.dart +++ b/test/logging/logging_test.dart @@ -21,7 +21,7 @@ void main() { ['test1', 'test1.1', 'test1.2', 'test2', 'test2.1', 'test2.2']), ); - await provideLogger(logger, () { + await provideLogger(logger, () async { var logger = injectLogger('test'); var logger1 = injectLogger('test.logger1'); var logger2 = injectLogger('test.logger2'); @@ -44,7 +44,7 @@ void main() { emitsInOrder(['test1.2', 'test2.2']), ); - await provideLogger(logger, () { + await provideLogger(logger, () async { var logger = injectLogger('test'); var logger1 = injectLogger('test.logger1'); var logger2 = injectLogger('test.logger2'); diff --git a/test/net/fake_net.dart b/test/net/fake_net.dart index 46637a89..ac39dd83 100644 --- a/test/net/fake_net.dart +++ b/test/net/fake_net.dart @@ -1,74 +1,78 @@ import 'dart:async'; +import 'dart:convert'; import 'package:pubnub/src/core/core.dart'; -import 'package:pubnub/src/core/net/net.dart'; -import 'package:pubnub/src/net/exceptions.dart'; class MockException extends PubNubException { MockException(String message) : super(message); } -class FakeRequestHandler extends RequestHandler { +class FakeRequestHandler extends IRequestHandler { Request request; + Mock mock; - final Completer _contents = Completer(); + FakeRequestHandler(this.mock); + + @override + Future response(Request request) { + var actualUri = prepareUri(request.uri); + var expectedUri = prepareUri(Uri.parse(mock.request.path)); - FakeRequestHandler(this.request, Mock mock) { - var uri = Uri( - pathSegments: request.pathSegments, - queryParameters: request.queryParameters); - if (request.url != null) uri = request.url; var doesMethodMatch = mock.request.method.toUpperCase() == request.type.method.toUpperCase(); - var doesBodyMatch = mock.request.body == request.body; + String body; + if (request.body is String) { + body = request.body; + } else if (request.body == null) { + body = null; + } else { + body = json.encode(request.body); + } + + String mockBody; - var doesUriMatch = mock.request.path == uri.toString(); + if (request.body is String) { + mockBody = mock.request.body; + } else if (request.body == null) { + mockBody = null; + } else { + mockBody = json.encode(json.decode(mock.request.body)); + } + + var doesBodyMatch = mockBody == body; - Future.microtask(() { + var doesUriMatch = expectedUri.toString() == actualUri.toString(); + + return Future.microtask(() { if (doesMethodMatch && doesBodyMatch && doesUriMatch) { - if (mock.response.status != 200) { - _contents - .completeError(PubNubRequestFailureException(mock.response.body)); + if (![200, 204].contains(mock.response.statusCode)) { + throw PubNubRequestFailureException(mock.response); } else { - _contents.complete(mock.response); + return mock.response; } } else { var exceptionBody = ''; + if (!doesMethodMatch) { exceptionBody += - '\n* method:\n| EXPECTED: ${mock.request.method.toUpperCase()}\n| ACTUAL: ${request.type.method.toUpperCase()}'; + '\n* method:\n| EXPECTED: ${mock.request.method.toUpperCase()}\n| ACTUAL: ${request.type.method.toUpperCase()}'; } if (!doesUriMatch) { exceptionBody += - '\n* uri:\n| EXPECTED: ${mock.request.path}\n| ACTUAL: ${uri.toString()}'; + '\n* uri:\n| EXPECTED: ${expectedUri}\n| ACTUAL: ${actualUri}'; } if (!doesBodyMatch) { exceptionBody += - '\n* body:\n| EXPECTED:\n${mock.request.body}\n| ACTUAL:\n${request.body}'; + '\n* body:\n| EXPECTED:\n${mockBody}\n| ACTUAL:\n${body}'; } - _contents.completeError(MockException( - 'mock request does not match the expected request$exceptionBody')); + throw MockException( + 'mock request does not match the expected request $exceptionBody'); } }); } - @override - Future response() { - return _contents.future; - } - - @override - Future text() async { - return (await _contents.future).body; - } - - @override - Future>> headers() async { - return (await _contents.future).headers; - } - @override void cancel([dynamic reason]) {} } @@ -76,22 +80,29 @@ class FakeRequestHandler extends RequestHandler { class MockRequest { final String method; final String path; - final String body; + final dynamic body; final Map> headers; const MockRequest(this.method, this.path, [this.headers = const {}, this.body]); } -class MockResponse { - final int status; - final String body; - final int statusCode; - final dynamic data; +class MockResponse implements IResponse { + final dynamic body; + + @override final Map> headers; - const MockResponse(this.status, - [this.headers = const {}, this.body, this.statusCode, this.data]); + @override + final int statusCode; + + const MockResponse({this.body, this.headers = const {}, this.statusCode}); + + @override + List get byteList => body; + + @override + String get text => body; } class Mock { @@ -108,14 +119,14 @@ class MockBuilder { MockBuilder(this._queue, this._request); void then( - {int status, - Map> headers, - String body, - int statusCode, - dynamic data, + {Map> headers, + dynamic body, + int status, MockResponse response}) { - var mock = Mock(_request, - response ?? MockResponse(status, headers, body, statusCode, data)); + var mock = Mock( + _request, + response ?? + MockResponse(statusCode: status, body: body, headers: headers)); _queue.add(mock); } @@ -127,32 +138,26 @@ MockBuilder when( {String method, String path, Map> headers, - String body, + dynamic body, MockRequest request}) { return MockBuilder( _queue, request ?? MockRequest(method, path, headers, body)); } -class FakeNetworkingModule implements NetworkModule { +class FakeNetworkingModule implements INetworkingModule { FakeNetworkingModule() { _queue.clear(); } @override - Future handle(Request request) async { + Future handler() async { if (_queue.isEmpty) { throw MockException('set up the mock first'); } - return FakeRequestHandler(request, _queue.removeAt(0)); + return FakeRequestHandler(_queue.removeAt(0)); } @override - Future handleCustomRequest(Request request) async { - if (_queue.isEmpty) { - throw MockException('set up the mock first'); - } - - return FakeRequestHandler(request, _queue.removeAt(0)); - } + void register(Core core) {} } diff --git a/test/parser/parser_test.dart b/test/parser/parser_test.dart index 0f08760a..64bbd5cf 100644 --- a/test/parser/parser_test.dart +++ b/test/parser/parser_test.dart @@ -1,13 +1,13 @@ -import 'package:pubnub/src/core/parse.dart'; +import 'package:pubnub/src/core/parser.dart'; import 'package:pubnub/src/parser/parser.dart'; import 'package:test/test.dart'; void main() { - PubNubParserModule parser; + ParserModule parser; group('Parser [PubNubParserModule]', () { setUp(() { - parser = PubNubParserModule(); + parser = ParserModule(); }); group('#decode', () {