Hello Lua filter gurus,
On Istio 1.5.7, I'm having trouble with my Lua HTTP filter. Although the filter does not mutate anything and only makes outbound HTTP calls, my Spring Boot gRPC servers receive some requests that carry unexpected extra data. The following comment explains the Java code that errors out when the Lua filter is in place (https://github.com/grpc/grpc-java/blob/fbc48a86fa155c4149496a0df96f7bbc574bba66/stub/src/main/java/io/grpc/stub/ServerCalls.java#L131):
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client sends more than 1 requests, ServerCall will catch it.
The server sees more than one gRPC request message on a unary call and hits the branch that errors out with Status.INTERNAL.withDescription(TOO_MANY_REQUESTS).
I can only imagine that I'm violating the memory model somehow, because the filter does not mutate anything. The error occurs intermittently, not on every request.
I’d greatly appreciate any help getting this filter working.
Istio 1.5.7 (Envoy version: fb8723af8b0ffa67a3967f984bdb3a16b28ddd73/1.13.1-dev/Clean/RELEASE/BoringSSL)
Lua:
-- Sampling configuration, in seconds. refresh_sampling_interval may raise
-- sampling_interval_sec at runtime, but never below min_sampling_interval_sec.
sampling_interval_sec, min_sampling_interval_sec = 60, 60
sampling_interval_refresh_sec = 300

-- Mutable state read and written by the handlers below (kept global for that reason).
last_sampled_ts, last_sample_refreshed_ts = 0, 0
thread_id = 0 -- 0 means "not initialised yet"; envoy_on_request assigns a random id

-- Maximum number of body bytes captured per request/response.
max_payload_size = 10000

-- Requests seen since the last report; envoy_on_response resets it to 0.
weight = 0
--- Write a debug log line prefixed with the global thread_id.
-- @param handle Envoy handle (anything exposing a logDebug method)
-- @param data message appended after the "Thread <id>: " tag
function x42_log(handle, data)
  local tag = "Thread " .. thread_id .. ": "
  handle:logDebug(tag .. data)
end
--- Capture up to max_payload_size bytes of the body and base64-encode them.
-- Iterates handle:bodyChunks(), copying bytes until the global
-- max_payload_size limit is hit; anything beyond the limit is dropped.
-- @param handle Envoy request or response handle
-- @return base64 encoding of the captured bytes (via the base64 global,
--         which is defined outside this script section)
-- @return true if body bytes were dropped because of the size limit
function bufferToString(handle)
  x42_log(handle, 'Fetching bytes from body.')
  local captured_len = 0
  local chunks = {}
  local truncated = false
  for chunk in handle:bodyChunks() do
    local chunk_len = chunk:length()
    x42_log(handle, 'Got chunk of size ' .. tostring(chunk_len))
    local remaining = max_payload_size - captured_len
    if chunk_len > remaining then
      truncated = true
      -- Keep only the part that still fits; skip the copy when nothing fits.
      if remaining > 0 then
        chunks[#chunks + 1] = chunk:getBytes(0, remaining)
      end
      -- NOTE(review): this abandons the bodyChunks() iterator mid-stream;
      -- confirm that is safe for the Envoy version in use.
      break
    end
    chunks[#chunks + 1] = chunk:getBytes(0, chunk_len)
    captured_len = captured_len + chunk_len
  end
  local bytes_received = table.concat(chunks)
  x42_log(handle, 'Finished fetching bytes.')
  local encstr = base64.encode(bytes_received)
  x42_log(handle, 'Finished base64 encoding.')
  return encstr, truncated
end
--- Recursively log every key/value pair of a table-like object (debug aid).
-- Nested tables are flattened into lines of the form
-- "<prefix><key> -> <subkey> -> <value>".
-- @param object anything pairs() accepts; nil is ignored
-- @param handle Envoy handle forwarded to x42_log
-- @param prefix text prepended to each line; defaults to ""
function printDict(object, handle, prefix)
  if object == nil then
    return
  end
  prefix = prefix or ''
  for key, value in pairs(object) do
    -- tostring() so non-string keys (e.g. booleans) cannot make '..' raise.
    local path = prefix .. tostring(key) .. ' -> '
    if type(value) == 'table' then
      printDict(value, handle, path)
    else
      x42_log(handle, path .. tostring(value))
    end
  end
end
--- Refresh sampling_interval_sec from the collector, at most once every
-- sampling_interval_refresh_sec seconds.
-- Makes a blocking GET /sample_interval/<source> to the collector cluster
-- (50 ms timeout). A 2xx response whose body parses as a number sets
-- sampling_interval_sec to max(min_sampling_interval_sec, body); any other
-- outcome leaves the interval unchanged.
-- @param source downstream service cluster name; nil skips the refresh
-- @param request_handle Envoy request handle used for logging and httpCall
function refresh_sampling_interval(source, request_handle)
  local cur_ts = os.time()
  if cur_ts - last_sample_refreshed_ts < sampling_interval_refresh_sec then
    x42_log(request_handle, "refresh_sampling_interval=False")
    return
  end
  -- Without a source the collector path cannot be built, and concatenating nil
  -- would raise inside the filter. The refresh timestamp is left untouched so a
  -- later request that does carry a source can refresh right away.
  if source == nil then
    x42_log(request_handle, "refresh_sampling_interval skipped: no source")
    return
  end
  -- Stamped before the call, so a failing collector is retried only after
  -- another full refresh period.
  last_sample_refreshed_ts = cur_ts
  x42_log(request_handle, "refresh_sampling_interval=True")
  x42_log(request_handle, "calling collector " .. " source=" .. source)
  local headers, body = request_handle:httpCall(
    "outbound|8080||collector.x42.svc.cluster.local",
    {
      [":method"] = "GET",
      [":path"] = "/sample_interval/" .. source,
      [":authority"] = "collector.x42.svc.cluster.local",
      ["x-x42-version"] = "v1"
    },
    nil,
    50)
  -- Compare the status numerically, and accept only a numeric body:
  -- tonumber() yields nil otherwise, and math.max(..., nil) would raise.
  local raw_status = headers and headers[":status"]
  local status = tonumber(raw_status)
  local interval = tonumber(body)
  if status and status >= 200 and status < 300 and interval then
    sampling_interval_sec = math.max(min_sampling_interval_sec, interval)
  else
    x42_log(request_handle, "sample interval not updated: status="
      .. tostring(raw_status) .. " body=" .. tostring(body))
  end
end
--- Decide whether the current request should be sampled.
-- Returns true, and records the current time in last_sampled_ts, when more
-- than sampling_interval_sec seconds have passed since the last sampled
-- request; returns false otherwise.
-- @param handle Envoy handle used for debug logging
-- @return boolean
function sample_request(handle)
  -- Local, so it does not leak into the global namespace the handlers share.
  local cur_ts = os.time()
  x42_log(handle, "cur_ts=" .. cur_ts .. " last_sampled_ts=" .. last_sampled_ts .. " sampling_interval_sec=" .. sampling_interval_sec)
  if cur_ts - last_sampled_ts > sampling_interval_sec then
    last_sampled_ts = cur_ts
    return true
  end
  return false
end
--- Envoy response hook: for sampled streams, report the request/response pair
-- to the collector.
-- Reads the "envoy.lua" dynamic metadata written by envoy_on_request. If the
-- metadata is absent or the stream is not marked x42.sampled, it returns
-- immediately. Otherwise it captures the response body, builds a JSON record
-- (via the json global, defined outside this section), POSTs it to the
-- collector, and resets the weight counter.
-- @param response_handle Envoy response handle
function envoy_on_response(response_handle)
  local request_info = response_handle:streamInfo():dynamicMetadata():get("envoy.lua")
  -- Metadata may be absent (e.g. envoy_on_request raised before writing it);
  -- indexing nil would raise a second error on the response path.
  if request_info == nil or not request_info["x42.sampled"] then
    return
  end
  local response_body, response_truncated = bufferToString(response_handle)
  local status = response_handle:headers():get(':status')
  local request_clock_time = request_info["x42.request_clock_time"]
  -- NOTE(review): os.clock() is processor time used by the program (Lua
  -- manual), not wall-clock time, so duration_sec is not request latency.
  local duration = nil
  if request_clock_time ~= nil then
    duration = os.clock() - request_clock_time
  end
  -- NOTE(review): x42.trace_id / x42.parent_id are not written by
  -- envoy_on_request in this script; presumably set elsewhere — confirm.
  local data = json.encode({
    source = request_info["x42.source"],
    dest = request_info["x42.dest"],
    endpoint = request_info["x42.endpoint"],
    method = request_info["x42.method"],
    status = status,
    request_body = request_info["x42.request_body"],
    request_truncated = request_info["x42.request_truncated"],
    response_body = response_body,
    response_truncated = response_truncated,
    encoding = 'base64',
    request_time_sec = request_info["x42.request_time"],
    duration_sec = duration,
    trace_id = request_info["x42.trace_id"],
    parent_id = request_info["x42.parent_id"],
    envoy_internal = request_info["x42.envoy_internal"],
    weight = weight
  })
  x42_log(response_handle, "LUA sending " .. data)
  -- The trailing `true` is httpCall's asynchronous flag (fire-and-forget per
  -- the Envoy Lua filter docs); headers/body may then be nil, which printDict
  -- and tostring below tolerate. NOTE(review): confirm the Envoy build in use
  -- supports this argument.
  local headers, body = response_handle:httpCall(
    "outbound|8080||collector.x42.svc.cluster.local",
    {
      [":method"] = "POST",
      [":path"] = "/1.0/envoy/traces",
      [":authority"] = "collector.x42.svc.cluster.local",
      ["x-x42-version"] = "v1"
    },
    data,
    50,
    true)
  weight = 0
  printDict(headers, response_handle, "COLLECTOR RESPONSE HEADER ")
  x42_log(response_handle, "COLLECTOR RESPONSE BODY:" .. tostring(body))
  x42_log(response_handle, "LUA RESPONSE BODY:" .. response_body)
  printDict(response_handle:headers(), response_handle, "LUA RESPONSE HEADER ")
  printDict(response_handle:metadata(), response_handle, "LUA RESPONSE METADATA ")
  printDict(response_handle:streamInfo():dynamicMetadata(), response_handle, "LUA RESPONSE STREAMINFO ")
end
--- Envoy request hook: decide whether to sample this request and, if so,
-- record what envoy_on_response needs in the "envoy.lua" dynamic metadata.
-- The first time it runs (thread_id == 0), it assigns a random thread_id and
-- offsets last_sampled_ts by a random delay so threads do not all sample at
-- once. Sampled requests also trigger refresh_sampling_interval.
-- @param request_handle Envoy request handle
function envoy_on_request(request_handle)
  if thread_id == 0 then
    -- Lua uses the same seed by default, so all threads would get the same id
    -- unless the seed is randomized. math.floor keeps the seed integral; Lua
    -- 5.4's manual specifies integer arguments for math.randomseed.
    math.randomseed(math.floor(os.clock() * 100000000000))
    thread_id = math.random(1, 1000000)
    -- Random initial wait for each thread to avoid thundering herds.
    local random_initial_wait_sec = math.random(0, sampling_interval_sec)
    last_sampled_ts = os.time() - sampling_interval_sec + random_initial_wait_sec
    x42_log(request_handle, "new thread started.")
    x42_log(request_handle, "sampling_interval_sec: " .. sampling_interval_sec)
    x42_log(request_handle, "sampling_interval_refresh_sec: " .. sampling_interval_refresh_sec)
    x42_log(request_handle, "random_initial_wait_sec: " .. random_initial_wait_sec)
  end
  -- Counts requests since the last report; envoy_on_response resets it.
  weight = weight + 1

  -- Fetches the metadata object fresh on every call, exactly as the inline
  -- chain did, rather than holding it across yields.
  local function set_meta(key, value)
    request_handle:streamInfo():dynamicMetadata():set("envoy.lua", key, value)
  end

  if not sample_request(request_handle) then
    set_meta("x42.sampled", false)
    x42_log(request_handle, "sample=False.")
    return
  end
  x42_log(request_handle, "sample=True.")
  local clock_time = os.clock()
  local real_time = os.time()
  local body, truncated = bufferToString(request_handle)
  -- Fetched once after the body has been read; nothing below yields before
  -- the header reads.
  local headers = request_handle:headers()
  printDict(headers, request_handle, "LUA REQUEST HEADER ")
  printDict(request_handle:streamInfo():dynamicMetadata(), request_handle, "LUA REQUEST STREAMINFO ")
  printDict(request_handle:metadata(), request_handle, "LUA REQUEST METADATA ")
  local source = headers:get('x-envoy-downstream-service-cluster')
  local dest = headers:get(':authority')
  local endpoint = headers:get(':path')
  local method = headers:get(':method')
  local envoy_internal = headers:get('x-envoy-internal')
  set_meta("x42.sampled", true)
  set_meta("x42.source", source)
  set_meta("x42.dest", dest)
  set_meta("x42.endpoint", endpoint)
  set_meta("x42.method", method)
  set_meta("x42.request_body", body)
  set_meta("x42.request_truncated", truncated)
  set_meta("x42.request_time", real_time)
  set_meta("x42.request_clock_time", clock_time)
  set_meta("x42.envoy_internal", envoy_internal)
  refresh_sampling_interval(source, request_handle)
end