summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java')
-rw-r--r--core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java67
1 files changed, 16 insertions, 51 deletions
diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
index 4a3771c8e5..03a14fe9cf 100644
--- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
+++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
@@ -19,7 +19,6 @@
package org.elasticsearch.discovery.local;
-import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
@@ -29,7 +28,6 @@ import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -44,12 +42,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.node.service.NodeService;
-import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Queue;
@@ -67,17 +63,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
- private final TransportService transportService;
private final ClusterService clusterService;
- private final DiscoveryNodeService discoveryNodeService;
private RoutingService routingService;
private final ClusterName clusterName;
- private final Version version;
private final DiscoverySettings discoverySettings;
- private DiscoveryNode localNode;
-
private volatile boolean master = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@@ -89,14 +80,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private volatile ClusterState lastProcessedClusterState;
@Inject
- public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
- DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
+ public LocalDiscovery(Settings settings, ClusterName clusterName, ClusterService clusterService,
+ DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
- this.transportService = transportService;
- this.discoveryNodeService = discoveryNodeService;
- this.version = version;
this.discoverySettings = discoverySettings;
}
@@ -119,8 +107,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
- this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
- discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);
@@ -147,7 +133,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
- nodesBuilder.put(discovery.localNode);
+ nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
@@ -166,30 +152,9 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
});
} else if (firstMaster != null) {
- // update as fast as we can the local node state with the new metadata (so we create indices for example)
- final ClusterState masterState = firstMaster.clusterService.state();
- clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
- @Override
- public boolean runOnlyOnMaster() {
- return false;
- }
-
- @Override
- public ClusterState execute(ClusterState currentState) {
- // make sure we have the local node id set, we might need it as a result of the new metadata
- DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id());
- return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
- }
-
- @Override
- public void onFailure(String source, Throwable t) {
- logger.error("unexpected failure during [{}]", t, source);
- }
- });
-
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
- firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ClusterStateUpdateTask() {
+ firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode() + "])", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
@@ -199,7 +164,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
- nodesBuilder.put(discovery.localNode);
+ nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
@@ -254,7 +219,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
final Set<String> newMembers = new HashSet<>();
for (LocalDiscovery discovery : clusterGroup.members()) {
- newMembers.add(discovery.localNode.id());
+ newMembers.add(discovery.localNode().id());
}
final LocalDiscovery master = firstMaster;
@@ -266,7 +231,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public ClusterState execute(ClusterState currentState) {
- DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
+ DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode().id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
@@ -293,7 +258,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public DiscoveryNode localNode() {
- return localNode;
+ return clusterService.localNode();
}
@Override
@@ -308,7 +273,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public String nodeDescription() {
- return clusterName.value() + "/" + localNode.id();
+ return clusterName.value() + "/" + localNode().id();
}
@Override
@@ -323,7 +288,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (localDiscovery.master) {
continue;
}
- nodesToPublishTo.add(localDiscovery.localNode);
+ nodesToPublishTo.add(localDiscovery.localNode());
}
publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
@@ -359,7 +324,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
synchronized (this) {
// we do the marshaling intentionally, to check it works well...
// check if we publsihed cluster state at least once and node was in the cluster when we published cluster state the last time
- if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode.id())) {
+ if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode().id())) {
// both conditions are true - which means we can try sending cluster state as diffs
if (clusterStateDiffBytes == null) {
Diff diff = clusterState.diff(clusterChangedEvent.previousState());
@@ -369,7 +334,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
try {
newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffFrom(StreamInput.wrap(clusterStateDiffBytes)).apply(discovery.lastProcessedClusterState);
- logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode.getName());
+ logger.trace("sending diff cluster state version [{}] with size {} to [{}]", clusterState.version(), clusterStateDiffBytes.length, discovery.localNode().getName());
} catch (IncompatibleClusterStateVersionException ex) {
logger.warn("incompatible cluster state version [{}] - resending complete cluster state", ex, clusterState.version());
}
@@ -378,7 +343,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (clusterStateBytes == null) {
clusterStateBytes = Builder.toBytes(clusterState);
}
- newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
+ newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode());
}
discovery.lastProcessedClusterState = newNodeSpecificClusterState;
}
@@ -423,17 +388,17 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
- publishResponseHandler.onFailure(discovery.localNode, t);
+ publishResponseHandler.onFailure(discovery.localNode(), t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
- publishResponseHandler.onResponse(discovery.localNode);
+ publishResponseHandler.onResponse(discovery.localNode());
}
});
} else {
- publishResponseHandler.onResponse(discovery.localNode);
+ publishResponseHandler.onResponse(discovery.localNode());
}
}