Oleg Zhurakousky edited this page Apr 30, 2014 · 137 revisions

== YARN Assembly Java DSL

Isolation from native YARN API

Application Container styles:

Distributed Processor - YARN HPC Grid

Nested and deterministic YARN Containers

Transparent classpath management

Testing:

==

YARN Assembly Java DSL

YAYA comes with its own intuitive, type-safe, "hold-your-hand" style language (DSL), which greatly simplifies YARN application development. The DSL is designed to rule out mistakes during the assembly of a YARN Application, which it accomplishes through a builder-chaining technique. For example, below is the assembly for a Java-based YARN Application:

YarnApplication<Void> yarnApplication = YarnAssembly.forApplicationContainer("ls -all").
				containerCount(2).
				memory(256).withApplicationMaster(new YarnConfiguration()).
					maxAttempts(2).
					memory(512).
					build("SampleUnixCommandApp");
yarnApplication.launch();

You begin by assembling your Application Container specification, which you do by calling the YarnAssembly.forApplicationContainer(..) method. That is the only method you can invoke on YarnAssembly. Once invoked, you can call any of the available methods to override the default settings for the Application Container (e.g., memory, virtualCores etc.). Invoking any available method excludes that method from subsequent invocations. For example, YarnAssembly.forApplicationContainer("ls -all").containerCount(2).containerCount(2) will not compile, thus precluding you from making assembly mistakes.
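The "each method can be called only once" behavior can be reproduced with a staged builder, where every call returns an interface that no longer declares the method just used. The following is a minimal sketch of that general technique, not YAYA's actual implementation: StagedBuilderSketch, ContainerSpec, the stage interfaces, and the default values are all hypothetical names and numbers chosen for illustration.

```java
/** Hypothetical sketch of a staged builder; none of these types are YAYA's own. */
final class StagedBuilderSketch {

    /** Immutable result of the assembly. */
    static final class ContainerSpec {
        final String command;
        final int containerCount;
        final int memory;

        ContainerSpec(String command, int containerCount, int memory) {
            this.command = command;
            this.containerCount = containerCount;
            this.memory = memory;
        }
    }

    /** Terminal stage: nothing left to configure. */
    interface Buildable { ContainerSpec build(); }

    /** Stage reached after containerCount(..): only memory(..) is still offered. */
    interface MemoryOnly extends Buildable { Buildable memory(int megabytes); }

    /** Stage reached after memory(..): only containerCount(..) is still offered. */
    interface CountOnly extends Buildable { Buildable containerCount(int count); }

    /** Initial stage: both settings are available. */
    interface Initial extends Buildable {
        MemoryOnly containerCount(int count);
        CountOnly memory(int megabytes);
    }

    /** One mutable object backs every stage; the interfaces restrict what callers can see. */
    private static final class Builder implements Initial, MemoryOnly, CountOnly {
        private final String command;
        private int containerCount = 1; // assumed default, for illustration only
        private int memory = 128;       // assumed default, for illustration only

        Builder(String command) { this.command = command; }

        @Override public MemoryOnly containerCount(int count) { this.containerCount = count; return this; }
        @Override public CountOnly memory(int megabytes) { this.memory = megabytes; return this; }
        @Override public ContainerSpec build() { return new ContainerSpec(command, containerCount, memory); }
    }

    static Initial forApplicationContainer(String command) { return new Builder(command); }

    public static void main(String[] args) {
        ContainerSpec spec = forApplicationContainer("ls -all").containerCount(2).memory(256).build();
        System.out.println(spec.command + " x" + spec.containerCount + " @ " + spec.memory + "MB");
        // forApplicationContainer("ls -all").containerCount(2).containerCount(2);
        // ^ does not compile: MemoryOnly declares no containerCount(..) method
    }
}
```

The design choice here is that the compiler, rather than a runtime check, rejects a repeated call, at the cost of one interface per reachable stage.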

Once you've provided the specification for your Application Container, you can define the specification for the Application Master by invoking the withApplicationMaster(..) method. You can invoke this method at any time, but once it is invoked you will no longer be able to make any changes to the Application Container specification. Once all of the parameters are set on the Application Master specification, you call the build("application_name") method, which returns a YarnApplication.

Once you have an instance of YarnApplication, you can no longer update the specification of either the Application Container or the Application Master. You are now ready to launch your application by simply calling the YarnApplication.launch() method, which returns either Void or DataProcessor. For more information on this return type and what it means, read the next section - Application Container styles.

==

Isolation from YARN native API

By defining simple strategies and an assembly API (DSL) around core YARN abstractions such as Application Master and Application Container, YAYA completely isolates the developer from the YARN native API. With YAYA you can develop and test the functionality of your YARN application in complete isolation from the YARN native API.

==

Application Container styles

YAYA supports a variety of Application Container styles:

Command-based Application Containers

These are the simplest Application Containers and essentially delegate the execution of any command to YARN. For example:

YarnApplication<Void> yarnApplication = YarnAssembly.forApplicationContainer("ls -all").
				containerCount(2).
				withApplicationMaster(new YarnConfiguration()).
					build("SampleUnixCommandApp");
yarnApplication.launch();

The above will simply launch two Application Containers, each executing the ls -all command and writing its output to the container's log directory.

Java-based Application Containers

Similar to the Command-based Application Containers, Java-based Application Containers allow the user to invoke custom Java code. For example:

YarnApplication<Void> yarnApplication =
	YarnAssembly.forApplicationContainer(SimpleEchoContainer.class, ByteBuffer.wrap("Hello".getBytes())).
		containerCount(2).
		withApplicationMaster().
			maxAttempts(2).
			build("sample-yarn-application");
yarnApplication.launch();
. . .
public static class SimpleEchoContainer implements ApplicationContainer {
	@Override
	public ByteBuffer process(ByteBuffer inputMessage) {
		return inputMessage;
	}
}

The above will simply launch two Application Containers, each invoking SimpleEchoContainer's process(ByteBuffer) method. In this case the input to SimpleEchoContainer comes in the form of a ByteBuffer containing the "Hello" string, which is echoed right back by the Application Container and written to the container's log directory. You can also return null from the ApplicationContainer if you don't want or don't have any output.
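A container that does more than echo typically has to decode the incoming ByteBuffer first. The sketch below shows one way that might look, including the null return for "no output". It is self-contained, so it declares its own stand-in ApplicationContainer interface mirroring the single process(ByteBuffer) method shown above; UpperCaseContainer and the UTF-8 text payload are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class UpperCaseContainerSketch {

    /** Local stand-in for the single-method container contract shown above. */
    interface ApplicationContainer {
        ByteBuffer process(ByteBuffer inputMessage);
    }

    /** Hypothetical container: upper-cases a UTF-8 text payload. */
    static final class UpperCaseContainer implements ApplicationContainer {
        @Override
        public ByteBuffer process(ByteBuffer inputMessage) {
            if (inputMessage == null || !inputMessage.hasRemaining()) {
                return null; // nothing to reply with
            }
            // duplicate() leaves the caller's buffer position untouched
            String text = StandardCharsets.UTF_8.decode(inputMessage.duplicate()).toString();
            return ByteBuffer.wrap(text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Decodes a reply buffer as UTF-8 text without consuming it. */
    static String asString(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ApplicationContainer container = new UpperCaseContainer();
        ByteBuffer reply = container.process(ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(asString(reply)); // prints HELLO
    }
}
```

Passing an explicit charset on both sides avoids depending on the platform default, which may differ between the client machine and the cluster nodes.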

Java-based Long-running and interact-able Application Containers

Aside from simplifying development of simple Java and/or Command-based Application Containers, YAYA introduces the concept of long-running, interact-able Application Containers. These are essentially reusable containers that stay alive after completing their work, ready to accept more work. This feature improves the performance of a YARN application by allowing Application Containers to be reused without being re-launched, which carries a certain latency. Similar and subsequent tasks can then be delegated to these reusable containers via simple messaging. Upon completion, the YARN application is shut down by invoking the shutDown() method for graceful shutdown or the terminate() method for immediate termination.

Let's look at the example:

YarnApplication<DataProcessor> yarnApplication = YarnAssembly.forApplicationContainer(DemoEchoContainer.class).
		containerCount(4).
		withApplicationMaster(new YarnConfiguration()).
			maxAttempts(2).
			build("InteractableYarnApplicationContainersEmulatorDemo");

final DataProcessor dataProcessor = yarnApplication.launch();
executor.execute(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 30; i++) {
                dataProcessor.process(ByteBuffer.wrap(("" + i).getBytes()));
            }
        }
});
		
Thread.sleep(2000); //let it run for a bit and then shutdown
/*
 * NOTE: This is a graceful shutdown, letting 
 * currently running Application Containers to finish, while
 * not accepting any more. So you may see a "Rejecting submission..." message in the logs.
 */
yarnApplication.shutDown();

First, you'll notice that the launch() method actually returns DataProcessor instead of Void. This is a type trick of the YarnAssembly builder: the return type is determined at compile time based on which forApplicationContainer(..) factory method was used when the Application Container was first defined.

DataProcessor acts as a proxy object to an Application Container and exposes the void process(ByteBuffer) method, which represents the actual message exchange between the client and the Application Container it proxies.

DataProcessor also exposes another interesting feature. Since it maintains the location information about the proxied Application Containers, they become addressable, allowing you to deterministically choose the Application Container you want to interact with. All you need to do is invoke the void process(ByteBuffer, String) method, where String is the regular expression for IP filtering. For example, providing the regular expression 192\.168\.19\.(1[0-5]) will only send process requests to Application Containers running on hosts that match the IP filter.
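To check which hosts a given filter would select before using it, the expression can be tried with plain java.util.regex. The sketch below is independent of YAYA: IpFilterSketch and select(..) are hypothetical, and it assumes the filter must match the whole IP address (Matcher.matches()); whether the framework uses full or partial matching is not stated here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class IpFilterSketch {

    /** Returns the host IPs that fully match the given regular expression. */
    static List<String> select(List<String> hostIps, String ipRegexFilter) {
        Pattern pattern = Pattern.compile(ipRegexFilter);
        List<String> selected = new ArrayList<>();
        for (String ip : hostIps) {
            if (pattern.matcher(ip).matches()) { // assumption: whole-address match
                selected.add(ip);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
                "192.168.19.9", "192.168.19.12", "192.168.19.15", "192.168.19.16", "192.168.20.12");
        // In Java source the backslashes of the expression must themselves be escaped
        System.out.println(select(hosts, "192\\.168\\.19\\.(1[0-5])"));
        // prints [192.168.19.12, 192.168.19.15]
    }
}
```

Note that the dots are escaped in the expression; an unescaped dot matches any character and would make the filter broader than intended.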

==

Distributed Processor

YARN's ability to host distributed containers in Hadoop, combined with the Long-running Containers feature of the framework, exposes a unique opportunity for creating a distributed high-performance computing grid. Once a YarnApplication is assembled as a set of long-running Application Containers, a call to the YarnApplication.launch() method returns a DataProcessor, which is essentially a proxy aware of both the liveness and the availability state of all Application Containers deployed as part of the current application.

DataProcessor exposes two methods, process(ByteBuffer data) and process(ByteBuffer data, String ipRegexFilter). While invoked locally, they delegate the actual processing to the ApplicationContainerProcessor of the first available Application Container (the first method) or of the first available Application Container(s) whose host IP address matches the regular expression provided via the ipRegexFilter argument (e.g., "192\.168\.19\.(1[0-5])").

A call to the process(..) methods will only block if DataProcessor determines that no Application Containers are currently available (all are busy processing previous tasks), thus ensuring that your distributed grid is protected from task over-saturation (task queueing). The replies produced by ApplicationContainerProcessor will be either logged or sent to the DataProcessorReplyListener if one is registered via the DataProcessor.registerReplyListener(DataProcessorReplyListener) method.

This example shows it.
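The "block only when every container is busy" behavior described in this section can also be illustrated without a cluster. The following is a local simulation under stated assumptions, not how DataProcessor is implemented: BlockingDispatcherSketch is a hypothetical class, worker threads stand in for Application Containers, a Semaphore tracks availability, and a Consumer plays the role of a reply listener.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Hypothetical local simulation; threads stand in for Application Containers. */
final class BlockingDispatcherSketch {
    private final Semaphore freeContainers;      // one permit per idle "container"
    private final ExecutorService workers;
    private final UnaryOperator<ByteBuffer> container;
    private final Consumer<ByteBuffer> replyListener;

    BlockingDispatcherSketch(int containerCount, UnaryOperator<ByteBuffer> container,
                             Consumer<ByteBuffer> replyListener) {
        this.freeContainers = new Semaphore(containerCount);
        this.workers = Executors.newFixedThreadPool(containerCount);
        this.container = container;
        this.replyListener = replyListener;
    }

    /** Blocks only while every simulated container is busy, so tasks never pile up. */
    void process(ByteBuffer data) throws InterruptedException {
        freeContainers.acquire();
        workers.execute(() -> {
            try {
                ByteBuffer reply = container.apply(data);
                if (reply != null) {
                    replyListener.accept(reply);
                }
            } finally {
                freeContainers.release(); // container is available again
            }
        });
    }

    /** Graceful shutdown: lets in-flight tasks finish, accepts no new ones. */
    void shutDown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
    }

    /** Sends the given number of tasks through echo "containers" and collects the replies. */
    static List<String> runDemo(int containers, int tasks) {
        List<String> replies = new CopyOnWriteArrayList<>();
        BlockingDispatcherSketch dispatcher = new BlockingDispatcherSketch(
                containers,
                input -> input, // echo, like DemoEchoContainer in spirit
                reply -> replies.add(StandardCharsets.UTF_8.decode(reply).toString()));
        try {
            for (int i = 0; i < tasks; i++) {
                dispatcher.process(ByteBuffer.wrap(("" + i).getBytes(StandardCharsets.UTF_8)));
            }
            dispatcher.shutDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while dispatching", e);
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(2, 10).size() + " replies received");
    }
}
```

Because the caller waits for a permit instead of enqueueing, the number of in-flight tasks never exceeds the number of containers; the reply order is not guaranteed.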

==

Nested and deterministic YARN Containers

While still in the works, this feature allows new YARN applications to be created and launched from within running Application Containers. Based on a simple message exchange and your implementation of an Application Container, you can create and launch another YARN Application from within an Application Container.

==

Transparent classpath management

A code-and-run approach to classpath management, where the framework takes care of classpath calculation and provisioning throughout the cluster.

Whether you are in an IDE session changing and testing code or deploying your YARN application from the command line, you don't ever have to worry about the classpath of your YARN app. The framework takes care of packaging and provisioning your current classpath across the entire YARN environment. One of the key features is automatic generation of the JAR for your current development environment, allowing your last-second changes to be transparently reflected on your next launch. It will also perform a cleanup upon the completion of your application.

You don't have to do anything to enjoy it. It is just there.

However, your current classpath may (and hopefully does) include testing JARs (e.g., junit, mockito etc.). Those are valuable tools necessary to develop quality applications, but they provide no value in your deployment and therefore don't need to be propagated. The framework allows you to influence which JAR files are propagated into the launch classpath via a classpath.filters file that contains a list of regular expressions for JARs to be excluded. The default values are:

junit
mockito
hamcrest

By including a classpath.filters file in your application classpath, you can change the default behavior and specify your own set of exclusion patterns. If you don't want to exclude anything, simply provide an empty classpath.filters file.
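To preview the effect of a set of exclusion patterns on a classpath, the filtering can be approximated with plain java.util.regex. This is a sketch, not the framework's code: ClasspathFilterSketch and filter(..) are hypothetical, the JAR names are made up, and it assumes a pattern excludes an entry when it matches anywhere in the entry's path (Matcher.find()), which is a guess based on the bare-word defaults above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class ClasspathFilterSketch {

    /** Keeps every classpath entry that none of the exclusion patterns match. */
    static List<String> filter(List<String> classpathEntries, List<String> exclusionPatterns) {
        List<Pattern> compiled = new ArrayList<>();
        for (String line : exclusionPatterns) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) { // blank lines exclude nothing
                compiled.add(Pattern.compile(trimmed));
            }
        }
        List<String> kept = new ArrayList<>();
        for (String entry : classpathEntries) {
            boolean excluded = false;
            for (Pattern pattern : compiled) {
                if (pattern.matcher(entry).find()) { // assumption: partial match is enough
                    excluded = true;
                    break;
                }
            }
            if (!excluded) {
                kept.add(entry);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> classpath = Arrays.asList(
                "lib/junit-4.11.jar",            // illustrative entries only
                "lib/mockito-all-1.9.5.jar",
                "lib/hamcrest-core-1.3.jar",
                "lib/commons-logging-1.1.3.jar",
                "target/classes");
        List<String> defaults = Arrays.asList("junit", "mockito", "hamcrest");
        System.out.println(filter(classpath, defaults));
        // prints [lib/commons-logging-1.1.3.jar, target/classes]
    }
}
```

With an empty pattern list the method returns the classpath unchanged, which corresponds to providing an empty classpath.filters file.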

==

Testing

==

Mini-cluster

A simplified and cleaned up version of Apache YARN mini-cluster tailored specifically for testing of YAYA applications.

==

In-JVM Container Executor

YARN Application Masters and Application Containers each execute in their own JVM, which makes them very hard to debug during development. InJvmContainerExecutor was developed specifically to test your application in the mini-cluster, allowing Application Masters and Application Containers to run in the same JVM as the mini-cluster. Set a breakpoint in your code, start the mini-cluster in debug mode, and that's it. To enable the In-JVM Container Executor, all you need to do is change the server configuration so that the yarn.nodemanager.container-executor.class property points to oz.hadoop.yarn.test.cluster.InJvmContainerExecutor (see below).

<property>
    <description>who will execute(launch) the containers.</description>
    <name>yarn.nodemanager.container-executor.class</name>
    <value>oz.hadoop.yarn.test.cluster.InJvmContainerExecutor</value>
</property>

In the current state of the code this property has already been set. You can see it in the mini-cluster's yarn-site.xml.

==

YARN Emulator

Testing YARN-based applications is not simple. Adding to the complexity is the fact that YARN infrastructure issues often interfere with testing the functionality of the actual application. YARN Emulator eliminates YARN completely from the equation during the development and functional testing of your application while preserving some important YARN semantics such as Application Master, Application Container(s) etc., allowing you to concentrate on implementing and testing the functionality of your application before it is deployed in YARN.

The most important thing is that your application code never changes, nor do you need to do anything special to develop and run your application in the YARN Emulator. All you need to do is omit the YarnConfiguration argument when calling withApplicationMaster(..). For example:

The following application assembly will run in the cluster:

YarnApplication<Void> yarnApplication = YarnAssembly.forApplicationContainer("ping -c 4 yahoo.com").
					containerCount(2).
					memory(512).withApplicationMaster(new YarnConfiguration()).
							maxAttempts(2).
							priority(2).
							build("ApplicationInEmulator");
yarnApplication.launch();

. . . and this one will run in the YARN Emulator:

YarnApplication<Void> yarnApplication = YarnAssembly.forApplicationContainer("ping -c 4 yahoo.com").
					containerCount(2).
					memory(512).withApplicationMaster().
							maxAttempts(2).
							priority(2).
							build("ApplicationInEmulator");
yarnApplication.launch();

==

Please send questions and updates via pull requests and/or by raising issues on this project.

Cheers!