diff options
author | Yannick Welsch <yannick@welsch.lu> | 2017-06-28 15:48:47 +0200 |
---|---|---|
committer | Jason Tedor <jason@tedor.me> | 2017-06-28 09:48:47 -0400 |
commit | 5a4a47332c5a9a21531eb29c3ab4fd264e7fcf36 (patch) | |
tree | 0b2b4fe630b8a4c3d80f5882835c01304287332f /core/src | |
parent | ebdae09df3137e3dee3a7556de0909d2223ea96b (diff) |
Use a single method to update shard state
This commit refactors index shard to provide a single method for
updating the shard state on an incoming cluster state update.
Relates #25431
Diffstat (limited to 'core/src')
9 files changed, 265 insertions, 269 deletions
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5d39292a46..db0f27a28c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.shard; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexOptions; @@ -283,7 +284,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); - persistMetadata(shardRouting, null); + persistMetadata(path, indexSettings, shardRouting, null, logger); } public Store store() { @@ -343,86 +344,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Notifies the shard of an increase in the primary term. - * - * @param newPrimaryTerm the new primary term - * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary - */ - @Override - public void updatePrimaryTerm(final long newPrimaryTerm, - CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) { - assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; - synchronized (mutex) { - if (newPrimaryTerm != primaryTerm) { - // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned - // in one state causing it's term to be incremented. Note that if both current shard state and new - // shard state are initializing, we could replace the current shard and reinitialize it. It is however - // possible that this shard is being started. This can happen if: - // 1) Shard is post recovery and sends shard started to the master - // 2) Node gets disconnected and rejoins - // 3) Master assigns the shard back to the node - // 4) Master processes the shard started and starts the shard - // 5) The node process the cluster state where the shard is both started and primary term is incremented. - // - // We could fail the shard in that case, but this will cause it to be removed from the insync allocations list - // potentially preventing re-allocation. - assert shardRouting.initializing() == false : - "a started primary shard should never update its term; " - + "shard " + shardRouting + ", " - + "current term [" + primaryTerm + "], " - + "new term [" + newPrimaryTerm + "]"; - assert newPrimaryTerm > primaryTerm : - "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; - /* - * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we - * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is - * incremented. - */ - final CountDownLatch latch = new CountDownLatch(1); - // to prevent primary relocation handoff while resync is not completed - boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); - if (resyncStarted == false) { - throw new IllegalStateException("cannot start resync while it's already in progress"); - } - indexShardOperationPermits.asyncBlockOperations( - 30, - TimeUnit.MINUTES, - () -> { - latch.await(); - try { - getEngine().fillSeqNoGaps(newPrimaryTerm); - primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() { - @Override - public void onResponse(ResyncTask resyncTask) { - logger.info("primary-replica resync completed with {} operations", - resyncTask.getResyncedOperations()); - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; - } - - @Override - public void onFailure(Exception e) { - boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); - assert resyncCompleted : "primary-replica resync finished but was not started"; - if (state == IndexShardState.CLOSED) { - // ignore, shutting down - } else { - failShard("exception during primary-replica resync", e); - } - } - }); - } catch (final AlreadyClosedException e) { - // okay, the index was deleted - } - }, - e -> failShard("exception during primary term transition", e)); - primaryTerm = newPrimaryTerm; - latch.countDown(); - } - } - } - - /** * Returns the latest cluster routing entry received with this shard. */ @Override @@ -434,50 +355,29 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return cachingPolicy; } - /** - * Updates the shards routing entry. This mutate the shards internal state depending - * on the changes that get introduced by the new routing value. This method will persist shard level metadata. - * - * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted - * @throws IOException if shard state could not be persisted - */ - public void updateRoutingEntry(ShardRouting newRouting) throws IOException { + + @Override + public void updateShardState(final ShardRouting newRouting, + final long newPrimaryTerm, + final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer, + final long applyingClusterStateVersion, + final Set<String> activeAllocationIds, + final Set<String> initializingAllocationIds) throws IOException { final ShardRouting currentRouting; synchronized (mutex) { currentRouting = this.shardRouting; + updateRoutingEntry(newRouting); - if (!newRouting.shardId().equals(shardId())) { - throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId()); - } - if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { - throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting); - } - if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) { - throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current " - + currentRouting + ", new " + newRouting); - } + if (shardRouting.primary()) { + updatePrimaryTerm(newPrimaryTerm, primaryReplicaSyncer); - if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { - assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting; - // we want to refresh *before* we move to internal STARTED state - try { - getEngine().refresh("cluster_state_started"); - } catch (Exception e) { - logger.debug("failed to refresh due to move to cluster wide started", e); + final Engine engine = getEngineOrNull(); + // if the engine is not yet started, we are not ready yet and can just ignore this + if (engine != null) { + engine.seqNoService().updateAllocationIdsFromMaster( + applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); } - changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); - } else if (state == IndexShardState.RELOCATED && - (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { - // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery - // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two - // active primaries. - throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); } - assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || - state == IndexShardState.CLOSED : - "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; - this.shardRouting = newRouting; - persistMetadata(newRouting, currentRouting); } if (currentRouting != null && currentRouting.active() == false && newRouting.active()) { indexEventListener.afterIndexShardStarted(this); @@ -487,6 +387,117 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private void updateRoutingEntry(ShardRouting newRouting) throws IOException { + assert Thread.holdsLock(mutex); + final ShardRouting currentRouting = this.shardRouting; + + if (!newRouting.shardId().equals(shardId())) { + throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId()); + } + if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { + throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting); + } + if (currentRouting != null && currentRouting.primary() && newRouting.primary() == false) { + throw new IllegalArgumentException("illegal state: trying to move shard from primary mode to replica mode. Current " + + currentRouting + ", new " + newRouting); + } + + if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { + assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting; + // we want to refresh *before* we move to internal STARTED state + try { + getEngine().refresh("cluster_state_started"); + } catch (Exception e) { + logger.debug("failed to refresh due to move to cluster wide started", e); + } + changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); + } else if (state == IndexShardState.RELOCATED && + (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { + // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery + // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two + // active primaries. + throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); + } + assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || + state == IndexShardState.CLOSED : + "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state; + this.shardRouting = newRouting; + persistMetadata(path, indexSettings, newRouting, currentRouting, logger); + } + + private void updatePrimaryTerm( + final long newPrimaryTerm, final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) { + assert Thread.holdsLock(mutex); + assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; + if (newPrimaryTerm != primaryTerm) { + /* Note that due to cluster state batching an initializing primary shard term can failed and re-assigned + * in one state causing it's term to be incremented. Note that if both current shard state and new + * shard state are initializing, we could replace the current shard and reinitialize it. It is however + * possible that this shard is being started. This can happen if: + * 1) Shard is post recovery and sends shard started to the master + * 2) Node gets disconnected and rejoins + * 3) Master assigns the shard back to the node + * 4) Master processes the shard started and starts the shard + * 5) The node process the cluster state where the shard is both started and primary term is incremented. + * + * We could fail the shard in that case, but this will cause it to be removed from the insync allocations list + * potentially preventing re-allocation. + */ + assert shardRouting.initializing() == false : + "a started primary shard should never update its term; " + + "shard " + shardRouting + ", " + + "current term [" + primaryTerm + "], " + + "new term [" + newPrimaryTerm + "]"; + assert newPrimaryTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; + /* + * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we + * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is + * incremented. + */ + final CountDownLatch latch = new CountDownLatch(1); + // to prevent primary relocation handoff while resync is not completed + boolean resyncStarted = primaryReplicaResyncInProgress.compareAndSet(false, true); + if (resyncStarted == false) { + throw new IllegalStateException("cannot start resync while it's already in progress"); + } + indexShardOperationPermits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + latch.await(); + try { + getEngine().fillSeqNoGaps(newPrimaryTerm); + primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() { + @Override + public void onResponse(ResyncTask resyncTask) { + logger.info("primary-replica resync completed with {} operations", + resyncTask.getResyncedOperations()); + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + } + + @Override + public void onFailure(Exception e) { + boolean resyncCompleted = primaryReplicaResyncInProgress.compareAndSet(true, false); + assert resyncCompleted : "primary-replica resync finished but was not started"; + if (state == IndexShardState.CLOSED) { + // ignore, shutting down + } else { + failShard("exception during primary-replica resync", e); + } + } + }); + } catch (final AlreadyClosedException e) { + // okay, the index was deleted + } + }, + e -> failShard("exception during primary term transition", e)); + primaryTerm = newPrimaryTerm; + latch.countDown(); + } + } + /** * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set. */ @@ -1684,25 +1695,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Notifies the service of the current allocation IDs in the cluster state. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} - * for details. - * - * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master - * @param activeAllocationIds the allocation IDs of the currently active shard copies - * @param initializingAllocationIds the allocation IDs of the currently initializing shard copies - */ - public void updateAllocationIdsFromMaster( - final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) { - verifyPrimary(); - final Engine engine = getEngineOrNull(); - // if the engine is not yet started, we are not ready yet and can just ignore this - if (engine != null) { - engine.seqNoService().updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds); - } - } - - /** * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. * * @param primaryContext the sequence number context @@ -1972,11 +1964,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return engineFactory.newReadWriteEngine(config); } - // pkg private for testing - void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException { + private static void persistMetadata( + final ShardPath shardPath, + final IndexSettings indexSettings, + final ShardRouting newRouting, + final @Nullable ShardRouting currentRouting, + final Logger logger) throws IOException { assert newRouting != null : "newRouting must not be null"; // only persist metadata if routing information that is persisted in shard state metadata actually changed + final ShardId shardId = newRouting.shardId(); if (currentRouting == null || currentRouting.primary() != newRouting.primary() || currentRouting.allocationId().equals(newRouting.allocationId()) == false) { @@ -1988,17 +1985,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl writeReason = "routing changed from " + currentRouting + " to " + newRouting; } logger.trace("{} writing shard state, reason [{}]", shardId, writeReason); - final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId()); - ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath().getShardStatePath()); + final ShardStateMetaData newShardStateMetadata = + new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId()); + ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath()); } else { logger.trace("{} skip writing shard state, has been written before", shardId); } } - private String getIndexUUID() { - return indexSettings.getUUID(); - } - private DocumentMapperForType docMapper(String type) { return mapperService.documentMapperWithAutoCreate(type); } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 81c0f601e1..97f57e216c 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -87,7 +87,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -558,21 +557,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple + "cluster state: " + shardRouting + " local: " + currentRoutingEntry; try { - shard.updateRoutingEntry(shardRouting); - if (shardRouting.primary()) { - final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - /* - * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on - * old nodes will go through a file-based recovery which will also transfer sequence number information. - */ - final Set<String> activeIds = - allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); - final Set<String> initializingIds = - allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); - shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()), - primaryReplicaSyncer::resync); - shard.updateAllocationIdsFromMaster(clusterState.version(), activeIds, initializingIds); - } + final long primaryTerm = clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()); + final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); + /* + * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on old + * nodes will go through a file-based recovery which will also transfer sequence number information. + */ + final Set<String> activeIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); + final Set<String> initializingIds = + allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); + shard.updateShardState( + shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), activeIds, initializingIds); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); return; @@ -739,33 +734,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple RecoveryState recoveryState(); /** - * Updates the shards routing entry. This mutate the shards internal state depending - * on the changes that get introduced by the new routing value. This method will persist shard level metadata. - * - * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted - * @throws IOException if shard state could not be persisted - */ - void updateRoutingEntry(ShardRouting shardRouting) throws IOException; - - /** - * Update the primary term. This method should only be invoked on primary shards. - * - * @param primaryTerm the new primary term - * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary - */ - void updatePrimaryTerm(long primaryTerm, - CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer); - - /** - * Notifies the service of the current allocation ids in the cluster state. + * Updates the shard state based on an incoming cluster state: + * - Updates and persists the new routing value. + * - Updates the primary term if this shard is a primary. + * - Updates the allocation ids that are tracked by the shard if it is a primary. * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details. * + * @param shardRouting the new routing entry + * @param primaryTerm the new primary term + * @param primaryReplicaSyncer the primary-replica resync action to trigger when a term is increased on a primary * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master * @param activeAllocationIds the allocation ids of the currently active shard copies * @param initializingAllocationIds the allocation ids of the currently initializing shard copies + * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted + * @throws IOException if shard state could not be persisted */ - void updateAllocationIdsFromMaster( - long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds); + void updateShardState(ShardRouting shardRouting, + long primaryTerm, + CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer, + long applyingClusterStateVersion, + Set<String> activeAllocationIds, + Set<String> initializingAllocationIds) throws IOException; } public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index b74596cda6..75a59a36d7 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -44,6 +44,7 @@ import org.elasticsearch.action.support.replication.TransportWriteActionTestHelp import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; @@ -223,9 +224,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); primary.recoverFromStore(); - primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry())); - clusterStateVersion++; - updateAllocationIDsOnPrimary(); + HashSet<String> activeIds = new HashSet<>(); + activeIds.addAll(activeIds()); + activeIds.add(primary.routingEntry().allocationId().getId()); + HashSet<String> initializingIds = new HashSet<>(); + initializingIds.addAll(initializingIds()); + initializingIds.remove(primary.routingEntry().allocationId().getId()); + primary.updateShardState(ShardRoutingHelper.moveToStarted(primary.routingEntry()), primary.getPrimaryTerm(), null, + ++clusterStateVersion, activeIds, initializingIds); for (final IndexShard replica : replicas) { recoverReplica(replica); } @@ -239,7 +245,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase return replica; } - public synchronized void addReplica(IndexShard replica) { + public synchronized void addReplica(IndexShard replica) throws IOException { assert shardRoutings().stream() .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; @@ -278,29 +284,43 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase assertTrue(replicas.remove(replica)); closeShards(primary); primary = replica; - primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); - PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>(); - primary.updatePrimaryTerm(newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard, - new ActionListener<PrimaryReplicaSyncer.ResyncTask>() { - @Override - public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) { - listener.onResponse(resyncTask); - fut.onResponse(resyncTask); - } + HashSet<String> activeIds = new HashSet<>(); + activeIds.addAll(activeIds()); + activeIds.add(replica.routingEntry().allocationId().getId()); + HashSet<String> initializingIds = new HashSet<>(); + initializingIds.addAll(initializingIds()); + initializingIds.remove(replica.routingEntry().allocationId().getId()); + primary.updateShardState(replica.routingEntry().moveActiveReplicaToPrimary(), + newTerm, (shard, listener) -> primaryReplicaSyncer.resync(shard, + new ActionListener<PrimaryReplicaSyncer.ResyncTask>() { + @Override + public void onResponse(PrimaryReplicaSyncer.ResyncTask resyncTask) { + listener.onResponse(resyncTask); + fut.onResponse(resyncTask); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + fut.onFailure(e); + } + }), ++clusterStateVersion, activeIds, initializingIds); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - fut.onFailure(e); - } - })); - clusterStateVersion++; - updateAllocationIDsOnPrimary(); return fut; } - synchronized boolean removeReplica(IndexShard replica) { + private synchronized Set<String> activeIds() { + return shardRoutings().stream() + .filter(ShardRouting::active).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet()); + } + + private synchronized Set<String> initializingIds() { + return shardRoutings().stream() + .filter(ShardRouting::initializing).map(ShardRouting::allocationId).map(AllocationId::getId).collect(Collectors.toSet()); + } + + synchronized boolean removeReplica(IndexShard replica) throws IOException { final boolean removed = replicas.remove(replica); if (removed) { clusterStateVersion++; @@ -401,17 +421,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } } - private void updateAllocationIDsOnPrimary() { - Set<String> active = new HashSet<>(); - Set<String> initializing = new HashSet<>(); - for (ShardRouting shard: shardRoutings()) { - if (shard.active()) { - active.add(shard.allocationId().getId()); - } else { - initializing.add(shard.allocationId().getId()); - } - } - primary.updateAllocationIdsFromMaster(clusterStateVersion, active, initializing); + private void updateAllocationIDsOnPrimary() throws IOException { + primary.updateShardState(primary.routingEntry(), primary.getPrimaryTerm(), null, clusterStateVersion, + activeIds(), initializingIds()); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 8b585c4e65..0c07f4cf77 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -525,16 +525,16 @@ public class IndexShardIT extends ESSingleNodeTestCase { } - public static final IndexShard recoverShard(IndexShard newShard) throws IOException { + public static final IndexShard recoverShard(IndexShard newShard) throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(newShard.recoverFromStore()); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); return newShard; } - public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, - IndexingOperationListener... listeners) throws IOException { + public static final IndexShard newIndexShard(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, + IndexingOperationListener... listeners) throws IOException { ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry()); IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.getIndexSortSupplier(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 16baf57fd7..9093274a49 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -192,7 +192,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardStateMetaData shardStateMetaData = load(logger, shardStatePath); assertEquals(getShardStateMetadata(shard), shardStateMetaData); ShardRouting routing = shard.shardRouting; - shard.updateRoutingEntry(routing); + IndexShardTestCase.updateRoutingEntry(shard, routing); shardStateMetaData = load(logger, shardStatePath); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); @@ -200,7 +200,7 @@ public class IndexShardTests extends IndexShardTestCase { new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId())); routing = TestShardRouting.relocate(shard.shardRouting, "some node", 42L); - shard.updateRoutingEntry(routing); + IndexShardTestCase.updateRoutingEntry(shard, routing); shardStateMetaData = load(logger, shardStatePath); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, @@ -345,8 +345,8 @@ public class IndexShardTests extends IndexShardTestCase { true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateRoutingEntry(primaryRouting); - indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}); + indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, + 0L, Collections.emptySet(), Collections.emptySet()); final int delayedOperations = scaledRandomIntBetween(1, 64); final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations); @@ -436,8 +436,8 @@ public class IndexShardTests extends IndexShardTestCase { true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateRoutingEntry(primaryRouting); - indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}); + indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, + 0L, Collections.emptySet(), Collections.emptySet()); /* * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the @@ -478,8 +478,8 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting replicaRouting = indexShard.routingEntry(); ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateRoutingEntry(primaryRouting); - indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}); + indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, + 0L, Collections.emptySet(), Collections.emptySet()); } else { indexShard = newStartedShard(true); } @@ -545,7 +545,7 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting routing = indexShard.routingEntry(); routing = TestShardRouting.newShardRouting(routing.shardId(), routing.currentNodeId(), "otherNode", true, ShardRoutingState.RELOCATING, AllocationId.newRelocation(routing.allocationId())); - indexShard.updateRoutingEntry(routing); + IndexShardTestCase.updateRoutingEntry(indexShard, routing); indexShard.relocated("test", primaryContext -> {}); engineClosed = false; break; @@ -830,7 +830,7 @@ public class IndexShardTests extends IndexShardTestCase { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); @@ -1088,7 +1088,7 @@ public class IndexShardTests extends IndexShardTestCase { public void testLockingBeforeAndAfterRelocated() throws Exception { final IndexShard shard = newStartedShard(true); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); CountDownLatch latch = new CountDownLatch(1); Thread recoveryThread = new Thread(() -> { latch.countDown(); @@ -1119,7 +1119,7 @@ public class IndexShardTests extends IndexShardTestCase { public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { final IndexShard shard = newStartedShard(true); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); Thread recoveryThread = new Thread(() -> { try { shard.relocated("simulated recovery", primaryContext -> {}); @@ -1153,7 +1153,7 @@ public class IndexShardTests extends IndexShardTestCase { public void testStressRelocated() throws Exception { final IndexShard shard = newStartedShard(true); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); final int numThreads = randomIntBetween(2, 4); Thread[] indexThreads = new Thread[numThreads]; CountDownLatch allPrimaryOperationLocksAcquired = new CountDownLatch(numThreads); @@ -1208,17 +1208,17 @@ public class IndexShardTests extends IndexShardTestCase { public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); shard.relocated("test", primaryContext -> {}); - expectThrows(IllegalIndexShardStateException.class, () -> shard.updateRoutingEntry(originalRouting)); + expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); - shard.updateRoutingEntry(originalRouting); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, originalRouting); expectThrows(IllegalIndexShardStateException.class, () -> shard.relocated("test", primaryContext -> {})); closeShards(shard); } @@ -1227,7 +1227,7 @@ public class IndexShardTests extends IndexShardTestCase { public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, InterruptedException, BrokenBarrierException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); - shard.updateRoutingEntry(ShardRoutingHelper.relocate(originalRouting, "other_node")); + IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(originalRouting, "other_node")); CyclicBarrier cyclicBarrier = new CyclicBarrier(3); AtomicReference<Exception> relocationException = new AtomicReference<>(); Thread relocationThread = new Thread(new AbstractRunnable() { @@ -1253,7 +1253,7 @@ public class IndexShardTests extends IndexShardTestCase { @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.updateRoutingEntry(originalRouting); + IndexShardTestCase.updateRoutingEntry(shard, originalRouting); } }); cancellingThread.start(); @@ -1291,7 +1291,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 1); closeShards(newShard); } @@ -1330,7 +1330,7 @@ public class IndexShardTests extends IndexShardTestCase { } } assertEquals(1, numNoops); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 1); assertDocCount(shard, 2); closeShards(newShard, shard); @@ -1354,7 +1354,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); closeShards(newShard); } @@ -1397,7 +1397,7 @@ public class IndexShardTests extends IndexShardTestCase { newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue("recover even if there is nothing to recover", newShard.recoverFromStore()); - newShard.updateRoutingEntry(newShard.routingEntry().moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 0); // we can't issue this request through a client because of the inconsistencies we created with the cluster state // doing it directly instead @@ -1413,11 +1413,11 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting origRouting = shard.routingEntry(); assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); - shard.updateRoutingEntry(inRecoveryRouting); + IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); shard.relocated("simulate mark as relocated", primaryContext -> {}); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); try { - shard.updateRoutingEntry(origRouting); + IndexShardTestCase.updateRoutingEntry(shard, origRouting); fail("Expected IndexShardRelocatedException"); } catch (IndexShardRelocatedException expected) { } @@ -1466,7 +1466,7 @@ public class IndexShardTests extends IndexShardTestCase { } })); - target.updateRoutingEntry(routing.moveToStarted()); + IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertDocs(target, "0"); closeShards(source, target); @@ -1843,7 +1843,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(file.recovered(), file.length()); } } - targetShard.updateRoutingEntry(ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); + IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); assertDocCount(targetShard, 2); } // now check that it's persistent ie. that the added shards are committed diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index bf0b728674..c2c44421b8 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -62,7 +62,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { boolean syncNeeded = numDocs > 0 && globalCheckPoint < numDocs - 1; String allocationId = shard.routingEntry().allocationId().getId(); - shard.updateAllocationIdsFromMaster(randomNonNegativeLong(), Collections.singleton(allocationId), Collections.emptySet()); + shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), + Collections.emptySet()); shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint); assertEquals(globalCheckPoint, shard.getGlobalCheckpoint()); diff --git a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index 0c34bb54eb..d8367b0d6a 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardIT; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -450,7 +451,7 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase { assertEquals(1, imc.availableShards().size()); assertTrue(newShard.recoverFromStore()); assertTrue("we should have flushed in IMC at least once but did: " + flushes.get(), flushes.get() >= 1); - newShard.updateRoutingEntry(routing.moveToStarted()); + IndexShardTestCase.updateRoutingEntry(newShard, routing.moveToStarted()); } finally { newShard.close("simon says", false); } diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 1fe4def962..0dc760d63b 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.indices.recovery.RecoveryState; @@ -130,14 +131,14 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas .updateUnassigned(unassignedInfo, RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); IndexShard shard = index.createShard(newRouting); - shard.updateRoutingEntry(newRouting); + IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null)); shard.recoverFromStore(); newRouting = ShardRoutingHelper.moveToStarted(newRouting); - shard.updateRoutingEntry(newRouting); + IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(6, counter.get()); } finally { indicesService.removeIndex(idx, DELETED, "simon says"); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 73e2d7eb0b..419b7a430d 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -345,17 +346,12 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC } @Override - public ShardRouting routingEntry() { - return shardRouting; - } - - @Override - public IndexShardState state() { - return null; - } - - @Override - public void updateRoutingEntry(ShardRouting shardRouting) throws IOException { + public void updateShardState(ShardRouting shardRouting, + long newPrimaryTerm, + CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer, + long applyingClusterStateVersion, + Set<String> activeAllocationIds, + Set<String> initializingAllocationIds) throws IOException { failRandomly(); assertThat(this.shardId(), equalTo(shardRouting.shardId())); assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting)); @@ -364,20 +360,22 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC shardRouting.active()); } this.shardRouting = shardRouting; + if (shardRouting.primary()) { + term = newPrimaryTerm; + this.clusterStateVersion = applyingClusterStateVersion; + this.activeAllocationIds = activeAllocationIds; + this.initializingAllocationIds = initializingAllocationIds; + } } @Override - public void updatePrimaryTerm(final long newPrimaryTerm, - CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer) { - term = newPrimaryTerm; + public ShardRouting routingEntry() { + return shardRouting; } @Override - public void updateAllocationIdsFromMaster( - long applyingClusterStateVersion, Set<String> activeAllocationIds, Set<String> initializingAllocationIds) { - this.clusterStateVersion = applyingClusterStateVersion; - this.activeAllocationIds = activeAllocationIds; - this.initializingAllocationIds = initializingAllocationIds; + public IndexShardState state() { + return null; } public void updateTerm(long newTerm) { |