Python - Langchain처럼 파이프연산자 사용하기

JunMyung Lee·2025년 4월 1일
0

파이썬

목록 보기
5/5

Langchain에서는 각각의 Chain을 | 기호로 연결하여 체인 파이프라인을 구성하고 있습니다.
처음에는 이것이 Python에서 기본적으로 지원하는 기능인지 궁금하여 찾아보게 되었습니다.

Python 문법인가요?

| 연산자는 Python의 문법이며, 다만 특정 버전(Python 3.10 이상)에서 도입된 기능입니다.
LangChain에서 | 기호를 사용하는 이유는 LCEL(LangChain Expression Language)이라는 문법을 통해 체인을 함수처럼 이어주기 위함인데요,
이는 Python 3.10 이상에서 도입된 __or__ 메서드 오버라이딩 기능을 활용한 것이라고 합니다.

예시:

from langchain_core.runnables import RunnableLambda

chain1 = RunnableLambda(lambda x: x + 1)
chain2 = RunnableLambda(lambda x: x * 2)

combined = chain1 | chain2
print(combined.invoke(3))  # (3 + 1) * 2 = 8

즉, | 연산자는 파이프라인처럼 체인을 직렬로 연결하기 위한 문법이며, LangChain에서는 이를 오버라이딩하여 사용하고 있습니다.

해당 방법을 활용하여 여러 클래스의 순차적인 실행을 해보겠습니다

LangChain이 아니더라도, 추후 이러한 방식으로 코드를 작성하여 순차적인 체인 파이프라인을 구성하면 좋을 것 같아 예제로 표현해 보았습니다.

Runnable이라는 추상 클래스를 만들고, | 연산자와 .invoke() 메서드를 지원하도록 확장한 예제입니다.

from abc import ABC, abstractmethod

# 추상 베이스 클래스 정의
class Runnable(ABC):
    @abstractmethod
    def invoke(self, input):
        pass

    def __or__(self, other):
        if not isinstance(other, Runnable):
            raise TypeError("오른쪽 인자는 Runnable이어야 합니다.")
        return ChainedRunnable(self, other)

# 체인 연결용 클래스
class ChainedRunnable(Runnable):
    def __init__(self, first: Runnable, second: Runnable):
        self.first = first
        self.second = second

    def invoke(self, input):
        intermediate = self.first.invoke(input)
        return self.second.invoke(intermediate)

# 실제 동작 클래스 예시
class Double(Runnable):
    def invoke(self, input):
        return input * 2

class AddThree(Runnable):
    def invoke(self, input):
        return input + 3

class Square(Runnable):
    def invoke(self, input):
        return input ** 2
pipeline = Double() | AddThree() | Square()
result = pipeline.invoke(4)  # ((4 * 2) + 3) ** 2 = 121
print(result)
profile
11년차 검색개발자 입니다. 여러 지식과 함께 실제 서비스를 운영 하면서 발생한 이슈에 대해서 정리하고 공유하고자 합니다.

0개의 댓글