Skip to content

Commit

Permalink
[Java] Add job name to GCS custom audit info (#31316)
Browse files Browse the repository at this point in the history
* Add job name to GCS custom audit info in Java

* Apply spotless
  • Loading branch information
shunping authored May 17, 2024
1 parent 93a5bc7 commit 28a7199
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down Expand Up @@ -208,6 +209,8 @@ private boolean retryOnStatusCode(int statusCode) {

private Set<Integer> ignoredResponseCodes = new HashSet<>(DEFAULT_IGNORED_RESPONSE_CODES);

// Extra HTTP headers that initialize() copies onto the request it is given.
// null (the default) means no extra headers are added.
private Map<String, String> httpHeaders = null;

/**
 * Creates an initializer by delegating to another constructor of this class, passing an empty
 * list.
 *
 * <p>NOTE(review): the delegated constructor is not visible in this excerpt; presumably the list
 * holds additional response codes to ignore, so an empty list keeps only the defaults. Confirm
 * against the full class.
 */
public RetryHttpRequestInitializer() {
this(Collections.emptyList());
}
Expand Down Expand Up @@ -270,6 +273,10 @@ public void initialize(HttpRequest request) throws IOException {
request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
request.setIOExceptionHandler(loggingHttpBackOffHandler);

if (this.httpHeaders != null) {
request.getHeaders().putAll(this.httpHeaders);
}

// Set response initializer
if (responseInterceptor != null) {
request.setResponseInterceptor(responseInterceptor);
Expand All @@ -284,4 +291,8 @@ public void setCustomErrors(CustomHttpErrors customErrors) {
/**
 * Stores the write timeout on this initializer.
 *
 * @param writeTimeout the new write timeout. NOTE(review): the unit and where the value is
 *     applied are not visible in this excerpt (presumably milliseconds, applied to each request).
 *     Confirm against the rest of the class.
 */
public void setWriteTimeout(int writeTimeout) {
this.writeTimeout = writeTimeout;
}

/**
 * Sets extra HTTP headers to add to each request passed to {@link #initialize}.
 *
 * <p>The given map is copied into an unmodifiable snapshot rather than stored by reference, so
 * later changes the caller makes to its own map can neither alter the headers that get sent nor
 * race with request initialization on another thread.
 *
 * @param httpHeaders header names mapped to their values, or {@code null} to add no extra
 *     headers
 */
public void setHttpHeaders(Map<String, String> httpHeaders) {
  // Preserve the "null means no extra headers" contract that initialize() checks for.
  this.httpHeaders =
      httpHeaders == null
          ? null
          : Collections.unmodifiableMap(new java.util.LinkedHashMap<>(httpHeaders));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings.isNullOrEmpty;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
Expand All @@ -31,9 +33,12 @@
import java.net.URL;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.Optional;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/** Helpers for cloud communication. */
public class Transport {
Expand Down Expand Up @@ -89,20 +94,31 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {

/** Returns a Cloud Storage client builder using the specified {@link GcsOptions}. */
public static Storage.Builder newStorageClient(GcsOptions options) {
String applicationNameSuffix = " (GPN:Beam)";
String jobName = Optional.ofNullable(options.getJobName()).orElse("UNKNOWN");

String applicationName =
String.format(
"%sapache-beam/%s (GPN:Beam)",
isNullOrEmpty(options.getAppName()) ? "" : options.getAppName() + " ",
ReleaseInfo.getReleaseInfo().getSdkVersion());

String servicePath = options.getGcsEndpoint();

// Do not log the code 404. Code up the stack will deal with 404's if needed,
// and logging it by default clutters the output during file staging.
RetryHttpRequestInitializer retryHttpRequestInitializer =
new RetryHttpRequestInitializer(ImmutableList.of(404), new UploadIdResponseInterceptor());

// Set custom audit info in request headers
retryHttpRequestInitializer.setHttpHeaders(ImmutableMap.of("x-goog-custom-audit-job", jobName));

Storage.Builder storageBuilder =
new Storage.Builder(
getTransport(),
getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
// Do not log the code 404. Code up the stack will deal with 404's if needed,
// and
// logging it by default clutters the output during file staging.
new RetryHttpRequestInitializer(
ImmutableList.of(404), new UploadIdResponseInterceptor())))
.setApplicationName(options.getAppName() + applicationNameSuffix)
options.getGcpCredential(), retryHttpRequestInitializer))
.setApplicationName(applicationName)
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
if (servicePath != null) {
ApiComponents components = apiComponentsFromUrl(servicePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;

import com.google.api.client.http.HttpRequest;
import com.google.api.services.storage.Storage;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -33,17 +39,36 @@
/**
 * Tests for the request headers produced by a Storage client built via
 * {@code Transport.newStorageClient}.
 *
 * <p>NOTE(review): this excerpt comes from a diff view and interleaves pre-change and post-change
 * lines (for example, two method signatures and two {@code get(...)} calls appear back to back).
 * Resolve it against the committed file before compiling.
 */
public class TransportTest {

/**
 * Builds a GET request for a test object and checks its headers: the User-Agent must contain the
 * app name, the SDK version token and the "(GPN:Beam)" comment, and the
 * "x-goog-custom-audit-job" header must carry exactly the configured job name.
 */
@Test
public void testUserAgentInGcsRequestHeaders() throws IOException {
public void testUserAgentAndCustomAuditInGcsRequestHeaders() throws IOException {
GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
gcsOptions.setGcpCredential(new TestCredential());
// Fixed job and app names so the header assertions below can match exact values.
gcsOptions.setJobName("test-job");
gcsOptions.setAppName("test-app");

Storage storageClient = Transport.newStorageClient(gcsOptions).build();
Storage.Objects.Get getObject = storageClient.objects().get("testbucket", "testobject");
Storage.Objects.Get getObject = storageClient.objects().get("test-bucket", "test-object");
// Build the request without executing it so its headers can be inspected.
HttpRequest request = getObject.buildHttpRequest();

// An example of user agent string will be like
// "TransportTest (GPN:Beam) Google-API-Java-Client/2.0.0"
// "test-app apache-beam/2.57.0.dev (GPN:Beam) Google-API-Java-Client/2.0.0"
// For a valid user-agent string, a comment like "(GPN:Beam)" cannot be the first token.
// https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3
assertThat(
Arrays.asList(getObject.getRequestHeaders().getUserAgent().split(" "))
.indexOf("(GPN:Beam)"),
Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("test-app"),
greaterThanOrEqualTo(0));

// The SDK version token must be present and must not be the first token (index > 0).
assertThat(
Arrays.asList(request.getHeaders().getUserAgent().split(" "))
.indexOf(String.format("apache-beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())),
greaterThan(0));

// The "(GPN:Beam)" comment must be present and must not be the first token (index > 0).
assertThat(
Arrays.asList(request.getHeaders().getUserAgent().split(" ")).indexOf("(GPN:Beam)"),
greaterThan(0));

// there should be one and only one custom audit entry for job name
// NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments below are in the
// opposite order, which does not change pass/fail but swaps the labels in a failure message.
assertEquals(
request.getHeaders().getHeaderStringValues("x-goog-custom-audit-job"),
Collections.singletonList("test-job"));
}
}

0 comments on commit 28a7199

Please sign in to comment.