diff --git a/README.md b/README.md index 0a59021..6bac658 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,5 @@ # pulsar-client-plugin -Pulsar client plugin for auth0, aws, and etc. - -## auth0 integration -Integration of auth0 enables Pulsar client authenticated against [auth0](https://www.auth0.com) backend instead of the default Pulsar token. The authentication follows [the recommended M2M flow](https://auth0.com/blog/using-m2m-authorization/). - -Auth0 integration consists of the client side plugin and a broker auth plugin. The client plugin generates an auth0 JWT, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin. +Pulsar client plugin for auth0 and AWS Cognito authentication. The Jar artifact is loaded on GitHub package registry. @@ -36,6 +31,11 @@ In pom.xml, ``` +## auth0 integration +Integration of auth0 enables Pulsar client authenticated against [auth0](https://www.auth0.com) backend instead of the default Pulsar token. The authentication follows [the recommended M2M flow](https://auth0.com/blog/using-m2m-authorization/). + +Auth0 integration consists of the client side plugin and a broker auth plugin. The client plugin generates an auth0 JWT, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin. + Java Client example: ``` example.java String domain = "https://.auth0.com/oauth/token"; @@ -51,3 +51,28 @@ PulsarClient client = PulsarClient.builder() ) .build(); ``` + +### AWS Cognito integration +Integration of AWS Cognito enables Pulsar client authenticated against [AWS Cognito](https://aws.amazon.com/cognito/). The authentication flow requires creation of Cognito user pool and App client. The App client must allow `Client credential` OAuth flow, and specifies custome scopes for OAuth 2.0 grants. Here is [a good example](https://lobster1234.github.io/2018/05/31/server-to-server-auth-with-amazon-cognito/) explaining machine to machine authentication with Cognito. + +The client plugin enables client credential to exchange an access token following [the Cognito deverloper's guide](https://docs.aws.amazon.com/cognito/latest/developerguide/token-endpoint.html). Under the hood, we will use `client_credentials` as grant_type. Scope must be preconfigured under the a User Pool's resource server and enabled by checking off `App client`'s OAuth2 Allowed Custom Scopes. This can be done via AWS CLI or console. The scope name will be used for authorization. + +Resource server's identifier and client Id, that becomes `sub` in the Cognito JWT, can be optionally used for verification on the Pulsar broker side's authentication. + +Cognito integration consists of the client side plugin and a broker auth plugin. The client plugin generates an access token, which in turn can be authenticated and authorized by the broker side. The broker plugin has to be configured on Pulsar and is not part of this repo. Please contact [Kafkaesque](https://kafkaesque.io/contact/#) to enable the broker side plugin. + +Java Client example: +``` example.java +String domain = "https://.auth.us-east-2.amazoncognito.com/oauth2/token"; +String clientId = ""; +String clientSecret = ""; +String scope = "kafkaesque.io/ming.pulsar"; + +// Create client object +PulsarClient client = PulsarClient.builder() + .serviceUrl(SERVICE_URL) + .authentication( + AuthFactory.cognito(domain, clientId, clientSecret, scope) + ) + .build(); +``` diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthFactory.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthFactory.java index b0324ff..91f9520 100644 --- a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthFactory.java +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthFactory.java @@ -1,6 +1,7 @@ package io.kafkaesque.pulsar.client.auth; import io.kafkaesque.pulsar.client.auth.auth0.Auth0JWT; +import io.kafkaesque.pulsar.client.auth.cognito.CognitoJWT; import org.apache.pulsar.client.api.Authentication; @@ -38,4 +39,17 @@ public static Authentication auth0(String domain, String clientId, String client return auth0; } + /** + * Request JWT from AWS Cognito and pass the JWT to pulsar broker. + * @param domain + * @param clientId + * @param clientSecret + * @param scope + * @return + */ + public static Authentication cognito(String domain, String clientId, String clientSecret, String scope) { + return new AuthenticationCognito(CognitoJWT.create(domain, clientId, clientSecret, scope).generateAndCheck()); + + } + } \ No newline at end of file diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthMethod.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthMethod.java index 102f121..d8e92f7 100644 --- a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthMethod.java +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthMethod.java @@ -8,6 +8,8 @@ public final class AuthMethod { public final static String TOKEN = "token"; public final static String AUTH0 = "auth0"; + + public final static String COGNITO = "cognito"; - public final static List supportedMethods = Arrays.asList(TOKEN, AUTH0); + public final static List supportedMethods = Arrays.asList(TOKEN, AUTH0, COGNITO); } \ No newline at end of file diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationAuth0.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationAuth0.java index d22b440..cb2623b 100644 --- a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationAuth0.java +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationAuth0.java @@ -23,7 +23,7 @@ public class AuthenticationAuth0 implements Authentication, EncodedAuthenticatio * */ private static final long serialVersionUID = 1L; - private Supplier tokenSupplier; + protected Supplier tokenSupplier; private String authMethod = AuthMethod.AUTH0; diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationCognito.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationCognito.java new file mode 100644 index 0000000..4bc3d60 --- /dev/null +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationCognito.java @@ -0,0 +1,45 @@ +package io.kafkaesque.pulsar.client.auth; + +import com.google.common.base.Charsets; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Cognito JWT based authentication provider. + */ +public class AuthenticationCognito extends AuthenticationAuth0 { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private String authMethod = AuthMethod.COGNITO; + + public AuthenticationCognito(String token) { + super(() -> token); + } + + public AuthenticationCognito(Supplier tokenSupplier) { + super(tokenSupplier); + } + + @Override + public String getAuthMethodName() { + return authMethod; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + return new AuthenticationDataCognito(tokenSupplier); + } + +} diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationDataCognito.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationDataCognito.java new file mode 100644 index 0000000..2e8f368 --- /dev/null +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/AuthenticationDataCognito.java @@ -0,0 +1,93 @@ +package io.kafkaesque.pulsar.client.auth; + +import java.util.Map; + +import java.util.Set; +import java.util.function.Supplier; + +import javax.naming.AuthenticationException; + +import org.apache.pulsar.common.api.AuthData; + +import org.apache.pulsar.client.api.AuthenticationDataProvider; + +import static java.nio.charset.StandardCharsets.UTF_8; +/** + * This plugin is for AWS Cognito JWT Pulsar authentication data. + */ +public class AuthenticationDataCognito implements AuthenticationDataProvider { + + /** + * + */ + private static final long serialVersionUID = 1L; + + private final Supplier token; + + public AuthenticationDataCognito(Supplier token) { + this.token = token; + } + + /* + * HTTP + */ + + /** + * Check if data for HTTP are available. + * + * @return true if this authentication data contain data for HTTP + */ + public boolean hasDataForHttp() { + return true; + } + + /** + * + * @return a authentication scheme, or {@code null} if the request will not be authenticated. + */ + public String getHttpAuthType() { + return null; + } + + /** + * + * @return an enumeration of all the header names + */ + public Set> getHttpHeaders() throws Exception { + return null; + } + + /* + * Command + */ + + /** + * Check if data from Pulsar protocol are available. + * + * @return true if this authentication data contain data from Pulsar protocol + */ + public boolean hasDataFromCommand() { + return token.get() != null; + } + + /** + * + * @return authentication data which will be stored in a command + */ + public String getCommandData() { + return token.get(); + } + + /** + * For mutual authentication, This method use passed in `data` to evaluate and challenge, + * then returns null if authentication has completed; + * returns authenticated data back to server side, if authentication has not completed. + * + *

Mainly used for mutual authentication like sasl. + */ + public AuthData authenticate(AuthData data) throws AuthenticationException { + byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes(UTF_8); + return AuthData.of(bytes); + } + +} \ No newline at end of file diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/auth0/Auth0JWT.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/auth0/Auth0JWT.java index c288c5d..9563efb 100644 --- a/java/src/main/java/io/kafkaesque/pulsar/client/auth/auth0/Auth0JWT.java +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/auth0/Auth0JWT.java @@ -66,7 +66,7 @@ public JSONObject generate() throws UnsupportedEncodingException { int statusCode = response.getStatus(); //TODO: add retry-after 503, 429, 301 if (statusCode != 200) { - throw new JWTVerificationException("invalide auth0.com status code " + statusCode); + throw new JWTVerificationException("invalid auth0.com status code " + statusCode); } JSONObject jsonObj = response.getBody().getObject(); diff --git a/java/src/main/java/io/kafkaesque/pulsar/client/auth/cognito/CognitoJWT.java b/java/src/main/java/io/kafkaesque/pulsar/client/auth/cognito/CognitoJWT.java new file mode 100644 index 0000000..6dd7afe --- /dev/null +++ b/java/src/main/java/io/kafkaesque/pulsar/client/auth/cognito/CognitoJWT.java @@ -0,0 +1,92 @@ +package io.kafkaesque.pulsar.client.auth.cognito; + +import java.io.UnsupportedEncodingException; + +import com.auth0.jwt.exceptions.JWTVerificationException; + +import org.apache.pulsar.shade.io.netty.util.internal.StringUtil; +import org.apache.pulsar.shade.org.apache.commons.lang3.Validate; + +import kong.unirest.HttpResponse; +import kong.unirest.JsonNode; +import kong.unirest.Unirest; +import kong.unirest.json.JSONObject; + +/** + * Generate a cognito JWT with basic scope validation. + */ +public class CognitoJWT { + + String tokenServerUrl; + String clientId; + String clientSecret; + String scope; + + private CognitoJWT(String domain, String clientId, String clientSecret, String scope) { + this.tokenServerUrl = Validate.notEmpty(domain); + if (!domain.endsWith("/oauth2/token")) { + this.tokenServerUrl = domain + "/oauth2/token"; + } + this.clientId = Validate.notEmpty(clientId); + this.clientSecret = Validate.notEmpty(clientSecret); + this.scope = Validate.notEmpty(scope); + } + + /** + * Create a CognitoJWT object. + * @param domain + * @param clientId + * @param clientSecret + * @param scope + * @return + */ + public static CognitoJWT create(String domain, String clientId, String clientSecret, String scope) { + return new CognitoJWT(domain, clientId, clientSecret, scope); + } + + /** + * + * @return + * @throws UnsupportedEncodingException + */ + public JSONObject generate() throws UnsupportedEncodingException { + + Unirest.config().enableCookieManagement(false); + String reqBody = "grant_type=client_credentials&scope=" + this.scope; + + HttpResponse response = Unirest.post(this.tokenServerUrl) + .header("content-type", "application/x-www-form-urlencoded") + .basicAuth(this.clientId, this.clientSecret) + .body(reqBody).asJson(); + + Unirest.config().reset(); + int statusCode = response.getStatus(); + JSONObject jsonObj = response.getBody().getObject(); + //TODO: may retry with some 400 or 500 code + if (statusCode != 200) { + throw new JWTVerificationException("invalid aws cognito status code " + statusCode); + } + + return jsonObj; + } + + /** + * Generate and returns a Cognito JWT. + * @return + * @throws JWTVerificationException + */ + public String generateAndCheck() throws JWTVerificationException{ + JSONObject resp; + try { + resp = generate(); + } catch (UnsupportedEncodingException e) { + throw new JWTVerificationException(e.getMessage()); + } + String token = resp.getString("access_token"); + if (StringUtil.isNullOrEmpty(token)) { + throw new JWTVerificationException("Cognito JWT is empty"); + } + return token; + } + +} \ No newline at end of file