summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch
diff options
context:
space:
mode:
authorJay Modi <jaymode@users.noreply.github.com>2017-06-28 15:50:24 -0600
committerGitHub <noreply@github.com>2017-06-28 15:50:24 -0600
commit64d11b883170393b05316d9a4db9261fd30ae2b8 (patch)
tree682bda99ac2d7324f4fa8adee73a881bbf39d82c /core/src/main/java/org/elasticsearch
parentcad57959e11ae37936fa7dcf7cc4aaa4ba9b5bb0 (diff)
Fix race condition in RemoteClusterConnection node supplier (#25432)
This commit fixes a race condition in the node supplier used by the RemoteClusterConnection. The node supplier stores an iterator over a set backed by a ConcurrentHashMap, but the get operation of the supplier uses multiple methods of the iterator and is suceptible to a race between the calls to hasNext() and next(). The test in this commit fails under the old implementation with a NoSuchElementException. This commit adds a wrapper object over a set and a iterator, with all methods being synchronized to avoid races. Modifications to the set result in the iterator being set to null and the next retrieval creates a new iterator.
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r--core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java110
1 files changed, 82 insertions, 28 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
index 59da9bee7e..af8ecdbf53 100644
--- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
@@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
-import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@@ -56,7 +55,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
@@ -64,7 +62,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -83,8 +80,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final TransportService transportService;
private final ConnectionProfile remoteProfile;
- private final Set<DiscoveryNode> connectedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
- private final Supplier<DiscoveryNode> nodeSupplier;
+ private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
@@ -116,19 +112,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY);
remoteProfile = builder.build();
- nodeSupplier = new Supplier<DiscoveryNode>() {
- private volatile Iterator<DiscoveryNode> current;
- @Override
- public DiscoveryNode get() {
- if (current == null || current.hasNext() == false) {
- current = connectedNodes.iterator();
- if (current.hasNext() == false) {
- throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes);
- }
- }
- return current.next();
- }
- };
+ connectedNodes = new ConnectedNodes(clusterAlias);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
@@ -156,7 +140,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
*/
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
ActionListener<ClusterSearchShardsResponse> listener) {
- if (connectedNodes.isEmpty()) {
+ if (connectedNodes.size() == 0) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
// we can't proceed with a search on a cluster level.
@@ -173,7 +157,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* will invoke the listener immediately.
*/
public void ensureConnected(ActionListener<Void> voidActionListener) {
- if (connectedNodes.isEmpty()) {
+ if (connectedNodes.size() == 0) {
connectHandler.connect(voidActionListener);
} else {
voidActionListener.onResponse(null);
@@ -182,7 +166,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
- final DiscoveryNode node = nodeSupplier.get();
+ final DiscoveryNode node = connectedNodes.get();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@@ -218,7 +202,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
- final DiscoveryNode node = nodeSupplier.get();
+ final DiscoveryNode node = connectedNodes.get();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
@@ -243,7 +227,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
});
};
- if (connectedNodes.isEmpty()) {
+ if (connectedNodes.size() == 0) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
// we can't proceed with a search on a cluster level.
@@ -260,7 +244,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* given node.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
- DiscoveryNode discoveryNode = nodeSupplier.get();
+ DiscoveryNode discoveryNode = connectedNodes.get();
Transport.Connection connection = transportService.getConnection(discoveryNode);
return new Transport.Connection() {
@Override
@@ -283,12 +267,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
Transport.Connection getConnection() {
- DiscoveryNode discoveryNode = nodeSupplier.get();
+ DiscoveryNode discoveryNode = connectedNodes.get();
return transportService.getConnection(discoveryNode);
}
-
- @Override
+ @Override
public void close() throws IOException {
connectHandler.close();
}
@@ -583,12 +566,19 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
return connectedNodes.contains(node);
}
+ DiscoveryNode getConnectedNode() {
+ return connectedNodes.get();
+ }
+
+ void addConnectedNode(DiscoveryNode node) {
+ connectedNodes.add(node);
+ }
/**
* Fetches connection info for this connection
*/
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
- final Optional<DiscoveryNode> anyNode = connectedNodes.stream().findAny();
+ final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
if (anyNode.isPresent() == false) {
// not connected we return immediately
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
@@ -650,4 +640,68 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
int getNumNodesConnected() {
return connectedNodes.size();
}
+
+ private static class ConnectedNodes implements Supplier<DiscoveryNode> {
+
+ private final Set<DiscoveryNode> nodeSet = new HashSet<>();
+ private final String clusterAlias;
+
+ private Iterator<DiscoveryNode> currentIterator = null;
+
+ private ConnectedNodes(String clusterAlias) {
+ this.clusterAlias = clusterAlias;
+ }
+
+ @Override
+ public synchronized DiscoveryNode get() {
+ ensureIteratorAvailable();
+ if (currentIterator.hasNext()) {
+ return currentIterator.next();
+ } else {
+ throw new IllegalStateException("No node available for cluster: " + clusterAlias);
+ }
+ }
+
+ synchronized boolean remove(DiscoveryNode node) {
+ final boolean setRemoval = nodeSet.remove(node);
+ if (setRemoval) {
+ currentIterator = null;
+ }
+ return setRemoval;
+ }
+
+ synchronized boolean add(DiscoveryNode node) {
+ final boolean added = nodeSet.add(node);
+ if (added) {
+ currentIterator = null;
+ }
+ return added;
+ }
+
+ synchronized int size() {
+ return nodeSet.size();
+ }
+
+ synchronized boolean contains(DiscoveryNode node) {
+ return nodeSet.contains(node);
+ }
+
+ synchronized Optional<DiscoveryNode> getAny() {
+ ensureIteratorAvailable();
+ if (currentIterator.hasNext()) {
+ return Optional.of(currentIterator.next());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private synchronized void ensureIteratorAvailable() {
+ if (currentIterator == null) {
+ currentIterator = nodeSet.iterator();
+ } else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) {
+ // iterator rollover
+ currentIterator = nodeSet.iterator();
+ }
+ }
+ }
}