summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNik Everett <nik9000@gmail.com>2016-02-01 11:13:27 -0500
committerNik Everett <nik9000@gmail.com>2016-02-01 11:13:27 -0500
commita034e12bfa04044dead41956f14d0d53d7bb7438 (patch)
tree71143d9b3589cbe91c6d272d42ac4c5988438796 /core
parent0d6d77328d6a8e9cc41ed8c72aadc3b580e6418f (diff)
parentee35972f3d2274958d7fc9b542a1d98f7cbd4a00 (diff)
Merge pull request #16100 from nik9000/reindex_wait_for_completion
Add wait_for_completion to reindex
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/elasticsearch/action/ActionListener.java8
-rw-r--r--core/src/main/java/org/elasticsearch/action/support/TransportAction.java30
-rw-r--r--core/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java54
-rw-r--r--core/src/main/java/org/elasticsearch/tasks/TaskListener.java49
4 files changed, 136 insertions, 5 deletions
diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java
index 0b3a69bb81..8447d6cef0 100644
--- a/core/src/main/java/org/elasticsearch/action/ActionListener.java
+++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java
@@ -21,18 +21,16 @@ package org.elasticsearch.action;
/**
* A listener for action responses or failures.
- *
- *
*/
public interface ActionListener<Response> {
-
/**
- * A response handler.
+ * Handle action response. This response may constitute a failure or a
+ * success but it is up to the listener to make that decision.
*/
void onResponse(Response response);
/**
- * A failure handler.
+ * A failure caused by an exception at some phase of the task.
*/
void onFailure(Throwable e);
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java
index eb62903bf3..0f4b26eb6e 100644
--- a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java
@@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
@@ -67,6 +68,13 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
public final Task execute(Request request, ActionListener<Response> listener) {
+ /*
+ * While this version of execute could delegate to the TaskListener
+ * version of execute that'd add yet another layer of wrapping on the
+ * listener and prevent us from using the listener bare if there isn't a
+ * task. That just seems like too many objects. Thus the two versions of
+ * this method.
+ */
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
@@ -88,6 +96,28 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
return task;
}
+ public final Task execute(Request request, TaskListener<Response> listener) {
+ Task task = taskManager.register("transport", actionName, request);
+ execute(task, request, new ActionListener<Response>() {
+ @Override
+ public void onResponse(Response response) {
+ if (task != null) {
+ taskManager.unregister(task);
+ }
+ listener.onResponse(task, response);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ if (task != null) {
+ taskManager.unregister(task);
+ }
+ listener.onFailure(task, e);
+ }
+ });
+ return task;
+ }
+
private final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
diff --git a/core/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java b/core/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java
new file mode 100644
index 0000000000..b2016f094f
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/tasks/LoggingTaskListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.tasks;
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+
+/**
+ * A TaskListener that just logs the response at the info level. Used when we
+ * need a listener but aren't returning the result to the user.
+ */
+public final class LoggingTaskListener<Response> implements TaskListener<Response> {
+ private final static ESLogger logger = Loggers.getLogger(LoggingTaskListener.class);
+
+ /**
+ * Get the instance of NoopActionListener cast appropriately.
+ */
+ @SuppressWarnings("unchecked") // Safe because we only toString the response
+ public static <Response> TaskListener<Response> instance() {
+ return (TaskListener<Response>) INSTANCE;
+ }
+
+ private static final LoggingTaskListener<Object> INSTANCE = new LoggingTaskListener<Object>();
+
+ private LoggingTaskListener() {
+ }
+
+ @Override
+ public void onResponse(Task task, Response response) {
+ logger.info("{} finished with response {}", task.getId(), response);
+ }
+
+ @Override
+ public void onFailure(Task task, Throwable e) {
+ logger.warn("{} failed with exception", e, task.getId());
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskListener.java b/core/src/main/java/org/elasticsearch/tasks/TaskListener.java
new file mode 100644
index 0000000000..6a0c36e0b8
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/tasks/TaskListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.tasks;
+
+/**
+ * Listener for Task success or failure.
+ */
+public interface TaskListener<Response> {
+ /**
+ * Handle task response. This response may constitute a failure or a success
+ * but it is up to the listener to make that decision.
+ *
+ * @param task
+ * the task being executed. May be null if the action doesn't
+ * create a task
+ * @param response
+ * the response from the action that executed the task
+ */
+ void onResponse(Task task, Response response);
+
+ /**
+ * A failure caused by an exception at some phase of the task.
+ *
+ * @param task
+ * the task being executed. May be null if the action doesn't
+ * create a task
+ * @param e
+ * the failure
+ */
+ void onFailure(Task task, Throwable e);
+
+}