MCP를 이용할 수 있는 플랫폼은 많이 있지만, 개발 환경이나 구현 환경에 따라서는 본인이 구성한 플랫폼에서 직접 실행해야 하므로
MCP 클라이언트와 서버를 구현하는 방법을 알아야 한다.
1. 사용하고자 하는 기능을 구현한 MCP 서버들
2. MCP 서버들을 하나로 통합해 실행할 수 있도록 하는 클라이언트
위 두 가지를 구현해야 한다.
우선 현재 시간을 알려주는 간단한 서버를 구현한다.
class TimeTools(str, Enum):
    """Names of the tools exposed by the time server.

    Members are also ``str`` instances, so each one compares equal to its
    raw tool-name string.
    """

    # Tool that reports the current time in a given timezone.
    GET_CURRENT_TIME = "get_current_time"
    # Tool that converts a time of day from one timezone to another.
    CONVERT_TIME = "convert_time"
class TimeResult(BaseModel):
    """A single moment in time, described for one timezone."""

    # Timezone name, stored exactly as the caller supplied it.
    timezone: str
    # Timestamp text; TimeServer fills this with an ISO 8601 string at
    # second precision.
    datetime: str
    # True when the moment's DST offset is non-zero.
    is_dst: bool
class TimeConversionResult(BaseModel):
    """Outcome of converting one time of day between two timezones."""

    # The input moment, expressed in the source timezone.
    source: TimeResult
    # The same moment, expressed in the target timezone.
    target: TimeResult
    # Signed UTC-offset difference (target minus source) in hours, with an
    # "h" suffix, e.g. "+9.0h".
    time_difference: str
class TimeConversionInput(BaseModel):
    """Input schema for a conversion request with several target timezones.

    NOTE(review): nothing in the code visible here references this model;
    confirm whether it is still needed.
    """

    # Timezone name the given time is expressed in.
    source_tz: str
    # Time text to convert; presumably "HH:MM" like the convert_time tool
    # -- TODO confirm.
    time: str
    # Timezone names to convert into.
    target_tz_list: list[str]
def get_local_tz(local_tz_override: str | None = None) -> ZoneInfo:
if local_tz_override:
return ZoneInfo(local_tz_override)
# Get local timezone from datetime.now()
tzinfo = datetime.now().astimezone(tz=None).tzinfo
if tzinfo is not None:
return ZoneInfo(str(tzinfo))
raise McpError("Could not determine local timezone - tzinfo is None")
def get_zoneinfo(timezone_name: str) -> ZoneInfo:
    """Look up a timezone by name.

    Args:
        timezone_name: Key handed to ``ZoneInfo`` (e.g. "Europe/London").

    Returns:
        The matching ``ZoneInfo`` object.

    Raises:
        McpError: If ``ZoneInfo`` rejects the name for any reason; the
            original error text is embedded in the message.
    """
    try:
        zone = ZoneInfo(timezone_name)
    except Exception as err:
        raise McpError(f"Invalid timezone: {str(err)}")
    return zone
class TimeServer:
    """Implements the two time tools: current-time lookup and conversion."""

    @staticmethod
    def _as_result(label: str, moment: datetime) -> TimeResult:
        """Package an aware datetime as a TimeResult under the given zone label."""
        return TimeResult(
            timezone=label,
            datetime=moment.isoformat(timespec="seconds"),
            is_dst=bool(moment.dst()),
        )

    def get_current_time(self, timezone_name: str) -> TimeResult:
        """Return the current time in the named timezone.

        Args:
            timezone_name: Timezone name resolved through ``get_zoneinfo``.

        Returns:
            A ``TimeResult`` labelled with ``timezone_name`` as given.
        """
        zone = get_zoneinfo(timezone_name)
        return self._as_result(timezone_name, datetime.now(zone))

    def convert_time(
        self, source_tz: str, time_str: str, target_tz: str
    ) -> TimeConversionResult:
        """Convert a time of day from one timezone to another.

        The time is anchored to today's date in the source timezone.

        Args:
            source_tz: Timezone name the input time is expressed in.
            time_str: Time of day in 24-hour ``HH:MM`` form.
            target_tz: Timezone name to convert into.

        Returns:
            A ``TimeConversionResult`` holding both moments and the signed
            UTC-offset difference (target minus source) in hours.

        Raises:
            ValueError: If ``time_str`` does not match ``HH:MM``.
        """
        src_zone = get_zoneinfo(source_tz)
        dst_zone = get_zoneinfo(target_tz)

        try:
            clock = datetime.strptime(time_str, "%H:%M").time()
        except ValueError:
            raise ValueError("Invalid time format. Expected HH:MM [24-hour format]")

        # Only hour and minute come from the caller; the date is "today" as
        # seen from the source timezone.
        today = datetime.now(src_zone)
        source_time = datetime(
            today.year,
            today.month,
            today.day,
            clock.hour,
            clock.minute,
            tzinfo=src_zone,
        )
        target_time = source_time.astimezone(dst_zone)

        src_offset = source_time.utcoffset() or timedelta()
        dst_offset = target_time.utcoffset() or timedelta()
        hours = (dst_offset - src_offset).total_seconds() / 3600

        # Whole hours keep one decimal ("+9.0h"); fractional offsets such as
        # Nepal's UTC+5:45 are trimmed of trailing zeros.
        time_diff_str = (
            f"{hours:+.1f}h"
            if hours.is_integer()
            else f"{hours:+.2f}".rstrip("0").rstrip(".") + "h"
        )

        return TimeConversionResult(
            source=self._as_result(source_tz, source_time),
            target=self._as_result(target_tz, target_time),
            time_difference=time_diff_str,
        )
사용할 입출력 스키마와 Tools를 미리 작성한다.
이 부분은 단순 tool call과 같기 때문에 한 번에 작성했다.
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
async def serve(local_timezone: str | None = None) -> None:
    """Build the "mcp-time" server, register its tools, and run it over stdio.

    Args:
        local_timezone: Optional timezone name passed to ``get_local_tz``.
            The resolved name is embedded in the tool descriptions as the
            timezone to use when the user does not name one.
    """
    server = Server("mcp-time")
    time_server = TimeServer()
    local_tz = str(get_local_tz(local_timezone))

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available time tools."""
        current_time_tool = Tool(
            name=TimeTools.GET_CURRENT_TIME.value,
            description="Get current time in a specific timezones",
            inputSchema={
                "type": "object",
                "properties": {
                    "timezone": {
                        "type": "string",
                        "description": f"IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no timezone provided by the user.",
                    }
                },
                "required": ["timezone"],
            },
        )
        convert_time_tool = Tool(
            name=TimeTools.CONVERT_TIME.value,
            description="Convert time between timezones",
            inputSchema={
                "type": "object",
                "properties": {
                    "source_timezone": {
                        "type": "string",
                        "description": f"Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no source timezone provided by the user.",
                    },
                    "time": {
                        "type": "string",
                        "description": "Time to convert in 24-hour format (HH:MM)",
                    },
                    "target_timezone": {
                        "type": "string",
                        "description": f"Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use '{local_tz}' as local timezone if no target timezone provided by the user.",
                    },
                },
                "required": ["source_timezone", "time", "target_timezone"],
            },
        )
        return [current_time_tool, convert_time_tool]

    @server.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> Sequence[TextContent]:
        """Handle tool calls for time queries."""
        try:
            if name == TimeTools.GET_CURRENT_TIME.value:
                tz_name = arguments.get("timezone")
                if not tz_name:
                    raise ValueError("Missing required argument: timezone")
                result = time_server.get_current_time(tz_name)
            elif name == TimeTools.CONVERT_TIME.value:
                required = ("source_timezone", "time", "target_timezone")
                if any(key not in arguments for key in required):
                    raise ValueError("Missing required arguments")
                result = time_server.convert_time(
                    arguments["source_timezone"],
                    arguments["time"],
                    arguments["target_timezone"],
                )
            else:
                raise ValueError(f"Unknown tool: {name}")

            # The result model is serialised to pretty-printed JSON and
            # returned as a single text content item.
            payload = json.dumps(result.model_dump(), indent=2)
            return [TextContent(type="text", text=payload)]
        except Exception as e:
            # Every failure, including the ValueErrors raised above, is
            # re-raised as a ValueError with a common prefix.
            raise ValueError(f"Error processing mcp-server-time query: {str(e)}")

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)
여기서 중요하게 봐야 할 것은 각 함수에 선언된 데코레이터들이다.
총 2개로
@server.list_tools()
@server.call_tool()
이다.
list_tools는 MCP 서버에서 제공할 tool들을 선언해 두는 것이고
call_tool은 클라이언트가 프롬프트를 처리하는 과정에서 특정 도구의 실행을 요청했을 때 실행되는 로직이다.
# Excerpt from serve(): create the server's initialization options, then run
# the server on the read/write streams yielded by stdio_server().
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)
위 코드를 통해 MCP 서버를 실행할 수 있도록 한다.
단순히 IP와 포트를 통해 통신하는 것이 아니라, 표준 입출력(stdio) 스트림으로 클라이언트와 직접 연결되어 구동된다.
다음으로, 구현한 서버를 실행할 수 있도록 해 주는 클라이언트를 구현한다.
class MCPClient:
    """Interactive client that connects to MCP servers over stdio.

    Several servers can be connected at once, each under its own
    ``server_id``. Queries are sent to Claude together with the tools of
    every connected server, and tool calls are routed to the owning server.
    """

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        # Owns every transport/session context opened by connect_to_server;
        # all of them are closed together in cleanup().
        self.exit_stack = AsyncExitStack()
        self.anthropic = Anthropic()
        self.servers: Dict[str, ServerConnection] = {}
        self.current_server_id: Optional[str] = None

    async def connect_to_server(self, server_id: str, server_script_path: str):
        """Connect to an MCP server

        Args:
            server_id: Unique identifier for this server connection
            server_script_path: Path to the server script (.py or .js)

        Raises:
            ValueError: If the script is neither a .py nor a .js file.
        """
        if server_id in self.servers:
            print(f"Server with ID '{server_id}' already exists.")
            return

        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        # self.stdio / self.write / self.session always refer to the most
        # recently opened connection; per-server handles live in self.servers.
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()

        server_conn = ServerConnection(server_id, self.session, self.stdio, self.write)
        self.servers[server_id] = server_conn

        # The first server to connect becomes the active one.
        if self.current_server_id is None:
            self.current_server_id = server_id

        tools = await server_conn.list_tools()
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    def switch_server(self, server_id: str) -> bool:
        """Switch the current active server

        Returns:
            True if ``server_id`` is connected and is now active, else False.
        """
        if server_id in self.servers:
            self.current_server_id = server_id
            print(f"Switched to server: {server_id}")
            return True
        else:
            print(f"Server '{server_id}' not found.")
            return False

    def list_servers(self):
        """List all connected servers"""
        if not self.servers:
            print("No servers connected")
            return

        print("\nConnected servers:")
        for server_id, server in self.servers.items():
            status = "ACTIVE" if server_id == self.current_server_id else "CONNECTED"
            tool_names = [tool.name for tool in server.tools]
            print(f" - {server_id} [{status}: Tools: {tool_names}]")

    async def disconnect_server(self, server_id: str) -> bool:
        """Disconnect from a specific server

        Only the registry entry is removed here; the underlying transport
        stays registered on ``exit_stack`` and is closed by ``cleanup()``.

        Returns:
            True if the server was registered and has been removed, else False.
        """
        if server_id not in self.servers:
            print(f"Server '{server_id}' not found")
            return False

        del self.servers[server_id]

        if server_id == self.current_server_id:
            # Fall back to any remaining server, or to "no active server".
            self.current_server_id = next(iter(self.servers), None)

        print(f"Disconnected from server: {server_id}")
        return True

    # Backward-compatible alias: the method was originally named
    # ``disconnected_server`` while chat_loop() called ``disconnect_server``,
    # so the /disconnect command always failed with AttributeError.
    disconnected_server = disconnect_server

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and available tools

        Args:
            query: The user's prompt.

        Returns:
            Claude's text output, interleaved with a note for each tool call
            that was made, joined by newlines.
        """
        if not self.current_server_id:
            return "No active server. Please connect to at least one server first."

        messages = [
            {
                "role": "user",
                "content": query
            }
        ]

        # Gather the tools of every connected server and remember which
        # server owns each tool name. If two servers expose the same name,
        # the one registered later wins.
        all_tools = []
        server_tool_map = {}
        for server_id, server in self.servers.items():
            for tool in await server.list_tools():
                all_tools.append({
                    "name": tool.name,
                    "description": f"[Server: {server_id}] {tool.description}",
                    "input_schema": tool.inputSchema
                })
                server_tool_map[tool.name] = server_id

        # Initial Claude API call
        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=messages,
            tools=all_tools
        )

        # Process response and handle tool calls
        final_text = []
        assistant_message_content = []
        for content in response.content:
            if content.type == 'text':
                final_text.append(content.text)
                assistant_message_content.append(content)
            elif content.type == 'tool_use':
                tool_name = content.name
                tool_args = content.input

                # Find which server this tool belongs to
                if tool_name not in server_tool_map:
                    final_text.append(f"Error: Tool {tool_name} not found on any connected server.")
                    continue

                server_id = server_tool_map[tool_name]
                server = self.servers[server_id]

                # Execute tool call on the appropriate server
                result = await server.call_tool(tool_name, tool_args)
                final_text.append(f"[Calling tool {tool_name} on server {server_id} with args {tool_args}]")

                # NOTE(review): when one response carries several tool_use
                # blocks, the assistant turn is re-sent with all blocks so
                # far but only the latest tool_result -- confirm the API
                # accepts that shape.
                assistant_message_content.append(content)
                messages.append({
                    "role": "assistant",
                    "content": assistant_message_content
                })
                messages.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": content.id,
                            "content": result.content
                        }
                    ]
                })

                # Get next response from Claude
                followup = self.anthropic.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1000,
                    messages=messages,
                    tools=all_tools
                )
                # Collect every text block rather than assuming the first
                # block is text; it may be another tool_use block.
                final_text.extend(
                    block.text for block in followup.content if block.type == 'text'
                )

        return "\n".join(final_text)

    @staticmethod
    def _print_commands():
        """Print the list of commands understood by chat_loop()."""
        print("Type your queries or use commands:")
        print(" /connect <server_id> <script_path> - Connect to a new server")
        print(" /switch <server_id> - Switch active server")
        print(" /list - List connected servers")
        print(" /disconnect <server_id> - Disconnect from a server")
        print(" /help - Show this command list")
        print(" /quit - Exit the client")

    async def chat_loop(self):
        """Run an interactive chat loop

        Reads lines from stdin until ``/quit`` or end of input. Lines that
        start with ``/`` are treated as commands; everything else is sent
        to ``process_query``.
        """
        print("\nMCP Client Started!")
        self._print_commands()

        while True:
            try:
                user_input = input("\nQuery or command: ").strip()
            except EOFError:
                # stdin is closed: input() would raise again on every
                # iteration, so leave the loop instead of spinning.
                break

            if not user_input:
                continue
            if user_input.lower() == '/quit':
                break

            try:
                # Handle commands
                if user_input.startswith('/'):
                    parts = user_input.split()
                    command = parts[0].lower()

                    if command == '/connect' and len(parts) >= 3:
                        server_id = parts[1]
                        script_path = parts[2]
                        await self.connect_to_server(server_id, script_path)
                    elif command == '/switch' and len(parts) >= 2:
                        self.switch_server(parts[1])
                    elif command == '/list':
                        self.list_servers()
                    elif command == '/disconnect' and len(parts) >= 2:
                        await self.disconnect_server(parts[1])
                    elif command == '/help':
                        self._print_commands()
                    else:
                        print("Invalid command format. Type /help for available commands.")
                else:
                    # Process regular query
                    response = await self.process_query(user_input)
                    print("\n" + response)
            except Exception as e:
                # Top-level boundary of the REPL: report and keep going.
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()
세션으로 연결된 MCP 서버의 도구들을 가져와서 적용할 수 있다.

다음과 같이 결과를 보여준다.
추가적인 MCP 서버를 적용하고 싶다면 위에서 설명한 형식으로 서버를 구축하고 명령 인자에
{server_id} {경로} 형식으로 추가만 하면 도구를 추가할 수 있다.
전체 구현 정보는 다음 깃허브에서 확인 가능하다.
'A.I.(인공지능) & M.L.(머신러닝) > Agent' 카테고리의 다른 글
Figma-MCP (0) | 2025.04.01 |
---|---|
MCP를 Claude Desktop으로 간단히 구현해보자. (1) | 2025.03.24 |
MCP (Model Context Protocol) 소개 (0) | 2025.03.24 |