This repository has been archived by the owner on May 8, 2021. It is now read-only.

Example for setup in a cluster #6

Open
Teots opened this issue Jan 10, 2013 · 7 comments

Comments

@Teots

Teots commented Jan 10, 2013

Hello,
I downloaded mupd8 and followed the quickstart tutorial, which worked like a charm. Thanks for that. Now I want to go further and run the example in a cluster environment, but I couldn't find any tutorials for that, so I don't know what I have to change in the config files or in the program itself.
Can anybody help me with this?

@wlam
Contributor

wlam commented Jan 11, 2013

Yes, you're right: the documentation still needs to cover the next step, multi-node operation.
In the meantime, let me add some quick remarks about the current system here so that you can proceed:

Configuration

  1. Add a messageserver section: { "mupd8" : { "messageserver" : { "host" : "<machine>" , "port" : <port number> } } }.
    The MessageServer is a relay that, for example, propagates host-down notifications discovered by any other node to all nodes.
  2. Name multiple machines in { "mupd8" : { "system_hosts" : [ "<machine1>", "<machine2>", ... ] } }. Each node runs off a copy of the same configuration directory, so please use hostnames that resolve to the same machines consistently across the cluster (e.g., avoid localhost, which is convenient for a single-machine quickstart but refers to different machines depending on where you look it up).
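As a rough pre-flight check of the two configuration rules above, the following Python sketch inspects a parsed configuration dictionary. It relies only on the key names quoted above (mupd8, messageserver, host, port, system_hosts). The function name check_cluster_config and the set of loopback names it rejects are hypothetical choices for this illustration, not part of mupd8.

```python
import json

# Names that resolve to a different machine on every node, so they should not
# appear in a multi-node system_hosts list (illustrative, not exhaustive).
LOOPBACK_NAMES = {"localhost", "127.0.0.1", "::1"}


def check_cluster_config(config):
    """Return a list of problems found in a parsed mupd8 config dict.

    Hypothetical helper: it checks only the messageserver and
    system_hosts entries discussed in this thread.
    """
    problems = []
    mupd8 = config.get("mupd8", {})

    server = mupd8.get("messageserver")
    if not isinstance(server, dict):
        problems.append("missing mupd8.messageserver section")
    else:
        if not server.get("host"):
            problems.append("messageserver.host is not set")
        if not isinstance(server.get("port"), int):
            problems.append("messageserver.port must be an integer")

    hosts = mupd8.get("system_hosts", [])
    if not hosts:
        problems.append("mupd8.system_hosts is empty")
    for host in hosts:
        # Loopback names mean "this machine", which differs per node.
        if host in LOOPBACK_NAMES or host.startswith("127."):
            problems.append(f"system_hosts entry {host!r} is a loopback name")
    return problems


if __name__ == "__main__":
    sample = json.loads("""
    {"mupd8": {"messageserver": {"host": "192.168.2.28", "port": 8888},
               "system_hosts": ["192.168.2.28", "localhost"]}}
    """)
    for problem in check_cluster_config(sample):
        print(problem)
```

An empty result means only that these two sections look plausible; it says nothing about the rest of the application configuration.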

Operation

  1. Deploy your application code and files, including the configuration, to all the machines where you want to run it (e.g., with rdist, Puppet, ...).
  2. Start the application cluster-wide:
  • Start the MessageServer on the host named in the messageserver configuration:

java com.walmartlabs.mupd8.MessageServer -pidFile <PID filename> -d <configuration directory>

  • Start Mupd8Main on each machine in system_hosts (at about the same time) after MessageServer has started, like you did for the single-machine case:

java com.walmartlabs.mupd8.Mupd8Main -pidFile <PID filename> -d <configuration directory>...
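Because each Mupd8Main should start only after MessageServer is up, a launcher script could wait until the MessageServer port accepts TCP connections before starting the Mupd8Main JVMs. The Python sketch below shows such a wait. The function name wait_for_port and its timeout values are hypothetical, and treating "accepts a TCP connection" as "ready" is an assumption: the thread does not describe any readiness signal from MessageServer. Note also that a connect-then-close probe may itself show up as an error in the MessageServer log.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll until host:port accepts a TCP connection or the timeout expires.

    Returns True once a connection succeeds, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: give up after the deadline, else retry.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("192.168.2.28", 8888) returns True once a MessageServer configured with that host and port is listening, and False if nothing answers within 30 seconds.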

It so happens that the source tree currently includes a file called testapp.sh in the root directory. In testapp.sh, you can see a simple example of these command-line parameters and a pointer to an example messageserver configuration (though its configuration still names a single localhost rather than multiple 'real' hostnames).

I'm glad the quickstart worked for you. We definitely want to expand the documentation to cover this very natural scenario you're asking about. (Thanks for your report!)

@Teots
Author

Teots commented Jan 11, 2013

Thank you for your response. Using your information and testapp.sh, I started the MessageServer and the Mupd8Main on another node. Unfortunately, I always get a "broken pipe" error when Mupd8Main tries to connect to the MessageServer.

MessageServer logs:
[d] is provided
attemp to listen TO :8888
server started, listening 8888
register 192.168.2.32
broken pipe 192.168.2.32

After I added e.printStackTrace() after line 181 of MessageServer.scala, I also got the following stack trace showing the cause of the exception:
java.lang.NullPointerException
    at com.walmartlabs.mupd8.messaging.MessageParser$.getMessage(MessageParser.scala:25)
    at com.walmartlabs.mupd8.RequestHandler.cmdResponse(MessageServer.scala:208)
    at com.walmartlabs.mupd8.RequestHandler.run(MessageServer.scala:177)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

Mupd8Main logs:
Registered Bean walmartlabs.com:name=T10Mapper
Registered Bean walmartlabs.com:name=K2Updater
Registered Bean walmartlabs.com:name=K1Updater
Registered Bean walmartlabs.com:name=K4Updater
Registered Bean walmartlabs.com:name=K3Updater
Host id is 1
2013-01-11 14:26:31,527 [main] INFO org.scale7.cassandra.pelops.Cluster - Dynamic node discovery is disabled, using [192.168.2.28] as a static list of nodes
use compression codec gzip
Getting keyspaces from Cassandra Cluster 192.168.2.28:9160[OK]
Checking for keyspace Mupd8[OK]
2013-01-11 14:26:31,594 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Initialising pool configuration policy: Config{maxActivePerNode=20, maxTotal=-1, maxIdlePerNode=10, minIdlePerNode=10, maxWaitForConnection=12000, testConnectionsWhileIdle=true, timeBetweenScheduledMaintenanceTaskRunsMillis=60000, nodeDownSuspensionMillis=10000}
2013-01-11 14:26:31,597 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Initialising pool node selection strategy: org.scale7.cassandra.pelops.pool.LeastLoadedNodeSelectionStrategy@f102d3
2013-01-11 14:26:31,597 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Initialising pool node suspension strategy: org.scale7.cassandra.pelops.pool.NoOpNodeSuspensionStrategy@23f1bb
2013-01-11 14:26:31,598 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Initialising pool connection validator: org.scale7.cassandra.pelops.pool.DescribeVersionConnectionValidator@a0864f
2013-01-11 14:26:31,613 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Pre-initialising connections for nodes: [192.168.2.28:9160]
2013-01-11 14:26:31,613 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Adding node '192.168.2.28' to the pool...
2013-01-11 14:26:31,615 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Registering MBean 'com.scale7.cassandra.pelops.pool:type=PooledNode-Mupd8-192.168.2.28'...
2013-01-11 14:26:31,629 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Running maintenance tasks during initialization...
2013-01-11 14:26:31,636 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Configuring scheduled tasks to run every 60000 milliseconds
2013-01-11 14:26:31,639 [main] INFO org.scale7.cassandra.pelops.pool.CommonsBackedPool - Registering MBean 'com.scale7.cassandra.pelops.pool:type=Pool-Mupd8'...
2013-01-11 14:26:31,641 [main] INFO org.scale7.cassandra.pelops.Pelops - Pelops adds new pool Mupd8
Memory available for use by Slate Cache is 184182374 bytes
SERVER - bound to *:6101
Host id 0 is (192.168.2.28,6101)
--- CLIENT - Failed to connect to server at Teots-PC:6101
... (the line above is repeated quite a number of times) ...
--- CLIENT - Failed to connect to server at Teots-PC:6101
Failed to connect to0 192.168.2.28:6101
Host id 1 is (192.168.2.32,6101)
Registered Bean walmartlabs.com:name= node_statistics for Teots-Laptop_127.0.1.1
start source from sys cfg
2013-01-11 14:26:45,889 [main] INFO com.walmartlabs.mupd8.Mupd8Main$ - Goodbye
Uncaught Throwable for thread MessageServerClient : 1, exiting.
java.lang.ArrayIndexOutOfBoundsException: 1
    at com.walmartlabs.mupd8.messaging.MessageParser$.getMessage(MessageParser.scala:27)
    at com.walmartlabs.mupd8.MessageServerClient.messageDecoder(MessageServerClient.scala:44)
    at com.walmartlabs.mupd8.MessageServerClient.processMessage(MessageServerClient.scala:124)
    at com.walmartlabs.mupd8.MessageServerClient.run(MessageServerClient.scala:64)
    at java.lang.Thread.run(Thread.java:662)

mupd8.log:
14:26:31,527 INFO (Cluster.java:158) - Dynamic node discovery is disabled, using [192.168.2.28] as a static list of nodes
14:26:31,594 INFO (CommonsBackedPool.java:110) - Initialising pool configuration policy: Config{maxActivePerNode=20, maxTotal=-1, maxIdlePerNode=10, minIdlePerNode=10, maxWaitForConnection=12000, testConnectionsWhileIdle=true, timeBetweenScheduledMaintenanceTaskRunsMillis=60000, nodeDownSuspensionMillis=10000}
14:26:31,597 INFO (CommonsBackedPool.java:113) - Initialising pool node selection strategy: org.scale7.cassandra.pelops.pool.LeastLoadedNodeSelectionStrategy@f102d3
14:26:31,597 INFO (CommonsBackedPool.java:116) - Initialising pool node suspension strategy: org.scale7.cassandra.pelops.pool.NoOpNodeSuspensionStrategy@23f1bb
14:26:31,598 INFO (CommonsBackedPool.java:119) - Initialising pool connection validator: org.scale7.cassandra.pelops.pool.DescribeVersionConnectionValidator@a0864f
14:26:31,613 INFO (CommonsBackedPool.java:135) - Pre-initialising connections for nodes: [192.168.2.28:9160]
14:26:31,613 INFO (CommonsBackedPool.java:412) - Adding node '192.168.2.28' to the pool...
14:26:31,615 INFO (PooledNode.java:71) - Registering MBean 'com.scale7.cassandra.pelops.pool:type=PooledNode-Mupd8-192.168.2.28'...
14:26:31,629 INFO (CommonsBackedPool.java:157) - Running maintenance tasks during initialization...
14:26:31,636 INFO (CommonsBackedPool.java:165) - Configuring scheduled tasks to run every 60000 milliseconds
14:26:31,639 INFO (CommonsBackedPool.java:150) - Registering MBean 'com.scale7.cassandra.pelops.pool:type=Pool-Mupd8'...
14:26:31,641 INFO (Pelops.java:72) - Pelops adds new pool Mupd8
14:26:45,889 INFO (slf4j.scala:126) - Goodbye

Port 6101 is not the one I configured. I guess it is related to mupd8_status, which I configured as well. Do I have to start a special server for this?

localhost.cfg
"messageserver" : { "host" : "192.168.2.28", "port" : 8888 }, "mupd8_status" : { "http_port" : 6001 }

The commands I used to start the system:
java com.walmartlabs.mupd8.MessageServer -pidFile server.pid -d $(pwd)/src/main/config/testapp
java com.walmartlabs.mupd8.Mupd8Main -pidFile main.pid -threads 6 -d $(pwd)/src/main/config/testapp

@wlam
Contributor

wlam commented Jan 12, 2013

If I'm interpreting your logs correctly, I think you had configured two servers to run Mupd8Main, but started only one.

To see the TestApp application go, I suggest either configuring only one system_host to start (to verify that TestApp is configured and running for you), or starting both system_host machines' Mupd8Main JVMs at about the same time. (In your logs above, it appears that the Teots-PC one didn't start or was unable to reach the MessageServer and Teots-Laptop at all.)

(Independent of the above comment: Even when a host has failed/gone missing, I don't think that Exception you logged above is supposed to happen; it may be related to a regression from a recent large merge/checkin that I saw in internal testing lately. A checkin to address the regression should hit the public tree soon, so you may want to pull shortly if you're near master/HEAD.)

@Teots
Author

Teots commented Jan 12, 2013

Yes, I had two servers configured. This is my localhost.cfg:
{
  "mupd8" : {
    "system_hosts" : [ "192.168.2.28", "192.168.2.32" ],
    "java_setting" : "-Xmx512M",
    "messageserver" : {
      "host" : "192.168.2.28",
      "port" : 8888
    },
    "mupd8_status" : {
      "http_port" : 6001
    },
    "slate_store" : {
      "type" : "Cassandra",
      "hosts" : [ "192.168.2.28" ],
      "port" : 9160,
      "keyspace" : "Mupd8"
    },
    "sources" : [
      {
        "source" : "com.walmartlabs.mupd8.JSONSource",
        "host" : "192.168.2.28",
        "performer" : "T10Source",
        "parameters" : [ "192.168.2.28", "19200" ]
      }
    ]
  }
}

But even when I use only one system host, the example does not run. To find out why, I have some questions about the commands in testapp.sh:

  1. nohup java com.walmartlabs.mupd8.Mupd8Main -pidFile $pidfile -key k1 -from file:$HOME/data/T10.data -to T10Source -threads 6 -s$(pwd)/src/main/config/testapp/sys_old -a$(pwd)/src/main/config/testapp/app_old &

According to the allowed parameters of Mupd8Main, neither -key nor -from is a supported parameter. Also, the files sys_old and app_old are in completely different locations. Am I supposed to change this to something like nohup java com.walmartlabs.mupd8.Mupd8Main -pidFile $pidfile -key k1 -from file:$HOME/data/T10.data -to T10Source -threads 6 -s$(pwd)/src/main/config/testapp/sys_old -a$(pwd)/src/main/config/testapp/app_old & ? Even this just gives me:
Uncaught Throwable for thread main : null, exiting.
java.lang.NullPointerException
    at com.walmartlabs.mupd8.AppStaticInfo.<init>(Mupd8Main.scala:748)
    at com.walmartlabs.mupd8.Mupd8Main$$anonfun$main$5$$anonfun$apply$60.apply(Mupd8Main.scala:1370)
    at com.walmartlabs.mupd8.Mupd8Main$$anonfun$main$5$$anonfun$apply$60.apply(Mupd8Main.scala:1360)
    at scala.Option.map(Option.scala:133)
    at com.walmartlabs.mupd8.Mupd8Main$$anonfun$main$5.apply(Mupd8Main.scala:1360)
    at com.walmartlabs.mupd8.Mupd8Main$$anonfun$main$5.apply(Mupd8Main.scala:1354)
    at scala.Option$WithFilter.flatMap(Option.scala:175)
    at com.walmartlabs.mupd8.Mupd8Main$.main(Mupd8Main.scala:1354)
    at com.walmartlabs.mupd8.Mupd8Main.main(Mupd8Main.scala)
which corresponds to this line in the code:
val cassPort = config.getScopedValue(Array("mupd8", "slate_store", "port")).asInstanceOf[Number].intValue()
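One plausible reading of this NullPointerException is that the configuration loaded from sys_old has no mupd8.slate_store.port entry, so the lookup yields null and the cast to Number fails. The Python sketch below illustrates a lookup that names the missing path instead of failing later. The names get_scoped and cassandra_port are hypothetical; they only mimic the idea behind getScopedValue, whose real semantics are not shown in this thread.

```python
def get_scoped(config, path):
    """Walk nested dicts along path; raise KeyError naming the missing key."""
    node = config
    for depth, key in enumerate(path):
        if not isinstance(node, dict) or key not in node:
            missing = ".".join(path[: depth + 1])
            raise KeyError(f"configuration value {missing} is not set")
        node = node[key]
    return node


def cassandra_port(config):
    # Mirrors the failing Scala line: read mupd8.slate_store.port as a number.
    value = get_scoped(config, ["mupd8", "slate_store", "port"])
    if not isinstance(value, int):
        raise TypeError(f"mupd8.slate_store.port must be a number, got {value!r}")
    return value
```

With the localhost.cfg shown earlier in this thread, cassandra_port returns 9160; with a configuration that lacks a slate_store section, it raises an error that names mupd8.slate_store.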

If I just pass the config directory like this:
nohup java com.walmartlabs.mupd8.Mupd8Main -pidFile $pidfile -to T10Source -threads 6 -d $(pwd)/src/main/config/testapp/ &
I get the following exceptions:
2013-01-12 11:51:07,359 [SourceReader:T10Source:192.168.2.28:19200] ERROR com.walmartlabs.mupd8.JSONSource - JSONSource: socketReader hit exception
java.lang.ArrayIndexOutOfBoundsException: 1
    at com.walmartlabs.mupd8.JSONSource$$anonfun$socketReader$1.apply(SourceReader.scala:73)
    at com.walmartlabs.mupd8.JSONSource$$anonfun$socketReader$1.apply(SourceReader.scala:73)
    at grizzled.slf4j.Logger.info(slf4j.scala:126)
    at grizzled.slf4j.Logging$class.info(slf4j.scala:266)
    at com.walmartlabs.mupd8.JSONSource.info(SourceReader.scala:41)
    at com.walmartlabs.mupd8.JSONSource.socketReader(SourceReader.scala:73)
    at com.walmartlabs.mupd8.JSONSource.constructReader(SourceReader.scala:58)
    at com.walmartlabs.mupd8.JSONSource.readLine(SourceReader.scala:103)
    at com.walmartlabs.mupd8.JSONSource.hasNext(SourceReader.scala:92)
    at com.walmartlabs.mupd8.AppRuntime$SourceThread$1$$anonfun$run$4.apply$mcV$sp(Mupd8Main.scala:1204)
    at scala.util.control.Breaks.breakable(Breaks.scala:39)
    at com.walmartlabs.mupd8.AppRuntime$SourceThread$1.run(Mupd8Main.scala:1201)
    at java.lang.Thread.run(Thread.java:662)

All in all I'm quite confused about how to run the example even on just one machine. And even conceptually I'm not sure how to execute an application on multiple machines. Other stream processing engines have some master where all workers are registered. Then a job is passed to this master and it distributes the job onto all worker machines. But how is something similar done in your system?

@wlam
Contributor

wlam commented Jan 12, 2013

Sorry for the confusion. Let me try to address your questions from the bottom up:

Toward the questions

  • When an application is configured to run on multiple machines, each system_host needs to run a Mupd8Main JVM and the messageserver host needs to run a MessageServer JVM. When a Mupd8Main starts, it connects to MessageServer and to other Mupd8Main JVMs, and the Mupd8Main JVMs share the processing load (e.g., the caching and processing of Updater slates). That's why I ask that you start all the Mupd8Main JVMs near the same time, so that they can all connect to MessageServer and find each other. Once all the JVMs find each other, the Mupd8Main JVMs connect to their source streams and start processing events from them.
  • For question 1): testapp.sh has several commented-out command lines. The one that is uncommented and most appropriate is the last one, just like the commands you used to start the system in your earlier comment:

nohup java com.walmartlabs.mupd8.Mupd8Main -pidFile $pidfile -threads 6 -d $(pwd)/src/main/config/testapp -statistics true

This uses the src/main/config/testapp/*.cfg files as your configuration. (-statistics true is optional here, so for simplicity let's skip it.)

(Sorry about the other commented-out command lines in testapp.sh; the quickstart does not mention those parameters, to avoid their [historic] complexity. In retrospect, I regret mentioning testapp.sh at all: it is more of a developer-testing application, and it distracted you from your already running quickstart example, which is written to express only the interesting parts. Apparently testapp.sh was more confusing than helpful, since the command lines you ran directly were pretty reasonable.)

Toward a more concrete example

To make concrete progress, let's use your quickstart application directly, since you are already running a single-machine instance of it. To distribute your application, add a second system_host and a messageserver to your quickstart application's configuration JSON. Since you've named some hosts above, I'll use them to flesh out this example of the changes:

{
  "mupd8" : {
    "messageserver" : {
     "host" : "192.168.2.28",
     "port" : 8888
    },
    "system_hosts" : [  "192.168.2.28", "192.168.2.32" ]
  }
}
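If you would rather script this change than edit the file by hand, the following Python sketch merges the two cluster settings into an existing configuration dictionary without disturbing its other keys. The deep_merge helper is a hypothetical illustration. Only the messageserver and system_hosts values come from this thread, and java_setting is borrowed from the localhost.cfg posted earlier.

```python
import copy
import json


def deep_merge(base, overlay):
    """Return a copy of base with overlay merged in; nested dicts merge recursively."""
    result = copy.deepcopy(base)
    for key, value in overlay.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            # Lists and scalars replace the old value outright.
            result[key] = copy.deepcopy(value)
    return result


CLUSTER_SETTINGS = {
    "mupd8": {
        "messageserver": {"host": "192.168.2.28", "port": 8888},
        "system_hosts": ["192.168.2.28", "192.168.2.32"],
    }
}

if __name__ == "__main__":
    single_node = {"mupd8": {"java_setting": "-Xmx512M", "system_hosts": ["localhost"]}}
    print(json.dumps(deep_merge(single_node, CLUSTER_SETTINGS), indent=2))
```

Replacing lists wholesale is deliberate here: a single-node system_hosts list should be overwritten, not appended to.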

Once you've updated the configuration file, you can run your commands as you described, but for all the machines you named:

Run MessageServer first:
ssh 192.168.2.28 java com.walmartlabs.mupd8.MessageServer -pidFile messageserver.pid -d <configuration directory>

...then run the Mupd8Main together:
ssh 192.168.2.28 java com.walmartlabs.mupd8.Mupd8Main -pidFile mupd8.pid -d <configuration directory>
ssh 192.168.2.32 java com.walmartlabs.mupd8.Mupd8Main -pidFile mupd8.pid -d <configuration directory>

I've put ssh in front to be explicit, but the point is that each system_host gets one Mupd8Main JVM. (The first time you try it, I encourage you to run each java command in a separate window so that each JVM's console output stays separate.) The JVMs share the same configuration-directory contents, so they all know about each other because they're all listed in the configuration.
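The same start-up order can be captured in a small launcher. The Python sketch below builds the ssh command lines from a parsed configuration (MessageServer first, then one Mupd8Main per system_host) but does not run them. The function name launch_plan, the pid-file names, the placeholder path /path/to/config, and the use of plain ssh are assumptions for illustration. It also assumes that java and the mupd8 classpath are already set up on every host, as in the commands above.

```python
import shlex


def launch_plan(config, config_dir):
    """Return [MessageServer command, Mupd8Main commands...] as argv lists."""
    mupd8 = config["mupd8"]
    server_host = mupd8["messageserver"]["host"]
    plan = [[
        "ssh", server_host, "java", "com.walmartlabs.mupd8.MessageServer",
        "-pidFile", "messageserver.pid", "-d", config_dir,
    ]]
    # One Mupd8Main JVM per system_host, to be started after MessageServer is up.
    for host in mupd8["system_hosts"]:
        plan.append([
            "ssh", host, "java", "com.walmartlabs.mupd8.Mupd8Main",
            "-pidFile", "mupd8.pid", "-d", config_dir,
        ])
    return plan


if __name__ == "__main__":
    cfg = {"mupd8": {"messageserver": {"host": "192.168.2.28", "port": 8888},
                     "system_hosts": ["192.168.2.28", "192.168.2.32"]}}
    for argv in launch_plan(cfg, "/path/to/config"):
        print(shlex.join(argv))
```

To actually run them, each argv could be passed to subprocess.Popen. Ideally you would add a readiness check on the MessageServer port between the first command and the rest, and then start the Mupd8Main commands close together.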

Toward more events to distribute

The quickstart walkthrough created a sample.txt input stream containing only one event. If you add more lines of JSON events (with various request:requestUrl filename extensions), Mupd8 will automatically count some extensions' events on one machine and other extensions' events on another. This lets you use the RAM of all the system_hosts machines more effectively to cache more extensions' counters.
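To illustrate the general idea of spreading keys across machines, here is a generic hash-partitioning sketch in Python. It is not mupd8's actual placement algorithm, which this thread does not describe. It only shows how every node can agree on an owner for a key (such as a filename extension) when all nodes share the same system_hosts list. The function name owner_of and the use of CRC-32 are assumptions chosen for the example.

```python
import zlib


def owner_of(key, system_hosts):
    """Pick the host responsible for key using a stable hash of the key.

    zlib.crc32 is used instead of Python's built-in hash(), which is salted
    per process for strings and would make different nodes disagree.
    """
    if not system_hosts:
        raise ValueError("system_hosts must not be empty")
    index = zlib.crc32(key.encode("utf-8")) % len(system_hosts)
    return system_hosts[index]


if __name__ == "__main__":
    hosts = ["192.168.2.28", "192.168.2.32"]
    for ext in ["gif", "jpg", "png", "html", "css"]:
        print(ext, "->", owner_of(ext, hosts))
```

With plain modulo placement, changing the host list moves most keys to new owners, so every node in such a scheme must use the same list. Consistent hashing is a common alternative that limits that movement.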

You don't have to know which extension's events are counted on which machine. You can ask any Mupd8Main's HTTP server for the same URI path (e.g., http://192.168.2.28:6001/mupd8/slate/mupd8_demo/count_by_extension/gif or http://192.168.2.32:6001/mupd8/slate/mupd8_demo/count_by_extension/gif) and still get the current answer from whichever machine is doing the counting.
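A client can take advantage of this by trying the hosts in turn. The Python sketch below builds the slate URL using the path layout of the two example URLs above (/mupd8/slate/<application>/<updater>/<key>) and the http_port 6001 from the mupd8_status configuration. The helper names slate_url and fetch_slate are hypothetical. The sketch returns the response body as raw text because the response format is not shown in this thread.

```python
import urllib.parse
import urllib.request


def slate_url(host, port, app, updater, key):
    """Build a slate URL in the layout of the examples in this thread."""
    parts = [urllib.parse.quote(p, safe="") for p in (app, updater, key)]
    return f"http://{host}:{port}/mupd8/slate/" + "/".join(parts)


def fetch_slate(hosts, port, app, updater, key, timeout=5.0):
    """Return the raw response body from the first host that answers."""
    last_error = None
    for host in hosts:
        url = slate_url(host, port, app, updater, key)
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                return resp.read().decode("utf-8")
        except OSError as err:
            # URLError, HTTPError and socket timeouts are all OSError subclasses.
            last_error = err  # try the next host
    raise RuntimeError(f"no host answered for {key!r}: {last_error}")
```

For example, fetch_slate(["192.168.2.28", "192.168.2.32"], 6001, "mupd8_demo", "count_by_extension", "gif") asks the first machine and falls back to the second only if the first cannot be reached or returns an HTTP error.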

I hope this helps! Please let me know how it works for you.

@Teots
Author

Teots commented Jan 12, 2013

Thanks again! The example finally works, even distributed across two nodes :) I'll run some bigger examples soon. Maybe I can document them and contribute them back to your project as examples for others.

@wlam
Contributor

wlam commented Jan 13, 2013

That sounds great. I'm glad it's working for you!


2 participants