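How to Add Third-Party Services to CDH

Date: 2024-03-23 09:04:43

Overview

CDH makes it easy to add big-data services, but only those provided by Cloudera. To add a third-party service (for example, a component developed in-house) to a CDH cluster and have CDH manage it, you need to build a set of packages according to specific rules and then publish them to CDH.

This article explains how to package your own service and publish it to CDH, so that CDH controls how the service runs and monitors its basic running state.

Background

Terminology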

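parcel: a gzip-compressed tar archive whose file name ends in ".parcel". The file name must follow a fixed convention.

The naming convention is as follows:

The file name has three parts: the package name, the version, and the target platform.

Example: FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel

Package name: FLINK

Version: 1.6.0-hadoop_2.6-scala_2.11

Platform: el7

el6 denotes CentOS 6, and el7 denotes CentOS 7.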
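The three-part naming rule can be checked mechanically. The sketch below splits a parcel file name at its first and last hyphen; parse_parcel_name and the PARCEL_* variables are hypothetical names used only for illustration and are not part of CDH or cm_ext.

```shell
#!/bin/bash
# parse_parcel_name is a hypothetical helper: it splits a parcel file name
# into package name, version and platform suffix.
parse_parcel_name() {
    local file base rest
    file=$(basename "$1")
    base=${file%.parcel}        # strip the .parcel extension
    PARCEL_NAME=${base%%-*}     # text before the first hyphen
    PARCEL_DISTRO=${base##*-}   # text after the last hyphen
    rest=${base#*-}             # drop "<name>-"
    PARCEL_VERSION=${rest%-*}   # drop "-<platform>"
}

parse_parcel_name "FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel"
echo "name=$PARCEL_NAME version=$PARCEL_VERSION platform=$PARCEL_DISTRO"
```

Because the version itself may contain hyphens, the sketch treats only the first and the last hyphen as separators.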

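Besides your service components, the parcel contains an important descriptor file, parcel.json:

This file records information about your service, such as its version, the user it runs as, and the CDH versions it is compatible with.

The parcel must be placed in /opt/cloudera/parcel-repo/ so that CDH can detect it when distributing software.

csd: a CSD file is a jar that describes how CDH manages your service.

This includes the icon shown on the CDH page, the services it depends on, the ports it exposes, and its startup rules.

The CSD jar must be placed in /opt/cloudera/csd/ so that it is detected when you add a service to the cluster.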

Downloads

https://github.com/cloudera/cm_csds

https://github.com/cloudera/cm_ext

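Building CDH Components

Preparing the component for release

Collect your tested service into a single directory. The subdirectory layout is simply your project layout and needs no changes. Library dependencies can either be provided by the system environment or be placed directly inside this project directory.

A service written in any language can be managed by CDH.

Building the Flink package

Download Flink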

https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop26-scala_2.11.tgz

Building the parcel

The root of the parcel consists of your service directory (lib/flink) and a meta directory.

The meta directory contains flink_env.sh, parcel.json, and permissions.json.

flink_env.sh declares environment variables for the bash environment your service runs in; add whatever settings your service needs.

Create flink_env.sh:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh:

#!/bin/bash
FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"}
export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink

parcel.json must specify the parcel name and the compatible CDH versions.

Create parcel.json (the parcel descriptor):

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json:

{
  "schema_version": 1,
  "name": "FLINK",
  "version": "1.6.0-hadoop_2.6-scala_2.11",
  "depends": "CDH (>= 5.2), CDH (<< 6.0)",
  "setActiveSymlink": true,
  "replaces": "FLINK",
  "scripts": {
    "defines": "flink_env.sh"
  },
  "packages": [{
    "name": "flink-master",
    "version": "1.6.0+flink1.6.0"
  },
  {
    "name": "flink-worker",
    "version": "1.6.0+flink1.6.0"
  }],
  "components": [{
    "name": "flink",
    "version": "1.6.0-flink1.6.0",
    "pkg_version": "1.6.0+flink1.6.0",
    "pkg_release": "hadoop_2.6-scala_2.11"
  }],
  "provides": ["flink"],
  "users": {
    "flink": {
      "longname": "Flink",
      "home": "/var/lib/flink",
      "shell": "/bin/bash",
      "extra_groups": []
    }
  },
  "groups": ["flink"]
}

Note: pay close attention to letter case in this file; otherwise the parcel may fail to be published.

Create flink-master.sh:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh:

#!/bin/bash
# Flink Master.
USAGE="Usage: flink-master.sh (start|stop)"
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# Load Flink's shared configuration helpers
. "$bin"/config.sh
if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
else
    flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
    FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
fi
if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
fi
if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
    export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
fi
# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "--host" "${FLINK_MASTER_HOST}" "--webui-port" "${FLINK_WEB_UI_PORT}")
echo "FLINK_MASTER_HOST: $FLINK_MASTER_HOST"
echo "FLINK_WEB_UI_PORT: $FLINK_WEB_UI_PORT"
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-master"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi
# Look for an already running master process
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ];then
    # Rotate log files
    rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
    # Evaluate user options for local variable expansion
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
    CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"`
    # Drop Flink's bundled slf4j binding from the classpath
    CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
    # exec keeps the current PID, so $$ is the PID of the JVM started below
    echo "Starting flink-master daemon (pid: $$) on host $HOSTNAME."
    exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
    echo "flink-master daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi

flink-master.sh starts the Flink master (management) node.

Note: the exec command in flink-master.sh is required.
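The reason exec matters is that it replaces the shell process with the service instead of starting a child, so the process that Cloudera Manager launched and supervises is the service itself (this is the usual explanation and is stated here as an assumption, not taken from the CDH documentation). A minimal sketch that shows the PID staying the same across exec:

```shell
#!/bin/bash
# The inner bash prints its own PID, then uses exec to replace itself with
# another bash that prints its PID. Both lines show the same number,
# because exec does not fork a new process.
pids=$(bash -c 'echo $$; exec bash -c "echo \$\$"')
first=$(echo "$pids" | sed -n 1p)
second=$(echo "$pids" | sed -n 2p)
echo "wrapper pid: $first"
echo "pid after exec: $second"
```

Without exec, the JVM would run as a child of the wrapper script, and signals sent to the wrapper's PID would not necessarily reach it.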

Create flink-worker.sh:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh:

#!/bin/bash
# Flink Worker.
USAGE="Usage: flink-worker.sh (start|stop)"
OPERATION=$1
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# Load Flink's shared configuration helpers
. "$bin"/config.sh
# If memory allocation mode is lazy and no other JVM options are set,
# enable the G1 garbage collector
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
else
    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
fi
if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
fi
if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
    TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
    # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
    TM_MAX_OFFHEAP_SIZE="8388607T"
    export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
fi
# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}")
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "WORKER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-worker"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi
# Look for an already running worker process
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ];then
    # Rotate log files
    rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
    # Evaluate user options for local variable expansion
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
    CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"`
    # Drop Flink's bundled slf4j binding from the classpath
    CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
    # exec keeps the current PID, so $$ is the PID of the JVM started below
    echo "Starting flink-worker daemon (pid: $$) on host $HOSTNAME."
    exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
    echo "flink-worker daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi

flink-worker.sh starts a Flink worker (task) node.

Note: the exec command in flink-worker.sh is required.

Create flink-yarn.sh:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh:

#!/bin/bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
JVM_ARGS="$JVM_ARGS -Xmx512m"
CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli
# Prefix shared by the log files and by log rotation
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-yarn"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
CLASS_PATH=`manglePathList $(constructFlinkClassPath):$(hadoop classpath)`
# Drop Flink's bundled slf4j binding from the classpath
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
exec $JAVA_RUN $JVM_ARGS -classpath "$CLASS_PATH" $log_setting ${CLASS_TO_RUN} -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" > "$out" 2>&1

flink-yarn.sh starts Flink on YARN.

Note: the exec command in flink-yarn.sh is required.

Create permissions.json:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json:

{
  "lib/flink/bin/config.sh": {
    "user": "flink",
    "group": "flink",
    "permissions": "0755"
  },
  "lib/flink/bin/flink-master.sh": {
    "user": "flink",
    "group": "flink",
    "permissions": "0755"
  },
  "lib/flink/bin/flink-worker.sh": {
    "user": "flink",
    "group": "flink",
    "permissions": "0755"
  },
  "lib/flink/bin/flink-yarn.sh": {
    "user": "flink",
    "group": "flink",
    "permissions": "0755"
  }
}

permissions.json sets the owner, group, and permissions of files or directories.

Change to the directory that contains FLINK-1.6.0-hadoop_2.6-scala_2.11.

Create the parcel-el6 directory:

mkdir parcel-el6

Pack the FLINK-1.6.0-hadoop_2.6-scala_2.11 directory into a parcel file with the standard name.

Run the following command:

tar -czvf parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel FLINK-1.6.0-hadoop_2.6-scala_2.11

After packing the parcel, generate FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha, which holds the hash used to verify FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.

The hash is produced by make_manifest.py. Running this script generates manifest.json, which contains the hash.

make_manifest.py is a Python script that ships with the cm_ext tool listed under Downloads.

Run the following command:

python cm_ext-master/make_manifest/make_manifest.py parcel-el6

parcel-el6 is the parent directory of FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.

The command creates manifest.json in the parcel-el6 directory.

Contents of parcel-el6/manifest.json:

{
  "parcels": [{
    "hash": "b548e8b4be3db290933222e4bd517c903d36d453",
    "depends": "CDH (>= 5.2), CDH (<< 6.0)",
    "replaces": "FLINK",
    "parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel",
    "components": [{
      "pkg_version": "1.6.0+flink1.6.0",
      "version": "1.6.0-flink1.6.0",
      "name": "flink",
      "pkg_release": "hadoop_2.6-scala_2.11"
    }]
  }],
  "lastUpdated": 1538048224076
}

Create FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha:

echo $(cat parcel-el6/manifest.json | grep hash | awk -F"\"" '{print $4}') > parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha

Contents of parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha:

b548e8b4be3db290933222e4bd517c903d36d453
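The 40 hexadecimal characters are consistent with a SHA-1 digest of the parcel file. Assuming that is what make_manifest.py computes, the same .sha file could also be produced directly with the coreutils sha1sum tool. In the sketch below, write_parcel_sha and the DEMO parcel are hypothetical and used only for illustration.

```shell
#!/bin/bash
# write_parcel_sha is a hypothetical helper: it writes <parcel>.sha holding
# the SHA-1 hex digest of the parcel file (assumption: the .sha file is a
# plain SHA-1 digest, as the 40-character value in manifest.json suggests).
write_parcel_sha() {
    local parcel=$1
    sha1sum "$parcel" | awk '{print $1}' > "${parcel}.sha"
}

# Demonstration on a throwaway file standing in for a real parcel.
demo_dir=$(mktemp -d)
printf 'demo parcel content' > "$demo_dir/DEMO-1.0-el6.parcel"
write_parcel_sha "$demo_dir/DEMO-1.0-el6.parcel"
cat "$demo_dir/DEMO-1.0-el6.parcel.sha"
```

Comparing this value with the hash field in manifest.json is a quick way to confirm that the parcel was not modified after the manifest was generated.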

You end up with three files:

FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel

FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha

manifest.json

Copy these three files to /opt/cloudera/parcel-repo/.

If /opt/cloudera/parcel-repo/ already contains a manifest.json, do not overwrite it. Instead, merge in an entry like the following:

{
  "parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel",
  "components": [{
    "pkg_version": "1.6.0+flink1.6.0",
    "version": "1.6.0-flink1.6.0",
    "name": "flink",
    "pkg_release": "hadoop_2.6-scala_2.11"
  }],
  "depends": "CDH (>= 5.2), CDH (<< 6.0)",
  "hash": "ce75a90cd57aecd7e31bef15dd1221c6182e38c6"
}

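Add this entry to the "parcels" array of the existing manifest.json.

Building the standalone CSD jar

The CSD directory is organized as follows:

descriptor holds the service descriptor file, service.sdl.

images holds the service icon in PNG format. Without an icon file, the CDH page shows no icon.

scripts holds the startup scripts for your service. Define how your service starts in control.sh under the scripts directory.

Create the directories:

mkdir -p FLINK-1.6.0/descriptor

mkdir -p FLINK-1.6.0/images

mkdir -p FLINK-1.6.0/scripts

Create service.sdl:

vi FLINK-1.6.0/descriptor/service.sdl

Contents of FLINK-1.6.0/descriptor/service.sdl: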

{
  "name": "FLINK",
  "label": "Flink(Standalone)",
  "description": "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.",
  "version": "1.6.0",
  "compatibility": {
    "generation": 1,
    "cdhVersion": {
      "min": "5",
      "max": "5"
    }
  },
  "runAs": {
    "user": "flink",
    "group": "flink"
  },
  "icon": "images/flink.png",
  "serviceDependencies": [{
    "name": "ZOOKEEPER",
    "required": "true"
  },
  {
    "name": "HDFS",
    "required": "true"
  }],
  "parameters": [{
    "name": "high-availability.storageDir",
    "label": "high-availability storageDir",
    "description": "HDFS path (URI) where Flink persists metadata in high-availability setups.",
    "type": "string",
    "default": "hdfs:///user/flink/ha",
    "configurableInWizard": true
  },
  {
    "name": "high-availability.zookeeper.path.root",
    "label": "high-availability zookeeper path root",
    "description": "The root path under which Flink stores its entries in ZooKeeper.",
    "type": "string",
    "default": "/flink",
    "configurableInWizard": true
  },
  {
    "name": "high-availability.cluster-id",
    "label": "high-availability cluster-id",
    "description": "The ID of the Flink cluster, used to separate multiple Flink clusters from each other.",
    "type": "string",
    "default": "cluster_standalone",
    "configurableInWizard": true
  },
  {
    "name": "state.checkpoints.dir",
    "label": "state checkpoints dir",
    "description": "HDFS path (URI) for checkpoints.",
    "type": "string",
    "default": "hdfs:///user/flink/cluster_standalone/checkpoints",
    "configurableInWizard": true
  },
  {
    "name": "state.savepoints.dir",
    "label": "state savepoints dir",
    "description": "HDFS path (URI) for savepoints.",
    "type": "string",
    "default": "hdfs:///user/flink/cluster_standalone/savepoints",
    "configurableInWizard": true
  },
  {
    "name": "parallelism.default",
    "label": "parallelism default",
    "description": "The parallelism used for programs that did not specify any other parallelism.",
    "type": "long",
    "default": 1,
    "configurableInWizard": true
  }],
  "hdfsDirs": [{
    "name": "CreateFlinkUserDirCommand",
    "label": "Create Flink User Dir",
    "description": "Creates the Flink user directory in HDFS.",
    "directoryDescription": "Flink HDFS user directory",
    "path": "/user/${user}",
    "permissions": "0751"
  }],
  "serviceInit": {
    "preStartSteps": [{
      "commandName": "CreateFlinkUserDirCommand"
    }]
  },
  "roles": [{
    "name": "FLINK_MASTER",
    "label": "Flink Master",
    "pluralLabel": "Flink Masters",
    "startRunner": {
      "program": "scripts/control.sh",
      "args": ["master"],
      "environmentVariables": {
        "FLINK_MASTER_HOST": "${host}",
        "FLINK_WEB_UI_PORT": "${rest.port}",
        "FLINK_RUN_AS_USER": "${user}"
      }
    },
    "externalLink": {
      "name": "web_dashboard",
      "label": "Web Dashboard",
      "url": "http://${host}:${rest.port}"
    },
    "parameters": [
    {
      "name": "jobmanager.heap.size",
      "label": "jobmanager heap size",
      "description": "The heap size for the JobManager JVM.",
      "type": "string",
      "default": "1024m",
      "configurableInWizard": true
    },
    {
      "name": "rest.port",
      "label": "rest port",
      "description": "The port under which the web-based runtime monitor listens.",
      "type": "long",
      "default": 8081,
      "configurableInWizard": true
    }],
    "topology": {
      "minInstances": 1
    },
    "logging": {
      "filename": "flink-master.log",
      "isModifiable": true,
      "configName": "env.log.dir",
      "loggingType": "log4j",
      "dir": "/var/log/flink"
    },
    "configWriter": {
      "generators": [{
        "filename": "flink-conf.properties",
        "configFormat": "properties",
        "includeParams": [
          "high-availability.storageDir",
          "high-availability.zookeeper.path.root",
          "high-availability.cluster-id",
          "state.savepoints.dir",
          "state.checkpoints.dir",
          "jobmanager.heap.size",
          "parallelism.default",
          "rest.port"
        ]
      }]
    }
  },
  {
    "name": "FLINK_WORKER",
    "label": "Flink Worker",
    "pluralLabel": "Flink Workers",
    "startRunner": {
      "program": "scripts/control.sh",
      "args": ["worker"],
      "environmentVariables": {
        "FLINK_RUN_AS_USER": "${user}"
      }
    },
    "parameters": [{
      "name": "taskmanager.heap.size",
      "label": "taskmanager heap size",
      "description": "The heap size for the TaskManager JVM.",
      "type": "string",
      "default": "1024m",
      "configurableInWizard": true
    },
    {
      "name": "taskmanager.numberOfTaskSlots",
      "label": "taskmanager numberOfTaskSlots",
      "description": "The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.",
      "type": "long",
      "default": 1,
      "configurableInWizard": true
    }],
    "topology": {
      "minInstances": 1
    },
    "logging": {
      "filename": "flink-worker.log",
      "isModifiable": true,
      "configName": "env.log.dir",
      "loggingType": "log4j",
      "dir": "/var/log/flink"
    },
    "configWriter": {
      "generators": [{
        "filename": "flink-conf.properties",
        "configFormat": "properties",
        "includeParams": [
          "high-availability.storageDir",
          "high-availability.zookeeper.path.root",
          "high-availability.cluster-id",
          "state.savepoints.dir",
          "state.checkpoints.dir",
          "taskmanager.heap.size",
          "taskmanager.numberOfTaskSlots",
          "parallelism.default"
        ]
      }]
    }
  }]
}

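The main parts of service.sdl are:

Run-as user: the runAs block.

Icon: the icon field.

Service dependencies: the serviceDependencies array.

Common parameters: the service-level parameters array.

HDFS directory creation: the hdfsDirs array together with serviceInit.

Flink Master start command: the startRunner block of the FLINK_MASTER role.

Web UI link: the externalLink block.

Flink Master parameters: the parameters array of the FLINK_MASTER role.

Flink Master topology: the topology block.

Flink Master logging: the logging block.

Flink Master configuration file generation: the configWriter block.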

Create control.sh:

vi FLINK-1.6.0/scripts/control.sh

Contents of FLINK-1.6.0/scripts/control.sh:

#!/bin/bash
# For better debugging
USAGE="Usage: control.sh ((master|worker) (start|stop))"
NODE_TYPE=$1
NODE_HOST=`hostname -f`
# Locate the parcels directory relative to CMF_VAR
TEMP_PATH=$CMF_VAR/../cloudera/parcels
if [ ! -d "$TEMP_PATH" ];then
    TEMP_PATH=$CMF_VAR/../../cloudera/parcels
fi
PARCELS_DIR=`cd $TEMP_PATH; pwd`
FLINK_HOME=$PARCELS_DIR/FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink
# Create the configuration directory, or empty it if it already exists
FLINK_CONF_DIR=$CONF_DIR/flink-conf
if [ ! -d "$FLINK_CONF_DIR" ];then
    mkdir $FLINK_CONF_DIR
else
    rm -rf $FLINK_CONF_DIR/*
fi
cp $FLINK_HOME/conf/* $FLINK_CONF_DIR/
# Turn "key=value" lines into the "key: value" form used by flink-conf.yaml
sed -i 's#=#: #g' $CONF_DIR/flink-conf.properties
if [ "$NODE_TYPE" = "master" ]; then
    RPC_ADDRESS=`cat $CONF_DIR/flink-conf.properties | grep "jobmanager.rpc.address:"`
    # Add jobmanager.rpc.address if it is not configured yet
    if [ "$RPC_ADDRESS" = "" ]; then
        echo "jobmanager.rpc.address: $FLINK_MASTER_HOST" >> $CONF_DIR/flink-conf.properties
        RPC_ADDRESS=`cat $CONF_DIR/flink-conf.properties | grep "jobmanager.rpc.address:"`
        echo "RPC_ADDRESS: $RPC_ADDRESS"
    fi
fi
HIGH_MODE=`cat $CONF_DIR/flink-conf.properties | grep "high-availability:"`
# Default to ZooKeeper high availability if no mode is configured
if [ "$HIGH_MODE" = "" ]; then
    echo "high-availability: zookeeper" >> $CONF_DIR/flink-conf.properties
    HIGH_MODE=`cat $CONF_DIR/flink-conf.properties | grep "high-availability:"`
    echo "HIGH_MODE: $HIGH_MODE"
fi
HIGH_ZK_QUORUM=`cat $CONF_DIR/flink-conf.properties | grep "high-availability.zookeeper.quorum:"`
# Add the ZooKeeper quorum if it is not configured yet
if [ "$HIGH_ZK_QUORUM" = "" ]; then
    echo "high-availability.zookeeper.quorum: "$ZK_QUORUM >> $CONF_DIR/flink-conf.properties
    HIGH_ZK_QUORUM=`cat $CONF_DIR/flink-conf.properties | grep "high-availability.zookeeper.quorum:"`
    echo "HIGH_ZK_QUORUM: $HIGH_ZK_QUORUM"
fi
cp $CONF_DIR/flink-conf.properties $FLINK_CONF_DIR/flink-conf.yaml
HADOOP_CONF_DIR=$CONF_DIR/hadoop-conf
export FLINK_HOME FLINK_CONF_DIR HADOOP_CONF_DIR
echo ""
echo "Date: `date`"
echo "Host: $NODE_HOST"
echo "NODE_TYPE: $NODE_TYPE"
echo "ZK_QUORUM: $ZK_QUORUM"
echo "FLINK_HOME: $FLINK_HOME"
echo "FLINK_CONF_DIR: $FLINK_CONF_DIR"
echo ""
exec $FLINK_HOME/bin/flink-$NODE_TYPE.sh

Note: the exec command in control.sh is required.

Package the directory with the jar command to produce the CSD jar. First change into the FLINK-1.6.0 directory.

Run the command:

jar -cvf FLINK-1.6.0.jar *

Copy FLINK-1.6.0.jar to /opt/cloudera/csd/.
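The control.sh script for this CSD converts the generated flink-conf.properties into the "key: value" form expected in flink-conf.yaml with a single sed substitution. The sketch below isolates that step on sample data; props_to_flink_yaml is a hypothetical helper name, and the two sample keys are taken from service.sdl.

```shell
#!/bin/bash
# props_to_flink_yaml is a hypothetical helper that applies the same sed
# expression as control.sh: every "=" becomes ": ".
props_to_flink_yaml() {
    sed 's#=#: #g' "$1"
}

demo_file=$(mktemp)
printf 'rest.port=8081\nparallelism.default=1\n' > "$demo_file"
props_to_flink_yaml "$demo_file"
rm -f "$demo_file"
```

Because of the g flag, an "=" inside a value is rewritten as well, so values that contain "=" would need a substitution limited to the first occurrence on each line.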

Building the YARN CSD jar

The CSD directory is organized as follows:

descriptor holds the service descriptor file, service.sdl.

images holds the service icon in PNG format. Without an icon file, the CDH page shows no icon.

scripts holds the startup scripts for your service. Define how your service starts in control.sh under the scripts directory.

Create the directories:

mkdir -p FLINK_ON_YARN-1.6.0/descriptor

mkdir -p FLINK_ON_YARN-1.6.0/images

mkdir -p FLINK_ON_YARN-1.6.0/scripts

Create service.sdl:

vi FLINK_ON_YARN-1.6.0/descriptor/service.sdl

Contents of FLINK_ON_YARN-1.6.0/descriptor/service.sdl:

{
  "name": "FLINK_ON_YARN",
  "label": "Flink(yarn)",
  "description": "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.",
  "version": "1.6.0",
  "compatibility": {
    "generation": 1,
    "cdhVersion": {
      "min": "5",
      "max": "5"
    }
  },
  "runAs": {
    "user": "flink",
    "group": "flink"
  },
  "icon": "images/flink.png",
  "serviceDependencies": [{
    "name": "ZOOKEEPER",
    "required": "true"
  },
  {
    "name": "YARN",
    "required": "true"
  }],
  "parameters": [{
    "name": "high-availability.storageDir",
    "label": "high-availability storageDir",
    "description": "HDFS path (URI) where Flink persists metadata in high-availability setups.",
    "type": "string",
    "default": "hdfs:///user/flink/ha",
    "configurableInWizard": true
  },
  {
    "name": "high-availability.zookeeper.path.root",
    "label": "high-availability zookeeper path root",
    "description": "The root path under which Flink stores its entries in ZooKeeper.",
    "type": "string",
    "default": "/flink",
    "configurableInWizard": true
  },
  {
    "name": "high-availability.cluster-id",
    "label": "high-availability cluster-id",
    "description": "The ID of the Flink cluster, used to separate multiple Flink clusters from each other.",
    "type": "string",
    "default": "cluster_yarn",
    "configurableInWizard": true
  },
  {
    "name": "state.checkpoints.dir",
    "label": "state checkpoints dir",
    "description": "HDFS path (URI) for checkpoints.",
    "type": "string",
    "default": "hdfs:///user/flink/cluster_yarn/checkpoints",
    "configurableInWizard": true
  },
  {
    "name": "state.savepoints.dir",
    "label": "state savepoints dir",
    "description": "HDFS path (URI) for savepoints.",
    "type": "string",
    "default": "hdfs:///user/flink/cluster_yarn/savepoints",
    "configurableInWizard": true
  },
  {
    "name": "jobmanager.heap.size",
    "label": "jobmanager heap size",
    "description": "The heap size for the JobManager JVM.",
    "type": "string",
    "default": "1024m",
    "configurableInWizard": true
  },
  {
    "name": "taskmanager.heap.size",
    "label": "taskmanager heap size",
    "description": "The heap size for the TaskManager JVM.",
    "type": "string",
    "default": "1024m",
    "configurableInWizard": true
  },
  {
    "name": "taskmanager.numberOfTaskSlots",
    "label": "taskmanager numberOfTaskSlots",
    "description": "The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.",
    "type": "long",
    "default": 1,
    "configurableInWizard": true
  },
  {
    "name": "yarn.application-attempts",
    "label": "yarn.application-attempts",
    "description": "Number of ApplicationMaster restarts.",
    "type": "long",
    "default": 1,
    "configurableInWizard": true
  },
  {
    "name": "yarn.taskmanagers",
    "label": "yarn.taskmanagers",
    "description": "Number of YARN containers to allocate (= number of TaskManagers).",
    "type": "long",
    "default": 1,
    "configurableInWizard": true
  },
  {
    "name": "streaming.mode",
    "label": "streaming.mode",
    "description": "Start Flink in streaming mode.",
    "type": "boolean",
    "default": true,
    "configurableInWizard": true
  },
  {
    "name": "rest.port",
    "label": "rest port",
    "description": "The port under which the web-based runtime monitor listens.",
    "type": "long",
    "default": 8081,
    "configurableInWizard": true
  }],
  "hdfsDirs": [{
    "name": "CreateFlinkUserDirCommand",
    "label": "Create Flink User Dir",
    "description": "Creates the Flink user directory in HDFS.",
    "directoryDescription": "Flink HDFS user directory",
    "path": "/user/${user}",
    "permissions": "0751"
  }],
  "serviceInit": {
    "preStartSteps": [{
      "commandName": "CreateFlinkUserDirCommand"
    }]
  },
  "roles": [{
    "name": "FLINK_YARN",
    "label": "Flink Yarn",
    "pluralLabel": "Flink Yarns",
    "startRunner": {
      "program": "scripts/control.sh",
      "args": ["start"],
      "environmentVariables": {
        "FLINK_TASK_MANAGERS": "${yarn.taskmanagers}",
        "FLINK_STREAMING_MODE": "${streaming.mode}",
        "FLINK_RUN_AS_USER": "${user}"
      }
    },
    "stopRunner": {
      "runner": {
        "program": "scripts/control.sh",
        "args": ["stop"],
        "environmentVariables": {
          "FLINK_RUN_AS_USER": "${user}"
        }
      }
    },
    "topology": {
      "minInstances": 1
    },
    "logging": {
      "filename": "flink-yarn.out",
      "isModifiable": true,
      "configName": "env.log.dir",
      "loggingType": "log4j",
      "dir": "/var/log/flink"
    },
    "configWriter": {
      "generators": [{
        "filename": "flink-conf.properties",
        "configFormat": "properties",
        "includeParams": [
          "high-availability.storageDir",
          "high-availability.zookeeper.path.root",
          "high-availability.cluster-id",
          "state.savepoints.dir",
          "state.checkpoints.dir",
          "yarn.application-attempts",
          "jobmanager.heap.size",
          "taskmanager.heap.size",
          "taskmanager.numberOfTaskSlots",
          "rest.port"
        ]
      }]
    }
  }]
}

Flink Yarn stop configuration: the stopRunner block of the FLINK_YARN role.

The remaining settings are the same as for the standalone CSD jar above.

Create control.sh:

vi FLINK_ON_YARN-1.6.0/scripts/control.sh

Contents of FLINK_ON_YARN-1.6.0/scripts/control.sh:

#!/bin/bash
# For better debugging
USAGE="Usage: control.sh (start|stop)"
OPERATION=$1
case $OPERATION in
(start)
    NODE_HOST=`hostname -f`
    # Locate the parcels directory relative to CMF_VAR
    TEMP_PATH=$CMF_VAR/../cloudera/parcels
    if [ ! -d "$TEMP_PATH" ];then
        TEMP_PATH=$CMF_VAR/../../cloudera/parcels
    fi
    PARCELS_DIR=`cd $TEMP_PATH; pwd`
    FLINK_HOME=$PARCELS_DIR/FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink
    # Create the configuration directory, or empty it if it already exists
    FLINK_CONF_DIR=$CONF_DIR/flink-conf
    if [ ! -d "$FLINK_CONF_DIR" ];then
        mkdir $FLINK_CONF_DIR
    else
        rm -rf $FLINK_CONF_DIR/*
    fi
    cp $FLINK_HOME/conf/* $FLINK_CONF_DIR/
    # Turn "key=value" lines into the "key: value" form used by flink-conf.yaml
    sed -i 's#=#: #g' $CONF_DIR/flink-conf.properties
    HIGH_MODE=`cat $CONF_DIR/flink-conf.properties | grep "high-availability:"`
    # Default to ZooKeeper high availability if no mode is configured
    if [ "$HIGH_MODE" = "" ]; then
        echo "high-availability: zookeeper" >> $CONF_DIR/flink-conf.properties
        HIGH_MODE=`cat $CONF_DIR/flink-conf.properties | grep "high-availability:"`
        echo "HIGH_MODE: $HIGH_MODE"
    fi
    HIGH_ZK_QUORUM=`cat $CONF_DIR/flink-conf.properties | grep "high-availability.zookeeper.quorum:"`
    # Add the ZooKeeper quorum if it is not configured yet
    if [ "$HIGH_ZK_QUORUM" = "" ]; then
        echo "high-availability.zookeeper.quorum: "$ZK_QUORUM >> $CONF_DIR/flink-conf.properties
        HIGH_ZK_QUORUM=`cat $CONF_DIR/flink-conf.properties | grep "high-availability.zookeeper.quorum:"`
        echo "HIGH_ZK_QUORUM: $HIGH_ZK_QUORUM"
    fi
    cp $CONF_DIR/flink-conf.properties $FLINK_CONF_DIR/flink-conf.yaml
    HADOOP_CONF_DIR=$CONF_DIR/yarn-conf
    export FLINK_HOME FLINK_CONF_DIR HADOOP_CONF_DIR
    echo ""
    echo "Date: `date`"
    echo "Host: $NODE_HOST"
    echo "OPERATION: $OPERATION"
    echo "ZK_QUORUM: $ZK_QUORUM"
    echo "FLINK_HOME: $FLINK_HOME"
    echo "FLINK_CONF_DIR: $FLINK_CONF_DIR"
    echo ""

    if [ "$FLINK_STREAMING_MODE" = "true" ]; then
        exec $FLINK_HOME/bin/flink-yarn.sh --container $FLINK_TASK_MANAGERS --streaming
    else
        exec $FLINK_HOME/bin/flink-yarn.sh --container $FLINK_TASK_MANAGERS
    fi
    ;;
(stop)
    # The YARN session client records the application ID in this file
    YARN_CONFIG_FILE=/tmp/.yarn-properties-$FLINK_RUN_AS_USER
    if [ -r "$YARN_CONFIG_FILE" ]; then
        YARN_APP_ID=`cat $YARN_CONFIG_FILE | grep applicationID | awk -F "=" '{print $2}'`
        if [ "$YARN_APP_ID" != "" ]; then
            echo "kill flink yarn application $YARN_APP_ID ."
            yarn application -kill $YARN_APP_ID
        fi
    fi
    # Also stop the local YARN session client process, if any
    CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli
    FLINK_YARN_PID=$(ps -ef | grep $CLASS_TO_RUN | grep -v grep | awk '{print $2}')
    if [ "$FLINK_YARN_PID" != "" ]; then
        echo "kill flink yarn client $FLINK_YARN_PID ."
        kill $FLINK_YARN_PID
    fi
    ;;
(*)
    echo "Unknown operation '${OPERATION}'. $USAGE."
    exit 1
    ;;
esac

Note: the exec command in control.sh is required.
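The stop branch of this script finds the running YARN session by reading the applicationID key from a properties file under /tmp. The sketch below isolates that lookup; read_yarn_app_id is a hypothetical helper, and the file content and application ID are made up for illustration.

```shell
#!/bin/bash
# read_yarn_app_id is a hypothetical helper mirroring the stop branch:
# it prints the value of the applicationID key from a properties file and
# returns non-zero if the file cannot be read.
read_yarn_app_id() {
    local file=$1
    [ -r "$file" ] || return 1
    grep '^applicationID=' "$file" | awk -F '=' '{print $2}'
}

demo_file=$(mktemp)
# Sample content, invented for this demonstration.
printf 'parallelism=1\napplicationID=application_1538048224076_0001\n' > "$demo_file"
read_yarn_app_id "$demo_file"
```

Anchoring the pattern at the start of the line avoids matching other keys that merely contain the text applicationID.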

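Package the directory with the jar command to produce the CSD jar. First change into the FLINK_ON_YARN-1.6.0 directory.

Run the command:

jar -cvf FLINK_ON_YARN-1.6.0.jar *

Copy FLINK_ON_YARN-1.6.0.jar to /opt/cloudera/csd/.

Building the ActiveMQ package

Download ActiveMQ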

http://archive.apache.org/dist/activemq/5.10.2/apache-activemq-5.10.2-bin.tar.gz

http://central.maven.org/maven2/io/hawt/hawtio-default-offline/1.4.68/hawtio-default-offline-1.4.68.war

Building the parcel

The root of the parcel consists of your service directory (lib/activemq) and a meta directory.

The meta directory holds the parcel metadata files, such as activemq_env.sh and parcel.json.

activemq_env.sh declares environment variables for the bash environment your service runs in; add whatever settings your service needs.

Create activemq_env.sh:

#!/bin/bash
ACTIVEMQ_DIRNAME=${PARCEL_DIRNAME:-"ACTIVEMQ-5.10.2-hawtio_1.4.68"}
export ACTIVEMQ_HOME=$PARCELS_ROOT/$ACTIVEMQ_DIRNAME/lib/activemq

parcel.json must specify the parcel name and the compatible CDH versions.

Create parcel.json:

vi ACTIVEMQ-5.10.2-hawtio_1.4.68/meta/parcel.json

Contents of ACTIVEMQ-5.10.2-hawtio_1.4.68/meta/parcel.json:

{
  "schema_version": 1,
  "name": "ACTIVEMQ",
  "version": "5.10.2-hawtio_1.4.68",
  "depends": "CDH (>= 5.2), CDH (<< 6.0)",
  "setActiveSymlink": true,
  "replaces": "ACTIVEMQ",
  "scripts": {
    "defines": "activemq_env.sh"
  },
  "packages": [{
    "name": "activemq",
    "version": "5.10.2+activemq5.10.2"
  }],
  "components": [{
    "name": "activemq",
    "version": "5.10.2-activemq5.10.2",
    "pkg_version": "5.10.2+activemq5.10.2",
    "pkg_release": "5.10.2-hawtio_1.4.68"
  }],
  "provides": ["activemq"],
  "users": {
    "activemq": {
      "longname": "Activemq",
      "home": "/var/lib/activemq",
      "shell": "/bin/bash",
      "extra_groups": []
    }
  },
  "groups": ["activemq"]
}

Note: pay close attention to letter case in this file; otherwise the parcel may fail to be published.

Change to the directory that contains the ACTIVEMQ-5.10.2-hawtio_1.4.68 folder.

Delete ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq/lib/pax-url-aether-1.5.2.jar.

Rename the downloaded hawtio-default-offline-1.4.68.war to hawtio.rar.

Then extract it into the ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq/webapps directory.

Edit log4j.properties:

vi ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq/conf/log4j.properties

Contents of ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq/conf/log4j.properties:

log4j.rootLogger=INFO, logfile
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.apache.activemq.web.handler=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.apache.xbean=WARN
log4j.logger.org.apache.camel=INFO
log4j.logger.org.eclipse.jetty=WARN
log4j.appender.logfile=org.apache.log4j.RollingFileAppender
log4j.appender.logfile.file=${activemq.logs}/activemq-broker.log
log4j.appender.logfile.maxFileSize=10MB
log4j.appender.logfile.maxBackupIndex=5
log4j.appender.logfile.append=false
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.throwableRenderer=org.apache.log4j.EnhancedThrowableRenderer

Edit the activemq script:

vi ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq/bin/activemq

Change 1:

COMMANDLINE_ARGS="$@"

Change it to:

ACTIVEMQ_CONF=$2
ACTIVEMQ_DATA=$3
ACTIVEMQ_LOGS=$4
ACTIVEMQ_TEMP=$5
COMMANDLINE_ARGS="${@:6}"
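After this change, arguments two through five carry the conf, data, logs, and temp directories, and "${@:6}" passes on everything from the sixth argument onward. The sketch below mimics that slicing in a function; show_args and the sample paths are hypothetical, and treating the first argument as the command is an assumption about how the script is called.

```shell
#!/bin/bash
# show_args is a hypothetical function that mimics the patched argument
# handling: $2..$5 are directories and "${@:6}" is the remaining argument list.
show_args() {
    local conf=$2 data=$3 logs=$4 temp=$5
    local rest=("${@:6}")   # everything from the sixth argument onward
    echo "command=$1 conf=$conf data=$data logs=$logs temp=$temp rest=${rest[*]}"
}

show_args start /etc/amq /var/amq/data /var/log/amq /tmp/amq xbean:activemq.xml
```

If fewer than six arguments are supplied, "${@:6}" expands to nothing, so the remaining argument list is simply empty.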

Change 2:

if [ -z "$ACTIVEMQ_OPTS" ] ; then
    ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config"
fi

Change it to:

if [ -z "$ACTIVEMQ_OPTS" ] ; then
    ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config"
fi

Change 3:

if [ -n “$PIDFILE” ] && [ “$PIDFILE” != “stop” ];then
$EXEC_OPTION $DOIT_PREFIX “$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS
\
-Dactivemq.classpath=\”${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS
>/dev/null 2>&1 &
RET=\"\$?\"; APID=\"\$!\";
echo \$APID > $PIDFILE;
echo \“INFO: pidfile created : ‘$PIDFILE’ (pid ‘\$APID’)\”;exit
\$RET" $DOIT_POSTFIX
RET="$?"
elif [ -n “$PIDFILE” ] && [ “$PIDFILE” = “stop” ];then
PID=`cat $ACTIVEMQ_PIDFILE`
$EXEC_OPTION $DOIT_PREFIX “$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS
\
-Dactivemq.classpath=\”${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS --pid
$PID &
RET=\"\$?\"; APID=\"\$!\";
echo \$APID > $ACTIVEMQ_DATA/stop.pid; exit \$RET" $DOIT_POSTFIX
RET="$?"
else
$EXEC_OPTION $DOIT_PREFIX “$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS
\
-Dactivemq.classpath=\”${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS"
$DOIT_POSTFIX
RET="$?"
fi

Change to (each branch gains a -Dactivemq.logs option):

if [ -n "$PIDFILE" ] && [ "$PIDFILE" != "stop" ];then
$EXEC_OPTION $DOIT_PREFIX "$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS \
-Dactivemq.classpath=\"${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
-Dactivemq.logs=\"${ACTIVEMQ_LOGS}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS >/dev/null 2>&1 &
RET=\"\$?\"; APID=\"\$!\";
echo \$APID > $PIDFILE;
echo \"INFO: pidfile created : '$PIDFILE' (pid '\$APID')\";exit \$RET" $DOIT_POSTFIX
RET="$?"
elif [ -n "$PIDFILE" ] && [ "$PIDFILE" = "stop" ];then
PID=`cat $ACTIVEMQ_PIDFILE`
$EXEC_OPTION $DOIT_PREFIX "$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS \
-Dactivemq.classpath=\"${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
-Dactivemq.logs=\"${ACTIVEMQ_LOGS}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS --pid $PID &
RET=\"\$?\"; APID=\"\$!\";
echo \$APID > $ACTIVEMQ_DATA/stop.pid; exit \$RET" $DOIT_POSTFIX
RET="$?"
else
$EXEC_OPTION $DOIT_PREFIX "$JAVACMD $ACTIVEMQ_OPTS $ACTIVEMQ_DEBUG_OPTS \
-Dactivemq.classpath=\"${ACTIVEMQ_CLASSPATH}\" \
-Dactivemq.home=\"${ACTIVEMQ_HOME}\" \
-Dactivemq.base=\"${ACTIVEMQ_BASE}\" \
-Dactivemq.conf=\"${ACTIVEMQ_CONF}\" \
-Dactivemq.data=\"${ACTIVEMQ_DATA}\" \
-Dactivemq.logs=\"${ACTIVEMQ_LOGS}\" \
$ACTIVEMQ_CYGWIN \
-jar \"${ACTIVEMQ_HOME}/bin/activemq.jar\" $COMMANDLINE_ARGS" $DOIT_POSTFIX
RET="$?"
fi

Create the permissions.json file:

vi ACTIVEMQ-5.10.2-hawtio_1.4.68/meta/permissions.json

Contents of ACTIVEMQ-5.10.2-hawtio_1.4.68/meta/permissions.json:

{
"lib/activemq/bin/activemq-admin": {
"user": "activemq",
"group": "activemq",
"permissions": "0755"
},
"lib/activemq/bin/activemq": {
"user": "activemq",
"group": "activemq",
"permissions": "0755"
},
"lib/activemq/bin/diag": {
"user": "activemq",
"group": "activemq",
"permissions": "0755"
}
}

permissions.json sets the owner, group, and mode of files or directories inside the parcel.

Pack the ACTIVEMQ-5.10.2-hawtio_1.4.68 directory into a parcel file that follows the standard naming rule.

Run the following command:

tar -czvf parcel-el6/ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel ACTIVEMQ-5.10.2-hawtio_1.4.68
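
The tar step fails if the output directory does not exist, and a parcel without meta/parcel.json is of no use to CDH. A minimal sketch that covers both points follows; `build_parcel` is a hypothetical helper, and its three arguments (parcel directory, platform suffix, output directory) are assumptions made for illustration.

```shell
#!/bin/bash
# Hypothetical helper: pack <dir> into <out>/<dir>-<distro>.parcel and
# confirm that meta/parcel.json ended up inside the archive.
build_parcel() {
    local dir=$1 distro=$2 out=$3
    mkdir -p "$out" || return 1
    tar -czf "$out/$dir-$distro.parcel" "$dir" || return 1
    # -F matches the path literally, -x requires the whole line to match
    tar -tzf "$out/$dir-$distro.parcel" | grep -qxF "$dir/meta/parcel.json"
}

# Example call, run from the parent of the parcel directory:
# build_parcel ACTIVEMQ-5.10.2-hawtio_1.4.68 el6 parcel-el6
```

The function returns non-zero when packing fails or the descriptor is missing, so it can be chained with `&&` in a build script.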

Once the parcel is packed, it needs a companion ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.sha file that holds the hash used to verify ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel. The hash is taken from manifest.json, so generate the manifest first.

Run the following command:

python cm_ext-master/make_manifest/make_manifest.py parcel-el6

parcel-el6 is the parent directory of ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.

The command above creates a manifest.json file in the parcel-el6 directory.

Contents of parcel-el6/manifest.json:

{
"parcels": [{
"hash": "03ccbceb485a44f4340fba5a1389ad8ac444b1a7",
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"replaces": "ACTIVEMQ",
"parcelName": "ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel",
"components": [{
"pkg_version": "5.10.2+activemq5.10.2",
"version": "5.10.2-activemq5.10.2",
"name": "activemq",
"pkg_release": "5.10.2-hawtio_1.4.68"
}]
}],
"lastUpdated": 1539916493195
}

Create the ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.sha file:

echo $(cat parcel-el6/manifest.json | grep hash | awk -F"\"" '{print $4}') > parcel-el6/ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.sha

Contents of parcel-el6/ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.sha:

03ccbceb485a44f4340fba5a1389ad8ac444b1a7
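
The grep pipeline above picks up every "hash" line, so it only works while manifest.json describes a single parcel. The 40-hex-digit value is consistent with a SHA-1 digest of the parcel file; assuming that is what make_manifest.py records, the .sha file can also be computed directly from the parcel. A minimal sketch, where `write_parcel_sha` is a hypothetical helper:

```shell
#!/bin/bash
# Hypothetical helper: write the SHA-1 digest of a parcel next to it as <parcel>.sha.
# Assumption: the "hash" field in manifest.json is the SHA-1 of the parcel file.
write_parcel_sha() {
    local parcel=$1
    # sha1sum prints "<digest>  <file>"; keep only the digest
    sha1sum "$parcel" | awk '{print $1}' > "$parcel.sha"
}

# Example call:
# write_parcel_sha parcel-el6/ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel
```

Comparing the result with the hash in manifest.json is a quick way to confirm the assumption before relying on it.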

You end up with three files:

ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel

ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel.sha

manifest.json

Copy these three files into /opt/cloudera/parcel-repo/.

If /opt/cloudera/parcel-repo/ already contains a manifest.json, do not overwrite it. Instead, add the following entry to the parcels array of the existing file:

{
"hash": "03ccbceb485a44f4340fba5a1389ad8ac444b1a7",
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"replaces": "ACTIVEMQ",
"parcelName": "ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel",
"components": [{
"pkg_version": "5.10.2+activemq5.10.2",
"version": "5.10.2-activemq5.10.2",
"name": "activemq",
"pkg_release": "5.10.2-hawtio_1.4.68"
}]
}
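
The copy step and the existing-manifest case can be combined into one guarded helper. This is only a sketch: `publish_parcel` is a hypothetical function, and the return code 2 is an arbitrary convention chosen here to signal that the manifest entry still has to be merged by hand.

```shell
#!/bin/bash
# Hypothetical helper: copy the parcel and its .sha into the repo directory,
# but never overwrite a manifest.json that is already there.
publish_parcel() {
    local src=$1 repo=$2
    cp "$src"/*.parcel "$src"/*.parcel.sha "$repo"/ || return 1
    if [ -e "$repo/manifest.json" ]; then
        echo "manifest.json already exists in $repo; merge the new entry by hand" >&2
        return 2
    fi
    cp "$src/manifest.json" "$repo"/
}

# Example call:
# publish_parcel parcel-el6 /opt/cloudera/parcel-repo
```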

Building the csd jar

The csd directory is laid out as follows:

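descriptor holds the service descriptor file, service.sdl.

images holds the service icon in PNG format. Without an icon file, the CDH page shows no icon for the service.

scripts holds the service's startup scripts. How the service starts is defined in control.sh under scripts.

Create the directories:

mkdir -p ACTIVEMQ-5.10.2/descriptor
mkdir -p ACTIVEMQ-5.10.2/images
mkdir -p ACTIVEMQ-5.10.2/scripts

Create the service.sdl file:

vi ACTIVEMQ-5.10.2/descriptor/service.sdl

Contents of ACTIVEMQ-5.10.2/descriptor/service.sdl: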

{
"name": "ACTIVEMQ",
"label": "ActiveMQ",
"description": "Apache ActiveMQ is the most popular and powerful open source messaging and Integration Patterns server.",
"version": "5.10.2",
"compatibility": {
"generation": 1,
"cdhVersion": {
"min": "5",
"max": "5"
}
},
"runAs": {
"user": "activemq",
"group": "activemq"
},
"icon": "images/activemq.png",
"serviceDependencies": [{
"name": "ZOOKEEPER",
"required": "true"
}],
"parameters": [{
"name": "activemq.leveldb.replicas",
"label": "activemq.leveldb.replicas",
"description": "Number of LevelDB replicas.",
"type": "long",
"default": 1,
"configurableInWizard": true
},
{
"name": "activemq.leveldb.path",
"label": "activemq.leveldb.path",
"description": "The LevelDB store data path.",
"type": "string",
"default": "/var/lib/activemq/leveldb-stores",
"configurableInWizard": true
},
{
"name": "activemq.leveldb.zkPath",
"label": "activemq.leveldb.zkPath",
"description": "The zookeeper store data path.",
"type": "string",
"default": "/activemq/leveldb-stores",
"configurableInWizard": true
}],
"roles": [{
"name": "ACTIVEMQ_BROKER_A",
"label": "ActiveMQ Broker A",
"uniqueIdParameters": ["activemq.brokerId"],
"pluralLabel": "ActiveMQ Brokers",
"startRunner": {
"program": "scripts/control.sh",
"args": ["console",
"${activemq.brokerId}",
"mq-broker-a",
"${activemq.leveldb.port}",
"${activemq.leveldb.path}",
"${activemq.leveldb.replicas}",
"${activemq.leveldb.zkPath}",
"${activemq.openwire.port}",
"${activemq.mqtt.port}",
"${activemq.jetty.port}",
"/var/log/activemq/mq-broker-a"],
"environmentVariables": {
"ACTIVEMQ_RUN_AS_USER": "${user}"
}
},
"externalLink": {
"name": "hawtio_ui",
"label": "Hawtio UI",
"url": "http://${host}:${activemq.jetty.port}/hawtio/"
},
"parameters": [{
"name": "activemq.brokerId",
"label": "activemq.brokerId",
"description": "ID uniquely identifying each broker.",
"type": "string",
"default": "a1",
"configurableInWizard": true
},
{
"name": "activemq.leveldb.port",
"label": "activemq.leveldb.port",
"description": "The LevelDB bind port.",
"type": "long",
"default": 61619,
"configurableInWizard": true
},
{
"name": "activemq.openwire.port",
"label": "activemq.openwire.port",
"description": "The transport openwire connector port.",
"type": "long",
"default": 61616,
"configurableInWizard": true
},
{
"name": "activemq.mqtt.port",
"label": "activemq.mqtt.port",
"description": "The transport mqtt connector port.",
"type": "long",
"default": 61613,
"configurableInWizard": true
},
{
"name": "activemq.jetty.port",
"label": "activemq.jetty.port",
"description": "The jetty web server port.",
"type": "long",
"default": 8161,
"configurableInWizard": true
}],
"topology": {
"minInstances": "1"
},
"logging": {
"filename": "activemq-broker.log",
"isModifiable": true,
"configName": "activemq.logs",
"loggingType": "log4j",
"dir": "/var/log/activemq/mq-broker-a"
},
"configWriter": {
"generators": [{
"filename": "activemq-conf.properties",
"configFormat": "properties",
"includeParams": ["activemq.leveldb.replicas",
"activemq.brokerId",
"activemq.leveldb.path",
"activemq.leveldb.zkPath",
"activemq.leveldb.port",
"activemq.openwire.port",
"activemq.mqtt.port",
"activemq.jetty.port"]
}]
}
},
{
"name": "ACTIVEMQ_BROKER_B",
"label": "ActiveMQ Broker B",
"uniqueIdParameters": ["activemq.brokerId"],
"pluralLabel": "ActiveMQ Brokers",
"startRunner": {
"program": "scripts/control.sh",
"args": ["console",
"${activemq.brokerId}",
"mq-broker-b",
"${activemq.leveldb.port}",
"${activemq.leveldb.path}",
"${activemq.leveldb.replicas}",
"${activemq.leveldb.zkPath}",
"${activemq.openwire.port}",
"${activemq.mqtt.port}",
"${activemq.jetty.port}",
"/var/log/activemq/mq-broker-b"],
"environmentVariables": {
"ACTIVEMQ_RUN_AS_USER": "${user}"
}
},
"externalLink": {
"name": "hawtio_ui",
"label": "Hawtio UI",
"url": "http://${host}:${activemq.jetty.port}/hawtio/"
},
"parameters": [{
"name": "activemq.brokerId",
"label": "activemq.brokerId",
"description": "ID uniquely identifying each broker.",
"type": "string",
"default": "b1",
"configurableInWizard": true
},
{
"name": "activemq.leveldb.port",
"label": "activemq.leveldb.port",
"description": "The LevelDB bind port.",
"type": "long",
"default": 61629,
"configurableInWizard": true
},
{
"name": "activemq.openwire.port",
"label": "activemq.openwire.port",
"description": "The transport openwire connector port.",
"type": "long",
"default": 61626,
"configurableInWizard": true
},
{
"name": "activemq.mqtt.port",
"label": "activemq.mqtt.port",
"description": "The transport mqtt connector port.",
"type": "long",
"default": 61623,
"configurableInWizard": true
},
{
"name": "activemq.jetty.port",
"label": "activemq.jetty.port",
"description": "The jetty web server port.",
"type": "long",
"default": 8171,
"configurableInWizard": true
}],
"topology": {
"minInstances": "0"
},
"logging": {
"filename": "activemq-broker.log",
"isModifiable": true,
"configName": "activemq.logs",
"loggingType": "log4j",
"dir": "/var/log/activemq/mq-broker-b"
},
"configWriter": {
"generators": [{
"filename": "activemq-conf.properties",
"configFormat": "properties",
"includeParams": ["activemq.leveldb.replicas",
"activemq.brokerId",
"activemq.leveldb.path",
"activemq.leveldb.zkPath",
"activemq.leveldb.port",
"activemq.openwire.port",
"activemq.mqtt.port",
"activemq.jetty.port"]
}]
}
},
{
"name": "ACTIVEMQ_BROKER_C",
"label": "ActiveMQ Broker C",
"uniqueIdParameters": ["activemq.brokerId"],
"pluralLabel": "ActiveMQ Brokers",
"startRunner": {
"program": "scripts/control.sh",
"args": ["console",
"${activemq.brokerId}",
"mq-broker-c",
"${activemq.leveldb.port}",
"${activemq.leveldb.path}",
"${activemq.leveldb.replicas}",
"${activemq.leveldb.zkPath}",
"${activemq.openwire.port}",
"${activemq.mqtt.port}",
"${activemq.jetty.port}",
"/var/log/activemq/mq-broker-c"],
"environmentVariables": {
"ACTIVEMQ_RUN_AS_USER": "${user}"
}
},
"externalLink": {
"name": "hawtio_ui",
"label": "Hawtio UI",
"url": "http://${host}:${activemq.jetty.port}/hawtio/"
},
"parameters": [{
"name": "activemq.brokerId",
"label": "activemq.brokerId",
"description": "ID uniquely identifying each broker.",
"type": "string",
"default": "c1",
"configurableInWizard": true
},
{
"name": "activemq.leveldb.port",
"label": "activemq.leveldb.port",
"description": "The LevelDB bind port.",
"type": "long",
"default": 61639,
"configurableInWizard": true
},
{
"name": "activemq.openwire.port",
"label": "activemq.openwire.port",
"description": "The transport openwire connector port.",
"type": "long",
"default": 61636,
"configurableInWizard": true
},
{
"name": "activemq.mqtt.port",
"label": "activemq.mqtt.port",
"description": "The transport mqtt connector port.",
"type": "long",
"default": 61633,
"configurableInWizard": true
},
{
"name": "activemq.jetty.port",
"label": "activemq.jetty.port",
"description": "The jetty web server port.",
"type": "long",
"default": 8181,
"configurableInWizard": true
}],
"topology": {
"minInstances": "0"
},
"logging": {
"filename": "activemq-broker.log",
"isModifiable": true,
"configName": "activemq.logs",
"loggingType": "log4j",
"dir": "/var/log/activemq/mq-broker-c"
},
"configWriter": {
"generators": [{
"filename": "activemq-conf.properties",
"configFormat": "properties",
"includeParams": ["activemq.leveldb.replicas",
"activemq.brokerId",
"activemq.leveldb.path",
"activemq.leveldb.zkPath",
"activemq.leveldb.port",
"activemq.openwire.port",
"activemq.mqtt.port",
"activemq.jetty.port"]
}]
}
}]
}

The configuration fields are explained in section 3.2.2, "Building the standalone csd jar".
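
Each role's startRunner in the service.sdl above passes eleven positional values to control.sh: the operation, the broker id, the broker name, the LevelDB port, path, replicas and ZooKeeper path, the three connector ports, and the log directory. A small guard at the top of the script can catch a mismatch between the two files early. This is a sketch only; `check_arg_count` is a hypothetical helper and is not part of the script shown in this article.

```shell
#!/bin/bash
# Hypothetical guard: control.sh is expected to receive exactly 11 arguments,
# one for each entry in the "args" array of the role's startRunner.
check_arg_count() {
    local expected=11
    if [ "$#" -ne "$expected" ]; then
        echo "control.sh expects $expected arguments, got $#" >&2
        return 1
    fi
}

# Possible use near the top of control.sh:
# check_arg_count "$@" || exit 1
```

Note that parameters past the ninth must be written with braces, as in ${10} and ${11}; $10 would be read as $1 followed by a literal 0.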

Create the control.sh file:

vi ACTIVEMQ-5.10.2/scripts/control.sh

Contents of ACTIVEMQ-5.10.2/scripts/control.sh:

#!/bin/bash
# For better debugging
USAGE="Usage: control.sh (start|stop)"
OPERTION=$1
ACTIVEMQ_BROKER_ID=$2
ACTIVEMQ_BROKER_NAME=$3
ACTIVEMQ_LEVELDB_PORT=$4
ACTIVEMQ_LEVELDB_PATH=$5
ACTIVEMQ_LEVELDB_REPLICAS=$6
ACTIVEMQ_LEVELDB_ZKPATH=$7
ACTIVEMQ_OPENWIRE_PORT=$8
ACTIVEMQ_MQTT_PORT=$9
ACTIVEMQ_JETTY_PORT=${10}
ACTIVEMQ_LOG_DIR=${11}
NODE_HOST=`hostname -f`
#Determine if the directory exists
TEMP_PATH=$CMF_VAR/../cloudera/parcels
if [ ! -d "$TEMP_PATH" ];then
TEMP_PATH=$CMF_VAR/../../cloudera/parcels
fi
PARCELS_DIR=`cd $TEMP_PATH; pwd`
ACTIVEMQ_HOME=$PARCELS_DIR/ACTIVEMQ-5.10.2-hawtio_1.4.68/lib/activemq
export ACTIVEMQ_HOME
#Determine if the configuration file directory exists
ACTIVEMQ_CONF=$CONF_DIR/activemq-conf
if [ -d "$ACTIVEMQ_CONF" ];then
rm -rf $ACTIVEMQ_CONF/*
else
mkdir -p $ACTIVEMQ_CONF
fi
cp $ACTIVEMQ_HOME/conf/* $ACTIVEMQ_CONF/
ACTIVEMQ_DATA="/var/lib/activemq/data"
if [ ! -d "$ACTIVEMQ_DATA" ];then
mkdir -p $ACTIVEMQ_DATA
fi
ACTIVEMQ_TEMP="/var/lib/activemq/temp"
if [ ! -d "$ACTIVEMQ_TEMP" ];then
mkdir -p $ACTIVEMQ_TEMP
fi
LEVELDB_PATH="$ACTIVEMQ_LEVELDB_PATH/$ACTIVEMQ_BROKER_NAME"
if [ ! -d "$LEVELDB_PATH" ];then
mkdir -p $LEVELDB_PATH
fi
chown -R $ACTIVEMQ_RUN_AS_USER:$ACTIVEMQ_RUN_AS_USER $LEVELDB_PATH
BROKER_ID="$ACTIVEMQ_BROKER_NAME-$ACTIVEMQ_BROKER_ID"
LEVELDB_ZKPATH="$ACTIVEMQ_LEVELDB_ZKPATH/$ACTIVEMQ_BROKER_NAME"
echo "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" > $ACTIVEMQ_CONF/activemq.xml
echo "<beans xmlns=\"http://www.springframework.org/schema/beans\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd\">" >> $ACTIVEMQ_CONF/activemq.xml
echo "<bean class=\"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer\">" >> $ACTIVEMQ_CONF/activemq.xml
echo "<property name=\"locations\"><value>file:${ACTIVEMQ_CONF}/credentials.properties</value></property></bean>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<broker xmlns=\"http://activemq.apache.org/schema/core\" brokerName=\"$ACTIVEMQ_BROKER_NAME\" brokerId=\"$BROKER_ID\" dataDirectory=\"${ACTIVEMQ_DATA}\">" >> $ACTIVEMQ_CONF/activemq.xml
echo "<destinationPolicy><policyMap><policyEntries><policyEntry topic=\">\"><pendingMessageLimitStrategy>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<constantPendingMessageLimitStrategy limit=\"1000\"/></pendingMessageLimitStrategy></policyEntry></policyEntries></policyMap>" >> $ACTIVEMQ_CONF/activemq.xml
echo "</destinationPolicy><managementContext><managementContext createConnector=\"false\"/></managementContext><persistenceAdapter>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<replicatedLevelDB directory=\"$LEVELDB_PATH\" replicas=\"$ACTIVEMQ_LEVELDB_REPLICAS\" bind=\"tcp://0.0.0.0:$ACTIVEMQ_LEVELDB_PORT\" zkAddress=\"$ZK_QUORUM\" zkPath=\"$LEVELDB_ZKPATH\" hostname=\"$NODE_HOST\"/>" >> $ACTIVEMQ_CONF/activemq.xml
echo "</persistenceAdapter><systemUsage><systemUsage><memoryUsage><memoryUsage percentOfJvmHeap=\"70\"/></memoryUsage>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<storeUsage><storeUsage limit=\"100 gb\"/></storeUsage><tempUsage><tempUsage limit=\"50 gb\"/></tempUsage></systemUsage></systemUsage>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<transportConnectors><transportConnector name=\"openwire\" uri=\"tcp://0.0.0.0:$ACTIVEMQ_OPENWIRE_PORT?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\" discoveryUri=\"multicast://default\"/>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<transportConnector name=\"mqtt\" uri=\"mqtt://0.0.0.0:$ACTIVEMQ_MQTT_PORT?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600\"/></transportConnectors>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<networkConnectors><networkConnector userName=\"admin\" password=\"admin\" name=\"default-nc\" uri=\"multicast://default\"/></networkConnectors><shutdownHooks>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<bean xmlns=\"http://www.springframework.org/schema/beans\" class=\"org.apache.activemq.hooks.SpringContextHook\" /></shutdownHooks>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<plugins><simpleAuthenticationPlugin><users><authenticationUser username=\"admin\" password=\"admin\" groups=\"admins\"/>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<authenticationUser username=\"user\" password=\"suning\" groups=\"users\"/></users></simpleAuthenticationPlugin></plugins></broker>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<import resource=\"jetty.xml\"/></beans>" >> $ACTIVEMQ_CONF/activemq.xml
echo "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" > $ACTIVEMQ_CONF/jetty.xml
echo "<beans xmlns=\"http://www.springframework.org/schema/beans\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean id=\"securityLoginService\" class=\"org.eclipse.jetty.security.HashLoginService\"><property name=\"name\" value=\"ActiveMQRealm\" />" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"config\" value=\"${ACTIVEMQ_CONF}/jetty-realm.properties\"/></bean><bean id=\"securityConstraint\" class=\"org.eclipse.jetty.util.security.Constraint\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"name\" value=\"BASIC\"/><property name=\"roles\" value=\"user,admin\"/><property name=\"authenticate\" value=\"true\"/></bean>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean id=\"adminSecurityConstraint\" class=\"org.eclipse.jetty.util.security.Constraint\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"name\" value=\"BASIC\" /><property name=\"roles\" value=\"admin\"/><property name=\"authenticate\" value=\"true\"/></bean>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean id=\"securityConstraintMapping\" class=\"org.eclipse.jetty.security.ConstraintMapping\"><property name=\"constraint\" ref=\"securityConstraint\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"pathSpec\" value=\"/api/*,/admin/*,*.jsp\"/></bean><bean id=\"adminSecurityConstraintMapping\" class=\"org.eclipse.jetty.security.ConstraintMapping\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"constraint\" ref=\"adminSecurityConstraint\"/><property name=\"pathSpec\" value=\"*.action\"/></bean>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean id=\"securityHandler\" class=\"org.eclipse.jetty.security.ConstraintSecurityHandler\"><property name=\"loginService\" ref=\"securityLoginService\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"authenticator\"><bean class=\"org.eclipse.jetty.security.authentication.BasicAuthenticator\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "</property><property name=\"constraintMappings\"><list><ref bean=\"adminSecurityConstraintMapping\"/><ref bean=\"securityConstraintMapping\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "</list></property><property name=\"handler\"><bean id=\"sec\" class=\"org.eclipse.jetty.server.handler.HandlerCollection\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"handlers\"><list><bean class=\"org.eclipse.jetty.webapp.WebAppContext\"><property name=\"contextPath\" value=\"/admin\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"resourceBase\" value=\"${ACTIVEMQ_HOME}/webapps/admin\"/><property name=\"logUrlOnStart\" value=\"true\"/></bean>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean class=\"org.eclipse.jetty.webapp.WebAppContext\"><property name=\"contextPath\" value=\"/fileserver\" />" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"resourceBase\" value=\"${ACTIVEMQ_HOME}/webapps/fileserver\"/><property name=\"logUrlOnStart\" value=\"true\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"parentLoaderPriority\" value=\"true\"/></bean><bean class=\"org.eclipse.jetty.webapp.WebAppContext\"><property name=\"contextPath\" value=\"/api\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"resourceBase\" value=\"${ACTIVEMQ_HOME}/webapps/api\"/><property name=\"logUrlOnStart\" value=\"true\"/></bean>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean class=\"org.eclipse.jetty.webapp.WebAppContext\"><property name=\"contextPath\" value=\"/hawtio\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"war\" value=\"${ACTIVEMQ_HOME}/webapps/hawtio\"/><property name=\"logUrlOnStart\" value=\"true\"/></bean><bean class=\"org.eclipse.jetty.server.handler.ResourceHandler\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"directoriesListed\" value=\"false\"/><property name=\"welcomeFiles\"><list><value>index.html</value></list></property>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"resourceBase\" value=\"${ACTIVEMQ_HOME}/webapps/\"/></bean><bean id=\"defaultHandler\" class=\"org.eclipse.jetty.server.handler.DefaultHandler\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"serveIcon\" value=\"false\"/></bean></list></property></bean></property></bean><bean id=\"contexts\" class=\"org.eclipse.jetty.server.handler.ContextHandlerCollection\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "</bean><bean id=\"jettyPort\" class=\"org.apache.activemq.web.WebConsolePort\" init-method=\"start\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"port\" value=\"$ACTIVEMQ_JETTY_PORT\"/></bean><bean id=\"Server\" depends-on=\"jettyPort\" class=\"org.eclipse.jetty.server.Server\" init-method=\"start\" destroy-method=\"stop\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"connectors\"><list><bean id=\"Connector\" class=\"org.eclipse.jetty.server.nio.SelectChannelConnector\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<property name=\"port\" value=\"#{systemProperties['jetty.port']}\"/></bean></list></property><property name=\"handler\">" >> $ACTIVEMQ_CONF/jetty.xml
echo "<bean id=\"handlers\" class=\"org.eclipse.jetty.server.handler.HandlerCollection\"><property name=\"handlers\"><list><ref bean=\"contexts\"/>" >> $ACTIVEMQ_CONF/jetty.xml
echo "<ref bean=\"securityHandler\"/></list></property></bean></property></bean></beans>" >> $ACTIVEMQ_CONF/jetty.xml
echo ""
echo "Date: `date`"
echo "Host: $NODE_HOST"
echo "ZK_QUORUM: $ZK_QUORUM"
echo "ACTIVEMQ_HOME: $ACTIVEMQ_HOME"
echo "ACTIVEMQ_CONF: $ACTIVEMQ_CONF"
echo "ACTIVEMQ_DATA: $ACTIVEMQ_DATA"
echo "ACTIVEMQ_BROKER_ID: $ACTIVEMQ_BROKER_ID"
echo "ACTIVEMQ_BROKER_NAME: $ACTIVEMQ_BROKER_NAME"
echo "ACTIVEMQ_LEVELDB_PORT: $ACTIVEMQ_LEVELDB_PORT"
echo "ACTIVEMQ_LEVELDB_PATH: $LEVELDB_PATH"
echo "ACTIVEMQ_LEVELDB_REPLICAS: $ACTIVEMQ_LEVELDB_REPLICAS"
echo "ACTIVEMQ_LEVELDB_ZKPATH: $ACTIVEMQ_LEVELDB_ZKPATH"
echo "ACTIVEMQ_OPENWIRE_PORT: $ACTIVEMQ_OPENWIRE_PORT"
echo "ACTIVEMQ_MQTT_PORT: $ACTIVEMQ_MQTT_PORT"
echo "ACTIVEMQ_JETTY_PORT: $ACTIVEMQ_JETTY_PORT"
echo "ACTIVEMQ_LOG_DIR: $ACTIVEMQ_LOG_DIR"
echo ""
exec $ACTIVEMQ_HOME/bin/activemq $OPERTION "$ACTIVEMQ_CONF" "$ACTIVEMQ_DATA" "$ACTIVEMQ_LOG_DIR" "$ACTIVEMQ_TEMP"

Note: the exec command in control.sh is required.
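
The reason exec matters is that it replaces the shell with the service, so the process ID that the supervisor started is the service's own. The sketch below shows the difference using plain shells as stand-ins; `sh -c 'echo $$'` plays the role of the service here, and both helper names are hypothetical.

```shell
#!/bin/bash
# Each helper prints two PIDs: the wrapper shell's, then the "service" process's.

# The wrapper execs the service, so the service takes over the wrapper's PID.
pids_with_exec() {
    bash -c 'echo $$; exec sh -c "echo \$\$"'
}

# The wrapper runs the service as a child, so the service gets a new PID.
# The trailing ":" keeps bash from exec-ing the last command on its own.
pids_without_exec() {
    bash -c 'echo $$; sh -c "echo \$\$"; :'
}

echo "with exec:";    pids_with_exec
echo "without exec:"; pids_without_exec
```

With exec the two printed PIDs are identical; without it they differ, and a supervisor watching the first PID would be watching the wrapper shell rather than the service.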

Package the csd with the jar command. Change into the ACTIVEMQ-5.10.2 directory first.

Run the command:

jar -cvf ACTIVEMQ-5.10.2.jar *

Copy ACTIVEMQ-5.10.2.jar to the /opt/cloudera/csd/ directory.

Publishing to CDH

Distribute and activate the parcel

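After you click Check for New Parcels, the parcel you added appears in the list. Click Distribute and then Activate. CDH copies the parcel to every node in the cluster and unpacks it under /opt/cloudera/parcels on each node.

Publishing the parcel is independent of the csd jar.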

Adding the service

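On the CDH home page, click Add Service. The flink or activemq service you published appears in the list; follow the wizard step by step.

Once the service has been added, CDH runs control.sh. The script locates the service directory under /opt/cloudera/parcels, using the variables from the parcel's env.sh file, and starts the service.

After the service starts, CDH monitors its running state. CDH forks a child process to run control.sh, and because control.sh ends with exec, the service replaces that shell and becomes the very process CDH is watching. When that process dies, CDH knows the service is down.

Note: keep the upper and lower case of the service name consistent across all of the files above. A mismatch can cause publishing to fail.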
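
One way to catch a name mismatch before publishing is to split the parcel file name into its three segments (package name, version, platform) and compare the first segment, case included, with the name fields in parcel.json and service.sdl. A minimal sketch using shell parameter expansion follows; `split_parcel_name` is a hypothetical helper.

```shell
#!/bin/bash
# Hypothetical helper: print "<name> <version> <platform>" for a parcel file name.
# The name is everything before the first "-", the platform everything after the last "-".
split_parcel_name() {
    local base=${1%.parcel}
    local name=${base%%-*}
    local distro=${base##*-}
    local version=${base#"$name"-}
    version=${version%-"$distro"}
    echo "$name $version $distro"
}

split_parcel_name ACTIVEMQ-5.10.2-hawtio_1.4.68-el6.parcel
```

For the parcel built in this article the call prints `ACTIVEMQ 5.10.2-hawtio_1.4.68 el6`, and the first field is the string the other files need to match.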

This completes the process of adding a third-party service.