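Cloud native has become one of the major trends in the industry, and migrating Flink from YARN to Kubernetes brings several advantages. In this architecture compute and storage are decoupled: compute runs on Kubernetes, while storage uses a distributed storage system such as HDFS. The benefit is that compute and storage resources can be scaled independently as needed, which improves overall efficiency and elasticity.
This article covers four ways to deploy Flink on Kubernetes. Two are based on Native Kubernetes: Session mode and Application mode. The other two are based on the Flink Kubernetes Operator, which likewise offers Session mode and Application mode.
We start with Application mode on the Flink Kubernetes Operator. To run a jar you wrote yourself, first build an image. If the job uses HDFS, Hudi, or other components, the Dockerfile must also copy the Hadoop client and configuration files into the image and set the corresponding environment variables. All dependency jars are copied into the lib directory under Flink Home.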
FROM flink:1.16.1-scala_2.12
USER root
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ADD hadoop-3.1.1.tar.gz  /opt
ADD jdk1.8.0_121.tar.gz /opt
RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar
RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/
RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/
RUN mkdir -p /opt/hadoop/conf/
COPY yarn-site.xml /opt/hadoop/conf/
COPY core-site.xml /opt/hadoop/conf/
COPY hdfs-site.xml /opt/hadoop/conf/
COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/
COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/
RUN mkdir $FLINK_HOME/mylib
COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib
RUN chown -R flink:flink $FLINK_HOME/mylib
RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121 \n\
export HADOOP_HOME=/opt/hadoop-3.1.1 \n\
PATH=$PATH:$JAVA_HOME/bin \n\
PATH=$PATH:$HADOOP_HOME/bin'\
>> ~/.bashrc
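EXPOSE 8081
Build the image by running the following command in the directory that contains the Dockerfile; make sure that directory also holds every file the build needs.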
docker build -t flink-native/flink-on-k8s-xxxx .
Install Helm:
curl https://baltocdn.com/helm/signing.asc | sudo apt-key add -
sudo apt-get install apt-transport-https --yes
echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
sudo apt-get update
sudo apt-get install helm
Install cert-manager, which provides the certificate service:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
Install the Flink Kubernetes Operator:
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
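helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace
After this command runs, the operator image is pulled from ghcr.io/apache/flink-kubernetes-operator:7fc23a1. Because that download can be slow, you can pull apache/flink-kubernetes-operator:7fc23a1 instead and then tag it with the ghcr.io name:
docker pull apache/flink-kubernetes-operator:7fc23a1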
docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1
If a resource cannot be removed with kubectl delete during a reinstall, try the following command:
kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge
Once this command has cleared the finalizers, the resource can be deleted.
List the custom resource definitions:
kubectl get customresourcedefinition
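Before submitting a job it can help to confirm that the operator is actually ready. The following is a minimal sketch, assuming the `flink` namespace and the `flink-kubernetes-operator` release name from the `helm install` command above. The `run` helper and the `DRY_RUN` variable are illustrative additions and not part of any tool, and the `flinkdeployments.flink.apache.org` CRD name is assumed by analogy with the `flinksessionjobs` CRD patched earlier. By default the script only prints the commands; set `DRY_RUN=0` to run them against a cluster.

```shell
#!/bin/sh
# Hypothetical helper: print the command when DRY_RUN=1 (the default),
# execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# Check the Helm release, the operator pod, and the two CRDs the operator installs.
check_operator() {
  ns="${1:-flink}"
  run helm status flink-kubernetes-operator --namespace "$ns"
  run kubectl get pods --namespace "$ns"
  run kubectl get crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org
}

check_operator flink
```

If the operator pod is not in the Running state, `kubectl describe pod` on it usually shows whether the image pull is the cause.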


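Write a YAML file to submit the job. In it, image specifies the image, jarURI the location of the jar inside the image, entryClass the class to run, and args the arguments that class expects:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment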
metadata:
  name: flink-application-xxx
spec:
  image: flink-native/flink-on-k8s-xxxx
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "n"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "nm"
      cpu: n
  taskManager:
    resource:
      memory: "nm"
      cpu: n
  job:
    jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar
    entryClass: com.xxx.run.XXXJob
    parallelism: n
    upgradeMode: stateless
    args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]
Submit the job:
kubectl create -f xxx.yaml
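The manifest above uses `n` as a placeholder for the sizing values. As a rough sketch of how it might be filled in, the function below renders the same manifest from shell arguments. The function name, the default slot and parallelism values, and the `2048m` / `1` CPU sizing are all illustrative assumptions rather than recommendations; the third job argument `2` is copied from the native-mode commands later in this article.

```shell
#!/bin/sh
# Render a FlinkDeployment manifest with concrete values in place of "n".
# Every default below is an assumption for illustration; size them for your own job.
render_flink_deployment() {
  name="$1"
  image="$2"
  slots="${3:-2}"
  parallelism="${4:-2}"
  cat <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: ${name}
spec:
  image: ${image}
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "${slots}"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar
    entryClass: com.xxx.run.XXXJob
    parallelism: ${parallelism}
    upgradeMode: stateless
    args: ["hdfs://host:9000/data/input", "hdfs://host:9000/data/output", "2"]
EOF
}

render_flink_deployment flink-application-xxx flink-native/flink-on-k8s-xxxx 2 2
```

The rendered text can be saved to a file, or piped straight to the cluster with `render_flink_deployment ... | kubectl apply -f -`.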


List the FlinkDeployment resources:
kubectl get flinkdeployment

View the logs:
kubectl logs -f deploy/flink-application-xxx
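The Operator's Session mode follows the same pattern with two resources: a FlinkDeployment without a job section starts the session cluster, and a separate FlinkSessionJob submits a job to it by name. The sketch below renders both manifests under stated assumptions: the function name, the resource names, the sizing values, and the jar URL (`https://example.com/...`) are hypothetical. In this mode the jar is fetched by the operator rather than read from the image, so an HTTP(S) location is assumed here.

```shell
#!/bin/sh
# Render a session cluster plus one job for it.
# $1 = session cluster name, $2 = session job name (both hypothetical).
render_session_manifests() {
  cluster="$1"
  job="$2"
  cat <<EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: ${cluster}
spec:
  image: flink-native/flink-on-k8s-xxxx
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: ${job}
spec:
  deploymentName: ${cluster}
  job:
    jarURI: https://example.com/jars/xxx-1.0-SNAPSHOT.jar
    entryClass: com.xxx.run.XXXJob
    parallelism: 2
    upgradeMode: stateless
EOF
}

render_session_manifests flink-session-xxx flink-session-job-xxx
```

Because `deploymentName` ties the job to the cluster, several FlinkSessionJob resources can point at the same session cluster.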

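Flink on K8S: Session mode and Application mode
Native deployment needs the Flink client: download the Flink archive and extract it.
Create the namespace, make it the default for the current context, and grant permissions: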
kubectl create ns flink-native
kubectl config set-context --current --namespace=flink-native
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-native
Session mode: start the Flink cluster:
bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=xxx \
  -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.service-account=flink \
  -Dclassloader.check-leaked-classloader=false \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=60000 \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
Set up port forwarding:
nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &
Open the Flink Web UI. At this point only the JobManager is running.
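The same check can be made from the command line through Flink's REST API, whose `/overview` endpoint reports the number of registered TaskManagers. The sketch below assumes the port-forward above is listening on `localhost:8081`; the `json_int_field` helper is a hypothetical name, and it only handles flat JSON objects with integer values. The sample JSON in the last line is hand-written for illustration and is not real cluster output.

```shell
#!/bin/sh
# Read a flat JSON object on stdin and print the integer value of the field named in $1.
json_int_field() {
  sed -n "s/.*\"$1\" *: *\([0-9][0-9]*\).*/\1/p"
}

# Against the forwarded port (requires the port-forward to be running):
#   curl -s http://localhost:8081/overview | json_int_field taskmanagers

# Offline illustration with a hand-written sample:
echo '{"taskmanagers":0,"slots-total":0,"jobs-running":0}' | json_int_field taskmanagers
```

A value of 0 matches what the Web UI shows before any job is submitted: in a native Session cluster, TaskManagers are only started once a job needs them.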
Submit a job to the cluster. Run the bundled example job first:
bin/flink run -e kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx -Dkubernetes.rest-service.exposed.type=NodePort -Dkubernetes.cluster-id=my-first-flink-cluster examples/streaming/TopSpeedWindowing.jar
Run your own jar:
bin/flink run -e kubernetes-session \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.cluster-id=xxx \
    -c com.xxx.run.XXXJob \
    mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2
List the pods; a TaskManager has now been created:
kubectl get pod -o wide -A
View the logs. The following command shows the output of the TopSpeedWindowing example job:
kubectl logs my-first-flink-cluster-taskmanager-1-1  -n flink-native
List the jobs:
bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx
Cancel a job by ID:
bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxxr  3ff3c5a5e3c2f47e024e2771dc108f77
Application mode:
bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=xxx \
    -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.service-account=flink \
    -Dclassloader.check-leaked-classloader=false \
    -Dkubernetes.rest-service.exposed.type=ClusterIP \
    -c com.sohu.longuserprofile.run.TestJob \
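    local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2
Note that Session mode can only submit a jar from the local machine (the host running the client), whereas Application mode can only use a local:/// path, that is, a jar already inside the image.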
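That rule is easy to get wrong when switching between the two modes, so a small pre-flight check can help. The sketch below encodes only the rule as stated above and nothing more: `jar_fits_mode` is a hypothetical helper name, and it inspects the form of the jar location without checking that the file exists.

```shell
#!/bin/sh
# Return 0 when the jar location fits the deployment mode, 1 when it does not,
# and 2 for an unknown mode.
jar_fits_mode() {
  mode="$1"
  jar="$2"
  case "$mode" in
    application)
      # Application mode: the jar must be addressed inside the image via local:///
      case "$jar" in local:///*) return 0 ;; *) return 1 ;; esac ;;
    session)
      # Session mode: the jar is a plain path on the client host, with no URI scheme
      case "$jar" in *://*) return 1 ;; *) return 0 ;; esac ;;
    *)
      echo "unknown mode: $mode" >&2
      return 2 ;;
  esac
}

jar_fits_mode application local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar && echo "application: ok"
jar_fits_mode session mylib/xxx-1.0-SNAPSHOT.jar && echo "session: ok"
```

A wrapper script could call this before `bin/flink run` or `bin/flink run-application` and stop early on a mismatch.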
Common commands
Get the login token for the Kubernetes web UI:
kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token:
List all pods:
kubectl get pod -o wide -A
Show the details of a pod:
kubectl describe pod pod_name -n flink-native
Delete a deployment:
kubectl delete deployment/my-first-flink-cluster
Open a shell in a pod:
kubectl exec -it -n flink-native pod_name -- /bin/bash
List all namespaces:
kubectl get namespace
Copy files out of a pod:
kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home
Copy files into a pod:
kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal