출장 자동화 시스템
Agent-S agent_s.py
myeongjaechoi
2025. 3. 31. 17:19
import json # JSON 데이터 처리를 위한 라이브러리
import logging # 로깅(기록)을 위한 라이브러리
import os # 운영체제 관련 기능을 제공하는 라이브러리
import shutil # 파일 및 디렉토리 고수준 작업을 위한 라이브러리
from typing import Dict, List, Optional, Tuple # 타입 힌트를 위한 라이브러리
from gui_agents.s2.agents.grounding import ACI # UI 상호작용을 위한 그라운딩 에이전트
from gui_agents.s2.agents.worker import Worker # 작업 실행을 담당하는 워커 에이전트
from gui_agents.s2.agents.manager import Manager # 작업 관리를 담당하는 매니저 에이전트
from gui_agents.s2.utils.common_utils import Node # 노드 데이터 구조
초기화 메서드
class UIAgent:
"""UI 자동화 에이전트의 기본 클래스"""
def __init__(
self,
engine_params: Dict, # LLM 엔진 구성 매개변수
grounding_agent: ACI, # UI 상호작용을 위한 ACI 클래스 인스턴스
platform: str = "macos", # 운영체제 플랫폼 (macos, linux, windows)
action_space: str = "pyautogui", # 사용할 액션 공간 유형 (pyautogui, aci)
observation_type: str = "a11y_tree", # 사용할 관찰 유형 (a11y_tree, mixed)
search_engine: str = "perplexica", # 사용할 검색 엔진 (perplexica, LLM)
):
"""UIAgent 초기화
매개변수:
engine_params: LLM 엔진의 구성 매개변수
grounding_agent: UI 상호작용을 위한 ACI 클래스 인스턴스
platform: 운영체제 플랫폼 (macos, linux, windows)
action_space: 사용할 액션 공간 유형 (pyautogui, aci)
observation_type: 사용할 관찰 유형 (a11y_tree, mixed)
search_engine: 사용할 검색 엔진 (perplexica, LLM)
"""
self.engine_params = engine_params
self.grounding_agent = grounding_agent
self.platform = platform
self.action_space = action_space
self.observation_type = observation_type
self.engine = search_engine
기본 메서드들
def reset(self) -> None:
"""에이전트 상태 초기화"""
pass
def predict(self, instruction: str, observation: Dict) -> Tuple[Dict, List[str]]:
"""다음 행동 예측 생성
매개변수:
instruction: 자연어 지시사항
observation: 현재 UI 상태 관찰
반환:
에이전트 정보 딕셔너리와 행동 목록을 포함하는 튜플
"""
pass
def update_narrative_memory(self, trajectory: str) -> None:
"""작업 궤적으로 내러티브 메모리 업데이트
매개변수:
trajectory: 작업 실행 궤적을 포함하는 문자열
"""
pass
def update_episodic_memory(self, meta_data: Dict, subtask_trajectory: str) -> str:
"""하위 작업 궤적으로 에피소드 메모리 업데이트
매개변수:
meta_data: 현재 하위 작업 실행에 대한 메타데이터
subtask_trajectory: 하위 작업 실행 궤적을 포함하는 문자열
반환:
업데이트된 하위 작업 궤적
"""
pass
GraphSearchAgent 클래스: 그래프 검색 기반 UI 자동화 에이전트
class GraphSearchAgent(UIAgent):
"""계층적 계획 및 방향성 비순환 그래프 모델링을 사용하는 UI 자동화 에이전트"""
def __init__(
self,
engine_params: Dict, # LLM 엔진 구성 매개변수
grounding_agent: ACI, # UI 상호작용을 위한 ACI 클래스 인스턴스
platform: str = "macos", # 운영체제 플랫폼 (macos, ubuntu)
action_space: str = "pyautogui", # 사용할 액션 공간 유형 (pyautogui, other)
observation_type: str = "mixed", # 사용할 관찰 유형 (a11y_tree, screenshot, mixed)
search_engine: Optional[str] = None, # 사용할 검색 엔진 (LLM, perplexica)
memory_root_path: str = os.getcwd(), # 메모리 디렉토리 경로. 기본값은 현재 작업 디렉토리.
memory_folder_name: str = "kb", # 메모리 폴더 이름. 기본값은 "kb".
):
"""GraphSearchAgent 초기화
매개변수:
engine_params: LLM 엔진의 구성 매개변수
grounding_agent: UI 상호작용을 위한 ACI 클래스 인스턴스
platform: 운영체제 플랫폼 (macos, ubuntu)
action_space: 사용할 액션 공간 유형 (pyautogui, other)
observation_type: 사용할 관찰 유형 (a11y_tree, screenshot, mixed)
search_engine: 사용할 검색 엔진 (LLM, perplexica)
memory_root_path: 메모리 디렉토리 경로. 기본값은 현재 작업 디렉토리.
memory_folder_name: 메모리 폴더 이름. 기본값은 "kb".
"""
super().__init__(
engine_params,
grounding_agent,
platform,
action_space,
observation_type,
search_engine,
)
self.memory_root_path = memory_root_path
self.memory_folder_name = memory_folder_name
# 사용자의 현재 작업 디렉토리에 에이전트의 지식 베이스 초기화.
print("Agent-S 지식 초기 지식 베이스 다운로드 중...")
self.local_kb_path = os.path.join(
self.memory_root_path, self.memory_folder_name, self.platform
)
library_kb_path = os.path.join(working_dir, "../kb", self.platform)
if not os.path.exists(self.local_kb_path):
shutil.copytree(library_kb_path, self.local_kb_path)
print("지식 베이스 다운로드를 성공적으로 완료했습니다.")
else:
print(
f"경로 local_kb_path {self.local_kb_path}가 이미 존재합니다. 다운로드를 건너뜁니다."
)
print(
f"초기 지식 베이스를 다시 다운로드하려면 {self.local_kb_path}에 있는 기존 지식 베이스를 삭제하세요."
)
self.reset()
상태 초기화 메서드
def reset(self) -> None:
"""에이전트 상태를 초기화하고 구성 요소를 초기화"""
# 핵심 구성 요소 초기화
self.planner = Manager(
self.engine_params,
self.grounding_agent,
platform=self.platform,
search_engine=self.engine,
local_kb_path=self.local_kb_path,
)
self.executor = Worker(
self.engine_params,
self.grounding_agent,
platform=self.platform,
use_subtask_experience=True,
local_kb_path=self.local_kb_path,
)
# 상태 변수 초기화
self.requires_replan: bool = True # 재계획이 필요한지 여부
self.needs_next_subtask: bool = True # 다음 하위 작업이 필요한지 여부
self.step_count: int = 0 # 단계 카운터
self.turn_count: int = 0 # 턴 카운터
self.failure_subtask: Optional[Node] = None # 실패한 하위 작업
self.should_send_action: bool = False # 행동을 환경에 보내야 하는지 여부
self.completed_tasks: List[Node] = [] # 완료된 작업 리스트
self.current_subtask: Optional[Node] = None # 현재 하위 작업
self.subtasks: List[Node] = [] # 하위 작업 리스트
self.search_query: str = "" # 검색 쿼리
self.subtask_status: str = "Start" # 하위 작업 상태
def reset_executor_state(self) -> None:
"""실행자와 단계 카운터를 초기화"""
self.executor.reset()
self.step_count = 0
예측 메서드
def predict(self, instruction: str, observation: Dict) -> Tuple[Dict, List[str]]:
# 세 가지 정보 딕셔너리 초기화
planner_info = {}
executor_info = {}
evaluator_info = {
"obs_evaluator_response": "",
"num_input_tokens_evaluator": 0,
"num_output_tokens_evaluator": 0,
"evaluator_cost": 0.0,
}
actions = []
# 실행자의 DONE 응답이 하위 작업에 대한 것이라면, 에이전트는 행동을 환경에 보내지 않고 다음 하위 작업으로 계속 진행해야 함
while not self.should_send_action:
self.subtask_status = "In"
# 재계획이 참이면 새 계획을 생성. 시작 시, 계획 실패 후, 또는 하위 작업 완료 후에 참
if self.requires_replan:
logger.info("(재)계획 중...")
planner_info, self.subtasks = self.planner.get_action_queue(
instruction=instruction,
observation=observation,
failed_subtask=self.failure_subtask,
completed_subtasks_list=self.completed_tasks,
remaining_subtasks_list=self.subtasks,
)
self.requires_replan = False
if "search_query" in planner_info:
self.search_query = planner_info["search_query"]
else:
self.search_query = ""
# 가장 상위 하위 작업을 완료하기 위해 실행자 사용
if self.needs_next_subtask:
logger.info("다음 하위 작업 가져오는 중...")
# DAG 계획자가 모든 하위 작업이 완료되었다고 판단하면 비어 있을 수 있음
if len(self.subtasks) <= 0:
self.requires_replan = True
self.needs_next_subtask = True
self.failure_subtask = None
self.completed_tasks.append(self.current_subtask)
# 실행자 상태 초기화
self.reset_executor_state()
self.should_send_action = True
self.subtask_status = "Done"
executor_info = {
"executor_plan": "agent.done()",
"plan_code": "agent.done()",
"reflection": "agent.done()",
}
actions = ["DONE"]
break
self.current_subtask = self.subtasks.pop(0)
logger.info(f"다음 하위 작업: {self.current_subtask}")
self.needs_next_subtask = False
self.subtask_status = "Start"
# 실행자로부터 다음 행동 가져오기
executor_info, actions = self.executor.generate_next_action(
instruction=instruction,
search_query=self.search_query,
subtask=self.current_subtask.name,
subtask_info=self.current_subtask.info,
future_tasks=self.subtasks,
done_task=self.completed_tasks,
obs=observation,
)
self.step_count += 1
# 실행자가 행동을 반환하면 should_send_action 플래그를 True로 설정
self.should_send_action = True
# 실패 시 재계획
if "FAIL" in actions:
self.requires_replan = True
self.needs_next_subtask = True
# 실패한 하위 작업 할당
self.failure_subtask = self.current_subtask
# 단계 카운트, 실행자, 평가자 초기화
self.reset_executor_state()
# 더 많은 하위 작업이 남아 있으면 DONE을 환경에 보내지 않고 다음 하위 작업으로 이동
if self.subtasks:
self.should_send_action = False
# 하위 작업 완료 시 재계획
elif "DONE" in actions:
self.requires_replan = True
self.needs_next_subtask = True
self.failure_subtask = None
self.completed_tasks.append(self.current_subtask)
# 단계 카운트, 실행자, 평가자 초기화
self.reset_executor_state()
# 더 많은 하위 작업이 남아 있으면 DONE을 환경에 보내지 않고 다음 하위 작업으로 이동
if self.subtasks:
self.should_send_action = False
self.subtask_status = "Done"
self.turn_count += 1
# 다음 반복을 위해 should_send_action 플래그 재설정
self.should_send_action = False
# 세 가지 정보 딕셔너리 연결
info = {
**{
k: v
for d in [planner_info or {}, executor_info or {}, evaluator_info or {}]
for k, v in d.items()
}
}
info.update(
{
"subtask": self.current_subtask.name,
"subtask_info": self.current_subtask.info,
"subtask_status": self.subtask_status,
}
)
return info, actions
메모리 업데이트 메서드
def update_narrative_memory(self, trajectory: str) -> None:
"""작업 궤적으로부터 내러티브 메모리 업데이트
매개변수:
trajectory: 작업 실행 궤적을 포함하는 문자열
"""
try:
reflection_path = os.path.join(self.local_kb_path, "narrative_memory.json")
try:
reflections = json.load(open(reflection_path))
except:
reflections = {}
if self.search_query not in reflections:
reflection = self.planner.summarize_narrative(trajectory)
reflections[self.search_query] = reflection
with open(reflection_path, "w") as f:
json.dump(reflections, f, indent=2)
except Exception as e:
logger.error(f"내러티브 메모리 업데이트 실패: {e}")
def update_episodic_memory(self, meta_data: Dict, subtask_trajectory: str) -> str:
"""하위 작업 궤적으로부터 에피소드 메모리 업데이트
매개변수:
meta_data: 현재 하위 작업 실행에 대한 메타데이터
subtask_trajectory: 하위 작업 실행 궤적을 포함하는 문자열
반환:
업데이트된 하위 작업 궤적
"""
subtask = meta_data["subtask"]
subtask_info = meta_data["subtask_info"]
subtask_status = meta_data["subtask_status"]
# 하위 작업 궤적 처리
if subtask_status == "Start" or subtask_status == "Done":
# 새 하위 작업 시작이면 이전 하위 작업 궤적이 있는 경우 마무리
if subtask_trajectory:
subtask_trajectory += "\n하위 작업 완료됨.\n"
subtask_key = subtask_trajectory.split(
"\n----------------------\n\n계획:\n"
)[0]
try:
subtask_path = os.path.join(
self.local_kb_path, "episodic_memory.json"
)
kb = json.load(open(subtask_path))
except:
kb = {}
if subtask_key not in kb.keys():
subtask_summarization = self.planner.summarize_episode(
subtask_trajectory
)
kb[subtask_key] = subtask_summarization
else:
subtask_summarization = kb[subtask_key]
logger.info("subtask_key: %s", subtask_key)
logger.info("subtask_summarization: %s", subtask_summarization)
with open(subtask_path, "w") as fout:
json.dump(kb, fout, indent=2)
# 다음 하위 작업을 위해 초기화
subtask_trajectory = ""
# 새 하위 작업 궤적 시작
subtask_trajectory = (
"작업:\n"
+ self.search_query
+ "\n\n하위 작업: "
+ subtask
+ "\n하위 작업 지시사항: "
+ subtask_info
+ "\n----------------------\n\n계획:\n"
+ meta_data["executor_plan"]
+ "\n"
)
elif subtask_status == "In":
# 하위 작업이 여전히 진행 중이면 현재 하위 작업 궤적에 계속 추가
subtask_trajectory += (
"\n----------------------\n\n계획:\n"
+ meta_data["executor_plan"]
+ "\n"
)
return subtask_trajectory