diff options
author | Jay Modi <jaymode@users.noreply.github.com> | 2017-06-28 15:50:24 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-28 15:50:24 -0600 |
commit | 64d11b883170393b05316d9a4db9261fd30ae2b8 (patch) | |
tree | 682bda99ac2d7324f4fa8adee73a881bbf39d82c /core/src/main/java/org/elasticsearch | |
parent | cad57959e11ae37936fa7dcf7cc4aaa4ba9b5bb0 (diff) |
Fix race condition in RemoteClusterConnection node supplier (#25432)
This commit fixes a race condition in the node supplier used by the RemoteClusterConnection. The
node supplier stores an iterator over a set backed by a ConcurrentHashMap, but the get operation
of the supplier uses multiple methods of the iterator and is suceptible to a race between the
calls to hasNext() and next(). The test in this commit fails under the old implementation with a
NoSuchElementException. This commit adds a wrapper object over a set and a iterator, with all methods
being synchronized to avoid races. Modifications to the set result in the iterator being set to null
and the next retrieval creates a new iterator.
Diffstat (limited to 'core/src/main/java/org/elasticsearch')
-rw-r--r-- | core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java | 110 |
1 files changed, 82 insertions, 28 deletions
diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 59da9bee7e..af8ecdbf53 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -56,7 +55,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; @@ -64,7 +62,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the @@ -83,8 +80,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final TransportService transportService; private final ConnectionProfile remoteProfile; - private final Set<DiscoveryNode> connectedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Supplier<DiscoveryNode> nodeSupplier; + private final ConnectedNodes connectedNodes; private final String clusterAlias; private final int maxNumRemoteConnections; private final Predicate<DiscoveryNode> nodePredicate; @@ -116,19 +112,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); remoteProfile = builder.build(); - nodeSupplier = new Supplier<DiscoveryNode>() { - private volatile Iterator<DiscoveryNode> current; - @Override - public DiscoveryNode get() { - if (current == null || current.hasNext() == false) { - current = connectedNodes.iterator(); - if (current.hasNext() == false) { - throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes); - } - } - return current.next(); - } - }; + connectedNodes = new ConnectedNodes(clusterAlias); this.seedNodes = Collections.unmodifiableList(seedNodes); this.connectHandler = new ConnectHandler(); transportService.addConnectionListener(this); @@ -156,7 +140,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo */ public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, ActionListener<ClusterSearchShardsResponse> listener) { - if (connectedNodes.isEmpty()) { + if (connectedNodes.size() == 0) { // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener // this will cause some back pressure on the search end and eventually will cause rejections but that's fine // we can't proceed with a search on a cluster level. @@ -173,7 +157,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * will invoke the listener immediately. */ public void ensureConnected(ActionListener<Void> voidActionListener) { - if (connectedNodes.isEmpty()) { + if (connectedNodes.size() == 0) { connectHandler.connect(voidActionListener); } else { voidActionListener.onResponse(null); @@ -182,7 +166,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener<ClusterSearchShardsResponse> listener) { - final DiscoveryNode node = nodeSupplier.get(); + final DiscoveryNode node = connectedNodes.get(); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, new TransportResponseHandler<ClusterSearchShardsResponse>() { @@ -218,7 +202,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = nodeSupplier.get(); + final DiscoveryNode node = connectedNodes.get(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler<ClusterStateResponse>() { @Override @@ -243,7 +227,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } }); }; - if (connectedNodes.isEmpty()) { + if (connectedNodes.size() == 0) { // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener // this will cause some back pressure on the search end and eventually will cause rejections but that's fine // we can't proceed with a search on a cluster level. @@ -260,7 +244,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * given node. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - DiscoveryNode discoveryNode = nodeSupplier.get(); + DiscoveryNode discoveryNode = connectedNodes.get(); Transport.Connection connection = transportService.getConnection(discoveryNode); return new Transport.Connection() { @Override @@ -283,12 +267,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } Transport.Connection getConnection() { - DiscoveryNode discoveryNode = nodeSupplier.get(); + DiscoveryNode discoveryNode = connectedNodes.get(); return transportService.getConnection(discoveryNode); } - - @Override + @Override public void close() throws IOException { connectHandler.close(); } @@ -583,12 +566,19 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo return connectedNodes.contains(node); } + DiscoveryNode getConnectedNode() { + return connectedNodes.get(); + } + + void addConnectedNode(DiscoveryNode node) { + connectedNodes.add(node); + } /** * Fetches connection info for this connection */ public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) { - final Optional<DiscoveryNode> anyNode = connectedNodes.stream().findAny(); + final Optional<DiscoveryNode> anyNode = connectedNodes.getAny(); if (anyNode.isPresent() == false) { // not connected we return immediately RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias, @@ -650,4 +640,68 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo int getNumNodesConnected() { return connectedNodes.size(); } + + private static class ConnectedNodes implements Supplier<DiscoveryNode> { + + private final Set<DiscoveryNode> nodeSet = new HashSet<>(); + private final String clusterAlias; + + private Iterator<DiscoveryNode> currentIterator = null; + + private ConnectedNodes(String clusterAlias) { + this.clusterAlias = clusterAlias; + } + + @Override + public synchronized DiscoveryNode get() { + ensureIteratorAvailable(); + if (currentIterator.hasNext()) { + return currentIterator.next(); + } else { + throw new IllegalStateException("No node available for cluster: " + clusterAlias); + } + } + + synchronized boolean remove(DiscoveryNode node) { + final boolean setRemoval = nodeSet.remove(node); + if (setRemoval) { + currentIterator = null; + } + return setRemoval; + } + + synchronized boolean add(DiscoveryNode node) { + final boolean added = nodeSet.add(node); + if (added) { + currentIterator = null; + } + return added; + } + + synchronized int size() { + return nodeSet.size(); + } + + synchronized boolean contains(DiscoveryNode node) { + return nodeSet.contains(node); + } + + synchronized Optional<DiscoveryNode> getAny() { + ensureIteratorAvailable(); + if (currentIterator.hasNext()) { + return Optional.of(currentIterator.next()); + } else { + return Optional.empty(); + } + } + + private synchronized void ensureIteratorAvailable() { + if (currentIterator == null) { + currentIterator = nodeSet.iterator(); + } else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) { + // iterator rollover + currentIterator = nodeSet.iterator(); + } + } + } } |