diff options
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java')
-rw-r--r-- | core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java | 90 |
1 files changed, 62 insertions, 28 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 54f9b6855a..da0fea69c6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; @@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent { if (withReroute) { reroute(allocation); } - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString()); logClusterHealthStateChange( @@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent { "shards started [" + startedShardsAsString + "] ..." ); return result; - } + } - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) { - return buildChangedResult(metaData, routingNodes, new RoutingExplanations()); + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) { + return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations()); } - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) { - final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build(); - MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable); - return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations); + + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes, + RoutingExplanations explanations) { + final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build(); + MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable); + return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations); } /** - * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. + * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically + * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on + * the changes made during this allocation. * - * @param currentMetaData {@link MetaData} object from before the routing table was changed. + * @param oldMetaData {@link MetaData} object from before the routing table was changed. + * @param oldRoutingTable {@link RoutingTable} from before the change. * @param newRoutingTable new {@link RoutingTable} created by the allocation change * @return adapted {@link MetaData}, potentially the original one if no change was needed. */ - static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) { - // make sure index meta data and routing tables are in sync w.r.t active allocation ids + static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) { MetaData.Builder metaDataBuilder = null; - for (IndexRoutingTable indexRoutingTable : newRoutingTable) { - final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex()); - if (indexMetaData == null) { - throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName()); + for (IndexRoutingTable newIndexTable : newRoutingTable) { + final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex()); + if (oldIndexMetaData == null) { + throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName()); } IndexMetaData.Builder indexMetaDataBuilder = null; - for (IndexShardRoutingTable shardRoutings : indexRoutingTable) { - Set<String> activeAllocationIds = shardRoutings.activeShards().stream() + for (IndexShardRoutingTable newShardTable : newIndexTable) { + final ShardId shardId = newShardTable.shardId(); + + // update activeAllocationIds + Set<String> activeAllocationIds = newShardTable.activeShards().stream() .map(ShardRouting::allocationId) .filter(Objects::nonNull) .map(AllocationId::getId) @@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent { // only update active allocation ids if there is an active shard if (activeAllocationIds.isEmpty() == false) { // get currently stored allocation ids - Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id()); + Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id()); if (activeAllocationIds.equals(storedAllocationIds) == false) { if (indexMetaDataBuilder == null) { - indexMetaDataBuilder = IndexMetaData.builder(indexMetaData); + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); } + indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds); + } + } - indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds); + // update primary terms + final ShardRouting newPrimary = newShardTable.primaryShard(); + if (newPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard(); + if (oldPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not* + // update them when a primary relocates + if (newPrimary.unassigned() || + newPrimary.isSameAllocation(oldPrimary) || + // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to + // be initializing. However, when the target shard is activated, we still want the primary term to staty + // the same + (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) { + // do nothing + } else { + // incrementing the primary term + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); } + indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1); } } if (indexMetaDataBuilder != null) { if (metaDataBuilder == null) { - metaDataBuilder = MetaData.builder(currentMetaData); + metaDataBuilder = MetaData.builder(oldMetaData); } metaDataBuilder.put(indexMetaDataBuilder); } @@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent { if (metaDataBuilder != null) { return metaDataBuilder.build(); } else { - return currentMetaData; + return oldMetaData; } } @@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent { } gatewayAllocator.applyFailedShards(allocation); reroute(allocation); - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), @@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent { // the assumption is that commands will move / act on shards (or fail through exceptions) // so, there will always be shard "movements", so no need to check on reroute reroute(allocation); - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent { return result; } + /** * Reroutes the routing table based on the live nodes. * <p> @@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent { if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent { boolean changed = false; for (ShardRouting routing : replicas) { changed |= applyFailedShard(allocation, routing, false, - new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", - null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); } return changed; } |