Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Cherry-pick commit from apache PR#22122 #230

Merged
merged 3 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ metadataStoreOperationTimeoutSeconds: 30
# Metadata store cache expiry time in seconds
metadataStoreCacheExpirySeconds: 300

# Specifies if the function worker should use classloading for validating submissions for built-in
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfBuiltinFiles: false

# Specifies if the function worker should use classloading for validating submissions for external
# connectors and functions. This is required for validateConnectorConfig to take effect.
# Default is false.
enableClassloadingOfExternalFiles: false

################################
# Function package management
################################
Expand Down Expand Up @@ -390,7 +400,10 @@ saslJaasServerRoleTokenSignerSecretPath:
connectorsDirectory: ./connectors
functionsDirectory: ./functions

# Should connector config be validated during submission
# Enables extended validation for connector config with fine-grain annotation based validation
# during submission. Classloading with either enableClassloadingOfExternalFiles or
# enableClassloadingOfBuiltinFiles must be enabled on the worker for this to take effect.
# Default is false.
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
Expand Down
4 changes: 4 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.14.12.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
- org.apache.avro-avro-1.10.2.jar
- org.apache.avro-avro-protobuf-1.10.2.jar
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ flexible messaging model and an intuitive client API.</description>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<typetools.version>0.5.0</typetools.version>
<byte-buddy.version>1.14.12</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.19.6</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.45.1</grpc.version>
Expand Down Expand Up @@ -966,6 +968,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>

<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ public void testReplicatorWithFailedAck() throws Exception {
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");

Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(90, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
replicator.getState()));
assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class SinkConfig {
private Boolean autoAck;
private Long timeoutMs;
private Long negativeAckRedeliveryDelayMs;

private String sinkType;
private String archive;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Boolean cleanupSubscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public NarClassLoader run() {
});
}

public static List<File> getClasspathFromArchive(File narPath, String narExtractionDirectory) throws IOException {
File unpacked = NarUnpacker.unpackNar(narPath, getNarExtractionDirectory(narExtractionDirectory));
return getClassPathEntries(unpacked);
}

private static File getNarExtractionDirectory(String configuredDirectory) {
return new File(configuredDirectory + "/" + TMP_DIR_PREFIX);
}
Expand All @@ -164,16 +169,11 @@ private static File getNarExtractionDirectory(String configuredDirectory) {
* @param narWorkingDirectory
* directory to explode nar contents to
* @param parent
* @throws IllegalArgumentException
* if the NAR is missing the Java Services API file for <tt>FlowFileProcessor</tt> implementations.
* @throws ClassNotFoundException
* if any of the <tt>FlowFileProcessor</tt> implementations defined by the Java Services API cannot be
* loaded.
* @throws IOException
* if an error occurs while loading the NAR.
*/
private NarClassLoader(final File narWorkingDirectory, Set<String> additionalJars, ClassLoader parent)
throws ClassNotFoundException, IOException {
throws IOException {
super(new URL[0], parent);
this.narWorkingDirectory = narWorkingDirectory;

Expand Down Expand Up @@ -238,22 +238,31 @@ public List<String> getServiceImplementation(String serviceName) throws IOExcept
* if the URL list could not be updated.
*/
private void updateClasspath(File root) throws IOException {
addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc.
getClassPathEntries(root).forEach(f -> {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
log.error("Failed to add entry to classpath: {}", f, e);
}
});
}

static List<File> getClassPathEntries(File root) {
List<File> classPathEntries = new ArrayList<>();
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
log.warn("{} does not contain META-INF/bundled-dependencies!", narWorkingDirectory);
log.warn("{} does not contain META-INF/bundled-dependencies!", root);
}
addURL(dependencies.toURI().toURL());
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
final File[] jarFiles = dependencies.listFiles(JAR_FILTER);
if (jarFiles != null) {
Arrays.sort(jarFiles, Comparator.comparing(File::getName));
for (File libJar : jarFiles) {
addURL(libJar.toURI().toURL());
}
classPathEntries.addAll(Arrays.asList(jarFiles));
}
}
return classPathEntries;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -114,16 +115,24 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl
* if the NAR could not be unpacked.
*/
private static void unpack(final File nar, final File workingDirectory) throws IOException {
try (JarFile jarFile = new JarFile(nar)) {
Enumeration<JarEntry> jarEntries = jarFile.entries();
while (jarEntries.hasMoreElements()) {
JarEntry jarEntry = jarEntries.nextElement();
String name = jarEntry.getName();
File f = new File(workingDirectory, name);
if (jarEntry.isDirectory()) {
Path workingDirectoryPath = workingDirectory.toPath().normalize();
try (ZipFile zipFile = new ZipFile(nar)) {
Enumeration<? extends ZipEntry> zipEntries = zipFile.entries();
while (zipEntries.hasMoreElements()) {
ZipEntry zipEntry = zipEntries.nextElement();
String name = zipEntry.getName();
Path targetFilePath = workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
log.error("Invalid zip file with entry '{}'", name);
throw new IOException("Invalid zip file. Aborting unpacking.");
}
File f = targetFilePath.toFile();
if (zipEntry.isDirectory()) {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f);
} else {
makeFile(jarFile.getInputStream(jarEntry), f);
// The directory entry might appear after the file entry
FileUtils.ensureDirectoryExistAndCanReadAndWrite(f.getParentFile());
makeFile(zipFile.getInputStream(zipEntry), f);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.FileUtils;
Expand All @@ -77,8 +79,11 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionRuntimeCommon;
import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
Expand Down Expand Up @@ -344,9 +349,12 @@ public void start(boolean blocking) throws Exception {
userCodeFile = functionConfig.getJar();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(
functionConfig,
FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
FunctionConfigUtils.validateJavaFunction(functionConfig, validatableFunctionPackage));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
Expand All @@ -356,24 +364,30 @@ public void start(boolean blocking) throws Exception {
}

if (functionDetails == null) {
functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(),
FunctionDefinition.class);
functionDetails = FunctionConfigUtils.convert(functionConfig, validatableFunctionPackage);
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
functionDetails = SourceConfigUtils.convert(
sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
functionDetails = SourceConfigUtils.convert(sourceConfig,
SourceConfigUtils.validateAndExtractDetails(sourceConfig, validatableFunctionPackage, true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
transformFunctionFile = sinkConfig.getTransformFunction();
parallelism = sinkConfig.getParallelism();
userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
ValidatableFunctionPackage validatableFunctionPackage =
new LoadedFunctionPackage(getCurrentOrUserCodeClassLoader(), ConnectorDefinition.class);
if (isNotEmpty(sinkConfig.getTransformFunction())) {
transformFunctionCodeClassLoader = extractClassLoader(
sinkConfig.getTransformFunction(),
Expand All @@ -382,16 +396,19 @@ public void start(boolean blocking) throws Exception {
}

ClassLoader functionClassLoader = null;
ValidatableFunctionPackage validatableTransformFunction = null;
if (transformFunctionCodeClassLoader != null) {
functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
? Thread.currentThread().getContextClassLoader()
: transformFunctionCodeClassLoader.getClassLoader();
validatableTransformFunction =
new LoadedFunctionPackage(functionClassLoader, FunctionDefinition.class);
}

functionDetails = SinkConfigUtils.convert(
sinkConfig,
SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
functionClassLoader, true));
SinkConfigUtils.validateAndExtractDetails(sinkConfig, validatableFunctionPackage,
validatableTransformFunction, true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
Expand Down Expand Up @@ -458,7 +475,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else if (userCodeFile != null) {
Expand All @@ -480,7 +497,7 @@ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentTyp
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
classLoader = FunctionCommon.getClassLoaderFromPackage(
classLoader = FunctionRuntimeCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
classLoaderCreated = true;
} else {
Expand Down Expand Up @@ -696,7 +713,7 @@ private ClassLoader isBuiltInFunction(String functionType) throws IOException {
FunctionArchive function = functions.get(functionName);
if (function != null && function.getFunctionDefinition().getFunctionClass() != null) {
// Function type is a valid built-in type.
return function.getClassLoader();
return function.getFunctionPackage().getClassLoader();
} else {
return null;
}
Expand All @@ -710,7 +727,7 @@ private ClassLoader isBuiltInSource(String sourceType) throws IOException {
Connector connector = connectors.get(source);
if (connector != null && connector.getConnectorDefinition().getSourceClass() != null) {
// Source type is a valid built-in connector type.
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
Expand All @@ -724,18 +741,18 @@ private ClassLoader isBuiltInSink(String sinkType) throws IOException {
Connector connector = connectors.get(sink);
if (connector != null && connector.getConnectorDefinition().getSinkClass() != null) {
// Sink type is a valid built-in connector type
return connector.getClassLoader();
return connector.getConnectorFunctionPackage().getClassLoader();
} else {
return null;
}
}

private TreeMap<String, FunctionArchive> getFunctions() throws IOException {
return FunctionUtils.searchForFunctions(functionsDir);
return FunctionUtils.searchForFunctions(functionsDir, narExtractionDirectory, true);
}

private TreeMap<String, Connector> getConnectors() throws IOException {
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory, true);
}

private SecretsProviderConfigurator getSecretsProviderConfigurator() {
Expand Down
Loading
Loading