summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/action/bulk/Retry.java
blob: 477e61045bab849525729efff8c2aeaaeb1abab2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.action.bulk;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Predicate;

/**
 * Encapsulates synchronous and asynchronous retry logic.
 */
class Retry {
    private final Class<? extends Throwable> retryOnThrowable;

    private BackoffPolicy backoffPolicy;

    public static Retry on(Class<? extends Throwable> retryOnThrowable) {
        return new Retry(retryOnThrowable);
    }

    /**
     * @param backoffPolicy The backoff policy that defines how long and how often to wait for retries.
     */
    public Retry policy(BackoffPolicy backoffPolicy) {
        this.backoffPolicy = backoffPolicy;
        return this;
    }

    Retry(Class<? extends Throwable> retryOnThrowable) {
        this.retryOnThrowable = retryOnThrowable;
    }

    /**
     * Invokes #bulk(BulkRequest, ActionListener) on the provided client. Backs off on the provided exception and delegates results to the
     * provided listener.
     *
     * @param client      Client invoking the bulk request.
     * @param bulkRequest The bulk request that should be executed.
     * @param listener    A listener that is invoked when the bulk request finishes or completes with an exception. The listener is not
     */
    public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener);
        r.execute(bulkRequest);

    }

    /**
     * Invokes #bulk(BulkRequest) on the provided client. Backs off on the provided exception.
     *
     * @param client      Client invoking the bulk request.
     * @param bulkRequest The bulk request that should be executed.
     * @return the bulk response as returned by the client.
     * @throws Exception Any exception thrown by the callable.
     */
    public BulkResponse withSyncBackoff(Client client, BulkRequest bulkRequest) throws Exception {
        return SyncRetryHandler
                .create(retryOnThrowable, backoffPolicy, client)
                .executeBlocking(bulkRequest)
                .actionGet();
    }

    static class AbstractRetryHandler implements ActionListener<BulkResponse> {
        private final ESLogger logger;
        private final Client client;
        private final ActionListener<BulkResponse> listener;
        private final Iterator<TimeValue> backoff;
        private final Class<? extends Throwable> retryOnThrowable;
        // Access only when holding a client-side lock, see also #addResponses()
        private final List<BulkItemResponse> responses = new ArrayList<>();
        private final long startTimestampNanos;
        // needed to construct the next bulk request based on the response to the previous one
        // volatile as we're called from a scheduled thread
        private volatile BulkRequest currentBulkRequest;
        private volatile ScheduledFuture<?> scheduledRequestFuture;

        public AbstractRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
            this.retryOnThrowable = retryOnThrowable;
            this.backoff = backoffPolicy.iterator();
            this.client = client;
            this.listener = listener;
            this.logger = Loggers.getLogger(getClass(), client.settings());
            // in contrast to System.currentTimeMillis(), nanoTime() uses a monotonic clock under the hood
            this.startTimestampNanos = System.nanoTime();
        }

        @Override
        public void onResponse(BulkResponse bulkItemResponses) {
            if (!bulkItemResponses.hasFailures()) {
                // we're done here, include all responses
                addResponses(bulkItemResponses, (r -> true));
                finishHim();
            } else {
                if (canRetry(bulkItemResponses)) {
                    addResponses(bulkItemResponses, (r -> !r.isFailed()));
                    retry(createBulkRequestForRetry(bulkItemResponses));
                } else {
                    addResponses(bulkItemResponses, (r -> true));
                    finishHim();
                }
            }
        }

        @Override
        public void onFailure(Throwable e) {
            try {
                listener.onFailure(e);
            } finally {
                FutureUtils.cancel(scheduledRequestFuture);
            }
        }

        private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            scheduledRequestFuture = client.threadPool().schedule(next, ThreadPool.Names.SAME, (() -> this.execute(bulkRequestForRetry)));
        }

        private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
            BulkRequest requestToReissue = new BulkRequest();
            int index = 0;
            for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
                if (bulkItemResponse.isFailed()) {
                    requestToReissue.add(currentBulkRequest.requests().get(index));
                }
                index++;
            }
            return requestToReissue;
        }

        private boolean canRetry(BulkResponse bulkItemResponses) {
            if (!backoff.hasNext()) {
                return false;
            }
            for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                if (bulkItemResponse.isFailed()) {
                    Throwable cause = bulkItemResponse.getFailure().getCause();
                    Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
                    if (!rootCause.getClass().equals(retryOnThrowable)) {
                        return false;
                    }
                }
            }
            return true;
        }

        private void finishHim() {
            try {
                listener.onResponse(getAccumulatedResponse());
            } finally {
                FutureUtils.cancel(scheduledRequestFuture);
            }
        }

        private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
            for (BulkItemResponse bulkItemResponse : response) {
                if (filter.test(bulkItemResponse)) {
                    // Use client-side lock here to avoid visibility issues. This method may be called multiple times
                    // (based on how many retries we have to issue) and relying that the response handling code will be
                    // scheduled on the same thread is fragile.
                    synchronized (responses) {
                        responses.add(bulkItemResponse);
                    }
                }
            }
        }

        private BulkResponse getAccumulatedResponse() {
            BulkItemResponse[] itemResponses;
            synchronized (responses) {
                itemResponses = responses.toArray(new BulkItemResponse[1]);
            }
            long stopTimestamp = System.nanoTime();
            long totalLatencyMs = TimeValue.timeValueNanos(stopTimestamp - startTimestampNanos).millis();
            return new BulkResponse(itemResponses, totalLatencyMs);
        }

        public void execute(BulkRequest bulkRequest) {
            this.currentBulkRequest = bulkRequest;
            client.bulk(bulkRequest, this);
        }
    }

    static class AsyncRetryHandler extends AbstractRetryHandler {
        public AsyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
            super(retryOnThrowable, backoffPolicy, client, listener);
        }
    }

    static class SyncRetryHandler extends AbstractRetryHandler {
        private final PlainActionFuture<BulkResponse> actionFuture;

        public static SyncRetryHandler create(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client) {
            PlainActionFuture<BulkResponse> actionFuture = PlainActionFuture.newFuture();
            return new SyncRetryHandler(retryOnThrowable, backoffPolicy, client, actionFuture);
        }

        public SyncRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, PlainActionFuture<BulkResponse> actionFuture) {
            super(retryOnThrowable, backoffPolicy, client, actionFuture);
            this.actionFuture = actionFuture;
        }

        public ActionFuture<BulkResponse> executeBlocking(BulkRequest bulkRequest) {
            super.execute(bulkRequest);
            return actionFuture;
        }
    }
}