/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.index.replication;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ReplicaResponse;
import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
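/**
 * Base class for tests that exercise primary/replica replication at the index shard level without starting a full
 * cluster. A {@link ReplicationGroup} wires a single primary {@link IndexShard} together with a configurable number
 * of replicas and routes writes and global checkpoint syncs through the same {@link ReplicationOperation} machinery
 * used by the transport layer.
 *
 * A minimal usage sketch (the test body below is illustrative only and not part of this class):
 * <pre>{@code
 * try (ReplicationGroup shards = createGroup(1)) {
 *     shards.startAll();
 *     int docs = shards.indexDocs(10);
 *     shards.assertAllEqual(docs);
 * }
 * }</pre>
 */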
public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase {

    protected final Index index = new Index("test", "uuid");
    private final ShardId shardId = new ShardId(index, 0);
    private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");

    protected ReplicationGroup createGroup(int replicas) throws IOException {
        Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .build();
        IndexMetaData.Builder metaData = IndexMetaData.builder(index.getName())
            .settings(settings)
            .primaryTerm(0, 1);
        for (Map.Entry<String, String> typeMapping : indexMapping.entrySet()) {
            metaData.putMapping(typeMapping.getKey(), typeMapping.getValue());
        }
        return new ReplicationGroup(metaData.build());
    }

    protected DiscoveryNode getDiscoveryNode(String id) {
        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(),
            Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT);
    }

    protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard> {
        private IndexShard primary;
        private IndexMetaData indexMetaData;
        private final List<IndexShard> replicas;
        private final AtomicInteger replicaId = new AtomicInteger();
        private final AtomicInteger docId = new AtomicInteger();
        boolean closed = false;

        ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
            primary = newShard(shardId, true, "s0", indexMetaData, this::syncGlobalCheckpoint, null);
            replicas = new ArrayList<>();
            this.indexMetaData = indexMetaData;
            updateAllocationIDsOnPrimary();
            for (int i = 0; i < indexMetaData.getNumberOfReplicas(); i++) {
                addReplica();
            }
        }

        public int indexDocs(final int numOfDoc) throws Exception {
            for (int doc = 0; doc < numOfDoc; doc++) {
                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet()))
                    .source("{}", XContentType.JSON);
                final IndexResponse response = index(indexRequest);
                assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
            }
            primary.updateGlobalCheckpointOnPrimary();
            return numOfDoc;
        }

        public int appendDocs(final int numOfDoc) throws Exception {
            for (int doc = 0; doc < numOfDoc; doc++) {
                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
                final IndexResponse response = index(indexRequest);
                assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
            }
            primary.updateGlobalCheckpointOnPrimary();
            return numOfDoc;
        }

        public IndexResponse index(IndexRequest indexRequest) throws Exception {
            PlainActionFuture<IndexResponse> listener = new PlainActionFuture<>();
            new IndexingAction(indexRequest, listener, this).execute();
            return listener.get();
        }

        public synchronized void startAll() throws IOException {
            startReplicas(replicas.size());
        }

        public synchronized int startReplicas(int numOfReplicasToStart) throws IOException {
            if (primary.routingEntry().initializing()) {
                startPrimary();
            }
            int started = 0;
            for (IndexShard replicaShard : replicas) {
                if (replicaShard.routingEntry().initializing()) {
                    recoverReplica(replicaShard);
                    started++;
                    if (started > numOfReplicasToStart) {
                        break;
                    }
                }
            }
            return started;
        }

        public void startPrimary() throws IOException {
            final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
            primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
            primary.recoverFromStore();
            primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
            updateAllocationIDsOnPrimary();
        }
        public synchronized IndexShard addReplica() throws IOException {
            final IndexShard replica = newShard(shardId, false, "s" + replicaId.incrementAndGet(), indexMetaData,
                this::syncGlobalCheckpoint, null);
            replicas.add(replica);
            updateAllocationIDsOnPrimary();
            return replica;
        }

        public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
            final ShardRouting shardRouting = TestShardRouting.newShardRouting(
                shardId,
                nodeId,
                false, ShardRoutingState.INITIALIZING,
                RecoverySource.PeerRecoverySource.INSTANCE);

            final IndexShard newReplica = newShard(shardRouting, shardPath, indexMetaData, null, this::syncGlobalCheckpoint);
            replicas.add(newReplica);
            updateAllocationIDsOnPrimary();
            return newReplica;
        }

        public synchronized List<IndexShard> getReplicas() {
            return Collections.unmodifiableList(replicas);
        }

        /**
         * promotes the specific replica as the new primary
         */
        public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
            final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
            IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
            indexMetaData = newMetaData.build();
            for (IndexShard shard : replicas) {
                shard.updatePrimaryTerm(newTerm);
            }
            boolean found = replicas.remove(replica);
            assert found;
            primary = replica;
            replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
            updateAllocationIDsOnPrimary();
        }

        synchronized boolean removeReplica(IndexShard replica) {
            final boolean removed = replicas.remove(replica);
            if (removed) {
                updateAllocationIDsOnPrimary();
            }
            return removed;
        }

        public void recoverReplica(IndexShard replica) throws IOException {
            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}));
        }

        public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
            throws IOException {
            recoverReplica(replica, targetSupplier, true);
        }

        public void recoverReplica(
            IndexShard replica,
            BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier,
            boolean markAsRecovering) throws IOException {
            ESIndexLevelReplicationTestCase.this.recoverReplica(replica, primary, targetSupplier, markAsRecovering);
            updateAllocationIDsOnPrimary();
        }

        public synchronized DiscoveryNode getPrimaryNode() {
            return getDiscoveryNode(primary.routingEntry().currentNodeId());
        }

        public Future<Void> asyncRecoverReplica(
            IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier) throws IOException {
            FutureTask<Void> task = new FutureTask<>(() -> {
                recoverReplica(replica, targetSupplier);
                return null;
            });
            threadPool.generic().execute(task);
            return task;
        }

        public synchronized void assertAllEqual(int expectedCount) throws IOException {
            Set<Uid> primaryIds = getShardDocUIDs(primary);
            assertThat(primaryIds.size(), equalTo(expectedCount));
            for (IndexShard replica : replicas) {
                Set<Uid> replicaIds = getShardDocUIDs(replica);
                Set<Uid> temp = new HashSet<>(primaryIds);
                temp.removeAll(replicaIds);
                assertThat(replica.routingEntry() + " is missing docs", temp, empty());
                temp = new HashSet<>(replicaIds);
                temp.removeAll(primaryIds);
                assertThat(replica.routingEntry() + " has extra docs", temp, empty());
            }
        }

        public synchronized void refresh(String source) {
            for (IndexShard shard : this) {
                shard.refresh(source);
            }
        }

        public synchronized void flush() {
            final FlushRequest request = new FlushRequest();
            for (IndexShard shard : this) {
                shard.flush(request);
            }
        }

        public synchronized List<ShardRouting> shardRoutings() {
            return StreamSupport.stream(this.spliterator(), false).map(IndexShard::routingEntry).collect(Collectors.toList());
        }

        @Override
        public synchronized void close() throws Exception {
            if (closed == false) {
                closed = true;
                closeShards(this);
            } else {
                throw new AlreadyClosedException("too bad");
            }
        }

        @Override
        public Iterator<IndexShard> iterator() {
            return Iterators.concat(replicas.iterator(), Collections.singleton(primary).iterator());
        }

        public IndexShard getPrimary() {
            return primary;
        }

        private void syncGlobalCheckpoint() {
            PlainActionFuture<ReplicationResponse> listener = new PlainActionFuture<>();
            try {
                new GlobalCheckpointSync(listener, this).execute();
                listener.get();
            } catch (Exception e) {
                throw new AssertionError(e);
            }
        }

        private void updateAllocationIDsOnPrimary() {
            Set<String> active = new HashSet<>();
            Set<String> initializing = new HashSet<>();
            for (ShardRouting shard : shardRoutings()) {
                if (shard.active()) {
                    active.add(shard.allocationId().getId());
                } else {
                    initializing.add(shard.allocationId().getId());
                }
            }
            primary.updateAllocationIdsFromMaster(active, initializing);
        }
    }
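    /**
     * A test-local stand-in for a transport-level replication action: {@link #execute()} runs the request through
     * {@link ReplicationOperation} against the group's primary and replicas. Subclasses (such as {@code IndexingAction}
     * and {@code GlobalCheckpointSync} below) only implement {@link #performOnPrimary} and {@link #performOnReplica}.
     */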
    abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
        ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
        Response extends ReplicationResponse> {
        private final Request request;
        private ActionListener<Response> listener;
        private final ReplicationGroup replicationGroup;
        private final String opType;

        public ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) {
            this.request = request;
            this.listener = listener;
            this.replicationGroup = group;
            this.opType = opType;
        }

        public void execute() throws Exception {
            new ReplicationOperation<Request, ReplicaRequest, PrimaryResult>(request, new PrimaryRef(),
                new ActionListener<PrimaryResult>() {
                    @Override
                    public void onResponse(PrimaryResult result) {
                        result.respond(listener);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        listener.onFailure(e);
                    }
                }, true, new ReplicasRef(), () -> null, logger, opType) {
                @Override
                protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
                    return replicationGroup.shardRoutings();
                }

                @Override
                protected String checkActiveShardCount() {
                    return null;
                }

                @Override
                protected Set<String> getInSyncAllocationIds(ShardId shardId, ClusterState clusterState) {
                    return replicationGroup.shardRoutings().stream().filter(ShardRouting::active).map(r -> r.allocationId().getId())
                        .collect(Collectors.toSet());
                }
            }.execute();
        }

        protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;

        protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException;

        class PrimaryRef implements ReplicationOperation.Primary<Request, ReplicaRequest, PrimaryResult> {

            @Override
            public ShardRouting routingEntry() {
                return replicationGroup.primary.routingEntry();
            }

            @Override
            public void failShard(String message, Exception exception) {
                throw new UnsupportedOperationException();
            }

            @Override
            public PrimaryResult perform(Request request) throws Exception {
                PrimaryResult response = performOnPrimary(replicationGroup.primary, request);
                response.replicaRequest().primaryTerm(replicationGroup.primary.getPrimaryTerm());
                return response;
            }

            @Override
            public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
                replicationGroup.getPrimary().updateLocalCheckpointForShard(allocationId, checkpoint);
            }

            @Override
            public long localCheckpoint() {
                return replicationGroup.getPrimary().getLocalCheckpoint();
            }
        }
        class ReplicasRef implements ReplicationOperation.Replicas<ReplicaRequest> {

            @Override
            public void performOn(
                ShardRouting replicaRouting,
                ReplicaRequest request,
                ActionListener<ReplicationOperation.ReplicaResponse> listener) {
                try {
                    IndexShard replica = replicationGroup.replicas.stream()
                        .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
                    performOnReplica(request, replica);
                    listener.onResponse(new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
                } catch (Exception e) {
                    listener.onFailure(e);
                }
            }

            @Override
            public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception,
                                  Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess,
                                             Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
                throw new UnsupportedOperationException();
            }
        }

        class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
            final ReplicaRequest replicaRequest;
            final Response finalResponse;

            public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
                this.replicaRequest = replicaRequest;
                this.finalResponse = finalResponse;
            }

            @Override
            public ReplicaRequest replicaRequest() {
                return replicaRequest;
            }

            @Override
            public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
                finalResponse.setShardInfo(shardInfo);
            }

            public void respond(ActionListener<Response> listener) {
                listener.onResponse(finalResponse);
            }
        }
    }

    class IndexingAction extends ReplicationAction<IndexRequest, IndexRequest, IndexResponse> {

        public IndexingAction(IndexRequest request, ActionListener<IndexResponse> listener, ReplicationGroup replicationGroup) {
            super(request, listener, replicationGroup, "indexing");
            request.process(null, true, request.index());
        }

        @Override
        protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
            IndexResponse response = indexOnPrimary(request, primary);
            return new PrimaryResult(request, response);
        }

        @Override
        protected void performOnReplica(IndexRequest request, IndexShard replica) throws IOException {
            indexOnReplica(request, replica);
        }
    }

    /**
     * indexes the given requests on the supplied primary, modifying it for replicas
     */
    protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
        final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, null);
        if (indexResult.hasFailure() == false) {
            // update the version on request so it will happen on the replicas
            final long version = indexResult.getVersion();
            request.version(version);
            request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
            request.setSeqNo(indexResult.getSeqNo());
            assert request.versionType().validateVersionForWrites(request.version());
        }
        request.primaryTerm(primary.getPrimaryTerm());
        TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
        return new IndexResponse(
            primary.shardId(),
            request.type(),
            request.id(),
            indexResult.getSeqNo(),
            indexResult.getVersion(),
            indexResult.isCreated());
    }

    /**
     * indexes the given requests on the supplied replica shard
     */
    protected void indexOnReplica(IndexRequest request, IndexShard replica) throws IOException {
        final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
        TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
    }
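    /**
     * Propagates the primary's global checkpoint to every replica in the group; {@link ReplicationGroup#syncGlobalCheckpoint()}
     * executes this action when a shard requests a checkpoint sync.
     */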
    class GlobalCheckpointSync extends ReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
        GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {

        public GlobalCheckpointSync(ActionListener<ReplicationResponse> listener, ReplicationGroup replicationGroup) {
            super(new GlobalCheckpointSyncAction.PrimaryRequest(replicationGroup.getPrimary().shardId()), listener,
                replicationGroup, "global_ckp");
        }

        @Override
        protected PrimaryResult performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.PrimaryRequest request)
            throws Exception {
            return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, primary.getGlobalCheckpoint()),
                new ReplicationResponse());
        }

        @Override
        protected void performOnReplica(GlobalCheckpointSyncAction.ReplicaRequest request, IndexShard replica) {
            replica.updateGlobalCheckpointOnReplica(request.getCheckpoint());
        }
    }
}