Using Kafka protocol filter with Istio


I’m trying to use the Kafka protocol filter to get some additional metrics.
I’m new to Envoy filters, so perhaps there are a couple of things I didn’t fully understand, but I was able to set it up: it kind of produces metrics and my app continues to work, which is already great!
However, all counters are stuck at 0 despite traffic flowing through Kafka.

What I’m doing is applying this filter on my client sidecars. Perhaps that’s my mistake and it’s only meant to work on the Kafka pods themselves?

Here’s the EnvoyFilter object I’m using:

kind: EnvoyFilter
  name: kafka-protocol
  namespace: default
    - applyTo: NETWORK_FILTER
        context: SIDECAR_OUTBOUND
          portNumber: 9092
              name: "envoy.tcp_proxy"
        operation: INSERT_BEFORE
          name: ""
            stat_prefix: broker
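
The pasted manifest above appears to have lost its parent keys and the name of the inserted filter. As a rough sketch only, a complete EnvoyFilter of this shape might look like the following. It assumes the inserted filter is Envoy's Kafka broker filter (`envoy.filters.network.kafka_broker`) and that the proxy accepts the v3 `typed_config` form; neither detail is taken from the post.

```yaml
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: kafka-protocol
  namespace: default
spec:
  configPatches:
    - applyTo: NETWORK_FILTER
      match:
        # Only outbound listeners of the client sidecars, on the Kafka port.
        context: SIDECAR_OUTBOUND
        listener:
          portNumber: 9092
          filterChain:
            filter:
              # Filter name as written in the post.
              name: "envoy.tcp_proxy"
      patch:
        # Put the Kafka filter in front of the TCP proxy filter.
        operation: INSERT_BEFORE
        value:
          # Assumption: the name elided in the post is the Kafka broker filter.
          name: envoy.filters.network.kafka_broker
          typed_config:
            # Assumption: v3 config type; older proxies used a different form.
            "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
            stat_prefix: broker
```

Depending on the proxy version, the TCP proxy filter to match may be named `envoy.filters.network.tcp_proxy` rather than `envoy.tcp_proxy`.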

In the pods, the corresponding stats annotation has to be set to: cluster_manager,listener_manager,broker
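
As a sketch of where that value could live: the annotation name is missing from the post, so the fragment below assumes it is Istio's `sidecar.istio.io/statsInclusionPrefixes`. It is a pod template fragment, not a complete workload.

```yaml
# Fragment of a Deployment; only the pod template metadata is shown.
spec:
  template:
    metadata:
      annotations:
        # Assumption: this is the annotation the post refers to.
        # The value is the one given in the post.
        sidecar.istio.io/statsInclusionPrefixes: cluster_manager,listener_manager,broker
```

Envoy's documentation describes the Kafka broker filter's statistics as rooted at `kafka.<stat_prefix>.`, so it may be worth checking whether a `kafka` prefix is also needed in that list.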

Any ideas? Has anyone already managed to use this protocol filter with Istio?


Does anyone have any guidance on this topic? I’m interested in running Kafka inside Istio with mTLS, but I can’t find any clear guidance on whether it is feasible.

I’ve been considering going with Envoy filters, but your post discouraged me. Have you made any progress on this issue?

My issue here was about not seeing some additional metrics that I was expecting to see. Other than that, it was working pretty well. Here’s some info about the scenarios that I tried (for testing with Kiali): (the various issues in Kiali mentioned there are fixed). I’ve been using Strimzi to deploy Kafka, but I guess any other deployment is valid.

In case you give it a try, I’d love to have some feedback!

So is it Strimzi that enables mTLS for Kafka on top of Istio? I will give it a try.

I used the Confluent Helm charts to get this working, so it’s certainly possible with other distributions. Some changes I had to make after reading issues elsewhere:

1.) Add service entries for the headless services:

kind: ServiceEntry
    name: zookeeper
        - broker-0.cp-zookeeper-headless.kafka.svc.cluster.local
        - broker-0.cp-zookeeper-headless.kafka.svc.cluster.local
        - broker-0.cp-zookeeper-headless.kafka.svc.cluster.local
        - name: server
           number: 2888
           protocol: TCP
        - name: leader-election
          number: 3888
          protocol: TCP
   location: MESH_INTERNAL
   resolution: NONE

2.) Exclude the ZooKeeper internal traffic from mTLS (I still have to get this working): "2888,3888" "2888,3888"

3.) Update kafka to use the zookeeper standard service rather than headless:
sed -E -i.bak 's/cp-zookeeper-headless:2181/cp-zookeeper:2181/g' kube/kafka-core.yaml

4.) Add service accounts for kafka and zookeeper (if you wish to use RBAC)
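
Steps 1 and 2 above can be sketched roughly as follows. The ServiceEntry in step 1 lost its parent keys when pasted, so the first document restores them under the standard Istio schema. The `kafka` namespace is inferred from the hostnames. The annotation names in the second document are missing from the post and are assumed to be Istio's `traffic.sidecar.istio.io/excludeInboundPorts` and `traffic.sidecar.istio.io/excludeOutboundPorts`.

```yaml
apiVersion: networking.istio.io/v1beta1
kind: ServiceEntry
metadata:
  name: zookeeper
  namespace: kafka   # assumption, inferred from the hostnames
spec:
  hosts:
    # The post lists this same host three times; presumably one entry
    # per ZooKeeper pod was intended.
    - broker-0.cp-zookeeper-headless.kafka.svc.cluster.local
  ports:
    - name: server
      number: 2888
      protocol: TCP
    - name: leader-election
      number: 3888
      protocol: TCP
  location: MESH_INTERNAL
  resolution: NONE
---
# Fragment of the ZooKeeper pod template: keep the quorum ports out of
# the sidecar, so they bypass mTLS.
spec:
  template:
    metadata:
      annotations:
        # Assumption: these are the two annotations elided in step 2.
        traffic.sidecar.istio.io/excludeInboundPorts: "2888,3888"
        traffic.sidecar.istio.io/excludeOutboundPorts: "2888,3888"
```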

I’m hoping to put this up on GitHub as OSS; I just need to go through the internal approvals to do so.

I am also trying to get Kafka and ZooKeeper to work with mTLS, using an Istio PeerAuthentication set to STRICT mode at the namespace level.
I got ZooKeeper working by excluding the leader and follower ports and by setting quorumListenOnAllIPs to true.
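
For reference, a namespace-wide STRICT policy of the kind described here can be sketched as below. The namespace name `kafka` is hypothetical, since the post does not say which namespace is used.

```yaml
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  # With no workload selector, the policy applies to every workload
  # in its namespace.
  name: default
  namespace: kafka   # hypothetical namespace
spec:
  mtls:
    mode: STRICT
```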
The Kafka pods are able to connect to ZooKeeper and start. However, one of the pods continuously throws the error below. I checked a tcpdump of the Kafka pods on port 9092, and I see plain-text traffic.
If I connect any application, such as a microservice, to Kafka, it throws a "leader not available" error.
Any help or clue is appreciated, thanks.

Connection to 1001 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:99)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:192)
at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:274)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:131)
at scala.Option.foreach(Option.scala:257)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
[2020-08-24 17:45:50,043] INFO [ReplicaFetcher replicaId=1002, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: Connection to 1001 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)