summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java')
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java40
1 files changed, 36 insertions, 4 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
index df85762357..dd30a71168 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,6 +45,7 @@ public class ClusterStateObserver {
};
private final ClusterService clusterService;
+ private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;
@@ -55,8 +57,8 @@ public class ClusterStateObserver {
volatile boolean timedOut;
- public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
- this(clusterService, new TimeValue(60000), logger);
+ public ClusterStateObserver(ClusterService clusterService, ESLogger logger, ThreadContext contextHolder) {
+ this(clusterService, new TimeValue(60000), logger, contextHolder);
}
/**
@@ -64,7 +66,7 @@ public class ClusterStateObserver {
* will fail any existing or new #waitForNextChange calls. Set to null
* to wait indefinitely
*/
- public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
+ public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger, ThreadContext contextHolder) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;
@@ -72,6 +74,7 @@ public class ClusterStateObserver {
this.startTimeNS = System.nanoTime();
}
this.logger = logger;
+ this.contextHolder = contextHolder;
}
/** last cluster state observer by this observer. Note that this may not be the current one */
@@ -146,7 +149,7 @@ public class ClusterStateObserver {
listener.onNewClusterState(newState.clusterState);
} else {
logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState);
- ObservingContext context = new ObservingContext(listener, changePredicate);
+ ObservingContext context = new ObservingContext(new ContextPreservingListener(listener, contextHolder.newStoredContext()), changePredicate);
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
@@ -317,4 +320,33 @@ public class ClusterStateObserver {
return "version [" + clusterState.version() + "], status [" + status + "]";
}
}
+
+ private final static class ContextPreservingListener implements Listener {
+ private final Listener delegate;
+ private final ThreadContext.StoredContext tempContext;
+
+
+ private ContextPreservingListener(Listener delegate, ThreadContext.StoredContext storedContext) {
+ this.tempContext = storedContext;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void onNewClusterState(ClusterState state) {
+ tempContext.restore();
+ delegate.onNewClusterState(state);
+ }
+
+ @Override
+ public void onClusterServiceClose() {
+ tempContext.restore();
+ delegate.onClusterServiceClose();
+ }
+
+ @Override
+ public void onTimeout(TimeValue timeout) {
+ tempContext.restore();
+ delegate.onTimeout(timeout);
+ }
+ }
}