From 60eb8da87460a5260c8fb3ca987bb22861373f1c Mon Sep 17 00:00:00 2001 From: Mohammad Derakhshani Date: Wed, 24 Oct 2018 12:04:13 -0700 Subject: [PATCH 1/3] updated * upgraded the SDK to the latest version * switching to withMasterKeyOrResourceToken as withMasterKey is deprecated * added scheduler in the sample to show blocking work should be done on a non netty thread --- README.md | 2 +- azure-cosmosdb-get-started/pom.xml | 2 +- .../microsoft/azure/cosmosdb/sample/Main.java | 217 ++++++++++-------- 3 files changed, 128 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 192474d..1e72fe7 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ git clone https://github.com/Azure-Samples/azure-cosmos-db-sql-api-async-java-ge ```bash cd azure-cosmos-db-sql-api-async-java-getting-started cd azure-cosmosdb-get-started -mvn package +mvn clean package ``` * From a command prompt or shell, run the following command to run the application. diff --git a/azure-cosmosdb-get-started/pom.xml b/azure-cosmosdb-get-started/pom.xml index 028fb94..27badec 100644 --- a/azure-cosmosdb-get-started/pom.xml +++ b/azure-cosmosdb-get-started/pom.xml @@ -43,7 +43,7 @@ com.microsoft.azure azure-cosmosdb - 1.0.0 + 2.2.0 org.slf4j diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java index c546dd1..c79c06b 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java @@ -1,4 +1,4 @@ -/** +/* * The MIT License (MIT) * Copyright (c) 2018 Microsoft Corporation * @@ -23,11 +23,6 @@ package com.microsoft.azure.cosmosdb.sample; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - import com.microsoft.azure.cosmosdb.ConnectionPolicy; import com.microsoft.azure.cosmosdb.ConsistencyLevel; import com.microsoft.azure.cosmosdb.Database; @@ -42,34 +37,72 @@ import com.microsoft.azure.cosmosdb.SqlParameterCollection; import com.microsoft.azure.cosmosdb.SqlQuerySpec; import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; - import rx.Observable; +import rx.Scheduler; +import rx.schedulers.Schedulers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class Main { + private final ExecutorService executorService; + private final Scheduler scheduler; private AsyncDocumentClient client; - private String databaseName = "AzureSampleFamilyDB"; - private String collectionName = "FamilyCollection"; + private final String databaseName = "AzureSampleFamilyDB"; + private final String collectionName = "FamilyCollection"; + + public Main() { + executorService = Executors.newFixedThreadPool(100); + // The SDK uses netty library for doing async IO operations, and the operations are performed the netty io threads. + // The number of IO netty threads are limited; it is the same as the number of CPU cores. + + // The app should avoid doing anything which takes a lot of time from IO netty thread. + // If the app consumes too much of IO netty thread you may face: + // * low throughput + // * bad latency + // * ReadTimeoutException because there is no netty IO thread available to read data from network. + // * deadlock + + // The app code will receive the data from Azure Cosmos DB on the netty IO thread. + // The app should ensure the user's blocking call or computationally/IO heavy work after receiving data + // from Azure Cosmos DB is performed on a custom thread managed by the user (not the SDK netty IO threads). + // + // If you are doing blocking calls or heavy work, provide your own scheduler to switch thread. + // for example you can do this: + // client.createDocument(.).observeOn(userCustomScheduler).toBlocking().single(); + + // the following scheduler is used for switching from netty thread to user app thread. + scheduler = Schedulers.from(executorService); + } + + public void close() { + executorService.shutdown(); + client.close(); + } /** * Run a Hello DocumentDB console application. - * - * @param args + * + * @param args command line args. */ public static void main(String[] args) { Main p = new Main(); - try { + try { p.getStartedDemo(); System.out.println(String.format("Demo complete, please hold while resources are deleted")); } catch (Exception e) { - System.out.println(String.format("DocumentDB GetStarted failed with %s", e)); + System.err.println(String.format("DocumentDB GetStarted failed with %s", e)); } finally { System.out.println("close the client"); - p.client.close(); - System.exit(0); + p.close(); } } @@ -78,13 +111,13 @@ private void getStartedDemo() throws Exception { client = new AsyncDocumentClient.Builder() .withServiceEndpoint(AccountSettings.HOST) - .withMasterKey(AccountSettings.MASTER_KEY) + .withMasterKeyOrResourceToken(AccountSettings.MASTER_KEY) .withConnectionPolicy(ConnectionPolicy.GetDefault()) .withConsistencyLevel(ConsistencyLevel.Session) .build(); - this.createDatabaseIfNotExists(); - this.createDocumentCollectionIfNotExists(); + createDatabaseIfNotExists(); + createDocumentCollectionIfNotExists(); Family andersenFamily = Families.getAndersenFamilyDocument(); Family wakefieldFamily = Families.getWakefieldFamilyDocument(); @@ -126,42 +159,44 @@ private void createDatabaseIfNotExists() throws Exception { String databaseLink = String.format("/dbs/%s", databaseName); - Observable> databaseReadObs = + Observable> databaseReadObs = client.readDatabase(databaseLink, null); - Observable> databaseExistenceObs = + Observable> databaseExistenceObs = databaseReadObs - .doOnNext(x -> { - System.out.println("database " + databaseName + " already exists."); - }) - .onErrorResumeNext( - e -> { - // if the database doesn't already exists - // readDatabase() will result in 404 error - if (e instanceof DocumentClientException) { - DocumentClientException de = (DocumentClientException) e; - // if database - if (de.getStatusCode() == 404) { - // if the database doesn't exist, create it. - System.out.println("database " + databaseName + " doesn't existed," - + " creating it..."); - - Database dbDefinition = new Database(); - dbDefinition.setId(databaseName); - - return client.createDatabase(dbDefinition, null); - } - } - - // some unexpected failure in reading database happened. - // pass the error up. - System.err.println("Reading database " + databaseName + " failed."); - return Observable.error(e); - }); - - - // wait for completion - databaseExistenceObs.toCompletable().await(); + .doOnNext(x -> { + System.out.println("database " + databaseName + " already exists."); + }) + .onErrorResumeNext( + e -> { + // if the database doesn't already exists + // readDatabase() will result in 404 error + if (e instanceof DocumentClientException) { + DocumentClientException de = (DocumentClientException) e; + // if database + if (de.getStatusCode() == 404) { + // if the database doesn't exist, create it. + System.out.println("database " + databaseName + " doesn't existed," + + " creating it..."); + + Database dbDefinition = new Database(); + dbDefinition.setId(databaseName); + + return client.createDatabase(dbDefinition, null); + } + } + + // some unexpected failure in reading database happened. + // pass the error up. + System.err.println("Reading database " + databaseName + " failed."); + return Observable.error(e); + }); + + + // wait for completion, + // as waiting for completion is a blocking call try to + // provide your own scheduler to avoid stealing netty io threads. + databaseExistenceObs.toCompletable().observeOn(scheduler).await(); System.out.println("Checking database " + databaseName + " completed!\n"); } @@ -176,60 +211,60 @@ private void createDocumentCollectionIfNotExists() throws Exception { String databaseLink = String.format("/dbs/%s", databaseName); - client.queryCollections(databaseLink, - new SqlQuerySpec("SELECT * FROM r where r.id = @id", + client.queryCollections(databaseLink, + new SqlQuerySpec("SELECT * FROM r where r.id = @id", new SqlParameterCollection( new SqlParameter("@id", collectionName))), null) - .single() // we know there is only single page of result (empty or with a match) - .flatMap(page -> { - if (page.getResults().isEmpty()) { - // if there is no matching collection create the collection. - DocumentCollection collection = new DocumentCollection(); - collection.setId(collectionName); - System.out.println("Creating collection " + collectionName); - - return client.createCollection(databaseLink, collection, null); - } else { - // collection already exists, nothing else to be done. - System.out.println("Collection " + collectionName + "already exists"); - return Observable.empty(); - } - }).toCompletable().await(); + .single() // we know there is only single page of result (empty or with a match) + .flatMap(page -> { + if (page.getResults().isEmpty()) { + // if there is no matching collection create the collection. + DocumentCollection collection = new DocumentCollection(); + collection.setId(collectionName); + System.out.println("Creating collection " + collectionName); + + return client.createCollection(databaseLink, collection, null); + } else { + // collection already exists, nothing else to be done. + System.out.println("Collection " + collectionName + "already exists"); + return Observable.empty(); + } + }).toCompletable().observeOn(scheduler).await(); System.out.println("Checking collection " + collectionName + " completed!\n"); } - private void createFamiliesAsyncAndRegisterListener(List families, CountDownLatch completionLatch) throws Exception { + private void createFamiliesAsyncAndRegisterListener(List families, CountDownLatch completionLatch) { String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); List>> createDocumentsOBs = new ArrayList<>(); - for(Family family: families) { + for (Family family : families) { Observable> obs = client.createDocument( collectionLink, family, new RequestOptions(), true); createDocumentsOBs.add(obs); } Observable.merge(createDocumentsOBs) - .map(ResourceResponse::getRequestCharge) - .reduce((sum, value) -> sum + value) - .subscribe( - totalRequestCharge -> { - // this will get print out when completed - System.out.println("total charge for creating documents is " - + totalRequestCharge); - }, - - // terminal error signal - e -> { - e.printStackTrace(); - completionLatch.countDown(); - }, + .map(ResourceResponse::getRequestCharge) + .reduce((sum, value) -> sum + value) + .subscribe( + totalRequestCharge -> { + // this will get print out when completed + System.out.println("total charge for creating documents is " + + totalRequestCharge); + }, + + // terminal error signal + e -> { + e.printStackTrace(); + completionLatch.countDown(); + }, - // terminal completion signal - () -> { - completionLatch.countDown(); - }); + // terminal completion signal + () -> { + completionLatch.countDown(); + }); } private void createFamiliesAndWaitForCompletion(List families) throws Exception { @@ -237,7 +272,7 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); List>> createDocumentsOBs = new ArrayList<>(); - for(Family family: families) { + for (Family family : families) { Observable> obs = client.createDocument( collectionLink, family, new RequestOptions(), true); createDocumentsOBs.add(obs); @@ -245,7 +280,7 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex Double totalRequestCharge = Observable.merge(createDocumentsOBs) .map(ResourceResponse::getRequestCharge) - .reduce((sum, value) -> sum+value) + .reduce((sum, value) -> sum + value) .toBlocking().single(); writeToConsoleAndPromptToContinue(String.format("Created %d documents with total request charge of %.2f", @@ -260,7 +295,7 @@ private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch queryOptions.setEnableCrossPartitionQuery(true); String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); - Observable> queryObservable = + Observable> queryObservable = client.queryDocuments(collectionLink, "SELECT * FROM Family WHERE Family.lastName = 'Andersen'", queryOptions); From f528d53d34824df92620984dcd0e4b7513b54a33 Mon Sep 17 00:00:00 2001 From: Mohammad Derakhshani Date: Wed, 24 Oct 2018 15:06:31 -0700 Subject: [PATCH 2/3] use sheculer when receiving data on the netty io thread --- .../microsoft/azure/cosmosdb/sample/Main.java | 74 ++++++++++++------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java index c79c06b..ac332fd 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java @@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class Main { private final ExecutorService executorService; @@ -59,7 +60,7 @@ public class Main { public Main() { executorService = Executors.newFixedThreadPool(100); - // The SDK uses netty library for doing async IO operations, and the operations are performed the netty io threads. + // The SDK uses netty library for doing async IO operations. The IO operations are performed on the netty io threads. // The number of IO netty threads are limited; it is the same as the number of CPU cores. // The app should avoid doing anything which takes a lot of time from IO netty thread. @@ -70,12 +71,11 @@ public Main() { // * deadlock // The app code will receive the data from Azure Cosmos DB on the netty IO thread. - // The app should ensure the user's blocking call or computationally/IO heavy work after receiving data - // from Azure Cosmos DB is performed on a custom thread managed by the user (not the SDK netty IO threads). + // The app should ensure the user's computationally/IO heavy work after receiving data + // from Azure Cosmos DB is performed on a custom thread managed by the user (not on the SDK netty IO thread). // - // If you are doing blocking calls or heavy work, provide your own scheduler to switch thread. - // for example you can do this: - // client.createDocument(.).observeOn(userCustomScheduler).toBlocking().single(); + // If you are doing heavy work after receiving the result from the SDK, + // you should provide your own scheduler to switch thread. // the following scheduler is used for switching from netty thread to user app thread. scheduler = Schedulers.from(executorService); @@ -92,7 +92,6 @@ public void close() { * @param args command line args. */ public static void main(String[] args) { - Main p = new Main(); try { @@ -196,7 +195,7 @@ private void createDatabaseIfNotExists() throws Exception { // wait for completion, // as waiting for completion is a blocking call try to // provide your own scheduler to avoid stealing netty io threads. - databaseExistenceObs.toCompletable().observeOn(scheduler).await(); + databaseExistenceObs.toCompletable().await(); System.out.println("Checking database " + databaseName + " completed!\n"); } @@ -229,7 +228,7 @@ private void createDocumentCollectionIfNotExists() throws Exception { System.out.println("Collection " + collectionName + "already exists"); return Observable.empty(); } - }).toCompletable().observeOn(scheduler).await(); + }).toCompletable().await(); System.out.println("Checking collection " + collectionName + " completed!\n"); } @@ -268,7 +267,6 @@ private void createFamiliesAsyncAndRegisterListener(List families, Count } private void createFamiliesAndWaitForCompletion(List families) throws Exception { - String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); List>> createDocumentsOBs = new ArrayList<>(); @@ -280,6 +278,13 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex Double totalRequestCharge = Observable.merge(createDocumentsOBs) .map(ResourceResponse::getRequestCharge) + .observeOn(scheduler) // the scheduler will be used for the following work + .map(charge -> { + // as we don't want to run heavyWork() on netty IO thread, we provide the custom scheduler + // for switching from netty IO thread to user thread. + heavyWork(); + return charge; + }) .reduce((sum, value) -> sum + value) .toBlocking().single(); @@ -288,6 +293,17 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex totalRequestCharge)); } + private void heavyWork() { + // I may do a lot of IO work: e.g., writing to log files + // a lot of computational work + // or may do Thread.sleep() + + try { + TimeUnit.SECONDS.sleep(2); + } catch (Exception e) { + } + } + private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch completionLatch) { // Set some common query options FeedOptions queryOptions = new FeedOptions(); @@ -299,22 +315,28 @@ private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch client.queryDocuments(collectionLink, "SELECT * FROM Family WHERE Family.lastName = 'Andersen'", queryOptions); - queryObservable.subscribe( - queryResultPage -> { - System.out.println("Got a page of query result with " + - queryResultPage.getResults().size() + " document(s)" - + " and request charge of " + queryResultPage.getRequestCharge()); - }, - // terminal error signal - e -> { - e.printStackTrace(); - completionLatch.countDown(); - }, - - // terminal completion signal - () -> { - completionLatch.countDown(); - }); + queryObservable + .observeOn(scheduler) + .subscribe( + queryResultPage -> { + // we want to make sure heavyWork() doesn't block any of netty IO threads + // so we use observeOn(scheduler) to switch from the netty thread to user's thread. + heavyWork(); + + System.out.println("Got a page of query result with " + + queryResultPage.getResults().size() + " document(s)" + + " and request charge of " + queryResultPage.getRequestCharge()); + }, + // terminal error signal + e -> { + e.printStackTrace(); + completionLatch.countDown(); + }, + + // terminal completion signal + () -> { + completionLatch.countDown(); + }); } private void writeToConsoleAndPromptToContinue(String text) throws IOException { From 2ae5719f4d3e1d2893016b74749fca0b91d561f2 Mon Sep 17 00:00:00 2001 From: Mohammad Derakhshani Date: Wed, 24 Oct 2018 20:07:20 -0700 Subject: [PATCH 3/3] minor cleanup, reformattig, pretty printing etc --- azure-cosmosdb-get-started/pom.xml | 2 +- .../azure/cosmosdb/sample/Families.java | 8 ++++---- .../microsoft/azure/cosmosdb/sample/Main.java | 20 ++++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/azure-cosmosdb-get-started/pom.xml b/azure-cosmosdb-get-started/pom.xml index 27badec..f54bce5 100644 --- a/azure-cosmosdb-get-started/pom.xml +++ b/azure-cosmosdb-get-started/pom.xml @@ -20,7 +20,7 @@ org.codehaus.mojo exec-maven-plugin - 1.2.1 + 1.6.0 com.microsoft.azure.cosmosdb.sample.Main diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java index aabb798..ff0ced1 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java @@ -26,7 +26,7 @@ public class Families { public static Family getAndersenFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Andersen" + System.currentTimeMillis()); + andersenFamily.setId("Andersen-" + System.currentTimeMillis()); andersenFamily.setLastName("Andersen"); Parent parent1 = new Parent(); @@ -61,7 +61,7 @@ public static Family getAndersenFamilyDocument() { public static Family getWakefieldFamilyDocument() { Family wakefieldFamily = new Family(); - wakefieldFamily.setId("Wakefield" + System.currentTimeMillis()); + wakefieldFamily.setId("Wakefield-" + System.currentTimeMillis()); wakefieldFamily.setLastName("Wakefield"); Parent parent1 = new Parent(); @@ -108,7 +108,7 @@ public static Family getWakefieldFamilyDocument() { public static Family getJohnsonFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Johnson" + System.currentTimeMillis()); + andersenFamily.setId("Johnson-" + System.currentTimeMillis()); andersenFamily.setLastName("Johnson"); Parent parent1 = new Parent(); @@ -122,7 +122,7 @@ public static Family getJohnsonFamilyDocument() { public static Family getSmithFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Smith" + System.currentTimeMillis()); + andersenFamily.setId("Smith-" + System.currentTimeMillis()); andersenFamily.setLastName("Smith"); Parent parent1 = new Parent(); diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java index ac332fd..bb6bf74 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java @@ -48,6 +48,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class Main { private final ExecutorService executorService; @@ -96,13 +97,14 @@ public static void main(String[] args) { try { p.getStartedDemo(); - System.out.println(String.format("Demo complete, please hold while resources are deleted")); + System.out.println(String.format("Demo complete, please hold while resources are released")); } catch (Exception e) { System.err.println(String.format("DocumentDB GetStarted failed with %s", e)); } finally { System.out.println("close the client"); p.close(); } + System.exit(0); } private void getStartedDemo() throws Exception { @@ -112,7 +114,7 @@ private void getStartedDemo() throws Exception { .withServiceEndpoint(AccountSettings.HOST) .withMasterKeyOrResourceToken(AccountSettings.MASTER_KEY) .withConnectionPolicy(ConnectionPolicy.GetDefault()) - .withConsistencyLevel(ConsistencyLevel.Session) + .withConsistencyLevel(ConsistencyLevel.Eventual) .build(); createDatabaseIfNotExists(); @@ -307,25 +309,29 @@ private void heavyWork() { private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch completionLatch) { // Set some common query options FeedOptions queryOptions = new FeedOptions(); - queryOptions.setMaxItemCount(100); + queryOptions.setMaxItemCount(10); queryOptions.setEnableCrossPartitionQuery(true); String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); Observable> queryObservable = client.queryDocuments(collectionLink, - "SELECT * FROM Family WHERE Family.lastName = 'Andersen'", queryOptions); + "SELECT * FROM Family WHERE Family.lastName != 'Andersen'", queryOptions); queryObservable .observeOn(scheduler) .subscribe( - queryResultPage -> { + page -> { // we want to make sure heavyWork() doesn't block any of netty IO threads // so we use observeOn(scheduler) to switch from the netty thread to user's thread. heavyWork(); System.out.println("Got a page of query result with " + - queryResultPage.getResults().size() + " document(s)" - + " and request charge of " + queryResultPage.getRequestCharge()); + page.getResults().size() + " document(s)" + + " and request charge of " + page.getRequestCharge()); + + + System.out.println("Document Ids " + page.getResults().stream().map(d -> d.getId()) + .collect(Collectors.toList())); }, // terminal error signal e -> {