System.env and property values not being read on the slave nodes of a Spark cluster

Date: 2021-08-29 20:52:55

I have a multi-node Spark cluster and I am submitting my Spark program on the node where the master resides.

When the job is submitted to the slave nodes, the HOSTNAME parameter comes back as a null value. Here is the line where the properties are being read as null.

System.getenv("HOSTNAME") is not being read on the slave nodes.

        System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));

AUDIT_USER and AUDIT_PASSWORD are also null when read (both are defined in the properties file).

If I submit the job with one node, I have no issues with these parameters. But if I submit the job in standalone mode with 6 nodes, I get this issue.

I have created the same folder for the properties file on all nodes.

Here is my code. Could you please let me know why System.getenv is returning null and why my properties are null?

package com.fb.cpd.myapp;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

public class GenericLogic implements Serializable {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LogManager.getLogger(GenericLogic.class);
    private PropertiesConfiguration props;
    private Producer<String, String> producer = null;
    private Future<RecordMetadata> receipt = null;
    private RecordMetadata receiptInfo = null;
    private ConnectToRDBMS auditor = null;
    private ConnectToRDBMS df = null;

    private static String myId = null;

    private Map<TopicAndPartition, Long> getOffsets(String topic) throws SQLException {
        String appName = "myapp";
        String TopicName = topic;
        Map<TopicAndPartition, Long> topicMap = new HashMap<>(); //
        System.out.println("line 64 before making connection");

        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) { // TODO Auto-generated catch block
            System.out.println("Line 70");
            e.printStackTrace();
        }

        try {
            System.out.println("line 76 System.getenv(HOSTNAME)=" + System.getenv("HOSTNAME"));
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "lockSparkCollector", null, null, null, null, null,
                    0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
        }
        System.out.println("line 64 after making connection");

        Statement stmt = null;

        String query = "select va_application, topic_name, partition_id, from_offset,until_offset from lock_spark_offsets where va_application = "
                + "'" + appName + "'" + " and topic_name= " + "'" + TopicName + "'";
        System.out.println("query" + query);
        System.out.println("before query exection");
        try {
            stmt = auditor.dbConnection.createStatement();
            System.out.println("line 81");

            ResultSet rs = stmt.executeQuery(query);
            System.out.println("line 83");
            while (rs.next()) {
                System.out.println("pass 1 of Resultset");
                System.out.println("getOffsets=" + topic.trim() + " " + rs.getInt("partition_id") + " "
                        + rs.getString("until_offset") + " " + rs.getString("until_offset"));
                Integer partition = rs.getInt("partition_id");

                TopicAndPartition tp = new TopicAndPartition(topic.trim(), partition);
                System.out.println("102");
                Long.parseLong(rs.getString("until_offset"));
                topicMap.put(tp, Long.parseLong(rs.getString("until_offset")));
                System.out.println("105");

            }
            System.out.println("after populating topic map");

        } catch (SQLException e) {
            System.out.println("printing exception");
            e.printStackTrace();
        } finally {
            if (stmt != null) {
                System.out.println("closing statement");
                stmt.close();
            }
        }
        return topicMap;
    }

    public void setDefaultProperties() {
        FileChangedReloadingStrategy strategy = new FileChangedReloadingStrategy();
        strategy.setRefreshDelay(10000);
        System.out.println("Line 45");
        // supply the properties file.
        try {
            props = new PropertiesConfiguration("/app/lock/conf/empty.properties");
        } catch (ConfigurationException e) {
            // TODO Auto-generated catch block
            System.out.println("Line 51");
            e.printStackTrace();
        }
        props.setReloadingStrategy(strategy);
        System.out.println("Line 56");

        // Producer configs
        if (!props.containsKey("acks")) {
            props.setProperty("acks", "1");
        }

        if (!props.containsKey("retries")) {
            props.setProperty("retries", "1000");
        }

        if (!props.containsKey("compression.type")) {
            props.setProperty("compression.type", "gzip");
        }

        if (!props.containsKey("request.timeout.ms")) {
            props.setProperty("request.timeout.ms", "600000");
        }

        if (!props.containsKey("batch.size")) {
            props.setProperty("batch.size", "32768");
        }

        if (!props.containsKey("buffer.memory")) {
            props.setProperty("buffer.memory", "134217728");
        }

        if (!props.containsKey("block.on.buffer.full")) {
            props.setProperty("block.on.buffer.full", "true");
        }

        if (!props.containsKey("SHUTDOWN")) {
            props.setProperty("SHUTDOWN", "false");
        }

        if (!props.containsKey("producer.topic")) {
            props.setProperty("producer.topic", "mytopic1");
        }

        Properties producer_props = ConfigurationConverter.getProperties(props);

        producer_props.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
        producer_props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer_props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????

        this.producer = new KafkaProducer<String, String>(producer_props);
        System.out.println("Line 107");

    }

    public void PublishMessages(String st) {

        try {
            System.out.println("Line 111");
            String key = UUID.randomUUID().toString().replace("-", "");
            System.out.println("Started Producing...");

            receipt = producer.send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, // Key
                    st));
            System.out.println("After Completion of Producing Producing");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Exception in PublishMessages ");
        }

    }

    public void DBConnect() {
        try {
            auditor = new ConnectToRDBMS(System.getenv("HOSTNAME"), "myapp", props.getString("consumer.topic"), null,
                    null, null, null, 0, props.getString("AUDIT_USER"), props.getString("AUDIT_PASSWORD"),
                    props.getString("AUDIT_DB_URL"));
        } catch (SQLException e) {
            logger.error("ASSERT: run() ERROR CONNECTING TO AUDIT DB " + e.getMessage());
            return;
        }
    }

    private void writeToDB(Long startTime, Integer partnId, String fromOffset, String untilOffset, Integer count) {
        this.auditor.audit(startTime, partnId, fromOffset, untilOffset, count);

    }

    /**
     * 
     * @param jsc
     * @param topicSet
     * @throws Exception
     */
    public static void main(String[] args) {
        String topicNames = "MySourceTopic";
        GenericLogic ec = new GenericLogic();
        Map<TopicAndPartition, Long> topicMap = null;
        try {

            topicMap = ec.getOffsets("MySourceTopic");

        } catch (SQLException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        boolean clusterMode = false;

        Integer batchDuration = Integer.parseInt("30000");
        JavaSparkContext sparkConf = new JavaSparkContext("abcd.net:7077", "Kafka-Spark-Integration");

        sparkConf.getConf().set("spark.local.ip", "lock-dt-a4d.xyz.com");
        sparkConf.getConf().set("spark.eventLog.enabled", "false");
        sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");

        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));
        Map<String, String> kafkaParams = new HashMap<String, String>();
        String pollInterval = "10000";
        String zookeeper = "lock-dt-a5d.xyz.com:2181,lock-dt-a6d.xyz.com:2181";

        kafkaParams.put("metadata.broker.list", "lock-dt-a5d.xyz.com:9092,lock-dt-a6d.xyz.com:9092");
        kafkaParams.put("group.id", "Consumer");
        kafkaParams.put("client.id", "Consumer");
        kafkaParams.put("zookeeper.connect", zookeeper);

        JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
                StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
                (Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);

        directKafkaStream.foreachRDD(rdd -> {
            if (rdd.isEmpty()) {
                System.out.println("No events polled in last " + pollInterval + " milli seconds");
                return;
            }

            rdd.foreachPartition(itr -> {
                Integer partnId = TaskContext.get().partitionId();
                Long systime = System.nanoTime();
                Map<String, String> hmap = new HashMap<String, String>();

                GenericLogic ec2 = new GenericLogic();
                ec2.setDefaultProperties();
                ec2.DBConnect();

                try {

                    while (itr.hasNext()) {
                        System.out.println("232");
                    }

                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }

            });
        });
        jsc.start();
        jsc.awaitTermination();
    }

}

2 Solutions

#1


0  

I started the slaves with start-slaves.sh. That's the issue. I have to start the workers by supplying the master address (for example, start-slave.sh spark://&lt;master-host&gt;:7077 on each worker node).

#2


0  

Can you please let us know the OS of all the nodes, and whether you have checked that it is not only the master node that is exporting HOSTNAME? Answering your question will be easier if you share your OS details.

This may not be directly related to your context, but just for information: System.getenv("HOSTNAME") may not provide the hostname on all platforms (for example Ubuntu or Mac).

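For illustration only, here is a minimal Java fallback sketch (the class and method names are hypothetical, not part of the original code): it uses HOSTNAME when the variable is actually exported, and otherwise resolves the local host name through the JDK, so the executors do not depend on what each node's shell happens to export.

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class HostNameResolver {
        // HOSTNAME is typically set by interactive bash sessions but is often not
        // exported to daemonized worker JVMs, so fall back to a JDK lookup.
        public static String resolveHostName() {
            String host = System.getenv("HOSTNAME");
            if (host == null || host.isEmpty()) {
                try {
                    host = InetAddress.getLocalHost().getHostName();
                } catch (UnknownHostException e) {
                    host = "unknown-host"; // placeholder when resolution fails
                }
            }
            return host;
        }
    }
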
Better still, why not export HOSTNAME explicitly on every node?

Note: I am assuming you have already checked that props is not null or empty? If not, debug and check whether the properties file is actually loaded and, if it is loaded, that it is not an empty file, so that the properties really did come from the file.

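As a quick way to verify that, here is a small stand-alone check you could run on each worker node. It is only a sketch: it reuses the properties path from the question's code and the same commons-configuration API, and the class name is hypothetical.

    import java.io.File;

    import org.apache.commons.configuration.ConfigurationException;
    import org.apache.commons.configuration.PropertiesConfiguration;

    public class PropsCheck {
        public static void main(String[] args) throws ConfigurationException {
            // Path copied from the question; adjust if your layout differs.
            File f = new File("/app/lock/conf/empty.properties");
            System.out.println("exists=" + f.exists() + ", readable=" + f.canRead() + ", bytes=" + f.length());

            // Load it the same way the job does and see what actually came back.
            PropertiesConfiguration props = new PropertiesConfiguration(f);
            System.out.println("isEmpty=" + props.isEmpty());
            System.out.println("has AUDIT_USER=" + props.containsKey("AUDIT_USER"));
            System.out.println("has AUDIT_PASSWORD=" + props.containsKey("AUDIT_PASSWORD"));
        }
    }

If this prints isEmpty=true (or throws) on a worker but not on the master, the copies of the file differ or the path is not readable from that node.
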
Looking at your problem (not only the environment variable but the properties are also not coming back), there may be something wrong with the properties file or its location on the different machines. If it is not an exact copy placed on each machine, please also check that the file is good for Linux (not written and edited on Windows and then copied to Linux, which can leave carriage-return line endings).