author    oleewere <oleewere@gmail.com>  2016-06-20 18:29:55 +0200
committer oleewere <oleewere@gmail.com>  2016-06-20 18:36:34 +0200
commit    94716ff18c5df31da6a401921e8bbb6de294471a (patch)
tree      79f37a79b18b5c1fba9e7e9c3799ede3d2972a07 /ambari-logsearch/ambari-logsearch-logfeeder
parent    6b33a6c27d4e835d586a05cd22e34701f9e41a3b (diff)
AMBARI-17045. Support loading logs to S3 (Hayat Behlim via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/pom.xml | 17
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java | 3
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java | 35
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java | 100
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java | 68
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java | 46
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java | 4
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java | 61
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java | 494
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java | 10
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java | 10
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java | 7
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java | 8
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java | 227
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java | 14
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java | 92
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java | 233
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java | 89
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java | 44
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java | 79
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json | 8
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties | 3
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java | 30
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java | 44
-rw-r--r-- ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java | 48
25 files changed, 1697 insertions(+), 77 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 5c30610edb..c7202c98cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -120,7 +120,22 @@
<artifactId>ambari-metrics-common</artifactId>
<version>${project.version}</version>
</dependency>
- </dependencies>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>1.11.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-iam</artifactId>
+ <version>1.11.5</version>
+ </dependency>
+ </dependencies>
<build>
<finalName>LogFeeder</finalName>
<pluginManagement>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
index 6b78e2addb..521319e647 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
@@ -22,11 +22,11 @@ package org.apache.ambari.logfeeder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
+
public abstract class ConfigBlock {
static private Logger logger = Logger.getLogger(ConfigBlock.class);
@@ -258,5 +258,4 @@ public abstract class ConfigBlock {
public void setDrain(boolean drain) {
this.drain = drain;
}
-
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
index 445c294191..4359c78ca1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
@@ -65,6 +65,8 @@ public class InputMgr {
MetricCount filesCountMetric = new MetricCount();
private String checkPointExtension = ".cp";
+
+ private Thread inputIsReadyMonitor = null;
public List<Input> getInputList() {
return inputList;
@@ -224,7 +226,7 @@ public class InputMgr {
}
// Start the monitoring thread if any file is in tail mode
if (isAnyInputTail) {
- Thread monitorThread = new Thread("InputIsReadyMonitor") {
+ inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
@Override
public void run() {
logger.info("Going to monitor for these missing files: "
@@ -255,7 +257,7 @@ public class InputMgr {
}
}
};
- monitorThread.start();
+ inputIsReadyMonitor.start();
}
}
@@ -542,4 +544,33 @@ public class InputMgr {
}
+ /**
+ * Waits for all input threads to finish, then closes the inputs and waits
+ * for the InputIsReadyMonitor thread to exit.
+ */
+ public void waitOnAllInputs() {
+ //wait on inputs
+ if (inputList != null) {
+ for (Input input : inputList) {
+ if (input != null) {
+ Thread inputThread = input.getThread();
+ if (inputThread != null) {
+ try {
+ inputThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+ // wait on monitor
+ if (inputIsReadyMonitor != null) {
+ try {
+ this.close();
+ inputIsReadyMonitor.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
}
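
The new waitOnAllInputs() joins the input threads first, then calls close() so the InputIsReadyMonitor loop can exit before it too is joined. A minimal standalone sketch of that shutdown ordering; the thread names and the stop flag are illustrative, not LogFeeder APIs:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ShutdownOrderSketch {
      public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean stopMonitor = new AtomicBoolean(false);

        List<Thread> inputs = new ArrayList<Thread>();
        for (int i = 0; i < 2; i++) {
          Thread t = new Thread("input-" + i) {
            public void run() { /* tail a file, emit lines downstream */ }
          };
          t.start();
          inputs.add(t);
        }

        Thread monitor = new Thread("InputIsReadyMonitor") {
          public void run() {
            while (!stopMonitor.get()) {        // poll until asked to stop
              try { Thread.sleep(100); } catch (InterruptedException e) { return; }
            }
          }
        };
        monitor.start();

        for (Thread input : inputs) input.join(); // wait on inputs first
        stopMonitor.set(true);                    // analogous to close() flipping drain
        monitor.join();                           // then wait on the monitor
      }
    }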
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index f3dd4bf5f8..166c0f39e0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -42,6 +42,7 @@ import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.FileUtil;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -57,16 +58,20 @@ public class LogFeeder {
InputMgr inputMgr = new InputMgr();
MetricsMgr metricsMgr = new MetricsMgr();
- Map<String, Object> globalMap = null;
+ public static Map<String, Object> globalMap = null;
String[] inputParams;
List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
-
+
int checkPointCleanIntervalMS = 24 * 60 * 60 * 1000; // 24 hours
long lastCheckPointCleanedMS = 0;
+
+ private static boolean isLogfeederCompleted = false;
+
+ private Thread statLoggerThread = null;
public LogFeeder(String[] args) {
inputParams = args;
@@ -80,14 +85,26 @@ public class LogFeeder {
// loop the properties and load them
// Load the configs
String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
- if (configFiles == null) {
- configFiles = LogFeederUtil.getStringProperty("config.file",
- "config.json");
- }
logger.info("logfeeder.config.files=" + configFiles);
- String[] configFileList = configFiles.split(",");
- for (String configFileName : configFileList) {
+
+ String[] configFileList = null;
+ if (configFiles != null) {
+ configFileList = configFiles.split(",");
+ }
+ // list of config files found in the command line config dir, ending with .json
+ String[] cmdLineConfigs = getConfigFromCmdLine();
+ // merge both config lists
+ String[] mergedConfigList = LogFeederUtil.mergeArray(configFileList,
+ cmdLineConfigs);
+ // if mergedConfigList is null or empty, fall back to the default config
+ if (mergedConfigList == null || mergedConfigList.length == 0) {
+ mergedConfigList = LogFeederUtil.getStringProperty("config.file",
+ "config.json").split(",");
+ }
+ for (String configFileName : mergedConfigList) {
logger.info("Going to load config file:" + configFileName);
+ // escape spaces in the config file path
+ configFileName = configFileName.replace("\\ ", "%20");
File configFile = new File(configFileName);
if (configFile.exists() && configFile.isFile()) {
logger.info("Config file exists in path."
@@ -97,7 +114,7 @@ public class LogFeeder {
// Let's try to load it from class loader
logger.info("Trying to load config file from classloader: "
+ configFileName);
- laodConfigsUsingClassLoader(configFileName);
+ loadConfigsUsingClassLoader(configFileName);
logger.info("Loaded config file from classloader: "
+ configFileName);
}
@@ -114,7 +131,7 @@ public class LogFeeder {
logger.debug("==============");
}
- void laodConfigsUsingClassLoader(String configFileName) throws Exception {
+ void loadConfigsUsingClassLoader(String configFileName) throws Exception {
BufferedInputStream fileInputStream = (BufferedInputStream) this
.getClass().getClassLoader()
.getResourceAsStream(configFileName);
@@ -451,7 +468,7 @@ public class LogFeeder {
inputMgr.monitor();
Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
- Thread statLogger = new Thread("statLogger") {
+ statLoggerThread = new Thread("statLogger") {
@Override
public void run() {
@@ -473,12 +490,17 @@ public class LogFeeder {
lastCheckPointCleanedMS = System.currentTimeMillis();
inputMgr.cleanCheckPointFiles();
}
+
+ // logfeeder is stopped then break the loop
+ if (isLogfeederCompleted) {
+ break;
+ }
}
}
};
- statLogger.setDaemon(true);
- statLogger.start();
+ statLoggerThread.setDaemon(true);
+ statLoggerThread.start();
}
@@ -524,24 +546,25 @@ public class LogFeeder {
public static void main(String[] args) {
LogFeeder logFeeder = new LogFeeder(args);
- logFeeder.run(logFeeder);
+ logFeeder.run();
}
public static void run(String[] args) {
LogFeeder logFeeder = new LogFeeder(args);
- logFeeder.run(logFeeder);
+ logFeeder.run();
}
- public void run(LogFeeder logFeeder) {
+ public void run() {
try {
Date startTime = new Date();
- logFeeder.init();
+ this.init();
Date endTime = new Date();
logger.info("Took " + (endTime.getTime() - startTime.getTime())
+ " ms to initialize");
- logFeeder.monitor();
-
+ this.monitor();
+ //wait for all background thread before stop main thread
+ this.waitOnAllDaemonThreads();
} catch (Throwable t) {
logger.fatal("Caught exception in main.", t);
System.exit(1);
@@ -566,5 +589,42 @@ public class LogFeeder {
}
}
}
-
+
+ public void waitOnAllDaemonThreads() {
+ String foreground = LogFeederUtil.getStringProperty("foreground");
+ if (foreground != null && foreground.equalsIgnoreCase("true")) {
+ // wait on inputmgr daemon threads
+ inputMgr.waitOnAllInputs();
+ // set isLogfeederCompleted to true to stop statLoggerThread
+ isLogfeederCompleted = true;
+ if (statLoggerThread != null) {
+ try {
+ statLoggerThread.join();
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for statLoggerThread to stop.", e);
+ }
+ }
+ }
+ }
+
+ private String[] getConfigFromCmdLine() {
+ String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
+ if (inputConfigDir != null && !inputConfigDir.isEmpty()) {
+ String[] searchFileWithExtensions = new String[] { "json" };
+ File configDirFile = new File(inputConfigDir);
+ List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile,
+ searchFileWithExtensions, false);
+ if (configFiles != null && configFiles.size() > 0) {
+ String configPaths[] = new String[configFiles.size()];
+ for (int index = 0; index < configFiles.size(); index++) {
+ File configFile = configFiles.get(index);
+ String configFilePath = configFile.getAbsolutePath();
+ configPaths[index] = configFilePath;
+ }
+ return configPaths;
+ }
+ }
+ return new String[0];
+ }
}
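
Config discovery now merges two sources: the comma-separated logfeeder.config.files property and any *.json files found under input_config_dir, falling back to config.json when both are empty. A self-contained sketch of that flow; the directory scan below uses a plain FilenameFilter where the patch uses FileUtil.getAllFileFromDir, and the default directory is hypothetical:

    import java.io.File;
    import java.io.FilenameFilter;

    import com.google.common.collect.ObjectArrays;

    public class ConfigMergeSketch {
      // mirrors LogFeederUtil.mergeArray: null-safe concat via Guava
      static String[] mergeArray(String[] first, String[] second) {
        if (first == null) first = new String[0];
        if (second == null) second = new String[0];
        return ObjectArrays.concat(first, second, String.class);
      }

      public static void main(String[] args) {
        String configFiles = System.getProperty("logfeeder.config.files"); // may be null
        String[] fromProperty = (configFiles == null) ? null : configFiles.split(",");

        // scan a config dir for *.json, like getConfigFromCmdLine()
        File dir = new File(System.getProperty("input_config_dir", "/etc/logfeeder/conf.d"));
        File[] jsonFiles = dir.listFiles(new FilenameFilter() {
          public boolean accept(File d, String name) { return name.endsWith(".json"); }
        });
        String[] fromDir = new String[jsonFiles == null ? 0 : jsonFiles.length];
        for (int i = 0; i < fromDir.length; i++) {
          fromDir[i] = jsonFiles[i].getAbsolutePath();
        }

        String[] merged = mergeArray(fromProperty, fromDir);
        if (merged.length == 0) {
          merged = new String[] { "config.json" }; // same default as the patch
        }
        for (String config : merged) {
          System.out.println("loading config: " + config);
        }
      }
    }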
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
index 7a30d724f7..7a68b4d4a2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
@@ -24,7 +24,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.net.InetAddress;
import java.net.URL;
+import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -40,6 +42,7 @@ import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
import org.apache.ambari.logfeeder.mapper.Mapper;
import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -48,6 +51,7 @@ import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
+import com.google.common.collect.ObjectArrays;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -68,6 +72,18 @@ public class LogFeederUtil {
private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
private static int logInterval = 30000; // 30 seconds
+ public static String hostName = null;
+ public static String ipAddress = null;
+
+ public static volatile String logfeederTempDir = null;
+
+ public static final Object _LOCK = new Object();
+
+ static {
+ // set hostname and IP address once at class load
+ setHostNameAndIP();
+ }
+
public static Gson getGson() {
return gson;
}
@@ -483,5 +499,57 @@ public class LogFeederUtil {
}
return false;
}
+
+
+ public static synchronized String setHostNameAndIP() {
+ if (hostName == null || ipAddress == null) {
+ try {
+ InetAddress ip = InetAddress.getLocalHost();
+ ipAddress = ip.getHostAddress();
+ String getHostName = ip.getHostName();
+ String getCanonicalHostName = ip.getCanonicalHostName();
+ if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
+ logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
+ hostName = getCanonicalHostName;
+ } else {
+ logger.info("Using getHostName()=" + getHostName);
+ hostName = getHostName;
+ }
+ logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
+ + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
+ + hostName);
+ } catch (UnknownHostException e) {
+ logger.error("Error getting hostname.", e);
+ }
+ }
+ return hostName;
+ }
+ public static String[] mergeArray(String[] first, String[] second) {
+ if (first == null) {
+ first = new String[0];
+ }
+ if (second == null) {
+ second = new String[0];
+ }
+ String[] mergedArray = ObjectArrays.concat(first, second, String.class);
+ return mergedArray;
+ }
+
+ public static String getLogfeederTempDir() {
+ if (logfeederTempDir == null) {
+ synchronized (_LOCK) {
+ if (logfeederTempDir == null) {
+ String tempDirValue = getStringProperty("logfeeder.tmp.dir",
+ "/tmp/$username/logfeeder/");
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ String username = System.getProperty("user.name");
+ contextParam.put("username", username);
+ logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
+ contextParam);
+ }
+ }
+ }
+ return logfeederTempDir;
+ }
}
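
getLogfeederTempDir() expands "/tmp/$username/logfeeder/" through PlaceholderUtil.replaceVariables, a class this patch adds but whose body is not shown in this section. A minimal re-implementation sketch, assuming straightforward "$key" substitution:

    import java.util.HashMap;
    import java.util.Map;

    public class PlaceholderSketch {
      // assumed behavior: replace each "$key" occurrence with its context value
      static String replaceVariables(String template, Map<String, String> context) {
        String result = template;
        for (Map.Entry<String, String> entry : context.entrySet()) {
          result = result.replace("$" + entry.getKey(), entry.getValue());
        }
        return result;
      }

      public static void main(String[] args) {
        Map<String, String> contextParam = new HashMap<String, String>();
        contextParam.put("username", System.getProperty("user.name"));
        // e.g. "/tmp/alice/logfeeder/" when running as user "alice"
        System.out.println(replaceVariables("/tmp/$username/logfeeder/", contextParam));
      }
    }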
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
index 841387845d..f84457e45d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
@@ -19,8 +19,7 @@
package org.apache.ambari.logfeeder;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -40,34 +39,13 @@ public class OutputMgr {
Collection<Output> outputList = new ArrayList<Output>();
- String hostName = null;
- String ipAddress = null;
boolean addMessageMD5 = true;
private int MAX_OUTPUT_SIZE = 32765; // 32766-1
static long doc_counter = 0;
public MetricCount messageTruncateMetric = new MetricCount();
- public OutputMgr() {
- // Set the host for this server
- try {
- InetAddress ip = InetAddress.getLocalHost();
- ipAddress = ip.getHostAddress();
- String getHostName = ip.getHostName();
- String getCanonicalHostName = ip.getCanonicalHostName();
- if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
- logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
- hostName = getCanonicalHostName;
- } else {
- logger.info("Using getHostName()=" + getHostName);
- hostName = getHostName;
- }
- logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName=" + hostName);
- } catch (UnknownHostException e) {
- logger.error("Error getting hostname.", e);
- }
- }
-
+
public Collection<Output> getOutputList() {
return outputList;
}
@@ -106,12 +84,12 @@ public class OutputMgr {
}
// Add host if required
- if (jsonObj.get("host") == null && hostName != null) {
- jsonObj.put("host", hostName);
+ if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+ jsonObj.put("host", LogFeederUtil.hostName);
}
// Add IP if required
- if (jsonObj.get("ip") == null && ipAddress != null) {
- jsonObj.put("ip", ipAddress);
+ if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+ jsonObj.put("ip", LogFeederUtil.ipAddress);
}
if (input.isUseEventMD5() || input.isGenEventMD5()) {
@@ -278,4 +256,16 @@ public class OutputMgr {
}
}
+
+ public void copyFile(File inputFile, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+ for (Output output : input.getOutputList()) {
+ try {
+ output.copyFile(inputFile, inputMarker);
+ } catch (Exception e) {
+ logger.error("Error copying file to " + output.getShortDescription(),
+ e);
+ }
+ }
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index ec75f2d78e..18e21843cc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -317,5 +317,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
metricsList.add(readBytesMetric);
}
+
+ public Thread getThread(){
+ return thread;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 420610a424..7107a692ed 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -251,31 +251,36 @@ public class InputFile extends Input {
*/
@Override
void start() throws Exception {
+
if (logPathFiles == null || logPathFiles.length == 0) {
return;
}
-
- if (isTail()) {
- // Just process the first file
- processFile(logPathFiles[0]);
- } else {
- for (File file : logPathFiles) {
- try {
- processFile(file);
- if (isClosed() || isDrain()) {
- logger.info("isClosed or isDrain. Now breaking loop.");
- break;
+ boolean isProcessFile = getBooleanValue("process_file", true);
+ if (isProcessFile) {
+ if (isTail()) {
+ // Just process the first file
+ processFile(logPathFiles[0]);
+ } else {
+ for (File file : logPathFiles) {
+ try {
+ processFile(file);
+ if (isClosed() || isDrain()) {
+ logger.info("isClosed or isDrain. Now breaking loop.");
+ break;
+ }
+ } catch (Throwable t) {
+ logger.error("Error processing file=" + file.getAbsolutePath(), t);
}
- } catch (Throwable t) {
- logger.error(
- "Error processing file=" + file.getAbsolutePath(),
- t);
}
}
+ // Close the input, which should flush through the filters and the output
+ close();
+ } else {
+ // copy the files as-is instead of processing them line by line
+ copyFiles(logPathFiles);
}
- // Call the close for the input. Which should flush to the filters and
- // output
- close();
+
}
@Override
@@ -559,4 +564,24 @@ public class InputFile extends Input {
+ (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0]
.getAbsolutePath() : getStringValue("path"));
}
+
+
+ public void copyFiles(File[] files) {
+ boolean isCopyFile = getBooleanValue("copy_file", false);
+ if (isCopyFile && files != null) {
+ for (File file : files) {
+ try {
+ InputMarker marker = new InputMarker();
+ marker.input = this;
+ outputMgr.copyFile(file, marker);
+ if (isClosed() || isDrain()) {
+ logger.info("isClosed or isDrain. Now breaking loop.");
+ break;
+ }
+ } catch (Throwable t) {
+ logger.error("Error processing file=" + file.getAbsolutePath(), t);
+ }
+ }
+ }
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
new file mode 100644
index 0000000000..d68ab96105
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputS3File extends Input {
+ static private Logger logger = Logger.getLogger(InputS3File.class);
+
+ String logPath = null;
+ boolean isStartFromBegining = true;
+
+ boolean isReady = false;
+ String[] s3LogPathFiles = null;
+ Object fileKey = null;
+ String base64FileKey = null;
+
+ private boolean isRolledOver = false;
+ boolean addWildCard = false;
+
+ long lastCheckPointTimeMS = 0;
+ int checkPointIntervalMS = 5 * 1000; // 5 seconds
+ RandomAccessFile checkPointWriter = null;
+ Map<String, Object> jsonCheckPoint = null;
+
+ File checkPointFile = null;
+
+ private InputMarker lastCheckPointInputMarker = null;
+
+ private String checkPointExtension = ".cp";
+
+
+ @Override
+ public void init() throws Exception {
+ logger.info("init() called");
+ statMetric.metricsName = "input.files.read_lines";
+ readBytesMetric.metricsName = "input.files.read_bytes";
+ checkPointExtension = LogFeederUtil.getStringProperty(
+ "logfeeder.checkpoint.extension", checkPointExtension);
+
+ // Let's close the file and set it to true after we start monitoring it
+ setClosed(true);
+ logPath = getStringValue("path");
+ tail = getBooleanValue("tail", tail);
+ addWildCard = getBooleanValue("add_wild_card", addWildCard);
+ checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
+ checkPointIntervalMS);
+ if (logPath == null || logPath.isEmpty()) {
+ logger.error("path is empty for file input. " + getShortDescription());
+ return;
+ }
+
+ String startPosition = getStringValue("start_position");
+ if (StringUtils.isEmpty(startPosition)
+ || startPosition.equalsIgnoreCase("beginning")
+ || startPosition.equalsIgnoreCase("begining")) {
+ isStartFromBegining = true;
+ }
+
+ if (!tail) {
+ // start position end doesn't apply if we are not tailing
+ isStartFromBegining = true;
+ }
+
+ setFilePath(logPath);
+ boolean isFileReady = isReady();
+
+ logger.info("File to monitor " + logPath + ", tail=" + tail
+ + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady);
+
+ super.init();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ambari.logfeeder.input.Input#isReady()
+ */
+ @Override
+ public boolean isReady() {
+ if (!isReady) {
+ // Let's try to check whether the file is available
+ s3LogPathFiles = getActualFiles(logPath);
+ if (s3LogPathFiles != null && s3LogPathFiles.length > 0) {
+ if (isTail() && s3LogPathFiles.length > 1) {
+ logger.warn("Found multiple files (" + s3LogPathFiles.length
+ + ") for the file filter " + filePath
+ + ". Will use only the first one. Using " + s3LogPathFiles[0]);
+ }
+ logger.info("File filter " + filePath + " expanded to "
+ + s3LogPathFiles[0]);
+ isReady = true;
+ } else {
+ logger.debug(logPath + " file doesn't exist. Ignoring for now");
+ }
+ }
+ return isReady;
+ }
+
+ private String[] getActualFiles(String searchPath) {
+ // TODO search file on s3
+ return new String[] { searchPath };
+ }
+
+ @Override
+ synchronized public void checkIn(InputMarker inputMarker) {
+ super.checkIn(inputMarker);
+ if (checkPointWriter != null) {
+ try {
+ int lineNumber = LogFeederUtil.objectToInt(
+ jsonCheckPoint.get("line_number"), 0, "line_number");
+ if (lineNumber > inputMarker.lineNumber) {
+ // Already wrote higher line number for this input
+ return;
+ }
+ // If interval is greater than last checkPoint time, then write
+ long currMS = System.currentTimeMillis();
+ if (!isClosed()
+ && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+ // Let's save this one so we can update the check point file
+ // on flush
+ lastCheckPointInputMarker = inputMarker;
+ return;
+ }
+ lastCheckPointTimeMS = currMS;
+
+ jsonCheckPoint.put("line_number", ""
+ + new Integer(inputMarker.lineNumber));
+ jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
+ jsonCheckPoint.put("last_write_time_date", new Date());
+
+ String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+
+ // Let's rewind
+ checkPointWriter.seek(0);
+ checkPointWriter.writeInt(jsonStr.length());
+ checkPointWriter.write(jsonStr.getBytes());
+
+ if (isClosed()) {
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_FINAL_CHECKIN";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Wrote final checkPoint, input=" + getShortDescription()
+ + ", checkPointFile=" + checkPointFile.getAbsolutePath()
+ + ", checkPoint=" + jsonStr, null, logger, Level.INFO);
+ }
+ } catch (Throwable t) {
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_CHECKIN_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Caught exception checkIn. , input=" + getShortDescription(), t,
+ logger, Level.ERROR);
+ }
+ }
+
+ }
+
+ @Override
+ public void checkIn() {
+ super.checkIn();
+ if (lastCheckPointInputMarker != null) {
+ checkIn(lastCheckPointInputMarker);
+ }
+ }
+
+ @Override
+ public void rollOver() {
+ logger.info("Marking this input file for rollover. "
+ + getShortDescription());
+ isRolledOver = true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ambari.logfeeder.input.Input#monitor()
+ */
+ @Override
+ void start() throws Exception {
+ if (s3LogPathFiles == null || s3LogPathFiles.length == 0) {
+ return;
+ }
+
+ if (isTail()) {
+ // Just process the first file
+ processFile(s3LogPathFiles[0]);
+ } else {
+ for (String s3FilePath : s3LogPathFiles) {
+ try {
+ processFile(s3FilePath);
+ if (isClosed() || isDrain()) {
+ logger.info("isClosed or isDrain. Now breaking loop.");
+ break;
+ }
+ } catch (Throwable t) {
+ logger.error("Error processing file=" + s3FilePath, t);
+ }
+ }
+ }
+ // Call the close for the input. Which should flush to the filters and
+ // output
+ close();
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ logger.info("close() calling checkPoint checkIn(). "
+ + getShortDescription());
+ checkIn();
+ }
+
+ private void processFile(String logPathFile) throws FileNotFoundException,
+ IOException {
+ logger.info("Monitoring logPath=" + logPath + ", logPathFile="
+ + logPathFile);
+ BufferedReader br = null;
+ checkPointFile = null;
+ checkPointWriter = null;
+ jsonCheckPoint = null;
+ int resumeFromLineNumber = 0;
+
+ int lineCount = 0;
+ try {
+ setFilePath(logPathFile);
+ String s3AccessKey = getStringValue("s3_access_key");
+ String s3SecretKey = getStringValue("s3_secret_key");
+ br = S3Util.INSTANCE.getReader(logPathFile, s3AccessKey, s3SecretKey);
+ if (br == null) {
+ logger.error("Could not get reader for s3 file. " + logPathFile + ", "
+ + getShortDescription());
+ return;
+ }
+
+ // Whether to send to output from the beginning.
+ boolean resume = isStartFromBegining;
+
+ // Seems FileWatch is not reliable, so let's only use file key
+ // comparison
+ // inputMgr.monitorSystemFileChanges(this);
+ fileKey = getFileKey(logPathFile);
+ base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+ logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". "
+ + getShortDescription());
+
+ if (isTail()) {
+ try {
+ // Let's see if there is a checkpoint for this file
+ logger.info("Checking existing checkpoint file. "
+ + getShortDescription());
+
+ String fileBase64 = Base64.byteArrayToBase64(fileKey.toString()
+ .getBytes());
+ String checkPointFileName = fileBase64 + checkPointExtension;
+ File checkPointFolder = inputMgr.getCheckPointFolderFile();
+ checkPointFile = new File(checkPointFolder, checkPointFileName);
+ checkPointWriter = new RandomAccessFile(checkPointFile, "rw");
+
+ try {
+ int contentSize = checkPointWriter.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointWriter.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ logger
+ .error("Couldn't read expected number of bytes from checkpoint file. expected="
+ + contentSize
+ + ", read="
+ + readSize
+ + ", checkPointFile="
+ + checkPointFile
+ + ", input="
+ + getShortDescription());
+ } else {
+ // Create JSON string
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+
+ resumeFromLineNumber = LogFeederUtil.objectToInt(
+ jsonCheckPoint.get("line_number"), 0, "line_number");
+
+ if (resumeFromLineNumber > 0) {
+ // Let's read from last line read
+ resume = false;
+ }
+ logger.info("CheckPoint. checkPointFile=" + checkPointFile
+ + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber="
+ + resumeFromLineNumber + ", resume=" + resume);
+ }
+ } catch (EOFException eofEx) {
+ logger.info("EOFException. Will reset checkpoint file "
+ + checkPointFile.getAbsolutePath() + " for "
+ + getShortDescription());
+ }
+ if (jsonCheckPoint == null) {
+ // This seems to be first time, so creating the initial
+ // checkPoint object
+ jsonCheckPoint = new HashMap<String, Object>();
+ jsonCheckPoint.put("file_path", filePath);
+ jsonCheckPoint.put("file_key", fileBase64);
+ }
+
+ } catch (Throwable t) {
+ logger.error(
+ "Error while configuring checkpoint file. Will reset file. checkPointFile="
+ + checkPointFile, t);
+ }
+ }
+
+ setClosed(false);
+ int sleepStep = 2;
+ int sleepIteration = 0;
+ while (true) {
+ try {
+ if (isDrain()) {
+ break;
+ }
+
+ String line = br.readLine();
+ if (line == null) {
+ if (!resume) {
+ resume = true;
+ }
+ sleepIteration++;
+ try {
+ // Since FileWatch service is not reliable, we will
+ // check
+ // file inode every n seconds after no write
+ if (sleepIteration > 4) {
+ Object newFileKey = getFileKey(logPathFile);
+ if (newFileKey != null) {
+ if (fileKey == null || !newFileKey.equals(fileKey)) {
+ logger
+ .info("File key is different. Calling rollover. oldKey="
+ + fileKey
+ + ", newKey="
+ + newFileKey
+ + ". "
+ + getShortDescription());
+ // File has rotated.
+ rollOver();
+ }
+ }
+ }
+ // Flush on the second iteration
+ if (!tail && sleepIteration >= 2) {
+ logger.info("End of file. Done with filePath=" + logPathFile
+ + ", lineCount=" + lineCount);
+ flush();
+ break;
+ } else if (sleepIteration == 2) {
+ flush();
+ } else if (sleepIteration >= 2) {
+ if (isRolledOver) {
+ isRolledOver = false;
+ // Close existing file
+ try {
+ logger
+ .info("File is rolled over. Closing current open file."
+ + getShortDescription() + ", lineCount="
+ + lineCount);
+ br.close();
+ } catch (Exception ex) {
+ logger.error("Error closing file" + getShortDescription());
+ break;
+ }
+ try {
+ // Open new file
+ logger.info("Opening new rolled over file."
+ + getShortDescription());
+ br = S3Util.INSTANCE.getReader(logPathFile, s3AccessKey, s3SecretKey);
+ lineCount = 0;
+ fileKey = getFileKey(logPathFile);
+ base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
+ .getBytes());
+ logger.info("fileKey=" + fileKey + ", base64="
+ + base64FileKey + ", " + getShortDescription());
+ } catch (Exception ex) {
+ logger.error("Error opening rolled over file. "
+ + getShortDescription());
+ // Let's add this to monitoring and exit
+ // this
+ // thread
+ logger.info("Added input to not ready list."
+ + getShortDescription());
+ isReady = false;
+ inputMgr.addToNotReady(this);
+ break;
+ }
+ logger.info("File is successfully rolled over. "
+ + getShortDescription());
+ continue;
+ }
+ }
+ Thread.sleep(sleepStep * 1000);
+ sleepStep = (sleepStep * 2);
+ sleepStep = sleepStep > 10 ? 10 : sleepStep;
+ } catch (InterruptedException e) {
+ logger.info("Thread interrupted." + getShortDescription());
+ }
+ } else {
+ lineCount++;
+ sleepStep = 1;
+ sleepIteration = 0;
+
+ if (!resume && lineCount > resumeFromLineNumber) {
+ logger.info("Resuming to read from last line. lineCount="
+ + lineCount + ", input=" + getShortDescription());
+ resume = true;
+ }
+ if (resume) {
+ InputMarker marker = new InputMarker();
+ marker.fileKey = fileKey;
+ marker.base64FileKey = base64FileKey;
+ marker.filePath = filePath;
+ marker.input = this;
+ marker.lineNumber = lineCount;
+ outputLine(line, marker);
+ }
+ }
+ } catch (Throwable t) {
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_READ_LOOP_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Caught exception in read loop. lineNumber=" + lineCount
+ + ", input=" + getShortDescription(), t, logger, Level.ERROR);
+
+ }
+ }
+ } finally {
+ if (br != null) {
+ logger.info("Closing reader." + getShortDescription() + ", lineCount="
+ + lineCount);
+ try {
+ br.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * @param s3FilePath the s3 path of the log file
+ * @return the key identifying the file (currently the path itself)
+ */
+ static public Object getFileKey(String s3FilePath) {
+ return s3FilePath;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ambari.logfeeder.input.Input#getShortDescription()
+ */
+ @Override
+ public String getShortDescription() {
+ return "input:source="
+ + getStringValue("source")
+ + ", path="
+ + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0]
+ : getStringValue("path"));
+ }
+
+}
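
processFile() above obtains its reader from S3Util.getReader(), which appears only partially at the end of this diff. A hedged sketch of what such a reader plausibly does with the aws-java-sdk-s3 dependency added in the pom; the bucket/key split mirrors S3Util.getBucketName()/getS3Key(), and the gzip branch is suggested by the GZIPInputStream import in S3Util:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.zip.GZIPInputStream;

    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3Client;
    import com.amazonaws.services.s3.model.S3Object;

    public class S3ReaderSketch {
      public static BufferedReader openS3Reader(String s3Path, String accessKey,
          String secretKey) throws IOException {
        // "s3://bucket/key/parts" -> bucket "bucket", key "key/parts"
        String withoutScheme = s3Path.replace("s3://", "");
        int slash = withoutScheme.indexOf('/');
        String bucket = withoutScheme.substring(0, slash);
        String key = withoutScheme.substring(slash + 1);

        AmazonS3 s3 = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
        S3Object object = s3.getObject(bucket, key);
        InputStream in = object.getObjectContent();
        if (key.endsWith(".gz")) {
          in = new GZIPInputStream(in); // handle gzip-compressed objects
        }
        return new BufferedReader(new InputStreamReader(in));
      }
    }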
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index c067680e4d..99a2909e40 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.output;
+import java.io.File;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
@@ -27,7 +28,6 @@ import java.util.Map.Entry;
import org.apache.ambari.logfeeder.ConfigBlock;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.log4j.Logger;
@@ -57,9 +57,11 @@ public abstract class Output extends ConfigBlock {
return super.getNameForThread();
}
- public void write(String block, InputMarker inputMarker) throws Exception {
- // No-op. Please implement in sub classes
- }
+ public abstract void write(String block, InputMarker inputMarker)
+ throws Exception;
+
+ public abstract void copyFile(File inputFile, InputMarker inputMarker)
+ throws UnsupportedOperationException;
/**
* @param jsonObj
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index b6188cb35f..7cfcb98184 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.logfeeder.output;
+import java.io.File;
+
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.log4j.Logger;
@@ -30,8 +32,14 @@ public class OutputDevNull extends Output {
private static Logger logger = Logger.getLogger(OutputDevNull.class);
@Override
- public void write(String block, InputMarker inputMarker) throws Exception {
+ public void write(String block, InputMarker inputMarker) {
// just ignore the logs
logger.trace("Ignore log block: " + block);
}
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker) {
+ throw new UnsupportedOperationException(
+ "copyFile method is not yet supported for output=dev_null");
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index b6e36d692b..4327f6f626 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -135,4 +135,11 @@ public class OutputFile extends Output {
return "output:destination=file,path=" + filePath;
}
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker)
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "copyFile method is not yet supported for output=file");
+ }
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index efbc366c10..120d071ae3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.output;
+import java.io.File;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -283,4 +284,11 @@ public class OutputKafka extends Output {
}
}
}
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker)
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "copyFile method is not yet supported for output=kafka");
+ }
} \ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
new file mode 100644
index 0000000000..4cdf82d196
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Writes log files into an s3 bucket.
+ */
+public class OutputS3File extends Output {
+
+
+ private static boolean uploadedGlobalConfig = false;
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker) {
+ String bucketName = getStringValue("s3_bucket");
+ String s3LogDir = getStringValue("s3_log_dir");
+ HashMap<String, String> contextParam = buildContextParam();
+ s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam);
+ String s3AccessKey = getStringValue("s3_access_key");
+ String s3SecretKey = getStringValue("s3_secret_key");
+ String compressionAlgo = getStringValue("compression_algo");
+ String fileName = inputFile.getName();
+ // create tmp compressed File
+ String tmpDir = LogFeederUtil.getLogfeederTempDir();
+ File outputFile = new File(tmpDir + fileName + "_"
+ + new Date().getTime() + "." + compressionAlgo);
+ outputFile = CompressionUtil.compressFile(inputFile, outputFile,
+ compressionAlgo);
+ String type = inputMarker.input.getStringValue("type");
+ String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type
+ + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "."
+ + compressionAlgo;
+ S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey,
+ s3SecretKey);
+ // schedule the local compressed file for deletion on JVM exit
+ outputFile.deleteOnExit();
+ ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>();
+ addFilters(filters, inputMarker.input.getFirstFilter());
+ Map<String, Object> inputConfig = new HashMap<String, Object>();
+ inputConfig.putAll(inputMarker.input.getConfigs());
+ String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName
+ + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path;
+ inputConfig.put("path", s3CompletePath);
+
+ ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+ inputConfigList.add(inputConfig);
+ // remove global config entries from the input and filter configs
+ removeGlobalConfig(inputConfigList);
+ removeGlobalConfig(filters);
+ // write config into s3 file
+ String s3Key = getComponentConfigFileName(type);
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("filter", filters);
+ config.put("input", inputConfigList);
+ writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam,
+ s3Key);
+ // write global config
+ writeGlobalConfig();
+
+ }
+
+ /**
+ *
+ * @param filters
+ * @param filter
+ */
+ public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
+ if (filter != null) {
+ Map<String, Object> filterConfig = new HashMap<String, Object>();
+ filterConfig.putAll(filter.getConfigs());
+ filters.add(filterConfig);
+ if (filter.getNextFilter() != null) {
+ addFilters(filters, filter.getNextFilter());
+ }
+ }
+ }
+
+ /**
+ * @param config config map to serialize to JSON
+ * @param bucketName target s3 bucket
+ * @param accessKey aws access key
+ * @param secretKey aws secret key
+ * @param contextParam values substituted into the s3_config_dir placeholders
+ * @param s3Key s3 key (file name) to write
+ */
+ public void writeConfigToS3(Map<String, Object> config, String bucketName,
+ String accessKey, String secretKey, HashMap<String, String> contextParam,
+ String s3Key) {
+ String s3ConfigDir = getStringValue("s3_config_dir");
+ s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ String configJson = gson.toJson(config);
+ // write json to s3 file
+ s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
+ S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
+ secretKey);
+ }
+
+ /**
+ *
+ * @param componentName
+ * @return String
+ */
+ public String getComponentConfigFileName(String componentName) {
+ String fileName = "input.config-" + componentName + ".json";
+ return fileName;
+ }
+
+ public HashMap<String, String> buildContextParam() {
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ contextParam.put("host", LogFeederUtil.hostName);
+ contextParam.put("ip", LogFeederUtil.ipAddress);
+ String cluster = getNVList("add_fields").get("cluster");
+ contextParam.put("cluster", cluster);
+ return contextParam;
+ }
+
+
+ private Map<String, Object> getGlobalConfig() {
+ Map<String, Object> globalConfig = LogFeeder.globalMap;
+ if (globalConfig == null) {
+ globalConfig = new HashMap<String, Object>();
+ }
+ return globalConfig;
+ }
+
+ private void removeGlobalConfig(List<Map<String, Object>> configList) {
+ Map<String, Object> globalConfig = getGlobalConfig();
+ if (configList != null && globalConfig != null) {
+ for (Entry<String, Object> globalConfigEntry : globalConfig.entrySet()) {
+ if (globalConfigEntry != null) {
+ String globalKey = globalConfigEntry.getKey();
+ if (globalKey != null && !globalKey.trim().isEmpty()) {
+ for (Map<String, Object> config : configList) {
+ if (config != null) {
+ config.remove(globalKey);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes the global config into an s3 file. Uploads only once per process.
+ */
+ @SuppressWarnings("unchecked")
+ private synchronized void writeGlobalConfig() {
+ if (!uploadedGlobalConfig) {
+ Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
+ // update the global config before writing it to s3
+ globalConfig.put("source", "s3_file");
+ globalConfig.put("copy_file", false);
+ globalConfig.put("process_file", true);
+ globalConfig.put("tail", false);
+ Map<String, Object> addFields = (Map<String, Object>) globalConfig
+ .get("add_fields");
+ if (addFields == null) {
+ addFields = new HashMap<String, Object>();
+ }
+ addFields.put("ip", LogFeederUtil.ipAddress);
+ addFields.put("host", LogFeederUtil.hostName);
+ // add bundle_id, same as cluster, if it is not already set
+ String bundle_id = (String) addFields.get("bundle_id");
+ if (bundle_id == null || bundle_id.isEmpty()) {
+ String cluster = (String) addFields.get("cluster");
+ if (cluster != null && !cluster.isEmpty()) {
+ addFields.put("bundle_id", cluster);
+ }
+ }
+ globalConfig.put("add_fields", addFields);
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("global", globalConfig);
+ String s3AccessKey = getStringValue("s3_access_key");
+ String s3SecretKey = getStringValue("s3_secret_key");
+ String bucketName = getStringValue("s3_bucket");
+ String s3Key = "global.config.json";
+ HashMap<String, String> contextParam = buildContextParam();
+ writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey,
+ contextParam, s3Key);
+ uploadedGlobalConfig = true;
+ }
+ }
+
+ @Override
+ public void write(String block, InputMarker inputMarker) throws Exception {
+ throw new UnsupportedOperationException(
+ "write method is not yet supported for output=s3_file");
+ }
+}
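
copyFile() above compresses the local log with CompressionUtil (listed in the diffstat but not shown in this section) and ships it via S3Util.uploadFileTos3(). A self-contained sketch of that compress-then-upload path, assuming gzip through the commons-compress dependency this patch adds to the pom; the bucket, key, file paths, and credentials are hypothetical:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;

    import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
    import org.apache.commons.io.IOUtils;

    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.s3.transfer.TransferManager;
    import com.amazonaws.services.s3.transfer.Upload;

    public class S3UploadSketch {
      public static void main(String[] args) throws Exception {
        File input = new File("/var/log/sample/sample.log");        // hypothetical
        File compressed = new File("/tmp/logfeeder/sample.log.gz"); // hypothetical
        compressed.getParentFile().mkdirs();

        // gzip the file, standing in for CompressionUtil.compressFile()
        FileInputStream in = new FileInputStream(input);
        GzipCompressorOutputStream out =
            new GzipCompressorOutputStream(new FileOutputStream(compressed));
        try {
          IOUtils.copy(in, out);
        } finally {
          out.close();
          in.close();
        }

        // upload and shut the manager down, as S3Util.uploadFileTos3() does
        TransferManager transferManager = new TransferManager(
            new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY")); // hypothetical
        try {
          Upload upload = transferManager.upload(
              "my-log-bucket", "logs/sample/sample.log.gz", compressed);
          upload.waitForUploadResult();
        } finally {
          transferManager.shutdownNow();
        }
      }
    }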
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 14b2093e36..6e3248b2ab 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.output;
+import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
@@ -454,4 +455,17 @@ public class OutputSolr extends Output {
return localBuffer.isEmpty();
}
}
+
+ @Override
+ public void write(String block, InputMarker inputMarker) throws Exception {
+ // no-op: raw string blocks are not sent to solr
+ }
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker)
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "copyFile method is not yet supported for output=solr");
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
new file mode 100644
index 0000000000..050b69b284
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+
+public enum AWSUtil {
+ INSTANCE;
+ private static final Logger LOG = Logger.getLogger(AWSUtil.class);
+
+ /**
+ * Get aws username
+ *
+ * @param accessKey
+ * @param secretKey
+ * @return String
+ */
+ public String getAwsUserName(String accessKey, String secretKey) {
+ String username = null;
+ AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
+ AmazonIdentityManagementClient amazonIdentityManagementClient;
+ if (awsCredentials != null) {
+ amazonIdentityManagementClient = new AmazonIdentityManagementClient(
+ awsCredentials);
+ } else {
+ // create default client
+ amazonIdentityManagementClient = new AmazonIdentityManagementClient();
+ }
+ try {
+ username = amazonIdentityManagementClient.getUser().getUser()
+ .getUserName();
+ } catch (AmazonServiceException e) {
+ if (e.getErrorCode().compareTo("AccessDenied") == 0) {
+ String arn = null;
+ String msg = e.getMessage();
+ int arnIdx = msg.indexOf("arn:aws");
+ if (arnIdx != -1) {
+ int arnSpace = msg.indexOf(" ", arnIdx);
+ // should be similar to "arn:aws:iam::111111111111:user/username"
+ arn = msg.substring(arnIdx, arnSpace);
+ }
+ if (arn != null) {
+ String[] arnParts = arn.split(":");
+ if (arnParts != null && arnParts.length > 5) {
+ username = arnParts[5];
+ if (username != null) {
+ username = username.replace("user/", "");
+ }
+ }
+ }
+ }
+ } catch (Exception exception) {
+ LOG.error(
+ "Error in getting username :" + exception.getLocalizedMessage(),
+ exception.getCause());
+ }
+ return username;
+ }
+
+ public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+ if (accessKey != null && secretKey != null) {
+ LOG.debug("Creating aws client as per new accesskey and secretkey");
+ AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
+ secretKey);
+ return awsCredentials;
+ } else {
+ // no keys provided; return null so callers fall back to the default
+ // aws credentials provider chain
+ return null;
+ }
+ }
+}
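
The AccessDenied branch above recovers the username from the ARN quoted in the exception message. A tiny worked example of that split-on-colon logic; the ARN is hypothetical:

    public class ArnParseSketch {
      public static void main(String[] args) {
        // shaped like the ARN extracted from the AmazonServiceException message
        String arn = "arn:aws:iam::111111111111:user/alice";
        String[] arnParts = arn.split(":");
        // arnParts[5] == "user/alice"; strip the prefix as getAwsUserName() does
        String username = arnParts[5].replace("user/", "");
        System.out.println(username); // prints "alice"
      }
    }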
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
new file mode 100644
index 0000000000..f49837c553
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * Utility for connecting to S3.
+ */
+public enum S3Util {
+ INSTANCE;
+
+ private static final Logger LOG = Logger.getLogger(S3Util.class);
+
+ public final String S3_PATH_START_WITH = "s3://";
+ public final String S3_PATH_SEPARATOR = "/";
+
+ /**
+ * Get an S3 client, using the given keys if provided, otherwise the
+ * default credential provider chain.
+ *
+ * @return AmazonS3
+ */
+ public AmazonS3 getS3Client(String accessKey, String secretKey) {
+ AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+ accessKey, secretKey);
+ AmazonS3 s3client;
+ if (awsCredentials != null) {
+ s3client = new AmazonS3Client(awsCredentials);
+ } else {
+ s3client = new AmazonS3Client();
+ }
+ return s3client;
+ }
+
+ /**
+ * Get a TransferManager, using the given keys if provided, otherwise the
+ * default credential provider chain.
+ *
+ * @return TransferManager
+ */
+ public TransferManager getTransferManager(String accessKey, String secretKey) {
+ AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
+ accessKey, secretKey);
+ TransferManager transferManager;
+ if (awsCredentials != null) {
+ transferManager = new TransferManager(awsCredentials);
+ } else {
+ transferManager = new TransferManager();
+ }
+ return transferManager;
+ }
+
+ /**
+ * Shut down the given TransferManager, if any.
+ */
+ public void shutdownTransferManager(TransferManager transferManager) {
+ if (transferManager != null) {
+ transferManager.shutdownNow();
+ }
+ }
+
+ /**
+ * Extract the bucket name from a complete s3:// file path.
+ *
+ * @param s3Path
+ * @return String
+ */
+ public String getBucketName(String s3Path) {
+ String bucketName = null;
+ if (s3Path != null) {
+ String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+ S3_PATH_SEPARATOR);
+ bucketName = s3PathParts[0];
+ }
+ return bucketName;
+ }
+
+ /**
+ * Get the S3 key from an s3:// path, after removing the bucket name.
+ *
+ * @param s3Path
+ * @return String
+ */
+ public String getS3Key(String s3Path) {
+ StringBuilder s3Key = new StringBuilder();
+ if (s3Path != null) {
+ String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
+ S3_PATH_SEPARATOR);
+ ArrayList<String> s3PathList = new ArrayList<String>(
+ Arrays.asList(s3PathParts));
+ s3PathList.remove(0); // drop the bucket name
+ for (int index = 0; index < s3PathList.size(); index++) {
+ if (index > 0) {
+ s3Key.append(S3_PATH_SEPARATOR);
+ }
+ s3Key.append(s3PathList.get(index));
+ }
+ }
+ return s3Key.toString();
+ }
+
+ /**
+ * Upload a local file to S3 via a TransferManager.
+ *
+ * @param bucketName
+ * @param s3Key
+ * @param localFile
+ */
+ public void uploadFileTos3(String bucketName, String s3Key, File localFile,
+ String accessKey, String secretKey) {
+ TransferManager transferManager = getTransferManager(accessKey, secretKey);
+ try {
+ Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+ upload.waitForUploadResult();
+ } catch (AmazonClientException | InterruptedException e) {
+ LOG.error("S3 upload failed for file: " + localFile.getAbsolutePath(),
+ e);
+ } finally {
+ shutdownTransferManager(transferManager);
+ }
+ }
+
+ /**
+ * Get a buffered reader for streaming an S3 file.
+ *
+ * @param s3Path
+ * @return BufferedReader
+ * @throws IOException
+ */
+ public BufferedReader getReader(String s3Path, String accessKey,
+ String secretKey) throws IOException {
+ // TODO error handling
+ // Compression support
+ // read header and decide the compression(auto detection)
+ // For now hard-code GZIP compression
+ String s3Bucket = getBucketName(s3Path);
+ String s3Key = getS3Key(s3Path);
+ S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
+ new GetObjectRequest(s3Bucket, s3Key));
+ GZIPInputStream objectInputStream;
+ try {
+ objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
+ objectInputStream));
+ return bufferedReader;
+ } catch (IOException e) {
+ LOG.error("Error creating stream reader for S3 file: " + s3Path, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Write string data to an S3 object.
+ *
+ * @param data
+ * @param bucketName
+ * @param s3Key
+ */
+ public void writeIntoS3File(String data, String bucketName, String s3Key,
+ String accessKey, String secretKey) {
+ InputStream in = null;
+ try {
+ in = IOUtils.toInputStream(data, "UTF-8");
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ if (in != null) {
+ TransferManager transferManager = getTransferManager(accessKey, secretKey);
+ try {
+ if (transferManager != null) {
+ transferManager.upload(
+ new PutObjectRequest(bucketName, s3Key, in,
+ new ObjectMetadata())).waitForUploadResult();
+ LOG.debug("Data uploaded to S3 file: " + s3Key + " in bucket: "
+ + bucketName);
+ }
+ } catch (AmazonClientException | InterruptedException e) {
+ LOG.error(e);
+ } finally {
+ try {
+ shutdownTransferManager(transferManager);
+ in.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+}
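
A short sketch of how the S3Util helpers compose, assuming the bucket exists and the keys are valid; all literals below are placeholders:

    import java.io.File;
    import org.apache.ambari.logfeeder.s3.S3Util;

    public class S3UtilExample {
      public static void main(String[] args) {
        String s3Path = "s3://my-bucket/logs/app.log.gz";
        // pure string parsing, no AWS call involved
        String bucket = S3Util.INSTANCE.getBucketName(s3Path); // "my-bucket"
        String key = S3Util.INSTANCE.getS3Key(s3Path);         // "logs/app.log.gz"
        // uploads through a TransferManager, which is shut down afterwards
        S3Util.INSTANCE.uploadFileTos3(bucket, key,
            new File("/tmp/app.log.gz"), "MY_ACCESS_KEY", "MY_SECRET_KEY");
      }
    }
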
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
new file mode 100644
index 0000000000..54008ec335
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.log4j.Logger;
+
+public class CompressionUtil {
+
+ private static final Logger LOG = Logger.getLogger(CompressionUtil.class);
+
+ /**
+ * Compress file
+ *
+ * @param inputFile
+ * @param outputFile
+ * @param algoName
+ */
+ public static File compressFile(File inputFile, File outputFile, String algoName) {
+ CompressorOutputStream cos = null;
+ FileInputStream ios = null;
+ try {
+ if (!inputFile.exists()) {
+ throw new IllegalArgumentException("Input file "
+ + inputFile.getAbsolutePath() + " does not exist.");
+ }
+ if (inputFile.isDirectory()) {
+ throw new IllegalArgumentException("Input file "
+ + inputFile.getAbsolutePath() + " is a directory.");
+ }
+ File parent = outputFile.getParentFile();
+ if (parent != null && !parent.exists()) {
+ boolean isParentCreated = parent.mkdirs();
+ if (!isParentCreated) {
+ throw new IllegalAccessException(
+ "Could not create parent directory: "
+ + parent.getAbsolutePath());
+ }
+ }
+ final OutputStream out = new FileOutputStream(outputFile);
+ cos = new CompressorStreamFactory().createCompressorOutputStream(
+ algoName, out);
+ ios = new FileInputStream(inputFile);
+ IOUtils.copy(ios, cos);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ // Close the stream
+ if (cos != null) {
+ try {
+ cos.close();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+ if (ios != null) {
+ try {
+ ios.close();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ return outputFile;
+ }
+}
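
A usage sketch for CompressionUtil, assuming commons-compress is on the classpath; "gz" is the gzip algorithm name that CompressorStreamFactory accepts:

    import java.io.File;
    import org.apache.ambari.logfeeder.util.CompressionUtil;

    public class CompressionUtilExample {
      public static void main(String[] args) {
        File input = new File("/tmp/app.log");         // must exist, must not be a directory
        File output = new File("/tmp/out/app.log.gz"); // missing parent dirs are created
        // errors are logged and swallowed; the output File handle is returned either way
        File compressed = CompressionUtil.compressFile(input, output, "gz");
        System.out.println("wrote " + compressed.getAbsolutePath());
      }
    }
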
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
new file mode 100644
index 0000000000..ec26a880e9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+public class FileUtil {
+ private static final Logger logger = Logger.getLogger(FileUtil.class);
+
+ public static List<File> getAllFileFromDir(File directory,
+ String[] searchFileWithExtensions, boolean checkInSubDir) {
+ if (!directory.exists()) {
+ logger.error(directory.getAbsolutePath() + " is not exists ");
+ } else if (directory.isDirectory()) {
+ return (List<File>) FileUtils.listFiles(directory,
+ searchFileWithExtensions, checkInSubDir);
+ } else {
+ logger.error(directory.getAbsolutePath() + " is not a directory");
+ }
+ return new ArrayList<File>();
+ }
+}
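
A sketch of recursive log listing with FileUtil; as with commons-io FileUtils.listFiles, extensions are given without the leading dot (the path here is a placeholder):

    import java.io.File;
    import java.util.List;
    import org.apache.ambari.logfeeder.util.FileUtil;

    public class FileUtilExample {
      public static void main(String[] args) {
        // finds *.log and *.gz under the directory, descending into subdirectories;
        // a missing or non-directory path is logged and yields an empty list
        List<File> logs = FileUtil.getAllFileFromDir(new File("/var/log/ambari"),
            new String[] { "log", "gz" }, true);
        System.out.println("found " + logs.size() + " files");
      }
    }
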
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
new file mode 100644
index 0000000000..9be85ee173
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class PlaceholderUtil {
+
+ private static Logger LOG = Logger.getLogger(PlaceholderUtil.class);
+
+ // matches placeholders of the form $name
+ private static final Pattern placeHolderPattern = Pattern
+ .compile("\\$\\s*(\\w+)");
+
+ /**
+ * Replace $name placeholders in the input string with values from the
+ * context map.
+ *
+ * @param inputStr
+ * @param contextParam
+ * @return String
+ */
+ public static String replaceVariables(String inputStr,
+ HashMap<String, String> contextParam) {
+ Matcher m = placeHolderPattern.matcher(inputStr);
+ String placeholder;
+ String replacement;
+ String output = inputStr;
+ while (m.find()) {
+ placeholder = m.group();
+ if (placeholder != null && !placeholder.isEmpty()) {
+ String key = placeholder.replace("$", "").toLowerCase();// strip the
+ // '$' prefix
+ replacement = getFromContext(contextParam, placeholder, key);
+ output = output.replace(placeholder, replacement);
+ }
+ }
+ return output;
+ }
+
+ /**
+ * Look up a key in the context map, falling back to the given default
+ * value.
+ *
+ * @param contextParam
+ * @param defaultValue
+ * @param key
+ * @return String
+ */
+ private static String getFromContext(HashMap<String, String> contextParam,
+ String defaultValue, String key) {
+ String returnValue = defaultValue;// fall back to the default if the
+ // key has no usable value
+ if (contextParam != null) {
+ String value = contextParam.get(key);
+ if (value != null && !value.trim().isEmpty()) {
+ returnValue = value;
+ }
+ }
+ return returnValue;
+ }
+}
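
A sketch of the placeholder semantics: tokens such as $HOST are matched, lower-cased, and looked up in the context map, and unknown placeholders are left in place (all values below are made up):

    import java.util.HashMap;
    import org.apache.ambari.logfeeder.util.PlaceholderUtil;

    public class PlaceholderUtilExample {
      public static void main(String[] args) {
        HashMap<String, String> context = new HashMap<String, String>();
        context.put("host", "host1");     // context keys are lower-case
        context.put("cluster", "prod");
        // $CLUSTER -> "prod", $HOST -> "host1", $UNKNOWN stays as-is
        String out = PlaceholderUtil.replaceVariables(
            "/tmp/$CLUSTER/$HOST/$UNKNOWN", context);
        System.out.println(out); // /tmp/prod/host1/$UNKNOWN
      }
    }
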
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index a55b348ff5..58bcdae9fb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -2,6 +2,9 @@
"input": {
"file": {
"klass": "org.apache.ambari.logfeeder.input.InputFile"
+ },
+ "s3_file": {
+ "klass": "org.apache.ambari.logfeeder.input.InputS3File"
}
},
@@ -40,6 +43,9 @@
},
"dev_null": {
"klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
- }
+ },
+ "s3_file": {
+ "klass": "org.apache.ambari.logfeeder.output.OutputS3File"
+ }
}
} \ No newline at end of file
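
The alias file maps the short type names used in input/output configurations to implementation classes; presumably LogFeeder instantiates them reflectively, roughly like this hypothetical sketch (the real loading code lives elsewhere in LogFeeder):

    public class AliasExample {
      public static void main(String[] args) throws Exception {
        // "s3_file" resolves to this class name through alias_config.json
        String klass = "org.apache.ambari.logfeeder.output.OutputS3File";
        Object output = Class.forName(klass).newInstance();
        System.out.println(output.getClass().getName());
      }
    }
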
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 076c09c643..fc72ce08f2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -22,3 +22,6 @@ logfeeder.solr.config.interval=5
logfeeder.solr.core.history=history
logfeeder.solr.zkhosts=
logfeeder.solr.url=
+
+#logfeeder tmp dir
+logfeeder.tmp.dir=/tmp/$username/logfeeder/
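
Presumably the $username token in logfeeder.tmp.dir is resolved through PlaceholderUtil; a hypothetical sketch of that resolution (the "username" context key is an assumption based on PlaceholderUtil's lower-casing):

    import java.util.HashMap;
    import org.apache.ambari.logfeeder.util.PlaceholderUtil;

    public class TmpDirExample {
      public static void main(String[] args) {
        HashMap<String, String> context = new HashMap<String, String>();
        context.put("username", System.getProperty("user.name")); // hypothetical key
        String tmpDir = PlaceholderUtil.replaceVariables(
            "/tmp/$username/logfeeder/", context);
        System.out.println(tmpDir); // e.g. /tmp/ambari/logfeeder/
      }
    }
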
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
new file mode 100644
index 0000000000..1e2be37d55
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+public class AWSUtilTest {
+// @Test — presumably disabled because it needs valid AWS credentials to pass
+ public void testAWSUtil_getAwsUserName() throws Exception {
+ String S3_ACCESS_KEY = "S3_ACCESS_KEY";
+ String S3_SECRET_KEY = "S3_SECRET_KEY";
+ String username = AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY,
+ S3_SECRET_KEY);
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
new file mode 100644
index 0000000000..d07ae2b823
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.s3;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+public class S3UtilTest {
+ private static final Logger LOG = Logger.getLogger(S3UtilTest.class);
+
+ @Test
+ public void testS3Util_pathToBucketName() throws Exception {
+ String s3Path = "s3://bucket_name/path/file.txt";
+ String expectedBucketName = "bucket_name";
+ String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path);
+ assertEquals(expectedBucketName, actualBucketName);
+ }
+
+ @Test
+ public void testS3Util_pathToS3Key() throws Exception {
+ String s3Path = "s3://bucket_name/path/file.txt";
+ String expectedS3key = "path/file.txt";
+ String actualS3key = S3Util.INSTANCE.getS3Key(s3Path);
+ assertEquals(expectedS3key, actualS3key);
+ }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
new file mode 100644
index 0000000000..373a52fc95
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PlaceholderUtilTest {
+
+ private static final Logger LOG = Logger.getLogger(PlaceholderUtilTest.class);
+
+ @Test
+ public void testPlaceholderUtil_replaceVariables() {
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ String hostName = "host1";
+ String ip = "127.0.0.1";
+ String clusterName = "test-cluster";
+ contextParam.put("host", hostName);
+ contextParam.put("ip", ip);
+ contextParam.put("cluster", clusterName);
+ String inputStr = "$CLUSTER/logfeeder/$HOST-$IP/logs";
+ String resultStr = PlaceholderUtil.replaceVariables(inputStr, contextParam);
+ String expectedStr = clusterName + "/logfeeder/" + hostName + "-" + ip + "/logs";
+ assertEquals("Result string :" + resultStr
+ + " is not equal to exptected string :" + expectedStr, resultStr,
+ expectedStr);
+ }
+
+}