출장 자동화 시스템

Agent-S cli_app.py, common_utils.py, ocr_server.py, query_perplexica.py, procedural_memory.py

myeongjaechoi 2025. 3. 30. 14:58
cli_app.py
import argparse  # 명령줄 인수를 처리하기 위한 argparse 모듈을 가져옵니다.
import datetime  # 날짜와 시간을 처리하기 위한 datetime 모듈을 가져옵니다.
import io  # 파일 입출력을 위한 io 모듈을 가져옵니다.
import logging  # 로그 기록을 관리하기 위한 logging 모듈을 가져옵니다.
import os  # 운영 체제 관련 기능을 사용하기 위한 os 모듈을 가져옵니다.
import platform  # 운영 체제 정보를 확인하기 위한 platform 모듈을 가져옵니다.
import pyautogui  # 화면 캡처 및 GUI 자동화를 위한 pyautogui 모듈을 가져옵니다.
import sys  # 시스템 관련 기능을 사용하기 위한 sys 모듈을 가져옵니다.
import time  # 시간 관련 기능을 사용하기 위한 time 모듈을 가져옵니다.

from PIL import Image  # 이미지 처리 및 변환을 위한 Python Imaging Library(PIL)의 Image 클래스 가져오기.

# 현재 운영 체제 정보를 확인하고 플랫폼 이름을 설정합니다.
if platform.system() == "Darwin":
    current_platform = "macos"  # macOS인 경우 플랫폼 이름 설정
elif platform.system() == "Linux":
    current_platform = "ubuntu"  # Linux인 경우 플랫폼 이름 설정
elif platform.system() == "Windows":
    current_platform = "windows"  # Windows인 경우 플랫폼 이름 설정
else:
    raise ValueError("Unsupported platform")  # 지원되지 않는 플랫폼일 경우 예외 발생

# 외부 모듈에서 필요한 에이전트 클래스들을 가져옵니다.
from gui_agents.s2.agents.grounding import OSWorldACI
from gui_agents.s2.agents.agent_s import GraphSearchAgent

# 로거를 설정합니다.
logger = logging.getLogger()  # 기본 로거 객체를 생성합니다.
logger.setLevel(logging.DEBUG)  # 로깅 레벨을 DEBUG로 설정합니다.

# 현재 날짜와 시간을 문자열로 포맷합니다.
datetime_str: str = datetime.datetime.now().strftime("%Y%m%d@%H%M%S")

# 로그 디렉터리를 생성합니다(존재하지 않으면 생성).
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# 여러 종류의 로그 핸들러를 설정합니다.
file_handler = logging.FileHandler(
    os.path.join("logs", "normal-{:}.log".format(datetime_str)), encoding="utf-8"
)  # 일반 로그 파일 핸들러
debug_handler = logging.FileHandler(
    os.path.join("logs", "debug-{:}.log".format(datetime_str)), encoding="utf-8"
)  # 디버그 로그 파일 핸들러
stdout_handler = logging.StreamHandler(sys.stdout)  # 콘솔 출력 핸들러
sdebug_handler = logging.FileHandler(
    os.path.join("logs", "sdebug-{:}.log".format(datetime_str)), encoding="utf-8"
)  # 상세 디버그 로그 파일 핸들러

# 각 핸들러의 로깅 레벨을 설정합니다.
file_handler.setLevel(logging.INFO)
debug_handler.setLevel(logging.DEBUG)
stdout_handler.setLevel(logging.INFO)
sdebug_handler.setLevel(logging.DEBUG)

# 로그 메시지 포맷터를 정의하고 각 핸들러에 적용합니다.
formatter = logging.Formatter(
    fmt="\x1b[1;33m[%(asctime)s \x1b[31m%(levelname)s \x1b[32m%(module)s/%(lineno)d-%(processName)s\x1b[1;33m] \x1b[0m%(message)s"
)
file_handler.setFormatter(formatter)
debug_handler.setFormatter(formatter)
stdout_handler.setFormatter(formatter)
sdebug_handler.setFormatter(formatter)

# 특정 필터를 추가하여 환경에 따라 로그를 필터링합니다.
stdout_handler.addFilter(logging.Filter("desktopenv"))
sdebug_handler.addFilter(logging.Filter("desktopenv"))

# 로거에 핸들러를 추가합니다.
logger.addHandler(file_handler)
logger.addHandler(debug_handler)
logger.addHandler(stdout_handler)
logger.addHandler(sdebug_handler)

platform_os = platform.system()  # 현재 운영 체제 정보를 확인합니다.

# 플랫폼별로 권한 확인 대화 상자를 표시하는 함수입니다.
def show_permission_dialog(code: str, action_description: str):
    """플랫폼별 권한 확인 대화 상자를 표시하고 승인 여부를 반환합니다."""
    if platform.system() == "Darwin":
        result = os.system(
            f'osascript -e \'display dialog "Do you want to execute this action?\n\n{code} which will try to {action_description}" with title "Action Permission" buttons {{"Cancel", "OK"}} default button "OK" cancel button "Cancel"\''
        )
        return result == 0
    elif platform.system() == "Linux":
        result = os.system(
            f'zenity --question --title="Action Permission" --text="Do you want to execute this action?\n\n{code}" --width=400 --height=200'
        )
        return result == 0
    return False

# 화면 크기를 특정 최대 크기에 맞게 조정하는 함수입니다.
def scale_screen_dimensions(width: int, height: int, max_dim_size: int):
    scale_factor = min(max_dim_size / width, max_dim_size / height, 1)  # 스케일 비율 계산
    safe_width = int(width * scale_factor)  # 조정된 너비 계산
    safe_height = int(height * scale_factor)  # 조정된 높이 계산
    return safe_width, safe_height

# 에이전트를 실행하는 함수입니다.
def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
    obs = {}  # 관찰 데이터를 저장할 딕셔너리 초기화
    traj = "Task:\n" + instruction  # 작업 설명 초기화
    subtask_traj = ""  # 하위 작업 설명 초기화
    for _ in range(15):  # 최대 반복 횟수는 15번입니다.
        screenshot = pyautogui.screenshot()  # 화면 캡처 수행
        screenshot = screenshot.resize((scaled_width, scaled_height), Image.LANCZOS)  # 캡처 이미지를 크기 조정

        buffered = io.BytesIO()  # 메모리 버퍼 생성
        screenshot.save(buffered, format="PNG")  # 캡처 이미지를 PNG 형식으로 저장

        screenshot_bytes = buffered.getvalue()  # 캡처 이미지의 바이트 데이터를 가져옴
        obs["screenshot"] = screenshot_bytes

        info, code = agent.predict(instruction=instruction, observation=obs)  # 에이전트에게 다음 행동 예측 요청

        if "done" in code[0].lower() or "fail" in code[0].lower():
            if platform.system() == "Darwin":
                os.system(
                    f'osascript -e \'display dialog "Task Completed" with title "OpenACI Agent" buttons "OK" default button "OK"\''
                )
            elif platform.system() == "Linux":
                os.system(
                    f'zenity --info --title="OpenACI Agent" --text="Task Completed" --width=200 --height=100'
                )

            agent.update_narrative_memory(traj)  # 작업 내러티브 메모리 업데이트
            break

        if "next" in code[0].lower():
            continue

        if "wait" in code[0].lower():
            time.sleep(5)  # 대기 시간 설정 후 계속 진행
            continue

        else:
            time.sleep(1.0)
            print("EXECUTING CODE:", code[0])  # 실행 중인 코드 출력

            exec(code[0])  # 코드 실행
            time.sleep(1.0)

            traj += (
                "\n\nReflection:\n"
                + str(info["reflection"])
                + "\n\n———————————\n\nPlan:\n"
                + info["executor_plan"]
            )  
            subtask_traj = agent.update_episodic_memory(info, subtask_traj)

# 프로그램의 진입점(main 함수)을 정의합니다.
def main():
    parser = argparse.ArgumentParser(
        description="Run GraphSearchAgent with specified model."
    )
common_utils.py
import json  # JSON 데이터를 처리하기 위한 모듈
import re  # 정규표현식 처리를 위한 모듈
from typing import List  # 타입 힌트로 리스트를 사용하기 위한 모듈
import time  # 시간 관련 기능을 사용하기 위한 모듈
import tiktoken  # OpenAI 모델의 토큰화를 처리하기 위한 모듈

from typing import Tuple, List, Union, Dict  # 다양한 데이터 타입 힌트를 제공하기 위한 모듈

from pydantic import BaseModel, ValidationError  # 데이터 유효성 검사를 위한 Pydantic의 기본 모델 및 예외 처리 클래스

import pickle  # 객체를 파일로 저장하거나 로드하기 위한 모듈

# 노드(Node) 클래스 정의 (Pydantic을 사용하여 데이터 구조를 검증)
class Node(BaseModel):
    name: str  # 노드의 이름
    info: str  # 노드에 대한 정보

# DAG(Directed Acyclic Graph) 클래스 정의 (노드와 엣지로 구성)
class Dag(BaseModel):
    nodes: List[Node]  # DAG의 노드 리스트
    edges: List[List[Node]]  # DAG의 엣지 리스트 (노드 간 연결)

NUM_IMAGE_TOKEN = 1105  # OpenAI Vision에서 사용하는 화면 크기(1920x1080)에 따른 이미지 토큰 수 설정

# 안전하게 LLM(Large Language Model)을 호출하는 함수
def call_llm_safe(agent) -> Union[str, Dag]:
    max_retries = 3  # 최대 재시도 횟수 설정
    attempt = 0  # 현재 시도 횟수를 초기화
    response = ""  # 응답 초기화
    while attempt < max_retries:  # 최대 재시도 횟수까지 반복
        try:
            response = agent.get_response()  # 에이전트에서 응답을 가져옴
            break  # 성공하면 반복 종료
        except Exception as e:  # 예외 발생 시 처리
            attempt += 1
            print(f"Attempt {attempt} failed: {e}")  # 실패 메시지 출력
            if attempt == max_retries:  
                print("Max retries reached. Handling failure.")  # 최대 재시도 도달 시 메시지 출력
        time.sleep(1.0)  # 재시도 전 대기 시간 설정
    return response

# 메시지에서 입력 및 출력 토큰 수를 계산하는 함수
def calculate_tokens(messages, num_image_token=NUM_IMAGE_TOKEN) -> Tuple[int, int]:
    num_input_images = 0  # 입력 이미지 수 초기화
    output_message = messages[-1]  # 마지막 메시지를 출력 메시지로 설정

    input_message = messages[:-1]  # 나머지 메시지는 입력 메시지로 설정

    input_string = """"""  
    for message in input_message:  
        input_string += message["content"][0]["text"] + "\n"  # 입력 텍스트를 하나의 문자열로 결합
        if len(message["content"]) > 1:
            num_input_images += 1  

    input_text_tokens = get_input_token_length(input_string)  

    input_image_tokens = num_image_token * num_input_images  

    output_tokens = get_input_token_length(output_message["content"][0]["text"])  

    return (input_text_tokens + input_image_tokens), output_tokens  

# 텍스트에서 JSON 형식의 DAG를 파싱하는 함수
def parse_dag(text):
    pattern = r"<json>(.*?)</json>"  
    match = re.search(pattern, text, re.DOTALL)  
    if match:
        json_str = match.group(1)  
        try:
            json_data = json.loads(json_str)  
            return Dag(**json_data["dag"])  
        except json.JSONDecodeError:
            print("Error: Invalid JSON")  
            return None  
        except KeyError:
            print("Error: 'dag' key not found in JSON")  
            return None  
        except ValidationError as e:
            print(f"Error: Invalid data structure - {e}")  
            return None  
    else:
        print("Error: JSON not found")  
        return None  

# 문자열에서 코드 블록을 추출하는 함수
def parse_single_code_from_string(input_string):
    input_string = input_string.strip()  
    if input_string.strip() in ["WAIT", "DONE", "FAIL"]:  
        return input_string.strip()  

    pattern = r"``````"  
    matches = re.findall(pattern, input_string, re.DOTALL)  

    codes = []  

    for match in matches:
        match = match.strip()  
        commands = [
            "WAIT",
            "DONE",
            "FAIL",
        ]  

        if match in commands:
            codes.append(match.strip())  
        elif match.split("\n")[-1] in commands:
            if len(match.split("\n")) > 1:
                codes.append("\n".join(match.split("\n")[:-1])) 
            codes.append(match.split("\n")[-1]) 
        else:
            codes.append(match) 

    if len(codes) <= 0:
        return "fail" 
    return codes[0] 

# 입력 문자열의 토큰 길이를 계산하는 함수
def get_input_token_length(input_string):
    enc = tiktoken.encoding_for_model("gpt-4") 
    tokens = enc.encode(input_string) 
    return len(tokens) 

# 코드 문자열을 정리하는 함수 (특정 패턴을 수정)
def sanitize_code(code):
    if "\n" in code: 
        pattern = r'(".*?")' 
        matches = re.findall(pattern, code, flags=re.DOTALL) 
        if matches:
            first_match = matches[0] 
            code = code.replace(first_match, f'"""{first_match[1:-1]}"""', 1) 
    return code 

# 코드 문자열에서 첫 번째 에이전트 함수 호출을 추출하는 함수
def extract_first_agent_function(code_string):
    pattern = r'agent\.[a-zA-Z_]+\((?:[^()\'"]|\'[^\']*\'|"[^"]*")*\)' 
    matches = re.findall(pattern, code_string) 
    return matches[0] if matches else None 

# 지식 베이스를 로드하는 함수 (JSON 형식)
def load_knowledge_base(kb_path: str) -> Dict:
    try:
        with open(kb_path, "r") as f: 
            return json.load(f) 
    except Exception as e:
        print(f"Error loading knowledge base: {e}") 
        return {} 

# 임베딩 데이터를 로드하는 함수 (Pickle 형식)
def load_embeddings(embeddings_path: str) -> Dict:
    try:
        with open(embeddings_path, "rb") as f: 
            return pickle.load(f) 
    except Exception as e:
        print(f"Error loading embeddings: {e}") 
        return {} 

# 임베딩 데이터를 저장하는 함수 (Pickle 형식)
def save_embeddings(embeddings_path: str, embeddings: Dict):
    try:
        with open(embeddings_path, "wb") as f: 
            pickle.dump(embeddings, f) 
    except Exception as e:
        print(f"Error saving embeddings: {e}")
ocr_server.py
from fastapi import FastAPI  # FastAPI를 사용하여 웹 애플리케이션을 생성하기 위한 모듈
from pydantic import BaseModel  # 데이터 유효성 검사를 위한 Pydantic의 기본 모델
from PIL import Image  # 이미지 처리를 위한 Python Imaging Library (Pillow)
import io  # 바이트 데이터 처리를 위한 모듈
import numpy as np  # 배열 및 행렬 연산을 위한 NumPy 라이브러리
from paddleocr import PaddleOCR  # OCR(문자 인식)을 수행하기 위한 PaddleOCR 라이브러리
import base64  # Base64 인코딩 및 디코딩을 위한 모듈
import gc  # 가비지 컬렉션(메모리 관리)을 위한 모듈

# FastAPI 애플리케이션 생성
app = FastAPI()

# PaddleOCR 객체 초기화 (영어 언어 모델과 각도 분류 활성화)
ocr_module = PaddleOCR(use_angle_cls=True, lang="en")


# 이미지 데이터를 표현하기 위한 Pydantic 모델 정의
class ImageData(BaseModel):
    img_bytes: bytes  # Base64로 인코딩된 이미지 데이터를 바이트로 받음


# PaddleOCR 결과를 특정 형식으로 변환하는 함수
def text_cvt_orc_format_paddle(paddle_result):
    texts = []  # 결과를 저장할 리스트 초기화
    print("paddle_result: ", paddle_result)  # PaddleOCR 결과 출력 (디버깅용)
    for i, line in enumerate(paddle_result[0]):  # OCR 결과에서 각 라인을 순회
        points = np.array(line[0])  # 텍스트 위치 좌표를 NumPy 배열로 변환
        print("points: ", points)  # 좌표 출력 (디버깅용)
        location = {  
            "left": int(min(points[:, 0])),  # 왼쪽 경계값 계산
            "top": int(min(points[:, 1])),  # 위쪽 경계값 계산
            "right": int(max(points[:, 0])),  # 오른쪽 경계값 계산
            "bottom": int(max(points[:, 1])),  # 아래쪽 경계값 계산
        }
        print("location: ", location)  # 계산된 위치 출력 (디버깅용)
        content = line[1][0]  # OCR로 인식된 텍스트 내용 추출
        texts.append((i, content, location))  # 결과 리스트에 텍스트와 위치 추가
    return texts


# OCR 처리를 수행하는 함수
def ocr_results(screenshot):
    screenshot_img = Image.open(io.BytesIO(screenshot))  # 바이트 데이터를 이미지로 변환
    result = ocr_module.ocr(np.array(screenshot_img), cls=True)  # OCR 수행 (이미지를 NumPy 배열로 변환 후 처리)
    return text_cvt_orc_format_paddle(result)  # 결과를 변환하여 반환


# POST 요청을 처리하는 엔드포인트 정의 (/ocr/)
@app.post("/ocr/")
async def read_image(image_data: ImageData):
    image_bytes = base64.b64decode(image_data.img_bytes)  # Base64로 인코딩된 이미지를 디코딩하여 바이트 데이터로 변환
    results = ocr_results(image_bytes)  # OCR 처리 수행

    # 사용하지 않는 변수 삭제 및 가비지 컬렉터 실행 (메모리 관리)
    del image_bytes  
    gc.collect()  

    return {"results": results}  # OCR 결과 반환


# FastAPI 애플리케이션 실행 (uvicorn 서버 사용)
if __name__ == "__main__":
    import uvicorn  # ASGI 서버를 실행하기 위한 Uvicorn 모듈

    uvicorn.run(app, host="127.0.0.1", port=8000)  # 로컬 호스트(127.0.0.1)에서 포트 번호 8000으로 서버 실행
query_perplexica.py
import requests  # HTTP 요청을 보내기 위한 requests 라이브러리
import toml  # TOML 형식의 설정 파일을 읽고 쓰기 위한 라이브러리
import os  # 운영 체제 관련 기능(파일 경로 등)을 사용하기 위한 라이브러리

# 현재 파일의 디렉토리 경로를 가져옵니다.
current_path = os.path.dirname(os.path.abspath(__file__))
# 현재 디렉토리의 부모의 부모 디렉토리 경로를 가져옵니다.
parent_path = os.path.dirname(os.path.dirname(os.path.dirname(current_path)))


# Perplexica API에 쿼리를 보내는 함수
def query_to_perplexica(query):
    # Perplexica 설정 파일에서 포트를 로드합니다.
    with open(os.path.join(parent_path, "Perplexica", "config.toml"), "r") as f:
        data = toml.load(f)  # TOML 파일을 읽어옵니다.
    port = data["GENERAL"]["PORT"]  # 설정 파일에서 포트 정보를 가져옵니다.
    assert port, "You should set valid port in the config.toml"  # 포트가 유효하지 않으면 예외를 발생시킵니다.
    
    # API 요청 URL을 설정합니다.
    url = f"http://localhost:{port}/api/search"
    
    # 요청 메시지를 생성합니다.
    message = {"focusMode": "webSearch", "query": query, "history": [["human", query]]}

    # POST 요청을 보냅니다.
    response = requests.post(url, json=message)

    # 응답 상태 코드가 200이면 결과를 반환합니다.
    if response.status_code == 200:
        return response.json()["message"]
    elif response.status_code == 400:  # 요청이 잘못된 경우 예외 발생
        raise ValueError(
            "The request is malformed or missing required fields, such as FocusModel or query"
        )
    else:  # 서버 내부 오류 발생 시 예외 발생
        raise ValueError("Internal Server Error")


# 테스트 코드
if __name__ == "__main__":
    query = "What is Agent S?"  # 테스트할 쿼리를 정의합니다.
    response = query_to_perplexica(query)  # Perplexica API를 호출하여 응답을 받습니다.
    print(response)  # 응답 내용을 출력합니다.