path: root/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.indices.flush;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

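/**
 * A best effort service that tries to mark every copy of a shard with the same sync id: all copies are
 * flushed, the primary is checked for ongoing operations, and each copy whose commit did not change since
 * the flush is then marked with the sync id. See the comment on
 * {@link #attemptSyncedFlush(ShardId, ActionListener)} for the full protocol and its correctness argument.
 * <p>
 * An illustrative caller-side sketch (the {@code syncedFlushService} and {@code shardId} variables are
 * assumed to be in scope; only methods declared or used in this class appear):
 * <pre>{@code
 * syncedFlushService.attemptSyncedFlush(shardId, new ActionListener<ShardsSyncedFlushResult>() {
 *     public void onResponse(ShardsSyncedFlushResult result) {
 *         // on success result.syncId() holds the id written to the copies that accepted it
 *     }
 *
 *     public void onFailure(Throwable e) {
 *         // the flush could not be attempted at all, e.g. the index or shard does not exist
 *     }
 * });
 * }</pre>
 */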
public class SyncedFlushService extends AbstractComponent implements IndexEventListener {

    private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
    private static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
    private static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";

    private final IndicesService indicesService;
    private final ClusterService clusterService;
    private final TransportService transportService;
    private final IndexNameExpressionResolver indexNameExpressionResolver;

    @Inject
    public SyncedFlushService(Settings settings, IndicesService indicesService, ClusterService clusterService, TransportService transportService, IndexNameExpressionResolver indexNameExpressionResolver) {
        super(settings);
        this.indicesService = indicesService;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
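        // one internal transport action per protocol step: the two flush steps run on the FLUSH thread pool,
        // while the in-flight operation count is cheap and is answered on the calling (SAME) thread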
        transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
        transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
        transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler());
    }

    @Override
    public void onShardInactive(final IndexShard indexShard) {
        // we only want to call sync flush once, so only trigger it when we are on a primary
        if (indexShard.routingEntry().primary()) {
            attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
                @Override
                public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
                    logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
                }

                @Override
                public void onFailure(Throwable e) {
                    logger.debug("{} sync flush on inactive shard failed", e, indexShard.shardId());
                }
            });
        }
    }

    /**
     * A utility method to perform a synced flush for all shards of multiple indices.
     * See {@link #attemptSyncedFlush(ShardId, ActionListener)} for more details.
     */
    public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<SyncedFlushResponse> listener) {
        final ClusterState state = clusterService.state();
        final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
        final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
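        // totalNumberOfShards counts every shard copy (primaries and replicas) and is only used when reporting
        // results, while numberOfShards counts shard groups (primaries) and drives the countdown below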
        int totalNumberOfShards = 0;
        int numberOfShards = 0;
        for (String index : concreteIndices) {
            final IndexMetaData indexMetaData = state.metaData().index(index);
            totalNumberOfShards += indexMetaData.getTotalNumberOfShards();
            numberOfShards += indexMetaData.getNumberOfShards();
            results.put(index, Collections.synchronizedList(new ArrayList<>()));
        }
        if (numberOfShards == 0) {
            listener.onResponse(new SyncedFlushResponse(results));
            return;
        }
        final int finalTotalNumberOfShards = totalNumberOfShards;
        final CountDown countDown = new CountDown(numberOfShards);

        for (final String index : concreteIndices) {
            final IndexMetaData indexMetaData = state.metaData().index(index);
            final int indexNumberOfShards = indexMetaData.getNumberOfShards();
            for (int shard = 0; shard < indexNumberOfShards; shard++) {
                final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard);
                attemptSyncedFlush(shardId, new ActionListener<ShardsSyncedFlushResult>() {
                    @Override
                    public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
                        results.get(index).add(syncedFlushResult);
                        if (countDown.countDown()) {
                            listener.onResponse(new SyncedFlushResponse(results));
                        }
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        logger.debug("{} unexpected error while executing synced flush", shardId);
                        results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage()));
                        if (countDown.countDown()) {
                            listener.onResponse(new SyncedFlushResponse(results));
                        }
                    }
                });
            }
        }
    }

    /*
    * Tries to flush all copies of a shard and write a sync id to it.
    * After a synced flush two shard copies may only contain the same sync id if they contain the same documents.
    * To ensure this, synced flush works in three steps:
    * 1. Flush all shard copies and gather the commit ids for each copy after the flush
    * 2. Ensure that there are no ongoing indexing operations on the primary
    * 3. Perform an additional flush on each shard copy that writes the sync id
    *
    * Step 3 is only executed on a shard if
    * a) the shard has no uncommitted changes since the last flush
    * b) the last flush was the one executed in 1 (use the collected commit id to verify this)
    *
    * This alone is not enough to ensure that all copies contain the same documents. Without step 2 a sync id would be written for inconsistent copies in the following scenario:
    *
    * A write operation has completed on the primary and is being sent to the replicas, but the write request does not reach the replicas until the synced flush is finished.
    * Step 1 is executed. After the flush the commit point on the primary contains a write operation that the replicas do not have.
    * Step 3 will be executed on the primary and the replicas as well, because there are no uncommitted changes on the primary (the first flush committed them) and there are no
    * uncommitted changes on the replicas (the write operation has not reached them yet).
    *
    * Step 2 detects this scenario and fails the whole synced flush if a write operation is ongoing on the primary.
    * Together with the conditions for step 3 (same commit id and no uncommitted changes) this guarantees that a sync id will only
    * be written on the primary if no write operation was executed between step 1 and step 3, and that a sync id will only be written on
    * a replica if it contains the same changes that the primary contains.
    *
    * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
    **/
    public void attemptSyncedFlush(final ShardId shardId, final ActionListener<ShardsSyncedFlushResult> actionListener) {
        try {
            final ClusterState state = clusterService.state();
            final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
            final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
            final int totalShards = shardRoutingTable.getSize();

            if (activeShards.size() == 0) {
                actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "no active shards"));
                return;
            }

            final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
                @Override
                public void onResponse(final Map<String, Engine.CommitId> commitIds) {
                    if (commitIds.isEmpty()) {
                        actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
                        return;
                    }
                    final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
                        @Override
                        public void onResponse(InFlightOpsResponse response) {
                            final int inflight = response.opCount();
                            assert inflight >= 0;
                            if (inflight != 0) {
                                actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
                            } else {
                                // 3. now send the sync request to all the shards
                                String syncId = Strings.base64UUID();
                                sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
                            }
                        }

                        @Override
                        public void onFailure(Throwable e) {
                            actionListener.onFailure(e);
                        }
                    };
                    // 2. fetch in flight operations
                    getInflightOpsCount(shardId, state, shardRoutingTable, inflightOpsListener);
                }

                @Override
                public void onFailure(Throwable e) {
                    actionListener.onFailure(e);
                }
            };

            // 1. send pre-sync flushes to all replicas
            sendPreSyncRequests(activeShards, state, shardId, commitIdsListener);
        } catch (Throwable t) {
            actionListener.onFailure(t);
        }
    }

    final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
        final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName());
        if (indexRoutingTable == null) {
            IndexMetaData index = state.getMetaData().index(shardId.getIndexName());
            if (index != null && index.getState() == IndexMetaData.State.CLOSE) {
                throw new IndexClosedException(shardId.getIndex());
            }
            throw new IndexNotFoundException(shardId.getIndexName());
        }
        final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
        if (shardRoutingTable == null) {
            throw new ShardNotFoundException(shardId);
        }
        return shardRoutingTable;
    }

    /**
     * Returns, via the listener, the number of in-flight operations on the primary, or -1 if the node holding
     * the primary could not be resolved.
     */
    protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
        try {
            final ShardRouting primaryShard = shardRoutingTable.primaryShard();
            final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
            if (primaryNode == null) {
                logger.trace("{} failed to resolve node for primary shard {}, skipping sync", shardId, primaryShard);
                listener.onResponse(new InFlightOpsResponse(-1));
                return;
            }
            logger.trace("{} retrieving in flight operation count", shardId);
            transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
                    new BaseTransportResponseHandler<InFlightOpsResponse>() {
                        @Override
                        public InFlightOpsResponse newInstance() {
                            return new InFlightOpsResponse();
                        }

                        @Override
                        public void handleResponse(InFlightOpsResponse response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            logger.debug("{} unexpected error while retrieving in flight op count", shardId);
                            listener.onFailure(exp);
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        } catch (Throwable t) {
            listener.onFailure(t);
        }
    }


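    /**
     * Step 3 of the synced flush: sends a synced flush request, carrying the sync id and the commit id expected
     * from the pre-sync step, to every given shard copy and collects the per-copy responses.
     */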
    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
                          final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
        final CountDown countDown = new CountDown(shards.size());
        final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
        for (final ShardRouting shard : shards) {
            final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
            if (node == null) {
                logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                results.put(shard, new ShardSyncedFlushResponse("unknown node"));
                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                continue;
            }
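            // commit ids collected in the pre-sync step are keyed by node id, see sendPreSyncRequests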
            final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
            if (expectedCommitId == null) {
                logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
                countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                continue;
            }
            logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
            transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, expectedCommitId),
                    new BaseTransportResponseHandler<ShardSyncedFlushResponse>() {
                        @Override
                        public ShardSyncedFlushResponse newInstance() {
                            return new ShardSyncedFlushResponse();
                        }

                        @Override
                        public void handleResponse(ShardSyncedFlushResponse response) {
                            ShardSyncedFlushResponse existing = results.put(shard, response);
                            assert existing == null : "got two answers for node [" + node + "]";
                            // count after the assert so we won't decrement twice in handleException
                            countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
                            results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
                            countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }

    }

    private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
            ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
        if (countDown.countDown()) {
            assert results.size() == shards.size();
            listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
        }
    }

    /**
     * Step 1 of the synced flush: sends a pre-sync flush request to all active copies of the given shard and
     * collects the resulting commit ids, keyed by node id.
     */
    void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, Engine.CommitId>> listener) {
        final CountDown countDown = new CountDown(shards.size());
        final ConcurrentMap<String, Engine.CommitId> commitIds = ConcurrentCollections.newConcurrentMap();
        for (final ShardRouting shard : shards) {
            logger.trace("{} sending pre-synced flush request to {}", shardId, shard);
            final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
            if (node == null) {
                logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
                if (countDown.countDown()) {
                    listener.onResponse(commitIds);
                }
                continue;
            }
            transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new BaseTransportResponseHandler<PreSyncedFlushResponse>() {
                @Override
                public PreSyncedFlushResponse newInstance() {
                    return new PreSyncedFlushResponse();
                }

                @Override
                public void handleResponse(PreSyncedFlushResponse response) {
                    Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId());
                    assert existing == null : "got two answers for node [" + node + "]";
                    // count after the assert so we won't decrement twice in handleException
                    if (countDown.countDown()) {
                        listener.onResponse(commitIds);
                    }
                }

                @Override
                public void handleException(TransportException exp) {
                    logger.trace("{} error while performing pre synced flush on [{}], skipping", exp, shardId, shard);
                    if (countDown.countDown()) {
                        listener.onResponse(commitIds);
                    }
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            });
        }
    }

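    /**
     * Step 1 handler on a shard copy: performs a regular flush ({@code force=false}, {@code waitIfOngoing=true})
     * on the local shard and returns the resulting commit id.
     */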
    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
        IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
        FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
        logger.trace("{} performing pre sync flush", request.shardId());
        Engine.CommitId commitId = indexShard.flush(flushRequest);
        logger.trace("{} pre sync flush done. commit id {}", request.shardId(), commitId);
        return new PreSyncedFlushResponse(commitId);
    }

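    /**
     * Step 3 handler on a shard copy: writes the sync id, but only if the local commit still matches the commit
     * id collected in the pre-sync step and there are no pending operations.
     */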
    private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        IndexShard indexShard = indexService.getShard(request.shardId().id());
        logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
        Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId());
        logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
        switch (result) {
            case SUCCESS:
                return new ShardSyncedFlushResponse();
            case COMMIT_MISMATCH:
                return new ShardSyncedFlushResponse("commit has changed");
            case PENDING_OPERATIONS:
                return new ShardSyncedFlushResponse("pending operations");
            default:
                throw new ElasticsearchException("unknown synced flush result [" + result + "]");
        }
    }

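    /**
     * Step 2 handler on the primary: samples the number of ongoing operations on the local primary shard.
     */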
    private InFlightOpsResponse performInFlightOps(InFlightOpsRequest request) {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        IndexShard indexShard = indexService.getShard(request.shardId().id());
        if (indexShard.routingEntry().primary() == false) {
            throw new IllegalStateException("[" + request.shardId() + "] expected a primary shard");
        }
        int opCount = indexShard.getOperationsCount();
        logger.trace("{} in flight operations sampled at [{}]", request.shardId(), opCount);
        return new InFlightOpsResponse(opCount);
    }

    public static final class PreShardSyncedFlushRequest extends TransportRequest {
        private ShardId shardId;

        public PreShardSyncedFlushRequest() {
        }

        public PreShardSyncedFlushRequest(ShardId shardId) {
            this.shardId = shardId;
        }

        @Override
        public String toString() {
            return "PreShardSyncedFlushRequest{" +
                    "shardId=" + shardId +
                    '}';
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            shardId.writeTo(out);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.shardId = ShardId.readShardId(in);
        }

        public ShardId shardId() {
            return shardId;
        }
    }

    /**
     * Response for first step of synced flush (flush) for one shard copy
     */
    static final class PreSyncedFlushResponse extends TransportResponse {

        Engine.CommitId commitId;

        PreSyncedFlushResponse() {
        }

        PreSyncedFlushResponse(Engine.CommitId commitId) {
            this.commitId = commitId;
        }

        public Engine.CommitId commitId() {
            return commitId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            commitId = new Engine.CommitId(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            commitId.writeTo(out);
        }
    }

    public static final class ShardSyncedFlushRequest extends TransportRequest {

        private String syncId;
        private Engine.CommitId expectedCommitId;
        private ShardId shardId;

        public ShardSyncedFlushRequest() {
        }

        public ShardSyncedFlushRequest(ShardId shardId, String syncId, Engine.CommitId expectedCommitId) {
            this.expectedCommitId = expectedCommitId;
            this.shardId = shardId;
            this.syncId = syncId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            shardId = ShardId.readShardId(in);
            expectedCommitId = new Engine.CommitId(in);
            syncId = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            shardId.writeTo(out);
            expectedCommitId.writeTo(out);
            out.writeString(syncId);
        }

        public ShardId shardId() {
            return shardId;
        }

        public String syncId() {
            return syncId;
        }

        public Engine.CommitId expectedCommitId() {
            return expectedCommitId;
        }

        @Override
        public String toString() {
            return "ShardSyncedFlushRequest{" +
                    "shardId=" + shardId +
                    ",syncId='" + syncId + '\'' +
                    '}';
        }
    }

    /**
     * Response for third step of synced flush (writing the sync id) for one shard copy
     */
    public static final class ShardSyncedFlushResponse extends TransportResponse {

        /**
         * A non-null value indicates that the sync flush failed; null means success.
         */
        String failureReason;

        public ShardSyncedFlushResponse() {
            failureReason = null;
        }

        public ShardSyncedFlushResponse(String failureReason) {
            this.failureReason = failureReason;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            failureReason = in.readOptionalString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeOptionalString(failureReason);
        }

        public boolean success() {
            return failureReason == null;
        }

        public String failureReason() {
            return failureReason;
        }

        @Override
        public String toString() {
            return "ShardSyncedFlushResponse{" +
                    "success=" + success() +
                    ", failureReason='" + failureReason + '\'' +
                    '}';
        }

        public static ShardSyncedFlushResponse readSyncedFlushResponse(StreamInput in) throws IOException {
            ShardSyncedFlushResponse shardSyncedFlushResponse = new ShardSyncedFlushResponse();
            shardSyncedFlushResponse.readFrom(in);
            return shardSyncedFlushResponse;
        }
    }


    public static final class InFlightOpsRequest extends TransportRequest {

        private ShardId shardId;

        public InFlightOpsRequest() {
        }

        public InFlightOpsRequest(ShardId shardId) {
            this.shardId = shardId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            shardId = ShardId.readShardId(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            shardId.writeTo(out);
        }

        public ShardId shardId() {
            return shardId;
        }

        @Override
        public String toString() {
            return "InFlightOpsRequest{" +
                    "shardId=" + shardId +
                    '}';
        }
    }

    /**
     * Response for second step of synced flush (check operations in flight)
     */
    static final class InFlightOpsResponse extends TransportResponse {

        int opCount;

        public InFlightOpsResponse() {
        }

        public InFlightOpsResponse(int opCount) {
            this.opCount = opCount;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            opCount = in.readVInt();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeVInt(opCount);
        }

        public int opCount() {
            return opCount;
        }

        @Override
        public String toString() {
            return "InFlightOpsResponse{" +
                    "opCount=" + opCount +
                    '}';
        }
    }

    private final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {

        @Override
        public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
            channel.sendResponse(performPreSyncedFlush(request));
        }
    }

    private final class SyncedFlushTransportHandler implements TransportRequestHandler<ShardSyncedFlushRequest> {

        @Override
        public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
            channel.sendResponse(performSyncedFlush(request));
        }
    }

    private final class InFlightOpCountTransportHandler implements TransportRequestHandler<InFlightOpsRequest> {

        @Override
        public void messageReceived(InFlightOpsRequest request, TransportChannel channel) throws Exception {
            channel.sendResponse(performInFlightOps(request));
        }
    }

}