关于Flink,TaskManager日志问题的一个记录

时间:2025-04-04 09:04:57

关于Flink,TaskManager日志问题的一个记录

疑问:不知道大家有没有和我一样的困扰:开发完的 Flink 代码推送到 Flink 集群上执行的时候,代码中 log.info(xxx) 打印的日志不会打印到 TaskManager 节点上去(而在 IDEA 上执行可以打印日志到控制台上)。为此困扰了很久,经过一系列的尝试,终于在 Java 和 Scala 中都实现了将日志打印到 TaskManager 节点上。

  • java代码demo:
// Lombok's @Slf4j generates a `private static final Logger log` field on this class
// (see the decompiled output), so `log` inside the anonymous function resolves to a
// static field of Test rather than to a captured outer reference.
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.getStreamExecutionEnvironment();

        // Source: one line per message from a socket; lines are comma-separated tokens.
        DataStreamSource<String> source = env.socketTextStream("prod", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(Tuple2.of(s, 1));
                }
                // Executed inside the operator on a TaskManager; because `log` is a
                // static field generated by @Slf4j, this line appears in the
                // TaskManager's log when the job runs on the cluster.
                log.info("打印的日志信息");
            }
        }).keyBy(0)
                .sum(1);
        result.print(">>>>");

        env.execute("Test");
    }
}

这里主要是利用了 Lombok 的 @Slf4j 注解的方式来实现。观察编译后的 class 文件如下:

// Decompiled class file: Lombok has expanded @Slf4j into an explicit field.
public class Test {
  // Lombok automatically added a static final Logger object; it is only because
  // the field is static that the code below can reference it from the operator.
  private static final Logger log = LoggerFactory.getLogger(Test.class);
  
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.getStreamExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("prod", 9999);
    SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split(",");
            for (String s : split)
              out.collect(Tuple2.of(s, Integer.valueOf(1))); 
              // Invoked via ClassName.field — a static reference, not a
              // serialized instance field captured by the anonymous class.
              Test.log.info("打印的日志信息");
          }
        }).keyBy(new int[] { 0 }).sum(1);
    result.print(">>>>");
    env.execute("Test");
  }
}

这样将代码提交到 Flink 集群环境后,日志在 TaskManager 的日志中也会打印出来。

  • scala代码demo:

Scala 中不支持使用 Lombok 的方式,所以这里通过伴生对象(object)的方式,让编译后的 class 文件中产生一个可通过静态路径访问的 Logger 对象。

object StreamWordCount {
  // Declared on the singleton object: after compilation the Logger lives on the
  // StreamWordCount$ module instance (see the decompiled output), so operators
  // reach it through a static path instead of serializing a Logger reference.
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    // Source: one comma-separated line per message from a socket.
    val stream: DataStream[String] = streamEnv.socketTextStream("prod", 9999)
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
      .map(new MapFunction[String, Tuple2[String, Int]] {
        override def map(value: String): Tuple2[String, Int] = {
          // Runs on a TaskManager; `log` is resolved via the companion object,
          // so the message shows up in the TaskManager log on the cluster.
          log.info("打印日志")
          Tuple2.apply(value, 1)
        }
      })
      .keyBy(0)
      .sum(1)
    result.print()

    streamEnv.execute("wordcount")
  }
}

编译后的class文件:

// Decompiled class file of the Scala object: the `object` became a singleton
// class StreamWordCount$ whose single instance is held in the static MODULE$ field.
public final class StreamWordCount$ {
  public static StreamWordCount$ MODULE$;
  // The Logger field created for the object (instance field of the singleton).
  private final Logger log;
  
  public Logger log() {
    return this.log;
  }
  
  public void main(String[] args) {
    StreamExecutionEnvironment streamEnv = org.apache.flink.streaming.api.scala.StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
    DataStream stream = streamEnv.socketTextStream("prod", 9999, streamEnv.socketTextStream$default$3(), streamEnv.socketTextStream$default$4());
    DataStream result = stream.flatMap(StreamWordCount$::$anonfun$main$1$adapted, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).map(new StreamWordCount$$anon$3(), (TypeInformation)new StreamWordCount$$anon$2()).keyBy((Seq)scala.Predef$.MODULE$.wrapIntArray(new int[] { 0 })).sum(1);
    result.print();
    streamEnv.execute("wordcount");
  }
  
  public final class StreamWordCount$$anon$2 extends CaseClassTypeInfo<Tuple2<String, Object>> {
    // NOTE(review): body omitted by the decompiler in this listing.
    public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
    }
    
    public StreamWordCount$$anon$2() {
      super(Tuple2.class, (TypeInformation[])(new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(String.class), (List)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(int.class), (List)scala.collection.immutable.Nil$.MODULE$))).toArray((ClassTag)scala.Predef$.MODULE$.implicitly(scala.reflect.ClassTag$.MODULE$.apply(TypeInformation.class))), (Seq)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(String.class), (List)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(int.class), (List)scala.collection.immutable.Nil$.MODULE$)), (Seq)scala.collection.Seq$.MODULE$.apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new String[2])));
    }
    
    public final class StreamWordCount$$anon$2$$anon$1 extends ScalaCaseClassSerializer<Tuple2<String, Object>> {
      public Tuple2<String, Object> createInstance(Object[] fields) {
        return new Tuple2(fields[0], BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(fields[1])));
      }
      
      public StreamWordCount$$anon$2$$anon$1(StreamWordCount$$anon$2 $outer, TypeSerializer[] fieldSerializers$1) {
        super($outer.getTypeClass(), fieldSerializers$1);
      }
    }
  }
  
  public final class StreamWordCount$$anon$3 implements MapFunction<String, Tuple2<String, Object>> {
    public Tuple2<String, Object> map(String value) {
      // Called through a static path: the static MODULE$ field, then log().
      // Nothing needs to be serialized with the MapFunction for this to work.
      StreamWordCount$.MODULE$.log().info("打印日志");
      return new Tuple2(value, BoxesRunTime.boxToInteger(1));
    }
  }
  
  private StreamWordCount$() {
    MODULE$ = this;
    this.log = LoggerFactory.getLogger(getClass().getName());
  }
}

结论:

log 对象是由 private static final 修饰的(Scala 中借助伴生对象达到同样的静态访问效果),因此算子在 TaskManager 上执行时依然能引用到它并正常打印日志。