aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java12
3 files changed, 34 insertions, 17 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index 028a7d647..3e2c81c2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import com.fasterxml.jackson.core.JsonParseException;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -48,6 +49,7 @@ public class JSONRecordReader2 implements RecordReader{
private FileSystem fileSystem;
private InputStream stream;
private JsonReaderWithState jsonReader;
+ private int recordCount;
public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
List<SchemaPath> columns) throws OutOfMemoryException {
@@ -63,46 +65,53 @@ public class JSONRecordReader2 implements RecordReader{
this.writer = new VectorContainerWriter(output);
this.mutator = output;
jsonReader = new JsonReaderWithState(splitter);
- }catch(IOException e){
- throw new ExecutionSetupException("Failure reading JSON file.", e);
+ }catch(Exception e){
+ handleAndRaise("Failure reading JSON file.", e);
}
}
+ protected void handleAndRaise(String msg, Exception e) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg).append(" - Parser was at record: ").append(recordCount+1);
+ if (e instanceof JsonParseException) {
+ JsonParseException ex = JsonParseException.class.cast(e);
+ sb.append(" column: ").append(ex.getLocation().getColumnNr());
+ }
+ throw new DrillRuntimeException(sb.toString(), e);
+ }
+
@Override
public int next() {
writer.allocate();
writer.reset();
- int i =0;
+ recordCount = 0;
try{
outside: while(true){
- writer.setPosition(i);
+ writer.setPosition(recordCount);
switch(jsonReader.write(writer)){
case WRITE_SUCCEED:
- i++;
+ recordCount++;
break;
case NO_MORE:
break outside;
case WRITE_FAILED:
- if (i == 0) {
+ if (recordCount == 0) {
throw new DrillRuntimeException("Record is too big to fit into allocated ValueVector");
}
break outside;
};
}
-
-
- writer.setValueCount(i);
- return i;
-
- }catch(IOException e){
- throw new DrillRuntimeException("Failure while reading JSON file.", e);
+ } catch(Exception e) {
+ handleAndRaise("Failure while parsing JSON file.", e);
}
+ writer.setValueCount(recordCount);
+ return recordCount;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 485d13492..d365c6fe8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -42,6 +42,8 @@ import com.google.common.base.Charsets;
public class JsonReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
+ public final static int MAX_RECORD_SIZE = 128*1024;
+
private final JsonFactory factory = new JsonFactory();
@@ -55,7 +57,7 @@ public class JsonReader {
public boolean write(Reader reader, ComplexWriter writer) throws JsonParseException, IOException {
parser = factory.createJsonParser(reader);
- reader.mark(1024*128);
+ reader.mark(MAX_RECORD_SIZE);
JsonToken t = parser.nextToken();
while(!parser.hasCurrentToken()) t = parser.nextToken();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
index c73fef1cb..6d81d604b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java
@@ -20,9 +20,11 @@ package org.apache.drill.exec.vector.complex.fn;
import java.io.IOException;
import java.io.Reader;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
public abstract class JsonRecordSplitterBase implements JsonRecordSplitter {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
- public final static int MAX_RECORD_SIZE = 128*1024;
+ public final static int MAX_RECORD_SIZE = JsonReader.MAX_RECORD_SIZE;
private static final int OPEN_CBRACKET = '{';
private static final int OPEN_BRACKET = '[';
@@ -58,10 +60,15 @@ public abstract class JsonRecordSplitterBase implements JsonRecordSplitter {
boolean found = false;
long endOffset = start;
+ long curBytes = 0;
int cur;
outside: while(true) {
cur = readNext();
endOffset++;
+ curBytes = endOffset - 1 - start;
+ if (curBytes > MAX_RECORD_SIZE) {
+ throw new DrillRuntimeException(String.format("Record is too long. Max allowed record size is %s bytes.", MAX_RECORD_SIZE));
+ }
if(cur == -1) {
if(inCandidate){
@@ -110,8 +117,7 @@ public abstract class JsonRecordSplitterBase implements JsonRecordSplitter {
}
postScan();
- long maxBytes = endOffset - 1 - start;
start = endOffset;
- return createReader(maxBytes);
+ return createReader(curBytes);
}
}