Моя установка
- ОС: Windows 10 Pro, 64-разрядная версия
- Версия Python: 3.12.0
- Прошивка HuskyLens 2: 1.2.1
- Конфигурация: я правильно обновил свой GEMINI_API_KEY и установил SERVER_URL на свой локальный IP-адрес (http://192.168.1.9:3000).
- Ошибка команды переключения: когда я пытаюсь переключить алгоритм с помощью переключателя распознавания лиц, я получаю следующий ответ об ошибке:
- JSON/Data Processing Ошибка: при использовании команды Ask скрипт завершается с ошибкой типа JSON:
Следовательно, Gemini получает неверные данные и не может обработать запрос.
Вывод журнала:
Соединение с SSE на http://192.168.1.9:3000...
...
Пользователь > переключить распознавание лиц
"cmd" : "set_algorithm",
"correlation_id" : "req_8_1261922470226",
"error" : "неподдерживаемая команда set_algorithm",
"ret" : "error"
Пользователь > спрашивает, есть ли TON?
Тип [json.Exception.type_error.302] должен быть строковым, но имеет значение null
Вопросы:
- Поддерживается ли команда set_algorithm в прошивке версии 1.2.1?
- Что может быть причиной ошибки type_error.302 (нулевая строка) при получении данных с объектива?
- Есть ли какие-либо особые настройки на Сам модуль HuskyLens, который мне нужно включить?
Файл huskyMcpChat.py для тестирования:
#!/usr/bin/env python3
"""
Interactive Command Line for Huskylens 2 MCP Server
Author: Roni Bandini
Date: Nov 29, 2025
MIT License
"""
import asyncio
import aiohttp
import json
import re
import logging
import sys
from typing import Optional
import os
os.system('cls')
# --- Gemini SDK section ---
try:
from google import genai
except ImportError:
class GeminiImportError(Exception): pass
def genai(): raise GeminiImportError("Please install google-genai: 'pip install google-genai'")
# --- Configuration ---
# Logging to WARNING in order to keep the console clean for the chat
logging.basicConfig(level=logging.WARNING, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
# Settings
GEMINI_API_KEY = "******DPid7VicK9s-g9WSah5PirWTq3YItwkxE" # Get your free tier API key at https://aistudio.google.com/app/api-keys
SERVER_URL = "http://192.168.1.9:3000" # get this from Huskylens MCP section
class HuskyLensClient:
def __init__(self, server_url: str):
self.server_url = server_url.rstrip('/')
self.session = None
self.request_id = 0
self.message_url = None
self.pending_responses = {}
self.initialized = False
self.sse_task = None
async def connect(self):
self.session = aiohttp.ClientSession()
print(f"
self.sse_task = asyncio.create_task(self._listen_sse())
# Wait for session URL
for _ in range(50):
if self.message_url: break
await asyncio.sleep(0.1)
if not self.message_url:
raise Exception("Failed to get session URL from MCP server
await self._initialize()
async def _initialize(self):
await self._send_request("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "HuskyLens-Py", "version": "1.2.0"}
})
self.initialized = True
await self._send_notification("notifications/initialized")
async def _listen_sse(self):
"""Background task to listen for server events"""
try:
async with self.session.get(f"{self.server_url}/sse") as response:
async for line in response.content:
line_text = line.decode('utf-8').strip()
if line_text.startswith('data: '):
data = line_text[6:]
# Extract Session URL
if 'session_id=' in data:
match = re.search(r'(/message\?session_id=[a-f0-9-]+)', data)
if match: self.message_url = f"{self.server_url}{match.group(1)}"
continue
elif data.startswith('/message'):
self.message_url = f"{self.server_url}{data}"
continue
if data == '[DONE]': continue
# Handle JSON-RPC Responses
try:
json_data = json.loads(data)
if isinstance(json_data, dict) and 'id' in json_data:
if json_data['id'] in self.pending_responses:
self.pending_responses[json_data['id']].set_result(json_data)
except: pass
except Exception as e:
logger.error(f"SSE Error: {e}")
async def _send_request(self, method: str, params: dict = None, timeout: float = 15.0):
if not self.message_url: raise Exception("Not connected.")
self.request_id += 1
req_id = self.request_id
fut = asyncio.Future()
self.pending_responses[req_id] = fut
try:
payload = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params or {}}
async with self.session.post(self.message_url, json=payload) as resp:
await resp.text()
return await asyncio.wait_for(fut, timeout=timeout)
except Exception as e:
if req_id in self.pending_responses: del self.pending_responses[req_id]
return {"error": str(e)}
async def _send_notification(self, method: str, params: dict = None):
if self.message_url:
async with self.session.post(self.message_url, json={"jsonrpc": "2.0", "method": method, "params": params or {}}): pass
async def call_tool(self, tool, args=None):
return await self._send_request("tools/call", {"name": tool, "arguments": args or {}})
async def close(self):
if self.sse_task: self.sse_task.cancel()
if self.session: await self.session.close()
@staticmethod
def extract_content(response: dict) -> str:
if not response or "result" not in response or "content" not in response["result"]:
return ""
return "\n".join([b.get("text", "") for b in response["result"]["content"] if b.get("type") == "text"])
class GeminiBrain:
def __init__(self, api_key: str):
try:
self.client = genai.Client(api_key=api_key)
except Exception as e:
self.client = None
print(f"
async def analyze_data(self, json_data: str, user_query: str = None) -> str:
if not self.client: return "Gemini is not configured."
# System Prompt
base_instruction = "You are a computer vision brain. You analyze raw JSON data from a HuskyLens 2 cammera."
if user_query:
# Mode: Interrogation ('ask')
prompt = f'''
{base_instruction}
SENSOR DATA (JSON):
{json_data}
USER QUESTION: "{user_query}"
INSTRUCTION:
Answer the user's question based ONLY on the evidence in the JSON data.
If the JSON is empty or confidence (conf) is low, state that clearly.
Keep the answer concise.
'''
else:
# Mode: Description ('see')
prompt = f'''
{base_instruction}
SENSOR DATA (JSON):
{json_data}
INSTRUCTION:
Briefly describe what is currently visible based on the data. Translate coordinates or IDs into natural language.
'''
loop = asyncio.get_running_loop()
try:
print("
resp = await loop.run_in_executor(None, lambda: self.client.models.generate_content(model='gemini-2.5-flash', contents=prompt))
return resp.text
except Exception as e:
return f"API Error: {e}"
async def main_loop(client: HuskyLensClient, brain: GeminiBrain):
print("\n" + "="*40)
print(" Huskylens2 MCP Command Line ")
print(" Roni Bandini 11/2025 ")
print(" MIT License ")
print("="*40)
print(" 1. list : List algorithms")
print(" 2. current : Show active algorithm")
print(" 3. switch : Switch algorithm (e.g., 'switch FaceRecognition')")
print(" 4. ask : Ask AI about the view")
print(" 5. see : General AI description")
print(" 6. photo : Take photo (to internal memory)")
print(" 7. exit : Quit")
print("-" * 40)
while True:
try:
raw_input = await asyncio.get_event_loop().run_in_executor(None, input, "\nUser > ")
parts = raw_input.strip().split(maxsplit=1)
if not parts: continue
cmd = parts[0].lower()
arg = parts[1] if len(parts) > 1 else None
if cmd in ['exit', 'quit', '7']:
break
# ---------------------------------------------------------
# 1. LIST ALGORITHMS
# ---------------------------------------------------------
elif cmd in ['list', 'ls', '1']:
print("
res = await client.call_tool("manage_applications", {"operation": "application_list"})
print(client.extract_content(res))
# ---------------------------------------------------------
# 2. CURRENT ALGORITHM
# ---------------------------------------------------------
elif cmd in ['current', 'status', '2']:
print("
res = await client.call_tool("manage_applications", {"operation": "current_application"})
content = client.extract_content(res)
print(f"
# ---------------------------------------------------------
# 3. SWITCH ALGORITHM
# ---------------------------------------------------------
elif cmd in ['switch', '3']:
if not arg:
arg = input(" Enter algorithm name (e.g., ObjectTracking) > ").strip()
if arg:
print(f"
res = await client.call_tool("manage_applications", {
"operation": "switch_application",
"algorithm": arg
})
print(f"
# ---------------------------------------------------------
# 4. ASK (Prompt with Context)
# ---------------------------------------------------------
elif cmd in ['ask', '4']:
if not arg:
print("
continue
print("
res = await client.call_tool("get_recognition_result", {"operation": "get_result"})
json_data = client.extract_content(res)
print(f"
answer = await brain.analyze_data(json_data, user_query=arg)
print(f"\n
# ---------------------------------------------------------
# 5. SEE (General Description)
# ---------------------------------------------------------
elif cmd in ['see', 'look', '5']:
print("
res = await client.call_tool("get_recognition_result", {"operation": "get_result"})
json_data = client.extract_content(res)
print(f"
answer = await brain.analyze_data(json_data, user_query=None)
print(f"\n
# ---------------------------------------------------------
# 6. PHOTO (to Internal Memory, microSD card is not used here)
# ---------------------------------------------------------
elif cmd in ['photo', 'snap', '6']:
print("
res = await client.call_tool("multimedia_control", {"operation": "take_photo"})
# Show raw output just in case
raw_out = client.extract_content(res)
print(f"
print("
else:
print("
except Exception as e:
print(f"
async def main():
if not GEMINI_API_KEY:
print("
return
client = HuskyLensClient(SERVER_URL)
brain = GeminiBrain(GEMINI_API_KEY)
try:
await client.connect()
print("
await main_loop(client, brain)
finally:
print("
await client.close()
if __name__ == '__main__':
asyncio.run(main())
Подробнее здесь: https://stackoverflow.com/questions/798 ... mcpchat-py
Мобильная версия