diff options
author | Olaf Flebbe <of@oflebbe.de> | 2016-12-17 20:23:23 +0000 |
---|---|---|
committer | Olaf Flebbe <of@oflebbe.de> | 2016-12-17 21:51:25 +0100 |
commit | 1b34df0e87f347c01e0f2e56b0a835aff314e31a (patch) | |
tree | a5bd08684f9d37ccc23d5d633842efdc54a5d853 /bigtop-packages | |
parent | 83677c156dd89ff3ce9a314bf00c9ca55413e440 (diff) |
BIGTOP-2637: Fix flume because of kafka 0.10 update
Diffstat (limited to 'bigtop-packages')
-rw-r--r-- | bigtop-packages/src/common/flume/patch1-FLUME-3026.diff | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/bigtop-packages/src/common/flume/patch1-FLUME-3026.diff b/bigtop-packages/src/common/flume/patch1-FLUME-3026.diff new file mode 100644 index 00000000..503de9d5 --- /dev/null +++ b/bigtop-packages/src/common/flume/patch1-FLUME-3026.diff @@ -0,0 +1,92 @@ +diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +index 5e5f2d0..917cee2 100644 +--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ++++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +@@ -20,6 +20,7 @@ package org.apache.flume.channel.kafka; + + import com.google.common.collect.Lists; + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; + import org.apache.commons.lang.RandomStringUtils; +@@ -883,7 +884,8 @@ public class TestKafkaChannel { + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + public static void deleteTopic(String topicName) { +diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +index 7c66420..bc2a299 100644 +--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ++++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +@@ -21,6 +21,7 @@ package org.apache.flume.sink.kafka; + import com.google.common.base.Charsets; + + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.message.MessageAndMetadata; + import kafka.utils.ZkUtils; + +@@ -551,7 +552,8 @@ public class TestKafkaSink { + ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + public static void deleteTopic(String topicName) { +diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +index 53bd65c..ae5348c 100644 +--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +@@ -17,6 +17,7 @@ + package org.apache.flume.source.kafka; + + import kafka.admin.AdminUtils; ++import kafka.admin.RackAwareMode; + import kafka.server.KafkaConfig; + import kafka.server.KafkaServerStartable; + import kafka.utils.ZkUtils; +@@ -131,7 +132,8 @@ public class KafkaSourceEmbeddedKafka { + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); + int replicationFactor = 1; + Properties topicConfig = new Properties(); +- AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); ++ AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig, ++ RackAwareMode.Disabled$.MODULE$); + } + + } +diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +index d1daceb..cda91f9 100644 +--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ++++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +@@ -20,7 +20,7 @@ package org.apache.flume.source.kafka; + import com.google.common.base.Charsets; + import com.google.common.collect.Lists; + import junit.framework.Assert; +-import kafka.common.TopicExistsException; ++import org.apache.kafka.common.errors.TopicExistsException; + import kafka.utils.ZKGroupTopicDirs; + import kafka.utils.ZkUtils; + import org.apache.avro.io.BinaryEncoder; +diff --git a/pom.xml b/pom.xml +index f62c99a..fb2340f 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -52,7 +52,7 @@ limitations under the License. + <elasticsearch.version>0.90.1</elasticsearch.version> + <hadoop2.version>2.4.0</hadoop2.version> + <thrift.version>0.7.0</thrift.version> +- <kafka.version>0.9.0.1</kafka.version> ++ <kafka.version>0.10.1.0</kafka.version> + <kite.version>1.0.0</kite.version> + <hive.version>1.0.0</hive.version> + <xalan.verion>2.7.1</xalan.verion> |