출장 자동화 시스템

Agent-S agent_s.py

myeongjaechoi 2025. 3. 31. 17:19
import json  # JSON 데이터 처리를 위한 라이브러리
import logging  # 로깅(기록)을 위한 라이브러리
import os  # 운영체제 관련 기능을 제공하는 라이브러리
import shutil  # 파일 및 디렉토리 고수준 작업을 위한 라이브러리
from typing import Dict, List, Optional, Tuple  # 타입 힌트를 위한 라이브러리

from gui_agents.s2.agents.grounding import ACI  # UI 상호작용을 위한 그라운딩 에이전트
from gui_agents.s2.agents.worker import Worker  # 작업 실행을 담당하는 워커 에이전트
from gui_agents.s2.agents.manager import Manager  # 작업 관리를 담당하는 매니저 에이전트
from gui_agents.s2.utils.common_utils import Node  # 노드 데이터 구조

초기화 메서드

class UIAgent:
    """UI 자동화 에이전트의 기본 클래스"""

    def __init__(
        self,
        engine_params: Dict,  # LLM 엔진 구성 매개변수
        grounding_agent: ACI,  # UI 상호작용을 위한 ACI 클래스 인스턴스
        platform: str = "macos",  # 운영체제 플랫폼 (macos, linux, windows)
        action_space: str = "pyautogui",  # 사용할 액션 공간 유형 (pyautogui, aci)
        observation_type: str = "a11y_tree",  # 사용할 관찰 유형 (a11y_tree, mixed)
        search_engine: str = "perplexica",  # 사용할 검색 엔진 (perplexica, LLM)
    ):
        """UIAgent 초기화

        매개변수:
            engine_params: LLM 엔진의 구성 매개변수
            grounding_agent: UI 상호작용을 위한 ACI 클래스 인스턴스
            platform: 운영체제 플랫폼 (macos, linux, windows)
            action_space: 사용할 액션 공간 유형 (pyautogui, aci)
            observation_type: 사용할 관찰 유형 (a11y_tree, mixed)
            search_engine: 사용할 검색 엔진 (perplexica, LLM)
        """
        self.engine_params = engine_params
        self.grounding_agent = grounding_agent
        self.platform = platform
        self.action_space = action_space
        self.observation_type = observation_type
        self.engine = search_engine

기본 메서드들

def reset(self) -> None:
    """에이전트 상태 초기화"""
    pass

def predict(self, instruction: str, observation: Dict) -> Tuple[Dict, List[str]]:
    """다음 행동 예측 생성

    매개변수:
        instruction: 자연어 지시사항
        observation: 현재 UI 상태 관찰

    반환:
        에이전트 정보 딕셔너리와 행동 목록을 포함하는 튜플
    """
    pass

def update_narrative_memory(self, trajectory: str) -> None:
    """작업 궤적으로 내러티브 메모리 업데이트

    매개변수:
        trajectory: 작업 실행 궤적을 포함하는 문자열
    """
    pass

def update_episodic_memory(self, meta_data: Dict, subtask_trajectory: str) -> str:
    """하위 작업 궤적으로 에피소드 메모리 업데이트

    매개변수:
        meta_data: 현재 하위 작업 실행에 대한 메타데이터
        subtask_trajectory: 하위 작업 실행 궤적을 포함하는 문자열

    반환:
        업데이트된 하위 작업 궤적
    """
    pass

GraphSearchAgent 클래스: 그래프 검색 기반 UI 자동화 에이전트

 

class GraphSearchAgent(UIAgent):
    """계층적 계획 및 방향성 비순환 그래프 모델링을 사용하는 UI 자동화 에이전트"""

    def __init__(
        self,
        engine_params: Dict,  # LLM 엔진 구성 매개변수
        grounding_agent: ACI,  # UI 상호작용을 위한 ACI 클래스 인스턴스
        platform: str = "macos",  # 운영체제 플랫폼 (macos, ubuntu)
        action_space: str = "pyautogui",  # 사용할 액션 공간 유형 (pyautogui, other)
        observation_type: str = "mixed",  # 사용할 관찰 유형 (a11y_tree, screenshot, mixed)
        search_engine: Optional[str] = None,  # 사용할 검색 엔진 (LLM, perplexica)
        memory_root_path: str = os.getcwd(),  # 메모리 디렉토리 경로. 기본값은 현재 작업 디렉토리.
        memory_folder_name: str = "kb",  # 메모리 폴더 이름. 기본값은 "kb".
    ):
        """GraphSearchAgent 초기화

        매개변수:
            engine_params: LLM 엔진의 구성 매개변수
            grounding_agent: UI 상호작용을 위한 ACI 클래스 인스턴스
            platform: 운영체제 플랫폼 (macos, ubuntu)
            action_space: 사용할 액션 공간 유형 (pyautogui, other)
            observation_type: 사용할 관찰 유형 (a11y_tree, screenshot, mixed)
            search_engine: 사용할 검색 엔진 (LLM, perplexica)
            memory_root_path: 메모리 디렉토리 경로. 기본값은 현재 작업 디렉토리.
            memory_folder_name: 메모리 폴더 이름. 기본값은 "kb".
        """
        super().__init__(
            engine_params,
            grounding_agent,
            platform,
            action_space,
            observation_type,
            search_engine,
        )

        self.memory_root_path = memory_root_path
        self.memory_folder_name = memory_folder_name

        # 사용자의 현재 작업 디렉토리에 에이전트의 지식 베이스 초기화.
        print("Agent-S 지식 초기 지식 베이스 다운로드 중...")
        self.local_kb_path = os.path.join(
            self.memory_root_path, self.memory_folder_name, self.platform
        )

        library_kb_path = os.path.join(working_dir, "../kb", self.platform)
        if not os.path.exists(self.local_kb_path):
            shutil.copytree(library_kb_path, self.local_kb_path)
            print("지식 베이스 다운로드를 성공적으로 완료했습니다.")
        else:
            print(
                f"경로 local_kb_path {self.local_kb_path}가 이미 존재합니다. 다운로드를 건너뜁니다."
            )
            print(
                f"초기 지식 베이스를 다시 다운로드하려면 {self.local_kb_path}에 있는 기존 지식 베이스를 삭제하세요."
            )

        self.reset()

상태 초기화 메서드

def reset(self) -> None:
    """에이전트 상태를 초기화하고 구성 요소를 초기화"""
    # 핵심 구성 요소 초기화
    self.planner = Manager(
        self.engine_params,
        self.grounding_agent,
        platform=self.platform,
        search_engine=self.engine,
        local_kb_path=self.local_kb_path,
    )
    self.executor = Worker(
        self.engine_params,
        self.grounding_agent,
        platform=self.platform,
        use_subtask_experience=True,
        local_kb_path=self.local_kb_path,
    )

    # 상태 변수 초기화
    self.requires_replan: bool = True  # 재계획이 필요한지 여부
    self.needs_next_subtask: bool = True  # 다음 하위 작업이 필요한지 여부
    self.step_count: int = 0  # 단계 카운터
    self.turn_count: int = 0  # 턴 카운터
    self.failure_subtask: Optional[Node] = None  # 실패한 하위 작업
    self.should_send_action: bool = False  # 행동을 환경에 보내야 하는지 여부
    self.completed_tasks: List[Node] = []  # 완료된 작업 리스트
    self.current_subtask: Optional[Node] = None  # 현재 하위 작업
    self.subtasks: List[Node] = []  # 하위 작업 리스트
    self.search_query: str = ""  # 검색 쿼리
    self.subtask_status: str = "Start"  # 하위 작업 상태
def reset_executor_state(self) -> None:
    """실행자와 단계 카운터를 초기화"""
    self.executor.reset()
    self.step_count = 0

예측 메서드

def predict(self, instruction: str, observation: Dict) -> Tuple[Dict, List[str]]:
    # 세 가지 정보 딕셔너리 초기화
    planner_info = {}
    executor_info = {}
    evaluator_info = {
        "obs_evaluator_response": "",
        "num_input_tokens_evaluator": 0,
        "num_output_tokens_evaluator": 0,
        "evaluator_cost": 0.0,
    }
    actions = []

    # 실행자의 DONE 응답이 하위 작업에 대한 것이라면, 에이전트는 행동을 환경에 보내지 않고 다음 하위 작업으로 계속 진행해야 함
    while not self.should_send_action:
        self.subtask_status = "In"
        # 재계획이 참이면 새 계획을 생성. 시작 시, 계획 실패 후, 또는 하위 작업 완료 후에 참
        if self.requires_replan:
            logger.info("(재)계획 중...")
            planner_info, self.subtasks = self.planner.get_action_queue(
                instruction=instruction,
                observation=observation,
                failed_subtask=self.failure_subtask,
                completed_subtasks_list=self.completed_tasks,
                remaining_subtasks_list=self.subtasks,
            )

            self.requires_replan = False
            if "search_query" in planner_info:
                self.search_query = planner_info["search_query"]
            else:
                self.search_query = ""

        # 가장 상위 하위 작업을 완료하기 위해 실행자 사용
        if self.needs_next_subtask:
            logger.info("다음 하위 작업 가져오는 중...")
            # DAG 계획자가 모든 하위 작업이 완료되었다고 판단하면 비어 있을 수 있음
            if len(self.subtasks) <= 0:
                self.requires_replan = True
                self.needs_next_subtask = True
                self.failure_subtask = None
                self.completed_tasks.append(self.current_subtask)

                # 실행자 상태 초기화
                self.reset_executor_state()
                self.should_send_action = True
                self.subtask_status = "Done"
                executor_info = {
                    "executor_plan": "agent.done()",
                    "plan_code": "agent.done()",
                    "reflection": "agent.done()",
                }
                actions = ["DONE"]
                break

            self.current_subtask = self.subtasks.pop(0)
            logger.info(f"다음 하위 작업: {self.current_subtask}")
            self.needs_next_subtask = False
            self.subtask_status = "Start"

        # 실행자로부터 다음 행동 가져오기
        executor_info, actions = self.executor.generate_next_action(
            instruction=instruction,
            search_query=self.search_query,
            subtask=self.current_subtask.name,
            subtask_info=self.current_subtask.info,
            future_tasks=self.subtasks,
            done_task=self.completed_tasks,
            obs=observation,
        )

        self.step_count += 1

        # 실행자가 행동을 반환하면 should_send_action 플래그를 True로 설정
        self.should_send_action = True

        # 실패 시 재계획
        if "FAIL" in actions:
            self.requires_replan = True
            self.needs_next_subtask = True

            # 실패한 하위 작업 할당
            self.failure_subtask = self.current_subtask

            # 단계 카운트, 실행자, 평가자 초기화
            self.reset_executor_state()

            # 더 많은 하위 작업이 남아 있으면 DONE을 환경에 보내지 않고 다음 하위 작업으로 이동
            if self.subtasks:
                self.should_send_action = False

        # 하위 작업 완료 시 재계획
        elif "DONE" in actions:
            self.requires_replan = True
            self.needs_next_subtask = True
            self.failure_subtask = None
            self.completed_tasks.append(self.current_subtask)

            # 단계 카운트, 실행자, 평가자 초기화
            self.reset_executor_state()

            # 더 많은 하위 작업이 남아 있으면 DONE을 환경에 보내지 않고 다음 하위 작업으로 이동
            if self.subtasks:
                self.should_send_action = False
            self.subtask_status = "Done"

        self.turn_count += 1

    # 다음 반복을 위해 should_send_action 플래그 재설정
    self.should_send_action = False

    # 세 가지 정보 딕셔너리 연결
    info = {
        **{
            k: v
            for d in [planner_info or {}, executor_info or {}, evaluator_info or {}]
            for k, v in d.items()
        }
    }
    info.update(
        {
            "subtask": self.current_subtask.name,
            "subtask_info": self.current_subtask.info,
            "subtask_status": self.subtask_status,
        }
    )

    return info, actions

메모리 업데이트 메서드

def update_narrative_memory(self, trajectory: str) -> None:
    """작업 궤적으로부터 내러티브 메모리 업데이트

    매개변수:
        trajectory: 작업 실행 궤적을 포함하는 문자열
    """
    try:
        reflection_path = os.path.join(self.local_kb_path, "narrative_memory.json")
        try:
            reflections = json.load(open(reflection_path))
        except:
            reflections = {}

        if self.search_query not in reflections:
            reflection = self.planner.summarize_narrative(trajectory)
            reflections[self.search_query] = reflection

        with open(reflection_path, "w") as f:
            json.dump(reflections, f, indent=2)

    except Exception as e:
        logger.error(f"내러티브 메모리 업데이트 실패: {e}")
def update_episodic_memory(self, meta_data: Dict, subtask_trajectory: str) -> str:
    """하위 작업 궤적으로부터 에피소드 메모리 업데이트

    매개변수:
        meta_data: 현재 하위 작업 실행에 대한 메타데이터
        subtask_trajectory: 하위 작업 실행 궤적을 포함하는 문자열

    반환:
        업데이트된 하위 작업 궤적
    """
    subtask = meta_data["subtask"]
    subtask_info = meta_data["subtask_info"]
    subtask_status = meta_data["subtask_status"]
    # 하위 작업 궤적 처리
    if subtask_status == "Start" or subtask_status == "Done":
        # 새 하위 작업 시작이면 이전 하위 작업 궤적이 있는 경우 마무리
        if subtask_trajectory:
            subtask_trajectory += "\n하위 작업 완료됨.\n"
            subtask_key = subtask_trajectory.split(
                "\n----------------------\n\n계획:\n"
            )[0]
            try:
                subtask_path = os.path.join(
                    self.local_kb_path, "episodic_memory.json"
                )
                kb = json.load(open(subtask_path))
            except:
                kb = {}
            if subtask_key not in kb.keys():
                subtask_summarization = self.planner.summarize_episode(
                    subtask_trajectory
                )
                kb[subtask_key] = subtask_summarization
            else:
                subtask_summarization = kb[subtask_key]
            logger.info("subtask_key: %s", subtask_key)
            logger.info("subtask_summarization: %s", subtask_summarization)
            with open(subtask_path, "w") as fout:
                json.dump(kb, fout, indent=2)
            # 다음 하위 작업을 위해 초기화
            subtask_trajectory = ""
        # 새 하위 작업 궤적 시작
        subtask_trajectory = (
            "작업:\n"
            + self.search_query
            + "\n\n하위 작업: "
            + subtask
            + "\n하위 작업 지시사항: "
            + subtask_info
            + "\n----------------------\n\n계획:\n"
            + meta_data["executor_plan"]
            + "\n"
        )
    elif subtask_status == "In":
        # 하위 작업이 여전히 진행 중이면 현재 하위 작업 궤적에 계속 추가
        subtask_trajectory += (
            "\n----------------------\n\n계획:\n"
            + meta_data["executor_plan"]
            + "\n"
        )

    return subtask_trajectory