diff options
author | oleewere <oleewere@gmail.com> | 2016-06-20 18:29:55 +0200 |
---|---|---|
committer | oleewere <oleewere@gmail.com> | 2016-06-20 18:36:34 +0200 |
commit | 94716ff18c5df31da6a401921e8bbb6de294471a (patch) | |
tree | 79f37a79b18b5c1fba9e7e9c3799ede3d2972a07 /ambari-logsearch/ambari-logsearch-logfeeder | |
parent | 6b33a6c27d4e835d586a05cd22e34701f9e41a3b (diff) |
AMBARI-17045. Support loading logs to S3 (Hayat Behlim via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
25 files changed, 1697 insertions, 77 deletions
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index 5c30610edb..c7202c98cc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -120,7 +120,22 @@ <artifactId>ambari-metrics-common</artifactId> <version>${project.version}</version> </dependency> - </dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.11.5</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.11</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-iam</artifactId> + <version>1.11.5</version> + </dependency> + </dependencies> <build> <finalName>LogFeeder</finalName> <pluginManagement> diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java index 6b78e2addb..521319e647 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java @@ -22,11 +22,11 @@ package org.apache.ambari.logfeeder; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; import org.apache.log4j.Priority; + public abstract class ConfigBlock { static private Logger logger = Logger.getLogger(ConfigBlock.class); @@ -258,5 +258,4 @@ public abstract class ConfigBlock { public void setDrain(boolean drain) { this.drain = drain; } - } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java index 445c294191..4359c78ca1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java @@ -65,6 +65,8 @@ public class InputMgr { MetricCount filesCountMetric = new MetricCount(); private String checkPointExtension = ".cp"; + + private Thread inputIsReadyMonitor = null; public List<Input> getInputList() { return inputList; @@ -224,7 +226,7 @@ public class InputMgr { } // Start the monitoring thread if any file is in tail mode if (isAnyInputTail) { - Thread monitorThread = new Thread("InputIsReadyMonitor") { + inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { @Override public void run() { logger.info("Going to monitor for these missing files: " @@ -255,7 +257,7 @@ public class InputMgr { } } }; - monitorThread.start(); + inputIsReadyMonitor.start(); } } @@ -542,4 +544,33 @@ public class InputMgr { } + /** + * + */ + public void waitOnAllInputs() { + //wait on inputs + if (inputList != null) { + for (Input input : inputList) { + if (input != null) { + Thread inputThread = input.getThread(); + if (inputThread != null) { + try { + inputThread.join(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + } + // wait on monitor + if (inputIsReadyMonitor != null) { + try { + this.close(); + inputIsReadyMonitor.join(); + } catch (InterruptedException e) { + // ignore + } + } + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index f3dd4bf5f8..166c0f39e0 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -42,6 +42,7 @@ import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler; import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -57,16 +58,20 @@ public class LogFeeder { InputMgr inputMgr = new InputMgr(); MetricsMgr metricsMgr = new MetricsMgr(); - Map<String, Object> globalMap = null; + public static Map<String, Object> globalMap = null; String[] inputParams; List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>(); List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>(); List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>(); List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>(); - + int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours long lastCheckPointCleanedMS = 0; + + private static boolean isLogfeederCompleted = false; + + private Thread statLoggerThread = null; public LogFeeder(String[] args) { inputParams = args; @@ -80,14 +85,26 @@ public class LogFeeder { // loop the properties and load them // Load the configs String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files"); - if (configFiles == null) { - configFiles = LogFeederUtil.getStringProperty("config.file", - "config.json"); - } logger.info("logfeeder.config.files=" + configFiles); - String[] configFileList = configFiles.split(","); - for (String configFileName : configFileList) { + + String[] configFileList = null; + if (configFiles != null) { + configFileList = configFiles.split(","); + } + //list of config those are there in cmd line config dir , end with .json + String[] cmdLineConfigs = getConfigFromCmdLine(); + //merge both config + String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList, + cmdLineConfigs); + //mergedConfigList is null then set default conifg + if (mergedConfigList == null || mergedConfigList.length == 0) { + mergedConfigList = LogFeederUtil.getStringProperty("config.file", + "config.json").split(","); + } + for (String configFileName : mergedConfigList) { logger.info("Going to load config file:" + configFileName); + //escape space from config file path + configFileName= configFileName.replace("\\ ", "%20"); File configFile = new File(configFileName); if (configFile.exists() && configFile.isFile()) { logger.info("Config file exists in path." @@ -97,7 +114,7 @@ public class LogFeeder { // Let's try to load it from class loader logger.info("Trying to load config file from classloader: " + configFileName); - laodConfigsUsingClassLoader(configFileName); + loadConfigsUsingClassLoader(configFileName); logger.info("Loaded config file from classloader: " + configFileName); } @@ -114,7 +131,7 @@ public class LogFeeder { logger.debug("=============="); } - void laodConfigsUsingClassLoader(String configFileName) throws Exception { + void loadConfigsUsingClassLoader(String configFileName) throws Exception { BufferedInputStream fileInputStream = (BufferedInputStream) this .getClass().getClassLoader() .getResourceAsStream(configFileName); @@ -451,7 +468,7 @@ public class LogFeeder { inputMgr.monitor(); Runtime.getRuntime().addShutdownHook(new JVMShutdownHook()); - Thread statLogger = new Thread("statLogger") { + statLoggerThread = new Thread("statLogger") { @Override public void run() { @@ -473,12 +490,17 @@ public class LogFeeder { lastCheckPointCleanedMS = System.currentTimeMillis(); inputMgr.cleanCheckPointFiles(); } + + // logfeeder is stopped then break the loop + if (isLogfeederCompleted) { + break; + } } } }; - statLogger.setDaemon(true); - statLogger.start(); + statLoggerThread.setDaemon(true); + statLoggerThread.start(); } @@ -524,24 +546,25 @@ public class LogFeeder { public static void main(String[] args) { LogFeeder logFeeder = new LogFeeder(args); - logFeeder.run(logFeeder); + logFeeder.run(); } public static void run(String[] args) { LogFeeder logFeeder = new LogFeeder(args); - logFeeder.run(logFeeder); + logFeeder.run(); } - public void run(LogFeeder logFeeder) { + public void run() { try { Date startTime = new Date(); - logFeeder.init(); + this.init(); Date endTime = new Date(); logger.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize"); - logFeeder.monitor(); - + this.monitor(); + //wait for all background thread before stop main thread + this.waitOnAllDaemonThreads(); } catch (Throwable t) { logger.fatal("Caught exception in main.", t); System.exit(1); @@ -566,5 +589,42 @@ public class LogFeeder { } } } - + + public void waitOnAllDaemonThreads() { + String foreground = LogFeederUtil.getStringProperty("foreground"); + if (foreground != null && foreground.equalsIgnoreCase("true")) { + // wait on inputmgr daemon threads + inputMgr.waitOnAllInputs(); + // set isLogfeederCompleted to true to stop statLoggerThread + isLogfeederCompleted = true; + if (statLoggerThread != null) { + try { + statLoggerThread.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + } + + private String[] getConfigFromCmdLine() { + String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir"); + if (inputConfigDir != null && !inputConfigDir.isEmpty()) { + String[] searchFileWithExtensions = new String[] { "json" }; + File configDirFile = new File(inputConfigDir); + List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile, + searchFileWithExtensions, false); + if (configFiles != null && configFiles.size() > 0) { + String configPaths[] = new String[configFiles.size()]; + for (int index = 0; index < configFiles.size(); index++) { + File configFile = configFiles.get(index); + String configFilePath = configFile.getAbsolutePath(); + configPaths[index] = configFilePath; + } + return configPaths; + } + } + return new String[0]; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java index 7a30d724f7..7a68b4d4a2 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java @@ -24,7 +24,9 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.lang.reflect.Type; +import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; @@ -40,6 +42,7 @@ import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; import org.apache.ambari.logfeeder.mapper.Mapper; import org.apache.ambari.logfeeder.output.Output; +import org.apache.ambari.logfeeder.util.PlaceholderUtil; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -48,6 +51,7 @@ import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; +import com.google.common.collect.ObjectArrays; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; @@ -68,6 +72,18 @@ public class LogFeederUtil { private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>(); private static int logInterval = 30000; // 30 seconds + public static String hostName = null; + public static String ipAddress = null; + + public static String logfeederTempDir = null; + + public static final Object _LOCK = new Object(); + + static{ + //set hostname and hostIp + setHostNameAndIP(); + } + public static Gson getGson() { return gson; } @@ -483,5 +499,57 @@ public class LogFeederUtil { } return false; } + + + public static synchronized String setHostNameAndIP() { + if (hostName == null || ipAddress == null) { + try { + InetAddress ip = InetAddress.getLocalHost(); + ipAddress = ip.getHostAddress(); + String getHostName = ip.getHostName(); + String getCanonicalHostName = ip.getCanonicalHostName(); + if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) { + logger.info("Using getCanonicalHostName()=" + getCanonicalHostName); + hostName = getCanonicalHostName; + } else { + logger.info("Using getHostName()=" + getHostName); + hostName = getHostName; + } + logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName=" + + hostName); + } catch (UnknownHostException e) { + logger.error("Error getting hostname.", e); + } + } + return hostName; + } + public static String[] mergeArray(String[] first, String[] second) { + if (first == null) { + first = new String[0]; + } + if (second == null) { + second = new String[0]; + } + String[] mergedArray = ObjectArrays.concat(first, second, String.class); + return mergedArray; + } + + public static String getLogfeederTempDir() { + if (logfeederTempDir == null) { + synchronized (_LOCK) { + if (logfeederTempDir == null) { + String tempDirValue = getStringProperty("logfeeder.tmp.dir", + "/tmp/$username/logfeeder/"); + HashMap<String, String> contextParam = new HashMap<String, String>(); + String username = System.getProperty("user.name"); + contextParam.put("username", username); + logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, + contextParam); + } + } + } + return logfeederTempDir; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java index 841387845d..f84457e45d 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java @@ -19,8 +19,7 @@ package org.apache.ambari.logfeeder; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -40,34 +39,13 @@ public class OutputMgr { Collection<Output> outputList = new ArrayList<Output>(); - String hostName = null; - String ipAddress = null; boolean addMessageMD5 = true; private int MAX_OUTPUT_SIZE = 32765; // 32766-1 static long doc_counter = 0; public MetricCount messageTruncateMetric = new MetricCount(); - public OutputMgr() { - // Set the host for this server - try { - InetAddress ip = InetAddress.getLocalHost(); - ipAddress = ip.getHostAddress(); - String getHostName = ip.getHostName(); - String getCanonicalHostName = ip.getCanonicalHostName(); - if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) { - logger.info("Using getCanonicalHostName()=" + getCanonicalHostName); - hostName = getCanonicalHostName; - } else { - logger.info("Using getHostName()=" + getHostName); - hostName = getHostName; - } - logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName=" + hostName); - } catch (UnknownHostException e) { - logger.error("Error getting hostname.", e); - } - } - + public Collection<Output> getOutputList() { return outputList; } @@ -106,12 +84,12 @@ public class OutputMgr { } // Add host if required - if (jsonObj.get("host") == null && hostName != null) { - jsonObj.put("host", hostName); + if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) { + jsonObj.put("host", LogFeederUtil.hostName); } // Add IP if required - if (jsonObj.get("ip") == null && ipAddress != null) { - jsonObj.put("ip", ipAddress); + if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) { + jsonObj.put("ip", LogFeederUtil.ipAddress); } if (input.isUseEventMD5() || input.isGenEventMD5()) { @@ -278,4 +256,16 @@ public class OutputMgr { } } + + public void copyFile(File inputFile, InputMarker inputMarker) { + Input input = inputMarker.input; + for (Output output : input.getOutputList()) { + try { + output.copyFile(inputFile, inputMarker); + }catch (Exception e) { + logger.error("Error coyping file . to " + output.getShortDescription(), + e); + } + } + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index ec75f2d78e..18e21843cc 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -317,5 +317,9 @@ public abstract class Input extends ConfigBlock implements Runnable { } metricsList.add(readBytesMetric); } + + public Thread getThread(){ + return thread; + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index 420610a424..7107a692ed 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -251,31 +251,36 @@ public class InputFile extends Input { */ @Override void start() throws Exception { + if (logPathFiles == null || logPathFiles.length == 0) { return; } - - if (isTail()) { - // Just process the first file - processFile(logPathFiles[0]); - } else { - for (File file : logPathFiles) { - try { - processFile(file); - if (isClosed() || isDrain()) { - logger.info("isClosed or isDrain. Now breaking loop."); - break; + boolean isProcessFile = getBooleanValue("process_file", true); + if (isProcessFile) { + if (isTail()) { + // Just process the first file + processFile(logPathFiles[0]); + } else { + for (File file : logPathFiles) { + try { + processFile(file); + if (isClosed() || isDrain()) { + logger.info("isClosed or isDrain. Now breaking loop."); + break; + } + } catch (Throwable t) { + logger.error("Error processing file=" + file.getAbsolutePath(), t); } - } catch (Throwable t) { - logger.error( - "Error processing file=" + file.getAbsolutePath(), - t); } } + // Call the close for the input. Which should flush to the filters and + // output + close(); + }else{ + //copy files + copyFiles(logPathFiles); } - // Call the close for the input. Which should flush to the filters and - // output - close(); + } @Override @@ -559,4 +564,24 @@ public class InputFile extends Input { + (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0] .getAbsolutePath() : getStringValue("path")); } + + + public void copyFiles(File[] files) { + boolean isCopyFile = getBooleanValue("copy_file", false); + if (isCopyFile && files != null) { + for (File file : files) { + try { + InputMarker marker = new InputMarker(); + marker.input = this; + outputMgr.copyFile(file, marker); + if (isClosed() || isDrain()) { + logger.info("isClosed or isDrain. Now breaking loop."); + break; + } + } catch (Throwable t) { + logger.error("Error processing file=" + file.getAbsolutePath(), t); + } + } + } + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java new file mode 100644 index 0000000000..d68ab96105 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input; + +import java.io.BufferedReader; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.solr.common.util.Base64; + +public class InputS3File extends Input { + static private Logger logger = Logger.getLogger(InputS3File.class); + + String logPath = null; + boolean isStartFromBegining = true; + + boolean isReady = false; + String[] s3LogPathFiles = null; + Object fileKey = null; + String base64FileKey = null; + + private boolean isRolledOver = false; + boolean addWildCard = false; + + long lastCheckPointTimeMS = 0; + int checkPointIntervalMS = 5 * 1000; // 5 seconds + RandomAccessFile checkPointWriter = null; + Map<String, Object> jsonCheckPoint = null; + + File checkPointFile = null; + + private InputMarker lastCheckPointInputMarker = null; + + private String checkPointExtension = ".cp"; + + + @Override + public void init() throws Exception { + logger.info("init() called"); + statMetric.metricsName = "input.files.read_lines"; + readBytesMetric.metricsName = "input.files.read_bytes"; + checkPointExtension = LogFeederUtil.getStringProperty( + "logfeeder.checkpoint.extension", checkPointExtension); + + // Let's close the file and set it to true after we start monitoring it + setClosed(true); + logPath = getStringValue("path"); + tail = getBooleanValue("tail", tail); + addWildCard = getBooleanValue("add_wild_card", addWildCard); + checkPointIntervalMS = getIntValue("checkpoint.interval.ms", + checkPointIntervalMS); + if (logPath == null || logPath.isEmpty()) { + logger.error("path is empty for file input. " + getShortDescription()); + return; + } + + String startPosition = getStringValue("start_position"); + if (StringUtils.isEmpty(startPosition) + || startPosition.equalsIgnoreCase("beginning") + || startPosition.equalsIgnoreCase("begining")) { + isStartFromBegining = true; + } + + if (!tail) { + // start position end doesn't apply if we are not tailing + isStartFromBegining = true; + } + + setFilePath(logPath); + boolean isFileReady = isReady(); + + logger.info("File to monitor " + logPath + ", tail=" + tail + + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady); + + super.init(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#isReady() + */ + @Override + public boolean isReady() { + if (!isReady) { + // Let's try to check whether the file is available + s3LogPathFiles = getActualFiles(logPath); + if (s3LogPathFiles != null && s3LogPathFiles.length > 0) { + if (isTail() && s3LogPathFiles.length > 1) { + logger.warn("Found multiple files (" + s3LogPathFiles.length + + ") for the file filter " + filePath + + ". Will use only the first one. Using " + s3LogPathFiles[0]); + } + logger.info("File filter " + filePath + " expanded to " + + s3LogPathFiles[0]); + isReady = true; + } else { + logger.debug(logPath + " file doesn't exist. Ignoring for now"); + } + } + return isReady; + } + + private String[] getActualFiles(String searchPath) { + // TODO search file on s3 + return new String[] { searchPath }; + } + + @Override + synchronized public void checkIn(InputMarker inputMarker) { + super.checkIn(inputMarker); + if (checkPointWriter != null) { + try { + int lineNumber = LogFeederUtil.objectToInt( + jsonCheckPoint.get("line_number"), 0, "line_number"); + if (lineNumber > inputMarker.lineNumber) { + // Already wrote higher line number for this input + return; + } + // If interval is greater than last checkPoint time, then write + long currMS = System.currentTimeMillis(); + if (!isClosed() + && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { + // Let's save this one so we can update the check point file + // on flush + lastCheckPointInputMarker = inputMarker; + return; + } + lastCheckPointTimeMS = currMS; + + jsonCheckPoint.put("line_number", "" + + new Integer(inputMarker.lineNumber)); + jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); + jsonCheckPoint.put("last_write_time_date", new Date()); + + String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); + + // Let's rewind + checkPointWriter.seek(0); + checkPointWriter.writeInt(jsonStr.length()); + checkPointWriter.write(jsonStr.getBytes()); + + if (isClosed()) { + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_FINAL_CHECKIN"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Wrote final checkPoint, input=" + getShortDescription() + + ", checkPointFile=" + checkPointFile.getAbsolutePath() + + ", checkPoint=" + jsonStr, null, logger, Level.INFO); + } + } catch (Throwable t) { + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_CHECKIN_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Caught exception checkIn. , input=" + getShortDescription(), t, + logger, Level.ERROR); + } + } + + } + + @Override + public void checkIn() { + super.checkIn(); + if (lastCheckPointInputMarker != null) { + checkIn(lastCheckPointInputMarker); + } + } + + @Override + public void rollOver() { + logger.info("Marking this input file for rollover. " + + getShortDescription()); + isRolledOver = true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#monitor() + */ + @Override + void start() throws Exception { + if (s3LogPathFiles == null || s3LogPathFiles.length == 0) { + return; + } + + if (isTail()) { + // Just process the first file + processFile(s3LogPathFiles[0]); + } else { + for (String s3FilePath : s3LogPathFiles) { + try { + processFile(s3FilePath); + if (isClosed() || isDrain()) { + logger.info("isClosed or isDrain. Now breaking loop."); + break; + } + } catch (Throwable t) { + logger.error("Error processing file=" + s3FilePath, t); + } + } + } + // Call the close for the input. Which should flush to the filters and + // output + close(); + } + + @Override + public void close() { + super.close(); + logger.info("close() calling checkPoint checkIn(). " + + getShortDescription()); + checkIn(); + } + + private void processFile(String logPathFile) throws FileNotFoundException, + IOException { + logger.info("Monitoring logPath=" + logPath + ", logPathFile=" + + logPathFile); + BufferedReader br = null; + checkPointFile = null; + checkPointWriter = null; + jsonCheckPoint = null; + int resumeFromLineNumber = 0; + + int lineCount = 0; + try { + setFilePath(logPathFile); + String s3AccessKey = getStringValue("s3_access_key"); + String s3SecretKey = getStringValue("s3_secret_key"); + br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey); + if(br==null){ + //log err + return; + } + + // Whether to send to output from the beginning. + boolean resume = isStartFromBegining; + + // Seems FileWatch is not reliable, so let's only use file key + // comparison + // inputMgr.monitorSystemFileChanges(this); + fileKey = getFileKey(logPathFile); + base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes()); + logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + + getShortDescription()); + + if (isTail()) { + try { + // Let's see if there is a checkpoint for this file + logger.info("Checking existing checkpoint file. " + + getShortDescription()); + + String fileBase64 = Base64.byteArrayToBase64(fileKey.toString() + .getBytes()); + String checkPointFileName = fileBase64 + checkPointExtension; + File checkPointFolder = inputMgr.getCheckPointFolderFile(); + checkPointFile = new File(checkPointFolder, checkPointFileName); + checkPointWriter = new RandomAccessFile(checkPointFile, "rw"); + + try { + int contentSize = checkPointWriter.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointWriter.read(b, 0, contentSize); + if (readSize != contentSize) { + logger + .error("Couldn't read expected number of bytes from checkpoint file. expected=" + + contentSize + + ", read=" + + readSize + + ", checkPointFile=" + + checkPointFile + + ", input=" + + getShortDescription()); + } else { + // Create JSON string + String jsonCheckPointStr = new String(b, 0, readSize); + jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + + resumeFromLineNumber = LogFeederUtil.objectToInt( + jsonCheckPoint.get("line_number"), 0, "line_number"); + + if (resumeFromLineNumber > 0) { + // Let's read from last line read + resume = false; + } + logger.info("CheckPoint. checkPointFile=" + checkPointFile + + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber=" + + resumeFromLineNumber + ", resume=" + resume); + } + } catch (EOFException eofEx) { + logger.info("EOFException. Will reset checkpoint file " + + checkPointFile.getAbsolutePath() + " for " + + getShortDescription()); + } + if (jsonCheckPoint == null) { + // This seems to be first time, so creating the initial + // checkPoint object + jsonCheckPoint = new HashMap<String, Object>(); + jsonCheckPoint.put("file_path", filePath); + jsonCheckPoint.put("file_key", fileBase64); + } + + } catch (Throwable t) { + logger.error( + "Error while configuring checkpoint file. Will reset file. checkPointFile=" + + checkPointFile, t); + } + } + + setClosed(false); + int sleepStep = 2; + int sleepIteration = 0; + while (true) { + try { + if (isDrain()) { + break; + } + + String line = br.readLine(); + if (line == null) { + if (!resume) { + resume = true; + } + sleepIteration++; + try { + // Since FileWatch service is not reliable, we will + // check + // file inode every n seconds after no write + if (sleepIteration > 4) { + Object newFileKey = getFileKey(logPathFile); + if (newFileKey != null) { + if (fileKey == null || !newFileKey.equals(fileKey)) { + logger + .info("File key is different. Calling rollover. oldKey=" + + fileKey + + ", newKey=" + + newFileKey + + ". " + + getShortDescription()); + // File has rotated. + rollOver(); + } + } + } + // Flush on the second iteration + if (!tail && sleepIteration >= 2) { + logger.info("End of file. Done with filePath=" + logPathFile + + ", lineCount=" + lineCount); + flush(); + break; + } else if (sleepIteration == 2) { + flush(); + } else if (sleepIteration >= 2) { + if (isRolledOver) { + isRolledOver = false; + // Close existing file + try { + logger + .info("File is rolled over. Closing current open file." + + getShortDescription() + ", lineCount=" + + lineCount); + br.close(); + } catch (Exception ex) { + logger.error("Error closing file" + getShortDescription()); + break; + } + try { + // Open new file + logger.info("Opening new rolled over file." + + getShortDescription()); + br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey); + lineCount = 0; + fileKey = getFileKey(logPathFile); + base64FileKey = Base64.byteArrayToBase64(fileKey.toString() + .getBytes()); + logger.info("fileKey=" + fileKey + ", base64=" + + base64FileKey + ", " + getShortDescription()); + } catch (Exception ex) { + logger.error("Error opening rolled over file. " + + getShortDescription()); + // Let's add this to monitoring and exit + // this + // thread + logger.info("Added input to not ready list." + + getShortDescription()); + isReady = false; + inputMgr.addToNotReady(this); + break; + } + logger.info("File is successfully rolled over. " + + getShortDescription()); + continue; + } + } + Thread.sleep(sleepStep * 1000); + sleepStep = (sleepStep * 2); + sleepStep = sleepStep > 10 ? 10 : sleepStep; + } catch (InterruptedException e) { + logger.info("Thread interrupted." + getShortDescription()); + } + } else { + lineCount++; + sleepStep = 1; + sleepIteration = 0; + + if (!resume && lineCount > resumeFromLineNumber) { + logger.info("Resuming to read from last line. lineCount=" + + lineCount + ", input=" + getShortDescription()); + resume = true; + } + if (resume) { + InputMarker marker = new InputMarker(); + marker.fileKey = fileKey; + marker.base64FileKey = base64FileKey; + marker.filePath = filePath; + marker.input = this; + marker.lineNumber = lineCount; + outputLine(line, marker); + } + } + } catch (Throwable t) { + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_READ_LOOP_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Caught exception in read loop. lineNumber=" + lineCount + + ", input=" + getShortDescription(), t, logger, Level.ERROR); + + } + } + } finally { + if (br != null) { + logger.info("Closing reader." + getShortDescription() + ", lineCount=" + + lineCount); + try { + br.close(); + } catch (Throwable t) { + // ignore + } + } + } + } + + /** + * @param s3FilePath + * @return + */ + static public Object getFileKey(String s3FilePath) { + return s3FilePath.toString(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#getShortDescription() + */ + @Override + public String getShortDescription() { + return "input:source=" + + getStringValue("source") + + ", path=" + + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0] + : getStringValue("path")); + } + +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java index c067680e4d..99a2909e40 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.output; +import java.io.File; import java.lang.reflect.Type; import java.util.List; import java.util.Map; @@ -27,7 +28,6 @@ import java.util.Map.Entry; import org.apache.ambari.logfeeder.ConfigBlock; import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.MetricCount; -import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.log4j.Logger; @@ -57,9 +57,11 @@ public abstract class Output extends ConfigBlock { return super.getNameForThread(); } - public void write(String block, InputMarker inputMarker) throws Exception { - // No-op. Please implement in sub classes - } + public abstract void write(String block, InputMarker inputMarker) + throws Exception; + + public abstract void copyFile(File inputFile, InputMarker inputMarker) + throws UnsupportedOperationException; /** * @param jsonObj diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java index b6188cb35f..7cfcb98184 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.logfeeder.output; +import java.io.File; + import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.log4j.Logger; @@ -30,8 +32,14 @@ public class OutputDevNull extends Output { private static Logger logger = Logger.getLogger(OutputDevNull.class); @Override - public void write(String block, InputMarker inputMarker) throws Exception { + public void write(String block, InputMarker inputMarker){ // just ignore the logs logger.trace("Ignore log block: " + block); } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) { + throw new UnsupportedOperationException( + "copyFile method is not yet supported for output=dev_null"); + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java index b6e36d692b..4327f6f626 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java @@ -135,4 +135,11 @@ public class OutputFile extends Output { return "output:destination=file,path=" + filePath; } + @Override + public void copyFile(File inputFile, InputMarker inputMarker) + throws UnsupportedOperationException { + throw new UnsupportedOperationException( + "copyFile method is not yet supported for output=file"); + } + } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java index efbc366c10..120d071ae3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.output; +import java.io.File; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -283,4 +284,11 @@ public class OutputKafka extends Output { } } } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) + throws UnsupportedOperationException { + throw new UnsupportedOperationException( + "copyFile method is not yet supported for output=kafka"); + } }
\ No newline at end of file diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java new file mode 100644 index 0000000000..4cdf82d196 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.output; + +import java.io.File; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.ambari.logfeeder.LogFeeder; +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.CompressionUtil; +import org.apache.ambari.logfeeder.util.PlaceholderUtil; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * + * Write log file into s3 bucket + */ +public class OutputS3File extends Output { + + + private static boolean uploadedGlobalConfig = false; + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) { + String bucketName = getStringValue("s3_bucket"); + String s3LogDir = getStringValue("s3_log_dir"); + HashMap<String, String> contextParam = buildContextParam(); + s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam); + String s3AccessKey = getStringValue("s3_access_key"); + String s3SecretKey = getStringValue("s3_secret_key"); + String compressionAlgo = getStringValue("compression_algo"); + String fileName = inputFile.getName(); + // create tmp compressed File + String tmpDir = LogFeederUtil.getLogfeederTempDir(); + File outputFile = new File(tmpDir + fileName + "_" + + new Date().getTime() + "." + compressionAlgo); + outputFile = CompressionUtil.compressFile(inputFile, outputFile, + compressionAlgo); + String type = inputMarker.input.getStringValue("type"); + String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type + + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "." + + compressionAlgo; + S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey, + s3SecretKey); + // delete local compressed file + outputFile.deleteOnExit(); + ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>(); + addFilters(filters, inputMarker.input.getFirstFilter()); + Map<String, Object> inputConfig = new HashMap<String, Object>(); + inputConfig.putAll(inputMarker.input.getConfigs()); + String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName + + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path; + inputConfig.put("path", s3CompletePath); + + ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>(); + inputConfigList.add(inputConfig); + // set source s3_file + // remove global config from filter config + removeGlobalConfig(inputConfigList); + removeGlobalConfig(filters); + // write config into s3 file + String s3Key = getComponentConfigFileName(type); + Map<String, Object> config = new HashMap<String, Object>(); + config.put("filter", filters); + config.put("input", inputConfigList); + writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam, + s3Key); + // write global config + writeGlobalConfig(); + + } + + /** + * + * @param filters + * @param filter + */ + public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) { + if (filter != null) { + Map<String, Object> filterConfig = new HashMap<String, Object>(); + filterConfig.putAll(filter.getConfigs()); + filters.add(filterConfig); + if (filter.getNextFilter() != null) { + addFilters(filters, filter.getNextFilter()); + } + } + } + + /** + * + * @param filters + * @param inputConfig + * @param bucketName + * @param componentName + */ + public void writeConfigToS3(Map<String, Object> config, String bucketName, + String accessKey, String secretKey, HashMap<String, String> contextParam, + String s3Key) { + String s3ConfigDir = getStringValue("s3_config_dir"); + s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + String configJson = gson.toJson(config); + // write json to s3 file + s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key; + S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey, + secretKey); + } + + /** + * + * @param componentName + * @return String + */ + public String getComponentConfigFileName(String componentName) { + String fileName = "input.config-" + componentName + ".json"; + return fileName; + } + + public HashMap<String, String> buildContextParam() { + HashMap<String, String> contextParam = new HashMap<String, String>(); + contextParam.put("host", LogFeederUtil.hostName); + contextParam.put("ip", LogFeederUtil.ipAddress); + String cluster = getNVList("add_fields").get("cluster"); + contextParam.put("cluster", cluster); + return contextParam; + } + + + private Map<String, Object> getGlobalConfig() { + Map<String, Object> globalConfig = LogFeeder.globalMap; + if (globalConfig == null) { + globalConfig = new HashMap<String, Object>(); + } + return globalConfig; + } + + private void removeGlobalConfig(List<Map<String, Object>> configList) { + Map<String, Object> globalConfig = getGlobalConfig(); + if (configList != null && globalConfig != null) { + for (Entry<String, Object> globalConfigEntry : globalConfig.entrySet()) { + if (globalConfigEntry != null) { + String globalKey = globalConfigEntry.getKey(); + if (globalKey != null && !globalKey.trim().isEmpty()) { + for (Map<String, Object> config : configList) { + if (config != null) { + config.remove(globalKey); + } + } + } + } + } + } + } + + /** + * write global config in s3 file Invoke only once + */ + @SuppressWarnings("unchecked") + private synchronized void writeGlobalConfig() { + if (!uploadedGlobalConfig) { + Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig()); + //updating global config before write to s3 + globalConfig.put("source", "s3_file"); + globalConfig.put("copy_file", false); + globalConfig.put("process_file", true); + globalConfig.put("tail", false); + Map<String, Object> addFields = (Map<String, Object>) globalConfig + .get("add_fields"); + if (addFields == null) { + addFields = new HashMap<String, Object>(); + } + addFields.put("ip", LogFeederUtil.ipAddress); + addFields.put("host", LogFeederUtil.hostName); + // add bundle id same as cluster if its not there + String bundle_id = (String) addFields.get("bundle_id"); + if (bundle_id == null || bundle_id.isEmpty()) { + String cluster = (String) addFields.get("cluster"); + if (cluster != null && !cluster.isEmpty()) { + addFields.put("bundle_id", bundle_id); + } + } + globalConfig.put("add_fields", addFields); + Map<String, Object> config = new HashMap<String, Object>(); + config.put("global", globalConfig); + String s3AccessKey = getStringValue("s3_access_key"); + String s3SecretKey = getStringValue("s3_secret_key"); + String bucketName = getStringValue("s3_bucket"); + String s3Key = "global.config.json"; + HashMap<String, String> contextParam = buildContextParam(); + writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, + contextParam, s3Key); + uploadedGlobalConfig = true; + } + } + + @Override + public void write(String block, InputMarker inputMarker) throws Exception { + throw new UnsupportedOperationException( + "write method is not yet supported for output=s3_file"); + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 14b2093e36..6e3248b2ab 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -19,6 +19,7 @@ package org.apache.ambari.logfeeder.output; +import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.util.ArrayList; @@ -454,4 +455,17 @@ public class OutputSolr extends Output { return localBuffer.isEmpty(); } } + + @Override + public void write(String block, InputMarker inputMarker) throws Exception { + // TODO Auto-generated method stub + + } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) + throws UnsupportedOperationException { + throw new UnsupportedOperationException( + "copyFile method is not yet supported for output=solr"); + } } diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java new file mode 100644 index 0000000000..050b69b284 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.s3; + +import org.apache.log4j.Logger; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient; + +public enum AWSUtil { + INSTANCE; + private static final Logger LOG = Logger.getLogger(AWSUtil.class); + + /** + * Get aws username + * + * @param accessKey + * @param secretKey + * @return String + */ + public String getAwsUserName(String accessKey, String secretKey) { + String username = null; + AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey); + AmazonIdentityManagementClient amazonIdentityManagementClient; + if (awsCredentials != null) { + amazonIdentityManagementClient = new AmazonIdentityManagementClient( + awsCredentials); + } else { + // create default client + amazonIdentityManagementClient = new AmazonIdentityManagementClient(); + } + try { + username = amazonIdentityManagementClient.getUser().getUser() + .getUserName(); + } catch (AmazonServiceException e) { + if (e.getErrorCode().compareTo("AccessDenied") == 0) { + String arn = null; + String msg = e.getMessage(); + int arnIdx = msg.indexOf("arn:aws"); + if (arnIdx != -1) { + int arnSpace = msg.indexOf(" ", arnIdx); + // should be similar to "arn:aws:iam::111111111111:user/username" + arn = msg.substring(arnIdx, arnSpace); + } + if (arn != null) { + String[] arnParts = arn.split(":"); + if (arnParts != null && arnParts.length > 5) { + username = arnParts[5]; + if (username != null) { + username = username.replace("user/", ""); + } + } + } + } + } catch (Exception exception) { + LOG.error( + "Error in getting username :" + exception.getLocalizedMessage(), + exception.getCause()); + } + return username; + } + + public AWSCredentials createAWSCredentials(String accessKey, String secretKey) { + if (accessKey != null && secretKey != null) { + LOG.debug("Creating aws client as per new accesskey and secretkey"); + AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, + secretKey); + return awsCredentials; + } else { + // retrun null + return null; + } + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java new file mode 100644 index 0000000000..f49837c553 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.s3; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.log4j.Logger; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.services.s3.transfer.model.UploadResult; + +/** + * Utility to connect to s3 + * + */ +public enum S3Util { + INSTANCE; + + private static final Logger LOG = Logger.getLogger(S3Util.class); + + public final String S3_PATH_START_WITH = "s3://"; + public final String S3_PATH_SEPARATOR = "/"; + + /** + * get s3 client + * + * @return AmazonS3 + */ + public AmazonS3 getS3Client(String accessKey, String secretKey) { + AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( + accessKey, secretKey); + AmazonS3 s3client; + if (awsCredentials != null) { + s3client = new AmazonS3Client(awsCredentials); + } else { + s3client = new AmazonS3Client(); + } + return s3client; + } + + /** + * + * @return TransferManager + */ + public TransferManager getTransferManager(String accessKey, String secretKey) { + AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( + accessKey, secretKey); + TransferManager transferManager; + if (awsCredentials != null) { + transferManager = new TransferManager(awsCredentials); + } else { + transferManager = new TransferManager(); + } + return transferManager; + } + + /** + * shutdown s3 transfer manager + */ + public void shutdownTransferManager(TransferManager transferManager) { + if (transferManager != null) { + transferManager.shutdownNow(); + } + } + + /** + * Extract bucket name from s3 file complete path + * + * @param s3Path + * @return String + */ + public String getBucketName(String s3Path) { + String bucketName = null; + // s3path + if (s3Path != null) { + String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( + S3_PATH_SEPARATOR); + bucketName = s3PathParts[0]; + } + return bucketName; + } + + /** + * get s3 key from s3Path after removing bucketname + * + * @param s3Path + * @return String + */ + public String getS3Key(String s3Path) { + StringBuilder s3Key = new StringBuilder(); + // s3path + if (s3Path != null) { + String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( + S3_PATH_SEPARATOR); + ArrayList<String> s3PathList = new ArrayList<String>( + Arrays.asList(s3PathParts)); + s3PathList.remove(0);// remove bucketName + for (int index = 0; index < s3PathList.size(); index++) { + if (index > 0) { + s3Key.append(S3_PATH_SEPARATOR); + } + s3Key.append(s3PathList.get(index)); + } + } + return s3Key.toString(); + } + + /** + * + * @param bucketName + * @param s3Key + * @param localFile + */ + public void uploadFileTos3(String bucketName, String s3Key, File localFile, + String accessKey, String secretKey) { + TransferManager transferManager = getTransferManager(accessKey, secretKey); + try { + Upload upload = transferManager.upload(bucketName, s3Key, localFile); + UploadResult uploadResult = upload.waitForUploadResult(); + } catch (AmazonClientException | InterruptedException e) { + LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), + e); + } finally { + shutdownTransferManager(transferManager); + } + } + + /** + * Get the buffer reader to read s3 file as a stream + * + * @param s3Path + * @return BufferedReader + * @throws IOException + */ + public BufferedReader getReader(String s3Path, String accessKey, + String secretKey) throws IOException { + // TODO error handling + // Compression support + // read header and decide the compression(auto detection) + // For now hard-code GZIP compression + String s3Bucket = getBucketName(s3Path); + String s3Key = getS3Key(s3Path); + S3Object fileObj = getS3Client(accessKey, secretKey).getObject( + new GetObjectRequest(s3Bucket, s3Key)); + GZIPInputStream objectInputStream; + try { + objectInputStream = new GZIPInputStream(fileObj.getObjectContent()); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader( + objectInputStream)); + return bufferedReader; + } catch (IOException e) { + LOG.error("Error in creating stream reader for s3 file :" + s3Path, + e.getCause()); + throw e; + } + } + + /** + * + * @param data + * @param bucketName + * @param s3Key + */ + public void writeIntoS3File(String data, String bucketName, String s3Key, + String accessKey, String secretKey) { + InputStream in = null; + try { + in = IOUtils.toInputStream(data, "UTF-8"); + } catch (IOException e) { + LOG.error(e); + } + if (in != null) { + TransferManager transferManager = getTransferManager(accessKey, secretKey); + try { + if (transferManager != null) { + UploadResult uploadResult = transferManager + .upload( + new PutObjectRequest(bucketName, s3Key, in, + new ObjectMetadata())).waitForUploadResult(); + LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + + bucketName); + } + } catch (AmazonClientException | InterruptedException e) { + LOG.error(e); + } finally { + try { + shutdownTransferManager(transferManager); + in.close(); + } catch (IOException e) { + // ignore + } + } + } + } + +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java new file mode 100644 index 0000000000..54008ec335 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import org.apache.commons.compress.compressors.CompressorOutputStream; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.commons.compress.utils.IOUtils; +import org.apache.log4j.Logger; + +public class CompressionUtil { + + private static final Logger LOG = Logger.getLogger(CompressionUtil.class); + + /** + * Compress file + * + * @param inputFile + * @param outputFile + * @param algoName + */ + public static File compressFile(File inputFile, File outputFile, String algoName) { + CompressorOutputStream cos = null; + FileInputStream ios = null; + try { + if (!inputFile.exists()) { + throw new IllegalArgumentException("Input File:" + + inputFile.getAbsolutePath() + " is not exist."); + } + if (inputFile.isDirectory()) { + throw new IllegalArgumentException("Input File:" + + inputFile.getAbsolutePath() + " is a directory."); + } + File parent = outputFile.getParentFile(); + if (parent != null && !parent.exists()) { + boolean isParentCreated = parent.mkdirs(); + if (!isParentCreated) { + throw new IllegalAccessException( + "User does not have permission to create parent directory :" + + parent.getAbsolutePath()); + } + } + final OutputStream out = new FileOutputStream(outputFile); + cos = new CompressorStreamFactory().createCompressorOutputStream( + algoName, out); + ios = new FileInputStream(inputFile); + IOUtils.copy(ios, cos); + } catch (Exception e) { + LOG.error(e); + } finally { + // Close the stream + if (cos != null) { + try { + cos.close(); + } catch (IOException e) { + LOG.error(e); + } + } + if (ios != null) { + try { + ios.close(); + } catch (IOException e) { + LOG.error(e); + } + } + } + return outputFile; + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java new file mode 100644 index 0000000000..ec26a880e9 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; + +public class FileUtil { + private static final Logger logger = Logger.getLogger(FileUtil.class); + + public static List<File> getAllFileFromDir(File directory, + String[] searchFileWithExtensions, boolean checkInSubDir) { + if (!directory.exists()) { + logger.error(directory.getAbsolutePath() + " is not exists "); + } else if (directory.isDirectory()) { + return (List<File>) FileUtils.listFiles(directory, + searchFileWithExtensions, checkInSubDir); + } else { + logger.error(directory.getAbsolutePath() + " is not Directory "); + } + return new ArrayList<File>(); + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java new file mode 100644 index 0000000000..9be85ee173 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.util.HashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; + +public class PlaceholderUtil { + + private static Logger LOG = Logger.getLogger(PlaceholderUtil.class); + + private static Pattern placeHolderPattern; + static { +// placeHolderPattern = Pattern.compile("\\{(.*?)\\}"); + placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)"); + } + + /** + * + * @param inputStr + * @param contextParam + * @return String + */ + public static String replaceVariables(String inputStr, + HashMap<String, String> contextParam) { + Matcher m = placeHolderPattern.matcher(inputStr); + String placeholder; + String replacement; + String output = new String(inputStr); + while (m.find()) { + placeholder = m.group(); + if (placeholder != null && !placeholder.isEmpty()) { + String key = placeholder.replace("$","").toLowerCase();// remove + // brace + replacement = getFromContext(contextParam, placeholder, key); + output = output.replace(placeholder, replacement); + } + } + return output; + } + + /** + * + * @param contextParam + * @param defaultValue + * @param key + * @return String + */ + private static String getFromContext(HashMap<String, String> contextParam, + String defaultValue, String key) { + String returnValue = defaultValue;// by default set default value as a + // return + if (contextParam != null) { + String value = contextParam.get(key); + if (value != null && !value.trim().isEmpty()) { + returnValue = value; + } + } + return returnValue; + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json index a55b348ff5..58bcdae9fb 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json @@ -2,6 +2,9 @@ "input": { "file": { "klass": "org.apache.ambari.logfeeder.input.InputFile" + }, + "s3_file": { + "klass": "org.apache.ambari.logfeeder.input.InputS3File" } }, @@ -40,6 +43,9 @@ }, "dev_null": { "klass": "org.apache.ambari.logfeeder.output.OutputDevNull" - } + }, + "s3_file": { + "klass": "org.apache.ambari.logfeeder.output.OutputS3File" + } } }
\ No newline at end of file diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties index 076c09c643..fc72ce08f2 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties @@ -22,3 +22,6 @@ logfeeder.solr.config.interval=5 logfeeder.solr.core.history=history logfeeder.solr.zkhosts= logfeeder.solr.url= + +#logfeeder tmp dir +logfeeder.tmp.dir=/tmp/$username/logfeeder/ diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java new file mode 100644 index 0000000000..1e2be37d55 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.s3; + +public class AWSUtilTest { +// @Test + public void testAWSUtil_getAwsUserName() throws Exception { + String S3_ACCESS_KEY = "S3_ACCESS_KEY"; + String S3_SECRET_KEY = "S3_SECRET_KEY"; + String expectedUsername = ""; + String username = AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, + S3_SECRET_KEY); + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java new file mode 100644 index 0000000000..d07ae2b823 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.s3; + +import org.apache.log4j.Logger; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class S3UtilTest { + private static final Logger LOG = Logger.getLogger(S3UtilTest.class); + + // @Test + public void testS3Util_pathToBucketName() throws Exception { + String s3Path = "s3://bucket_name/path/file.txt"; + String expectedBucketName = "bucket_name"; + String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path); + assertEquals(expectedBucketName, actualBucketName); + } + + // @Test + public void testS3Util_pathToS3Key() throws Exception { + String s3Path = "s3://bucket_name/path/file.txt"; + String expectedS3key = "path/file.txt"; + String actualS3key = S3Util.INSTANCE.getS3Key(s3Path); + assertEquals(expectedS3key, actualS3key); + } + +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java new file mode 100644 index 0000000000..373a52fc95 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/PlaceholderUtilTest.java @@ -0,0 +1,48 @@ +package org.apache.ambari.logfeeder.util; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.HashMap; + +import org.apache.log4j.Logger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PlaceholderUtilTest { + + private static final Logger LOG = Logger.getLogger(PlaceholderUtilTest.class); + + @Test + public void testPlaceholderUtil_replaceVariables() { + HashMap<String, String> contextParam = new HashMap<String, String>(); + String hostName = "host1"; + String ip = "127.0.0.1"; + String clusterName = "test-cluster"; + contextParam.put("host", hostName); + contextParam.put("ip", ip); + contextParam.put("cluster", clusterName); + String inputStr = "$CLUSTER/logfeeder/$HOST-$IP/logs"; + String resultStr = PlaceholderUtil.replaceVariables(inputStr, contextParam); + String expectedStr = clusterName + "/logfeeder/" + hostName + "-" + ip + "/logs"; + assertEquals("Result string :" + resultStr + + " is not equal to exptected string :" + expectedStr, resultStr, + expectedStr); + } + +} |