aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHanifi Gunes <hgunes@maprtech.com>2015-01-13 18:37:12 -0800
committerParth Chandra <pchandra@maprtech.com>2015-01-15 14:48:13 -0800
commit93780281439d8c0a4949dfed054ad34225970665 (patch)
treef6a814b6ffec7aa00da9bf827e0489be50e02267
parent69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4 (diff)
DRILL-2004: Foreman should account for fragment cancellations or query hangs
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java42
5 files changed, 118 insertions, 44 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index b9f0a26b3..d6b86375a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -54,7 +54,7 @@ public class WorkEventBus {
}
public void removeFragmentStatusListener(QueryId queryId) {
- logger.debug("Removing framgent status listener for queryId {}.", queryId);
+ logger.debug("Removing fragment status listener for queryId {}.", queryId);
listeners.remove(queryId);
}
@@ -70,10 +70,7 @@ public class WorkEventBus {
public void status(FragmentStatus status) {
FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
if (l == null) {
-
- logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.",
- status.getHandle());
- return;
+ logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status);
} else {
l.statusUpdate(status);
}
@@ -102,17 +99,12 @@ public class WorkEventBus {
// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
FragmentManager m = managers.get(handle);
- if(m != null){
+ if(m != null) {
return m;
}
throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
}
- public void cancelFragment(FragmentHandle handle) {
- logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle));
- removeFragmentManager(handle);
- }
-
public void removeFragmentManager(FragmentHandle handle) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
recentlyFinishedFragments.put(handle, 1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5efc9fabf..e10a6aacf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -155,6 +155,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
private void cleanup(QueryResult result) {
+ logger.info("foreman cleaning up - status: {}", queryManager.getStatus());
+
bee.retireForeman(this);
context.getWorkBus().removeFragmentStatusListener(queryId);
context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
@@ -384,7 +386,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
* @return
*/
private synchronized boolean moveToState(QueryState newState, Exception exception){
- logger.debug("State change requested. {} --> {}", state, newState);
+ logger.info("State change requested. {} --> {}", state, newState, exception);
outside: switch(state) {
case PENDING:
@@ -413,7 +415,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
.setIsLastChunk(true) //
.build();
- cleanup(result);
+ // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to
+ // a terminal state(completed, failed)
+ initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
return true;
}
@@ -454,7 +458,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
case COMPLETED:
case FAILED: {
// no op.
- logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
+ logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
return false;
}
@@ -635,9 +639,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
public class StateListener {
public boolean moveToState(QueryState newState, Exception ex){
- try{
+ try {
acceptExternalEvents.await();
- }catch(InterruptedException e){
+ } catch(InterruptedException e){
logger.warn("Interrupted while waiting to move state.", e);
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index d4c87d47a..2de3592b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -21,11 +21,13 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -51,7 +53,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
private final QueryId queryId;
public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
- super();
this.stateListener = stateListener;
this.queryId = id;
this.remainingFragmentCount = new AtomicInteger(0);
@@ -87,13 +88,14 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
updateFragmentStatus(status);
break;
case CANCELLED:
- // we don't care about cancellation messages since we're the only entity that should drive cancellations.
+ //TODO: define a new query state to distinguish the state of early termination from cancellation
+ fragmentDone(status);
break;
case FAILED:
stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
break;
case FINISHED:
- finished(status);
+ fragmentDone(status);
break;
case RUNNING:
updateFragmentStatus(status);
@@ -107,11 +109,11 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
this.status.updateFragmentStatus(status);
}
- private void finished(FragmentStatus status){
+ private void fragmentDone(FragmentStatus status){
this.status.incrementFinishedFragments();
int remaining = remainingFragmentCount.decrementAndGet();
updateFragmentStatus(status);
-
+ logger.debug("waiting for {} fragments", remaining);
if(remaining == 0){
stateListener.moveToState(QueryState.COMPLETED, null);
}
@@ -119,6 +121,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
remainingFragmentCount.set(countOfNonRootFragments + 1);
+ logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
this.status.setTotalFragments(countOfNonRootFragments + 1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 27038d360..9ffe643b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
-import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -71,16 +70,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
updateState(FragmentState.CANCELLED);
logger.debug("Cancelled Fragment {}", context.getHandle());
context.cancel();
-
- if (executionThread != null) {
- executionThread.interrupt();
- }
}
public void receivingFragmentFinished(FragmentHandle handle) {
- updateState(FragmentState.CANCELLED);
- context.cancel();
- root.receivingFragmentFinished(handle);
+ cancel();
+ if (root != null) {
+ root.receivingFragmentFinished(handle);
+ }
}
public UserClientConnection getClient() {
@@ -104,8 +100,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);
logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
- if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) {
- internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+ if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
+ logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
return;
}
@@ -117,7 +113,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
closeOutResources(false);
} else {
closeOutResources(true); // make sure to close out resources before we report success.
- updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
}
break;
@@ -166,23 +162,60 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
listener.fail(context.getHandle(), "Failure while running fragment.", excep);
}
- private void updateState(FragmentState update) {
- state.set(update.getNumber());
- listener.stateChanged(context.getHandle(), update);
+ /**
+ * Updates the fragment state with the given state
+ * @param to target state
+ */
+ protected void updateState(FragmentState to) {;
+ state.set(to.getNumber());
+ listener.stateChanged(context.getHandle(), to);
}
- private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
- boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
- if (!success && exceptionOnFailure) {
- internalFail(new RuntimeException(String.format(
- "State was different than expected. Attempting to update state from %s to %s however current state was %s.",
- current.name(), update.name(), FragmentState.valueOf(state.get()))));
- return false;
+ /**
+ * Updates the fragment state only if the current state matches the expected.
+ *
+ * @param expected expected current state
+ * @param to target state
+ * @return true only if update succeeds
+ */
+ protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) {
+ boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
+ if (success) {
+ listener.stateChanged(context.getHandle(), to);
+ } else {
+ logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
+ expected.name(), to.name(), FragmentState.valueOf(state.get()));
}
- listener.stateChanged(context.getHandle(), update);
- return true;
+ return success;
}
+ /**
+ * Returns true if the fragment is in a terminal state
+ */
+ protected boolean isCompleted() {
+ return state.get() == FragmentState.CANCELLED_VALUE
+ || state.get() == FragmentState.FAILED_VALUE
+ || state.get() == FragmentState.FINISHED_VALUE;
+ }
+
+ /**
+ * Update the state if current state matches expected or fail the fragment if state transition fails even though
+ * fragment is not in a terminal state.
+ *
+ * @param expected current expected state
+ * @param to target state
+ * @return true only if update succeeds
+ */
+ protected boolean updateStateOrFail(FragmentState expected, FragmentState to) {
+ final boolean updated = checkAndUpdateState(expected, to);
+ if (!updated && !isCompleted()) {
+ final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+ internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
+ }
+ return updated;
+ }
+
+
@Override
public int compareTo(Object o) {
return o.hashCode() - this.hashCode();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
new file mode 100644
index 000000000..7155d4360
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StateTransitionException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class StateTransitionException extends DrillException {
+ public StateTransitionException() {
+ super();
+ }
+
+ public StateTransitionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public StateTransitionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StateTransitionException(String message) {
+ super(message);
+ }
+
+ public StateTransitionException(Throwable cause) {
+ super(cause);
+ }
+}