Knative Eventing Parallel Flow 示例

发布时间 2023-11-20 12:13:23作者: 小吉猫

环境说明

◼  PingSource负责生成event
◼ Parallel中有两个Branch
  ◆ 第一个分支接受时间为偶数的事件
  ◆ 第二个分支接受时间为奇数的事件
◼ 所有分支的最终结果均发往ksvc/event-display,内容格式化Cloud Event存储入日志

创建名称空间

# kubectl create ns parallel-demo
namespace/parallel-demo created

创建 Filter

transformers.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: even-filter
  namespace: parallel-demo
spec:
  template:
    spec:
      containers:
      - image: villardl/filter-nodejs:0.1
        env:
        - name: FILTER
          value: |
            Math.round(Date.parse(event.time) / 60000) % 2 === 0
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: odd-filter
  namespace: parallel-demo
spec:
  template:
    spec:
      containers:
      - image: villardl/filter-nodejs:0.1
        env:
        - name: FILTER
          value: |
            Math.round(Date.parse(event.time) / 60000) % 2 !== 0
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: even-transformer
  namespace: parallel-demo
spec:
  template:
    spec:
      containers:
      - image: villardl/transformer-nodejs
        env:
        - name: TRANSFORMER
          value: |
            ({"message": "we are even!"})

---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: odd-transformer
  namespace: parallel-demo
spec:
  template:
    spec:
      containers:
      - image: villardl/transformer-nodejs:0.1
        env:
        - name: TRANSFORMER
          value: |
            ({"message": "this is odd!"})

创建 Filter 资源

# kubectl apply -f transformers.yaml
service.serving.knative.dev/even-filter created
service.serving.knative.dev/odd-filter created
service.serving.knative.dev/even-transformer created
service.serving.knative.dev/odd-transformer created

查看 Filter ksvc

# kubectl get ksvc -n parallel-demo
NAME               URL                                                   LATESTCREATED            LATESTREADY              READY   REASON
even-filter        http://even-filter.parallel-demo.svc.wgs.local        even-filter-00001        even-filter-00001        True    
even-transformer   http://even-transformer.parallel-demo.svc.wgs.local   even-transformer-00001   even-transformer-00001   True    
odd-filter         http://odd-filter.parallel-demo.svc.wgs.local         odd-filter-00001         odd-filter-00001         True    
odd-transformer    http://odd-transformer.parallel-demo.svc.wgs.local    odd-transformer-00001    odd-transformer-00001    True    

创建 event-display

event-display.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: parallel-demo
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "1"
    spec:
      containers:
        - image: gcr.dockerproxy.com/knative-releases/knative.dev/eventing/cmd/event_display

创建 event-display 资源

# kubectl apply -f event-display.yaml
service.serving.knative.dev/event-display created

查看 event-display 资源

# kubectl get ksvc -n parallel-demo
NAME               URL                                                   LATESTCREATED            LATESTREADY              READY   REASON
even-filter        http://even-filter.parallel-demo.svc.wgs.local        even-filter-00001        even-filter-00001        True    
even-transformer   http://even-transformer.parallel-demo.svc.wgs.local   even-transformer-00001   even-transformer-00001   True    
event-display      http://event-display.parallel-demo.svc.wgs.local      event-display-00001      event-display-00001      True    
odd-filter         http://odd-filter.parallel-demo.svc.wgs.local         odd-filter-00001         odd-filter-00001         True    
odd-transformer    http://odd-transformer.parallel-demo.svc.wgs.local    odd-transformer-00001    odd-transformer-00001    True    

创建 Parallel

parallel.yaml

apiVersion: flows.knative.dev/v1
kind: Parallel
metadata:
  name: odd-even-parallel
  namespace: parallel-demo
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
  branches:
    - filter:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: even-filter
          namespace: parallel-demo
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: even-transformer
          namespace: parallel-demo
    - filter:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: odd-filter
          namespace: parallel-demo
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: odd-transformer
          namespace: parallel-demo
  reply:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
      namespace: parallel-demo

创建 parallel 资源

# kubectl apply -f parallel.yaml
parallel.flows.knative.dev/odd-even-parallel created

查看 parallel 资源

# kubectl get parallel -n parallel-demo
NAME                URL                                                                           AGE   READY   REASON
odd-even-parallel   http://odd-even-parallel-kn-parallel-kn-channel.parallel-demo.svc.wgs.local   70s   True    

创建 PingSource

ping-source.yaml

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-source
  namespace: parallel-demo
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"message": "Even or odd?"}'
  sink:
    ref:
      apiVersion: flows.knative.dev/v1
      kind: Parallel
      name: odd-even-parallel
      namespace: parallel-demo

创建 pingsource 资源

# kubectl apply -f ping-source.yaml
pingsource.sources.knative.dev/ping-source created

查看 pingsource 资源

# kubectl get pingsource -n parallel-demo
NAME          SINK                                                                          SCHEDULE      AGE   READY   REASON
ping-source   http://odd-even-parallel-kn-parallel-kn-channel.parallel-demo.svc.wgs.local   */1 * * * *   13s   True    

验证结果

查看 pod

# kubectl get pod -n parallel-demo
NAME                                                 READY   STATUS    RESTARTS   AGE
event-display-00001-deployment-68f569cbc6-6ff8h      2/2     Running   0          5m47s

查看日志

# kubectl logs event-display-00001-deployment-68f569cbc6-6ff8h -c user-container -n parallel-demo
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/parallel-demo/pingsources/ping-source
  id: 5ed746c7-5919-4d23-8512-817ce0e6675d
  time: 2023-11-20T03:52:00.078441452Z
  datacontenttype: application/json; charset=utf-8
Data,
  {
    "message": "we are even!"
  }
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/parallel-demo/pingsources/ping-source
  id: ecbc2852-2d80-4ffc-9ce4-ad1ae07c98b3
  time: 2023-11-20T03:53:00.343279246Z
  datacontenttype: application/json; charset=utf-8
Data,
  {
    "message": "this is odd!"
  }
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/parallel-demo/pingsources/ping-source
  id: 7e5dbcde-6687-4f9f-aa2e-15446a655ec3
  time: 2023-11-20T03:54:00.034946307Z
  datacontenttype: application/json; charset=utf-8
Data,
  {
    "message": "we are even!"
  }
☁️  cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/parallel-demo/pingsources/ping-source
  id: 52fb84bb-bdcc-4c18-ab5b-a485d8be7459
  time: 2023-11-20T03:55:00.034468672Z
  datacontenttype: application/json; charset=utf-8
Data,
  {
    "message": "this is odd!"
  }

参考文档

https://knative.dev/docs/eventing/flows/parallel/

https://github.com/knative/docs/blob/main/code-samples/eventing/parallel/multiple-branches/README.md