aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAbhishek Ravi <abhi.ravi@gmail.com>2018-09-10 23:07:12 -0700
committerSorabh Hamirwasia <sorabh@apache.org>2018-09-14 23:43:34 -0700
commit6ff854b79f4139237d20bd9a4fbd256f8ce4c0f2 (patch)
tree8cd2077c08909d2088c2d6c7eb7bd412ff73c345
parentb28355c0581529168a5962d3f41242166033a01c (diff)
DRILL-6625: Intermittent failures in Kafka unit tests
Unit test changes to fix intermittent kafka producer and consumer errors. - Increase the value of REQUEST_TIMEOUT_MS_CONFIG to accomadate slower systems. - Increase the value of producer RETRIES_CONFIG to 3 (from 0). - Prevent producer to send duplicate messages due to retries by enabling Idempotent producer. - Increase consumer poll timeout (from 200 ms). - The design of `TestKafkaSuit` is very similar to design of `MongoTestSuit` and hence would require changes similar to the ones made in [storage-mongo/pom.xml](https://github.com/apache/drill/pull/923/commits/f5dfa56f33a46b92e2f9de153d82a16a77642ddf#diff-e110e2cbfd77d27e85d5121529c612bfR83). - Current behavior is surefire runs test classes twice - once as a part of `TestKafkaSuit` and the other by directly running classes. To prevent the latter from happening, changes were made in `pom.xml` for `storage-mongo` plugin. closes #1463
-rw-r--r--contrib/storage-kafka/pom.xml26
-rw-r--r--contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java5
-rw-r--r--contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java5
-rw-r--r--contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java2
-rw-r--r--contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java1
5 files changed, 36 insertions, 3 deletions
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 5b5917d4a..0260c1c0c 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -32,6 +32,7 @@
<properties>
<kafka.version>0.11.0.1</kafka.version>
+ <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
</properties>
<dependencies>
@@ -97,4 +98,29 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>${kafka.TestSuite}</include>
+ </includes>
+ <excludes>
+ <exclude>**/KafkaFilterPushdownTest.java</exclude>
+ <exclude>**/KafkaQueriesTest.java</exclude>
+ <exclude>**/MessageIteratorTest.java</exclude>
+ </excludes>
+ <systemProperties>
+ <property>
+ <name>logback.log.dir</name>
+ <value>${project.build.directory}/surefire-reports</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 7be0ec394..d87473305 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category;
import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+import static org.junit.Assert.assertTrue;
@Category({KafkaStorageTest.class, SlowTest.class})
public class KafkaFilterPushdownTest extends KafkaTestBase {
@@ -42,6 +43,10 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
StringSerializer.class);
generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
+ String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC);
+ //Ensure messages are present
+ assertTrue("Kafka server does not have expected number of messages",
+ testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG);
}
/**
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index f4a254ebc..d094531a7 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -55,14 +55,15 @@ public class KafkaMessageGenerator {
public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) {
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
- producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
- producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+ producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+ producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //So that retries do not cause duplicates
}
public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index 9f0660621..b1742d756 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -57,7 +57,7 @@ public class KafkaTestBase extends PlanTestBase {
pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true);
testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
- testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200));
+ testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000));
}
public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index ecf998e3a..784eb4ee5 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -112,6 +112,7 @@ public class TestKafkaSuit {
Properties topicProps = new Properties();
topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+ topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
ZkUtils zkUtils = new ZkUtils(zkClient,
new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
AdminUtils.createTopic(zkUtils, topicName, partitions, 1,