hadoop运行作业的脚本解析

时间:2023-11-22 08:31:44
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script runs the hadoop core commands.
#
# Flow:
#   1. Resolve the directory this script lives in and source
#      libexec/hadoop-config.sh from it (or from HADOOP_LIBEXEC_DIR if set).
#      Variables used below but never assigned here (cygwin, JAVA,
#      JAVA_HEAP_MAX, CLASSPATH, TOOL_PATH, HADOOP_*_HOME, HADOOP_PREFIX)
#      are expected to come from that file or from the environment.
#   2. Map the first argument (COMMAND) to a Java class name, or delegate
#      deprecated hdfs/mapred sub-commands to their own launchers.
#   3. exec the JVM with the selected class and the remaining arguments.

# Resolve the absolute directory containing this script.
bin=$(which "$0")
bin=$(dirname "${bin}")
# cd's output is discarded so that a CDPATH hit cannot pollute the result.
bin=$(cd "$bin" >/dev/null; pwd)

DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. "$HADOOP_LIBEXEC_DIR/hadoop-config.sh"

# Print the command-line help text to stdout.
function print_usage(){
  echo "Usage: hadoop [--config confdir] [COMMAND | CLASSNAME]"
  echo "  CLASSNAME            run the class named CLASSNAME"
  echo " or"
  echo "  where COMMAND is one of:"
  echo "  fs                   run a generic filesystem user client"
  echo "  version              print the version"
  echo "  jar <jar>            run a jar file"
  echo "                       note: please use \"yarn jar\" to launch"
  echo "                             YARN applications, not this command."
  echo "  checknative [-a|-h]  check native hadoop and compression libraries availability"
  echo "  distcp <srcurl> <desturl> copy file or directories recursively"
  echo "  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
  echo "  classpath            prints the class path needed to get the"
  echo "                       Hadoop jar and the required libraries"
  echo "  credential           interact with credential providers"
  echo "  key                  manage keys via the KeyProvider"
  echo "  daemonlog            get/set the log level for each daemon"
  echo "  trace                view and modify Hadoop tracing settings"
  echo ""
  echo "Most commands print help when invoked w/o parameters."
}

# No arguments at all: show help and stop.
if [ $# = 0 ]; then
  print_usage
  exit
fi

COMMAND=$1
case "$COMMAND" in
  # usage flags
  --help|-help|-h)
    print_usage
    exit
    ;;

  # hdfs commands
  namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|fetchdt|oiv|dfsgroups|portmap|nfs3)
    echo "DEPRECATED: Use of this script to execute hdfs command is deprecated." >&2
    echo "Instead use the hdfs command for it." >&2
    echo "" >&2
    # Try to locate hdfs and, if present, delegate to it.
    # "dfsgroups" is passed on as "groups"; all other names pass through as-is.
    shift
    if [ -f "${HADOOP_HDFS_HOME}"/bin/hdfs ]; then
      exec "${HADOOP_HDFS_HOME}"/bin/hdfs ${COMMAND/dfsgroups/groups} "$@"
    elif [ -f "${HADOOP_PREFIX}"/bin/hdfs ]; then
      exec "${HADOOP_PREFIX}"/bin/hdfs ${COMMAND/dfsgroups/groups} "$@"
    else
      echo "HADOOP_HDFS_HOME not found!"
      exit 1
    fi
    ;;

  # mapred commands for backwards compatibility
  pipes|job|queue|mrgroups|mradmin|jobtracker|tasktracker)
    echo "DEPRECATED: Use of this script to execute mapred command is deprecated." >&2
    echo "Instead use the mapred command for it." >&2
    echo "" >&2
    # Try to locate mapred and, if present, delegate to it.
    # "mrgroups" is passed on as "groups"; all other names pass through as-is.
    shift
    if [ -f "${HADOOP_MAPRED_HOME}"/bin/mapred ]; then
      exec "${HADOOP_MAPRED_HOME}"/bin/mapred ${COMMAND/mrgroups/groups} "$@"
    elif [ -f "${HADOOP_PREFIX}"/bin/mapred ]; then
      exec "${HADOOP_PREFIX}"/bin/mapred ${COMMAND/mrgroups/groups} "$@"
    else
      echo "HADOOP_MAPRED_HOME not found!"
      exit 1
    fi
    ;;

  # core commands
  *)
    # Translate the command word into the Java class that implements it.
    if [ "$COMMAND" = "fs" ] ; then
      CLASS=org.apache.hadoop.fs.FsShell
    elif [ "$COMMAND" = "version" ] ; then
      CLASS=org.apache.hadoop.util.VersionInfo
    elif [ "$COMMAND" = "jar" ] ; then
      CLASS=org.apache.hadoop.util.RunJar
      if [[ -n "${YARN_OPTS}" ]] || [[ -n "${YARN_CLIENT_OPTS}" ]]; then
        echo "WARNING: Use \"yarn jar\" to launch YARN applications." >&2
      fi
    elif [ "$COMMAND" = "key" ] ; then
      CLASS=org.apache.hadoop.crypto.key.KeyShell
    elif [ "$COMMAND" = "checknative" ] ; then
      CLASS=org.apache.hadoop.util.NativeLibraryChecker
    elif [ "$COMMAND" = "distcp" ] ; then
      CLASS=org.apache.hadoop.tools.DistCp
      CLASSPATH=${CLASSPATH}:${TOOL_PATH}
    elif [ "$COMMAND" = "daemonlog" ] ; then
      CLASS=org.apache.hadoop.log.LogLevel
    elif [ "$COMMAND" = "archive" ] ; then
      CLASS=org.apache.hadoop.tools.HadoopArchives
      CLASSPATH=${CLASSPATH}:${TOOL_PATH}
    elif [ "$COMMAND" = "credential" ] ; then
      CLASS=org.apache.hadoop.security.alias.CredentialShell
    elif [ "$COMMAND" = "trace" ] ; then
      CLASS=org.apache.hadoop.tracing.TraceAdmin
    elif [ "$COMMAND" = "classpath" ] ; then
      # COMMAND has not been shifted away yet, so "$#" > 1 means the user
      # passed extra arguments to "classpath".
      if [ "$#" -gt 1 ]; then
        CLASS=org.apache.hadoop.util.Classpath
      else
        # No need to bother starting up a JVM for this simple case.
        if $cygwin; then
          # Only stderr is silenced; stdout is the translated path we capture.
          CLASSPATH=$(cygpath -p -w "$CLASSPATH" 2>/dev/null)
        fi
        # Quoted so that wildcard entries such as lib/* are printed verbatim.
        echo "$CLASSPATH"
        exit
      fi
    elif [[ "$COMMAND" = -* ]] ; then
      # class and package names cannot begin with a -
      echo "Error: No command named \`$COMMAND' was found. Perhaps you meant \`hadoop ${COMMAND#-}'"
      exit 1
    else
      # Anything else is taken to be a fully qualified class name.
      CLASS=$COMMAND
    fi

    # cygwin path translation (stderr silenced, stdout captured)
    if $cygwin; then
      CLASSPATH=$(cygpath -p -w "$CLASSPATH" 2>/dev/null)
      HADOOP_LOG_DIR=$(cygpath -w "$HADOOP_LOG_DIR" 2>/dev/null)
      HADOOP_PREFIX=$(cygpath -w "$HADOOP_PREFIX" 2>/dev/null)
      HADOOP_CONF_DIR=$(cygpath -w "$HADOOP_CONF_DIR" 2>/dev/null)
      HADOOP_COMMON_HOME=$(cygpath -w "$HADOOP_COMMON_HOME" 2>/dev/null)
      HADOOP_HDFS_HOME=$(cygpath -w "$HADOOP_HDFS_HOME" 2>/dev/null)
      HADOOP_YARN_HOME=$(cygpath -w "$HADOOP_YARN_HOME" 2>/dev/null)
      HADOOP_MAPRED_HOME=$(cygpath -w "$HADOOP_MAPRED_HOME" 2>/dev/null)
    fi

    # Drop COMMAND; "$@" now holds only the arguments for the Java class.
    shift

    # Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
    HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

    # make sure security appender is turned off
    HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"

    export CLASSPATH=$CLASSPATH
    # JAVA_HEAP_MAX and HADOOP_OPTS are deliberately unquoted: each may hold
    # several space-separated JVM options that must become separate words.
    exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
    ;;

esac

  可以看到,当hadoop脚本运行jar包(即执行 hadoop jar 命令)时,CLASS会被设置为org.apache.hadoop.util.RunJar,脚本最后通过exec启动JVM来执行这个Java类(注意RunJar是一个类,而不是脚本).

package org.apache.hadoop.util;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.IOUtils;

/**
 * Runs the main class of a job jar.
 *
 * <p>The jar is unpacked into a fresh temporary directory, a class loader is
 * built over the unpacked contents, and the jar's main class is invoked
 * reflectively with the remaining command-line arguments.
 */
@Private
@Unstable
public class RunJar {

  /** Pattern that matches any string. */
  public static final Pattern MATCH_ANY = Pattern.compile(".*");

  /** Priority passed to the shutdown hook that deletes the unpacked jar. */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /** Environment variable that switches on the client class loader. */
  public static final String HADOOP_USE_CLIENT_CLASSLOADER =
      "HADOOP_USE_CLIENT_CLASSLOADER";

  /** Environment variable holding extra class path entries. */
  public static final String HADOOP_CLASSPATH = "HADOOP_CLASSPATH";

  /** Environment variable holding the system-classes list. */
  public static final String HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES =
      "HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES";

  /**
   * Unpacks every entry of a jar file into a directory.
   *
   * @param jarFile the jar to unpack
   * @param toDir the directory to unpack into
   * @throws IOException if reading the jar or writing a file fails
   */
  public static void unJar(File jarFile, File toDir) throws IOException {
    unJar(jarFile, toDir, MATCH_ANY);
  }

  /**
   * Unpacks the non-directory entries of a jar whose names match a pattern.
   *
   * @param jarFile the jar to unpack
   * @param toDir the directory to unpack into
   * @param unpackRegex an entry is extracted only if its whole name matches
   * @throws IOException if reading the jar or writing a file fails, or if an
   *         entry name would resolve to a location outside {@code toDir}
   */
  public static void unJar(File jarFile, File toDir, Pattern unpackRegex)
      throws IOException {
    // Every extracted file must canonicalize to a path below this prefix.
    String targetDirPath = toDir.getCanonicalPath() + File.separator;
    JarFile jar = new JarFile(jarFile);
    try {
      Enumeration<JarEntry> entries = jar.entries();
      while (entries.hasMoreElements()) {
        JarEntry entry = entries.nextElement();
        if (entry.isDirectory()
            || !unpackRegex.matcher(entry.getName()).matches()) {
          continue;
        }
        File file = new File(toDir, entry.getName());
        // Reject names such as "../../x" that would escape the target dir.
        if (!file.getCanonicalPath().startsWith(targetDirPath)) {
          throw new IOException("expanding " + entry.getName()
              + " would create file outside of " + toDir);
        }
        InputStream in = jar.getInputStream(entry);
        try {
          ensureDirectory(file.getParentFile());
          OutputStream out = new FileOutputStream(file);
          try {
            IOUtils.copyBytes(in, out, 8192);
          } finally {
            out.close();
          }
        } finally {
          in.close();
        }
      }
    } finally {
      jar.close();
    }
  }

  /**
   * Makes sure a directory exists, creating it and its parents if needed.
   *
   * @param dir the directory that must exist
   * @throws IOException if it does not exist and could not be created
   */
  private static void ensureDirectory(File dir) throws IOException {
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IOException("Mkdirs failed to create " + dir.toString());
    }
  }

  /**
   * Command-line entry point; see {@link #run(String[])}.
   *
   * @param args {@code jarFile [mainClass] args...}
   * @throws Throwable whatever the invoked main method throws
   */
  public static void main(String[] args) throws Throwable {
    new RunJar().run(args);
  }

  /**
   * Unpacks the jar named by the first argument and invokes its main class.
   *
   * <p>The main class is taken from the manifest's {@code Main-Class}
   * attribute; if the manifest has none, the second argument is used. All
   * arguments after those consumed are passed to the main method.
   *
   * @param args {@code jarFile [mainClass] args...}
   * @throws Throwable whatever the invoked main method throws, unwrapped from
   *         the reflection wrapper
   */
  public void run(String[] args) throws Throwable {
    String usage = "RunJar jarFile [mainClass] args...";

    if (args.length < 1) {
      System.err.println(usage);
      System.exit(-1);
    }

    int firstArg = 0;
    String fileName = args[firstArg++];
    File file = new File(fileName);
    if (!file.exists() || !file.isFile()) {
      System.err.println("Not a valid JAR: " + file.getCanonicalPath());
      System.exit(-1);
    }

    String mainClassName = null;
    JarFile jarFile;
    try {
      jarFile = new JarFile(fileName);
    } catch (IOException io) {
      throw new IOException("Error opening job jar: " + fileName)
          .initCause(io);
    }
    try {
      Manifest manifest = jarFile.getManifest();
      if (manifest != null) {
        mainClassName = manifest.getMainAttributes().getValue("Main-Class");
      }
    } finally {
      // Close the jar even when reading the manifest fails.
      jarFile.close();
    }

    if (mainClassName == null) {
      if (args.length < 2) {
        System.err.println(usage);
        System.exit(-1);
      }
      mainClassName = args[firstArg++];
    }
    // Accept path-style names (a/b/C) as well as dotted ones (a.b.C).
    mainClassName = mainClassName.replaceAll("/", ".");

    File tmpDir = new File(System.getProperty("java.io.tmpdir"));
    ensureDirectory(tmpDir);

    final File workDir;
    try {
      workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
    } catch (IOException ioe) {
      System.err.println("Error creating temp dir in java.io.tmpdir "
          + tmpDir + " due to " + ioe.getMessage());
      System.exit(-1);
      return;
    }

    // createTempFile made a regular file; swap it for a directory of the
    // same (unique) name.
    if (!workDir.delete()) {
      System.err.println("Delete failed for " + workDir);
      System.exit(-1);
    }
    ensureDirectory(workDir);

    // Remove the unpacked jar when the JVM shuts down.
    ShutdownHookManager.get().addShutdownHook(
        new Runnable() {
          public void run() {
            FileUtil.fullyDelete(workDir);
          }
        }, SHUTDOWN_HOOK_PRIORITY);

    unJar(file, workDir);

    ClassLoader loader = createClassLoader(file, workDir);

    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", String[].class);
    String[] newArgs = Arrays.asList(args)
        .subList(firstArg, args.length).toArray(new String[0]);
    try {
      main.invoke(null, new Object[] { newArgs });
    } catch (InvocationTargetException e) {
      // Surface the exception thrown by the job, not the reflection wrapper.
      throw e.getTargetException();
    }
  }

  /**
   * Builds the class loader used to load the job's main class.
   *
   * <p>Both variants cover the unpacked directory, the jar itself, the
   * unpacked {@code classes/} directory and the contents of {@code lib/}.
   * When the client class loader is enabled, the value of
   * {@link #getHadoopClasspath()} is appended as well.
   *
   * @param file the job jar
   * @param workDir the directory the jar was unpacked into
   * @return the class loader to load the main class with
   * @throws MalformedURLException if a class path entry cannot be turned
   *         into a URL
   */
  private ClassLoader createClassLoader(File file, final File workDir)
      throws MalformedURLException {
    ClassLoader loader;
    if (useClientClassLoader()) {
      StringBuilder sb = new StringBuilder();
      sb.append(workDir + "/")
          .append(File.pathSeparator).append(file)
          .append(File.pathSeparator).append(workDir + "/classes/")
          .append(File.pathSeparator).append(workDir + "/lib/*");
      String hadoopClasspath = getHadoopClasspath();
      if (hadoopClasspath != null && !hadoopClasspath.isEmpty()) {
        sb.append(File.pathSeparator).append(hadoopClasspath);
      }
      String clientClasspath = sb.toString();

      String systemClasses = getSystemClasses();
      List<String> systemClassesList = systemClasses == null
          ? null
          : Arrays.asList(StringUtils.getTrimmedStrings(systemClasses));
      loader = new ApplicationClassLoader(clientClasspath,
          getClass().getClassLoader(), systemClassesList);
    } else {
      List<URL> classPath = new ArrayList<URL>();
      classPath.add(new File(workDir + "/").toURI().toURL());
      classPath.add(file.toURI().toURL());
      classPath.add(new File(workDir, "classes/").toURI().toURL());
      File[] libs = new File(workDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.add(libs[i].toURI().toURL());
        }
      }
      loader = new URLClassLoader(classPath.toArray(new URL[0]));
    }
    return loader;
  }

  /** @return true if HADOOP_USE_CLIENT_CLASSLOADER parses as boolean true */
  boolean useClientClassLoader() {
    return Boolean.parseBoolean(System.getenv(HADOOP_USE_CLIENT_CLASSLOADER));
  }

  /** @return the value of HADOOP_CLASSPATH, or null if it is not set */
  String getHadoopClasspath() {
    return System.getenv(HADOOP_CLASSPATH);
  }

  /**
   * @return the value of HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES, or null
   *         if it is not set
   */
  String getSystemClasses() {
    return System.getenv(HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES);
  }
}

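  可以看到,这个类先把传入的jar包解压到java.io.tmpdir下新建的临时目录中,然后用解压目录、jar包本身以及解压出的classes/和lib/下的文件构造类加载器(相当于添加classpath路径),接着从MANIFEST的Main-Class属性(没有时取第二个命令行参数)得到主类,最后通过反射获取并执行该主类的main方法.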