aboutsummaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorBohdan Kazydub <bohdan.kazydub@gmail.com>2018-08-31 18:20:34 +0300
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-10-01 13:45:29 +0300
commit98e5de3b5af862779244bac8329852b3c9a901df (patch)
treec964a232cd2f2808ceb27b0a38d665b6122e2048 /contrib
parent3bc3b66919536dc94015dc23ce43e7254623de5d (diff)
DRILL-6724: Dump operator context to logs when error occurs during query execution
closes #1455
Diffstat (limited to 'contrib')
-rw-r--r--contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java17
-rw-r--r--contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java5
-rw-r--r--contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java15
-rwxr-xr-xcontrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java8
-rw-r--r--contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java26
-rw-r--r--contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java4
-rw-r--r--contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java11
-rw-r--r--contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java5
-rw-r--r--contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java5
9 files changed, 87 insertions, 9 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 3c7ca8eba..b68f57403 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final String tableName;
private OperatorContext operatorContext;
private VectorContainerWriter vectorWriter;
+ private DBDocumentReaderBase reader;
private DrillBuf buffer;
@@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
vectorWriter.reset();
int recordCount = 0;
- DBDocumentReaderBase reader = null;
+ reader = null;
while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
vectorWriter.setPosition(recordCount);
@@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
table.close();
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=")
+ .append(table.getPath());
+ if (reader != null) {
+ sb.append(", Document ID=")
+ .append(IdCodec.asString(reader.getId()));
+ }
+ sb.append(", reader=")
+ .append(reader)
+ .append(']');
+ return sb.toString();
+ }
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 86038c479..2db1d02e8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
return rowCount < TARGET_RECORD_COUNT &&
operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH;
}
+
+ @Override
+ public String toString() {
+ return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]";
+ }
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
index 354a61e4c..ba1cd30b5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java
@@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader {
}
}
+ @Override
+ public String toString() {
+ long position = -1;
+ try {
+ if (reader != null) {
+ position = reader.getPos();
+ }
+ } catch (IOException e) {
+ logger.trace("Unable to obtain reader position: " + e.getMessage());
+ }
+ return getClass().getSimpleName() + "[Database=" + table.getDbName()
+ + ", Table=" + table.getTableName()
+ + ", Position=" + position
+ + "]";
+ }
}
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 1b6e2111b..cd732a61c 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader {
AutoCloseables.close(resultSet, statement, connection);
}
+ @Override
+ public String toString() {
+ return "JdbcRecordReader[sql=" + sql
+ + ", Plugin=" + storagePluginName
+ + "]";
+ }
+
private abstract class Copier<T extends ValueVector.Mutator> {
protected final int columnIndex;
protected final ResultSet result;
@@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader {
}
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index d715ada9c..9559c3d8c 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader {
private final boolean enableAllTextMode;
private final boolean readNumbersAsDouble;
private final String kafkaMsgReader;
+ private int currentMessageCount;
public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, KafkaStoragePlugin plugin) {
@@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader {
writer.allocate();
writer.reset();
Stopwatch watch = Stopwatch.createStarted();
- int messageCount = 0;
+ currentMessageCount = 0;
try {
while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
currentOffset = consumerRecord.offset();
- writer.setPosition(messageCount);
+ writer.setPosition(currentMessageCount);
messageReader.readMessage(consumerRecord);
- if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+ if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
break;
}
}
messageReader.ensureAtLeastOneField();
- writer.setValueCount(messageCount);
- logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount);
+ writer.setValueCount(currentMessageCount);
+ logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
- return messageCount;
+ return currentMessageCount;
} catch (Exception e) {
- String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1);
+ String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
}
}
@@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader {
messageReader.close();
}
+ @Override
+ public String toString() {
+ return "KafkaRecordReader[messageReader=" + messageReader
+ + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+ + ", currentOffset=" + currentOffset
+ + ", enableAllTextMode=" + enableAllTextMode
+ + ", readNumbersAsDouble=" + readNumbersAsDouble
+ + ", kafkaMsgReader=" + kafkaMsgReader
+ + ", currentMessageCount=" + currentMessageCount
+ + "]";
+ }
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a62357daf..40e9e129b 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader {
}
}
+ @Override
+ public String toString() {
+ return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+ }
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
index 845738c48..976b16d04 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java
@@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader {
private OutputMutator output;
private OperatorContext context;
+ private String lastColumnName;
+ private Type lastColumnType;
+
private static class ProjectedColumnInfo {
int index;
ValueVector vv;
@@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader {
final String name = col.getName();
final Type kuduType = col.getType();
+ lastColumnName = name;
+ lastColumnType = kuduType;
MinorType minorType = TYPES.get(kuduType);
if (minorType == null) {
logger.warn("Ignoring column that is unsupported.", UserException
@@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader {
public void close() {
}
+ @Override
+ public String toString() {
+ return "KuduRecordReader[Column=" + lastColumnName
+ + ", Type=" + lastColumnType
+ + "]";
+ }
}
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
index d0fa1581c..76115763d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java
@@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter {
public KuduStoragePlugin getPlugin() {
return plugin;
}
+
+ @Override
+ public String toString() {
+ return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]";
+ }
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index a79e39aa3..f5d1f2e01 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader {
public void close() {
}
+ @Override
+ public String toString() {
+ Object reader = isBsonRecordReader ? bsonReader : jsonReader;
+ return "MongoRecordReader[reader=" + reader + "]";
+ }
}