aboutsummaryrefslogtreecommitdiff
path: root/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
blob: 2f06d00754c8cd00d2bc0c9f9f65b972130ead97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.mapr.db.json;

import java.util.List;
import java.util.NavigableMap;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.planner.index.MapRDBStatistics;
import org.apache.drill.exec.planner.cost.PluginCost;
import org.apache.drill.exec.planner.index.Statistics;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;

/**
 * A RestrictedJsonTableGroupScan encapsulates (along with a subscan) the functionality
 * for doing restricted (i.e skip) scan rather than sequential scan.  The skipping is based
 * on a supplied set of row keys (primary keys) from a join operator.
 */
@JsonTypeName("restricted-json-scan")
public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {

  @JsonCreator
  public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
                            @JsonProperty("storage") FileSystemPlugin storagePlugin,
                            @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
                            @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
                            @JsonProperty("columns") List<SchemaPath> columns,
                            @JsonProperty("")MapRDBStatistics statistics) {
    super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
  }

  // TODO:  this method needs to be fully implemented
  protected RestrictedMapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
    JsonScanSpec spec = scanSpec;
    RestrictedMapRDBSubScanSpec subScanSpec =
        new RestrictedMapRDBSubScanSpec(
        spec.getTableName(),
        getRegionsToScan().get(tfi), spec.getSerializedFilter(), getUserName());
    return subScanSpec;
  }

  protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
    return getRegionsToScan(formatPlugin.getRestrictedScanRangeSizeMB());
  }

  @Override
  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
        minorFragmentId);
    RestrictedMapRDBSubScan subscan =
        new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
        getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);

    return subscan;
  }

  private List<RestrictedMapRDBSubScanSpec> getEndPointFragmentMapping(int minorFragmentId) {
    List<RestrictedMapRDBSubScanSpec> restrictedSubScanSpecList = Lists.newArrayList();
    List<MapRDBSubScanSpec> subScanSpecList = endpointFragmentMapping.get(minorFragmentId);
    for (MapRDBSubScanSpec s : subScanSpecList) {
      restrictedSubScanSpecList.add((RestrictedMapRDBSubScanSpec) s);
    }
    return restrictedSubScanSpecList;
  }

  /**
   * Private constructor, used for cloning.
   * @param that The RestrictedJsonTableGroupScan to clone
   */
  private RestrictedJsonTableGroupScan(RestrictedJsonTableGroupScan that) {
    super(that);
  }

  @Override
  public GroupScan clone(JsonScanSpec scanSpec) {
    RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
    newScan.scanSpec = scanSpec;
    newScan.resetRegionsToScan(); // resetting will force recalculation
    return newScan;
  }

  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
    newScan.columns = columns;
    return newScan;
  }

  @Override
  @JsonIgnore
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
    Preconditions.checkArgument(children.isEmpty());
    return new RestrictedJsonTableGroupScan(this);
  }

  @Override
  public ScanStats getScanStats() {
    // TODO: ideally here we should use the rowcount from index scan, and multiply a factor of restricted scan
    double rowCount;
    PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
    final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
    int numColumns = (columns == null || columns.isEmpty()) ?  STAR_COLS: columns.size();
    // Get the restricted group scan row count - same as the right side index rows
    rowCount = computeRestrictedScanRowcount();
    // Get the average row size of the primary table
    double avgRowSize = stats.getAvgRowSize(null, true);
    if (avgRowSize == Statistics.AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
      avgRowSize = avgColumnSize * numColumns;
    }
    // restricted scan does random lookups and each row may belong to a different block, with the number
    // of blocks upper bounded by the total num blocks in the primary table
    double totalBlocksPrimary = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
    double numBlocks = Math.min(totalBlocksPrimary, rowCount);
    double diskCost = numBlocks * pluginCostModel.getRandomBlockReadCost(this);
    // For non-covering plans, the dominating cost would be of the join back. Reduce it using the factor
    // for biasing towards non-covering plans.
    diskCost *= stats.getRowKeyJoinBackIOFactor();
    logger.debug("RestrictedJsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}",
        System.identityHashCode(this), rowCount, avgRowSize, numBlocks, totalBlocksPrimary, diskCost);
    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
  }

  private double computeRestrictedScanRowcount() {
    double rowCount = Statistics.ROWCOUNT_UNKNOWN;
    // The rowcount should be the same as the build side which was FORCED by putting it in forcedRowCountMap
    if (forcedRowCountMap.get(null) != null) {
      rowCount = forcedRowCountMap.get(null);
    }
    // If limit pushdown has occurred - factor it in the rowcount
    if (rowCount == Statistics.ROWCOUNT_UNKNOWN || rowCount == 0) {
      rowCount = (0.001f * fullTableRowCount);
    }
    if (this.maxRecordsToRead > 0) {
      rowCount = Math.min(rowCount, this.maxRecordsToRead);
    }
    return rowCount;
  }

  @Override
  public boolean isRestrictedScan() {
    return true;
  }

  @Override
  public String toString() {
    return "RestrictedJsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns
        + ", rowcount=" + computeRestrictedScanRowcount()
        + (maxRecordsToRead > 0 ? ", limit=" + maxRecordsToRead : "")
        + (getMaxParallelizationWidth() > 0 ? ", maxwidth=" + getMaxParallelizationWidth() : "") + "]";
  }
}