diff options
author | oleewere <oleewere@gmail.com> | 2016-07-04 11:50:06 +0200 |
---|---|---|
committer | oleewere <oleewere@gmail.com> | 2016-07-04 11:57:26 +0200 |
commit | 9ea8cd820b4de48934d830ed2025476509c6c918 (patch) | |
tree | 01dc07555d8ee8d8133e55a264ff2382bfe0627a /ambari-logsearch/ambari-logsearch-logfeeder | |
parent | 2f86680432bd9905d3661a3090f08420c6649173 (diff) |
AMBARI-17046. Support loading logs to HDFS (Hayat Behlim via oleewere)
Diffstat (limited to 'ambari-logsearch/ambari-logsearch-logfeeder')
8 files changed, 475 insertions, 3 deletions
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml index e3762f7e1d..c1f3ec9b31 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml +++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml @@ -139,6 +139,21 @@ <artifactId>aws-java-sdk-iam</artifactId> <version>1.11.5</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${common.io.version}</version> + </dependency> </dependencies> <build> <finalName>LogFeeder</finalName> diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java index 088472e274..19bb191ddd 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java @@ -204,6 +204,20 @@ public abstract class ConfigBlock { } return retValue; } + + public long getLongValue(String key, long defaultValue) { + String strValue = getStringValue(key); + Long retValue = defaultValue; + if (!StringUtils.isEmpty(strValue)) { + try { + retValue = Long.parseLong(strValue); + } catch (Throwable t) { + logger.error("Error parsing long value. key=" + key + ", value=" + + strValue); + } + } + return retValue; + } public Map<String, String> getContextFields() { return contextFields; diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java index c5d4fd57ad..b0c43bbc4b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java @@ -43,6 +43,7 @@ import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler; import org.apache.ambari.logfeeder.output.Output; import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -52,6 +53,8 @@ public class LogFeeder { static Logger logger = Logger.getLogger(LogFeeder.class); Collection<Output> outputList = new ArrayList<Output>(); + + private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30; OutputMgr outMgr = new OutputMgr(); InputMgr inputMgr = new InputMgr(); @@ -448,8 +451,10 @@ public class LogFeeder { private void monitor() throws Exception { inputMgr.monitor(); - Runtime.getRuntime().addShutdownHook(new JVMShutdownHook()); - + JVMShutdownHook logfeederJVMHook = new JVMShutdownHook(); + ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, + LOGFEEDER_SHUTDOWN_HOOK_PRIORITY); + statLoggerThread = new Thread("statLogger") { @Override diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java new file mode 100644 index 0000000000..9272636113 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.util.DateUtil; +import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil; +import org.apache.ambari.logfeeder.util.PlaceholderUtil; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.Logger; + +public class OutputHDFSFile extends Output { + private final static Logger logger = Logger.getLogger(OutputHDFSFile.class); + + private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>(); + + private final Object readyMonitor = new Object(); + + private Thread hdfsCopyThread = null; + + private PrintWriter outWriter = null; + // local writer variables + private String localFilePath = null; + private String filenamePrefix = "service-logs-"; + private String localFileDir = null; + private File localcurrentFile = null; + private Date localFileCreateTime = null; + private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default + + private String hdfsOutDir = null; + private String hdfsHost = null; + private String hdfsPort = null; + private FileSystem fileSystem = null; + + private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; + + @Override + public void init() throws Exception { + super.init(); + hdfsOutDir = getStringValue("hdfs_out_dir"); + hdfsHost = getStringValue("hdfs_host"); + hdfsPort = getStringValue("hdfs_port"); + localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec); + filenamePrefix = getStringValue("file_name_prefix", filenamePrefix); + if (hdfsOutDir == null || hdfsOutDir.isEmpty()) { + logger + .error("Filepath config property <path> is not set in config file."); + return; + } + HashMap<String, String> contextParam = buildContextParam(); + hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam); + logger.info("hdfs Output dir=" + hdfsOutDir); + localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/"; + localFilePath = localFileDir; + this.startHDFSCopyThread(); + } + + @Override + public void close() { + logger.info("Closing file." + getShortDescription()); + if (outWriter != null) { + try { + outWriter.flush(); + outWriter.close(); + addFileInReadyList(localcurrentFile); + } catch (Throwable t) { + // Ignore this exception + } + } + this.stopHDFSCopyThread(); + isClosed = true; + } + + @Override + synchronized public void write(String block, InputMarker inputMarker) + throws Exception { + if (block != null) { + buildOutWriter(); + if (outWriter != null) { + statMetric.count++; + outWriter.println(block); + closeFileIfNeeded(); + } + } + } + + + @Override + public String getShortDescription() { + return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir; + } + + private synchronized void closeFileIfNeeded() throws FileNotFoundException, + IOException { + if (outWriter == null) { + return; + } + // TODO: Close the file on absolute time. Currently it is implemented as + // relative time + if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) { + logger.info("Closing file. Rolling over. name=" + + localcurrentFile.getName() + ", filePath=" + + localcurrentFile.getAbsolutePath()); + try { + outWriter.flush(); + outWriter.close(); + addFileInReadyList(localcurrentFile); + } catch (Throwable t) { + logger + .error("Error on closing output writter. Exception will be ignored. name=" + + localcurrentFile.getName() + + ", filePath=" + + localcurrentFile.getAbsolutePath()); + } + + outWriter = null; + localcurrentFile = null; + } + } + + public synchronized void buildOutWriter() { + if (outWriter == null) { + String currentFilePath = localFilePath + getCurrentFileName(); + localcurrentFile = new File(currentFilePath); + if (localcurrentFile.getParentFile() != null) { + File parentDir = localcurrentFile.getParentFile(); + if (!parentDir.isDirectory()) { + parentDir.mkdirs(); + } + } + try { + outWriter = new PrintWriter(new BufferedWriter(new FileWriter( + localcurrentFile, true))); + } catch (IOException e) { + logger.error("= OutputHDFSFile.buidOutWriter failed for file : " + + localcurrentFile.getAbsolutePath() + " Desc: " + + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(), + e); + } + localFileCreateTime = new Date(); + logger.info("Create file is successful. localFilePath=" + + localcurrentFile.getAbsolutePath()); + } + } + + private void startHDFSCopyThread() { + + hdfsCopyThread = new Thread("hdfsCopyThread") { + @Override + public void run() { + try { + while (true) { + Iterator<File> localFileIterator = localReadyFiles.iterator(); + while (localFileIterator.hasNext()) { + File localFile = localFileIterator.next(); + fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost, + hdfsPort); + if (fileSystem != null && localFile.exists()) { + String destFilePath = hdfsOutDir + "/" + localFile.getName(); + String localPath = localFile.getAbsolutePath(); + boolean overWrite = true; + boolean delSrc = true; + boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal( + localFile.getAbsolutePath(), destFilePath, fileSystem, + overWrite, delSrc); + if (isCopied) { + logger.debug("File copy to hdfs hdfspath :" + destFilePath + + " and deleted local file :" + localPath); + } else { + // TODO Need to write retry logic, in next release we can + // handle it + logger.error("Hdfs file copy failed for hdfspath :" + + destFilePath + " and localpath :" + localPath); + } + + } + + } + try { + // wait till new file comes in reayList + synchronized (readyMonitor) { + if (localReadyFiles.size() == 0) { + readyMonitor.wait(); + } + } + } catch (InterruptedException e) { + // ignore + } + } + } catch (Exception e) { + logger + .error( + "Exception in hdfsCopyThread errorMsg:" + + e.getLocalizedMessage(), e); + } + } + }; + hdfsCopyThread.setDaemon(true); + hdfsCopyThread.start(); + } + + private void stopHDFSCopyThread() { + if (hdfsCopyThread != null) { + logger.info("waiting till copy all local files to hdfs......."); + while (localReadyFiles.size() != 0) { + + } + logger.info("calling interrupt method for hdfsCopyThread to stop it."); + hdfsCopyThread.interrupt(); + LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem); + } + } + + public String getCurrentFileName() { + Date currentDate = new Date(); + String dateStr = DateUtil.dateToString(currentDate, fileDateFormat); + String fileName = filenamePrefix + dateStr; + return fileName; + } + + public HashMap<String, String> buildContextParam() { + HashMap<String, String> contextParam = new HashMap<String, String>(); + contextParam.put("host", LogFeederUtil.hostName); + return contextParam; + } + + public void addFileInReadyList(File localFile) { + localReadyFiles.add(localFile); + try { + synchronized (readyMonitor) { + readyMonitor.notifyAll(); + } + } catch (Exception exception) { + // ignore + } + } + + @Override + public void copyFile(File inputFile, InputMarker inputMarker) + throws UnsupportedOperationException { + throw new UnsupportedOperationException( + "copyFile method is not yet supported for output=hdfs"); + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java new file mode 100644 index 0000000000..1c0ce67d24 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.log4j.Logger; + +public class DateUtil { + private static final Logger logger = Logger.getLogger(DateUtil.class); + + public static String dateToString(Date date, String dateFormat) { + if (date == null || dateFormat == null || dateFormat.isEmpty()) { + return ""; + } + try { + SimpleDateFormat formatter = new SimpleDateFormat(dateFormat); + return formatter.format(date).toString(); + } catch (Exception e) { + logger.error("Error in coverting dateToString format :" + dateFormat, e); + } + return ""; + } + + public static void main(String[] args) { + Date currentDate = new Date(); + String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; + System.out.println(dateToString(currentDate, fileDateFormat)); + } +} diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java new file mode 100644 index 0000000000..fd96f8af2c --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +public enum LogfeederHDFSUtil { + INSTANCE; + private static Logger logger = Logger.getLogger(LogfeederHDFSUtil.class); + + public void createHDFSDir(String dirPath, FileSystem dfs) { + Path src = new Path(dirPath); + try { + if (dfs.isDirectory(src)) { + logger.info("hdfs dir dirPath=" + dirPath + " is already exist."); + return; + } + boolean isDirCreated = dfs.mkdirs(src); + if (isDirCreated) { + logger.debug("HDFS dirPath=" + dirPath + " created successfully."); + } else { + logger.warn("HDFS dir creation failed dirPath=" + dirPath); + } + } catch (IOException e) { + logger.error("HDFS dir creation failed dirPath=" + dirPath, e.getCause()); + } + } + + public boolean copyFromLocal(String sourceFilepath, String destFilePath, + FileSystem fileSystem, boolean overwrite, boolean delSrc) { + Path src = new Path(sourceFilepath); + Path dst = new Path(destFilePath); + boolean isCopied = false; + try { + logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + + destFilePath); + fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst); + isCopied = true; + } catch (Exception e) { + logger.error("Error copying local file :" + sourceFilepath + + " to hdfs location : " + destFilePath, e); + } + return isCopied; + } + + public FileSystem buildFileSystem(String hdfsHost, String hdfsPort) { + try { + Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort); + FileSystem fs = FileSystem.get(configuration); + return fs; + } catch (Exception e) { + logger.error("Exception is buildFileSystem :", e); + } + return null; + } + + public void closeFileSystem(FileSystem fileSystem) { + if (fileSystem != null) { + try { + fileSystem.close(); + } catch (IOException e) { + logger.error(e.getLocalizedMessage(), e.getCause()); + } + } + } + + public Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) { + String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/"; + Configuration configuration = new Configuration(); + configuration.set("fs.default.name", url); + return configuration; + } + +}
\ No newline at end of file diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json index 58bcdae9fb..978f581bc3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json @@ -46,6 +46,9 @@ }, "s3_file": { "klass": "org.apache.ambari.logfeeder.output.OutputS3File" - } + }, + "hdfs": { + "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile" + } } }
\ No newline at end of file diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json new file mode 100644 index 0000000000..336934a917 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json @@ -0,0 +1,20 @@ +{ + +"output": [{ + "comment": "Write log to hdfs", + "destination": "hdfs", + "hdfs_out_dir": "logfeeder/$HOST/service", + "file_name_prefix":"service-logs-", + "hdfs_host": "hdfs_host", + "hdfs_port": "8020", + "rollover_sec":"300", + "conditions": { + "fields": { + "rowtype": [ + "service" + ] + } + } + }] + +} |