Skip to content

Commit

Permalink
Fix URI parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
hadrienk committed Mar 19, 2024
1 parent f7fe185 commit 35d4731
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 34 deletions.
18 changes: 9 additions & 9 deletions src/main/java/fr/insee/trevas/jupyter/SparkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import fr.insee.vtl.engine.VtlScriptEngine;
import fr.insee.vtl.spark.SparkDataset;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -11,9 +12,9 @@

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class SparkUtils {

Expand All @@ -33,7 +34,7 @@ public static SparkSession buildSparkSession() {
if (Files.exists(path)) {
org.apache.spark.util.Utils.loadDefaultSparkProperties(
conf, path.normalize().toAbsolutePath().toString());
if (conf.get("spark.jars", "").equals("")) {
if (conf.get("spark.jars", "").isEmpty()) {
conf.set(
"spark.jars",
String.join(
Expand Down Expand Up @@ -72,15 +73,13 @@ public static SparkDataset readSasDataset(SparkSession spark, String path) throw

public static SparkDataset readCSVDataset(SparkSession spark, String path) throws Exception {
Dataset<Row> dataset;

String url = Utils.getURL(path);
Map<String, String> options = Utils.getQueryMap(path);

var uri = Utils.uri(path);
var params = new Utils.QueryParam(uri);
try {
dataset = spark.read().option("sep", ";")
dataset = spark.read().option("sep", params.getValue("sep").orElse(";"))
.option("header", "true")
.options(options)
.csv(url);
.options(params.flatten())
.csv(Path.of(Utils.strip(uri)).normalize().toAbsolutePath().toString());
} catch (Exception e) {
throw new Exception(e);
}
Expand All @@ -96,4 +95,5 @@ public static void writeCSVDataset(String location, SparkDataset dataset) {
org.apache.spark.sql.Dataset<Row> sparkDataset = dataset.getSparkDataset();
sparkDataset.write().mode(SaveMode.Overwrite).csv(location);
}

}
90 changes: 76 additions & 14 deletions src/main/java/fr/insee/trevas/jupyter/Utils.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,88 @@
package fr.insee.trevas.jupyter;

import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.xspec.S;

import javax.swing.text.html.Option;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Utils {

public static String getURL(String path) {
return path.split("\\?")[0];
/**
 * Multi-valued view over the query string of a {@link URI}.
 * <p>
 * The query is split on {@code &} into {@code key=value} segments. Keys keep the order in
 * which they first appear, and repeated keys accumulate their values in order.
 * A segment without {@code =} is stored with an empty string value.
 * <p>
 * NOTE(review): parsing is based on {@link URI#getQuery()}, which returns the already
 * decoded query, so a percent-encoded {@code &} inside a value is still treated as a
 * separator. Switching to {@code getRawQuery()} plus per-component decoding would fix
 * that but changes decoding semantics; confirm before changing.
 */
public static class QueryParam {
    private final Map<String, List<String>> params = new LinkedHashMap<>();

    /**
     * Parses the query component of {@code uri}.
     *
     * @param uri the URI whose query should be parsed; a missing or empty query yields
     *            no parameters
     */
    public QueryParam(URI uri) {
        String query = uri.getQuery();
        if (query == null || query.isEmpty()) {
            // "".split("&") yields [""], which used to register a bogus "" -> "" parameter.
            return;
        }
        for (String segment : query.split("&")) {
            if (segment.isEmpty()) {
                // Produced by "a=1&&b=2" or a trailing "&"; carries no information.
                continue;
            }
            // Limit 2: only the first '=' separates key from value, so "expr=a=b"
            // keeps "a=b" as its value instead of being dropped.
            String[] kv = segment.split("=", 2);
            String value = kv.length == 2 ? kv[1] : "";
            params.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(value);
        }
    }

    /**
     * Returns every value recorded for {@code key}, in order of appearance.
     *
     * @param key the parameter name
     * @return the values, or an empty {@link Optional} if the key is absent
     */
    public Optional<List<String>> getValues(String key) {
        return Optional.ofNullable(params.get(key));
    }

    /**
     * Returns the first value recorded for {@code key}.
     *
     * @param key the parameter name
     * @return the first value, or an empty {@link Optional} if the key is absent
     */
    public Optional<String> getValue(String key) {
        // Lists are only ever created together with their first element, so get(0) is safe.
        return getValues(key).map(values -> values.get(0));
    }

    /**
     * Collapses the parameters to a single value per key (the first one).
     *
     * @return a new mutable map, iterated in order of first key appearance
     */
    public Map<String, String> flatten() {
        Map<String, String> flat = new LinkedHashMap<>();
        params.forEach((key, values) -> flat.put(key, values.get(0)));
        return flat;
    }
}

public static Map<String, String> getQueryMap(String path) {
if (path == null || !path.contains("\\?")) {
return Map.of();
public static URI strip(URI uri) {
try {
return new URI(
uri.getScheme(),
uri.getUserInfo(),
uri.getHost(),
uri.getPort(),
uri.getPath(),
null,
null
);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
String[] params = path.split("\\?")[1].split("&");
Map<String, String> map = new HashMap<>();
}

for (String param : params) {
String name = param.split("=")[0];
String value = param.split("=")[1];
map.put(name, value);
/**
* Parse value to URI. If the value has no scheme, it is interpreted as a local path.
* <p>
* Ex: /foo/bar becomes file:///foo/bar
*/
public static URI uri(String value) {
try {
var uri = new URI(value);
if (uri.getScheme() == null) {
var tmp = Path.of(uri.getPath()).toUri();
return new URI(tmp +
Optional.ofNullable(uri.getQuery()).map(s -> URLEncoder.encode(s, StandardCharsets.UTF_8)).map(s -> "?" + s).orElse("") +
Optional.ofNullable(uri.getFragment()).map(s -> URLEncoder.encode(s, StandardCharsets.UTF_8)).map(s -> "#" + s).orElse("")
);
}
return uri;
} catch (URISyntaxException e) {
return Path.of(value).toUri();
}
return map;
}
}
45 changes: 34 additions & 11 deletions src/test/java/CSVTest.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,51 @@
import fr.insee.trevas.jupyter.SparkUtils;
import fr.insee.trevas.jupyter.Utils;
import fr.insee.vtl.model.Dataset;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;

import static org.assertj.core.api.Assertions.assertThat;

public class CSVTest {
@Test
void testURI() throws URISyntaxException {
URI uri;

private SparkSession spark;
uri = Utils.uri("src/test/resources/ds1.csv");
URI expected = Path.of("src/test/resources/ds1.csv").toUri();
assertThat(uri).isEqualTo(expected);

@BeforeEach
public void setUp() {
spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate();
uri = Utils.uri("src/test/resources/ds1.csv#fragment");
assertThat(uri.getHost()).isEqualTo(expected.getHost());
assertThat(uri.getAuthority()).isEqualTo(expected.getAuthority());
assertThat(uri.getFragment()).isEqualTo("fragment");

uri = Utils.uri("src/test/resources/ds1.csv?foo=bar&foo=baz");
assertThat(uri.getHost()).isEqualTo(expected.getHost());
assertThat(uri.getAuthority()).isEqualTo(expected.getAuthority());
assertThat(uri.getQuery()).isEqualTo("foo=bar&foo=baz");

uri = Utils.uri("src/test/resources/ds1.csv?sep=%7C&del=%3B");
assertThat(uri.getHost()).isEqualTo(expected.getHost());
assertThat(uri.getAuthority()).isEqualTo(expected.getAuthority());
assertThat(uri.getQuery()).isEqualTo("sep=|&del=;");
}

@Test
public void readCSVDatasetTest() throws Exception {
Dataset ds1 = SparkUtils.readCSVDataset(spark, "src/test/resources/ds1.csv");
assertThat(ds1.getDataPoints().get(1).get("name")).isEqualTo("B");
Dataset ds2 = SparkUtils.readCSVDataset(spark, "src/test/resources/ds2.csv?sep=\"|\"&delimiter=\"\"");
assertThat(ds2.getDataPoints().get(1).get("name")).isEqualTo("B");
try (SparkSession spark = SparkSession.builder()
.appName("test")
.master("local")
.getOrCreate()) {
Dataset ds1 = SparkUtils.readCSVDataset(spark, "src/test/resources/ds1.csv");
assertThat(ds1.getDataPoints().get(1).get("name")).isEqualTo("B");
Dataset ds2 = SparkUtils.readCSVDataset(spark, "src/test/resources/ds2.csv?sep=%7C");
assertThat(ds2.getDataPoints().get(1).get("name")).isEqualTo("G");
}
}
}
63 changes: 63 additions & 0 deletions src/test/java/fr/insee/trevas/jupyter/UtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package fr.insee.trevas.jupyter;

import org.junit.jupiter.api.Test;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;

public class UtilsTest {

    /**
     * Walks every combination of base location, query string and fragment, checks how
     * {@link URI#create(String)} splits each combination into query, path and fragment,
     * and prints the result of {@code Utils.uri} next to the plain {@link URI} for
     * visual comparison.
     */
    @Test
    void testURI() throws URISyntaxException {
        final String separator = "===============================================================";
        final String expectedPath = "/foo/bar/file.csv";

        List<String> bases = List.of(
                "http://example.com/foo/bar/file.csv",
                "https://example.com/foo/bar/file.csv",
                "https://example.com:8080/foo/bar/file.csv",
                "file://example.com/foo/bar/file.csv",
                "file://example.com:8080/foo/bar/file.csv",
                "file:///foo/bar/file.csv",
                "/foo/bar/file.csv"
        );
        // NOTE(review): "=del" in the second entry looks like a typo for "del" - confirm.
        List<String> queries = List.of(
                "",
                "?sep=,&=del=;",
                "?sep=%2C&del=%3B"
        );
        List<String> fragments = List.of(
                "",
                "#fragment"
        );

        for (String base : bases) {
            for (String query : queries) {
                for (String fragment : fragments) {
                    String input = base + query + fragment;
                    URI parsed = URI.create(input);

                    // Re-attach the delimiters so the parts can be compared with the raw inputs.
                    String actualQuery = parsed.getQuery() == null ? "" : "?" + parsed.getQuery();
                    String actualFragment = parsed.getFragment() == null ? "" : "#" + parsed.getFragment();

                    assertThat(actualQuery).isEqualTo(URLDecoder.decode(query, StandardCharsets.UTF_8));
                    assertThat(parsed.getPath()).isEqualTo(expectedPath);
                    assertThat(actualFragment).isEqualTo(fragment);

                    System.out.println(separator);
                    System.out.println(Utils.uri(input));
                    System.out.println(separator);
                    System.out.println(parsed);
                    System.out.printf("Host: %s, Path: %s%n", parsed.getHost(), parsed.getPath());
                    System.out.printf("Auth: %s, Schm: %s%n", parsed.getAuthority(), parsed.getScheme());
                    System.out.printf("Query: %s, Fragment: %s%n", parsed.getQuery(), parsed.getFragment());
                }
            }
        }
    }
}

0 comments on commit 35d4731

Please sign in to comment.