fix: correctly pass headers in mcp stdio connections (#11804)
* fix: parse dicts from tweaks (#11753) * Correctly parse dicts from tweaks * Add test * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * fix: dict tweak parsing (#11756) * Fix dict handling of different formats * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) * [autofix.ci] apply automated fixes (attempt 3/3) * cmp index * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * fix: correctly pass headers in mcp stdio connections (#11746) * Update positional arg logic * Add positiona logic tests * ruff * [autofix.ci] apply automated fixes * add check for tweaking code, add check or mcp field type --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -158,6 +158,15 @@ def apply_tweaks(node: dict[str, Any], node_tweaks: dict[str, Any]) -> None:
|
||||
elif field_type == "mcp":
|
||||
# MCP fields expect dict values to be set directly
|
||||
template_data[tweak_name]["value"] = tweak_value
|
||||
elif field_type == "dict" and isinstance(tweak_value, dict):
|
||||
# Dict fields: set the dict directly as the value.
|
||||
# If the tweak is wrapped in {"value": <actual>}, unwrap it
|
||||
# to support the template-format style (e.g. from UI exports).
|
||||
# Caveat: a legitimate single-key dict {"value": x} will be unwrapped.
|
||||
if len(tweak_value) == 1 and "value" in tweak_value:
|
||||
template_data[tweak_name]["value"] = tweak_value["value"]
|
||||
else:
|
||||
template_data[tweak_name]["value"] = tweak_value
|
||||
elif isinstance(tweak_value, dict):
|
||||
for k, v in tweak_value.items():
|
||||
k_ = "file_path" if field_type == "file" else k
|
||||
|
||||
@@ -19,6 +19,7 @@ from lfx.base.mcp.util import (
|
||||
MCPStdioClient,
|
||||
MCPStreamableHttpClient,
|
||||
_process_headers,
|
||||
update_tools,
|
||||
validate_headers,
|
||||
)
|
||||
|
||||
@@ -379,6 +380,209 @@ class TestStreamableHTTPHeaderIntegration:
|
||||
assert streamable_http_client._connection_params["headers"] == test_headers
|
||||
|
||||
|
||||
class TestUpdateToolsStdioHeaders:
|
||||
"""Test that update_tools injects component headers into stdio args."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_injected_with_existing_headers_flag(self):
|
||||
"""Headers should be injected as --headers key value before the existing --headers flag."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": [
|
||||
"mcp-proxy",
|
||||
"--transport",
|
||||
"streamablehttp",
|
||||
"--headers",
|
||||
"x-api-key",
|
||||
"sk-existing",
|
||||
"http://localhost:7860/api/v1/mcp/project/test/streamable",
|
||||
],
|
||||
"headers": {"Authorization": "Bearer token123"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
mock_stdio.connect_to_server.assert_called_once()
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
|
||||
# The injected --headers should appear before the existing --headers
|
||||
assert "--headers authorization 'Bearer token123' --headers x-api-key sk-existing" in full_command
|
||||
# URL should still be at the end
|
||||
assert full_command.endswith("http://localhost:7860/api/v1/mcp/project/test/streamable")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_injected_without_existing_headers_flag(self):
|
||||
"""When no --headers flag exists, headers should be inserted before the last positional arg."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": [
|
||||
"mcp-proxy",
|
||||
"--transport",
|
||||
"streamablehttp",
|
||||
"http://localhost:7860/streamable",
|
||||
],
|
||||
"headers": {"X-Api-Key": "my-key"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
|
||||
# --headers should be inserted before the URL
|
||||
assert "--headers x-api-key my-key http://localhost:7860/streamable" in full_command
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_multiple_headers_each_get_own_flag(self):
|
||||
"""Each header should get its own --headers key value triplet."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": [
|
||||
"mcp-proxy",
|
||||
"--transport",
|
||||
"streamablehttp",
|
||||
"--headers",
|
||||
"x-api-key",
|
||||
"sk-existing",
|
||||
"http://localhost/streamable",
|
||||
],
|
||||
"headers": {"X-Custom-One": "val1", "X-Custom-Two": "val2"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
|
||||
# Each header gets its own --headers flag
|
||||
assert "--headers x-custom-one val1" in full_command
|
||||
assert "--headers x-custom-two val2" in full_command
|
||||
# Original header still present
|
||||
assert "--headers x-api-key sk-existing" in full_command
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_no_headers_leaves_args_unchanged(self):
|
||||
"""When no component headers are set, args should not be modified."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": ["mcp-proxy", "--transport", "streamablehttp", "http://localhost/streamable"],
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
assert full_command == "uvx mcp-proxy --transport streamablehttp http://localhost/streamable"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_appended_when_all_args_are_flags(self):
|
||||
"""When all args are flags (no positional URL), headers should be appended."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "some-tool",
|
||||
"args": ["--verbose", "--debug"],
|
||||
"headers": {"Authorization": "Bearer tok"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
assert full_command == "some-tool --verbose --debug --headers authorization 'Bearer tok'"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_appended_when_last_token_is_flag_value(self):
|
||||
"""When the last token is a flag's value, headers should be appended, not inserted before it."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "some-tool",
|
||||
"args": ["--port", "8080"],
|
||||
"headers": {"Authorization": "Bearer tok"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
# 8080 is a value for --port, not a positional arg, so headers go at the end
|
||||
assert full_command == "some-tool --port 8080 --headers authorization 'Bearer tok'"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_inserted_before_positional_with_flag_value_pairs(self):
|
||||
"""Headers should be inserted before the last positional arg even when flag+value pairs precede it."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": ["mcp-proxy", "--port", "8080", "http://localhost/streamable"],
|
||||
"headers": {"X-Api-Key": "my-key"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
# --port 8080 is a flag pair; http://localhost/streamable is the positional arg
|
||||
assert full_command == "uvx mcp-proxy --port 8080 --headers x-api-key my-key http://localhost/streamable"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_headers_inserted_before_last_positional_with_multiple_positionals(self):
|
||||
"""When multiple positional args exist, headers are inserted before the last one."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": ["mcp-proxy", "--transport", "streamablehttp", "extra-pos-arg", "http://localhost/streamable"],
|
||||
"headers": {"X-Key": "val"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
full_command = mock_stdio.connect_to_server.call_args[0][0]
|
||||
# Should insert before the last positional (the URL), not before "extra-pos-arg"
|
||||
assert "--headers x-key val http://localhost/streamable" in full_command
|
||||
assert "extra-pos-arg --headers" in full_command
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stdio_does_not_mutate_original_config(self):
|
||||
"""The original server_config args list should not be mutated."""
|
||||
mock_stdio = AsyncMock(spec=MCPStdioClient)
|
||||
mock_stdio.connect_to_server.return_value = []
|
||||
mock_stdio._connected = True
|
||||
|
||||
original_args = ["mcp-proxy", "--headers", "x-api-key", "sk-orig", "http://localhost/s"]
|
||||
server_config = {
|
||||
"command": "uvx",
|
||||
"args": original_args,
|
||||
"headers": {"X-Extra": "val"},
|
||||
}
|
||||
|
||||
await update_tools("test-server", server_config, mcp_stdio_client=mock_stdio)
|
||||
|
||||
# Original list should be unchanged
|
||||
assert original_args == ["mcp-proxy", "--headers", "x-api-key", "sk-orig", "http://localhost/s"]
|
||||
|
||||
|
||||
class TestFieldNameConversion:
|
||||
"""Test camelCase to snake_case field name conversion functionality."""
|
||||
|
||||
|
||||
@@ -473,3 +473,115 @@ def test_apply_tweaks_field_type_extraction():
|
||||
# Verify both fields were modified
|
||||
assert node["data"]["node"]["template"]["param_no_type"]["value"] == "new_value_1"
|
||||
assert node["data"]["node"]["template"]["param_with_type"]["value"] == "new_value_2"
|
||||
|
||||
|
||||
def test_apply_tweaks_dict_field_type():
|
||||
"""Test that dict field types (e.g. DictInput headers) set the value directly.
|
||||
|
||||
Previously, passing a dict tweak for a 'dict' field type would iterate over
|
||||
the dict keys and set them as top-level template properties instead of setting
|
||||
the field's value. This caused headers passed via tweaks to be ignored.
|
||||
"""
|
||||
from langflow.processing.process import apply_tweaks
|
||||
|
||||
# Create a node with a dict field type (like MCP Tools headers)
|
||||
node = {
|
||||
"id": "MCPTools-322Z0",
|
||||
"data": {
|
||||
"node": {
|
||||
"template": {
|
||||
"headers": {
|
||||
"value": [
|
||||
{"key": "header1", "value": "default1"},
|
||||
{"key": "header2", "value": "default2"},
|
||||
],
|
||||
"type": "dict",
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Tweak headers with a plain dict (as sent via API tweaks)
|
||||
node_tweaks = {
|
||||
"headers": {"header1": "override1", "header2": "override2", "header3": "new3"},
|
||||
}
|
||||
|
||||
apply_tweaks(node, node_tweaks)
|
||||
|
||||
# Verify the dict was set directly as the value, not spread as template properties
|
||||
assert node["data"]["node"]["template"]["headers"]["value"] == {
|
||||
"header1": "override1",
|
||||
"header2": "override2",
|
||||
"header3": "new3",
|
||||
}
|
||||
# Ensure the tweak keys were NOT set as top-level template field properties
|
||||
assert "header1" not in node["data"]["node"]["template"]["headers"]
|
||||
assert "header2" not in node["data"]["node"]["template"]["headers"]
|
||||
assert "header3" not in node["data"]["node"]["template"]["headers"]
|
||||
|
||||
|
||||
def test_apply_tweaks_dict_field_overwrites_list_default():
|
||||
"""Test that a dict tweak fully replaces a list-format default value on a dict field."""
|
||||
from langflow.processing.process import apply_tweaks
|
||||
|
||||
node = {
|
||||
"id": "node1",
|
||||
"data": {
|
||||
"node": {
|
||||
"template": {
|
||||
"headers": {
|
||||
"value": [{"key": "old", "value": "old_val"}],
|
||||
"type": "dict",
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
apply_tweaks(node, {"headers": {"new_key": "new_val"}})
|
||||
|
||||
# The dict tweak should fully replace the old list value
|
||||
assert node["data"]["node"]["template"]["headers"]["value"] == {"new_key": "new_val"}
|
||||
|
||||
|
||||
def test_apply_tweaks_dict_field_value_wrapped_list():
|
||||
"""Test that dict field tweaks wrapped in {"value": [...]} are unwrapped correctly.
|
||||
|
||||
When users pass tweaks in the template-format style (e.g. from UI exports),
|
||||
the list of key-value pairs is wrapped in a "value" key. The tweak should
|
||||
unwrap this and set the inner list as the field's value.
|
||||
"""
|
||||
from langflow.processing.process import apply_tweaks
|
||||
|
||||
node = {
|
||||
"id": "MCPTools-svrRq",
|
||||
"data": {
|
||||
"node": {
|
||||
"template": {
|
||||
"headers": {
|
||||
"value": [],
|
||||
"type": "dict",
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Tweak using the template-format wrapper: {"value": [list of key-value pairs]}
|
||||
node_tweaks = {
|
||||
"headers": {
|
||||
"value": [
|
||||
{"key": "header1", "value": "gabriel1"},
|
||||
{"key": "header2", "value": "gabriel2"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
apply_tweaks(node, node_tweaks)
|
||||
|
||||
# The inner list should be unwrapped and set as the field's value
|
||||
assert node["data"]["node"]["template"]["headers"]["value"] == [
|
||||
{"key": "header1", "value": "gabriel1"},
|
||||
{"key": "header2", "value": "gabriel2"},
|
||||
]
|
||||
|
||||
@@ -5,7 +5,9 @@ import json
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import unicodedata
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any
|
||||
@@ -1056,7 +1058,7 @@ class MCPStdioClient:
|
||||
"""Connect to MCP server using stdio transport (SDK style)."""
|
||||
from mcp import StdioServerParameters
|
||||
|
||||
command = command_str.split(" ")
|
||||
command = shlex.split(command_str)
|
||||
env_data: dict[str, str] = {"DEBUG": "true", "PATH": os.environ["PATH"], **(env or {})}
|
||||
|
||||
if platform.system() == "Windows":
|
||||
@@ -1064,7 +1066,7 @@ class MCPStdioClient:
|
||||
command="cmd",
|
||||
args=[
|
||||
"/c",
|
||||
f"{command[0]} {' '.join(command[1:])} || echo Command failed with exit code %errorlevel% 1>&2",
|
||||
f"{subprocess.list2cmdline(command)} || echo Command failed with exit code %errorlevel% 1>&2",
|
||||
],
|
||||
env=env_data,
|
||||
)
|
||||
@@ -1586,10 +1588,52 @@ async def update_tools(
|
||||
# Determine connection type and parameters
|
||||
client: MCPStdioClient | MCPStreamableHttpClient | None = None
|
||||
if mode == "Stdio":
|
||||
# Stdio connection
|
||||
args = server_config.get("args", [])
|
||||
args = list(server_config.get("args", []))
|
||||
env = server_config.get("env", {})
|
||||
full_command = " ".join([command, *args])
|
||||
# For stdio mode, inject component headers as --headers CLI args.
|
||||
# This enables passing headers through proxy tools like mcp-proxy
|
||||
# that forward them to the upstream HTTP server.
|
||||
if headers:
|
||||
extra_args = []
|
||||
for key, value in headers.items():
|
||||
extra_args.extend(["--headers", key, str(value)])
|
||||
if "--headers" in args:
|
||||
# Insert before the existing --headers flag so all header
|
||||
# flags are grouped together
|
||||
idx = args.index("--headers")
|
||||
for i, arg in enumerate(extra_args):
|
||||
args.insert(idx + i, arg)
|
||||
else:
|
||||
# No existing --headers flag; try to insert before the last
|
||||
# positional arg (typically the URL in mcp-proxy commands).
|
||||
# Scan args to find the last true positional token by skipping
|
||||
# flag+value pairs so we don't mistake a flag's value for a
|
||||
# positional argument (e.g. "--port 8080").
|
||||
last_positional_idx: int | None = None
|
||||
i = 0
|
||||
while i < len(args):
|
||||
if args[i].startswith("-"):
|
||||
# Skip the flag and its value (assumes each flag
|
||||
# takes at most one value argument; boolean flags
|
||||
# are handled correctly since the next token will
|
||||
# start with '-' or be a URL-like positional).
|
||||
i += 1
|
||||
if (
|
||||
i < len(args)
|
||||
and not args[i].startswith("-")
|
||||
and not args[i].startswith("http://")
|
||||
and not args[i].startswith("https://")
|
||||
):
|
||||
i += 1
|
||||
else:
|
||||
last_positional_idx = i
|
||||
i += 1
|
||||
|
||||
if last_positional_idx is not None:
|
||||
args = args[:last_positional_idx] + extra_args + args[last_positional_idx:]
|
||||
else:
|
||||
args.extend(extra_args)
|
||||
full_command = shlex.join([command, *args])
|
||||
tools = await mcp_stdio_client.connect_to_server(full_command, env)
|
||||
client = mcp_stdio_client
|
||||
elif mode in ["Streamable_HTTP", "SSE"]:
|
||||
|
||||
@@ -266,9 +266,24 @@ class ParameterHandler:
|
||||
"""Handle dictionary field type."""
|
||||
match val:
|
||||
case list():
|
||||
params[field_name] = {k: v for item in val for k, v in item.items()}
|
||||
# Convert list of {"key": k, "value": v} pairs to a flat dict.
|
||||
# e.g. [{"key": "h1", "value": "v1"}, {"key": "h2", "value": "v2"}] -> {"h1": "v1", "h2": "v2"}
|
||||
if val and all(isinstance(item, dict) and "key" in item and "value" in item for item in val):
|
||||
params[field_name] = {item["key"]: item["value"] for item in val}
|
||||
else:
|
||||
# Merge generic list of dicts into a single dict.
|
||||
# e.g. [{"a": 1}, {"b": 2}] -> {"a": 1, "b": 2}
|
||||
params[field_name] = {k: v for item in val for k, v in item.items()}
|
||||
case dict():
|
||||
params[field_name] = val
|
||||
case _:
|
||||
logger.warning(
|
||||
"Unexpected type %s for dict field '%s'; expected list or dict, got %r",
|
||||
type(val).__name__,
|
||||
field_name,
|
||||
val,
|
||||
)
|
||||
params[field_name] = val
|
||||
return params
|
||||
|
||||
def _handle_other_direct_types(
|
||||
|
||||
@@ -172,16 +172,32 @@ def apply_tweaks(node: dict[str, Any], node_tweaks: dict[str, Any]) -> None:
|
||||
for tweak_name, tweak_value in node_tweaks.items():
|
||||
if tweak_name not in template_data:
|
||||
continue
|
||||
if tweak_name == "code":
|
||||
logger.warning("Security: Code field cannot be overridden via tweaks.")
|
||||
continue
|
||||
if tweak_name in template_data:
|
||||
if template_data[tweak_name]["type"] == "NestedDict":
|
||||
field_type = template_data[tweak_name].get("type", "")
|
||||
if field_type == "NestedDict":
|
||||
value = validate_and_repair_json(tweak_value)
|
||||
template_data[tweak_name]["value"] = value
|
||||
elif field_type == "mcp":
|
||||
# MCP fields expect dict values to be set directly
|
||||
template_data[tweak_name]["value"] = tweak_value
|
||||
elif field_type == "dict" and isinstance(tweak_value, dict):
|
||||
# Dict fields: set the dict directly as the value.
|
||||
# If the tweak is wrapped in {"value": <actual>}, unwrap it
|
||||
# to support the template-format style (e.g. from UI exports).
|
||||
# Caveat: a legitimate single-key dict {"value": x} will be unwrapped.
|
||||
if len(tweak_value) == 1 and "value" in tweak_value:
|
||||
template_data[tweak_name]["value"] = tweak_value["value"]
|
||||
else:
|
||||
template_data[tweak_name]["value"] = tweak_value
|
||||
elif isinstance(tweak_value, dict):
|
||||
for k, v in tweak_value.items():
|
||||
k_ = "file_path" if template_data[tweak_name]["type"] == "file" else k
|
||||
k_ = "file_path" if field_type == "file" else k
|
||||
template_data[tweak_name][k_] = v
|
||||
else:
|
||||
key = "file_path" if template_data[tweak_name]["type"] == "file" else "value"
|
||||
key = "file_path" if field_type == "file" else "value"
|
||||
template_data[tweak_name][key] = tweak_value
|
||||
|
||||
|
||||
|
||||
@@ -191,3 +191,50 @@ class TestParameterHandlerTableLoadFromDb:
|
||||
|
||||
assert set(load_from_db_columns) == {"col1", "col2"}
|
||||
assert "table:table_field" in self.handler.load_from_db_fields
|
||||
|
||||
|
||||
class TestParameterHandlerDictField:
|
||||
"""Tests for _handle_dict_field in ParameterHandler."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test fixtures."""
|
||||
self.mock_vertex = MagicMock()
|
||||
self.mock_vertex.data = {"node": {"template": {}}}
|
||||
self.handler = ParameterHandler(self.mock_vertex, storage_service=None)
|
||||
|
||||
def test_handle_dict_field_with_key_value_list(self):
|
||||
"""Test that a list of {"key": k, "value": v} pairs is converted to a flat dict."""
|
||||
val = [
|
||||
{"key": "header1", "value": "value1"},
|
||||
{"key": "header2", "value": "value2"},
|
||||
]
|
||||
params = {}
|
||||
result = self.handler._handle_dict_field("headers", val, params)
|
||||
assert result["headers"] == {"header1": "value1", "header2": "value2"}
|
||||
|
||||
def test_handle_dict_field_with_single_key_value_item(self):
|
||||
"""Test a single-item key-value list."""
|
||||
val = [{"key": "auth", "value": "token123"}]
|
||||
params = {}
|
||||
result = self.handler._handle_dict_field("headers", val, params)
|
||||
assert result["headers"] == {"auth": "token123"}
|
||||
|
||||
def test_handle_dict_field_with_flat_dict(self):
|
||||
"""Test that a plain dict is passed through as-is."""
|
||||
val = {"header1": "value1", "header2": "value2"}
|
||||
params = {}
|
||||
result = self.handler._handle_dict_field("headers", val, params)
|
||||
assert result["headers"] == {"header1": "value1", "header2": "value2"}
|
||||
|
||||
def test_handle_dict_field_with_empty_list(self):
|
||||
"""Test that an empty list produces an empty dict."""
|
||||
params = {}
|
||||
result = self.handler._handle_dict_field("headers", [], params)
|
||||
assert result["headers"] == {}
|
||||
|
||||
def test_handle_dict_field_with_generic_list_of_dicts(self):
|
||||
"""Test that a list of dicts without the key/value pattern merges them."""
|
||||
val = [{"a": 1}, {"b": 2}]
|
||||
params = {}
|
||||
result = self.handler._handle_dict_field("data", val, params)
|
||||
assert result["data"] == {"a": 1, "b": 2}
|
||||
|
||||
Reference in New Issue
Block a user