로컬환경에서 MCP 구현

MCP를 이용할 수 있는 플랫폼은 많이 있지만 개발 환경이나 구현 환경에 따라 본인이 구성한 플랫폼에서 실행하기 위해 

MCP 클라이언트와 서버를 직접 구현하는 방법을 알아야 한다.

 

1. 사용하고자 하는 기능을 구현한 MCP 서버들 

2. MCP 서버들을 하나로 통합해 실행할 수 있도록 하는 클라이언트

 

위 두 가지를 구현해야 한다.

 

우선 간단한 현재 시간을 알려주는 서버를 구현한다.

class TimeTools(str, Enum):
    """Names of the tools offered by the time server.

    Each member's ``value`` is the tool name string that is advertised in the
    tool list and matched against incoming tool-call requests.
    """
    # Tool that reports the current time in a given timezone.
    GET_CURRENT_TIME = "get_current_time"
    # Tool that converts a wall-clock time from one timezone to another.
    CONVERT_TIME = "convert_time"


class TimeResult(BaseModel):
    """A point in time expressed in one timezone."""
    # Timezone name exactly as supplied by the caller (e.g. an IANA key).
    timezone: str
    # ISO 8601 timestamp string; TimeServer fills it with second precision.
    datetime: str
    # True when the timestamp has a non-zero DST offset in that timezone.
    is_dst: bool


class TimeConversionResult(BaseModel):
    """Result of converting one wall-clock time between two timezones."""
    # The input time as seen in the source timezone.
    source: TimeResult
    # The same instant as seen in the target timezone.
    target: TimeResult
    # Target UTC offset minus source UTC offset, formatted like "+9.0h" or "+5.75h".
    time_difference: str


class TimeConversionInput(BaseModel):
    """Input schema for a time conversion request.

    NOTE(review): not referenced by any other code in the visible source;
    the field meanings below are inferred from their names - confirm.
    """
    # Presumably the timezone name of the time being converted.
    source_tz: str
    # Presumably the time to convert (TimeServer.convert_time expects "HH:MM").
    time: str
    # Presumably the timezone names to convert into.
    target_tz_list: list[str]
    
def get_local_tz(local_tz_override: str | None = None) -> ZoneInfo:
    if local_tz_override:
        return ZoneInfo(local_tz_override)

    # Get local timezone from datetime.now()
    tzinfo = datetime.now().astimezone(tz=None).tzinfo
    if tzinfo is not None:
        return ZoneInfo(str(tzinfo))
    raise McpError("Could not determine local timezone - tzinfo is None")


def get_zoneinfo(timezone_name: str) -> ZoneInfo:
    """Look up ``timezone_name`` and return its ``ZoneInfo``.

    Any failure during the lookup is reported as an ``McpError`` whose
    message starts with "Invalid timezone: ".
    """
    try:
        zone = ZoneInfo(timezone_name)
    except Exception as exc:
        raise McpError(f"Invalid timezone: {exc}")
    return zone
        
        
class TimeServer:
    """Implements the time tools: current-time lookup and timezone conversion."""

    def get_current_time(self, timezone_name: str) -> TimeResult:
        """Return the current time in ``timezone_name``.

        The timestamp is ISO 8601 with second precision; ``is_dst`` is true
        when the zone currently reports a non-zero DST offset.
        """
        zone = get_zoneinfo(timezone_name)
        moment = datetime.now(zone)
        stamp = moment.isoformat(timespec="seconds")
        dst_active = bool(moment.dst())
        return TimeResult(timezone=timezone_name, datetime=stamp, is_dst=dst_active)

    def convert_time(
        self, source_tz: str, time_str: str, target_tz: str
    ) -> TimeConversionResult:
        """Convert today's ``time_str`` (HH:MM) from ``source_tz`` to ``target_tz``.

        "Today" is the current date in the source timezone.

        Raises:
            ValueError: If ``time_str`` is not in 24-hour ``HH:MM`` format.
        """
        src_zone = get_zoneinfo(source_tz)
        dst_zone = get_zoneinfo(target_tz)

        try:
            clock = datetime.strptime(time_str, "%H:%M").time()
        except ValueError:
            raise ValueError("Invalid time format. Expected HH:MM [24-hour format]")

        # Anchor the wall-clock time to the current date in the source zone.
        today = datetime.now(src_zone).date()
        src_moment = datetime.combine(today, clock, tzinfo=src_zone)
        dst_moment = src_moment.astimezone(dst_zone)

        src_offset = src_moment.utcoffset() or timedelta()
        dst_offset = dst_moment.utcoffset() or timedelta()
        offset_hours = (dst_offset - src_offset).total_seconds() / 3600

        if not offset_hours.is_integer():
            # Fractional offsets (e.g. +5.75) are shown without trailing zeros.
            difference_label = f"{offset_hours:+.2f}".rstrip("0").rstrip(".") + "h"
        else:
            difference_label = f"{offset_hours:+.1f}h"

        source_result = TimeResult(
            timezone=source_tz,
            datetime=src_moment.isoformat(timespec="seconds"),
            is_dst=bool(src_moment.dst()),
        )
        target_result = TimeResult(
            timezone=target_tz,
            datetime=dst_moment.isoformat(timespec="seconds"),
            is_dst=bool(dst_moment.dst()),
        )
        return TimeConversionResult(
            source=source_result,
            target=target_result,
            time_difference=difference_label,
        )

사용할 입출력 스키마와 Tools를 미리 작성한다. 

 

이 부분은 단순한 tool call 구현과 같기 때문에 한 번에 작성했다.

 

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

async def serve(local_timezone: str | None = None) -> None:
    """Run the "mcp-time" MCP server over stdio.

    Registers two handlers on the server - one that lists the time tools and
    one that executes them - and then serves requests on the stdio streams.

    Args:
        local_timezone: Optional timezone name passed to ``get_local_tz``; the
            resulting zone name is embedded in the tool descriptions.
    """
    server = Server("mcp-time")
    time_server = TimeServer()
    local_tz = str(get_local_tz(local_timezone))

    def string_property(description: str) -> dict:
        # JSON-schema fragment for a single string-typed argument.
        return {"type": "string", "description": description}

    current_time_schema = {
        "type": "object",
        "properties": {
            "timezone": string_property(
                f"IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no timezone provided by the user."
            )
        },
        "required": ["timezone"],
    }
    convert_time_schema = {
        "type": "object",
        "properties": {
            "source_timezone": string_property(
                f"Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no source timezone provided by the user."
            ),
            "time": string_property("Time to convert in 24-hour format (HH:MM)"),
            "target_timezone": string_property(
                f"Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use '{local_tz}' as local timezone if no target timezone provided by the user."
            ),
        },
        "required": ["source_timezone", "time", "target_timezone"],
    }

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """Describe the tools this server offers."""
        current_time_tool = Tool(
            name=TimeTools.GET_CURRENT_TIME.value,
            description="Get current time in a specific timezones",
            inputSchema=current_time_schema,
        )
        convert_time_tool = Tool(
            name=TimeTools.CONVERT_TIME.value,
            description="Convert time between timezones",
            inputSchema=convert_time_schema,
        )
        return [current_time_tool, convert_time_tool]

    @server.call_tool()
    async def call_tool(
        name: str, arguments: dict
    ) -> Sequence[TextContent]:
        """Execute the named tool and return its result as JSON text."""
        try:
            if name == TimeTools.GET_CURRENT_TIME.value:
                timezone = arguments.get("timezone")
                if not timezone:
                    raise ValueError("Missing required argument: timezone")
                result = time_server.get_current_time(timezone)
            elif name == TimeTools.CONVERT_TIME.value:
                needed = ["source_timezone", "time", "target_timezone"]
                if any(key not in arguments for key in needed):
                    raise ValueError("Missing required arguments")
                result = time_server.convert_time(
                    arguments["source_timezone"],
                    arguments["time"],
                    arguments["target_timezone"],
                )
            else:
                raise ValueError(f"Unknown tool: {name}")

            payload = json.dumps(result.model_dump(), indent=2)
            return [TextContent(type="text", text=payload)]
        except Exception as e:
            # Every failure, including the validation errors raised above, is
            # re-raised as a ValueError with a common prefix.
            raise ValueError(f"Error processing mcp-server-time query: {str(e)}")

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)

 

여기서 중요하게 봐야 할 것은 각 함수에 선언된 데코레이터들이다.

총 2개로

@server.list_tools()

@server.call_tool()

이다.

list_tools는 MCP 서버에서 사용할 tool들을 선언해두는 것이고 

call_tool은 클라이언트가 프롬프트를 처리하는 과정에서 특정 도구의 실행을 요청할 때 호출되는 로직이다.

 

options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)

위 코드를 통해 MCP 서버를 실행할 수 있도록 한다.

 

IP와 포트를 통해 네트워크로 통신하는 것이 아니라, 표준 입출력(stdio) 스트림을 통해 클라이언트와 직접 연결되어 구동된다.

 

다음은 이렇게 구현한 서버를 실행할 수 있도록 해주는 클라이언트를 구현한다.

class MCPClient:
    """Interactive client that connects to MCP servers and lets Claude use their tools.

    Servers are started as child processes and reached over stdio. The tools
    of every connected server are offered to Claude for each query, and each
    tool call is routed to the server that provides the tool.
    """

    # Settings for every Claude request issued by process_query.
    MODEL = "claude-3-5-sonnet-20241022"
    MAX_TOKENS = 1000
    # Upper bound on request -> tool-call -> request cycles for one query, so a
    # model that keeps asking for tools cannot loop forever.
    MAX_TOOL_ROUNDS = 10

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.anthropic = Anthropic()
        self.servers: Dict[str, ServerConnection] = {}
        self.current_server_id: Optional[str] = None

    async def connect_to_server(self, server_id: str, server_script_path: str):
        """Connect to an MCP server

        Args:
            server_id: Unique identifier for this server connection
            server_script_path: Path to the server script (.py or .js)

        Raises:
            ValueError: If the script is neither a .py nor a .js file.
        """
        if server_id in self.servers:
            print(f"Server with ID '{server_id}' already exists.")
            return

        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        # Both the transport and the session are registered on the shared exit
        # stack, so they are closed together in cleanup().
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        server_conn = ServerConnection(server_id, self.session, self.stdio, self.write)
        self.servers[server_id] = server_conn

        # The first server to connect becomes the active one.
        if self.current_server_id is None:
            self.current_server_id = server_id

        tools = await server_conn.list_tools()
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    def switch_server(self, server_id: str) -> bool:
        """Make ``server_id`` the active server; return whether it was found."""
        if server_id in self.servers:
            self.current_server_id = server_id
            print(f"Switched to server: {server_id}")
            return True
        print(f"Server '{server_id}' not found.")
        return False

    def list_servers(self):
        """Print every connected server with its status and tool names."""
        if not self.servers:
            print("No servers connected")
            return

        print("\nConnected servers:")
        for server_id, server in self.servers.items():
            status = "ACTIVE" if server_id == self.current_server_id else "CONNECTED"
            tool_names = [tool.name for tool in server.tools]
            print(f"  - {server_id} [{status}: Tools: {tool_names}]")

    async def disconnect_server(self, server_id: str):
        """Remove ``server_id`` from the set of connected servers.

        If the removed server was the active one, another connected server
        (if any) becomes active.

        Returns:
            True if the server was known and removed, False otherwise.
        """
        if server_id not in self.servers:
            print(f"Server '{server_id}' not found")
            return False

        # Only the registry entry is dropped here; the underlying transport
        # stays on the shared exit stack and is closed by cleanup().
        del self.servers[server_id]

        if server_id == self.current_server_id:
            self.current_server_id = next(iter(self.servers), None)

        print(f"Disconnected from server: {server_id}")
        return True

    # Backward-compatible alias: the method was originally (mis)named
    # ``disconnected_server`` while the /disconnect command called
    # ``disconnect_server``, which did not exist.
    disconnected_server = disconnect_server

    async def _execute_tool_use(self, tool_use, server_tool_map: dict, final_text: list) -> dict:
        """Run one tool_use block and return the matching tool_result block.

        A line describing the call (or the lookup failure) is appended to
        ``final_text``.
        """
        tool_name = tool_use.name
        tool_args = tool_use.input

        server_id = server_tool_map.get(tool_name)
        if server_id is None:
            message = f"Error: Tool {tool_name} not found on any connected server."
            final_text.append(message)
            # Report the failure back to the model so that every tool_use
            # block still receives a tool_result.
            return {
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": message,
                "is_error": True,
            }

        # Execute tool call on the appropriate server
        result = await self.servers[server_id].call_tool(tool_name, tool_args)
        final_text.append(f"[Calling tool {tool_name} on server {server_id} with args {tool_args}]")
        return {
            "type": "tool_result",
            "tool_use_id": tool_use.id,
            "content": result.content,
        }

    async def process_query(self, query: str) -> str:
        """Process a query using Claude and the tools of all connected servers.

        Claude is called repeatedly: whenever a response contains tool_use
        blocks, all of them are executed, their results are sent back in a
        single user message, and Claude is asked again. The loop ends when a
        response contains no tool calls or after ``MAX_TOOL_ROUNDS`` requests.

        Returns:
            The collected text blocks and tool-call notes, joined by newlines.
        """
        if not self.current_server_id:
            return "No active server. Please connect to at least one server first"

        messages = [{"role": "user", "content": query}]

        all_tools = []
        server_tool_map = {}
        for server_id, server in self.servers.items():
            for tool in await server.list_tools():
                all_tools.append({
                    "name": tool.name,
                    "description": f"[Server: {server_id}] {tool.description}",
                    "input_schema": tool.inputSchema,
                })
                # If two servers expose the same tool name, the later one wins.
                server_tool_map[tool.name] = server_id

        final_text = []
        for _ in range(self.MAX_TOOL_ROUNDS):
            response = self.anthropic.messages.create(
                model=self.MODEL,
                max_tokens=self.MAX_TOKENS,
                messages=messages,
                tools=all_tools,
            )

            tool_results = []
            for content in response.content:
                if content.type == 'text':
                    final_text.append(content.text)
                elif content.type == 'tool_use':
                    tool_results.append(
                        await self._execute_tool_use(content, server_tool_map, final_text)
                    )

            if not tool_results:
                break

            # One assistant turn carrying the whole response, followed by one
            # user turn answering every tool_use block it contained.
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
        else:
            final_text.append("[Stopped: tool call round limit reached]")

        return "\n".join(final_text)

    def _print_commands(self) -> None:
        """Print the list of supported slash commands."""
        print("  /connect <server_id> <script_path> - Connect to a new server")
        print("  /switch <server_id> - Switch active server")
        print("  /list - List connected servers")
        print("  /disconnect <server_id> - Disconnect from a server")
        print("  /help - Show this command list")
        print("  /quit - Exit the client")

    async def _handle_command(self, user_input: str) -> None:
        """Dispatch one slash command (``user_input`` starts with '/')."""
        parts = user_input.split()
        command = parts[0].lower()

        if command == '/connect' and len(parts) >= 3:
            await self.connect_to_server(parts[1], parts[2])
        elif command == '/switch' and len(parts) >= 2:
            self.switch_server(parts[1])
        elif command == '/list':
            self.list_servers()
        elif command == '/disconnect' and len(parts) >= 2:
            await self.disconnect_server(parts[1])
        elif command == '/help':
            self._print_commands()
        else:
            print("Invalid command format. Type /help for available commands.")

    async def chat_loop(self):
        """Run an interactive chat loop until /quit or end of input."""
        print("\nMCP Client Started!")
        print("Type your queries or use commands:")
        self._print_commands()

        while True:
            try:
                user_input = input("\nQuery or command: ").strip()

                if not user_input:
                    # Nothing to send; prompt again.
                    continue

                if user_input.lower() == '/quit':
                    break

                if user_input.startswith('/'):
                    await self._handle_command(user_input)
                else:
                    # Process regular query
                    response = await self.process_query(user_input)
                    print("\n" + response)

            except EOFError:
                # stdin was closed; without this the loop would spin forever
                # printing the same error.
                break
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Close every transport and session opened by this client."""
        await self.exit_stack.aclose()

 

 

세션으로 연결된 MCP 서버의 도구들을 가져와서 적용할 수 있다

 

다음과 같이 결과를 보여준다.

 

추가적인 MCP 서버를 적용하고 싶다면 위에서 설명한 형식으로 서버를 구축하고 명령 인자에 

{server_id} {경로} 형식으로 추가만 하면 도구를 추가할 수 있다.

 

전체 구현 정보는 다음 깃허브에서 확인 가능하다.

MCP 클라이언트

MCP 서버

  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유