summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java')
-rw-r--r--core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java90
1 files changed, 62 insertions, 28 deletions
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index 54f9b6855a..da0fea69c6 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collections;
@@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent {
if (withReroute) {
reroute(allocation);
}
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
logClusterHealthStateChange(
@@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent {
"shards started [" + startedShardsAsString + "] ..."
);
return result;
- }
+ }
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
- return buildChangedResult(metaData, routingNodes, new RoutingExplanations());
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
+ return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
}
- protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
- final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
- MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
- return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
+
+ protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
+ RoutingExplanations explanations) {
+ final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
+ MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
+ return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations);
}
/**
- * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
+ * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically
+ * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
+ * the changes made during this allocation.
*
- * @param currentMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldMetaData {@link MetaData} object from before the routing table was changed.
+ * @param oldRoutingTable {@link RoutingTable} from before the change.
* @param newRoutingTable new {@link RoutingTable} created by the allocation change
* @return adapted {@link MetaData}, potentially the original one if no change was needed.
*/
- static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
- // make sure index meta data and routing tables are in sync w.r.t active allocation ids
+ static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) {
MetaData.Builder metaDataBuilder = null;
- for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
- final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
- if (indexMetaData == null) {
- throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName());
+ for (IndexRoutingTable newIndexTable : newRoutingTable) {
+ final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex());
+ if (oldIndexMetaData == null) {
+ throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName());
}
IndexMetaData.Builder indexMetaDataBuilder = null;
- for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
- Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
+ for (IndexShardRoutingTable newShardTable : newIndexTable) {
+ final ShardId shardId = newShardTable.shardId();
+
+ // update activeAllocationIds
+ Set<String> activeAllocationIds = newShardTable.activeShards().stream()
.map(ShardRouting::allocationId)
.filter(Objects::nonNull)
.map(AllocationId::getId)
@@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent {
// only update active allocation ids if there is an active shard
if (activeAllocationIds.isEmpty() == false) {
// get currently stored allocation ids
- Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
+ Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id());
if (activeAllocationIds.equals(storedAllocationIds) == false) {
if (indexMetaDataBuilder == null) {
- indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
+ }
+ }
- indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
+ // update primary terms
+ final ShardRouting newPrimary = newShardTable.primaryShard();
+ if (newPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard();
+ if (oldPrimary == null) {
+ throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+ }
+ // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not*
+ // update it when a primary relocates
+ if (newPrimary.unassigned() ||
+ newPrimary.isSameAllocation(oldPrimary) ||
+ // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
+ // be initializing. However, when the target shard is activated, we still want the primary term to stay
+ // the same
+ (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) {
+ // do nothing
+ } else {
+ // incrementing the primary term
+ if (indexMetaDataBuilder == null) {
+ indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
}
+ indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
}
}
if (indexMetaDataBuilder != null) {
if (metaDataBuilder == null) {
- metaDataBuilder = MetaData.builder(currentMetaData);
+ metaDataBuilder = MetaData.builder(oldMetaData);
}
metaDataBuilder.put(indexMetaDataBuilder);
}
@@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent {
if (metaDataBuilder != null) {
return metaDataBuilder.build();
} else {
- return currentMetaData;
+ return oldMetaData;
}
}
@@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent {
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
@@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent {
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent {
return result;
}
+
/**
* Reroutes the routing table based on the live nodes.
* <p>
@@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent {
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+ RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent {
boolean changed = false;
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
- new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
- null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+ new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
+ null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
return changed;
}