diff options
author | Bohdan Kazydub <bohdan.kazydub@gmail.com> | 2018-08-31 18:20:34 +0300 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-10-01 13:45:29 +0300 |
commit | 98e5de3b5af862779244bac8329852b3c9a901df (patch) | |
tree | c964a232cd2f2808ceb27b0a38d665b6122e2048 /contrib | |
parent | 3bc3b66919536dc94015dc23ce43e7254623de5d (diff) |
DRILL-6724: Dump operator context to logs when error occurs during query execution
closes #1455
Diffstat (limited to 'contrib')
9 files changed, 87 insertions, 9 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 3c7ca8eba..b68f57403 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final String tableName; private OperatorContext operatorContext; private VectorContainerWriter vectorWriter; + private DBDocumentReaderBase reader; private DrillBuf buffer; @@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { vectorWriter.reset(); int recordCount = 0; - DBDocumentReaderBase reader = null; + reader = null; while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) { vectorWriter.setPosition(recordCount); @@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { table.close(); } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=") + .append(table.getPath()); + if (reader != null) { + sb.append(", Document ID=") + .append(IdCodec.asString(reader.getId())); + } + sb.append(", reader=") + .append(reader) + .append(']'); + return sb.toString(); + } } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 86038c479..2db1d02e8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas return rowCount < TARGET_RECORD_COUNT && operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH; } + + @Override + public String toString() { + return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]"; + } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java index 354a61e4c..ba1cd30b5 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java @@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader { } } + @Override + public String toString() { + long position = -1; + try { + if (reader != null) { + position = reader.getPos(); + } + } catch (IOException e) { + logger.trace("Unable to obtain reader position: " + e.getMessage()); + } + return getClass().getSimpleName() + "[Database=" + table.getDbName() + + ", Table=" + table.getTableName() + + ", Position=" + position + + "]"; + } } diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java index 1b6e2111b..cd732a61c 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader { AutoCloseables.close(resultSet, statement, connection); } + @Override + public String toString() { + return "JdbcRecordReader[sql=" + sql + + ", Plugin=" + storagePluginName + + "]"; + } + private abstract class Copier<T extends ValueVector.Mutator> { protected final int columnIndex; protected final ResultSet result; @@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader { } } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java index d715ada9c..9559c3d8c 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java @@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader { private final boolean enableAllTextMode; private final boolean readNumbersAsDouble; private final String kafkaMsgReader; + private int currentMessageCount; public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, KafkaStoragePlugin plugin) { @@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader { writer.allocate(); writer.reset(); Stopwatch watch = Stopwatch.createStarted(); - int messageCount = 0; + currentMessageCount = 0; try { while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) { ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next(); currentOffset = consumerRecord.offset(); - writer.setPosition(messageCount); + writer.setPosition(currentMessageCount); messageReader.readMessage(consumerRecord); - if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) { + if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) { break; } } messageReader.ensureAtLeastOneField(); - writer.setValueCount(messageCount); - logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount); + writer.setValueCount(currentMessageCount); + logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount); logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), currentOffset); - return messageCount; + return currentMessageCount; } catch (Exception e) { - String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1); + String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1); throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger); } } @@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader { messageReader.close(); } + @Override + public String toString() { + return "KafkaRecordReader[messageReader=" + messageReader + + ", kafkaPollTimeOut=" + kafkaPollTimeOut + + ", currentOffset=" + currentOffset + + ", enableAllTextMode=" + enableAllTextMode + + ", readNumbersAsDouble=" + readNumbersAsDouble + + ", kafkaMsgReader=" + kafkaMsgReader + + ", currentMessageCount=" + currentMessageCount + + "]"; + } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java index a62357daf..40e9e129b 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java @@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader { } } + @Override + public String toString() { + return "JsonMessageReader[jsonReader=" + jsonReader + "]"; + } } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index 845738c48..976b16d04 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader { private OutputMutator output; private OperatorContext context; + private String lastColumnName; + private Type lastColumnType; + private static class ProjectedColumnInfo { int index; ValueVector vv; @@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader { final String name = col.getName(); final Type kuduType = col.getType(); + lastColumnName = name; + lastColumnType = kuduType; MinorType minorType = TYPES.get(kuduType); if (minorType == null) { logger.warn("Ignoring column that is unsupported.", UserException @@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader { public void close() { } + @Override + public String toString() { + return "KuduRecordReader[Column=" + lastColumnName + + ", Type=" + lastColumnType + + "]"; + } } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java index d0fa1581c..76115763d 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java @@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter { public KuduStoragePlugin getPlugin() { return plugin; } + + @Override + public String toString() { + return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]"; + } } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index a79e39aa3..f5d1f2e01 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader { public void close() { } + @Override + public String toString() { + Object reader = isBsonRecordReader ? bsonReader : jsonReader; + return "MongoRecordReader[reader=" + reader + "]"; + } } |