summaryrefslogtreecommitdiff
path: root/core/src/main/java/org/elasticsearch/common/lucene/ShardCoreKeyMap.java
blob: 92aa02ba0020dfb22a763cdb46d9c0c574d86e0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.lucene;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

/**
 * A map between segment core cache keys and the shard that these segments
 * belong to. This allows to get the shard that a segment belongs to or to get
 * the entire set of live core cache keys for a given index. In order to work
 * this class needs to be notified about new segments. It modifies the current
 * mappings as segments that were not known before are added and prevents the
 * structure from growing indefinitely by registering close listeners on these
 * segments so that at any time it only tracks live segments.
 *
 * NOTE: This is heavy. Avoid using this class unless absolutely required.
 */
public final class ShardCoreKeyMap {

    private final Map<Object, ShardId> coreKeyToShard;
    private final Map<String, Set<Object>> indexToCoreKey;

    public ShardCoreKeyMap() {
        coreKeyToShard = new IdentityHashMap<>();
        indexToCoreKey = new HashMap<>();
    }

    /**
     * Register a {@link LeafReader}. This is necessary so that the core cache
     * key of this reader can be found later using {@link #getCoreKeysForIndex(String)}.
     */
    public void add(LeafReader reader) {
        final ShardId shardId = ShardUtils.extractShardId(reader);
        if (shardId == null) {
            throw new IllegalArgumentException("Could not extract shard id from " + reader);
        }
        final Object coreKey = reader.getCoreCacheKey();
        final String index = shardId.getIndex();
        synchronized (this) {
            if (coreKeyToShard.put(coreKey, shardId) == null) {
                Set<Object> objects = indexToCoreKey.get(index);
                if (objects == null) {
                    objects = new HashSet<>();
                    indexToCoreKey.put(index, objects);
                }
                final boolean added = objects.add(coreKey);
                assert added;
                CoreClosedListener listener = ownerCoreCacheKey -> {
                    assert coreKey == ownerCoreCacheKey;
                    synchronized (ShardCoreKeyMap.this) {
                        coreKeyToShard.remove(ownerCoreCacheKey);
                        final Set<Object> coreKeys = indexToCoreKey.get(index);
                        final boolean removed = coreKeys.remove(coreKey);
                        assert removed;
                        if (coreKeys.isEmpty()) {
                            indexToCoreKey.remove(index);
                        }
                    }
                };
                boolean addedListener = false;
                try {
                    reader.addCoreClosedListener(listener);
                    addedListener = true;
                } finally {
                    if (false == addedListener) {
                        try {
                            listener.onClose(coreKey);
                        } catch (IOException e) {
                            throw new RuntimeException("Blow up trying to recover from failure to add listener", e);
                        }
                    }
                }
            }
        }
    }

    /**
     * Return the {@link ShardId} that holds the given segment, or {@code null}
     * if this segment is not tracked.
     */
    public synchronized ShardId getShardId(Object coreKey) {
        return coreKeyToShard.get(coreKey);
    }

    /**
     * Get the set of core cache keys associated with the given index.
     */
    public synchronized Set<Object> getCoreKeysForIndex(String index) {
        final Set<Object> objects = indexToCoreKey.get(index);
        if (objects == null) {
            return Collections.emptySet();
        }
        // we have to copy otherwise we risk ConcurrentModificationException
        return Collections.unmodifiableSet(new HashSet<>(objects));
    }

    /**
     * Return the number of tracked segments.
     */
    public synchronized int size() {
        assert assertSize();
        return coreKeyToShard.size();
    }

    private synchronized boolean assertSize() {
        // this is heavy and should only used in assertions
        boolean assertionsEnabled = false;
        assert assertionsEnabled = true;
        if (assertionsEnabled == false) {
            throw new AssertionError("only run this if assertions are enabled");
        }
        Collection<Set<Object>> values = indexToCoreKey.values();
        int size = 0;
        for (Set<Object> value : values) {
            size += value.size();
        }
        return size == coreKeyToShard.size();
    }

}