diff options
author | Kamesh <kam.iitkgp@gmail.com> | 2015-07-15 16:34:41 +0530 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-09-13 21:58:07 -0700 |
commit | 221489406505d738f1746dbf49f8dd0d67df3fc2 (patch) | |
tree | aa9ed5705d91dd67bf704dc13d26e30d1240a38e | |
parent | 97615e5675c1b25a4a9b5f96e6e1be7ed4f96c9c (diff) |
DRILL-2879: Enhance extended JSON support for dates in milliseconds and binary values with type info
Ignore project push down Mongo test until test completes correctly on Linux
7 files changed, 115 insertions, 20 deletions
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java index b17cf2f06..32666fce3 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java @@ -17,15 +17,14 @@ */ package org.apache.drill.exec.store.mongo; -import org.apache.drill.exec.ExecConstants; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; - import static org.apache.drill.TestBuilder.listOf; import static org.apache.drill.TestBuilder.mapOf; +import org.apache.drill.exec.ExecConstants; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("DRILL-3775") public class TestMongoProjectPushDown extends MongoTestBase { /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index dfc4f3af9..8e78cf124 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -155,8 +155,11 @@ public class JSONRecordReader extends AbstractRecordReader { if (columnNr > 0) { exceptionBuilder.pushContext("Column ", columnNr); } - exceptionBuilder.pushContext("Record ", currentRecordNumberInFile()) - .pushContext("File ", hadoopPath.toUri().getPath()); + + if (hadoopPath != null) { + exceptionBuilder.pushContext("Record ", currentRecordNumberInFile()) + .pushContext("File ", hadoopPath.toUri().getPath()); + } throw exceptionBuilder.build(logger); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java index bec0fd2d3..13df44fdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedType.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.io.SerializedString; public enum ExtendedType { BINARY(ExtendedTypeName.BINARY), + TYPE(ExtendedTypeName.TYPE), DATE(ExtendedTypeName.DATE), TIME(ExtendedTypeName.TIME), TIMESTAMP(ExtendedTypeName.TIMESTAMP), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java index fcef24bee..e432d56ce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ExtendedTypeName.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.vector.complex.fn; public interface ExtendedTypeName { public static final String BINARY = "$binary"; // base64 encoded binary (ZHJpbGw=) [from Mongo] + public static final String TYPE = "$type"; // type of binary data public static final String DATE = "$dateDay"; // ISO date with no time. 
such as (12-24-27) public static final String TIME = "$time"; // ISO time with no timezone (19:20:30.45Z) public static final String TIMESTAMP = "$date"; // ISO standard time (2009-02-23T00:00:00.000-08:00) [from Mongo] diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java index 651de3d3b..769f341d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/VectorOutput.java @@ -81,7 +81,7 @@ abstract class VectorOutput { switch(possibleTypeName){ case ExtendedTypeName.BINARY: writeBinary(checkNextToken(JsonToken.VALUE_STRING)); - checkNextToken(JsonToken.END_OBJECT); + checkCurrentToken(JsonToken.END_OBJECT); return true; case ExtendedTypeName.DATE: writeDate(checkNextToken(JsonToken.VALUE_STRING)); @@ -92,7 +92,7 @@ abstract class VectorOutput { checkNextToken(JsonToken.END_OBJECT); return true; case ExtendedTypeName.TIMESTAMP: - writeTimestamp(checkNextToken(JsonToken.VALUE_STRING)); + writeTimestamp(checkNextToken(JsonToken.VALUE_STRING, JsonToken.VALUE_NUMBER_INT)); checkNextToken(JsonToken.END_OBJECT); return true; case ExtendedTypeName.INTERVAL: @@ -100,7 +100,7 @@ abstract class VectorOutput { checkNextToken(JsonToken.END_OBJECT); return true; case ExtendedTypeName.INTEGER: - writeInteger(checkNextToken(JsonToken.VALUE_NUMBER_INT)); + writeInteger(checkNextToken(JsonToken.VALUE_STRING, JsonToken.VALUE_NUMBER_INT)); checkNextToken(JsonToken.END_OBJECT); return true; case ExtendedTypeName.DECIMAL: @@ -116,8 +116,35 @@ abstract class VectorOutput { return checkNextToken(expected, expected); } + public boolean checkCurrentToken(final JsonToken expected) throws IOException{ + return checkCurrentToken(expected, expected); + } + public boolean checkNextToken(final JsonToken expected1, final JsonToken expected2) throws 
IOException{ - JsonToken t = parser.nextToken(); + return checkToken(parser.nextToken(), expected1, expected2); + } + + public boolean checkCurrentToken(final JsonToken expected1, final JsonToken expected2) throws IOException{ + return checkToken(parser.getCurrentToken(), expected1, expected2); + } + + boolean hasType() throws JsonParseException, IOException { + JsonToken token = parser.nextToken(); + return token == JsonToken.FIELD_NAME && parser.getText().equals(ExtendedTypeName.TYPE); + } + + long getType() throws JsonParseException, IOException { + if (!checkNextToken(JsonToken.VALUE_NUMBER_INT, JsonToken.VALUE_STRING)) { + long type = parser.getValueAsLong(); + //Advancing the token, as checking current token in binary + parser.nextToken(); + return type; + } + throw new JsonParseException("Failure while reading $type value. Expected a NUMBER or STRING", + parser.getCurrentLocation()); + } + + public boolean checkToken(final JsonToken t, final JsonToken expected1, final JsonToken expected2) throws IOException{ if(t == JsonToken.VALUE_NULL){ return true; }else if(t == expected1){ @@ -154,7 +181,12 @@ abstract class VectorOutput { public void writeBinary(boolean isNull) throws IOException { VarBinaryWriter bin = writer.varBinary(); if(!isNull){ - work.prepareBinary(parser.getBinaryValue(), binary); + byte[] binaryData = parser.getBinaryValue(); + if (hasType()) { + //Ignoring type info as of now. 
+ getType(); + } + work.prepareBinary(binaryData, binary); bin.write(binary); } } @@ -181,8 +213,18 @@ abstract class VectorOutput { public void writeTimestamp(boolean isNull) throws IOException { TimeStampWriter ts = writer.timeStamp(); if(!isNull){ - DateTimeFormatter f = ISODateTimeFormat.dateTime(); - ts.writeTimeStamp(DateTime.parse(parser.getValueAsString(), f).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis()); + switch (parser.getCurrentToken()) { + case VALUE_NUMBER_INT: + DateTime dt = new DateTime(parser.getLongValue(), org.joda.time.DateTimeZone.UTC); + ts.writeTimeStamp(dt.getMillis()); + break; + case VALUE_STRING: + DateTimeFormatter f = ISODateTimeFormat.dateTime(); + ts.writeTimeStamp(DateTime.parse(parser.getValueAsString(), f).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis()); + break; + default: + break; + } } } @@ -202,7 +244,7 @@ abstract class VectorOutput { public void writeInteger(boolean isNull) throws IOException { BigIntWriter intWriter = writer.bigInt(); if(!isNull){ - intWriter.writeBigInt(parser.getLongValue()); + intWriter.writeBigInt(Long.parseLong(parser.getValueAsString())); } } @@ -232,7 +274,12 @@ abstract class VectorOutput { public void writeBinary(boolean isNull) throws IOException { VarBinaryWriter bin = writer.varBinary(fieldName); if(!isNull){ - work.prepareBinary(parser.getBinaryValue(), binary); + byte[] binaryData = parser.getBinaryValue(); + if (hasType()) { + //Ignoring type info as of now. 
+ getType(); + } + work.prepareBinary(binaryData, binary); bin.write(binary); } } @@ -260,8 +307,18 @@ abstract class VectorOutput { public void writeTimestamp(boolean isNull) throws IOException { TimeStampWriter ts = writer.timeStamp(fieldName); if(!isNull){ - DateTimeFormatter f = ISODateTimeFormat.dateTime(); - ts.writeTimeStamp(DateTime.parse(parser.getValueAsString(), f).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis()); + switch (parser.getCurrentToken()) { + case VALUE_NUMBER_INT: + DateTime dt = new DateTime(parser.getLongValue(), org.joda.time.DateTimeZone.UTC); + ts.writeTimeStamp(dt.getMillis()); + break; + case VALUE_STRING: + DateTimeFormatter f = ISODateTimeFormat.dateTime(); + ts.writeTimeStamp(DateTime.parse(parser.getValueAsString(), f).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis()); + break; + default: + break; + } } } @@ -281,7 +338,7 @@ abstract class VectorOutput { public void writeInteger(boolean isNull) throws IOException { BigIntWriter intWriter = writer.bigInt(fieldName); if(!isNull){ - intWriter.writeBigInt(parser.getLongValue()); + intWriter.writeBigInt(Long.parseLong(parser.getValueAsString())); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java index f40310860..51ecec521 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestExtendedTypes.java @@ -64,4 +64,30 @@ public class TestExtendedTypes extends BaseTestQuery { ExecConstants.JSON_EXTENDED_TYPES.getDefault().getValue())); } } + + @Test + public void testMongoExtendedTypes() throws Exception { + + final String originalFile = "${WORKING_PATH}/src/test/resources/vector/complex/mongo_extended.json".replaceAll( + Pattern.quote("${WORKING_PATH}"), + 
Matcher.quoteReplacement(TestTools.getWorkingPath())); + + try { + testNoResult(String.format("ALTER SESSION SET `%s` = 'json'", ExecConstants.OUTPUT_FORMAT_VALIDATOR.getOptionName())); + testNoResult(String.format("ALTER SESSION SET `%s` = true", ExecConstants.JSON_EXTENDED_TYPES.getOptionName())); + + int actualRecordCount = testSql(String.format("select * from dfs.`%s`", originalFile)); + assertEquals( + String.format( + "Received unexpected number of rows in output: expected=%d, received=%s", + 1, actualRecordCount), 1, actualRecordCount); + } finally { + testNoResult(String.format("ALTER SESSION SET `%s` = '%s'", + ExecConstants.OUTPUT_FORMAT_VALIDATOR.getOptionName(), + ExecConstants.OUTPUT_FORMAT_VALIDATOR.getDefault().getValue())); + testNoResult(String.format("ALTER SESSION SET `%s` = %s", + ExecConstants.JSON_EXTENDED_TYPES.getOptionName(), + ExecConstants.JSON_EXTENDED_TYPES.getDefault().getValue())); + } + } } diff --git a/exec/java-exec/src/test/resources/vector/complex/mongo_extended.json b/exec/java-exec/src/test/resources/vector/complex/mongo_extended.json new file mode 100644 index 000000000..a38a83f48 --- /dev/null +++ b/exec/java-exec/src/test/resources/vector/complex/mongo_extended.json @@ -0,0 +1,8 @@ +{ + "drill_timestamp_millies" : { + "$date" : 1436241583488 + }, + "bin" : { + "$binary" : "ZHJpbGw=", "$type" : 1 + } +} |