Using Kafka protocol filter with Istio

Hi,

I’m trying to use the Kafka protocol filter (https://www.envoyproxy.io/docs/envoy/latest/configuration/listeners/network_filters/kafka_broker_filter) to get some additional metrics.
I’m new to Envoy filters, so there may be a couple of things I didn’t fully understand, but I was able to set it up: it kind of produces metrics, and my app keeps working, which is already great!
However, all the counters are stuck at 0, despite traffic flowing through Kafka.

What I’m doing is applying this filter on my client sidecars. Perhaps that’s my mistake, and it’s only meant to work on the Kafka pod itself?

Here’s the EnvoyFilter object I’m using:

apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: kafka-protocol
  namespace: default
spec:
  configPatches:
    - applyTo: NETWORK_FILTER
      match:
        context: SIDECAR_OUTBOUND
        listener:
          portNumber: 9092
          filterChain:
            filter:
              name: "envoy.tcp_proxy"
      patch:
        operation: INSERT_BEFORE
        value:
          name: "envoy.filters.network.kafka_broker"
          typed_config:
            "@type": type.googleapis.com/envoy.config.filter.network.kafka_broker.v2alpha1.KafkaBroker
            stat_prefix: broker

And in the pods, the corresponding annotation has to be set:

sidecar.istio.io/statsInclusionPrefixes: cluster_manager,listener_manager,broker
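The annotation belongs on the pod template of the client workload, since Istio reads it from the pod’s annotations rather than from the Deployment object itself. A minimal sketch, assuming a hypothetical Deployment called `my-kafka-client` (name, labels and image are placeholders). Envoy’s documentation describes this filter’s statistics as rooted at `kafka.<stat_prefix>.`, so `kafka.broker` is added to the inclusion list here as an untested assumption, in case `broker` alone doesn’t match:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-kafka-client            # hypothetical client workload
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-kafka-client
  template:
    metadata:
      labels:
        app: my-kafka-client
      annotations:
        # Stat name prefixes the sidecar keeps. "kafka.broker" is an assumption
        # based on the filter's documented root of kafka.<stat_prefix>.
        sidecar.istio.io/statsInclusionPrefixes: "cluster_manager,listener_manager,broker,kafka.broker"
    spec:
      containers:
        - name: app
          image: example/my-kafka-client:latest   # placeholder image
```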

Any ideas? Has anyone already managed to use this protocol filter with Istio?


Does anyone have guidance on this topic? I’m interested in running Kafka inside Istio with mTLS, but I can’t find any clear guidance on whether it’s feasible.

I’ve been considering going with Envoy filters, but your post discouraged me. Have you made any progress on this issue?

My issue here was only about not seeing some additional metrics that I expected. Other than that, it was working pretty well. Here’s some info about the scenarios I tried (for testing with Kiali): “Make Kiali working better with Kafka” (kiali/kiali issue #2197 on GitHub); the various Kiali issues mentioned there have since been fixed. I’ve been using Strimzi to deploy Kafka, but I guess any other deployment works too.

If you give it a try, I’d love to hear your feedback!

So is it Strimzi that enables mTLS for Kafka on top of Istio? I will give it a try.

I used the Confluent Helm charts to get this working, so it’s certainly possible with other distributions. Some changes I had to make after reading issues elsewhere:

1.) Add service entries for the headless services:

apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
  name: zookeeper
spec:
  hosts:
    - broker-0.cp-zookeeper-headless.kafka.svc.cluster.local
    - broker-1.cp-zookeeper-headless.kafka.svc.cluster.local
    - broker-2.cp-zookeeper-headless.kafka.svc.cluster.local
  ports:
    - name: server
      number: 2888
      protocol: TCP
    - name: leader-election
      number: 3888
      protocol: TCP
  location: MESH_INTERNAL
  resolution: NONE

2.) Exclude ZooKeeper’s internal traffic from mTLS (I haven’t got this fully working yet):
traffic.sidecar.istio.io/excludeInboundPorts: "2888,3888"
traffic.sidecar.istio.io/excludeOutboundPorts: "2888,3888"
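These are pod annotations, so they go in the pod template of the ZooKeeper StatefulSet, not on the StatefulSet’s own metadata. A fragment, assuming the chart renders ZooKeeper as a StatefulSet (all other fields omitted):

```yaml
# Fragment of the ZooKeeper StatefulSet; only the relevant fields are shown.
spec:
  template:
    metadata:
      annotations:
        # Peer (2888) and leader-election (3888) traffic bypasses the sidecar
        # in both directions, so Istio mTLS is not applied to it.
        traffic.sidecar.istio.io/excludeInboundPorts: "2888,3888"
        traffic.sidecar.istio.io/excludeOutboundPorts: "2888,3888"
```

The sidecar is configured at injection time, so the pods need to be recreated for the change to take effect.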

3.) Update Kafka to use the standard ZooKeeper service rather than the headless one:
sed -E -i.bak 's/cp-zookeeper-headless:2181/cp-zookeeper:2181/g' kube/kafka-core.yaml
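Assuming, as with Confluent’s `cp-kafka` image, that the broker gets its ZooKeeper connection string from the `KAFKA_ZOOKEEPER_CONNECT` environment variable, the sed edit amounts to this change in the Kafka container spec (fragment; the actual service names depend on your Helm release):

```yaml
# Fragment of the Kafka container spec in kube/kafka-core.yaml (other fields omitted).
env:
  - name: KAFKA_ZOOKEEPER_CONNECT
    # Before: cp-zookeeper-headless:2181 (the headless service)
    # After: the standard ZooKeeper service
    value: "cp-zookeeper:2181"
```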

4.) Add service accounts for Kafka and ZooKeeper (if you wish to use RBAC)
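A sketch of what this could look like, assuming the `kafka` namespace (taken from the ServiceEntry hosts), the default `cluster.local` trust domain, and hypothetical service account names `kafka` and `zookeeper`. The AuthorizationPolicy is one possible use of those accounts: it lets only the Kafka service account reach ZooKeeper’s client port. The `cp-zookeeper` label is a placeholder; check your chart’s pod labels:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kafka                      # hypothetical name
  namespace: kafka
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: zookeeper                  # hypothetical name
  namespace: kafka
---
# Allow only workloads running as the "kafka" service account to reach port 2181.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: zookeeper-allow-kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: cp-zookeeper            # placeholder label
  action: ALLOW
  rules:
    - from:
        - source:
            # Identity comes from the mTLS peer certificate.
            principals: ["cluster.local/ns/kafka/sa/kafka"]
      to:
        - operation:
            ports: ["2181"]
```

Each workload then uses its account via `serviceAccountName` in the pod spec. Once an ALLOW policy selects the ZooKeeper pods, any other request reaching their sidecars is denied, so other clients need their own rules.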

I’m hoping to put this up on GitHub as OSS; I just need to go through the internal approvals first.


Hi,
I am also trying to get Kafka and ZooKeeper working with mTLS, using an Istio PeerAuthentication set to STRICT mode at the namespace level.
I got ZooKeeper working by excluding the leader and follower ports and also setting quorumListenOnAllIPs to true.
The Kafka pods are able to connect to ZooKeeper and start. However, one of the pods continuously throws the error below. I checked a tcpdump of the Kafka pods on port 9092, and I see plain-text traffic.
If I connect any application, such as a microservice, to Kafka, it throws a “leader not available” error.
Any help or clue is appreciated. Thanks!

java.io.IOException: Connection to 1001 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:274)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:131)
at scala.Option.foreach(Option.scala:257)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2020-08-24 17:45:50,043] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: java.io.IOException: Connection to 1001 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
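For reference, the namespace-level STRICT mode described in this post is a single PeerAuthentication in the workload namespace; without a selector it applies to every workload there. A minimal sketch, assuming the namespace is called `kafka`:

```yaml
# Namespace-wide strict mTLS for the (assumed) "kafka" namespace.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: kafka
spec:
  mtls:
    mode: STRICT
```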

I have successfully gotten the Envoy kafka_broker filter to produce the extra stats. These are the steps I used to get it working:

  1. Build a version of the proxies that includes the filters. By adding
  2. Use an EnvoyFilter more or less the same as the one shown at the start of this thread.
  3. Include the following two annotations in my pods’ metadata:
     sidecar.istio.io/statsInclusionPrefixes: broker
     sidecar.istio.io/extraStatTags: kafka
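On proxies built against Envoy’s v3 API, the v2 names used in the EnvoyFilter at the start of this thread (`envoy.tcp_proxy` and the `v2alpha1.KafkaBroker` type URL) are deprecated and may no longer be accepted. A sketch of the same patch with v3 names, assuming the custom proxy build from step 1 actually contains the Kafka broker filter (untested):

```yaml
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: kafka-protocol
  namespace: default
spec:
  configPatches:
    - applyTo: NETWORK_FILTER
      match:
        context: SIDECAR_OUTBOUND
        listener:
          portNumber: 9092
          filterChain:
            filter:
              # v3 name of the TCP proxy filter (formerly "envoy.tcp_proxy")
              name: "envoy.filters.network.tcp_proxy"
      patch:
        # Place the Kafka decoder in front of tcp_proxy on the 9092 listener.
        operation: INSERT_BEFORE
        value:
          name: "envoy.filters.network.kafka_broker"
          typed_config:
            # v3 type URL (formerly ...kafka_broker.v2alpha1.KafkaBroker)
            "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
            stat_prefix: broker
```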