Apollo Server Subscription

pluviabc1·2021년 9월 20일
1

Subscription은 서버 측 이벤트가 발생할 때마다 결과를 업데이트할 수 있는 GraphQL 읽기 작업입니다. 일반적으로 업데이트된 결과는 서버에서 subscribing 클라이언트로 푸쉬됩니다. 예를 들어, 채팅 어플리케이션의 서버가 특정 채팅방에서 모든 클라이언트에 새롭게 받은 메세지를 푸쉬하도록 subscription을 사용할 수 있습니다.

subscription 업데이트는 일반적으로 서버에 의해 푸시 되므로 (클라이언트에서 poll 하는 대신) HTTP 대신 WebSocket 프로토콜을 사용합니다.

참고
query 와 mutation 에 비해 subscription은 구현하기가 훨씬 더 복잡합니다. 따라서 시작하기 전 사용 사례에서 subscription이 필요한지 확인하시길 바랍니다.

스키마 정의

스키마의 Subscrption 유형은 클라이언트가 등록할 수 있는 최상위 필드를 정의합니다.

type Subscription {
  postCreated: Post
}

예제의 PostCreated 필드는 백엔드에 새 게시물이 작성될 때마다 해당 값을 업데이트하여 subscribing 클라이언트에 Post를 푸시합니다.

subscription PostFeed {
  postCreated {
    author
    comment
  }
}

각 subscription 작업은 Subscription 유형의 필드 하나만 subscribe할 수 있습니다.

subscription 사용하기

Express 앱과 별도의 subscription 서버 둘 다 실행하기 위해서 두 개를 효과적으로 감싸는 http.Server를 생성하며 이는 새로운 listener가 됩니다. 또한 기존 코드에서 몇 가지 변경하여 SubscriptionSever를 생성합니다.

  1. subscriptions-transport-ws 과 @graphql-tools/schema 설치
npm install subscriptions-transport-ws @graphql-tools/schema
  1. ApolloServer 인스턴스를 초기화하는 파일을 import 문을 추가합니다.
import { createServer } from 'http';
import { execute, subscribe } from 'graphql';
import { SubscriptionServer } from 'subscriptions-transport-ws';
import { makeExecutableSchema } from '@graphql-tools/schema';
  1. 다음으로 HTTP와 WebSocket 서버를 모두 설정하기 위해서 http.Server를 생성해야 합니다. Express 앱을 http 모듈에 가져온 createServer 함수를 전달하여 이 작업을 수행합니다.
// This `app` is the returned value from `express()`.
const httpServer = createServer(app);
  1. GraphQLSchema의 인스턴스를 생성합니다.

    ApolloServer 생성자에 schema 옵션을 이미 전달했다면 해당 과정은 넘어가도 됩니다.

SubscriptionServer는(다음에 인스턴스화 할 부분)는 typeDefs 및 resolvers 옵션을 사용하지 않습니다. 대신에 실행 가능한 GraphQLSchema 가 필요합니다.SubscriptionServer와 ApolloServer 모두 해당 스키마 객체로 전달할 수 있습니다. 이렇게 하면 동일한 스키마가 모두 사용되고 있는지 확인할 수 있습니다.

const schema = makeExecutableSchema({ typeDefs, resolvers });
// ...
const server = new ApolloServer({
  schema,
});
  1. SubscriptionServer를 생성합니다.
const subscriptionServer = SubscriptionServer.create({
   // This is the `schema` we just created.
   schema,
   // These are imported from `graphql`.
   execute,
   subscribe,
}, {
   // This is the `httpServer` we created in a previous step.
   server: httpServer,
   // This `server` is the instance returned from `new ApolloServer`.
   path: server.graphqlPath,
});
  1. SubscriptionServer 를 닫기 위해서 ApolloServer 생성자에 플러그인을 추가합니다.
const server = new ApolloServer({
  schema,
  plugins: [{
    async serverWillStart() {
      return {
        async drainServer() {
          subscriptionServer.close();
        }
      };
    }
  }],
});
  1. 마지막으로, 기존 listen 호출을 조정할 필요가 있습니다.

대부분의 Express 어플리케이션은 app.listen(...)을 호출합니다. 같은 인자와 함께 httpServer.listen(...)으로 변경합니다. 이렇게 하면 서버는 HTTP와 WebSocket 전송에서 동시 수신을 시작합니다.

subscriptions 마이그레이션의 완료된 예는 다음과 같습니다.

import { createServer } from "http";
import { execute, subscribe } from "graphql";
import { SubscriptionServer } from "subscriptions-transport-ws";
import { makeExecutableSchema } from "@graphql-tools/schema";
import express from "express";
import { ApolloServer } from "apollo-server-express";
import resolvers from "./resolvers";
import typeDefs from "./typeDefs";

(async function () {
  const app = express();

  const httpServer = createServer(app);

  const schema = makeExecutableSchema({
    typeDefs,
    resolvers,
  });

  const subscriptionServer = SubscriptionServer.create(
    { schema, execute, subscribe },
    { server: httpServer, path: server.graphqlPath }
  );

  const server = new ApolloServer({
    schema,
    plugins: [{
      async serverWillStart() {
        return {
          async drainServer() {
            subscriptionServer.close();
          }
        };
      }
    }],
  });
  await server.start();
  server.applyMiddleware({ app });

  const PORT = 4000;
  httpServer.listen(PORT, () =>
    console.log(`Server is now running on http://localhost:${PORT}/graphql`)
  );
})();

Resolving a subscription

Subscription 필드에 대한 resolver는 다른 타입의 필드에 대한 resolver와 다릅니다. 특히, Subscription 필드 resolver는 Subscribe 함수를 정의하는 객체입니다.

const resolvers = {
  Subscription: {
    postCreated: {
      // More on pubsub below
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
  // ...other resolvers...
};

subscribe 함수는 비동기 결과를 반복하는 표준 인터페이스인 AsyncIterator 유형의 개체를 반환해야 합니다. 예에서 AsyncIterator는 pubsub.asyncIterator에 의해 생성됩니다(아래의 자세한 내용)

PubSub 클래스

PubSub 클래스는 단일 서버 인스턴스만 지원하는 in-memory 이벤트 시스템이기 때문에 프로덕션 환경에서는 권장되지 않습니다. 개발 중인 subscriptions을 받은 후에는 추상 PubSubEngine 클래스의 다른 하위 클래스로 전환하는 것이 좋습니다. 권장 하위 클래스는 Production PubSub 라이브러리에 나열됩니다.

Apollo Server는 publish-subscribe (pub/sub) 모델을 사용하여 활성 subscriptions을 업데이트하는 이벤트를 추적합니다. graphql-subscriptions 라이브러리는 PubSub 클래스를 기본 in-memory 이벤트 버스로 제공하여 시작할 수 있도록 도와줍니다.

graphql-subscriptions 패키지를 사용하려면 먼저 다음과 같이 설치해야 합니다.

npm install graphql-subscriptions

PubSub 인스턴스를 사용하면 서버 코드가 이벤트를 특정 레이블에 배포하고 특정 레이블과 연결된 이벤트를 listen할 수 있습니다. 다음과 같은 PubSub 인스턴스를 만들 수 있습니다.

import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

이벤트 배포

PubSub 인스턴스의 publish 메서드와 함께 이벤트를 배포할 수 있습니다.

pubsub.publish('POST_CREATED', {
  postCreated: {
    author: 'Ali Baba',
    comment: 'Open sesame'
  }
});
  • 첫 번째 매개 변수는 문자열로 배포할 이벤트 레이블의 이름입니다.
    - 레이블에 배포하기 전에 레이블 이름을 등록할 필요는 없습니다.
  • 두 번째 매개 변수는 이벤트와 관련된 payload입니다.
    - payload에는 resolvers가 관련 Subscription 필드와 하위 필드를 채우는 데 필요한 데이터가 포함되어야 합니다.

GraphQL subscriptions 관련 작업 시 subscriptions의 반환 값을 업데이트해야 할 때마다 이벤트를 게시합니다. 이러한 업데이트의 일반적인 원인 중 하나는 mutation 이지만 백엔드 로직은 배포해야 하는 변경 사항을 초래할 수 있습니다.

예를 들어, GraphQL API가 createPost mutation을 지원한다고 가정해 봅시다.

type Mutation {
  createPost(author: String, comment: String): Post
}

createPost에 대한 기본 resolver는 아래와 같습니다.

const resolvers = {
  Mutation: {

    createPost(parent, args, context) {
      // Datastore logic lives in postController
      return postController.createPost(args);
    },
  },
  // ...other resolvers...
};

새 게시물의 세부 정보를 데이터 저장소에 유지하기 전에 다음과 같은 세부 정보가 포함된 이벤트를 배포할 수 있습니다.

const resolvers = {
  Mutation: {
    createPost(parent, args, context) {

      pubsub.publish('POST_CREATED', { postCreated: args });
      return postController.createPost(args);
    },
  },
  // ...other resolvers...
};

그런 다음 Subscription 필드의 resolver에서 이 이벤트를 listen할 수 있습니다.

이벤트 listen

AsyncIterator 개체는 특정 레이블(또는 레이블 집합)과 연결된 이벤트를 수신하고 처리를 위해 큐에 추가합니다. PubSub의 asyncIterator 메서드를 호출하여 AsyncIterator 생성합니다.

pubsub.asyncIterator(['POST_CREATED']);

이 메서드는 AsyncIterator가 수신 대기해야 하는 모든 이벤트 레이블의 이름을 포함하는 배열을 전달합니다.

모든 Subscription 필드 resolver의 subscribe 함수는 AsyncIterator 개체를 반환해야 합니다. 그러면 subscription resolving에 대한 맨 위에 있는 코드 샘플로 돌아갑니다.

const resolvers = {
  Subscription: {
    postCreated: {
      subscribe: () => pubsub.asyncIterator(['POST_CREATED']),
    },
  },
  // ...other resolvers...
};

이 subscribe 함수 집합을 사용하여 Apollo Server는 POST_CREATE 이벤트의 payload를 사용하여 postCreated 필드에 대한 업데이트된 값을 푸시합니다.

이벤트 필터링

때때로 클라이언트는 해당 데이터가 특정 기준을 충족하는 경우에만 업데이트된 subscription 데이터를 수신해야 합니다. 이를 지원하기 위해 구독 필드의 해결 프로그램에서 withFilter 도우미 기능을 호출할 수 있습니다.

예제

이 구독은 주석이 지정된 코드 저장소에 추가될 때마다 클라이언트에 통보하는 서버가 commentAdded subscription을 제공한다고 가정해 봅시다. 클라이언트는 다음과 같은 subscription을 실행할 수 있습니다.

subscription($repoName: String!){
 commentAdded(repoFullName: $repoName) {
   id
   content
 }
}

이로 인해 잠재적인 문제가 발생할 수 있습니다. 당사의 서버는 주석이 저장소에 추가될 때마다 Comment_Added 이벤트를 게시할 수 있습니다. 즉, commentAdded resolver 추가된 리포지토리에 관계없이 모든 주석에 대해 실행됩니다. 따라서 subscribing 클라이언트는 원하지 않거나 액세스 권한이 없어야 하는 데이터를 수신할 수 있습니다.

이 문제를 해결하려면 withFilter 도우미 기능을 사용하여 클라이언트별로 업데이트를 제어할 수 있습니다.

다음은 withFilter 함수를 사용하는 설명 추가의 예제 해결 방법입니다.

import { withFilter } from 'graphql-subscriptions';

const resolvers = {
  Subscription: {

    commentAdded: {
      subscribe: withFilter(
        () => pubsub.asyncIterator('COMMENT_ADDED'),
        (payload, variables) => {
          // Only push an update if the comment is on
          // the correct repository for this operation
          return (payload.commentAdded.repository_name === variables.repoFullName);
        },
      ),
    }
  },
    // ...other resolvers...
};

withFilter 함수는 두 가지 매개 변수를 사용합니다.

  • 첫 번째 매개 변수는 필터를 적용하지 않는 경우 subscribe에 사용할 수 있는 함수입니다.
  • 두 번째 매개 변수는 subscription 업데이트를 특정 클라이언트로 보내야 하는 경우 true를 반환하고 그렇지 않은 경우 false를 반환하는 필터 함수입니다(Promise도 허용됨). 이 함수는 자체적인 두 가지 매개 변수를 사용합니다.
    - payload는 배포된 이벤트의 페이로드입니다.
    - 변수는 클라이언트가 구독을 시작할 때 제공한 모든 인수를 포함하는 개체입니다.
    withFilter 사용하면 클라이언트가 원하는 subscription업데이트를 정확하게 받고 수신이 허용되는지 확인할 수 있습니다.

작업 컨텍스트

query 또는 mutation에 대한 컨텍스트를 초기화할 때 일반적으로 컨텍스트 함수에 제공된 req 개체에서 HTTP 헤더 및 기타 요청 메타데이터를 추출합니다.

subscriptions의 경우 SubscriptionServer 생성자에 onConnect 기능을 제공합니다. 이 함수는 각각 WebSocket과 ConnectionContext를 두 번째와 세 번째 인수로 수신하는 것뿐만 아니라 객체의 connectionParams를 첫 번째 인수로 수신한다. onConnect 함수가 개체를 반환하면 개체를 컨텍스트 인수로 resolvers에게 전달됩니다.

SubscriptionServer.create({
// ...other options
async onConnect(connectionParams) {
  if (connectionParams.authorization) {
    const currentUser = await findUser(connectionParams.authorization);
    return { currentUser };
  }
  throw new Error('Missing auth token!');
}
});

인증 토큰과 같은 메타데이터는 전송에 따라 다르게 전송되기 때문에 특히 중요합니다.

onConnect와 onDisconnect

subscription 요청이 연결(onConnect)되거나 연결 해제(onDisconnect)될 때마다 subscription 서버가 실행하는 기능을 정의할 수 있습니다.

onConnect 기능을 정의하면 다음과 같은 이점이 있습니다.

  • onConnect에서 false 반환하거나 예외를 발생시키거나 특정 수신 연결을 거부할 수 있습니다. 이 기능은 인증에 특히 유용합니다.
  • onConnect에서 개체를 반환하면 해당 개체가 작업 컨텍스트로 resolvers에게 전달됩니다.
    다음과 같은 함수 정의를 SubscriptionServer의 생성자 개체에 제공합니다.
SubscriptionServer.create({
  schema,
  execute,
  subscribe,

  onConnect(connectionParams, webSocket, context) {
    console.log('Connected!')
  },
  onDisconnect(webSocket, context) {
    console.log('Disconnected!')

  },
});

onConnect를 가진 인증 예제

클라이언트에서 SubscriptionClient는 첫 번째 WebSocket 메시지와 함께 전송될 connectionParams(예)에 정보 추가를 지원합니다. 서버에서 연결이 완전히 인증되고 onConnect 콜백이 truthy 값을 반환할 때까지 모든 GraphQL 구독이 지연됩니다.

SubscriptionClient가 다음과 같이 구성되어 있다고 가정해 보겠습니다.

new SubscriptionClient(subscriptionUrl, {
// ...other options
connectionParams: {
  authorization: clientToken,
},
})

SubscriptionServer의 onConnect 콜백에 있는 connectParams 인수에는 클라이언트에서 전달된 정보가 포함되어 있으며 사용자 자격 증명을 검증하는 데 사용할 수 있습니다.

그런 다음 onConnect 함수에서 개체를 반환할 수 있습니다. 개체를 실행하는 동안 컨텍스트 인수로 resolvers에 전달됩니다. 이 예에서는 클라이언트가 제공한 인증 토큰을 사용하여 관련 사용자를 찾아 해당 사용자를 resolver에게 전달할 수 있습니다.

async function findUser(authToken) {
  // find a user by auth token
};

SubscriptionServer.create({
  schema,
  execute,
  subscribe,
  async onConnect(connectionParams, webSocket) {
    if (connectionParams.authorization) {
      const currentUser = await findUser(connectionParams.authorization);
      return { currentUser };
    }
    throw new Error('Missing auth token!');
  },
  { server, path }
});

위의 예는 전송 시 첫 번째 초기화 메시지와 함께 전송되는 인증 토큰을 기반으로 사용자를 조회한 다음 사용자 개체를 Promise로 반환합니다. context.currentUser는 GraphQL resolver에서 context.currentUser로 사용할 수 있습니다.

인증 오류가 발생하면 약속이 거부되어 클라이언트의 연결이 차단됩니다.

PubSub 라이브러리 프로덕션

위에서 언급한 것처럼 PubSub 클래스는 이벤트 배포 시스템이 in-memory이기 때문에 프로덕션 환경에서는 권장되지 않습니다. 즉, GraphQL 서버의 한 인스턴스에서 게시된 이벤트는 다른 인스턴스에서 처리되는 구독에서 수신되지 않습니다.

대신 Redis 또는 Kafka와 같은 외부 데이터 저장소로 백업할 수 있는 PubSubEngine 추상 클래스의 하위 클래스를 사용해야 합니다.

다음은 널리 사용되는 event-publishing 시스템을 위해 커뮤니티에서 만든 PubSub 라이브러리입니다.

  • Redis
  • Google PubSub
  • MQTT enabled broker
  • RabbitMQ
  • Kafka
  • Postgres
  • Google Cloud Firestore
  • Ably Realtime

grahpql-ws 전송 라이브러리

이 페이지에서는 subscriptions-transport-ws라이브러리를 아폴로 서버에 연결하는 방법에 대해 설명합니다. 그러나 2021년 현재 이 라이브러리는 활발하게 유지되고 있지 않습니다. subscriptions-transport-ws의 새로운 대안인 graphql-ws는 현재 보다 적극적으로 유지 관리되고 있으며, README는 이 제품을 Apollo Server와 함께 사용하는 방법을 설명합니다.

두 라이브러리는 웹소켓을 통한 GraphQL 구독에 대해 서로 다른 프로토콜을 구현하므로 Graphql-ws를 지원하도록 클라이언트를 조정해야 합니다. Apollo Server 3 출시 당시 Graphql-ws 라이브러리에 사용된 프로토콜은 GraphQL Playground 또는 Apollo Explorer에서 아직 지원되지 않으므로 구독 라이브러리의 주요 권장 사항으로 아직 문서화하지 않았습니다. GraphQL Playground 또는 Apollo Explorer의 구독과 상호 작용할 수 있는 기능이 중요하지 않은 경우 subscriptions-transport-ws보다 graphql-ws를 사용하는 것을 선호할 수 있습니다.

1개의 댓글

comment-user-thumbnail
2022년 2월 3일

오랫동안 찾았는데 Push 알림 포스트까지 있네요 :D
너무 감사합니다!

답글 달기