Knative Eventing: Kafka Source

Published: 2023-11-21 18:11:56 | Author: 小吉猫

Environment

Kafka cluster: deployed with the Strimzi operator
  ZooKeeper nodes: 3
  Kafka nodes: 1
Channel: Apache Kafka Channel
Broker: Apache Kafka Broker
Namespace: event-kafka-demo

Create the namespace

# kubectl create ns event-kafka-demo
namespace/event-kafka-demo created

Deploy eventing-kafka-controller

Download eventing-kafka-controller.yaml

# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-controller.yaml
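The same release also ships the channel, broker, and source manifests used later in this post, so the download URLs differ only in the file name. A minimal sketch that builds them from one version variable; the helper name `manifest_url` and the variable name are made up for illustration:

```shell
# Release tag used throughout this post.
KNATIVE_KAFKA_VERSION="knative-v1.12.0"

# manifest_url is a hypothetical helper: it prints the release URL for one manifest file.
manifest_url() {
  printf 'https://github.com/knative-extensions/eventing-kafka-broker/releases/download/%s/%s\n' \
    "$KNATIVE_KAFKA_VERSION" "$1"
}

# Print the URLs of the four manifests; pass each one to wget to download it.
for f in eventing-kafka-controller.yaml eventing-kafka-channel.yaml \
         eventing-kafka-broker.yaml eventing-kafka-source.yaml; do
  manifest_url "$f"
done
```

Keeping the tag in one variable makes it harder to mix manifests from different releases by accident.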

Replace the gcr.io registry address with a mirror

# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-controller.yaml
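In the `sed` expression the unescaped `.` matches any character, so `gcr.io` would also match a string such as `gcrXio`. That is unlikely to matter for these manifests, but a stricter variant is easy to try on a throwaway file first. The image path below is invented for the demo:

```shell
# Create a throwaway file with one made-up image reference.
tmp=$(mktemp)
printf '%s\n' 'image: gcr.io/example/some-image:v1' > "$tmp"

# Same substitution as above, with the dot escaped so only a literal "gcr.io" matches.
sed -i 's@gcr\.io@gcr.dockerproxy.com@g' "$tmp"

# Show the rewritten line.
cat "$tmp"
```

Running `grep image:` on the real manifest before and after the substitution gives the same kind of confirmation.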

Apply eventing-kafka-controller.yaml

# kubectl apply -f eventing-kafka-controller.yaml
configmap/kafka-broker-config created
configmap/kafka-channel-config created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumers.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumergroups.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasinks.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasources.sources.knative.dev created
clusterrole.rbac.authorization.k8s.io/eventing-kafka-source-observer created
configmap/config-kafka-source-defaults created
configmap/config-kafka-autoscaler created
configmap/config-kafka-descheduler created
configmap/config-kafka-features created
configmap/config-kafka-leader-election created
configmap/config-kafka-scheduler created
configmap/kafka-config-logging created
configmap/config-namespaced-broker-resources created
configmap/config-tracing configured
clusterrole.rbac.authorization.k8s.io/knative-kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-controller created
serviceaccount/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller-addressable-resolver created
deployment.apps/kafka-controller created
clusterrole.rbac.authorization.k8s.io/kafka-webhook-eventing created
serviceaccount/kafka-webhook-eventing created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook-eventing created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.eventing.knative.dev created
mutatingwebhookconfiguration.admissionregistration.k8s.io/pods.defaulting.webhook.kafka.eventing.knative.dev created
secret/kafka-webhook-eventing-certs created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.eventing.knative.dev created
deployment.apps/kafka-webhook-eventing created
service/kafka-webhook-eventing created

Check the eventing-kafka-controller pods

# kubectl get pods -n knative-eventing
NAME                                      READY   STATUS    RESTARTS       AGE
eventing-controller-75d79c8bfb-fpv26      1/1     Running   7 (84m ago)    6d16h
eventing-webhook-79bf558944-4j6rn         1/1     Running   7 (84m ago)    6d16h
imc-controller-8d958bbf5-xvhm7            1/1     Running   5 (84m ago)    4d20h
imc-dispatcher-799f9f548-fdd99            1/1     Running   6 (84m ago)    4d20h
kafka-controller-8c5487cdb-6w7gp          1/1     Running   0              72s
kafka-webhook-eventing-5c9c7cdb9d-kqcrq   1/1     Running   0              55s
mt-broker-controller-7b98899b48-m45w6     1/1     Running   5 (84m ago)    4d20h
mt-broker-filter-788b867775-gwqn2         1/1     Running   7 (84m ago)    4d20h
mt-broker-ingress-5f5b69fb49-7d6wn        1/1     Running   14 (82m ago)   4d20h
pingsource-mt-adapter-6588d57445-5mrgq    1/1     Running   6 (84m ago)    5d22h
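Instead of rereading the pod list until everything is `Running`, the rollout can be awaited. A sketch using `kubectl rollout status`; the function name and the 300s timeout are arbitrary choices, not something the manifests require:

```shell
# wait_for_deployments is a hypothetical helper: it blocks until each named
# Deployment in knative-eventing has finished rolling out, and stops at the
# first one that fails or times out.
wait_for_deployments() {
  for d in "$@"; do
    kubectl -n knative-eventing rollout status "deployment/$d" --timeout=300s || return 1
  done
}

# Usage (requires cluster access):
#   wait_for_deployments kafka-controller kafka-webhook-eventing
```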

Install the Kafka Channel layer

Download eventing-kafka-channel.yaml

# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-channel.yaml

Replace the gcr.io registry address with a mirror

# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-channel.yaml

Apply eventing-kafka-channel.yaml

# kubectl apply -f eventing-kafka-channel.yaml
deployment.apps/kafka-channel-dispatcher created
deployment.apps/kafka-channel-receiver created
service/kafka-channel-ingress created

Check the eventing-kafka-channel pods

# kubectl get pods -n knative-eventing
NAME                                        READY   STATUS    RESTARTS       AGE
eventing-controller-75d79c8bfb-fpv26        1/1     Running   7 (100m ago)   6d16h
eventing-webhook-79bf558944-4j6rn           1/1     Running   7 (100m ago)   6d16h
imc-controller-8d958bbf5-xvhm7              1/1     Running   5 (100m ago)   4d21h
imc-dispatcher-799f9f548-fdd99              1/1     Running   6 (100m ago)   4d21h
kafka-channel-dispatcher-56b4997b8b-ctvx2   1/1     Running   0              8m52s
kafka-channel-receiver-6dd986d55d-b89m5     1/1     Running   0              8m52s
kafka-controller-8c5487cdb-6w7gp            1/1     Running   0              17m
kafka-webhook-eventing-5c9c7cdb9d-kqcrq     1/1     Running   0              17m
mt-broker-controller-7b98899b48-m45w6       1/1     Running   5 (100m ago)   4d21h
mt-broker-filter-788b867775-gwqn2           1/1     Running   7 (100m ago)   4d21h
mt-broker-ingress-5f5b69fb49-7d6wn          1/1     Running   14 (98m ago)   4d21h
pingsource-mt-adapter-6588d57445-5mrgq      1/1     Running   6 (100m ago)   5d22h

Check the KafkaChannel API resource

# kubectl api-resources |grep channel
channels                          ch                                              messaging.knative.dev/v1                       true         Channel
inmemorychannels                  imc                                             messaging.knative.dev/v1                       true         InMemoryChannel
kafkachannels                     kc                                              messaging.knative.dev/v1beta1                  true         KafkaChannel

Make KafkaChannel the default channel

Check the KafkaChannel API group and version

# kubectl explain KafkaChannel
GROUP:      messaging.knative.dev
KIND:       KafkaChannel
VERSION:    v1beta1

...

default-channel-config.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: default-ch-webhook
  namespace: knative-eventing
  labels:
    eventing.knative.dev/release: devel
    app.kubernetes.io/version: devel
    app.kubernetes.io/part-of: knative-eventing
data:
  default-ch-config: |
    clusterDefault:
      apiVersion: messaging.knative.dev/v1
      kind: InMemoryChannel
    namespaceDefaults:
      event-kafka-demo:               # namespace to configure; adjust as needed
        apiVersion: messaging.knative.dev/v1beta1
        kind: KafkaChannel
        spec:
          numPartitions: 5               # default number of partitions per topic; defaults to 1
          replicationFactor: 1           # default replication factor per topic; must not exceed the number of Kafka brokers (available nodes); defaults to 1

Apply the default channel configuration

# kubectl apply -f default-channel-config.yaml
configmap/default-ch-webhook configured
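The replication factor must not exceed the number of Kafka brokers, and this environment has a single Kafka node, so it is worth checking the value before applying. A small sketch; `valid_replication` is a made-up helper, and the broker count is passed in by hand rather than read from the cluster:

```shell
# valid_replication is a hypothetical helper.
# Arguments: replication factor, number of Kafka brokers.
# Succeeds only when 1 <= replication factor <= broker count.
valid_replication() {
  [ "$1" -ge 1 ] && [ "$1" -le "$2" ]
}

# With 1 Kafka node, replicationFactor 1 passes and 3 does not.
if valid_replication 1 1; then echo "replicationFactor 1: ok"; fi
if ! valid_replication 3 1; then echo "replicationFactor 3: too high for 1 broker"; fi
```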

Install the Kafka Broker layer

Download eventing-kafka-broker.yaml

# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-broker.yaml

Replace the gcr.io registry address with a mirror

# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-broker.yaml

Apply eventing-kafka-broker.yaml

# kubectl apply -f eventing-kafka-broker.yaml
configmap/config-kafka-broker-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
serviceaccount/knative-kafka-broker-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
deployment.apps/kafka-broker-dispatcher created
deployment.apps/kafka-broker-receiver created
service/kafka-broker-ingress created

Check the eventing-kafka-broker pods

# kubectl get pods -n knative-eventing
NAME                                        READY   STATUS    RESTARTS         AGE
eventing-controller-75d79c8bfb-fpv26        1/1     Running   7 (3h12m ago)    6d18h
eventing-webhook-79bf558944-4j6rn           1/1     Running   7 (3h12m ago)    6d18h
imc-controller-8d958bbf5-xvhm7              1/1     Running   5 (3h12m ago)    4d22h
imc-dispatcher-799f9f548-fdd99              1/1     Running   6 (3h12m ago)    4d22h
kafka-broker-dispatcher-7dcdf8fb6f-smxh8    1/1     Running   0                69s
kafka-broker-receiver-f7f48786f-bz64p       1/1     Running   0                69s
kafka-channel-dispatcher-56b4997b8b-ctvx2   1/1     Running   0                101m
kafka-channel-receiver-6dd986d55d-b89m5     1/1     Running   0                101m
kafka-controller-8c5487cdb-6w7gp            1/1     Running   0                109m
kafka-webhook-eventing-5c9c7cdb9d-kqcrq     1/1     Running   0                109m
mt-broker-controller-7b98899b48-m45w6       1/1     Running   5 (3h12m ago)    4d22h
mt-broker-filter-788b867775-gwqn2           1/1     Running   7 (3h12m ago)    4d22h
mt-broker-ingress-5f5b69fb49-7d6wn          1/1     Running   14 (3h10m ago)   4d22h
pingsource-mt-adapter-6588d57445-5mrgq      1/1     Running   6 (3h12m ago)    6d

Configure the Kafka broker

configmap-kafka-broker-config.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "5"  # default number of partitions per topic
  # Replication factor of topic messages.
  default.topic.replication.factor: "1"  # default replication factor per topic; must not exceed the number of Kafka brokers (available nodes)
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"   # access endpoint of the Kafka cluster's bootstrap server

Update the Kafka broker configuration

# kubectl apply -f configmap-kafka-broker-config.yaml
configmap/kafka-broker-config configured
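The `bootstrap.servers` value follows the in-cluster DNS form `<service>.<namespace>:<port>`, here the bootstrap Service of `my-cluster` in the `kafka` namespace. A sketch that splits such an address with shell parameter expansion, which can help when adapting the value to another cluster; the variable names are illustrative:

```shell
addr="my-cluster-kafka-bootstrap.kafka:9092"

host=${addr%%:*}        # everything before the first ":"
port=${addr##*:}        # everything after the last ":"
service=${host%%.*}     # everything before the first "."
namespace=${host#*.}    # everything after the first "."

echo "service=$service namespace=$namespace port=$port"
```

The namespace suffix is what lets components running in `knative-eventing` reach a Service that lives in `kafka`.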

Make the Kafka broker the default broker class

configmap-default-br-config.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: MTChannelBasedBroker
      apiVersion: v1
      kind: ConfigMap
      name: config-br-default-channel
      namespace: knative-eventing
      delivery:
        retry: 10
        backoffPolicy: exponential
        backoffDelay: PT0.2S
    namespaceDefaults:
      event-kafka-demo:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing

Update the default broker class configuration

# kubectl apply -f configmap-default-br-config.yaml
configmap/config-br-defaults configured

Configure config-br-default-channel

config-br-default-channel.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    app.kubernetes.io/name: knative-eventing
    app.kubernetes.io/version: 1.12.0
  name: config-br-default-channel
  namespace: knative-eventing
data:
  channel-template-spec: |
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel

Update config-br-default-channel

# kubectl apply -f config-br-default-channel.yaml
configmap/config-br-default-channel configured

Download eventing-kafka-source.yaml

# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-source.yaml

Replace the gcr.io registry address with a mirror

# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-source.yaml

Deploy eventing-kafka-source

# kubectl apply -f eventing-kafka-source.yaml
configmap/config-kafka-source-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-source-data-plane created
serviceaccount/knative-kafka-source-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-source-data-plane created
statefulset.apps/kafka-source-dispatcher created

Check the KafkaSource API resource

# kubectl api-resources |grep -i sources
apiserversources                                                                  sources.knative.dev/v1                         true         ApiServerSource
containersources                                                                  sources.knative.dev/v1                         true         ContainerSource
gitlabsources                                                                     sources.knative.dev/v1alpha1                   true         GitLabSource
kafkasources                                                                      sources.knative.dev/v1beta1                    true         KafkaSource
pingsources                                                                       sources.knative.dev/v1                         true         PingSource
sinkbindings                                                                      sources.knative.dev/v1                         true         SinkBinding

Create a Kafka topic

knative-demo-topic.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: knative-demo-topic
  namespace: kafka         # must match the namespace of the Kafka cluster
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 5
  replicas: 1
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824

Create the topic

# kubectl apply -f knative-demo-topic.yaml
kafkatopic.kafka.strimzi.io/knative-demo-topic created

List the Kafka topics

# kubectl get kafkatopics -n kafka
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           1                    True
knative-demo-topic                                                                                 my-cluster   5            1                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            1                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            1                    True

Connection test

# kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092  --topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>0
>o
>
If the producer prints no warnings, the topic was created correctly.
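A non-interactive alternative to the producer check is to wait for the condition behind the READY column in the topic listing above. A sketch, assuming the KafkaTopic resource reports that state as a condition named `Ready`; the function name and the 60s timeout are arbitrary:

```shell
# wait_for_topic is a hypothetical helper: it blocks until the named KafkaTopic
# in the kafka namespace reports the Ready condition, or fails on timeout.
wait_for_topic() {
  kubectl -n kafka wait "kafkatopic/$1" --for=condition=Ready --timeout=60s
}

# Usage (requires cluster access):
#   wait_for_topic knative-demo-topic
```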

Create event-display

event-display.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: event-kafka-demo
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "1"
    spec:
      containers:
        - image: gcr.dockerproxy.com/knative-releases/knative.dev/eventing/cmd/event_display

Create the Knative Service

# kubectl apply -f event-display.yaml
service.serving.knative.dev/event-display created

Check the ksvc

# kubectl get ksvc -n event-kafka-demo
NAME            URL                                                   LATESTCREATED         LATESTREADY           READY   REASON
event-display   http://event-display.event-kafka-demo.svc.wgs.local   event-display-00001   event-display-00001   True    

Create a broker

kafka-broker.yaml

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
 name: default
 namespace: event-kafka-demo

Create the Broker resource

# kubectl apply -f kafka-broker.yaml
broker.eventing.knative.dev/default created

Check the broker

# kubectl get broker -n event-kafka-demo
NAME      URL                                                                                   AGE   READY   REASON
default   http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default   67s   True   

Check the broker class

# kubectl describe broker -n event-kafka-demo
Name:         default
Namespace:    event-kafka-demo
Labels:       <none>
Annotations:  eventing.knative.dev/broker.class: Kafka
              eventing.knative.dev/creator: kubernetes-admin
              eventing.knative.dev/lastModifier: kubernetes-admin
API Version:  eventing.knative.dev/v1
Kind:         Broker
Metadata:
  Creation Timestamp:  2023-11-21T09:52:16Z
  Finalizers:
    brokers.eventing.knative.dev
  Generation:        1
  Resource Version:  1945640
  UID:               3a2c7ae4-ca80-422b-9d41-136032b97a5c
Spec:
  Config:
    API Version:  v1
    Kind:         ConfigMap
    Name:         kafka-broker-config
    Namespace:    knative-eventing
Status:
  Address:
    Name:  http
    URL:   http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default
  Addresses:
    Name:  http
    URL:   http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default
  Annotations:
    bootstrap.servers:                 my-cluster-kafka-bootstrap.kafka:9092
    default.topic:                     knative-broker-event-kafka-demo-default
    default.topic.partitions:          10
    default.topic.replication.factor:  1
...
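The ingress address shown under `Status.Address.URL` can also be read directly, which is handy for scripting. A sketch with `kubectl get -o jsonpath`; the function name is made up:

```shell
# broker_url is a hypothetical helper: it prints the ingress URL of a Broker.
# Arguments: broker name, namespace.
broker_url() {
  kubectl get broker "$1" -n "$2" -o jsonpath='{.status.address.url}'
}

# Usage (requires cluster access):
#   broker_url default event-kafka-demo
```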

Create a trigger

trigger-event.yaml

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: trigger-demo
  namespace: event-kafka-demo
spec:
  broker: default
  #filter:
  #  attributes:
  #    type: dev.knative.kafka.event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
      namespace: event-kafka-demo

Create the Trigger resource

# kubectl apply -f trigger-event.yaml
trigger.eventing.knative.dev/trigger-demo created

Check the Trigger resource

# kubectl get trigger -n event-kafka-demo
NAME           BROKER    SUBSCRIBER_URI                                        AGE   READY   REASON
trigger-demo   default   http://event-display.event-kafka-demo.svc.wgs.local   52s   True    

Create a Kafka event source

event-source.yaml

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
  namespace: event-kafka-demo
spec:
  consumerGroup: knative-group
  bootstrapServers:
  - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
  topics:
  - knative-demo-topic
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: event-kafka-demo

Create the event source

# kubectl apply -f event-source.yaml 
kafkasource.sources.knative.dev/kafka-source created

Check the event source

# kubectl get kafkasource -n event-kafka-demo
NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             16m

Test Kafka events

Check the event-display pod

# kubectl get pods -n event-kafka-demo
NAME                                             READY   STATUS    RESTARTS      AGE
event-display-00001-deployment-7764b69f9-66pnh   2/2     Running   6 (43m ago)   168m

Send messages

# kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap.kafka:9092  --topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>kafka source to broker to trigger to sink
>{"msg": "kafka source to broker to trigger to sink"}
>

Check the event output

# kubectl logs event-display-00001-deployment-7764b69f9-66pnh -c user-container -n event-kafka-demo
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/event-kafka-demo/kafkasources/kafka-source#knative-demo-topic
  subject: partition:4#2
  id: partition:4/offset:2
  time: 2023-11-21T10:03:13.428Z
Data,
  kafka source to broker to trigger to sink
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/event-kafka-demo/kafkasources/kafka-source#knative-demo-topic
  subject: partition:4#3
  id: partition:4/offset:3
  time: 2023-11-21T10:04:35.459Z
Data,
  {"msg": "kafka source to broker to trigger to sink"}
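In the output above, the `id` attribute records where each message came from, in the form `partition:<n>/offset:<n>`. A sketch that pulls the two numbers out of such an id, based only on the format visible in this log:

```shell
id="partition:4/offset:2"

partition=${id#partition:}   # drop the leading "partition:"
partition=${partition%%/*}   # keep what precedes the first "/"
offset=${id##*offset:}       # keep what follows "offset:"

echo "partition=$partition offset=$offset"
```

Both messages here landed on partition 4 at consecutive offsets, which matches the order in which they were typed into the producer.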

References

https://knative.dev/docs/eventing/sources/kafka-source/