author    oleewere <oleewere@gmail.com>  2016-07-04 11:50:06 +0200
committer oleewere <oleewere@gmail.com>  2016-07-04 11:57:26 +0200
commit    9ea8cd820b4de48934d830ed2025476509c6c918 (patch)
tree      01dc07555d8ee8d8133e55a264ff2382bfe0627a /ambari-logsearch/ambari-logsearch-logfeeder
parent    2f86680432bd9905d3661a3090f08420c6649173 (diff)
AMBARI-17046. Support loading logs to HDFS (Hayat Behlim via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/pom.xml                                                               |  15
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java            |  14
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java              |   9
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java  | 273
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java          |  47
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java |  95
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json                                  |   5
-rw-r--r--  ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json             |  20
8 files changed, 475 insertions(+), 3 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index e3762f7e1d..c1f3ec9b31 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -139,6 +139,21 @@
<artifactId>aws-java-sdk-iam</artifactId>
<version>1.11.5</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${common.io.version}</version>
+ </dependency>
</dependencies>
<build>
<finalName>LogFeeder</finalName>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
index 088472e274..19bb191ddd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
@@ -204,6 +204,20 @@ public abstract class ConfigBlock {
}
return retValue;
}
+
+  public long getLongValue(String key, long defaultValue) {
+    String strValue = getStringValue(key);
+    long retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Long.parseLong(strValue);
+      } catch (NumberFormatException e) {
+        logger.error("Error parsing long value. key=" + key + ", value="
+          + strValue, e);
+      }
+    }
+    return retValue;
+  }
public Map<String, String> getContextFields() {
return contextFields;
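The new getLongValue accessor follows the parse-with-default pattern: a missing or malformed property is logged and the caller's default wins, so one bad value cannot abort config loading. A minimal standalone sketch of that pattern (class and method names here are illustrative, not part of the patch):

    public class LongValueSketch {
      // Fall back to the default on empty or malformed input instead of throwing.
      static long getLongValue(String strValue, long defaultValue) {
        if (strValue == null || strValue.isEmpty()) {
          return defaultValue;
        }
        try {
          return Long.parseLong(strValue);
        } catch (NumberFormatException e) {
          return defaultValue;
        }
      }

      public static void main(String[] args) {
        System.out.println(getLongValue("300", 600L)); // 300
        System.out.println(getLongValue("abc", 600L)); // 600
      }
    }

OutputHDFSFile below uses this accessor to read its rollover_sec interval.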
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index c5d4fd57ad..b0c43bbc4b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -43,6 +43,7 @@ import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -52,6 +53,8 @@ public class LogFeeder {
static Logger logger = Logger.getLogger(LogFeeder.class);
Collection<Output> outputList = new ArrayList<Output>();
+
+ private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
OutputMgr outMgr = new OutputMgr();
InputMgr inputMgr = new InputMgr();
@@ -448,8 +451,10 @@ public class LogFeeder {
private void monitor() throws Exception {
inputMgr.monitor();
- Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
-
+ JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
+ ShutdownHookManager.get().addShutdownHook(logfeederJVMHook,
+ LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
+
statLoggerThread = new Thread("statLogger") {
@Override
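The switch from Runtime.getRuntime().addShutdownHook to Hadoop's ShutdownHookManager matters for the new HDFS output: Hadoop's FileSystem registers its own cache-closing hook at priority 10 (FileSystem.SHUTDOWN_HOOK_PRIORITY), and ShutdownHookManager runs hooks in descending priority order, so the logfeeder hook at priority 30 can flush and hand off local files before the FileSystem is torn down. A plain JVM hook gives no such ordering guarantee. A small sketch of that ordering (hook bodies are illustrative; assumes Java 8 lambdas):

    import org.apache.hadoop.util.ShutdownHookManager;

    public class HookOrderSketch {
      public static void main(String[] args) {
        ShutdownHookManager mgr = ShutdownHookManager.get();
        // Higher priority runs earlier on JVM exit: 30 fires before 10.
        mgr.addShutdownHook(() -> System.out.println("flush and hand off local files"), 30);
        mgr.addShutdownHook(() -> System.out.println("close cached FileSystems"), 10);
      }
    }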
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
new file mode 100644
index 0000000000..9272636113
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+public class OutputHDFSFile extends Output {
+ private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+
+ private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
+
+ private final Object readyMonitor = new Object();
+
+ private Thread hdfsCopyThread = null;
+
+ private PrintWriter outWriter = null;
+ // local writer variables
+ private String localFilePath = null;
+ private String filenamePrefix = "service-logs-";
+ private String localFileDir = null;
+ private File localcurrentFile = null;
+ private Date localFileCreateTime = null;
+  private long localFileRolloverSec = 5 * 60; // 5 minutes by default
+
+ private String hdfsOutDir = null;
+ private String hdfsHost = null;
+ private String hdfsPort = null;
+ private FileSystem fileSystem = null;
+
+ private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+
+ @Override
+ public void init() throws Exception {
+ super.init();
+ hdfsOutDir = getStringValue("hdfs_out_dir");
+ hdfsHost = getStringValue("hdfs_host");
+ hdfsPort = getStringValue("hdfs_port");
+ localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
+ filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
+ if (hdfsOutDir == null || hdfsOutDir.isEmpty()) {
+      logger.error("Required config property <hdfs_out_dir> is not set in config file.");
+ return;
+ }
+ HashMap<String, String> contextParam = buildContextParam();
+ hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
+ logger.info("hdfs Output dir=" + hdfsOutDir);
+ localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+ localFilePath = localFileDir;
+ this.startHDFSCopyThread();
+ }
+
+ @Override
+ public void close() {
+ logger.info("Closing file." + getShortDescription());
+ if (outWriter != null) {
+ try {
+ outWriter.flush();
+ outWriter.close();
+ addFileInReadyList(localcurrentFile);
+ } catch (Throwable t) {
+ // Ignore this exception
+ }
+ }
+ this.stopHDFSCopyThread();
+ isClosed = true;
+ }
+
+ @Override
+  public synchronized void write(String block, InputMarker inputMarker)
+      throws Exception {
+ if (block != null) {
+ buildOutWriter();
+ if (outWriter != null) {
+ statMetric.count++;
+ outWriter.println(block);
+ closeFileIfNeeded();
+ }
+ }
+ }
+
+
+ @Override
+ public String getShortDescription() {
+ return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
+ }
+
+ private synchronized void closeFileIfNeeded() throws FileNotFoundException,
+ IOException {
+ if (outWriter == null) {
+ return;
+ }
+ // TODO: Close the file on absolute time. Currently it is implemented as
+ // relative time
+ if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) {
+ logger.info("Closing file. Rolling over. name="
+ + localcurrentFile.getName() + ", filePath="
+ + localcurrentFile.getAbsolutePath());
+ try {
+ outWriter.flush();
+ outWriter.close();
+ addFileInReadyList(localcurrentFile);
+ } catch (Throwable t) {
+        logger.error("Error closing output writer. Exception will be ignored. name="
+            + localcurrentFile.getName() + ", filePath="
+            + localcurrentFile.getAbsolutePath());
+ }
+
+ outWriter = null;
+ localcurrentFile = null;
+ }
+ }
+
+ public synchronized void buildOutWriter() {
+ if (outWriter == null) {
+ String currentFilePath = localFilePath + getCurrentFileName();
+ localcurrentFile = new File(currentFilePath);
+ if (localcurrentFile.getParentFile() != null) {
+ File parentDir = localcurrentFile.getParentFile();
+ if (!parentDir.isDirectory()) {
+ parentDir.mkdirs();
+ }
+ }
+      try {
+        outWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+            localcurrentFile, true)));
+        localFileCreateTime = new Date();
+        logger.info("Created local file successfully. localFilePath="
+            + localcurrentFile.getAbsolutePath());
+      } catch (IOException e) {
+        logger.error("OutputHDFSFile.buildOutWriter failed for file: "
+            + localcurrentFile.getAbsolutePath() + " Desc: "
+            + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(),
+            e);
+      }
+ }
+ }
+
+ private void startHDFSCopyThread() {
+
+ hdfsCopyThread = new Thread("hdfsCopyThread") {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Iterator<File> localFileIterator = localReadyFiles.iterator();
+ while (localFileIterator.hasNext()) {
+ File localFile = localFileIterator.next();
+ fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost,
+ hdfsPort);
+ if (fileSystem != null && localFile.exists()) {
+ String destFilePath = hdfsOutDir + "/" + localFile.getName();
+ String localPath = localFile.getAbsolutePath();
+ boolean overWrite = true;
+ boolean delSrc = true;
+ boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal(
+ localFile.getAbsolutePath(), destFilePath, fileSystem,
+ overWrite, delSrc);
+ if (isCopied) {
+ logger.debug("File copy to hdfs hdfspath :" + destFilePath
+ + " and deleted local file :" + localPath);
+ } else {
+ // TODO Need to write retry logic, in next release we can
+ // handle it
+ logger.error("Hdfs file copy failed for hdfspath :"
+ + destFilePath + " and localpath :" + localPath);
+ }
+
+ }
+
+ }
+ try {
+            // wait till a new file arrives in the ready list
+ synchronized (readyMonitor) {
+ if (localReadyFiles.size() == 0) {
+ readyMonitor.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ } catch (Exception e) {
+          logger.error("Exception in hdfsCopyThread. errorMsg: "
+              + e.getLocalizedMessage(), e);
+ }
+ }
+ };
+ hdfsCopyThread.setDaemon(true);
+ hdfsCopyThread.start();
+ }
+
+ private void stopHDFSCopyThread() {
+ if (hdfsCopyThread != null) {
+ logger.info("waiting till copy all local files to hdfs.......");
+ while (localReadyFiles.size() != 0) {
+
+ }
+ logger.info("calling interrupt method for hdfsCopyThread to stop it.");
+ hdfsCopyThread.interrupt();
+ LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem);
+ }
+ }
+
+ public String getCurrentFileName() {
+ Date currentDate = new Date();
+ String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
+ String fileName = filenamePrefix + dateStr;
+ return fileName;
+ }
+
+ public HashMap<String, String> buildContextParam() {
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ contextParam.put("host", LogFeederUtil.hostName);
+ return contextParam;
+ }
+
+ public void addFileInReadyList(File localFile) {
+ localReadyFiles.add(localFile);
+ try {
+ synchronized (readyMonitor) {
+ readyMonitor.notifyAll();
+ }
+ } catch (Exception exception) {
+ // ignore
+ }
+ }
+
+ @Override
+ public void copyFile(File inputFile, InputMarker inputMarker)
+ throws UnsupportedOperationException {
+ throw new UnsupportedOperationException(
+ "copyFile method is not yet supported for output=hdfs");
+ }
+}
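The core of the class is a producer/consumer handoff: write() appends to a local spool file, closeFileIfNeeded() rolls it into localReadyFiles, and hdfsCopyThread drains that queue, parking on readyMonitor when it is empty. A self-contained sketch of that handoff (names and the println stand-in for copyFromLocal are illustrative):

    import java.io.File;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ReadyQueueSketch {
      private final ConcurrentLinkedQueue<File> ready = new ConcurrentLinkedQueue<>();
      private final Object monitor = new Object();

      // Producer side: the writer thread queues a rolled-over file and wakes the copier.
      void addFileInReadyList(File f) {
        ready.add(f);
        synchronized (monitor) {
          monitor.notifyAll();
        }
      }

      // Consumer side: the copy thread drains the queue, then waits for the next file.
      void copyLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          File f;
          while ((f = ready.poll()) != null) {
            System.out.println("copy to HDFS: " + f); // stand-in for copyFromLocal
          }
          synchronized (monitor) {
            if (ready.isEmpty()) {
              monitor.wait(); // parked until addFileInReadyList notifies
            }
          }
        }
      }
    }

Note that the emptiness check and the wait happen under the same monitor, which is what keeps a notify from being lost between the check and the wait.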
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
new file mode 100644
index 0000000000..1c0ce67d24
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+public class DateUtil {
+ private static final Logger logger = Logger.getLogger(DateUtil.class);
+
+ public static String dateToString(Date date, String dateFormat) {
+ if (date == null || dateFormat == null || dateFormat.isEmpty()) {
+ return "";
+ }
+ try {
+ SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
+      return formatter.format(date);
+ } catch (Exception e) {
+ logger.error("Error in coverting dateToString format :" + dateFormat, e);
+ }
+ return "";
+ }
+
+ public static void main(String[] args) {
+ Date currentDate = new Date();
+ String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+ System.out.println(dateToString(currentDate, fileDateFormat));
+ }
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
new file mode 100644
index 0000000000..fd96f8af2c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public enum LogfeederHDFSUtil {
+ INSTANCE;
+ private static Logger logger = Logger.getLogger(LogfeederHDFSUtil.class);
+
+ public void createHDFSDir(String dirPath, FileSystem dfs) {
+ Path src = new Path(dirPath);
+ try {
+ if (dfs.isDirectory(src)) {
+ logger.info("hdfs dir dirPath=" + dirPath + " is already exist.");
+ return;
+ }
+ boolean isDirCreated = dfs.mkdirs(src);
+ if (isDirCreated) {
+ logger.debug("HDFS dirPath=" + dirPath + " created successfully.");
+ } else {
+ logger.warn("HDFS dir creation failed dirPath=" + dirPath);
+ }
+ } catch (IOException e) {
+ logger.error("HDFS dir creation failed dirPath=" + dirPath, e.getCause());
+ }
+ }
+
+ public boolean copyFromLocal(String sourceFilepath, String destFilePath,
+ FileSystem fileSystem, boolean overwrite, boolean delSrc) {
+ Path src = new Path(sourceFilepath);
+ Path dst = new Path(destFilePath);
+ boolean isCopied = false;
+ try {
+ logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := "
+ + destFilePath);
+ fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
+ isCopied = true;
+ } catch (Exception e) {
+ logger.error("Error copying local file :" + sourceFilepath
+ + " to hdfs location : " + destFilePath, e);
+ }
+ return isCopied;
+ }
+
+ public FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+ try {
+ Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
+ FileSystem fs = FileSystem.get(configuration);
+ return fs;
+ } catch (Exception e) {
+ logger.error("Exception is buildFileSystem :", e);
+ }
+ return null;
+ }
+
+ public void closeFileSystem(FileSystem fileSystem) {
+ if (fileSystem != null) {
+ try {
+ fileSystem.close();
+ } catch (IOException e) {
+      logger.error(e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ public Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
+ String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+ Configuration configuration = new Configuration();
+ configuration.set("fs.default.name", url);
+ return configuration;
+ }
+
+}
\ No newline at end of file
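A hypothetical caller of the utility, wiring the pieces together the way hdfsCopyThread does: build a FileSystem against a NameNode, ensure the target directory exists, copy one spool file up, and close. The host, port, and paths below are placeholders, not values from this patch:

    import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
    import org.apache.hadoop.fs.FileSystem;

    public class HdfsCopySketch {
      public static void main(String[] args) {
        // Placeholder NameNode endpoint
        FileSystem fs = LogfeederHDFSUtil.INSTANCE.buildFileSystem("namenode.example.com", "8020");
        if (fs != null) {
          LogfeederHDFSUtil.INSTANCE.createHDFSDir("/logfeeder/service", fs);
          boolean copied = LogfeederHDFSUtil.INSTANCE.copyFromLocal(
              "/tmp/logfeeder/hdfs/service/service-logs-2016-07-04-11-50-06",
              "/logfeeder/service/service-logs-2016-07-04-11-50-06",
              fs, true /* overwrite */, true /* delete source */);
          System.out.println("copied=" + copied);
          LogfeederHDFSUtil.INSTANCE.closeFileSystem(fs);
        }
      }
    }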
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 58bcdae9fb..978f581bc3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -46,6 +46,9 @@
},
"s3_file": {
"klass": "org.apache.ambari.logfeeder.output.OutputS3File"
- }
+ },
+ "hdfs": {
+ "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
+ }
}
}
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json
new file mode 100644
index 0000000000..336934a917
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json
@@ -0,0 +1,20 @@
+{
+
+  "output": [{
+    "comment": "Write log to hdfs",
+    "destination": "hdfs",
+    "hdfs_out_dir": "logfeeder/$HOST/service",
+    "file_name_prefix": "service-logs-",
+    "hdfs_host": "hdfs_host",
+    "hdfs_port": "8020",
+    "rollover_sec": "300",
+    "conditions": {
+      "fields": {
+        "rowtype": [
+          "service"
+        ]
+      }
+    }
+  }]
+
+}
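The $HOST token in hdfs_out_dir is substituted at init time from the context map built in OutputHDFSFile.buildContextParam. PlaceholderUtil itself is not part of this diff, so the sketch below only assumes simple $KEY-for-value substitution (names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class PlaceholderSketch {
      // Assumed semantics: replace $HOST with the "host" context entry.
      static String replaceVariables(String value, Map<String, String> context) {
        for (Map.Entry<String, String> e : context.entrySet()) {
          value = value.replace("$" + e.getKey().toUpperCase(), e.getValue());
        }
        return value;
      }

      public static void main(String[] args) {
        HashMap<String, String> ctx = new HashMap<>();
        ctx.put("host", "node-1.example.com");
        System.out.println(replaceVariables("logfeeder/$HOST/service", ctx));
        // -> logfeeder/node-1.example.com/service
      }
    }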