1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.bulk;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestStatus.OK;
/**
* <pre>
* { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" }
* { "type1" : { "field1" : "value1" } }
* { "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
* { "create" : { "_index" : "test", "_type" : "type1", "_id" : "1" }
* { "type1" : { "field1" : "value1" } }
* </pre>
*/
public class RestBulkAction extends BaseRestHandler {
private final boolean allowExplicitIndex;
@Inject
public RestBulkAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(POST, "/_bulk", this);
controller.registerHandler(PUT, "/_bulk", this);
controller.registerHandler(POST, "/{index}/_bulk", this);
controller.registerHandler(PUT, "/{index}/_bulk", this);
controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);
this.allowExplicitIndex = settings.getAsBoolean("rest.action.multi.allow_explicit_index", true);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
BulkRequest bulkRequest = Requests.bulkRequest();
String defaultIndex = request.param("index");
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String fieldsParam = request.param("fields");
String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null;
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields, null, allowExplicitIndex);
client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
@Override
public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.field(Fields.TOOK, response.getTookInMillis());
builder.field(Fields.ERRORS, response.hasFailures());
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
builder.startObject();
itemResponse.toXContent(builder, request);
builder.endObject();
}
builder.endArray();
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
static final class Fields {
static final XContentBuilderString ITEMS = new XContentBuilderString("items");
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
static final XContentBuilderString TOOK = new XContentBuilderString("took");
}
}
|