summaryrefslogtreecommitdiff
path: root/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceImpl.java
blob: b7206d3659bbeac7b2b359f2bf9ca65e469e1e8d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.cloud.azure.storage;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryException;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

/**
 * Default {@link AzureStorageService} implementation backed by the Azure Storage SDK.
 * <p>
 * One {@link CloudBlobClient} is created per configured storage account (the primary account, if any,
 * plus every secondary account) when the service is constructed. Every operation first selects the
 * client for the requested account via {@link #getSelectedClient(String, LocationMode)}, which also
 * applies the requested {@link LocationMode} and configured timeout to that client's default request
 * options, and then issues the container/blob calls.
 */
public class AzureStorageServiceImpl extends AbstractComponent implements AzureStorageService {

    /** Settings of the primary storage account, or {@code null} when none is configured. */
    final AzureStorageSettings primaryStorageSettings;

    /** Settings of the secondary storage accounts, keyed by the name callers pass as {@code account}. */
    final Map<String, AzureStorageSettings> secondariesStorageSettings;

    /** Blob clients keyed by storage account name ({@link AzureStorageSettings#getAccount()}). */
    final Map<String, CloudBlobClient> clients;

    /**
     * Parses the Azure storage settings and eagerly creates one blob client per configured account.
     *
     * @param settings node settings holding the primary and secondary Azure storage configuration
     */
    public AzureStorageServiceImpl(Settings settings) {
        super(settings);

        Tuple<AzureStorageSettings, Map<String, AzureStorageSettings>> storageSettings = AzureStorageSettings.parse(settings);
        this.primaryStorageSettings = storageSettings.v1();
        this.secondariesStorageSettings = storageSettings.v2();

        this.clients = new HashMap<>();

        logger.debug("starting azure storage client instance");

        // We register the primary client if any
        if (primaryStorageSettings != null) {
            logger.debug("registering primary client for account [{}]", primaryStorageSettings.getAccount());
            createClient(primaryStorageSettings);
        }

        // We register all secondary clients
        for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : secondariesStorageSettings.entrySet()) {
            logger.debug("registering secondary client for account [{}]", azureStorageSettingsEntry.getKey());
            createClient(azureStorageSettingsEntry.getValue());
        }
    }

    /**
     * Creates a blob client for the given account and registers it in {@link #clients}.
     * <p>
     * Failures are logged (with their cause) and otherwise ignored, so one misconfigured account does not
     * prevent the service from starting; a later lookup of that account then fails in
     * {@link #getSelectedClient(String, LocationMode)} because no client was registered.
     *
     * @param azureStorageSettings account name and key used to build the connection string
     */
    void createClient(AzureStorageSettings azureStorageSettings) {
        try {
            // Never log the account key: it is a secret credential.
            logger.trace("creating new Azure storage client using account [{}]", azureStorageSettings.getAccount());

            String storageConnectionString =
                    "DefaultEndpointsProtocol=https;"
                            + "AccountName=" + azureStorageSettings.getAccount() + ";"
                            + "AccountKey=" + azureStorageSettings.getKey();

            // Retrieve storage account from connection-string.
            CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);

            // Create the blob client.
            CloudBlobClient client = storageAccount.createCloudBlobClient();

            // Register the client
            this.clients.put(azureStorageSettings.getAccount(), client);
        } catch (Exception e) {
            // Best effort: keep going, but log the cause so the failure can be diagnosed.
            logger.error(() -> new ParameterizedMessage("can not create azure storage client: {}", e.getMessage()), e);
        }
    }

    /**
     * Resolves the blob client for {@code account} and configures its default request options.
     * <p>
     * A non-empty {@code account} is looked up among the secondary accounts first. If it is not a
     * secondary, the primary account is used when {@code account} is empty, when the primary has no
     * name, or when {@code account} equals the primary's name.
     *
     * @param account account name, or empty/{@code null} for the primary account
     * @param mode    location mode to set on the selected client
     * @return the selected client
     * @throws IllegalArgumentException if no primary account is configured, the account is unknown, no
     *                                  client was registered for it, or the configured timeout does not
     *                                  fit in an {@code int} number of milliseconds
     */
    CloudBlobClient getSelectedClient(String account, LocationMode mode) {
        logger.trace("selecting a client for account [{}], mode [{}]", account, mode.name());
        AzureStorageSettings azureStorageSettings = null;

        if (this.primaryStorageSettings == null) {
            throw new IllegalArgumentException("No primary azure storage can be found. Check your elasticsearch.yml.");
        }

        if (Strings.hasLength(account)) {
            azureStorageSettings = this.secondariesStorageSettings.get(account);
        }

        // if account is not secondary, it's the primary
        if (azureStorageSettings == null) {
            if (Strings.hasLength(account) == false || primaryStorageSettings.getName() == null || account.equals(primaryStorageSettings.getName())) {
                azureStorageSettings = primaryStorageSettings;
            }
        }

        if (azureStorageSettings == null) {
            // We did not get an account. That's bad.
            throw new IllegalArgumentException("Can not find azure account [" + account + "]. Check your elasticsearch.yml.");
        }

        CloudBlobClient client = this.clients.get(azureStorageSettings.getAccount());

        if (client == null) {
            throw new IllegalArgumentException("Can not find an azure client for account [" + account + "]");
        }

        // NOTE: for now, just set the location mode in case it is different;
        // only one mode per storage account can be active at a time
        client.getDefaultRequestOptions().setLocationMode(mode);

        // Set timeout option if the user sets cloud.azure.storage.timeout or cloud.azure.storage.xxx.timeout
        // (it's negative by default, in which case the client's timeout is left untouched).
        // Compare in milliseconds so that positive sub-second timeouts are not silently ignored.
        long timeoutMillis = azureStorageSettings.getTimeout().getMillis();
        if (timeoutMillis > 0) {
            // A narrowing (int) cast silently overflows instead of throwing, so range-check explicitly.
            if (timeoutMillis > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Can not convert [" + azureStorageSettings.getTimeout() +
                    "]. It can not be longer than 2,147,483,647ms.");
            }
            client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeoutMillis);
        }
        return client;
    }

    /**
     * Checks whether a container exists. Any failure (including an unknown account) is logged with its
     * cause and reported as {@code false}.
     */
    @Override
    public boolean doesContainerExist(String account, LocationMode mode, String container) {
        try {
            CloudBlobClient client = this.getSelectedClient(account, mode);
            CloudBlobContainer blobContainer = client.getContainerReference(container);
            return blobContainer.exists();
        } catch (Exception e) {
            logger.error(() -> new ParameterizedMessage("can not access container [{}]", container), e);
        }
        return false;
    }

    /** Deletes the container if it exists. */
    @Override
    public void removeContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException {
        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        logger.trace("removing container [{}]", container);
        blobContainer.deleteIfExists();
    }

    /**
     * Creates the container if it does not exist yet.
     *
     * @throws RepositoryException wrapping the {@link IllegalArgumentException} raised when the account
     *                             cannot be resolved
     */
    @Override
    public void createContainer(String account, LocationMode mode, String container) throws URISyntaxException, StorageException {
        try {
            CloudBlobClient client = this.getSelectedClient(account, mode);
            CloudBlobContainer blobContainer = client.getContainerReference(container);
            logger.trace("creating container [{}]", container);
            blobContainer.createIfNotExists();
        } catch (IllegalArgumentException e) {
            logger.trace(() -> new ParameterizedMessage("fails creating container [{}]", container), e);
            // Keep the original exception as the cause.
            throw new RepositoryException(container, e.getMessage(), e);
        }
    }

    /**
     * Deletes every blob whose name starts with {@code path}, using a flat listing so blobs in nested
     * virtual directories are included. Does nothing if the container does not exist.
     */
    @Override
    public void deleteFiles(String account, LocationMode mode, String container, String path) throws URISyntaxException, StorageException {
        logger.trace("delete files container [{}], path [{}]", container, path);

        // Container name must be lower case.
        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        if (blobContainer.exists()) {
            // We list the blobs using a flat blob listing mode
            for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
                String blobName = blobNameFromUri(blobItem.getUri());
                logger.trace("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri());
                deleteBlob(account, mode, container, blobName);
            }
        }
    }

    /**
     * Extract the blob name from a URI like https://myservice.azure.net/container/path/to/myfile
     * It removes the container part (first segment of the path) and returns path/to/myfile
     * @param uri URI to parse
     * @return The blob name relative to the container
     * @throws IllegalArgumentException if the URI path has no segment after the container name
     */
    public static String blobNameFromUri(URI uri) {
        String path = uri.getPath();

        // We remove the container name from the path.
        // The limit of 3 comes from the fact that if path is /container/path/to/myfile:
        // First part is the empty string before the leading "/"
        // Second part is "container"
        // Last part contains "path/to/myfile" which is what we want to get
        String[] splits = path.split("/", 3);
        if (splits.length < 3) {
            throw new IllegalArgumentException("Can not extract a blob name from URI [" + uri + "]");
        }

        // We return the remaining end of the string
        return splits[2];
    }

    /** Returns {@code true} if both the container and the block blob inside it exist. */
    @Override
    public boolean blobExists(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
        // Container name must be lower case.
        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        if (blobContainer.exists()) {
            CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
            return azureBlob.exists();
        }

        return false;
    }

    /**
     * Deletes a block blob. Only the container's existence is checked first; deleting a missing blob
     * inside an existing container is left to the SDK's {@code delete()} behavior.
     */
    @Override
    public void deleteBlob(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
        logger.trace("delete blob for container [{}], blob [{}]", container, blob);

        // Container name must be lower case.
        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        if (blobContainer.exists()) {
            logger.trace("container [{}]: blob [{}] found. removing.", container, blob);
            CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
            azureBlob.delete();
        }
    }

    /** Opens a stream reading the given block blob; the caller is responsible for closing it. */
    @Override
    public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
        logger.trace("reading container [{}], blob [{}]", container, blob);
        CloudBlobClient client = this.getSelectedClient(account, mode);
        return client.getContainerReference(container).getBlockBlobReference(blob).openInputStream();
    }

    /** Opens a stream writing the given block blob; the caller is responsible for closing it. */
    @Override
    public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
        logger.trace("writing container [{}], blob [{}]", container, blob);
        CloudBlobClient client = this.getSelectedClient(account, mode);
        return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream();
    }

    /**
     * Lists blobs under {@code keyPath + prefix} and returns their metadata keyed by name relative to
     * {@code keyPath}. Each listed blob triggers an extra {@code downloadAttributes()} call to fetch its
     * length. Returns an empty map if the container does not exist.
     */
    @Override
    public Map<String, BlobMetaData> listBlobsByPrefix(String account, LocationMode mode, String container, String keyPath, String prefix) throws URISyntaxException, StorageException {
        // NOTE: this should be here: if (prefix == null) prefix = "";
        // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
        // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!

        logger.debug("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
        MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();

        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        if (blobContainer.exists()) {
            for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix))) {
                URI uri = blobItem.getUri();
                logger.trace("blob url [{}]", uri);

                // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
                // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
                String blobPath = uri.getPath().substring(1 + container.length() + 1);

                CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobPath);

                // fetch the blob attributes from Azure (getBlockBlobReference does not do this)
                // this is needed to retrieve the blob length (among other metadata) from Azure Storage
                blob.downloadAttributes();

                BlobProperties properties = blob.getProperties();
                String name = blobPath.substring(keyPath.length());
                logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength());
                blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
            }
        }

        return blobsBuilder.immutableMap();
    }

    /**
     * "Moves" a blob within a container by copying it to {@code targetBlob} and deleting the source.
     * Does nothing if the source blob does not exist.
     */
    @Override
    public void moveBlob(String account, LocationMode mode, String container, String sourceBlob, String targetBlob) throws URISyntaxException, StorageException {
        logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}]", container, sourceBlob, targetBlob);

        CloudBlobClient client = this.getSelectedClient(account, mode);
        CloudBlobContainer blobContainer = client.getContainerReference(container);
        CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);
        if (blobSource.exists()) {
            CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
            // NOTE(review): startCopy only *starts* a server-side copy; the source is deleted right after
            // without polling the copy state. Confirm the copy is guaranteed to be complete at this point
            // for same-account copies, otherwise the target may end up incomplete.
            blobTarget.startCopy(blobSource);
            blobSource.delete();
            logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
        }
    }
}