summaryrefslogtreecommitdiff
path: root/ambari-logsearch/ambari-logsearch-logfeeder
diff options
context:
space:
mode:
authoroleewere <oleewere@gmail.com>2016-06-13 13:23:25 +0200
committeroleewere <oleewere@gmail.com>2016-06-13 16:24:06 +0200
commit151c0fd578ec3217293aceafa1f33b55b536a7ad (patch)
tree5f3569765999f8122a9b3e9d886340876be25c43 /ambari-logsearch/ambari-logsearch-logfeeder
parent8edd8fd3a2c3235ef7a80ecc962c700b49cb7872 (diff)
AMBARI-17136. If Solr is down or not ready, then LogFeeder to should retry (Bosco Durai via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java102
1 files changed, 55 insertions, 47 deletions
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 6fb0b0e423..b14c2735b1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
public class OutputSolr extends Output {
@@ -299,33 +300,21 @@ public class OutputSolr extends Output {
if (localBuffer.size() > 0 && ((outputData == null && isDrain())
|| (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
- try {
- if (isComputeCurrentCollection) {
- // Compute the current router value
- addRouterField();
- }
-
- addToSolr(outputData);
-
- resetLocalBuffer();
- lastDispatchTime = System.currentTimeMillis();
- } catch (IOException ioException) {
- // Transient error, lets block till it is available
- waitForSolr();
- } catch (Throwable serverException) {
- // Clear the buffer
- resetLocalBuffer();
- String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey,
- "Error sending log message to server. " + outputData, serverException, LOG, Level.ERROR);
+ boolean response = sendToSolr(outputData);
+ if( isDrain() && !response) {
+ //Since sending to Solr response failed and it is in draining mode, let's break;
+ LOG.warn("In drain mode and sending to Solr failed. So exiting. output="
+ + getShortDescription());
+ break;
}
+ lastDispatchTime = currTimeMS;
}
} catch (InterruptedException e) {
// Handle thread exiting
} catch (Throwable t) {
String logMessageKey = this.getClass().getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in main loop. " + outputData, t, LOG,
- Level.ERROR);
+ Level.ERROR);
}
}
@@ -335,6 +324,50 @@ public class OutputSolr extends Output {
LOG.info("Exiting Solr worker thread. output=" + getShortDescription());
}
+ /**
+ * This will loop till Solr is available and LogFeeder is
+ * successfully able to write to the collection or shard. It will block till
+ * it can write. The outgoingBuffer is a BlockingQueue and when it is full, it
+ * will automatically stop parsing the log files.
+ * @param outputData
+ * @return
+ */
+ private boolean sendToSolr(OutputData outputData) {
+ boolean result = false;
+ while (!isDrain()) {
+ try {
+ if (isComputeCurrentCollection) {
+ // Compute the current router value
+ addRouterField();
+ }
+ addToSolr(outputData);
+ resetLocalBuffer();
+ //Send successful, will return
+ result = true;
+ break;
+ } catch (IOException | SolrException exception) {
+ // Transient error, lets block till it is available
+ try {
+ LOG.warn("Solr is not reachable. Going to retry after "
+ + RETRY_INTERVAL + " seconds. " + "output="
+ + getShortDescription(), exception);
+ Thread.sleep(RETRY_INTERVAL * 1000);
+ } catch (Throwable t) {
+ // ignore
+ }
+ } catch (Throwable serverException) {
+ // Something unknown happened. Let's not block because of this error.
+ // Clear the buffer
+ String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+ "Error sending log message to server. Dropping logs", serverException, LOG, Level.ERROR);
+ resetLocalBuffer();
+ break;
+ }
+ }
+ return result;
+ }
+
private OutputData getOutputData(long nextDispatchDuration) throws InterruptedException {
OutputData outputData = outgoingBuffer.poll();
if (outputData == null && !isDrain() && nextDispatchDuration > 0) {
@@ -380,7 +413,7 @@ public class OutputSolr extends Output {
}
for (SolrInputDocument solrInputDocument : localBuffer) {
- solrInputDocument.addField(ROUTER_FIELD, shard);
+ solrInputDocument.setField(ROUTER_FIELD, shard);
}
}
@@ -398,31 +431,6 @@ public class OutputSolr extends Output {
}
}
- private void waitForSolr() {
- while (!isDrain()) {
- try {
- LOG.warn(
- "Solr is down. Going to sleep for " + RETRY_INTERVAL + " seconds. " + "output=" + getShortDescription());
- Thread.sleep(RETRY_INTERVAL * 1000);
- } catch (Throwable t) {
- // ignore
- break;
- }
- if (isDrain()) {
- break;
- }
- try {
- SolrPingResponse pingResponse = solrClient.ping();
- if (pingResponse.getStatus() == 0) {
- LOG.info("Solr seems to be up now. Resuming... output=" + getShortDescription());
- break;
- }
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
-
private void closeSolrClient() {
if (solrClient != null) {
try {
@@ -443,4 +451,4 @@ public class OutputSolr extends Output {
return localBuffer.isEmpty();
}
}
-} \ No newline at end of file
+}