출장 자동화 시스템
Agent-S cli_app.py, common_utils.py, ocr_server.py, query_perplexica.py, procedural_memory.py
myeongjaechoi
2025. 3. 30. 14:58
cli_app.py
import argparse # 명령줄 인수를 처리하기 위한 argparse 모듈을 가져옵니다.
import datetime # 날짜와 시간을 처리하기 위한 datetime 모듈을 가져옵니다.
import io # 파일 입출력을 위한 io 모듈을 가져옵니다.
import logging # 로그 기록을 관리하기 위한 logging 모듈을 가져옵니다.
import os # 운영 체제 관련 기능을 사용하기 위한 os 모듈을 가져옵니다.
import platform # 운영 체제 정보를 확인하기 위한 platform 모듈을 가져옵니다.
import pyautogui # 화면 캡처 및 GUI 자동화를 위한 pyautogui 모듈을 가져옵니다.
import sys # 시스템 관련 기능을 사용하기 위한 sys 모듈을 가져옵니다.
import time # 시간 관련 기능을 사용하기 위한 time 모듈을 가져옵니다.
from PIL import Image # 이미지 처리 및 변환을 위한 Python Imaging Library(PIL)의 Image 클래스 가져오기.
# 현재 운영 체제 정보를 확인하고 플랫폼 이름을 설정합니다.
if platform.system() == "Darwin":
current_platform = "macos" # macOS인 경우 플랫폼 이름 설정
elif platform.system() == "Linux":
current_platform = "ubuntu" # Linux인 경우 플랫폼 이름 설정
elif platform.system() == "Windows":
current_platform = "windows" # Windows인 경우 플랫폼 이름 설정
else:
raise ValueError("Unsupported platform") # 지원되지 않는 플랫폼일 경우 예외 발생
# 외부 모듈에서 필요한 에이전트 클래스들을 가져옵니다.
from gui_agents.s2.agents.grounding import OSWorldACI
from gui_agents.s2.agents.agent_s import GraphSearchAgent
# 로거를 설정합니다.
logger = logging.getLogger() # 기본 로거 객체를 생성합니다.
logger.setLevel(logging.DEBUG) # 로깅 레벨을 DEBUG로 설정합니다.
# 현재 날짜와 시간을 문자열로 포맷합니다.
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")
# 로그 디렉터리를 생성합니다(존재하지 않으면 생성).
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 여러 종류의 로그 핸들러를 설정합니다.
file_handler = logging.FileHandler(
os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
) # 일반 로그 파일 핸들러
debug_handler = logging.FileHandler(
os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
) # 디버그 로그 파일 핸들러
stdout_handler = logging.StreamHandler(sys.stdout) # 콘솔 출력 핸들러
sdebug_handler = logging.FileHandler(
os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8"
) # 상세 디버그 로그 파일 핸들러
# 각 핸들러의 로깅 레벨을 설정합니다.
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)
# 로그 메시지 포맷터를 정의하고 각 핸들러에 적용합니다.
formatter = logging.Formatter(
fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)
# 특정 필터를 추가하여 환경에 따라 로그를 필터링합니다.
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))
# 로거에 핸들러를 추가합니다.
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)
platform_os = platform.system() # 현재 운영 체제 정보를 확인합니다.
# 플랫폼별로 권한 확인 대화 상자를 표시하는 함수입니다.
def show_permission_dialog(code: str, action_description: str):
"""플랫폼별 권한 확인 대화 상자를 표시하고 승인 여부를 반환합니다."""
if platform.system() == "Darwin":
result = os.system(
f'osascript -e \'display dialog "Do you want to execute this action?\n\n{code} which will try to {action_description}" with title "Action Permission" buttons {{"Cancel", "OK"}} default button "OK" cancel button "Cancel"\''
)
return result == 0
elif platform.system() == "Linux":
result = os.system(
f'zenity --question --title="Action Permission" --text="Do you want to execute this action?\n\n{code}" --width=400 --height=200'
)
return result == 0
return False
# 화면 크기를 특정 최대 크기에 맞게 조정하는 함수입니다.
def scale_screen_dimensions(width: int, height: int, max_dim_size: int):
scale_factor = min(max_dim_size / width, max_dim_size / height, 1) # 스케일 비율 계산
safe_width = int(width * scale_factor) # 조정된 너비 계산
safe_height = int(height * scale_factor) # 조정된 높이 계산
return safe_width, safe_height
# 에이전트를 실행하는 함수입니다.
def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
obs = {} # 관찰 데이터를 저장할 딕셔너리 초기화
traj = "Task:\n" + instruction # 작업 설명 초기화
subtask_traj = "" # 하위 작업 설명 초기화
for _ in range(15): # 최대 반복 횟수는 15번입니다.
screenshot = pyautogui.screenshot() # 화면 캡처 수행
screenshot = screenshot.resize((scaled_width, scaled_height), Image.LANCZOS) # 캡처 이미지를 크기 조정
buffered = io.BytesIO() # 메모리 버퍼 생성
screenshot.save(buffered, format="PNG") # 캡처 이미지를 PNG 형식으로 저장
screenshot_bytes = buffered.getvalue() # 캡처 이미지의 바이트 데이터를 가져옴
obs["screenshot"] = screenshot_bytes
info, code = agent.predict(instruction=instruction, observation=obs) # 에이전트에게 다음 행동 예측 요청
if "done" in code[0].lower() or "fail" in code[0].lower():
if platform.system() == "Darwin":
os.system(
f'osascript -e \'display dialog "Task Completed" with title "OpenACI Agent" buttons "OK" default button "OK"\''
)
elif platform.system() == "Linux":
os.system(
f'zenity --info --title="OpenACI Agent" --text="Task Completed" --width=200 --height=100'
)
agent.update_narrative_memory(traj) # 작업 내러티브 메모리 업데이트
break
if "next" in code[0].lower():
continue
if "wait" in code[0].lower():
time.sleep(5) # 대기 시간 설정 후 계속 진행
continue
else:
time.sleep(1.0)
print("EXECUTING CODE:", code[0]) # 실행 중인 코드 출력
exec(code[0]) # 코드 실행
time.sleep(1.0)
traj += (
"\n\nReflection:\n"
+ str(info["reflection"])
+ "\n\n———————————\n\nPlan:\n"
+ info["executor_plan"]
)
subtask_traj = agent.update_episodic_memory(info, subtask_traj)
# 프로그램의 진입점(main 함수)을 정의합니다.
def main():
parser = argparse.ArgumentParser(
description="Run GraphSearchAgent with specified model."
)
common_utils.py
import json # JSON 데이터를 처리하기 위한 모듈
import re # 정규표현식 처리를 위한 모듈
from typing import List # 타입 힌트로 리스트를 사용하기 위한 모듈
import time # 시간 관련 기능을 사용하기 위한 모듈
import tiktoken # OpenAI 모델의 토큰화를 처리하기 위한 모듈
from typing import Tuple, List, Union, Dict # 다양한 데이터 타입 힌트를 제공하기 위한 모듈
from pydantic import BaseModel, ValidationError # 데이터 유효성 검사를 위한 Pydantic의 기본 모델 및 예외 처리 클래스
import pickle # 객체를 파일로 저장하거나 로드하기 위한 모듈
# 노드(Node) 클래스 정의 (Pydantic을 사용하여 데이터 구조를 검증)
class Node(BaseModel):
name: str # 노드의 이름
info: str # 노드에 대한 정보
# DAG(Directed Acyclic Graph) 클래스 정의 (노드와 엣지로 구성)
class Dag(BaseModel):
nodes: List[Node] # DAG의 노드 리스트
edges: List[List[Node]] # DAG의 엣지 리스트 (노드 간 연결)
NUM_IMAGE_TOKEN = 1105 # OpenAI Vision에서 사용하는 화면 크기(1920x1080)에 따른 이미지 토큰 수 설정
# 안전하게 LLM(Large Language Model)을 호출하는 함수
def call_llm_safe(agent) -> Union[str, Dag]:
max_retries = 3 # 최대 재시도 횟수 설정
attempt = 0 # 현재 시도 횟수를 초기화
response = "" # 응답 초기화
while attempt < max_retries: # 최대 재시도 횟수까지 반복
try:
response = agent.get_response() # 에이전트에서 응답을 가져옴
break # 성공하면 반복 종료
except Exception as e: # 예외 발생 시 처리
attempt += 1
print(f"Attempt {attempt} failed: {e}") # 실패 메시지 출력
if attempt == max_retries:
print("Max retries reached. Handling failure.") # 최대 재시도 도달 시 메시지 출력
time.sleep(1.0) # 재시도 전 대기 시간 설정
return response
# 메시지에서 입력 및 출력 토큰 수를 계산하는 함수
def calculate_tokens(messages, num_image_token=NUM_IMAGE_TOKEN) -> Tuple[int, int]:
num_input_images = 0 # 입력 이미지 수 초기화
output_message = messages[-1] # 마지막 메시지를 출력 메시지로 설정
input_message = messages[:-1] # 나머지 메시지는 입력 메시지로 설정
input_string = """"""
for message in input_message:
input_string += message["content"][0]["text"] + "\n" # 입력 텍스트를 하나의 문자열로 결합
if len(message["content"]) > 1:
num_input_images += 1
input_text_tokens = get_input_token_length(input_string)
input_image_tokens = num_image_token * num_input_images
output_tokens = get_input_token_length(output_message["content"][0]["text"])
return (input_text_tokens + input_image_tokens), output_tokens
# 텍스트에서 JSON 형식의 DAG를 파싱하는 함수
def parse_dag(text):
pattern = r"<json>(.*?)</json>"
match = re.search(pattern, text, re.DOTALL)
if match:
json_str = match.group(1)
try:
json_data = json.loads(json_str)
return Dag(**json_data["dag"])
except json.JSONDecodeError:
print("Error: Invalid JSON")
return None
except KeyError:
print("Error: 'dag' key not found in JSON")
return None
except ValidationError as e:
print(f"Error: Invalid data structure - {e}")
return None
else:
print("Error: JSON not found")
return None
# 문자열에서 코드 블록을 추출하는 함수
def parse_single_code_from_string(input_string):
input_string = input_string.strip()
if input_string.strip() in ["WAIT", "DONE", "FAIL"]:
return input_string.strip()
pattern = r"``````"
matches = re.findall(pattern, input_string, re.DOTALL)
codes = []
for match in matches:
match = match.strip()
commands = [
"WAIT",
"DONE",
"FAIL",
]
if match in commands:
codes.append(match.strip())
elif match.split("\n")[-1] in commands:
if len(match.split("\n")) > 1:
codes.append("\n".join(match.split("\n")[:-1]))
codes.append(match.split("\n")[-1])
else:
codes.append(match)
if len(codes) <= 0:
return "fail"
return codes[0]
# 입력 문자열의 토큰 길이를 계산하는 함수
def get_input_token_length(input_string):
enc = tiktoken.encoding_for_model("gpt-4")
tokens = enc.encode(input_string)
return len(tokens)
# 코드 문자열을 정리하는 함수 (특정 패턴을 수정)
def sanitize_code(code):
if "\n" in code:
pattern = r'(".*?")'
matches = re.findall(pattern, code, flags=re.DOTALL)
if matches:
first_match = matches[0]
code = code.replace(first_match, f'"""{first_match[1:-1]}"""', 1)
return code
# 코드 문자열에서 첫 번째 에이전트 함수 호출을 추출하는 함수
def extract_first_agent_function(code_string):
pattern = r'agent\.[a-zA-Z_]+\((?:[^()\'"]|\'[^\']*\'|"[^"]*")*\)'
matches = re.findall(pattern, code_string)
return matches[0] if matches else None
# 지식 베이스를 로드하는 함수 (JSON 형식)
def load_knowledge_base(kb_path: str) -> Dict:
try:
with open(kb_path, "r") as f:
return json.load(f)
except Exception as e:
print(f"Error loading knowledge base: {e}")
return {}
# 임베딩 데이터를 로드하는 함수 (Pickle 형식)
def load_embeddings(embeddings_path: str) -> Dict:
try:
with open(embeddings_path, "rb") as f:
return pickle.load(f)
except Exception as e:
print(f"Error loading embeddings: {e}")
return {}
# 임베딩 데이터를 저장하는 함수 (Pickle 형식)
def save_embeddings(embeddings_path: str, embeddings: Dict):
try:
with open(embeddings_path, "wb") as f:
pickle.dump(embeddings, f)
except Exception as e:
print(f"Error saving embeddings: {e}")
ocr_server.py
from fastapi import FastAPI # FastAPI를 사용하여 웹 애플리케이션을 생성하기 위한 모듈
from pydantic import BaseModel # 데이터 유효성 검사를 위한 Pydantic의 기본 모델
from PIL import Image # 이미지 처리를 위한 Python Imaging Library (Pillow)
import io # 바이트 데이터 처리를 위한 모듈
import numpy as np # 배열 및 행렬 연산을 위한 NumPy 라이브러리
from paddleocr import PaddleOCR # OCR(문자 인식)을 수행하기 위한 PaddleOCR 라이브러리
import base64 # Base64 인코딩 및 디코딩을 위한 모듈
import gc # 가비지 컬렉션(메모리 관리)을 위한 모듈
# FastAPI 애플리케이션 생성
app = FastAPI()
# PaddleOCR 객체 초기화 (영어 언어 모델과 각도 분류 활성화)
ocr_module = PaddleOCR(use_angle_cls=True, lang="en")
# 이미지 데이터를 표현하기 위한 Pydantic 모델 정의
class ImageData(BaseModel):
img_bytes: bytes # Base64로 인코딩된 이미지 데이터를 바이트로 받음
# PaddleOCR 결과를 특정 형식으로 변환하는 함수
def text_cvt_orc_format_paddle(paddle_result):
texts = [] # 결과를 저장할 리스트 초기화
print("paddle_result: ", paddle_result) # PaddleOCR 결과 출력 (디버깅용)
for i, line in enumerate(paddle_result[0]): # OCR 결과에서 각 라인을 순회
points = np.array(line[0]) # 텍스트 위치 좌표를 NumPy 배열로 변환
print("points: ", points) # 좌표 출력 (디버깅용)
location = {
"left": int(min(points[:, 0])), # 왼쪽 경계값 계산
"top": int(min(points[:, 1])), # 위쪽 경계값 계산
"right": int(max(points[:, 0])), # 오른쪽 경계값 계산
"bottom": int(max(points[:, 1])), # 아래쪽 경계값 계산
}
print("location: ", location) # 계산된 위치 출력 (디버깅용)
content = line[1][0] # OCR로 인식된 텍스트 내용 추출
texts.append((i, content, location)) # 결과 리스트에 텍스트와 위치 추가
return texts
# OCR 처리를 수행하는 함수
def ocr_results(screenshot):
screenshot_img = Image.open(io.BytesIO(screenshot)) # 바이트 데이터를 이미지로 변환
result = ocr_module.ocr(np.array(screenshot_img), cls=True) # OCR 수행 (이미지를 NumPy 배열로 변환 후 처리)
return text_cvt_orc_format_paddle(result) # 결과를 변환하여 반환
# POST 요청을 처리하는 엔드포인트 정의 (/ocr/)
@app.post("/ocr/")
async def read_image(image_data: ImageData):
image_bytes = base64.b64decode(image_data.img_bytes) # Base64로 인코딩된 이미지를 디코딩하여 바이트 데이터로 변환
results = ocr_results(image_bytes) # OCR 처리 수행
# 사용하지 않는 변수 삭제 및 가비지 컬렉터 실행 (메모리 관리)
del image_bytes
gc.collect()
return {"results": results} # OCR 결과 반환
# FastAPI 애플리케이션 실행 (uvicorn 서버 사용)
if __name__ == "__main__":
import uvicorn # ASGI 서버를 실행하기 위한 Uvicorn 모듈
uvicorn.run(app, host="127.0.0.1", port=8000) # 로컬 호스트(127.0.0.1)에서 포트 번호 8000으로 서버 실행
query_perplexica.py
import requests # HTTP 요청을 보내기 위한 requests 라이브러리
import toml # TOML 형식의 설정 파일을 읽고 쓰기 위한 라이브러리
import os # 운영 체제 관련 기능(파일 경로 등)을 사용하기 위한 라이브러리
# 현재 파일의 디렉토리 경로를 가져옵니다.
current_path = os.path.dirname(os.path.abspath(__file__))
# 현재 디렉토리의 부모의 부모 디렉토리 경로를 가져옵니다.
parent_path = os.path.dirname(os.path.dirname(os.path.dirname(current_path)))
# Perplexica API에 쿼리를 보내는 함수
def query_to_perplexica(query):
# Perplexica 설정 파일에서 포트를 로드합니다.
with open(os.path.join(parent_path, "Perplexica", "config.toml"), "r") as f:
data = toml.load(f) # TOML 파일을 읽어옵니다.
port = data["GENERAL"]["PORT"] # 설정 파일에서 포트 정보를 가져옵니다.
assert port, "You should set valid port in the config.toml" # 포트가 유효하지 않으면 예외를 발생시킵니다.
# API 요청 URL을 설정합니다.
url = f"http://localhost:{port}/api/search"
# 요청 메시지를 생성합니다.
message = {"focusMode": "webSearch", "query": query, "history": [["human", query]]}
# POST 요청을 보냅니다.
response = requests.post(url, json=message)
# 응답 상태 코드가 200이면 결과를 반환합니다.
if response.status_code == 200:
return response.json()["message"]
elif response.status_code == 400: # 요청이 잘못된 경우 예외 발생
raise ValueError(
"The request is malformed or missing required fields, such as FocusModel or query"
)
else: # 서버 내부 오류 발생 시 예외 발생
raise ValueError("Internal Server Error")
# 테스트 코드
if __name__ == "__main__":
query = "What is Agent S?" # 테스트할 쿼리를 정의합니다.
response = query_to_perplexica(query) # Perplexica API를 호출하여 응답을 받습니다.
print(response) # 응답 내용을 출력합니다.