Flink K8s Operator Testing and Verification

Date: 2024-03-16 20:30:09

1. Submitting a Flink Job

basic.yaml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: 10.177.85.101:8000/flink/flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

Submit the job:

kubectl create -f basic.yaml
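
To confirm that the submission worked, one option is to poll the FlinkDeployment resource until the job reports RUNNING. The sketch below assumes the operator records the job state in .status.jobStatus.state and that kubectl's current namespace is the one the job was created in. The helper names job_state and wait_for_running are hypothetical.

```shell
# Print the job state that the operator records on the FlinkDeployment resource.
# Assumes the resource lives in kubectl's current namespace.
job_state() {
  kubectl get flinkdeployment "$1" -o jsonpath='{.status.jobStatus.state}'
}

# Poll until the job is RUNNING.
# Args: deployment name, max attempts (default 30), seconds between attempts (default 5).
# Returns 0 once RUNNING is seen, 1 if the attempts run out.
wait_for_running() {
  name="$1"; attempts="${2:-30}"; delay="${3:-5}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    state="$(job_state "$name" 2>/dev/null || true)"
    echo "attempt $i: state=${state:-unknown}"
    if [ "$state" = "RUNNING" ]; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}
```

For example, wait_for_running basic-example waits up to about 150 seconds with the defaults. The JobManager log can then be followed with kubectl logs -f deploy/basic-example.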

To expose the Flink Dashboard, add a port-forward rule or see the operator's ingress configuration options. The -n namespace must be the one the FlinkDeployment was created in:

kubectl port-forward --address 0.0.0.0 svc/basic-example-rest 8081 -n flink-operator

The Flink Dashboard is now accessible at ip:8081, where ip is the address of the machine running the port-forward.
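
The same port also serves Flink's REST API, so the job can be checked from the command line without opening the dashboard. A minimal sketch, assuming the port-forward above is still running and curl is installed. The helper names flink_url and list_jobs are hypothetical, while /jobs/overview is a standard Flink REST endpoint.

```shell
# Build a Flink REST URL.
# Args: host (default localhost), port (default 8081), path (default /jobs/overview).
flink_url() {
  printf 'http://%s:%s%s\n' "${1:-localhost}" "${2:-8081}" "${3:-/jobs/overview}"
}

# Fetch the list of jobs and their states as JSON.
# Args: host and port, both optional.
list_jobs() {
  curl -sf "$(flink_url "${1:-}" "${2:-}" /jobs/overview)"
}
```

Calling list_jobs with no arguments queries localhost:8081. Pass the host address as the first argument when querying from another machine.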

Delete the job:

kubectl delete flinkdeployment/basic-example

2. HA and Checkpointing

basic-checkpoint-ha.yaml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-checkpoint-ha-example
spec:
  image: 10.177.85.101:8000/flink/flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
          - mountPath: /flink-data
            name: flink-volume
      volumes:
      - name: flink-volume
        hostPath:
          # directory location on host
          path: /tmp/flink
          # this field is optional
          type: Directory
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
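
Because the hostPath volume is declared with type: Directory, Kubernetes expects /tmp/flink to already exist on the node where a pod is scheduled, and the pod will not start otherwise. The Flink process also needs write access to that directory. A minimal sketch of preparing it, to be run on each node that may host a Flink pod. The function name prepare_flink_dir is hypothetical, and mode 777 is a permissive assumption that suits a test cluster only.

```shell
# Create the host directory that backs the flink-volume hostPath and make it writable.
# Arg: directory path (default /tmp/flink, the path used in basic-checkpoint-ha.yaml).
prepare_flink_dir() {
  dir="${1:-/tmp/flink}"
  mkdir -p "$dir" && chmod 777 "$dir"
}
```

Running prepare_flink_dir with no arguments prepares /tmp/flink. Flink then creates the savepoints, checkpoints, and ha subdirectories as needed.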

Submit the job:

kubectl create -f basic-checkpoint-ha.yaml
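
Two quick checks can exercise this configuration. Changing savepointTriggerNonce to a new value asks the operator to take a savepoint, and deleting the JobManager pod shows whether HA lets a replacement recover the job. The sketch below assumes the pod labels app=<deployment name> and component=jobmanager that Flink's native Kubernetes integration normally sets. All helper function names are hypothetical.

```shell
# JSON merge patch that sets spec.job.savepointTriggerNonce to the given number.
savepoint_patch() {
  printf '{"spec":{"job":{"savepointTriggerNonce":%d}}}\n' "$1"
}

# Ask the operator for a savepoint by changing the nonce.
# Args: deployment name, new nonce (must differ from the current value).
trigger_savepoint() {
  kubectl patch flinkdeployment "$1" --type merge -p "$(savepoint_patch "$2")"
}

# Label selector for the JobManager pod(s) of a deployment (assumed labels).
jobmanager_selector() {
  printf 'app=%s,component=jobmanager\n' "$1"
}

# Delete the JobManager pod; with HA enabled a replacement pod should recover the job.
kill_jobmanager() {
  kubectl delete pod -l "$(jobmanager_selector "$1")"
}
```

For example, trigger_savepoint basic-checkpoint-ha-example 1 should produce a savepoint under /tmp/flink/savepoints on the host. After kill_jobmanager basic-checkpoint-ha-example, kubectl get pods -w shows the replacement JobManager starting.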
