diff options
author | Hanifi Gunes <hgunes@maprtech.com> | 2015-01-13 18:37:12 -0800 |
---|---|---|
committer | Parth Chandra <pchandra@maprtech.com> | 2015-01-15 14:48:13 -0800 |
commit | 93780281439d8c0a4949dfed054ad34225970665 (patch) | |
tree | f6a814b6ffec7aa00da9bf827e0489be50e02267 | |
parent | 69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4 (diff) |
DRILL-2004: Foreman should account for fragment cancellations or query hangs
5 files changed, 118 insertions, 44 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index b9f0a26b3..d6b86375a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -54,7 +54,7 @@ public class WorkEventBus { } public void removeFragmentStatusListener(QueryId queryId) { - logger.debug("Removing framgent status listener for queryId {}.", queryId); + logger.debug("Removing fragment status listener for queryId {}.", queryId); listeners.remove(queryId); } @@ -70,10 +70,7 @@ public class WorkEventBus { public void status(FragmentStatus status) { FragmentStatusListener l = listeners.get(status.getHandle().getQueryId()); if (l == null) { - - logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", - status.getHandle()); - return; + logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status); } else { l.statusUpdate(status); } @@ -102,17 +99,12 @@ public class WorkEventBus { // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. FragmentManager m = managers.get(handle); - if(m != null){ + if(m != null) { return m; } throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); } - public void cancelFragment(FragmentHandle handle) { - logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle)); - removeFragmentManager(handle); - } - public void removeFragmentManager(FragmentHandle handle) { logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); recentlyFinishedFragments.put(handle, 1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 5efc9fabf..e10a6aacf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -155,6 +155,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { } private void cleanup(QueryResult result) { + logger.info("foreman cleaning up - status: {}", queryManager.getStatus()); + bee.retireForeman(this); context.getWorkBus().removeFragmentStatusListener(queryId); context.getClusterCoordinator().removeDrillbitStatusListener(queryManager); @@ -384,7 +386,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { * @return */ private synchronized boolean moveToState(QueryState newState, Exception exception){ - logger.debug("State change requested. {} --> {}", state, newState); + logger.info("State change requested. {} --> {}", state, newState, exception); outside: switch(state) { case PENDING: @@ -413,7 +415,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { .setIsLastChunk(true) // .build(); - cleanup(result); + // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to + // a terminal state(completed, failed) + initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true); return true; } @@ -454,7 +458,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { case COMPLETED: case FAILED: { // no op. - logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception); + logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception); return false; } @@ -635,9 +639,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { public class StateListener { public boolean moveToState(QueryState newState, Exception ex){ - try{ + try { acceptExternalEvents.await(); - }catch(InterruptedException e){ + } catch(InterruptedException e){ logger.warn("Interrupted while waiting to move state.", e); return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index d4c87d47a..2de3592b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -21,11 +21,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; @@ -51,7 +53,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe private final QueryId queryId; public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) { - super(); this.stateListener = stateListener; this.queryId = id; this.remainingFragmentCount = new AtomicInteger(0); @@ -87,13 +88,14 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe updateFragmentStatus(status); break; case CANCELLED: - // we don't care about cancellation messages since we're the only entity that should drive cancellations. + //TODO: define a new query state to distinguish the state of early termination from cancellation + fragmentDone(status); break; case FAILED: stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError())); break; case FINISHED: - finished(status); + fragmentDone(status); break; case RUNNING: updateFragmentStatus(status); @@ -107,11 +109,11 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe this.status.updateFragmentStatus(status); } - private void finished(FragmentStatus status){ + private void fragmentDone(FragmentStatus status){ this.status.incrementFinishedFragments(); int remaining = remainingFragmentCount.decrementAndGet(); updateFragmentStatus(status); - + logger.debug("waiting for {} fragments", remaining); if(remaining == 0){ stateListener.moveToState(QueryState.COMPLETED, null); } @@ -119,6 +121,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){ remainingFragmentCount.set(countOfNonRootFragments + 1); + logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1); status.add(new FragmentData(rootFragmentHandle, localIdentity, true)); this.status.setTotalFragments(countOfNonRootFragments + 1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 27038d360..9ffe643b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; -import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -71,16 +70,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid updateState(FragmentState.CANCELLED); logger.debug("Cancelled Fragment {}", context.getHandle()); context.cancel(); - - if (executionThread != null) { - executionThread.interrupt(); - } } public void receivingFragmentFinished(FragmentHandle handle) { - updateState(FragmentState.CANCELLED); - context.cancel(); - root.receivingFragmentFinished(handle); + cancel(); + if (root != null) { + root.receivingFragmentFinished(handle); + } } public UserClientConnection getClient() { @@ -104,8 +100,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener); logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); - if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) { - internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get())))); + if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) { + logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?"); return; } @@ -117,7 +113,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid closeOutResources(false); } else { closeOutResources(true); // make sure to close out resources before we report success. - updateState(FragmentState.RUNNING, FragmentState.FINISHED, false); + updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED); } break; @@ -166,23 +162,60 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid listener.fail(context.getHandle(), "Failure while running fragment.", excep); } - private void updateState(FragmentState update) { - state.set(update.getNumber()); - listener.stateChanged(context.getHandle(), update); + /** + * Updates the fragment state with the given state + * @param to target state + */ + protected void updateState(FragmentState to) {; + state.set(to.getNumber()); + listener.stateChanged(context.getHandle(), to); } - private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) { - boolean success = state.compareAndSet(current.getNumber(), update.getNumber()); - if (!success && exceptionOnFailure) { - internalFail(new RuntimeException(String.format( - "State was different than expected. Attempting to update state from %s to %s however current state was %s.", - current.name(), update.name(), FragmentState.valueOf(state.get())))); - return false; + /** + * Updates the fragment state only if the current state matches the expected. + * + * @param expected expected current state + * @param to target state + * @return true only if update succeeds + */ + protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) { + boolean success = state.compareAndSet(expected.getNumber(), to.getNumber()); + if (success) { + listener.stateChanged(context.getHandle(), to); + } else { + logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.", + expected.name(), to.name(), FragmentState.valueOf(state.get())); } - listener.stateChanged(context.getHandle(), update); - return true; + return success; } + /** + * Returns true if the fragment is in a terminal state + */ + protected boolean isCompleted() { + return state.get() == FragmentState.CANCELLED_VALUE + || state.get() == FragmentState.FAILED_VALUE + || state.get() == FragmentState.FINISHED_VALUE; + } + + /** + * Update the state if current state matches expected or fail the fragment if state transition fails even though + * fragment is not in a terminal state. + * + * @param expected current expected state + * @param to target state + * @return true only if update succeeds + */ + protected boolean updateStateOrFail(FragmentState expected, FragmentState to) { + final boolean updated = checkAndUpdateState(expected, to); + if (!updated && !isCompleted()) { + final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s."; + internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get())))); + } + return updated; + } + + @Override public int compareTo(Object o) { return o.hashCode() - this.hashCode(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java new file mode 100644 index 000000000..7155d4360 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.fragment; + +import org.apache.drill.common.exceptions.DrillException; + +public class StateTransitionException extends DrillException { + public StateTransitionException() { + super(); + } + + public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public StateTransitionException(String message, Throwable cause) { + super(message, cause); + } + + public StateTransitionException(String message) { + super(message); + } + + public StateTransitionException(Throwable cause) { + super(cause); + } +} |