feat: add anthropic /messages endpoint
This commit is contained in:
364
mlx-server/Sources/MLXServer/AnthropicTypes.swift
Normal file
364
mlx-server/Sources/MLXServer/AnthropicTypes.swift
Normal file
@@ -0,0 +1,364 @@
|
||||
import Foundation
|
||||
|
||||
// MARK: - Anthropic API Request
|
||||
|
||||
/// Incoming body of an Anthropic `/v1/messages` request.
/// Field names use snake_case to match the JSON wire format directly
/// (no key-decoding strategy is applied).
struct AnthropicRequest: Codable {
    /// Model identifier requested by the client; echoed back in responses.
    let model: String
    /// Conversation turns, in order.
    let messages: [AnthropicMessage]
    /// Hard cap on generated tokens (required field in the Messages API).
    let max_tokens: Int
    /// Optional system prompt: a plain string or an array of text blocks.
    var system: AnthropicSystem?
    var temperature: Float?
    var top_p: Float?
    var top_k: Int?
    /// When true, the response is delivered as SSE events.
    var stream: Bool?
    var stop_sequences: [String]?
    /// Tool definitions in Anthropic's schema; kept untyped and converted
    /// to the OpenAI function format by `anthropicToolsToOpenAI`.
    var tools: [AnyCodable]?
}
|
||||
|
||||
// MARK: - System Prompt (string or array of text blocks)
|
||||
|
||||
/// The `system` field of an Anthropic request, which the API accepts either
/// as a bare string or as an array of `{type, text}` blocks.
enum AnthropicSystem: Codable {
    case text(String)
    case blocks([AnthropicTextBlock])

    init(from decoder: Decoder) throws {
        let container = try decoder.singleValueContainer()
        if let plain = try? container.decode(String.self) {
            self = .text(plain)
        } else if let parts = try? container.decode([AnthropicTextBlock].self) {
            self = .blocks(parts)
        } else {
            throw DecodingError.dataCorrupted(
                .init(codingPath: decoder.codingPath, debugDescription: "system must be string or array"))
        }
    }

    func encode(to encoder: Encoder) throws {
        var container = encoder.singleValueContainer()
        switch self {
        case .text(let plain):
            try container.encode(plain)
        case .blocks(let parts):
            try container.encode(parts)
        }
    }

    /// Flattened prompt text; block variants are joined with newlines.
    var text: String {
        switch self {
        case .text(let plain):
            return plain
        case .blocks(let parts):
            return parts.map(\.text).joined(separator: "\n")
        }
    }
}
|
||||
|
||||
/// A single `{type, text}` block, as used in system prompts.
struct AnthropicTextBlock: Codable {
    let type: String // expected to be "text"
    let text: String
}
|
||||
|
||||
// MARK: - Messages
|
||||
|
||||
/// One conversation turn in an Anthropic request.
struct AnthropicMessage: Codable {
    /// Role string (e.g. "user" / "assistant"); forwarded unchanged to the
    /// internal ChatMessage.
    let role: String
    /// Either a plain string or an array of typed content blocks.
    let content: AnthropicMessageContent
}
|
||||
|
||||
/// The `content` of a message: Anthropic accepts either a bare string or an
/// array of typed content blocks.
enum AnthropicMessageContent: Codable {
    case text(String)
    case blocks([AnthropicContentBlock])

    init(from decoder: Decoder) throws {
        let container = try decoder.singleValueContainer()
        if let plain = try? container.decode(String.self) {
            self = .text(plain)
        } else if let parts = try? container.decode([AnthropicContentBlock].self) {
            self = .blocks(parts)
        } else {
            throw DecodingError.dataCorrupted(
                .init(codingPath: decoder.codingPath, debugDescription: "content must be string or array"))
        }
    }

    func encode(to encoder: Encoder) throws {
        var container = encoder.singleValueContainer()
        switch self {
        case .text(let plain):
            try container.encode(plain)
        case .blocks(let parts):
            try container.encode(parts)
        }
    }
}
|
||||
|
||||
// MARK: - Content Blocks
|
||||
|
||||
/// A typed content block inside a message. Unrecognized `type` values decode
/// to `.unknown` so new block kinds don't fail the whole request.
enum AnthropicContentBlock: Codable {
    case text(AnthropicTextContent)
    case image(AnthropicImageContent)
    case toolUse(AnthropicToolUseContent)
    case toolResult(AnthropicToolResultContent)
    case unknown

    private enum TypeKey: String, CodingKey { case type }

    init(from decoder: Decoder) throws {
        // Dispatch on the "type" discriminator, then re-decode the full
        // payload with the matching concrete struct.
        let kc = try decoder.container(keyedBy: TypeKey.self)
        let type = try kc.decode(String.self, forKey: .type)
        switch type {
        case "text": self = .text(try AnthropicTextContent(from: decoder))
        case "image": self = .image(try AnthropicImageContent(from: decoder))
        case "tool_use": self = .toolUse(try AnthropicToolUseContent(from: decoder))
        case "tool_result": self = .toolResult(try AnthropicToolResultContent(from: decoder))
        default: self = .unknown
        }
    }

    func encode(to encoder: Encoder) throws {
        switch self {
        case .text(let b): try b.encode(to: encoder)
        case .image(let b): try b.encode(to: encoder)
        case .toolUse(let b): try b.encode(to: encoder)
        case .toolResult(let b): try b.encode(to: encoder)
        case .unknown:
            // BUG FIX: previously this case encoded nothing (`break`), which
            // makes JSONEncoder throw ("did not encode any values") whenever
            // an .unknown block is round-tripped. Emit a minimal typed object
            // instead so encoding never fails.
            var kc = encoder.container(keyedBy: TypeKey.self)
            try kc.encode("unknown", forKey: .type)
        }
    }
}
|
||||
|
||||
/// A "text" content block.
struct AnthropicTextContent: Codable {
    let type: String // "text"
    let text: String
}
|
||||
|
||||
/// An "image" content block wrapping its source descriptor.
struct AnthropicImageContent: Codable {
    let type: String // "image"
    let source: AnthropicImageSource
}
|
||||
|
||||
/// Where an image's bytes come from. `data` + `media_type` are populated for
/// "base64" sources; `url` for "url" sources.
struct AnthropicImageSource: Codable {
    let type: String // "base64" | "url"
    let media_type: String?
    let data: String?
    let url: String?
}
|
||||
|
||||
/// A model-issued tool invocation.
struct AnthropicToolUseContent: Codable {
    let type: String // "tool_use"
    let id: String
    let name: String
    /// Arbitrary JSON arguments for the tool, kept untyped.
    let input: AnyCodable
}
|
||||
|
||||
/// A client-supplied result for an earlier tool_use block.
struct AnthropicToolResultContent: Codable {
    let type: String // "tool_result"
    /// ID of the tool_use block this result answers.
    let tool_use_id: String
    let content: AnthropicToolResultValue
}
|
||||
|
||||
/// The `content` of a tool_result block: a bare string or an array of text
/// blocks. Any other shape degrades to an empty string rather than failing
/// the whole request.
enum AnthropicToolResultValue: Codable {
    case text(String)
    case blocks([AnthropicTextContent])

    init(from decoder: Decoder) throws {
        let container = try decoder.singleValueContainer()
        if let plain = try? container.decode(String.self) {
            self = .text(plain)
        } else if let parts = try? container.decode([AnthropicTextContent].self) {
            self = .blocks(parts)
        } else {
            // Deliberate best-effort fallback: unsupported shapes become
            // empty text instead of a decoding error.
            self = .text("")
        }
    }

    func encode(to encoder: Encoder) throws {
        var container = encoder.singleValueContainer()
        switch self {
        case .text(let plain):
            try container.encode(plain)
        case .blocks(let parts):
            try container.encode(parts)
        }
    }

    /// Flattened result text; block variants are joined with newlines.
    var text: String {
        switch self {
        case .text(let plain):
            return plain
        case .blocks(let parts):
            return parts.map(\.text).joined(separator: "\n")
        }
    }
}
|
||||
|
||||
// MARK: - Non-streaming Response
|
||||
|
||||
/// Complete (non-streaming) Anthropic message response.
struct AnthropicResponse: Codable {
    let id: String
    let type: String // "message"
    let role: String // "assistant"
    /// Ordered content blocks (text and/or tool_use).
    let content: [AnthropicResponseBlock]
    let model: String
    let stop_reason: String? // "end_turn" | "max_tokens" | "stop_sequence" | "tool_use"
    let stop_sequence: String?
    let usage: AnthropicUsage
}
|
||||
|
||||
/// A block of a non-streaming response: either text or a tool_use call.
enum AnthropicResponseBlock: Codable {
    case text(AnthropicTextContent)
    case toolUse(AnthropicToolUseContent)

    private enum TypeKey: String, CodingKey { case type }

    init(from decoder: Decoder) throws {
        let keyed = try decoder.container(keyedBy: TypeKey.self)
        let discriminator = try keyed.decode(String.self, forKey: .type)
        if discriminator == "tool_use" {
            self = .toolUse(try AnthropicToolUseContent(from: decoder))
        } else {
            // Anything that isn't tool_use is treated as a text block.
            self = .text(try AnthropicTextContent(from: decoder))
        }
    }

    func encode(to encoder: Encoder) throws {
        switch self {
        case .text(let block):
            try block.encode(to: encoder)
        case .toolUse(let block):
            try block.encode(to: encoder)
        }
    }
}
|
||||
|
||||
/// Token accounting reported on a complete message.
struct AnthropicUsage: Codable {
    let input_tokens: Int
    let output_tokens: Int
}
|
||||
|
||||
// MARK: - SSE Streaming Event Envelopes
// content_block and delta use AnyCodable to avoid encoding nil optional fields.

/// First SSE event of a stream; wraps a skeleton message.
struct AnthropicMessageStartEvent: Codable {
    let type: String // "message_start"
    let message: AnthropicStreamMessage
}

/// Message skeleton sent in `message_start` (empty content, zeroed usage).
struct AnthropicStreamMessage: Codable {
    let id: String
    let type: String // "message"
    let role: String // "assistant"
    let content: [String] // empty array at start
    let model: String
    let stop_reason: String?
    let stop_sequence: String?
    let usage: AnthropicUsage
}

/// Keep-alive event emitted right after `message_start`.
struct AnthropicPingEvent: Codable {
    let type: String // "ping"
}

/// Opens a content block at `index`; the block payload is an untyped dict.
struct AnthropicContentBlockStartEvent: Codable {
    let type: String // "content_block_start"
    let index: Int
    let content_block: AnyCodable
}

/// Incremental update (text_delta / input_json_delta) for an open block.
struct AnthropicContentBlockDeltaEvent: Codable {
    let type: String // "content_block_delta"
    let index: Int
    let delta: AnyCodable
}

/// Closes the content block at `index`.
struct AnthropicContentBlockStopEvent: Codable {
    let type: String // "content_block_stop"
    let index: Int
}

/// Final metadata update: stop reason plus output-token usage.
struct AnthropicMessageDeltaEvent: Codable {
    let type: String // "message_delta"
    let delta: AnthropicMessageDeltaPayload
    let usage: AnthropicStreamUsage
}

/// Payload of `message_delta`.
struct AnthropicMessageDeltaPayload: Codable {
    let stop_reason: String?
    let stop_sequence: String?
}

/// Streaming usage only reports output tokens.
struct AnthropicStreamUsage: Codable {
    let output_tokens: Int
}

/// Terminal SSE event of the stream.
struct AnthropicMessageStopEvent: Codable {
    let type: String // "message_stop"
}
|
||||
|
||||
// MARK: - Conversion: Anthropic → Internal
|
||||
|
||||
/// Convert an Anthropic /v1/messages request to the internal [ChatMessage] format.
///
/// The system prompt becomes a leading "system"-role message. For block-style
/// content, text/image/tool_use blocks are merged into one message per turn,
/// while tool_result blocks are split out as separate "tool"-role messages.
func anthropicToInternalMessages(request: AnthropicRequest) -> [ChatMessage] {
    var messages: [ChatMessage] = []

    // System prompt becomes a leading system-role message
    if let system = request.system {
        messages.append(ChatMessage(role: "system", content: .string(system.text)))
    }

    for msg in request.messages {
        switch msg.content {
        case .text(let text):
            messages.append(ChatMessage(role: msg.role, content: .string(text)))

        case .blocks(let blocks):
            var textParts: [String] = []
            var imageUrls: [String] = []
            var toolCalls: [ToolCallInfo] = []
            var hasNonToolResult = false

            for block in blocks {
                switch block {
                case .text(let tc):
                    textParts.append(tc.text)
                    hasNonToolResult = true

                case .image(let ic):
                    // URL sources pass through; base64 sources become data: URIs.
                    // Sources missing their payload are silently dropped.
                    let src = ic.source
                    if src.type == "url", let url = src.url {
                        imageUrls.append(url)
                    } else if src.type == "base64", let data = src.data, let mt = src.media_type {
                        imageUrls.append("data:\(mt);base64,\(data)")
                    }
                    hasNonToolResult = true

                case .toolUse(let tu):
                    // Serialize the input dict back to a JSON string for ToolCallInfo.
                    // BUG FIX: the old fallback chain produced "" on serialization
                    // failure (an empty Data decodes to an empty string, so the
                    // "{}" fallback never fired). Validate first so failures
                    // consistently yield "{}".
                    let argsString: String
                    if JSONSerialization.isValidJSONObject(tu.input.value),
                       let argsData = try? JSONSerialization.data(withJSONObject: tu.input.value),
                       let encoded = String(data: argsData, encoding: .utf8) {
                        argsString = encoded
                    } else {
                        argsString = "{}"
                    }
                    toolCalls.append(ToolCallInfo(
                        id: tu.id,
                        type: "function",
                        function: FunctionCall(name: tu.name, arguments: argsString)
                    ))
                    hasNonToolResult = true

                case .toolResult(let tr):
                    // Tool results become separate tool-role messages
                    messages.append(ChatMessage(
                        role: "tool",
                        content: .string(tr.content.text),
                        tool_call_id: tr.tool_use_id
                    ))

                case .unknown:
                    break
                }
            }

            // Only emit a message for this turn if it carried something other
            // than tool results (those were already appended above).
            if hasNonToolResult {
                messages.append(ChatMessage(
                    role: msg.role,
                    content: .string(textParts.joined(separator: "\n")),
                    images: imageUrls.isEmpty ? nil : imageUrls,
                    tool_calls: toolCalls.isEmpty ? nil : toolCalls
                ))
            }
        }
    }

    return messages
}
|
||||
|
||||
/// Convert Anthropic tool definitions to the OpenAI function-calling format,
/// which most open-source MLX models expect in their chat templates.
func anthropicToolsToOpenAI(_ tools: [AnyCodable]) -> [AnyCodable] {
    tools.map { tool in
        // Anything that isn't a dictionary is passed through untouched.
        guard let spec = tool.value as? [String: Any] else { return tool }
        let function: [String: Any] = [
            "name": spec["name"] as? String ?? "",
            "description": spec["description"] as? String ?? "",
            "parameters": spec["input_schema"] ?? [String: Any](),
        ]
        return AnyCodable([
            "type": "function",
            "function": function,
        ] as [String: Any])
    }
}
|
||||
@@ -142,6 +142,80 @@ struct MLXHTTPServer {
|
||||
}
|
||||
}
|
||||
|
||||
// Anthropic Messages API: decodes an Anthropic-style request, converts it to
// the internal chat format, and dispatches to the streaming or non-streaming
// handler.
router.post("/v1/messages") { request, context in
    // Validate API key if set. NOTE(review): only `Authorization: Bearer` is
    // accepted here — confirm whether Anthropic clients sending `x-api-key`
    // need to be supported too.
    if !self.apiKey.isEmpty {
        let authHeader = request.headers[.authorization]
        let expectedAuth = "Bearer \(self.apiKey)"
        if authHeader != expectedAuth {
            let error = ErrorResponse(
                error: ErrorDetail(
                    message: "Unauthorized",
                    type_name: "authentication_error",
                    code: "unauthorized"
                )
            )
            return try Response(
                status: .unauthorized,
                headers: [.contentType: "application/json"],
                body: .init(byteBuffer: encodeJSONBuffer(error))
            )
        }
    }

    do {
        // Cap request bodies at 10 MiB to bound memory use.
        let body = try await request.body.collect(upTo: 10 * 1024 * 1024)
        let anthropicReq = try JSONDecoder().decode(AnthropicRequest.self, from: body)

        // Map Anthropic messages/tools onto the internal OpenAI-style shapes.
        let messages = anthropicToInternalMessages(request: anthropicReq)
        let temperature = anthropicReq.temperature ?? 0.7
        let topP = anthropicReq.top_p ?? 1.0
        let maxTokens = anthropicReq.max_tokens
        let stop = anthropicReq.stop_sequences ?? []
        let isStreaming = anthropicReq.stream ?? false
        let tools: [AnyCodable]? = anthropicReq.tools.map { anthropicToolsToOpenAI($0) }

        log("[mlx] Anthropic request: model=\(anthropicReq.model), messages=\(messages.count), stream=\(isStreaming), tools=\(tools?.count ?? 0)")

        if isStreaming {
            return try await self.handleAnthropicStreaming(
                anthropicReq: anthropicReq,
                messages: messages,
                temperature: temperature,
                topP: topP,
                maxTokens: maxTokens,
                stop: stop,
                tools: tools
            )
        } else {
            return try await self.handleAnthropicNonStreaming(
                anthropicReq: anthropicReq,
                messages: messages,
                temperature: temperature,
                topP: topP,
                maxTokens: maxTokens,
                stop: stop,
                tools: tools
            )
        }
    } catch {
        // Decode/generation failures surface as a 400 with the server's
        // OpenAI-style error envelope (not the Anthropic error shape).
        log("[mlx] Error processing Anthropic request: \(error.localizedDescription)")
        let errorResp = ErrorResponse(
            error: ErrorDetail(
                message: error.localizedDescription,
                type_name: "invalid_request_error",
                code: nil
            )
        )
        return try Response(
            status: .badRequest,
            headers: [.contentType: "application/json"],
            body: .init(byteBuffer: encodeJSONBuffer(errorResp))
        )
    }
}
|
||||
|
||||
// Cancel active generation
|
||||
router.post("/v1/cancel") { _, _ in
|
||||
let cancelled = await self.activeGenerations.cancelAll()
|
||||
@@ -394,6 +468,260 @@ struct MLXHTTPServer {
|
||||
body: .init(asyncSequence: responseStream)
|
||||
)
|
||||
}
|
||||
|
||||
// MARK: - Anthropic Non-Streaming Handler

/// Runs a full generation and packages the result as a complete Anthropic
/// `message` response, including a tool_use block per tool call.
private func handleAnthropicNonStreaming(
    anthropicReq: AnthropicRequest,
    messages: [ChatMessage],
    temperature: Float,
    topP: Float,
    maxTokens: Int,
    stop: [String],
    tools: [AnyCodable]?
) async throws -> Response {
    let (text, toolCalls, usage) = try await modelRunner.generate(
        messages: messages,
        temperature: temperature,
        topP: topP,
        maxTokens: maxTokens,
        repetitionPenalty: 1.0,
        stop: stop,
        tools: tools
    )

    // Assemble the content: optional leading text block, then one tool_use
    // block per tool call.
    var blocks: [AnthropicResponseBlock] = []
    if !text.isEmpty {
        blocks.append(.text(AnthropicTextContent(type: "text", text: text)))
    }
    for call in toolCalls {
        // Parse the arguments JSON string back into a dictionary for the
        // tool_use `input` field; invalid JSON degrades to an empty dict.
        let parsedInput = (try? JSONSerialization.jsonObject(with: Data(call.function.arguments.utf8))) ?? [String: Any]()
        let toolUse = AnthropicToolUseContent(
            type: "tool_use",
            id: call.id,
            name: call.function.name,
            input: AnyCodable(parsedInput)
        )
        blocks.append(.toolUse(toolUse))
    }
    // Guarantee at least one content block in the response.
    if blocks.isEmpty {
        blocks.append(.text(AnthropicTextContent(type: "text", text: "")))
    }

    let stopReason = toolCalls.isEmpty ? "end_turn" : "tool_use"
    let response = AnthropicResponse(
        id: "msg-\(generateResponseId())",
        type: "message",
        role: "assistant",
        content: blocks,
        model: anthropicReq.model,
        stop_reason: stopReason,
        stop_sequence: nil,
        usage: AnthropicUsage(
            input_tokens: usage.prompt_tokens ?? 0,
            output_tokens: usage.completion_tokens
        )
    )

    log("[mlx] Anthropic response: \(text.count) chars, \(toolCalls.count) tool call(s), stop=\(stopReason)")

    let payload = try anthropicEncoder.encode(response)
    var buffer = ByteBufferAllocator().buffer(capacity: payload.count)
    buffer.writeBytes(payload)
    return Response(
        status: .ok,
        headers: [.contentType: "application/json"],
        body: .init(byteBuffer: buffer)
    )
}
|
||||
|
||||
// MARK: - Anthropic Streaming Handler

/// Streams a generation as Anthropic SSE events in protocol order:
/// message_start → ping → content_block_start(0, text) → text deltas →
/// one tool_use block per tool call (indices 1+) → message_delta →
/// message_stop.
private func handleAnthropicStreaming(
    anthropicReq: AnthropicRequest,
    messages: [ChatMessage],
    temperature: Float,
    topP: Float,
    maxTokens: Int,
    stop: [String],
    tools: [AnyCodable]?
) async throws -> Response {
    let responseId = "msg-\(generateResponseId())"
    let model = anthropicReq.model

    let stream = await modelRunner.generateStream(
        messages: messages,
        temperature: temperature,
        topP: topP,
        maxTokens: maxTokens,
        repetitionPenalty: 1.0,
        stop: stop,
        tools: tools
    )

    let (responseStream, continuation) = AsyncStream<ByteBuffer>.makeStream()
    // Local copy so the generation task below doesn't need to capture self.
    let activeGenerations = self.activeGenerations

    let task = Task {
        // Close the SSE stream and drop the registration on every exit path.
        defer {
            continuation.finish()
            Task { await activeGenerations.remove(responseId) }
        }

        // message_start — skeleton message with empty content and zero usage.
        let startEvent = AnthropicMessageStartEvent(
            type: "message_start",
            message: AnthropicStreamMessage(
                id: responseId,
                type: "message",
                role: "assistant",
                content: [],
                model: model,
                stop_reason: nil,
                stop_sequence: nil,
                usage: AnthropicUsage(input_tokens: 0, output_tokens: 0)
            )
        )
        if let data = try? anthropicEncoder.encode(startEvent) {
            continuation.yield(buildAnthropicSSEFrame(event: "message_start", data: data))
        }

        // ping — early keep-alive event after message_start.
        let ping = AnthropicPingEvent(type: "ping")
        if let data = try? anthropicEncoder.encode(ping) {
            continuation.yield(buildAnthropicSSEFrame(event: "ping", data: data))
        }

        // Open the initial text content block (always index 0).
        let textBlockStart = AnthropicContentBlockStartEvent(
            type: "content_block_start",
            index: 0,
            content_block: AnyCodable(["type": "text", "text": ""] as [String: Any])
        )
        if let data = try? anthropicEncoder.encode(textBlockStart) {
            continuation.yield(buildAnthropicSSEFrame(event: "content_block_start", data: data))
        }

        var blockIndex = 1 // next available block index (0 is the text block)
        var textBlockOpen = true
        // NOTE(review): incremented per chunk but never read — the final
        // usage comes from the .done event. Candidate for removal.
        var outputTokens = 0

        do {
            for try await event in stream {
                if Task.isCancelled { break }

                switch event {
                case .chunk(let token):
                    outputTokens += 1
                    // Each generated token is a text_delta on block 0.
                    let delta = AnthropicContentBlockDeltaEvent(
                        type: "content_block_delta",
                        index: 0,
                        delta: AnyCodable(["type": "text_delta", "text": token] as [String: Any])
                    )
                    if let data = try? anthropicEncoder.encode(delta) {
                        continuation.yield(buildAnthropicSSEFrame(event: "content_block_delta", data: data))
                    }

                case .toolCall(let tc):
                    // Close the text block on first tool call
                    if textBlockOpen {
                        let stop = AnthropicContentBlockStopEvent(type: "content_block_stop", index: 0)
                        if let data = try? anthropicEncoder.encode(stop) {
                            continuation.yield(buildAnthropicSSEFrame(event: "content_block_stop", data: data))
                        }
                        textBlockOpen = false
                    }

                    // Open a tool_use block
                    let toolStart = AnthropicContentBlockStartEvent(
                        type: "content_block_start",
                        index: blockIndex,
                        content_block: AnyCodable([
                            "type": "tool_use",
                            "id": tc.id,
                            "name": tc.function.name,
                        ] as [String: Any])
                    )
                    if let data = try? anthropicEncoder.encode(toolStart) {
                        continuation.yield(buildAnthropicSSEFrame(event: "content_block_start", data: data))
                    }

                    // Send the full arguments as a single input_json_delta
                    let toolDelta = AnthropicContentBlockDeltaEvent(
                        type: "content_block_delta",
                        index: blockIndex,
                        delta: AnyCodable([
                            "type": "input_json_delta",
                            "partial_json": tc.function.arguments,
                        ] as [String: Any])
                    )
                    if let data = try? anthropicEncoder.encode(toolDelta) {
                        continuation.yield(buildAnthropicSSEFrame(event: "content_block_delta", data: data))
                    }

                    // Close the tool_use block
                    let toolStop = AnthropicContentBlockStopEvent(
                        type: "content_block_stop", index: blockIndex)
                    if let data = try? anthropicEncoder.encode(toolStop) {
                        continuation.yield(buildAnthropicSSEFrame(event: "content_block_stop", data: data))
                    }

                    blockIndex += 1

                case .done(let usage, _, let hasToolCalls):
                    // Close the text block if still open
                    if textBlockOpen {
                        let stop = AnthropicContentBlockStopEvent(type: "content_block_stop", index: 0)
                        if let data = try? anthropicEncoder.encode(stop) {
                            continuation.yield(buildAnthropicSSEFrame(event: "content_block_stop", data: data))
                        }
                    }

                    // Final stop reason + authoritative output-token count.
                    let stopReason = hasToolCalls ? "tool_use" : "end_turn"
                    let msgDelta = AnthropicMessageDeltaEvent(
                        type: "message_delta",
                        delta: AnthropicMessageDeltaPayload(
                            stop_reason: stopReason, stop_sequence: nil),
                        usage: AnthropicStreamUsage(
                            output_tokens: usage.completion_tokens)
                    )
                    if let data = try? anthropicEncoder.encode(msgDelta) {
                        continuation.yield(buildAnthropicSSEFrame(event: "message_delta", data: data))
                    }

                    let msgStop = AnthropicMessageStopEvent(type: "message_stop")
                    if let data = try? anthropicEncoder.encode(msgStop) {
                        continuation.yield(buildAnthropicSSEFrame(event: "message_stop", data: data))
                    }
                }
            }
        } catch {
            if !Task.isCancelled {
                log("[mlx] Error in Anthropic SSE stream: \(error.localizedDescription)")
                // NOTE(review): "error:" is not a standard SSE field name, so
                // spec-compliant EventSource clients ignore this frame —
                // confirm the intended client actually surfaces it.
                var buffer = ByteBufferAllocator().buffer(capacity: 256)
                buffer.writeString(
                    "error: {\"message\":\"\(error.localizedDescription)\"}\n\n")
                continuation.yield(buffer)
            }
        }
    }

    // Client disconnect terminates the stream; cancel the generation task.
    continuation.onTermination = { @Sendable _ in
        log("[mlx] Anthropic SSE continuation terminated, cancelling task")
        task.cancel()
    }

    await activeGenerations.register(responseId, task: task)

    return Response(
        status: .ok,
        headers: [
            .contentType: "text/event-stream",
            // Force-unwraps are safe: these literal header names are valid.
            .init("Cache-Control")!: "no-cache",
            .init("Connection")!: "keep-alive",
        ],
        body: .init(asyncSequence: responseStream)
    )
}
|
||||
}
|
||||
|
||||
// MARK: - JSON Encoding Helpers
|
||||
@@ -406,6 +734,24 @@ private let jsonEncoder: JSONEncoder = {
|
||||
return encoder
|
||||
}()
|
||||
|
||||
/// Encoder for Anthropic API responses — no key-encoding strategy so that
/// AnyCodable dict keys (e.g. tool input schemas) are preserved as-is.
private let anthropicEncoder: JSONEncoder = {
    let encoder = JSONEncoder()
    // Dates, if any ever appear in a payload, encode as Unix seconds.
    encoder.dateEncodingStrategy = .secondsSince1970
    return encoder
}()
|
||||
|
||||
/// Builds an Anthropic SSE frame: "event: <name>\ndata: <json>\n\n"
private func buildAnthropicSSEFrame(event: String, data: Data) -> ByteBuffer {
    let header = "event: \(event)\ndata: "
    // Capacity is a hint only; size it from the UTF-8 byte count plus the
    // two-byte terminator.
    var frame = sseBufferAllocator.buffer(capacity: header.utf8.count + data.count + 2)
    frame.writeString(header)
    frame.writeBytes(data)
    frame.writeString("\n\n")
    return frame
}
|
||||
|
||||
private func encodeJSON<T: Encodable>(_ value: T) throws -> Response {
|
||||
let data = try jsonEncoder.encode(value)
|
||||
var buffer = ByteBufferAllocator().buffer(capacity: data.count)
|
||||
|
||||
@@ -66,6 +66,9 @@ enum Commands {
|
||||
/// Model ID to load (omit to pick interactively)
|
||||
#[arg(long)]
|
||||
model: Option<String>,
|
||||
/// Path to the inference binary (auto-discovered from Jan data folder when omitted)
|
||||
#[arg(long)]
|
||||
bin: Option<String>,
|
||||
/// Port the model server listens on
|
||||
#[arg(long, default_value_t = 6767)]
|
||||
port: u16,
|
||||
@@ -298,9 +301,9 @@ async fn main() {
|
||||
Commands::Models { cmd } => handle_models(cmd).await,
|
||||
Commands::App { cmd } => handle_app(cmd),
|
||||
Commands::Serve { args } => handle_serve(args).await,
|
||||
Commands::Launch { program, program_args, model, port, api_key, n_gpu_layers, ctx_size, fit, verbose } => {
|
||||
Commands::Launch { program, program_args, model, bin, port, api_key, n_gpu_layers, ctx_size, fit, verbose } => {
|
||||
let ctx_size_val = ctx_size.unwrap_or(4096);
|
||||
handle_launch(program, program_args, model, port, api_key, n_gpu_layers, ctx_size_val, fit, ctx_size.is_none(), verbose).await
|
||||
handle_launch(program, program_args, model, bin, port, api_key, n_gpu_layers, ctx_size_val, fit, ctx_size.is_none(), verbose).await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -883,6 +886,7 @@ async fn handle_launch(
|
||||
program: String,
|
||||
program_args: Vec<String>,
|
||||
model: Option<String>,
|
||||
bin: Option<String>,
|
||||
port: u16,
|
||||
api_key: String,
|
||||
n_gpu_layers: i32,
|
||||
@@ -906,7 +910,7 @@ async fn handle_launch(
|
||||
let effective_fit = fit.unwrap_or(is_claude && ctx_size_is_default);
|
||||
|
||||
// Start the model server inline (same process, no detach).
|
||||
let (pid, actual_port) = start_model_server(&model_id, port, api_key.clone(), n_gpu_layers, ctx_size, effective_fit, verbose).await;
|
||||
let (pid, actual_port) = start_model_server(&model_id, bin, port, api_key.clone(), n_gpu_layers, ctx_size, effective_fit, verbose).await;
|
||||
|
||||
// Model is ready — silence server request/response logs so they don't
|
||||
// flood the launched program's terminal (e.g. Claude Code's shell).
|
||||
@@ -960,6 +964,7 @@ async fn handle_launch(
|
||||
/// Resolves the engine automatically (LlamaCPP or MLX).
|
||||
async fn start_model_server(
|
||||
model_id: &str,
|
||||
bin: Option<String>,
|
||||
port: u16,
|
||||
api_key: String,
|
||||
n_gpu_layers: i32,
|
||||
@@ -984,10 +989,11 @@ async fn start_model_server(
|
||||
|
||||
if engine == "mlx" {
|
||||
use std::path::Path;
|
||||
let bin_path = match discover_mlx_binary() {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
let bin_path = match bin.or_else(|| discover_mlx_binary().map(|p| p.to_string_lossy().into_owned())) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
finish_progress(pb, "✗ mlx-server binary not found");
|
||||
eprintln!("Install Jan from https://jan.ai or pass --bin <path>.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
@@ -1017,10 +1023,11 @@ async fn start_model_server(
|
||||
finish_progress(pb, format!("✓ {model_id} ready · {url}"));
|
||||
(info.pid, info.port as u16)
|
||||
} else {
|
||||
let bin_path = match discover_llamacpp_binary() {
|
||||
Some(p) => p.to_string_lossy().into_owned(),
|
||||
let bin_path = match bin.or_else(|| discover_llamacpp_binary().map(|p| p.to_string_lossy().into_owned())) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
finish_progress(pb, "✗ llama-server binary not found");
|
||||
eprintln!("Install a backend from Jan's settings or pass --bin <path>.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -154,7 +154,6 @@ export function DataProvider() {
|
||||
.then((unsub) => {
|
||||
unsubscribe = unsub
|
||||
})
|
||||
|
||||
return () => {
|
||||
unsubscribe()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user