diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java | 40 |
1 files changed, 36 insertions, 4 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java index df85762357..dd30a71168 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +45,7 @@ public class ClusterStateObserver { }; private final ClusterService clusterService; + private final ThreadContext contextHolder; volatile TimeValue timeOutValue; @@ -55,8 +57,8 @@ public class ClusterStateObserver { volatile boolean timedOut; - public ClusterStateObserver(ClusterService clusterService, ESLogger logger) { - this(clusterService, new TimeValue(60000), logger); + public ClusterStateObserver(ClusterService clusterService, ESLogger logger, ThreadContext contextHolder) { + this(clusterService, new TimeValue(60000), logger, contextHolder); } /** @@ -64,7 +66,7 @@ public class ClusterStateObserver { * will fail any existing or new #waitForNextChange calls. Set to null * to wait indefinitely */ - public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) { + public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger, ThreadContext contextHolder) { this.clusterService = clusterService; this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state())); this.timeOutValue = timeout; @@ -72,6 +74,7 @@ public class ClusterStateObserver { this.startTimeNS = System.nanoTime(); } this.logger = logger; + this.contextHolder = contextHolder; } /** last cluster state observer by this observer. Note that this may not be the current one */ @@ -146,7 +149,7 @@ public class ClusterStateObserver { listener.onNewClusterState(newState.clusterState); } else { logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState); - ObservingContext context = new ObservingContext(listener, changePredicate); + ObservingContext context = new ObservingContext(new ContextPreservingListener(listener, contextHolder.newStoredContext()), changePredicate); if (!observingContext.compareAndSet(null, context)) { throw new ElasticsearchException("already waiting for a cluster state change"); } @@ -317,4 +320,33 @@ public class ClusterStateObserver { return "version [" + clusterState.version() + "], status [" + status + "]"; } } + + private final static class ContextPreservingListener implements Listener { + private final Listener delegate; + private final ThreadContext.StoredContext tempContext; + + + private ContextPreservingListener(Listener delegate, ThreadContext.StoredContext storedContext) { + this.tempContext = storedContext; + this.delegate = delegate; + } + + @Override + public void onNewClusterState(ClusterState state) { + tempContext.restore(); + delegate.onNewClusterState(state); + } + + @Override + public void onClusterServiceClose() { + tempContext.restore(); + delegate.onClusterServiceClose(); + } + + @Override + public void onTimeout(TimeValue timeout) { + tempContext.restore(); + delegate.onTimeout(timeout); + } + } } |