Skip to content

Commit

Permalink
Apache Spark support in pipelinedp4j for end-to-end Differential Privacy (#278)
Browse files Browse the repository at this point in the history

* Spark kotlin setup

* jackson databind dependency issue

* fix jackson-databind version issue

* Spark Encoders

* Add Spark Table implementation

* spark table encoder

* adding unit test for spark table

* Add more unit tests for SparkTable

* spark encoder pair runtime exception

* Use scala 2.13

* Add more unit test for spark collection

* remove spaces

* Pair<T1, T2> Encoder for Spark

* resolve PR comments

* gitignore /.ijwb/.idea/ files

* gitignore /.ijwb/.idea files

* gitignore /.ijwb/.idea files

* gitignore ijwb files

* create class rule to create spark session for each test class run

* Add implementation of samplePerKey for SparkTable

* Add QueryBuilder for Spark

* resolve comments

* Spark QueryBuilder implementation

* Remove comments

* add copyright comment

* rename variables

* Add copyright for files

* Added comment and renamed variables in filterKeysStoredInSparkCollection

* Added formatting changes for newline

* SparkExample for end-to-end Differential Privacy

* Remove println

* resolve comments

* resolve comments

* spark kryo serializer requires class to be public

* resolve comments

* Correct Java comment

* Format files
  • Loading branch information
sakkumar authored Dec 5, 2024
1 parent d986e7f commit 1dfe8f9
Show file tree
Hide file tree
Showing 32 changed files with 1,785 additions and 16 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,12 @@
**/bazel-java
**/bazel-out
**/bazel-testlogs

**/bazel-differential-privacy
**/.ijwb/
**/pipelinedp4j/.ijwb/
**/pipelinedp4j/bazel-pipelinedp4j
**/pipelinedp4j/MODULE**
**/examples/.idea/
**/examples/pipelinedp4j/bazel-pipelinedp4j
**/examples/pipelinedp4j/MODULE**
12 changes: 12 additions & 0 deletions examples/pipelinedp4j/WORKSPACE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ maven_install(
"org.apache.beam:beam-sdks-java-core:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-avro:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-protobuf:2.49.0",

"info.picocli:picocli:4.7.6",
# For Apache Spark
"org.apache.spark:spark-core_2.13:3.3.2",
"org.apache.spark:spark-sql_2.13:3.3.2",
"org.apache.spark:spark-mllib_2.13:3.3.2",
"org.apache.spark:spark-catalyst_2.13:3.3.2",
"com.fasterxml.jackson.core:jackson-databind:2.14.2",
"com.fasterxml.jackson.module:jackson-module-paranamer:2.14.2",
"com.fasterxml.jackson.module:jackson-module-scala_2.13:2.14.2",
"org.scala-lang:scala-library:2.13.12",

# For logging to console.
"org.slf4j:slf4j-jdk14:1.7.36",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,27 @@ java_binary(
"@maven//:org_slf4j_slf4j_jdk14",
],
)

# Runnable end-to-end Spark example. Mirrors the BeamExample target above but
# pulls in the Spark runtime stack instead of Beam.
java_binary(
    name = "SparkExample",
    srcs = [
        "SparkExample.java",
        "MovieMetrics.java",
        "MovieView.java",
    ],
    main_class = "com.google.privacy.differentialprivacy.pipelinedp4j.examples.SparkExample",
    deps = [
        "@com_google_privacy_differentialprivacy_pipielinedp4j//main/com/google/privacy/differentialprivacy/pipelinedp4j/api",
        "@maven//:com_google_guava_guava",
        "@maven//:info_picocli_picocli",
        "@maven//:org_jetbrains_kotlin_kotlin_stdlib",

        # Apache Spark runtime (Scala 2.13 artifacts) plus the Jackson modules
        # Spark needs for (de)serialization.
        # NOTE(review): jackson-module-scala_2_13 is listed in maven_install but
        # not here — confirm whether this binary needs it at runtime.
        "@maven//:org_apache_spark_spark_core_2_13",
        "@maven//:org_apache_spark_spark_sql_2_13",
        "@maven//:org_apache_spark_spark_mllib_2_13",
        "@maven//:org_apache_spark_spark_catalyst_2_13",
        "@maven//:com_fasterxml_jackson_core_jackson_databind",
        "@maven//:com_fasterxml_jackson_module_jackson_module_paranamer",
        "@maven//:org_scala_lang_scala_library",
    ],
)
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,23 @@ static void runBeamExample(BeamExampleOptions options) {

// Data extractors. They always have to implement Function1 and Serializable interfaces. If it
// doesn't implement Serializable interface, it will fail on Beam. If it doesn't implement
// Function1, it will at compile time due to types mismatch. Do not use lambdas for data
// Function1, it will fail at compile time due to types mismatch. Do not use lambdas for data
// extractors as they won't be serializable.
static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
private static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getUserId();
}
}

static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
private static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
@Override
public String invoke(MovieView movieView) {
return movieView.getMovieId();
}
}

static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
private static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
@Override
public Double invoke(MovieView movieView) {
return movieView.getRating();
Expand All @@ -185,7 +185,7 @@ private static PCollection<MovieView> readData(Pipeline pipeline, String inputFi

/**
* Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. Set public
* groups 1-10.
* groups 4500-4509.
*/
private static PCollection<String> publiclyKnownMovieIds(Pipeline pipeline) {
var publicGroupsAsJavaList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* <p>It is the result of the DP metrics query.
*/
final class MovieMetrics {
public final class MovieMetrics {
private final String movieId;

private final long numberOfViewers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.List;

/** Represents a single movie view from the Netflix dataset. */
final class MovieView {
public final class MovieView {
private final String userId;
private final String movieId;
private final Double rating;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.privacy.differentialprivacy.pipelinedp4j.examples;

import static java.lang.Math.round;
import static java.util.stream.Collectors.toCollection;

import com.google.privacy.differentialprivacy.pipelinedp4j.api.NoiseKind;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryBuilder;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryPerGroupResult;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.TotalBudget;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.IntStream;
import kotlin.jvm.functions.Function1;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

/**
 * An end-to-end example how to compute DP metrics on a Netflix dataset using the library on Spark.
 *
 * <p>Reads movie views from a local CSV file, runs a differentially private query (distinct
 * viewers, view count, and mean rating per movie), and writes the anonymized result to a local
 * text file.
 *
 * <p>See README for details including how to run the example.
 */
@Command(
    name = "SparkExample",
    version = {"SparkExample 1.0"},
    mixinStandardHelpOptions = true)
public class SparkExample implements Runnable {
  // When true, group keys (movie ids 4500-4509) are treated as public knowledge and do not
  // consume privacy budget for group selection.
  @Option(
      names = "--use-public-groups",
      description =
          "If true we will assume in the example that movie ids are publicly known and are from "
              + "4500 to 4509"
              + ". Default is false, i.e. we will choose movie ids in a differentially"
              + " private way.",
      defaultValue = "false")
  private boolean usePublicGroups = false;

  // Path to the headerless CSV input (Netflix prize data subset).
  @Option(
      names = "--local-input-file-path",
      description =
          "Input file. For using as input file you can download data from"
              + " https://www.kaggle.com/datasets/netflix-inc/netflix-prize-data. Use only part of"
              + " it to speed up the calculations.",
      required = true)
  private String localInputFilePath;

  // Output directory; Spark writes one or more part files under it and overwrites existing output.
  @Option(
      names = "--local-output-file-path",
      description = "Output file.",
      defaultValue = "/tmp/anonymized_output/")
  private String localOutputFilePath;

  /** Entry point: parses CLI flags with picocli and delegates to {@link #run()}. */
  public static void main(String[] args) {
    int exitCode = new CommandLine(new SparkExample()).execute(args);
    System.exit(exitCode);
  }

  /** Builds the DP query, runs it on a local Spark session, and writes the anonymized output. */
  @Override
  public void run() {
    System.out.println("Starting calculations...");
    SparkSession spark = initSpark();
    // Read the input data, these are movie views that contain movie id, user id and rating.
    Dataset<MovieView> data = readData(spark);

    // Define the query: group views by movie id, with each user (privacy unit) contributing to at
    // most 3 movies and at most 1 view per movie. Ratings are clamped to [1.0, 5.0] for the mean.
    var query =
        QueryBuilder.from(data, /* privacyIdExtractor= */ new UserIdExtractor())
            .groupBy(
                /* groupKeyExtractor= */ new MovieIdExtractor(),
                /* maxGroupsContributed= */ 3,
                /* maxContributionsPerGroup= */ 1,
                usePublicGroups ? publiclyKnownMovieIds(spark) : null)
            .countDistinctPrivacyUnits("numberOfViewers")
            .count(/* outputColumnName= */ "numberOfViews")
            .mean(
                new RatingExtractor(),
                /* minValue= */ 1.0,
                /* maxValue= */ 5.0,
                /* outputColumnName= */ "averageOfRatings",
                /* budget= */ null)
            .build();
    // Run the query with DP parameters (total budget shared across the three aggregations).
    Dataset<QueryPerGroupResult> result =
        query.run(new TotalBudget(/* epsilon= */ 1.1, /* delta= */ 1e-10), NoiseKind.LAPLACE);

    // Convert the result to better representation, i.e. to MovieMetrics. Counts are noisy doubles,
    // so they are rounded back to longs here.
    Encoder<MovieMetrics> movieMetricsEncoder = Encoders.kryo(MovieMetrics.class);
    MapFunction<QueryPerGroupResult, MovieMetrics> mapToMovieMetricsFn =
        perGroupResult -> {
          String movieId = perGroupResult.getGroupKey();
          long numberOfViewers =
              round(perGroupResult.getAggregationResults().get("numberOfViewers"));
          long numberOfViews = round(perGroupResult.getAggregationResults().get("numberOfViews"));
          double averageOfRatings = perGroupResult.getAggregationResults().get("averageOfRatings");
          return new MovieMetrics(movieId, numberOfViewers, numberOfViews, averageOfRatings);
        };
    // We now have our anonymized metrics of movie views.
    Dataset<MovieMetrics> anonymizedMovieMetrics =
        result.map(mapToMovieMetricsFn, movieMetricsEncoder);

    // Save the result to a file.
    writeOutput(anonymizedMovieMetrics);

    // Stop spark session.
    spark.stop();
    System.out.println("Finished calculations.");
  }

  // Data extractors. They always have to implement Function1 and Serializable interfaces. If it
  // doesn't implement Serializable interface, it will fail on Spark. If it doesn't implement
  // Function1, it will fail at compile time due to types mismatch. Do not use lambdas for data
  // extractors as they won't be serializable.
  private static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getUserId();
    }
  }

  private static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getMovieId();
    }
  }

  private static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
    @Override
    public Double invoke(MovieView movieView) {
      return movieView.getRating();
    }
  }

  /**
   * Creates a local Spark session using all available cores. Binding the driver to 127.0.0.1
   * avoids hostname-resolution issues on some local setups.
   */
  private static SparkSession initSpark() {
    return SparkSession.builder()
        .appName("Kotlin Spark Example")
        .master("local[*]")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate();
  }

  /**
   * Reads the headerless input CSV into a {@code Dataset<MovieView>}.
   *
   * <p>NOTE(review): columns are read as (movieId, userId, rating) — column 1 is passed as the
   * first MovieView constructor argument and column 0 as the second. Confirm this matches both the
   * dataset layout and the MovieView constructor order.
   */
  private Dataset<MovieView> readData(SparkSession spark) {
    Dataset<Row> inputDataFrame = spark.read().option("header", "false").csv(localInputFilePath);
    MapFunction<Row, MovieView> mapToMovieView =
        row ->
            new MovieView(
                row.getString(1), row.getString(0), java.lang.Double.valueOf((String) row.get(2)));
    return inputDataFrame.map(mapToMovieView, Encoders.kryo(MovieView.class));
  }

  /**
   * Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. Set public
   * groups 4500-4509.
   */
  private static Dataset<String> publiclyKnownMovieIds(SparkSession spark) {
    ArrayList<String> publicGroupsAsJavaList =
        IntStream.rangeClosed(4500, 4509)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayList::new));
    return spark.createDataset(publicGroupsAsJavaList, Encoders.STRING());
  }

  /** Writes one {@code MovieMetrics.toString()} line per group, overwriting any previous output. */
  private void writeOutput(Dataset<MovieMetrics> result) {
    Dataset<String> lines =
        result.map((MapFunction<MovieMetrics, String>) MovieMetrics::toString, Encoders.STRING());
    lines
        .write()
        .mode(SaveMode.Overwrite) // Overwrite existing file if any
        .text(localOutputFilePath);
  }
}
17 changes: 17 additions & 0 deletions pipelinedp4j/.bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Pin Bazel's Java compile-time language level and the remote JDK used at
# runtime to version 11 so builds are reproducible across machines.
build --java_language_version=11
build --java_runtime_version=remotejdk_11
4 changes: 4 additions & 0 deletions pipelinedp4j/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pom_file(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:accumulators_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_kt_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_dp_engine_factory",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_encoders",

],
template_file = "pom.template",
)
Expand Down
10 changes: 10 additions & 0 deletions pipelinedp4j/WORKSPACE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ maven_install(
"org.apache.beam:beam-sdks-java-core:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-avro:2.49.0",
"org.apache.beam:beam-sdks-java-extensions-protobuf:2.49.0",

"org.apache.spark:spark-core_2.13:3.3.2",
"org.apache.spark:spark-sql_2.13:3.3.2",
"org.apache.spark:spark-mllib_2.13:3.3.2",
"org.apache.spark:spark-catalyst_2.13:3.3.2",
"com.fasterxml.jackson.core:jackson-databind:2.14.2",
"com.fasterxml.jackson.module:jackson-module-paranamer:2.14.2",
"com.fasterxml.jackson.module:jackson-module-scala_2.13:2.14.2",
"org.scala-lang:scala-library:2.13.12",

# Test only dependencies.
maven.artifact(
"com.google.truth",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ kt_jvm_library(
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/local:local_encoders",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/proto:dpaggregates_kt_proto",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_collections",
"//main/com/google/privacy/differentialprivacy/pipelinedp4j/spark:spark_encoders",
"@maven//:com_google_errorprone_error_prone_annotations",
"@maven//:com_google_guava_guava",
"@maven//:org_apache_beam_beam_sdks_java_core",
"@maven//:org_apache_beam_beam_sdks_java_extensions_avro",

],
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import com.google.privacy.differentialprivacy.pipelinedp4j.core.EncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.core.FrameworkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.local.LocalEncoderFactory
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkCollection
import com.google.privacy.differentialprivacy.pipelinedp4j.spark.SparkEncoderFactory
import org.apache.beam.sdk.values.PCollection as BeamPCollection
import org.apache.spark.sql.Dataset

/**
* An internal interface to represent an arbitrary collection that is supported by PipelineDP4j.
Expand Down Expand Up @@ -52,3 +55,10 @@ internal data class LocalPipelineDpCollection<T>(val data: Sequence<T>) : Pipeli

override fun toFrameworkCollection() = LocalCollection<T>(data)
}

/**
 * A [PipelineDpCollection] backed by an Apache Spark [Dataset].
 *
 * Encoders are produced by [SparkEncoderFactory]; [toFrameworkCollection] wraps the dataset in a
 * [SparkCollection] without copying the underlying data.
 */
internal data class SparkPipelineDpCollection<T>(val data: Dataset<T>) : PipelineDpCollection<T> {
  override val encoderFactory = SparkEncoderFactory()

  override fun toFrameworkCollection(): SparkCollection<T> {
    return SparkCollection(data)
  }
}
Loading

0 comments on commit 1dfe8f9

Please sign in to comment.