diff options
author | Abhishek Ravi <abhi.ravi@gmail.com> | 2018-09-10 23:07:12 -0700 |
---|---|---|
committer | Sorabh Hamirwasia <sorabh@apache.org> | 2018-09-14 23:43:34 -0700 |
commit | 6ff854b79f4139237d20bd9a4fbd256f8ce4c0f2 (patch) | |
tree | 8cd2077c08909d2088c2d6c7eb7bd412ff73c345 | |
parent | b28355c0581529168a5962d3f41242166033a01c (diff) |
DRILL-6625: Intermittent failures in Kafka unit tests
Unit test changes to fix intermittent kafka producer and consumer errors.
- Increase the value of REQUEST_TIMEOUT_MS_CONFIG to accomadate slower systems.
- Increase the value of producer RETRIES_CONFIG to 3 (from 0).
- Prevent producer to send duplicate messages due to retries by enabling Idempotent producer.
- Increase consumer poll timeout (from 200 ms).
- The design of `TestKafkaSuit` is very similar to design of `MongoTestSuit` and hence would require changes similar to the ones made in [storage-mongo/pom.xml](https://github.com/apache/drill/pull/923/commits/f5dfa56f33a46b92e2f9de153d82a16a77642ddf#diff-e110e2cbfd77d27e85d5121529c612bfR83).
- Current behavior is surefire runs test classes twice - once as a part of `TestKafkaSuit` and the other by directly running classes. To prevent the latter from happening, changes were made in `pom.xml` for `storage-mongo` plugin.
closes #1463
5 files changed, 36 insertions, 3 deletions
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml index 5b5917d4a..0260c1c0c 100644 --- a/contrib/storage-kafka/pom.xml +++ b/contrib/storage-kafka/pom.xml @@ -32,6 +32,7 @@ <properties> <kafka.version>0.11.0.1</kafka.version> + <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite> </properties> <dependencies> @@ -97,4 +98,29 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <includes> + <include>${kafka.TestSuite}</include> + </includes> + <excludes> + <exclude>**/KafkaFilterPushdownTest.java</exclude> + <exclude>**/KafkaQueriesTest.java</exclude> + <exclude>**/MessageIteratorTest.java</exclude> + </excludes> + <systemProperties> + <property> + <name>logback.log.dir</name> + <value>${project.build.directory}/surefire-reports</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index 7be0ec394..d87473305 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster; +import static org.junit.Assert.assertTrue; @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaFilterPushdownTest extends KafkaTestBase { @@ -42,6 +43,10 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG); + String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC); + //Ensure messages are present + assertTrue("Kafka server does not have expected number of messages", + testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG); } /** diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java index f4a254ebc..d094531a7 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java @@ -55,14 +55,15 @@ public class KafkaMessageGenerator { public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) { producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); producerProperties.put(ProducerConfig.ACKS_CONFIG, "all"); - producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3); producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0); producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); - producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client"); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //So that retries do not cause duplicates } public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException { diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java index 9f0660621..b1742d756 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java @@ -57,7 +57,7 @@ public class KafkaTestBase extends PlanTestBase { pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true); testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER, "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader")); - testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200)); + testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000)); } public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception { diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java index ecf998e3a..784eb4ee5 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java @@ -112,6 +112,7 @@ public class TestKafkaSuit { Properties topicProps = new Properties(); topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); AdminUtils.createTopic(zkUtils, topicName, partitions, 1, |