path: root/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.hive;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.holders.Decimal18Holder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal9Holder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.NullableDateVector;
import org.apache.drill.exec.vector.NullableDecimal18Vector;
import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
import org.apache.drill.exec.vector.NullableDecimal9Vector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.work.ExecErrorConstants;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;

public class HiveUtilities {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);

  /**
   * Partition values are received in string format. Converts the value into the appropriate
   * object based on the partition column type.
   *
   * @param typeInfo type of the partition column (must be a primitive type)
   * @param value partition value as a string
   * @param defaultPartitionValue string Hive uses to represent the default (null) partition value
   * @return the converted value, or null if the value equals the default partition value or cannot be parsed
   */
  public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) {
    if (typeInfo.getCategory() != Category.PRIMITIVE) {
      // In Hive only primitive types are allowed as partition column types.
      throw new DrillRuntimeException("Non-Primitive types are not allowed as partition column type in Hive, " +
          "but received one: " + typeInfo.getCategory());
    }

    if (defaultPartitionValue.equals(value)) {
      return null;
    }

    final PrimitiveCategory pCat = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();

    try {
      switch (pCat) {
        case BINARY:
          return value.getBytes();
        case BOOLEAN:
          return Boolean.parseBoolean(value);
        case DECIMAL: {
          DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
          return HiveDecimalUtils.enforcePrecisionScale(HiveDecimal.create(value), decimalTypeInfo);
        }
        case DOUBLE:
          return Double.parseDouble(value);
        case FLOAT:
          return Float.parseFloat(value);
        case BYTE:
        case SHORT:
        case INT:
          return Integer.parseInt(value);
        case LONG:
          return Long.parseLong(value);
        case STRING:
        case VARCHAR:
          return value.getBytes();
        case CHAR:
          return value.trim().getBytes();
        case TIMESTAMP:
          return Timestamp.valueOf(value);
        case DATE:
          return Date.valueOf(value);
      }
    } catch(final Exception e) {
      // In Hive, partition values that can't be converted from string are considered to be NULL.
      logger.trace("Failed to interpret '{}' value from partition value string '{}'", pCat, value);
      return null;
    }

    throwUnsupportedHiveDataTypeError(pCat.toString());
    return null;
  }

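  /**
   * Writes the same value into every position of the given vector in the range {@code [start, end)}.
   * Typically used to populate a partition column, where every record in a batch shares one value
   * (for example a value returned by {@link #convertPartitionType(TypeInfo, String, String)}).
   * The managed buffer is needed only for DECIMAL28SPARSE and DECIMAL38SPARSE values, which require
   * scratch space for the sparse decimal representation.
   *
   * @param vector vector to populate; its minor type determines how {@code val} is interpreted
   * @param managedBuffer scratch buffer used for sparse decimal conversion
   * @param val value to write into the vector
   * @param start index of the first record to write (inclusive)
   * @param end index one past the last record to write (exclusive)
   */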
  public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val,
      final int start, final int end) {
    TypeProtos.MinorType type = vector.getField().getType().getMinorType();

    switch(type) {
      case VARBINARY: {
        NullableVarBinaryVector v = (NullableVarBinaryVector) vector;
        byte[] value = (byte[]) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value, 0, value.length);
        }
        break;
      }
      case BIT: {
        NullableBitVector v = (NullableBitVector) vector;
        Boolean value = (Boolean) val;
        for (int i = start; i < end; i++) {
          v.getMutator().set(i, value ? 1 : 0);
        }
        break;
      }
      case FLOAT8: {
        NullableFloat8Vector v = (NullableFloat8Vector) vector;
        double value = (double) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }
      case FLOAT4: {
        NullableFloat4Vector v = (NullableFloat4Vector) vector;
        float value = (float) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }
      case TINYINT:
      case SMALLINT:
      case INT: {
        NullableIntVector v = (NullableIntVector) vector;
        int value = (int) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }
      case BIGINT: {
        NullableBigIntVector v = (NullableBigIntVector) vector;
        long value = (long) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }
      case VARCHAR: {
        NullableVarCharVector v = (NullableVarCharVector) vector;
        byte[] value = (byte[]) val;
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value, 0, value.length);
        }
        break;
      }
      case TIMESTAMP: {
        NullableTimeStampVector v = (NullableTimeStampVector) vector;
        DateTime ts = new DateTime(((Timestamp) val).getTime()).withZoneRetainFields(DateTimeZone.UTC);
        long value = ts.getMillis();
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }
      case DATE: {
        NullableDateVector v = (NullableDateVector) vector;
        DateTime date = new DateTime(((Date)val).getTime()).withZoneRetainFields(DateTimeZone.UTC);
        long value = date.getMillis();
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, value);
        }
        break;
      }

      case DECIMAL9: {
        final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
        final NullableDecimal9Vector v = ((NullableDecimal9Vector) vector);
        final Decimal9Holder holder = new Decimal9Holder();
        holder.scale = v.getField().getScale();
        holder.precision = v.getField().getPrecision();
        holder.value = DecimalUtility.getDecimal9FromBigDecimal(value, holder.scale, holder.precision);
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, holder);
        }
        break;
      }

      case DECIMAL18: {
        final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
        final NullableDecimal18Vector v = ((NullableDecimal18Vector) vector);
        final Decimal18Holder holder = new Decimal18Holder();
        holder.scale = v.getField().getScale();
        holder.precision = v.getField().getPrecision();
        holder.value = DecimalUtility.getDecimal18FromBigDecimal(value, holder.scale, holder.precision);
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, holder);
        }
        break;
      }

      case DECIMAL28SPARSE: {
        final int needSpace = Decimal28SparseHolder.nDecimalDigits * DecimalUtility.INTEGER_SIZE;
        Preconditions.checkArgument(managedBuffer.capacity() > needSpace,
            String.format("Not sufficient space in given managed buffer. Need %d bytes, buffer has %d bytes",
                needSpace, managedBuffer.capacity()));

        final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
        final NullableDecimal28SparseVector v = ((NullableDecimal28SparseVector) vector);
        final Decimal28SparseHolder holder = new Decimal28SparseHolder();
        holder.scale = v.getField().getScale();
        holder.precision = v.getField().getPrecision();
        holder.buffer = managedBuffer;
        holder.start = 0;
        DecimalUtility.getSparseFromBigDecimal(value, holder.buffer, 0, holder.scale, holder.precision,
            Decimal28SparseHolder.nDecimalDigits);
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, holder);
        }
        break;
      }

      case DECIMAL38SPARSE: {
        final int needSpace = Decimal38SparseHolder.nDecimalDigits * DecimalUtility.INTEGER_SIZE;
        Preconditions.checkArgument(managedBuffer.capacity() > needSpace,
            String.format("Not sufficient space in given managed buffer. Need %d bytes, buffer has %d bytes",
                needSpace, managedBuffer.capacity()));
        final BigDecimal value = ((HiveDecimal)val).bigDecimalValue();
        final NullableDecimal38SparseVector v = ((NullableDecimal38SparseVector) vector);
        final Decimal38SparseHolder holder = new Decimal38SparseHolder();
        holder.scale = v.getField().getScale();
        holder.precision = v.getField().getPrecision();
        holder.buffer = managedBuffer;
        holder.start = 0;
        DecimalUtility.getSparseFromBigDecimal(value, holder.buffer, 0, holder.scale, holder.precision,
            Decimal38SparseHolder.nDecimalDigits);
        for (int i = start; i < end; i++) {
          v.getMutator().setSafe(i, holder);
        }
        break;
      }
    }
  }

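  /**
   * Maps a Hive {@link TypeInfo} to the corresponding Drill {@link MajorType}. All Hive columns are
   * mapped as {@link DataMode#OPTIONAL} since both regular and partition columns may contain nulls.
   * CHAR and VARCHAR types carry their length over as precision, DECIMAL carries precision and scale,
   * and complex categories (LIST, MAP, STRUCT, UNION) are rejected as unsupported.
   *
   * @param typeInfo Hive type info of the column
   * @param options planner options, used to check whether the DECIMAL data type is enabled
   * @return corresponding Drill major type
   */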
  public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) {
    switch (typeInfo.getCategory()) {
      case PRIMITIVE: {
        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
        MinorType minorType = HiveUtilities.getMinorTypeFromHivePrimitiveTypeInfo(primitiveTypeInfo, options);
        MajorType.Builder typeBuilder = MajorType.newBuilder().setMinorType(minorType)
            .setMode(DataMode.OPTIONAL); // Hive columns (both regular and partition) could have null values

        switch (primitiveTypeInfo.getPrimitiveCategory()) {
          case CHAR:
          case VARCHAR:
            BaseCharTypeInfo baseCharTypeInfo = (BaseCharTypeInfo) primitiveTypeInfo;
            typeBuilder.setPrecision(baseCharTypeInfo.getLength());
            break;
          case DECIMAL:
            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
            typeBuilder.setPrecision(decimalTypeInfo.getPrecision()).setScale(decimalTypeInfo.getScale());
            break;
          default:
            // do nothing, other primitive categories do not have precision or scale
        }

        return typeBuilder.build();
      }

      case LIST:
      case MAP:
      case STRUCT:
      case UNION:
      default:
        throwUnsupportedHiveDataTypeError(typeInfo.getCategory().toString());
    }

    return null;
  }

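  /**
   * Maps a Hive primitive type to the corresponding Drill {@link TypeProtos.MinorType}. BYTE and
   * SHORT are read as INT (see DRILL-2470), STRING, VARCHAR and CHAR all map to VARCHAR, and DECIMAL
   * is allowed only when the decimal data type is enabled in the planner options.
   *
   * @param primitiveTypeInfo Hive primitive type info of the column
   * @param options planner options, used to check whether the DECIMAL data type is enabled
   * @return corresponding Drill minor type
   */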
  public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
                                                                           OptionSet options) {
    switch(primitiveTypeInfo.getPrimitiveCategory()) {
      case BINARY:
        return TypeProtos.MinorType.VARBINARY;
      case BOOLEAN:
        return TypeProtos.MinorType.BIT;
      case DECIMAL: {

        if (!options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val) {
          throw UserException.unsupportedError()
              .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
              .build(logger);
        }
        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitiveTypeInfo;
        return DecimalUtility.getDecimalDataType(decimalTypeInfo.precision());
      }
      case DOUBLE:
        return TypeProtos.MinorType.FLOAT8;
      case FLOAT:
        return TypeProtos.MinorType.FLOAT4;
      // TODO (DRILL-2470)
      // Byte and short (tinyint and smallint in SQL types) are currently read as integers
      // as these smaller integer types are not fully supported in Drill today.
      case SHORT:
      case BYTE:
      case INT:
        return TypeProtos.MinorType.INT;
      case LONG:
        return TypeProtos.MinorType.BIGINT;
      case STRING:
      case VARCHAR:
      case CHAR:
        return TypeProtos.MinorType.VARCHAR;
      case TIMESTAMP:
        return TypeProtos.MinorType.TIMESTAMP;
      case DATE:
        return TypeProtos.MinorType.DATE;
    }
    throwUnsupportedHiveDataTypeError(primitiveTypeInfo.getPrimitiveCategory().toString());
    return null;
  }

  /**
   * Utility method which gets the table or partition {@link InputFormat} class. First it tries to get
   * the class name from the given StorageDescriptor object. If it is not set there, it tries to get it
   * from the StorageHandler class set in the table properties. If neither is found, an exception is thrown.
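   * <p>A hypothetical usage sketch (the {@code job} and {@code table} variables are illustrative,
   * not part of this class):
   * <pre>{@code
   * Class<? extends InputFormat<?, ?>> inputFormatClass =
   *     HiveUtilities.getInputFormatClass(job, table.getSd(), table);
   * job.setInputFormat(inputFormatClass);
   * }</pre>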
   * @param job {@link JobConf} instance, needed in case the table is a StorageHandler based table.
   * @param sd {@link StorageDescriptor} instance of the partition currently being read, or of the table for non-partitioned tables.
   * @param table the Hive table object
   * @return {@link InputFormat} implementation class for the given table or partition
   * @throws Exception
   */
  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd,
      final Table table) throws Exception {
    final String inputFormatName = sd.getInputFormat();
    if (Strings.isNullOrEmpty(inputFormatName)) {
      final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
      if (Strings.isNullOrEmpty(storageHandlerClass)) {
        throw new ExecutionSetupException("Unable to get Hive table InputFormat class. There is neither " +
            "InputFormat class explicitly specified nor StorageHandler class");
      }
      final HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(job, storageHandlerClass);
      TableDesc tableDesc = new TableDesc();
      tableDesc.setProperties(MetaStoreUtils.getTableMetadata(table));
      storageHandler.configureInputJobProperties(tableDesc, table.getParameters());
      return (Class<? extends InputFormat<?, ?>>) storageHandler.getInputFormatClass();
    } else {
      return (Class<? extends InputFormat<?, ?>>) Class.forName(inputFormatName);
    }
  }

  /**
   * Utility method which adds the given configs to a {@link JobConf} object.
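   * <p>A small usage sketch (the pairing with {@link #getTableMetadata} is illustrative only):
   * <pre>{@code
   * HiveUtilities.addConfToJob(job, HiveUtilities.getTableMetadata(table));
   * }</pre>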
   *
   * @param job {@link JobConf} instance.
   * @param properties New config properties
   */
  public static void addConfToJob(final JobConf job, final Properties properties) {
    for (Object obj : properties.keySet()) {
      job.set((String) obj, (String) properties.get(obj));
    }
  }

  /**
   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds the
   * table-level parameters to the properties it returns.
   *
   * @param partition the source of partition level parameters
   * @param table     the source of table level parameters
   * @return properties
   */
  public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) {
    final Properties properties;
    restoreColumns(table, partition);
    properties = MetaStoreUtils.getPartitionMetadata(partition, table);

    // SerDe expects properties from Table, but above call doesn't add Table properties.
    // Include Table properties in the final list in order not to break SerDes that depend on
    // Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
    for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
      if (entry.getKey() != null && entry.getValue() != null) {
        properties.put(entry.getKey(), entry.getValue());
      }
    }

    return properties;
  }

  /**
   * Restores the column lists from the table's column list cache to the table and, if given, to the partition.
   *
   * @param table     the source of the cached column lists; its own column list is also restored
   * @param partition partition whose column list will be restored (may be null)
   */
  public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) {
    // Identical column lists for the table and its partitions are stored only once in the cache
    // to reduce the size of the serialized physical plan.
    if (partition != null && partition.getSd().getCols() == null) {
      partition.getSd().setCols(table.getColumnListsCache().getColumns(partition.getColumnListIndex()));
    }
    if (table.getSd().getCols() == null) {
      table.getSd().setCols(table.getColumnListsCache().getColumns(0));
    }
  }

  /**
   * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}
   * which first restores the cached column list to the table (see {@link #restoreColumns}) and then
   * returns the properties produced by that call.
   */
  public static Properties getTableMetadata(HiveTableWithColumnCache table) {
    restoreColumns(table, null);
    return MetaStoreUtils.getSchema(table.getSd(), table.getSd(), table.getParameters(),
      table.getDbName(), table.getTableName(), table.getPartitionKeys());
  }

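  /**
   * Builds and throws a {@link UserException} stating that the given Hive data type is not supported
   * and listing the Hive data types Drill can query.
   *
   * @param unsupportedType name of the unsupported Hive type or category
   */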
  public static void throwUnsupportedHiveDataTypeError(String unsupportedType) {
    StringBuilder errMsg = new StringBuilder();
    errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType));
    errMsg.append(System.getProperty("line.separator"));
    errMsg.append("The following Hive data types are supported in Drill for querying: ");
    errMsg.append(
        "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");

    throw UserException.unsupportedError()
        .message(errMsg.toString())
        .build(logger);
  }

  /**
   * Returns the value of the given table property. If the property is absent, returns the given default value.
   * If the property value is non-numeric, a {@link NumberFormatException} is thrown.
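   * <p>For example, this is how {@link #hasHeaderOrFooter} reads the skip-header setting:
   * <pre>{@code
   * int skipHeader = retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
   * }</pre>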
   *
   * @param tableProperties table properties
   * @param propertyName property name
   * @param defaultValue default value used in case the property is absent
   * @return property value
   * @throws NumberFormatException if property value is not numeric
   */
  public static int retrieveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
    Object propertyObject = tableProperties.get(propertyName);
    if (propertyObject == null) {
      return defaultValue;
    }

    try {
      return Integer.parseInt(propertyObject.toString());
    } catch (NumberFormatException e) {
      throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric",
          propertyName, propertyObject.toString()));
    }
  }

  /**
   * Checks if the given table has a header or footer.
   * If at least one of them has a value greater than zero, the method returns true.
   *
   * @param table table with column cache instance
   * @return true if table contains header or footer, false otherwise
   */
  public static boolean hasHeaderOrFooter(HiveTableWithColumnCache table) {
    Properties tableProperties = getTableMetadata(table);
    int skipHeader = retrieveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, -1);
    int skipFooter = retrieveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, -1);
    return skipHeader > 0 || skipFooter > 0;
  }

  /**
   * This method checks whether the table is transactional and sets the necessary properties in the {@link JobConf}.
   * If the schema evolution properties aren't set in the job conf for the input format, the method sets the column
   * names and types from the table/partition properties or the storage descriptor.
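   * <p>A rough sketch of the effect for a transactional table whose schema evolution properties are
   * not yet set (the column names and types shown are illustrative only):
   * <pre>{@code
   * HiveUtilities.verifyAndAddTransactionalProperties(job, table.getSd());
   * // job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS)       -> "id,name"
   * // job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES) -> "int,string"
   * }</pre>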
   *
   * @param job the job to update
   * @param sd storage descriptor
   */
  public static void verifyAndAddTransactionalProperties(JobConf job, StorageDescriptor sd) {

    if (AcidUtils.isTablePropertyTransactional(job)) {
      AcidUtils.setTransactionalTableScan(job, true);

      // No work is needed if schema evolution is enabled and the column properties are already set
      if (Utilities.isSchemaEvolutionEnabled(job, true) && job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS) != null &&
          job.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES) != null) {
        return;
      }

      String colNames;
      String colTypes;

      // Try to get column names and types from the table or partition properties. If they are absent
      // there, get the column data from the storage descriptor of the table.
      colNames = job.get(serdeConstants.LIST_COLUMNS);
      colTypes = job.get(serdeConstants.LIST_COLUMN_TYPES);

      if (colNames == null || colTypes == null) {
        colNames = Joiner.on(",").join(Lists.transform(sd.getCols(), new Function<FieldSchema, String>() {
          @Nullable
          @Override
          public String apply(@Nullable FieldSchema input) {
            return input.getName();
          }
        }));

        colTypes = Joiner.on(",").join(Lists.transform(sd.getCols(), new Function<FieldSchema, String>() {
          @Nullable
          @Override
          public String apply(@Nullable FieldSchema input) {
            return input.getType();
          }
        }));
      }

      job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, colNames);
      job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, colTypes);
    }
  }
}