From 9cbe9c60ce3583d43f9739d61812760be4f9c148 Mon Sep 17 00:00:00 2001 From: Kam Kasravi Date: Thu, 5 Nov 2015 18:26:29 -0800 Subject: [PATCH] Fixes #2 Extend tap config helper to handle zookeeper & kafka --- src/main/java/io/gearpump/tap/TapConfig.java | 52 +++++++++++++- .../java/io/gearpump/tap/TapJsonConfig.java | 71 +++++++++++++++---- 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/gearpump/tap/TapConfig.java b/src/main/java/io/gearpump/tap/TapConfig.java index 570ce3a..6a0d670 100644 --- a/src/main/java/io/gearpump/tap/TapConfig.java +++ b/src/main/java/io/gearpump/tap/TapConfig.java @@ -56,11 +56,43 @@ * }, * "name": "service instance name" * } - * ] + * ], + * "kafka": [ + * { + * "credentials": { + * "uri": "10.10.10.201:9092,10.10.10.50:9092,10.10.10.210:9092" + * }, + * "label": "kafka", + * "name": "gearpump_kafka", + * "plan": "shared", + * "tags": [ + * "kafka", + * "distributed", + * "real-time", + * "messaging" + * ] + * } + * ], + * "zookeeper": [ + * { + * "credentials": { + * "kerberos": { + * "kdc": "", + * "krealm": "" + * }, + * "zk.cluster": "10.10.10.201:2181,10.10.10.50:2181,10.10.10.210:2181", + * "zk.node": "/org/intel/zookeeperbroker/metadata/34ca13ec-18e4-4598-b2a2-c6ce981861b0" + * }, + * "label": "zookeeper", + * "name": "gearpump_zookeeper", + * "plan": "shared", + * "tags": [] + * } + * ] * } * } * - * "hbase", "hdfs" keys represent srvice types. There can by many instances within given type. + * "hbase", "hdfs" keys represent service types. There can by many instances within given type. * The instances of the same type are distinguished by service name ("name" key) */ public interface TapConfig { @@ -80,6 +112,22 @@ public interface TapConfig { */ Configuration getHDFSConfig(String serviceId); + /** + * get a Kafka Configuration by kafka instance Id + * + * @param serviceId + * @return + */ + Configuration getKafkaConfig(String serviceId); + + /** + * get a Zookeeper Configuration by zookeeper instance Id + * + * @param serviceId + * @return + */ + Configuration getZookeeperConfig(String serviceId); + /** * Get a namespace provisioned for given HBase service by TAP. * diff --git a/src/main/java/io/gearpump/tap/TapJsonConfig.java b/src/main/java/io/gearpump/tap/TapJsonConfig.java index 5f1fe33..0908fb8 100644 --- a/src/main/java/io/gearpump/tap/TapJsonConfig.java +++ b/src/main/java/io/gearpump/tap/TapJsonConfig.java @@ -26,15 +26,23 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; public class TapJsonConfig implements TapConfig { private static final String TAP_CONFIG_ROOT = "VCAP_SERVICES"; private static final String TAP_CONFIG_HBASE_TYPE = "hbase"; + private static final String TAP_CONFIG_HDFS_TYPE = "hdfs"; + private static final String TAP_CONFIG_KAFKA_TYPE = "kafka"; + private static final String TAP_CONFIG_ZOOKEEPER_TYPE = "zookeeper"; private static final String TAP_CONFIG_INSTANCE_ID_KEY = "name"; private static final String TAP_CONFIG_NAMESPACE_KEY = "hbase.namespace"; private static final String HADOOP_CONFIG_KEY_VALUE = "HADOOP_CONFIG_KEY"; + private static final String KAFKA_URI = "uri"; + private static final String KAFKA_BROKERS = "brokers"; + private static final String ZOOKEEPER_CLUSTER = "zk.cluster"; + private static final String ZOOKEEPER_SERVERS = "zookeepers"; private static final String CREDENTIALS_KEY_VALUE = "credentials"; private final T tapConfig; @@ -68,14 +76,37 @@ private T serviceNodeForInstance(ArrayList serviceNodes, String instanceName) return result; } - private static Configuration createConfiguration(T hbaseService) { + private static Configuration createConfiguration(T service, String type) { Configuration result = null; - if (hbaseService != null) { - result = new Configuration(); - Map credentials = (Map) hbaseService.get(CREDENTIALS_KEY_VALUE); - Map values = (Map) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE); - values.forEach(result::set); + if (service != null) { + result = new Configuration(); + Map credentials; + Map values; + switch(type) { + case TAP_CONFIG_HBASE_TYPE: + credentials = (Map) service.get(CREDENTIALS_KEY_VALUE); + values = (Map) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE); + values.forEach(result::set); + break; + case TAP_CONFIG_HDFS_TYPE: + credentials = (Map) service.get(CREDENTIALS_KEY_VALUE); + values = (Map) credentials.get(TapJsonConfig.HADOOP_CONFIG_KEY_VALUE); + values.forEach(result::set); + break; + case TAP_CONFIG_KAFKA_TYPE: + credentials = (Map) service.get(CREDENTIALS_KEY_VALUE); + values = new HashMap(); + values.put(TapJsonConfig.KAFKA_BROKERS, credentials.get(TapJsonConfig.KAFKA_URI).toString()); + values.forEach(result::set); + break; + case TAP_CONFIG_ZOOKEEPER_TYPE: + credentials = (Map) service.get(CREDENTIALS_KEY_VALUE); + values = new HashMap(); + values.put(TapJsonConfig.ZOOKEEPER_SERVERS, credentials.get(TapJsonConfig.ZOOKEEPER_CLUSTER).toString()); + values.forEach(result::set); + break; + } } return result; @@ -91,17 +122,33 @@ public String getHBaseNamespace(String instanceName) { @Override public Configuration getHBase(String instanceName) { - ArrayList hbases = getServiceNodes(TAP_CONFIG_HBASE_TYPE); - T serviceNode = serviceNodeForInstance(hbases, instanceName); + ArrayList hbase = getServiceNodes(TAP_CONFIG_HBASE_TYPE); + T serviceNode = serviceNodeForInstance(hbase, instanceName); - return createConfiguration(serviceNode); + return createConfiguration(serviceNode, TAP_CONFIG_HBASE_TYPE); } @Override public Configuration getHDFSConfig(String instanceName) { - ArrayList hbases = getServiceNodes(TAP_CONFIG_HBASE_TYPE); - T serviceNode = serviceNodeForInstance(hbases, instanceName); + ArrayList hdfs = getServiceNodes(TAP_CONFIG_HDFS_TYPE); + T serviceNode = serviceNodeForInstance(hdfs, instanceName); + + return createConfiguration(serviceNode, TAP_CONFIG_HDFS_TYPE); + } + + @Override + public Configuration getKafkaConfig(String instanceName) { + ArrayList kafka = getServiceNodes(TAP_CONFIG_KAFKA_TYPE); + T serviceNode = serviceNodeForInstance(kafka, instanceName); + + return createConfiguration(serviceNode, TAP_CONFIG_KAFKA_TYPE); + } + + @Override + public Configuration getZookeeperConfig(String instanceName) { + ArrayList zookeeper = getServiceNodes(TAP_CONFIG_ZOOKEEPER_TYPE); + T serviceNode = serviceNodeForInstance(zookeeper, instanceName); - return createConfiguration(serviceNode); + return createConfiguration(serviceNode, TAP_CONFIG_ZOOKEEPER_TYPE); } }