LUA read-only filter changes GRPC request upstream

Hello LUA filter gurus,

On Istio 1.5.7, I am having trouble with my Lua HTTP filter. Although the filter does not mutate anything and only makes outbound HTTP calls, my Spring Boot gRPC servers sometimes receive requests that carry unexpected extra data. The following comment, taken from the Java code that errors out when the Lua filter is in place, explains the check that fails (https://github.com/grpc/grpc-java/blob/fbc48a86fa155c4149496a0df96f7bbc574bba66/stub/src/main/java/io/grpc/stub/ServerCalls.java#L131):

// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client sends more than 1 requests, ServerCall will catch it.

The server code is seeing more than one gRPC request on the call and is hitting the branch that fails with Status.INTERNAL.withDescription(TOO_MANY_REQUESTS).

I can only imagine that I’m violating the memory model somehow, because the filter does not mutate anything. The error occurs sometimes, not always.

I’d greatly appreciate any help getting this filter working.

Istio 1.5.7 (Envoy version: fb8723af8b0ffa67a3967f984bdb3a16b28ddd73/1.13.1-dev/Clean/RELEASE/BoringSSL)

LUA:

-- Script-wide state (plain globals) read and written by the functions below.
-- Minimum gap, in seconds, that sample_request requires between two sampled
-- requests. refresh_sampling_interval replaces it with the collector's value.
sampling_interval_sec = 60
-- Lower bound applied (via math.max) to the interval returned by the collector.
min_sampling_interval_sec = 60
-- Minimum gap, in seconds, between two interval refresh calls to the collector.
sampling_interval_refresh_sec = 300
-- Timestamp (os.time() seconds) that sample_request measures the interval
-- against; set to the current time whenever a request is sampled.
last_sampled_ts = 0
-- Timestamp (os.time() seconds) of the latest interval refresh attempt.
last_sample_refreshed_ts = 0

-- Random id put in front of every log line; 0 means envoy_on_request has not
-- initialised it yet.
thread_id = 0
-- Maximum number of body bytes bufferToString keeps before truncating.
max_payload_size = 10000
-- Incremented by envoy_on_request for every request and reset to 0 by
-- envoy_on_response after a trace has been posted to the collector.
weight = 0


-- Emit `data` at debug level through `handle`, prefixed with the id stored in
-- the global thread_id so that lines from different script instances can be
-- told apart.
function x42_log(handle, data)
    local prefix = "Thread " .. thread_id .. ": "
    handle:logDebug(prefix .. data)
end

-- Read the body chunks offered by `handle`, keeping at most max_payload_size
-- bytes in total.
--
-- Returns two values:
--   1. the kept bytes, encoded with base64.encode;
--   2. a boolean that is true when the body was longer than max_payload_size
--      and was therefore cut short.
function bufferToString(handle)
  x42_log(handle, 'Fetching bytes from body.')

  local bytes_received_len = 0
  local chunks = {}
  local truncated = false
  for chunk in handle:bodyChunks() do
    local chunk_len = chunk:length()
    x42_log(handle, 'Got chunk of size ' .. tostring(chunk_len))
    local remaining = max_payload_size - bytes_received_len
    if chunk_len > remaining then
      -- Keep only the part that still fits, then stop reading further chunks.
      truncated = true
      chunks[#chunks + 1] = chunk:getBytes(0, remaining)
      break
    end
    chunks[#chunks + 1] = chunk:getBytes(0, chunk_len)
    bytes_received_len = bytes_received_len + chunk_len
  end
  -- Join once at the end instead of concatenating inside the loop.
  local bytes_received = table.concat(chunks, "")
  x42_log(handle, 'Finished fetching bytes.')
  local encstr = base64.encode(bytes_received)
  x42_log(handle, 'Finished base64 encoding.')
  return encstr, truncated
end

-- Debug helper: log every entry of `object` through `handle`, one line per
-- non-table value, descending into nested tables. `prefix` starts each line
-- and is extended with "<key> -> " for every level of nesting. A nil `object`
-- produces no output.
function printDict(object, handle, prefix)
  if object == nil then
    return
  end
  for name, item in pairs(object) do
    local label = prefix .. name .. ' -> '
    if type(item) ~= 'table' then
      x42_log(handle, label .. tostring(item))
    else
      printDict(item, handle, label)
    end
  end
end

-- Ask the collector for the current sampling interval and store the answer in
-- sampling_interval_sec.
--
-- The collector is contacted at most once every sampling_interval_refresh_sec
-- seconds; a call made sooner returns without doing anything. The stored
-- interval never drops below min_sampling_interval_sec. A non-2xx reply or a
-- body that is not a number leaves sampling_interval_sec unchanged.
--
-- source:         value appended to the collector path "/sample_interval/".
--                 When nil, no call is made.
-- request_handle: stream handle used for logging and for the HTTP call.
function refresh_sampling_interval(source, request_handle)
  local cur_ts = os.time()
  if cur_ts - last_sample_refreshed_ts < sampling_interval_refresh_sec then
    x42_log(request_handle, "refresh_sampling_interval=False")
    return
  end
  -- The timestamp is updated before the call, so a failed attempt is not
  -- repeated until the next refresh window.
  last_sample_refreshed_ts = cur_ts
  x42_log(request_handle, "refresh_sampling_interval=True")
  if source == nil then
    -- Concatenating nil into the log line or the path would raise an error.
    x42_log(request_handle, "skipping collector call: source is nil")
    return
  end
  x42_log(request_handle, "calling collector " .. " source=" .. source)
  local headers, body = request_handle:httpCall(
  "outbound|8080||collector.x42.svc.cluster.local",
  {
    [":method"] = "GET",
    [":path"] = "/sample_interval/" .. source,
    [":authority"] = "collector.x42.svc.cluster.local",
    ["x-x42-version"] = "v1"
  },
  nil,
  50)
  -- Compare the status numerically rather than as a string.
  local status = headers and tonumber(headers[":status"])
  if status == nil or status < 200 or status >= 300 then
    return
  end
  -- tonumber yields nil for a missing or non-numeric body; math.max would
  -- raise on nil, so such a reply is ignored.
  local interval = tonumber(body)
  if interval == nil then
    x42_log(request_handle, "ignoring non-numeric sampling interval: " .. tostring(body))
    return
  end
  sampling_interval_sec = math.max(min_sampling_interval_sec, interval)
end

-- Decide whether the current request should be sampled.
--
-- Returns true, and stores the current time in last_sampled_ts, when more
-- than sampling_interval_sec seconds have passed since last_sampled_ts.
-- Otherwise returns false and leaves last_sampled_ts untouched.
--
-- handle: stream handle, used only for logging.
function sample_request(handle)
  -- Declared local so the timestamp does not leak into the global table.
  local cur_ts = os.time()
  x42_log(handle, "cur_ts=" .. cur_ts .. " last_sampled_ts=" .. last_sampled_ts .. " sampling_interval_sec=" .. sampling_interval_sec)
  if cur_ts - last_sampled_ts > sampling_interval_sec then
    last_sampled_ts = cur_ts
    return true
  end
  return false
end

-- Response-path entry point of the filter.
--
-- envoy_on_request records what it learned about the stream as dynamic
-- metadata under "envoy.lua". When that metadata marks the stream as sampled
-- ("x42.sampled"), this function captures the response body and status,
-- combines them with the request-side values and POSTs the result as JSON to
-- the collector, then resets the global `weight` counter to 0. Streams that
-- are not sampled, or that carry no "envoy.lua" metadata, are left alone.
function envoy_on_response(response_handle)
  local request_info = response_handle:streamInfo():dynamicMetadata():get("envoy.lua")
  -- Guard against a missing metadata entry: indexing nil would raise.
  -- NOTE(review): presumably get() returns nil when nothing was stored for
  -- this stream — confirm against the Envoy Lua filter documentation.
  if request_info == nil or not request_info["x42.sampled"] then
    return
  end

  local response_body, response_truncated = bufferToString(response_handle)
  local status = response_handle:headers():get(':status')

  -- NOTE(review): os.clock() reports CPU time used by the program, not
  -- wall-clock time — confirm that this is what duration_sec should carry.
  local duration = os.clock() - request_info["x42.request_clock_time"]

  -- "x42.trace_id" and "x42.parent_id" are not written by envoy_on_request in
  -- this script, so they are nil unless something else sets them.
  local data = json.encode({
    source=request_info["x42.source"],
    dest=request_info["x42.dest"],
    endpoint=request_info["x42.endpoint"],
    method=request_info["x42.method"],
    status=status,
    request_body=request_info["x42.request_body"],
    request_truncated=request_info["x42.request_truncated"],
    response_body=response_body,
    response_truncated=response_truncated,
    encoding='base64',
    request_time_sec=request_info["x42.request_time"],
    duration_sec=duration,
    trace_id=request_info["x42.trace_id"],
    parent_id=request_info["x42.parent_id"],
    envoy_internal=request_info["x42.envoy_internal"],
    weight=weight
  })
  x42_log(response_handle, "LUA sending " .. data)
  local headers, body = response_handle:httpCall(
  "outbound|8080||collector.x42.svc.cluster.local",
  {
    [":method"] = "POST",
    [":path"] = "/1.0/envoy/traces",
    [":authority"] = "collector.x42.svc.cluster.local",
    ["x-x42-version"] = "v1"
  },
  data,
  50,
  true)
  -- The accumulated weight has been reported; start counting afresh.
  weight = 0
  -- Debug output only. printDict ignores a nil `headers`, and `body` is
  -- passed through tostring, so a call that returns nothing is tolerated.
  printDict(headers, response_handle, "COLLECTOR RESPONSE HEADER")
  x42_log(response_handle, "COLLECTOR RESPONSE BODY:" .. tostring(body))
  x42_log(response_handle, "LUA RESPONSE BODY:" .. response_body)
  printDict(response_handle:headers(), response_handle, "LUA RESPONSE HEADER ")
  printDict(response_handle:metadata(), response_handle, "LUA RESPONSE METADATA ")
  printDict(response_handle:streamInfo():dynamicMetadata(), response_handle, "LUA RESPONSE STREAMINFO ")
end

-- Request-path entry point of the filter.
--
-- On the first call (thread_id still 0) it seeds the random generator, picks
-- a random thread_id and offsets last_sampled_ts by a random wait. Every call
-- increments the global `weight`. When sample_request declines the request,
-- only "x42.sampled" = false is stored in the "envoy.lua" dynamic metadata.
-- Otherwise the request body and selected headers are captured and stored
-- there for envoy_on_response, and the sampling interval is refreshed.
function envoy_on_request(request_handle)
  if thread_id == 0 then
    -- lua uses the same seed by default, so all threads will get the same id unless the seed is randomized.
    math.randomseed(os.clock()*100000000000)
    thread_id = math.random(1, 1000000)
    -- Add a random wait for each thread to avoid thundering herds.
    -- Declared local so it does not leak into the global table.
    local random_initial_wait_sec = math.random(0, sampling_interval_sec)
    last_sampled_ts = os.time() - sampling_interval_sec + random_initial_wait_sec
    x42_log(request_handle, "new thread started.")
    x42_log(request_handle, "sampling_interval_sec: " .. sampling_interval_sec)
    x42_log(request_handle, "sampling_interval_refresh_sec: " .. sampling_interval_refresh_sec)
    x42_log(request_handle, "random_initial_wait_sec: " .. random_initial_wait_sec)
  end
  weight = weight + 1
  if not sample_request(request_handle) then
    request_handle:streamInfo():dynamicMetadata():set("envoy.lua", "x42.sampled", false)
    x42_log(request_handle, "sample=False.")
    return
  end
  x42_log(request_handle, "sample=True.")
  -- Both clocks are read before the body is fetched.
  local clock_time = os.clock()
  local real_time = os.time()
  local body, truncated = bufferToString(request_handle)

  -- Looked up only after the body has been read, and used only within the
  -- stretch of code that follows.
  local headers = request_handle:headers()
  local metadata = request_handle:streamInfo():dynamicMetadata()

  printDict(headers, request_handle, "LUA REQUEST HEADER ")
  printDict(metadata, request_handle, "LUA REQUEST STREAMINFO ")
  printDict(request_handle:metadata(), request_handle, "LUA REQUEST METADATA ")

  local source = headers:get('x-envoy-downstream-service-cluster')
  local dest = headers:get(':authority')
  local endpoint = headers:get(':path')
  local method = headers:get(':method')
  local envoy_internal = headers:get('x-envoy-internal')

  -- Store one value under the "envoy.lua" namespace for envoy_on_response.
  local function remember(key, value)
    metadata:set("envoy.lua", key, value)
  end

  remember("x42.sampled", true)
  remember("x42.source", source)
  remember("x42.dest", dest)
  remember("x42.endpoint", endpoint)
  remember("x42.method", method)
  remember("x42.request_body", body)
  remember("x42.request_truncated", truncated)
  remember("x42.request_time", real_time)
  remember("x42.request_clock_time", clock_time)
  remember("x42.envoy_internal", envoy_internal)

  refresh_sampling_interval(source, request_handle)
end