diff options
3 files changed, 34 insertions, 17 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index 028a7d647..3e2c81c2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import com.fasterxml.jackson.core.JsonParseException; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -48,6 +49,7 @@ public class JSONRecordReader2 implements RecordReader{ private FileSystem fileSystem; private InputStream stream; private JsonReaderWithState jsonReader; + private int recordCount; public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, List<SchemaPath> columns) throws OutOfMemoryException { @@ -63,46 +65,53 @@ public class JSONRecordReader2 implements RecordReader{ this.writer = new VectorContainerWriter(output); this.mutator = output; jsonReader = new JsonReaderWithState(splitter); - }catch(IOException e){ - throw new ExecutionSetupException("Failure reading JSON file.", e); + }catch(Exception e){ + handleAndRaise("Failure reading JSON file.", e); } } + protected void handleAndRaise(String msg, Exception e) { + StringBuilder sb = new StringBuilder(); + sb.append(msg).append(" - Parser was at record: ").append(recordCount+1); + if (e instanceof JsonParseException) { + JsonParseException ex = JsonParseException.class.cast(e); + sb.append(" column: ").append(ex.getLocation().getColumnNr()); + } + throw new DrillRuntimeException(sb.toString(), e); + } + @Override public int next() { writer.allocate(); writer.reset(); - int i =0; + recordCount = 0; try{ outside: while(true){ - writer.setPosition(i); + writer.setPosition(recordCount); switch(jsonReader.write(writer)){ case WRITE_SUCCEED: - i++; + recordCount++; break; case NO_MORE: break outside; case WRITE_FAILED: - if (i == 0) { + if (recordCount == 0) { throw new DrillRuntimeException("Record is too big to fit into allocated ValueVector"); } break outside; }; } - - - writer.setValueCount(i); - return i; - - }catch(IOException e){ - throw new DrillRuntimeException("Failure while reading JSON file.", e); + } catch(Exception e) { + handleAndRaise("Failure while parsing JSON file.", e); } + writer.setValueCount(recordCount); + return recordCount; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 485d13492..d365c6fe8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -42,6 +42,8 @@ import com.google.common.base.Charsets; public class JsonReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class); + public final static int MAX_RECORD_SIZE = 128*1024; + private final JsonFactory factory = new JsonFactory(); @@ -55,7 +57,7 @@ public class JsonReader { public boolean write(Reader reader, ComplexWriter writer) throws JsonParseException, IOException { parser = factory.createJsonParser(reader); - reader.mark(1024*128); + reader.mark(MAX_RECORD_SIZE); JsonToken t = parser.nextToken(); while(!parser.hasCurrentToken()) t = parser.nextToken(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java index c73fef1cb..6d81d604b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java @@ -20,9 +20,11 @@ package org.apache.drill.exec.vector.complex.fn; import java.io.IOException; import java.io.Reader; +import org.apache.drill.common.exceptions.DrillRuntimeException; + public abstract class JsonRecordSplitterBase implements JsonRecordSplitter { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class); - public final static int MAX_RECORD_SIZE = 128*1024; + public final static int MAX_RECORD_SIZE = JsonReader.MAX_RECORD_SIZE; private static final int OPEN_CBRACKET = '{'; private static final int OPEN_BRACKET = '['; @@ -58,10 +60,15 @@ public abstract class JsonRecordSplitterBase implements JsonRecordSplitter { boolean found = false; long endOffset = start; + long curBytes = 0; int cur; outside: while(true) { cur = readNext(); endOffset++; + curBytes = endOffset - 1 - start; + if (curBytes > MAX_RECORD_SIZE) { + throw new DrillRuntimeException(String.format("Record is too long. Max allowed record size is %s bytes.", MAX_RECORD_SIZE)); + } if(cur == -1) { if(inCandidate){ @@ -110,8 +117,7 @@ public abstract class JsonRecordSplitterBase implements JsonRecordSplitter { } postScan(); - long maxBytes = endOffset - 1 - start; start = endOffset; - return createReader(maxBytes); + return createReader(curBytes); } } |