summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
blob: b880b04f3dada18d17a7374080aba899b9f08957 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.cluster.routing.allocation.decider.Decision.THROTTLE;
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.YES;

/**
 * {@link ThrottlingAllocationDecider} controls the recovery process per node in
 * the cluster. It declares the following settings, all of which are marked
 * dynamic so they can be changed in real-time via the cluster update API:
 * <ul>
 * <li>{@code cluster.routing.allocation.node_initial_primaries_recoveries} -
 * restricts the number of initial primary shard recovery operations on a single
 * node. The default is {@code 4}</li>
 * <li>{@code cluster.routing.allocation.node_concurrent_incoming_recoveries} -
 * restricts the number of concurrent incoming (peer) recoveries on a single node.
 * Its default is taken from {@code cluster.routing.allocation.node_concurrent_recoveries}</li>
 * <li>{@code cluster.routing.allocation.node_concurrent_outgoing_recoveries} -
 * restricts the number of concurrent outgoing (peer) recoveries on the node that
 * holds the primary. Its default is taken from
 * {@code cluster.routing.allocation.node_concurrent_recoveries}</li>
 * <li>{@code cluster.routing.allocation.node_concurrent_recoveries} -
 * not read directly by this decider; it only supplies the default for the incoming
 * and outgoing limits above. The default is {@code 2}</li>
 * </ul>
 * <p>
 * If one of the above thresholds is reached on a node this allocation decider
 * will return {@link Decision#THROTTLE} as a hint to upstream logic to throttle
 * the allocation process to prevent overloading nodes due to too many concurrent recovery
 * processes.
 */
public class ThrottlingAllocationDecider extends AllocationDecider {

    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4;
    public static final String NAME = "throttling";
    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING =
        new Setting<>("cluster.routing.allocation.node_concurrent_recoveries",
            Integer.toString(DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES),
            (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_recoveries"),
            Property.Dynamic, Property.NodeScope);
    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING =
        Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries",
            DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 0,
            Property.Dynamic, Property.NodeScope);
    // The incoming/outgoing limits use the raw value of node_concurrent_recoveries as their default.
    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING =
        new Setting<>("cluster.routing.allocation.node_concurrent_incoming_recoveries",
            CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING::getRaw,
            (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_incoming_recoveries"),
            Property.Dynamic, Property.NodeScope);
    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING =
        new Setting<>("cluster.routing.allocation.node_concurrent_outgoing_recoveries",
            CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING::getRaw,
            (s) -> Setting.parseInt(s, 0, "cluster.routing.allocation.node_concurrent_outgoing_recoveries"),
            Property.Dynamic, Property.NodeScope);

    // Current limits. Written by the settings update consumers registered in the constructor and
    // read by canAllocate, hence volatile.
    private volatile int primariesInitialRecoveries;
    private volatile int concurrentIncomingRecoveries;
    private volatile int concurrentOutgoingRecoveries;

    /**
     * Creates the decider, reading the initial limits from {@code settings} and registering
     * update consumers on {@code clusterSettings} so later changes to the three enforced
     * limits are picked up.
     *
     * @param settings        settings the initial limits are read from
     * @param clusterSettings registry on which the update consumers are registered
     */
    @Inject
    public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
        super(settings);
        this.primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings);
        this.concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
        this.concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);

        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
                this::setPrimariesInitialRecoveries);
        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
                this::setConcurrentIncomingRecoveries);
        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
                this::setConcurrentOutgoingRecoveries);

        logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " +
                        "node_initial_primaries_recoveries [{}]",
                concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries);
    }

    /** Settings update consumer for the incoming recoveries limit. */
    private void setConcurrentIncomingRecoveries(int concurrentIncomingRecoveries) {
        this.concurrentIncomingRecoveries = concurrentIncomingRecoveries;
    }

    /** Settings update consumer for the outgoing recoveries limit. */
    private void setConcurrentOutgoingRecoveries(int concurrentOutgoingRecoveries) {
        this.concurrentOutgoingRecoveries = concurrentOutgoingRecoveries;
    }

    /** Settings update consumer for the initial primaries recoveries limit. */
    private void setPrimariesInitialRecoveries(int primariesInitialRecoveries) {
        this.primariesInitialRecoveries = primariesInitialRecoveries;
    }

    /**
     * Decides whether {@code shardRouting} may be allocated to {@code node} given the recoveries
     * that are already in flight.
     * <ul>
     * <li>For an unassigned primary, the initializing, non-relocating primaries on {@code node} are
     * counted and compared against the initial primaries limit.</li>
     * <li>Otherwise (peer recovery), the incoming recoveries of {@code node} are compared against the
     * incoming limit, and then the outgoing recoveries of the node holding the active primary are
     * compared against the outgoing limit. If there is no active primary the decision is
     * {@link Decision#NO}.</li>
     * </ul>
     * Each limit is read exactly once per call so that the value used in the comparison and the value
     * reported in the decision explanation cannot differ if the setting is updated concurrently.
     *
     * @param shardRouting the shard routing to allocate (see {@link #initializingShard} for its possible states)
     * @param node         the candidate target node
     * @param allocation   the current allocation, used to look up recovery counts and to build the decision
     * @return a {@code YES}, {@code THROTTLE} or {@code NO} decision built via {@code allocation.decision(...)}
     */
    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        if (shardRouting.primary() && shardRouting.unassigned()) {
            assert initializingShard(shardRouting, node.nodeId()).isPeerRecovery() == false;
            // primary is unassigned, means we are going to do recovery from store, snapshot or local shards
            // count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries

            // single volatile read: keeps the comparison and the reported limit consistent
            final int primariesLimit = primariesInitialRecoveries;
            int primariesInRecovery = 0;
            for (ShardRouting shard : node) {
                // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
                // we only count initial recoveries here, so we need to make sure that relocating node is null
                if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
                    primariesInRecovery++;
                }
            }
            if (primariesInRecovery >= primariesLimit) {
                // TODO: Should index creation not be throttled for primary shards?
                return allocation.decision(THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
                    primariesInRecovery, primariesLimit);
            }
            return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesLimit);
        }

        // Peer recovery
        assert initializingShard(shardRouting, node.nodeId()).isPeerRecovery();

        // Allocating a shard to this node will increase the incoming recoveries
        final int incomingLimit = concurrentIncomingRecoveries;
        int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
        if (currentInRecoveries >= incomingLimit) {
            return allocation.decision(THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
                currentInRecoveries, incomingLimit);
        }

        // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
        ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
        if (primaryShard == null) {
            return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
        }

        final int outgoingLimit = concurrentOutgoingRecoveries;
        int primaryNodeOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId());
        if (primaryNodeOutRecoveries >= outgoingLimit) {
            return allocation.decision(THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
                primaryNodeOutRecoveries, outgoingLimit);
        }
        return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
            primaryNodeOutRecoveries,
            outgoingLimit,
            currentInRecoveries,
            incomingLimit);
    }

    /**
     * The shard routing passed to {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} is not the initializing shard to this
     * node but:
     * - the unassigned shard routing in case if we want to assign an unassigned shard to this node.
     * - the initializing shard routing if we want to assign the initializing shard to this node instead
     * - the started shard routing in case if we want to check if we can relocate to this node.
     * - the relocating shard routing if we want to relocate to this node now instead.
     *
     * This method returns the corresponding initializing shard that would be allocated to this node.
     * It is only called from assertions in {@code canAllocate}.
     *
     * @param shardRouting  the routing handed to {@code canAllocate}, in any of the states listed above
     * @param currentNodeId id of the node the shard would be initialized on
     * @return the initializing shard routing derived from {@code shardRouting} for that node
     */
    private ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
        final ShardRouting initializingShard;
        if (shardRouting.unassigned()) {
            initializingShard = shardRouting.initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
        } else if (shardRouting.initializing()) {
            UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
            if (unassignedInfo == null) {
                // unassigned shards must have unassignedInfo (initializing shards might not)
                unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "fake");
            }
            // go back to unassigned first, then initialize on the candidate node
            initializingShard = shardRouting.moveToUnassigned(unassignedInfo)
                .initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
        } else if (shardRouting.relocating()) {
            // redirect the ongoing relocation to the candidate node and take its target shard
            initializingShard = shardRouting.cancelRelocation()
                .relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
                .getTargetRelocatingShard();
        } else {
            assert shardRouting.started();
            initializingShard = shardRouting.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
                .getTargetRelocatingShard();
        }
        assert initializingShard.initializing();
        return initializingShard;
    }
}