diff --git a/README.md b/README.md index 192474d..1e72fe7 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ git clone https://github.com/Azure-Samples/azure-cosmos-db-sql-api-async-java-ge ```bash cd azure-cosmos-db-sql-api-async-java-getting-started cd azure-cosmosdb-get-started -mvn package +mvn clean package ``` * From a command prompt or shell, run the following command to run the application. diff --git a/azure-cosmosdb-get-started/pom.xml b/azure-cosmosdb-get-started/pom.xml index 028fb94..f54bce5 100644 --- a/azure-cosmosdb-get-started/pom.xml +++ b/azure-cosmosdb-get-started/pom.xml @@ -20,7 +20,7 @@ org.codehaus.mojo exec-maven-plugin - 1.2.1 + 1.6.0 com.microsoft.azure.cosmosdb.sample.Main @@ -43,7 +43,7 @@ com.microsoft.azure azure-cosmosdb - 1.0.0 + 2.2.0 org.slf4j diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java index aabb798..ff0ced1 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Families.java @@ -26,7 +26,7 @@ public class Families { public static Family getAndersenFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Andersen" + System.currentTimeMillis()); + andersenFamily.setId("Andersen-" + System.currentTimeMillis()); andersenFamily.setLastName("Andersen"); Parent parent1 = new Parent(); @@ -61,7 +61,7 @@ public static Family getAndersenFamilyDocument() { public static Family getWakefieldFamilyDocument() { Family wakefieldFamily = new Family(); - wakefieldFamily.setId("Wakefield" + System.currentTimeMillis()); + wakefieldFamily.setId("Wakefield-" + System.currentTimeMillis()); wakefieldFamily.setLastName("Wakefield"); Parent parent1 = new Parent(); @@ -108,7 +108,7 @@ public static Family getWakefieldFamilyDocument() { public static Family getJohnsonFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Johnson" + System.currentTimeMillis()); + andersenFamily.setId("Johnson-" + System.currentTimeMillis()); andersenFamily.setLastName("Johnson"); Parent parent1 = new Parent(); @@ -122,7 +122,7 @@ public static Family getJohnsonFamilyDocument() { public static Family getSmithFamilyDocument() { Family andersenFamily = new Family(); - andersenFamily.setId("Smith" + System.currentTimeMillis()); + andersenFamily.setId("Smith-" + System.currentTimeMillis()); andersenFamily.setLastName("Smith"); Parent parent1 = new Parent(); diff --git a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java index c546dd1..bb6bf74 100644 --- a/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java +++ b/azure-cosmosdb-get-started/src/main/java/com/microsoft/azure/cosmosdb/sample/Main.java @@ -1,4 +1,4 @@ -/** +/* * The MIT License (MIT) * Copyright (c) 2018 Microsoft Corporation * @@ -23,11 +23,6 @@ package com.microsoft.azure.cosmosdb.sample; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - import com.microsoft.azure.cosmosdb.ConnectionPolicy; import com.microsoft.azure.cosmosdb.ConsistencyLevel; import com.microsoft.azure.cosmosdb.Database; @@ -42,35 +37,74 @@ import com.microsoft.azure.cosmosdb.SqlParameterCollection; import com.microsoft.azure.cosmosdb.SqlQuerySpec; import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient; - import rx.Observable; +import rx.Scheduler; +import rx.schedulers.Schedulers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class Main { + private final ExecutorService executorService; + private final Scheduler scheduler; private AsyncDocumentClient client; - private String databaseName = "AzureSampleFamilyDB"; - private String collectionName = "FamilyCollection"; + private final String databaseName = "AzureSampleFamilyDB"; + private final String collectionName = "FamilyCollection"; + + public Main() { + executorService = Executors.newFixedThreadPool(100); + // The SDK uses netty library for doing async IO operations. The IO operations are performed on the netty io threads. + // The number of IO netty threads are limited; it is the same as the number of CPU cores. + + // The app should avoid doing anything which takes a lot of time from IO netty thread. + // If the app consumes too much of IO netty thread you may face: + // * low throughput + // * bad latency + // * ReadTimeoutException because there is no netty IO thread available to read data from network. + // * deadlock + + // The app code will receive the data from Azure Cosmos DB on the netty IO thread. + // The app should ensure the user's computationally/IO heavy work after receiving data + // from Azure Cosmos DB is performed on a custom thread managed by the user (not on the SDK netty IO thread). + // + // If you are doing heavy work after receiving the result from the SDK, + // you should provide your own scheduler to switch thread. + + // the following scheduler is used for switching from netty thread to user app thread. + scheduler = Schedulers.from(executorService); + } + + public void close() { + executorService.shutdown(); + client.close(); + } /** * Run a Hello DocumentDB console application. - * - * @param args + * + * @param args command line args. */ public static void main(String[] args) { - Main p = new Main(); - try { + try { p.getStartedDemo(); - System.out.println(String.format("Demo complete, please hold while resources are deleted")); + System.out.println(String.format("Demo complete, please hold while resources are released")); } catch (Exception e) { - System.out.println(String.format("DocumentDB GetStarted failed with %s", e)); + System.err.println(String.format("DocumentDB GetStarted failed with %s", e)); } finally { System.out.println("close the client"); - p.client.close(); - System.exit(0); + p.close(); } + System.exit(0); } private void getStartedDemo() throws Exception { @@ -78,13 +112,13 @@ private void getStartedDemo() throws Exception { client = new AsyncDocumentClient.Builder() .withServiceEndpoint(AccountSettings.HOST) - .withMasterKey(AccountSettings.MASTER_KEY) + .withMasterKeyOrResourceToken(AccountSettings.MASTER_KEY) .withConnectionPolicy(ConnectionPolicy.GetDefault()) - .withConsistencyLevel(ConsistencyLevel.Session) + .withConsistencyLevel(ConsistencyLevel.Eventual) .build(); - this.createDatabaseIfNotExists(); - this.createDocumentCollectionIfNotExists(); + createDatabaseIfNotExists(); + createDocumentCollectionIfNotExists(); Family andersenFamily = Families.getAndersenFamilyDocument(); Family wakefieldFamily = Families.getWakefieldFamilyDocument(); @@ -126,41 +160,43 @@ private void createDatabaseIfNotExists() throws Exception { String databaseLink = String.format("/dbs/%s", databaseName); - Observable> databaseReadObs = + Observable> databaseReadObs = client.readDatabase(databaseLink, null); - Observable> databaseExistenceObs = + Observable> databaseExistenceObs = databaseReadObs - .doOnNext(x -> { - System.out.println("database " + databaseName + " already exists."); - }) - .onErrorResumeNext( - e -> { - // if the database doesn't already exists - // readDatabase() will result in 404 error - if (e instanceof DocumentClientException) { - DocumentClientException de = (DocumentClientException) e; - // if database - if (de.getStatusCode() == 404) { - // if the database doesn't exist, create it. - System.out.println("database " + databaseName + " doesn't existed," - + " creating it..."); - - Database dbDefinition = new Database(); - dbDefinition.setId(databaseName); - - return client.createDatabase(dbDefinition, null); - } - } - - // some unexpected failure in reading database happened. - // pass the error up. - System.err.println("Reading database " + databaseName + " failed."); - return Observable.error(e); - }); - - - // wait for completion + .doOnNext(x -> { + System.out.println("database " + databaseName + " already exists."); + }) + .onErrorResumeNext( + e -> { + // if the database doesn't already exists + // readDatabase() will result in 404 error + if (e instanceof DocumentClientException) { + DocumentClientException de = (DocumentClientException) e; + // if database + if (de.getStatusCode() == 404) { + // if the database doesn't exist, create it. + System.out.println("database " + databaseName + " doesn't existed," + + " creating it..."); + + Database dbDefinition = new Database(); + dbDefinition.setId(databaseName); + + return client.createDatabase(dbDefinition, null); + } + } + + // some unexpected failure in reading database happened. + // pass the error up. + System.err.println("Reading database " + databaseName + " failed."); + return Observable.error(e); + }); + + + // wait for completion, + // as waiting for completion is a blocking call try to + // provide your own scheduler to avoid stealing netty io threads. databaseExistenceObs.toCompletable().await(); System.out.println("Checking database " + databaseName + " completed!\n"); @@ -176,68 +212,67 @@ private void createDocumentCollectionIfNotExists() throws Exception { String databaseLink = String.format("/dbs/%s", databaseName); - client.queryCollections(databaseLink, - new SqlQuerySpec("SELECT * FROM r where r.id = @id", + client.queryCollections(databaseLink, + new SqlQuerySpec("SELECT * FROM r where r.id = @id", new SqlParameterCollection( new SqlParameter("@id", collectionName))), null) - .single() // we know there is only single page of result (empty or with a match) - .flatMap(page -> { - if (page.getResults().isEmpty()) { - // if there is no matching collection create the collection. - DocumentCollection collection = new DocumentCollection(); - collection.setId(collectionName); - System.out.println("Creating collection " + collectionName); - - return client.createCollection(databaseLink, collection, null); - } else { - // collection already exists, nothing else to be done. - System.out.println("Collection " + collectionName + "already exists"); - return Observable.empty(); - } - }).toCompletable().await(); + .single() // we know there is only single page of result (empty or with a match) + .flatMap(page -> { + if (page.getResults().isEmpty()) { + // if there is no matching collection create the collection. + DocumentCollection collection = new DocumentCollection(); + collection.setId(collectionName); + System.out.println("Creating collection " + collectionName); + + return client.createCollection(databaseLink, collection, null); + } else { + // collection already exists, nothing else to be done. + System.out.println("Collection " + collectionName + "already exists"); + return Observable.empty(); + } + }).toCompletable().await(); System.out.println("Checking collection " + collectionName + " completed!\n"); } - private void createFamiliesAsyncAndRegisterListener(List families, CountDownLatch completionLatch) throws Exception { + private void createFamiliesAsyncAndRegisterListener(List families, CountDownLatch completionLatch) { String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); List>> createDocumentsOBs = new ArrayList<>(); - for(Family family: families) { + for (Family family : families) { Observable> obs = client.createDocument( collectionLink, family, new RequestOptions(), true); createDocumentsOBs.add(obs); } Observable.merge(createDocumentsOBs) - .map(ResourceResponse::getRequestCharge) - .reduce((sum, value) -> sum + value) - .subscribe( - totalRequestCharge -> { - // this will get print out when completed - System.out.println("total charge for creating documents is " - + totalRequestCharge); - }, - - // terminal error signal - e -> { - e.printStackTrace(); - completionLatch.countDown(); - }, - - // terminal completion signal - () -> { - completionLatch.countDown(); - }); + .map(ResourceResponse::getRequestCharge) + .reduce((sum, value) -> sum + value) + .subscribe( + totalRequestCharge -> { + // this will get print out when completed + System.out.println("total charge for creating documents is " + + totalRequestCharge); + }, + + // terminal error signal + e -> { + e.printStackTrace(); + completionLatch.countDown(); + }, + + // terminal completion signal + () -> { + completionLatch.countDown(); + }); } private void createFamiliesAndWaitForCompletion(List families) throws Exception { - String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); List>> createDocumentsOBs = new ArrayList<>(); - for(Family family: families) { + for (Family family : families) { Observable> obs = client.createDocument( collectionLink, family, new RequestOptions(), true); createDocumentsOBs.add(obs); @@ -245,7 +280,14 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex Double totalRequestCharge = Observable.merge(createDocumentsOBs) .map(ResourceResponse::getRequestCharge) - .reduce((sum, value) -> sum+value) + .observeOn(scheduler) // the scheduler will be used for the following work + .map(charge -> { + // as we don't want to run heavyWork() on netty IO thread, we provide the custom scheduler + // for switching from netty IO thread to user thread. + heavyWork(); + return charge; + }) + .reduce((sum, value) -> sum + value) .toBlocking().single(); writeToConsoleAndPromptToContinue(String.format("Created %d documents with total request charge of %.2f", @@ -253,33 +295,54 @@ private void createFamiliesAndWaitForCompletion(List families) throws Ex totalRequestCharge)); } + private void heavyWork() { + // I may do a lot of IO work: e.g., writing to log files + // a lot of computational work + // or may do Thread.sleep() + + try { + TimeUnit.SECONDS.sleep(2); + } catch (Exception e) { + } + } + private void executeSimpleQueryAsyncAndRegisterListenerForResult(CountDownLatch completionLatch) { // Set some common query options FeedOptions queryOptions = new FeedOptions(); - queryOptions.setMaxItemCount(100); + queryOptions.setMaxItemCount(10); queryOptions.setEnableCrossPartitionQuery(true); String collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName); - Observable> queryObservable = + Observable> queryObservable = client.queryDocuments(collectionLink, - "SELECT * FROM Family WHERE Family.lastName = 'Andersen'", queryOptions); - - queryObservable.subscribe( - queryResultPage -> { - System.out.println("Got a page of query result with " + - queryResultPage.getResults().size() + " document(s)" - + " and request charge of " + queryResultPage.getRequestCharge()); - }, - // terminal error signal - e -> { - e.printStackTrace(); - completionLatch.countDown(); - }, - - // terminal completion signal - () -> { - completionLatch.countDown(); - }); + "SELECT * FROM Family WHERE Family.lastName != 'Andersen'", queryOptions); + + queryObservable + .observeOn(scheduler) + .subscribe( + page -> { + // we want to make sure heavyWork() doesn't block any of netty IO threads + // so we use observeOn(scheduler) to switch from the netty thread to user's thread. + heavyWork(); + + System.out.println("Got a page of query result with " + + page.getResults().size() + " document(s)" + + " and request charge of " + page.getRequestCharge()); + + + System.out.println("Document Ids " + page.getResults().stream().map(d -> d.getId()) + .collect(Collectors.toList())); + }, + // terminal error signal + e -> { + e.printStackTrace(); + completionLatch.countDown(); + }, + + // terminal completion signal + () -> { + completionLatch.countDown(); + }); } private void writeToConsoleAndPromptToContinue(String text) throws IOException {