summaryrefslogtreecommitdiff
path: root/ambari-logsearch/ambari-logsearch-logfeeder
diff options
context:
space:
mode:
authorMiklos Gergely <mgergely@hortonworks.com>2016-06-21 16:19:46 +0200
committeroleewere <oleewere@gmail.com>2016-06-21 16:34:49 +0200
commit92c8a265e85ba6783f26f1042dd8d13d7e3e4e15 (patch)
treef0a2f67f9d20426be095b66bdf1d4ffe114ce80f /ambari-logsearch/ambari-logsearch-logfeeder
parent3ce7d10b72605b900a8d70d402755149ba1cde79 (diff)
AMBARI-17277. Log Level filter not applied before Log Search Starts at first (Miklos Gergely via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java4
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java4
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java2
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java71
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java4
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties3
-rw-r--r--ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java2
7 files changed, 63 insertions, 27 deletions
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 166c0f39e0..d00ed67864 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -120,6 +120,9 @@ public class LogFeeder {
}
}
mergeAllConfigs();
+
+ LogfeederScheduler.INSTANCE.start();
+
outMgr.setOutputList(outputList);
for (Output output : outputList) {
output.init();
@@ -127,7 +130,6 @@ public class LogFeeder {
inputMgr.init();
metricsMgr.init();
//starting timer to fetch config from solr
- LogfeederScheduler.INSTANCE.start();
logger.debug("==============");
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index f2d074a420..4240b86a3b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -153,6 +153,10 @@ public class FetchConfigFromSolr extends Thread {
return defaultLevels;
}
+ public static boolean isFilterAvailable() {
+ return logfeederFilterWrapper != null;
+ }
+
public static VLogfeederFilter findComponentFilter(String componentName) {
if (logfeederFilterWrapper != null) {
HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
index f61dc1b32a..f177e49e9c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.logconfig;
public class LogFeederConstants {
public static final String ALL = "all";
- public static final String NAME = "log_feeder_config";
+ public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
// solr fields
public static final String SOLR_LEVEL = "level";
public static final String SOLR_COMPONENT = "type";
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index a7202e7192..c945ed7ab2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -59,6 +60,8 @@ public class OutputSolr extends Output {
private static final int DEFAULT_SPLIT_INTERVAL = 30;
private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
+ private static final int RETRY_INTERVAL = 30;
+
private String collection;
private String splitMode;
private int splitInterval;
@@ -81,7 +84,7 @@ public class OutputSolr extends Output {
createSolrWorkers();
}
- private void initParams() {
+ private void initParams() throws Exception {
statMetric.metricsName = "output.solr.write_logs";
writeBytesMetric.metricsName = "output.solr.write_bytes";
@@ -89,6 +92,8 @@ public class OutputSolr extends Output {
if (!splitMode.equalsIgnoreCase("none")) {
splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
}
+ isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+
numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS);
@@ -100,7 +105,12 @@ public class OutputSolr extends Output {
maxBufferSize = 1;
}
- LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, " + "numberOfShards=%d. "
+ collection = getStringValue("collection");
+ if (StringUtils.isEmpty(collection)) {
+ throw new Exception("Collection property is mandatory");
+ }
+
+ LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. "
+ getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
}
@@ -135,45 +145,44 @@ public class OutputSolr extends Output {
}
SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
- SolrClient solrClient = null;
+ SolrClient solrClient = createSolrClient(solrUrl, zkHosts, collection);
+ pingSolr(solrUrl, zkHosts, count, solrClient);
+ waitForConfig();
+
+ return solrClient;
+ }
+ private SolrClient createSolrClient(String solrUrl, String zkHosts, String collection) throws Exception, MalformedURLException {
+ SolrClient solrClient;
if (zkHosts != null) {
- solrClient = createCloudSolrClient(zkHosts);
+ solrClient = createCloudSolrClient(zkHosts, collection);
} else {
- solrClient = createHttpSolarClient(solrUrl);
+ solrClient = createHttpSolarClient(solrUrl, collection);
}
-
- pingSolr(solrUrl, zkHosts, count, solrClient);
-
return solrClient;
}
- private SolrClient createCloudSolrClient(String zkHosts) throws Exception {
+ private SolrClient createCloudSolrClient(String zkHosts, String collection) throws Exception {
LOG.info("Using zookeepr. zkHosts=" + zkHosts);
- collection = getStringValue("collection");
- if (StringUtils.isEmpty(collection)) {
- throw new Exception("For solr cloud property collection is mandatory");
- }
LOG.info("Using collection=" + collection);
CloudSolrClient solrClient = new CloudSolrClient(zkHosts);
solrClient.setDefaultCollection(collection);
- isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
return solrClient;
}
- private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
+ private SolrClient createHttpSolarClient(String solrUrl, String collection) throws MalformedURLException {
String[] solrUrls = StringUtils.split(solrUrl, ",");
if (solrUrls.length == 1) {
LOG.info("Using SolrURL=" + solrUrl);
- return new HttpSolrClient(solrUrl);
+ return new HttpSolrClient(solrUrl + "/" + collection);
} else {
LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
- LOG.info("Initial URL for LB solr=" + solrUrls[0]);
- LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]);
+ LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection);
+ LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection);
for (int i = 1; i < solrUrls.length; i++) {
- LOG.info("Adding URL for LB solr=" + solrUrls[i]);
- lbSolrClient.addSolrServer(solrUrls[i]);
+ LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection);
+ lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection);
}
return lbSolrClient;
}
@@ -194,11 +203,30 @@ public class OutputSolr extends Output {
}
} catch (Throwable t) {
LOG.warn(String.format(
- "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s",
+ "Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkHosts=%s, collection=%s",
count, solrUrl, zkHosts, collection), t);
}
}
+ private void waitForConfig() throws SolrServerException, IOException {
+ if (!LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false)) {
+ return;
+ }
+
+ while (true) {
+ LOG.info("Checking if config is available");
+ if (FetchConfigFromSolr.isFilterAvailable()) {
+ LOG.info("Config is available");
+ return;
+ }
+ try {
+ Thread.sleep(RETRY_INTERVAL * 1000);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+
private void createSolrWorkerThread(int count, SolrClient solrClient) {
SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient);
solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count);
@@ -281,7 +309,6 @@ public class OutputSolr extends Output {
class SolrWorkerThread extends Thread {
private static final String ROUTER_FIELD = "_router_field_";
- private static final int RETRY_INTERVAL = 30;
private final SolrClient solrClient;
private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index 29feef7b45..31fbdedb44 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -54,7 +54,7 @@ public class SolrUtil {
private SolrUtil() throws Exception {
String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts");
- String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history");
+ String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
connectToSolr(url, zkHosts, collection);
}
@@ -180,7 +180,7 @@ public class SolrUtil {
HashMap<String, Object> configMap = new HashMap<String, Object>();
SolrQuery solrQuery = new SolrQuery();
solrQuery.setQuery("*:*");
- String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME;
+ String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
solrQuery.setFilterQueries(fq);
try {
QueryResponse response = process(solrQuery);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 6cba826c03..b4655cce3a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -19,7 +19,6 @@ logfeeder.metrics.collector.hosts=
#filter config
logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
-logfeeder.solr.core.history=history
logfeeder.solr.zkhosts=
logfeeder.solr.url=
@@ -28,3 +27,5 @@ logfeeder.solr.jaas.file=/usr/lib/ambari-logsearch-logfeeder/logsearch_solr_jaas
#logfeeder tmp dir
logfeeder.tmp.dir=/tmp/$username/logfeeder/
+
+logfeeder.solr.core.config.name=history
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index afbccca1e2..3014ed8342 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -83,6 +83,7 @@ public class OutputSolrTest {
Map<String, Object> config = new HashMap<String, Object>();
config.put("url", "some url");
config.put("workers", "3");
+ config.put("collection", "some collection");
outputSolr.loadConfig(config);
outputSolr.init();
@@ -153,6 +154,7 @@ public class OutputSolrTest {
Map<String, Object> config = new HashMap<String, Object>();
config.put("workers", "3");
+ config.put("collection", "some collection");
outputSolr.loadConfig(config);
outputSolr.init();