[NodeJS] Stream

moongq·2020년 12월 30일
2

Node.js

목록 보기
1/1

https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback / node.js doc를 번역한 글입니다

Stream

stream은 노드에서 스트리밍 데이터를 이용하기위한 추상 인터페이스입니다. 그리고 stream 모듈은 stream 인터페이스를 구현하기 위한 API를 제공해줍니다. 예를 들면, http server requestprocess.stdout 두가지 모두 스트림 객체입니다. 스트림은 읽을수 있고(readable), 쓸수 있으며(writable), 혹은 두가지 특성을 모두 가질 수 있습니다. 그리고 모든 스트림들은 'EventEmitter'의 인스턴스들입니다.

stream modulem에 불러오기.

const stream = require('stream');

스트림 모듈은 새로운 타입의 스트림 인스턴스를 만들때 유용합니다. 하지만 보통 스트림을 사용하기 위해 스트림 모듈을 사용할 필요는 없긴합니다. 이미 대부분이 스트림으로 구현되어 있어서 스트림을 따로 불러와서 만들어줄 필요가 없습니다.

Types of streams

노드에는 기본적으로 4가지의 기본적인 스트림 타입이 있습니다.

  • Writable: 데이터를 쓸수 있는 스트림 (예: fs.createWriteStream())
  • Readable: 데이터를 읽을수 있는 스트립 (예: fs.createReadStream())
  • Duplex: 읽고 쓸수 있는 스트림 (예: net.Socket)
  • Transform: 데이터를 쓰고 읽을때 데이터를 수정하거나 변환할 수 있는 스트림 (예: zlib.createDeflate())

Object mode

Node.js API로 생성된 모든 스트림은 문자열과 버퍼(또는 Uint8Array) 객체에서만 작동합니다. 만약 다른 형태의 데이터로 작동되길 원한다면 Object mode를 이용하시면 됩니다. 예를 들어 JavaScript 값들과 작업하고 싶다면 말이죠(object mode를 이용해도 null 값은 제외됩니다.)

Object mode를 이용하기 위해 스트림 인스턴스를 생성할 때 'object Mode' 옵션을 사용하세요. ! 기존 스트림을 객체 모드로 중간에 전환하는 것은 안전하지 않습니다. 꼭 생성할 때 정한 옵션으로 쭉 가세요 !

Buffering

Writable, Readable 스트림 모두 내부 버퍼에 데이터를 저장합니다. 내부 버퍼는 writable.writableBuffer, readable.readableBuffer을 통해 검색되어 집니다.

버퍼에 모일 데이터의 총 크기는 highWaterMark option에 의해 정해집니다. (highWaterMark는 stream 생성자에 의해 전해짐.)

stream.push(chunk)를 호출하면 데이터는 Readable stream에서 버퍼링됩니다. 만약 사용자가 stream.read()를 호출하지 않는다면 데이터가 소비되기 전까지 내부 큐에 저장됩니다.

내부 읽기 버퍼의 총 크기가 highWaterMark에서 설정한 임계값에 도달하면, 스트림은 현재 버퍼링 된 데이터를 사용할 수 있을 때까지 데이터 읽기를 일시적으로 중지합니다. (즉, 스트림은 읽기 버퍼를 채우는 데 사용되는 내부 Readable._read() 메서드 호출을 중지합니다)

writable.write(chunk) 메서드가 반복적으로 호출되면 데이터가 쓰기 가능한 스트림에 버퍼링됩니다. 내부 쓰기 버퍼의 총 크기가 highWaterMark에서 설정 한 임계값 아래라면 writable.write()는 true를 리턴합니다. 내부 버퍼의 크기가 highWaterMark에 도달하거나 초과하면 false가 반환됩니다.

스트림 API는, 특히 stream.pipe() 메서드의 핵심 목표는 데이터 버퍼링을 허용가능한 수준으로 제한하여 다른 소스들이 사용해야하는 메모리를 건들지 않는 것입니다.

Duplex와 Transform 스트림은 모두 읽기 가능하고 쓰기 가능하기 때문에 각각 읽기 및 쓰기에 사용되는 두 개의 별도 내부 버퍼를 유지합니다. 그리고 각각이 효율적인 데이터 흐름을 유지하면서 서로 독립적으로 작동할 수 있도록 합니다. 예를 들어, net.Socket 인스턴스는 읽기 가능한 쪽이 소켓에서 수신된 데이터의 소비를 허용하고 쓰기 가능한 쪽이 소켓에 데이터 쓰기를 허용하는 이중 스트림입니다.

API for stream consumers

대부분의 Node.js 어플리에키션은(아무리 간단할지라도) 어떠한 방식으로든 stream을 이용합니다. 아래 예시는 HTTP server를 구현하기 위해 stream을 이용합니다.

const http = require('http');

const server = http.createServer((req, res) => {
	// req, res 모두 streamd입니다 !
	// req: http.IncomingMessage(readable stream), res: http.ServerResponse(writable stream)
	
	let body = '';
	// utf8 스트링으로 데이터를 받아옵니다.
	// 만약 인코딩이 설정되어있지 않다면 버퍼 객체로 받아집니다.
	req.setEncoding('utf8');

	req.on('data', (chunk) => {
		body += chunk;
	});

	// 'end' 이벤트는 모든 바디가 받아졌다는걸 나타냅니다.
	req.on('end', () => {
		try {
			const data = JSON.parse(body);
			res.write(typeof data);
			res.end();
		} catch (er) {
			res.statusCode = 400;
			return res.end(`error: ${er.message}`);
		}
	});
});

server.listen(1337);

Writable 스트림들은 (위 코드의 'res'와 같은) stream에 데이터를 쓰는데 이용되는 write() 혹은 end()같은 메서드들을 가지고 있습니다.

Readable 스트림은 EventEmitter API를 사용하여 데이터를 스트림에서 읽을 수있을 때 애플리케이션 코드에 알립니다. 사용 가능한 데이터는 여러 가지 방법으로 스트림에서 읽을 수 있습니다.

Readable, Writable 스트림 모두 EventEmitter API를 다양한 방법으로 사용하여 스트림의 현재 상태를 전달합니다.

스트림에 데이터를 쓰거나 데이터를 소비하는 애플리케이션은 스트림 인터페이스를 직접 구현할 필요가 없으며 일반적으로 require ( 'stream')을 호출 할 이유가 없습니다.

Writable streams

Writable 스트림은 데이터가 쓰여질 목적지에 대한 추상화입니다.

writable stream example

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

모든 writable stream은 stream.Writable 클래스에 정의된 인터페이스를 구현합니다.

writable stream의 종류들이 다를지라도 모든 writable stream은 기본적인 사용 패턴을 따릅니다. 아래 코드가 예시 입니다.

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

Class: stream.Writable

Event: 'close'

close 이벤트는 스트림이나 리소스가 닫혔을 때 발생됩니다. 다시말하면 새로 발생될 이벤트가 없음을 나타냅니다.

Event: 'drain'

만약 stream.write(chunk)가 false를 리턴한 상황에서, write 작업이 다시 실행될수 있는 상황이 되면 drain이벤트가 발생됩니다.

Event: 'error'

데이터 쓰기 또는 파이핑 중 오류가 발생하면 'error' 이벤트가 발생합니다.
스트림을 만들 때 autoDestroy 옵션이 false로 설정되지 않은 경우 'error'이벤트가 발생하면 스트림이 닫힙니다.
'error'이후에는 'close' 이외의 다른 이벤트가 생성되지 않아야합니다 ('error' 이벤트 포함).

Event: 'finish'

stream.end() 메서드가 호출된 뒤에 발생하는 이벤트,

Event: 'pipe'

stream.pipe() 메서드가 readable stream에서 호출되었을 때 발생하는 이벤트, pipe의 도착지로 writable을 세팅해주어야합니다.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer); // writable을 세팅해주었죠.

Event: 'unpipe'

pipe와 반대되는 이벤트라고 보시면 됩니다. readable stream에서 stream.unpipe() 메서드가 호출되면 readable stream의 도착지로 설정되어있던 writable stream이 제거됩니다.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);  // 붙이고
reader.unpipe(writer);  // 다시 떼고.

writable.write(chunk,[, encoding][, callback])

  • chunk: 쓰기할 데이터(optional), object mode가 아닐 때는 chunk는 무조건 string, Buffer 혹은 Unit8Array이어야만 합니다. object mode일 때는 null 값을 제외한 javascript값이 올수 있습니다.
  • encoding: 인코딩 형식, 만약 chunk가 string이라면 'utf8'이 디폴트 값입니다.
  • callback: 데이터가 모두 쓰여졌을 때의 콜백입니다.
  • Returns: 쓰기 작업이 완료 되지 못했다면 false를 리턴하면서 'drain' 이벤트가 발생되길 기다립니다. 모두 쓰여졌다면 true를 리턴합니다.

writable.write() 메서드는 스트림에 일부 데이터를 쓰고 데이터가 완전히 처리되면 제공된 콜백을 호출합니다. 오류가 발생하면 콜백이 첫 번째 인수로 오류와 함께 호출되거나 호출되지 않을 수 있습니다. 쓰기 오류를 안정적으로 감지하려면 'error'이벤트에 대한 리스너를 추가하십시오. 콜백은 '오류'가 발생하기 전에 비동기 적으로 호출됩니다.
내부버퍼가 stream이 생성될때 세팅한 highWaterMark보다 작다면 'true'를 반환합니다. 만약 'false'가 리턴된다면 stream은 'drain'이벤트가 발생되기전까지 멈춰집니다.

스트림이 drain 이벤트가 발생되기 전에 write를 호출한다면 false가 반환됩니다.
현재 버퍼링된 모든 chunk가 비워지면(os에 의해 어딘가로 전달되면) 'drain'이벤트가 발생합니다. 한번 false가 반환되었다면 'drain'이벤트가 발생되기전까지는 chunk가 쓰여지지 않게 하는 것이 node가 추천하는 방법입니다. 만약 계속 write()를 호출한다면 node.js는 메모리의 한계치까지 버퍼링하게 됩니다. 그렇게 되면 모든게 중단될 것입니다.

데이터를 쓸때는 Readable과 stream.pipe()를 이용하는 것을 추천드립니다.
만약 write()를 이용하는 것을 선호하신다면 'drain' 이벤트를 이용하셔서 메모리 이슈와 스트림 부하를 잘 조절하시길 바랍니다.

function write(data, cb)  {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// 다른 쓰기 작업 수행 전에 콜백이 호출되길 기다립니다.
write('helo', () => {
  console.log('Write completed, do more writes now.');
});

Readable Streams

Readable Streams는 소비될 데이터에 대한 추상화입니다.

Readable streams 예시

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin

모든 Readable streams은 stream.Readable 클래스의 구현체입니다.

두가지 타입의 읽기 모드

Readable streams는 2가지의 모드[flowing and paused]를 이용하여 효과적으로 작동합니다.

  • flowing 모드에서는 기본 시스템안에서 자동으로 데이터가 읽힙니다. 그리고 EventEmitter를 통해 가능한 빨리 어플리케이션에 데이터를 제공합니다.
  • paused 모드에서는 stream.read() 메서드를 호출하여 스트림에서 데이터 청크를 읽게 만들어야 합니다.

모든 Readable stream은 paused 모드에서 시작합니다. flowing 모드로 변경하기 위해서는 아래 방법을 통해 가능합니다.

  • event handler에 data를 추가합니다.
  • stream.resume() 메서드를 호출합니다.
  • stream.pipe() 메서드를 호출하여 Writable에 데이터를 보냅니다.

반대로 flowing에서 puased 모드로 바꾸려면,

  • pipe 도착지가 없을때, stream.pause() 메서드를 호출합니다.
  • pipe 도착지가 있을때, pipe 도착지를 모두 지웁니다.(복수개의 도착지가 있을 때는 'stream.unpipe()' 메서드를 호출하면 모두 지워집니다.)

! 중요한 개념하나가 있습니다.
데이터를 소비하거나 무시하는 매커니즘이 제공될 때까지 Readable은 데이터에 관해 아무일도 하지 않습니다.소비 매커니즘이 비활성화되거나 제거되면, 데이터 관련 작업을 정지하려 시도합니다.(시도만 할뿐 완벽히 정지될수도 있고 안될수도 있습니다.)
읽으려는 데이터를 작업 도중 삭제한다고 하더라도 스트림을 완전히 멈추게 할수는 없다는 말입니다.

세가지 상태.

위의 두가지 모드에서 더 깊이 들어가면 3가지의 상태가 더 있습니다.

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Readable.ReadableFlowing이 null이면 스트림의 데이터를 사용하기위한 메커니즘이 제공되지 않습니다. 따라서 스트림은 데이터를 생성하지 않습니다. 이 상태에서 'data'이벤트에 대한 리스너를 연결하거나, Readable.pipe() 메서드를 호출하거나, Readable.resume() 메서드를 호출하면 Readable.ReadableFlowing이 true로 전환되어 Readable이 다음과 같이 이벤트를 방출하기 시작합니다. 데이터가 생성됩니다.

Readable.pause(), Readable.unpipe()를 호출하거나 스트림 부하를 수신하면 Readable.ReadableFlowing이 false로 설정되어 일시적으로 이벤트 흐름이 중단되지만 데이터 생성은 중단되지 않습니다. 이 상태에서 'data'이벤트에 대한 리스너를 연결해도 Readable.ReadableFlowing이 true로 전환되지 않습니다.

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFolowing 가 false로 바뀜.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok');  // write()를 해도 데이터가 생성되지 않습니다.
pass.resume();     // unpipe된 후에는, 반드시 resume()을 호출해야 데이터가 생성됩니다.

api 스타일 고르기.

readable은 데이터를 읽어올 때 여러가지 방식을 제공해줍니다. 그렇다보니 api들을 섞어서 쓸실수도 있는데 그렇게 이용하시면 예상치 못한 문제가 발생할수 있습니다. 그러니 각각의 stream에는 한가지 방식만을 이용하셔야 합니다.

추천드리는 가장 쉬운 방법은 readable.pipe() 메서드입니다.
혹시 더 정교하게 데이터를 다루고 싶으시다면 EventEmitter와 readable.on('readable')/ readable.read() 를 이용하시기 바랍니다.

Class: stream.Readable

Event: 'close'

'close'이벤트는 스트림 및 기본 리소스가 닫힐 때 발생합니다. 더 이상 이벤트가 발생하지 않으며 더 이상 어떠한 계산이 발생하지 않게 됩니다.
Readable 스트림이 emitClose 옵션과 합께 생성 된 경우 항상 'close'이벤트를 내보냅니다.

Event: 'pause'

stream.pause() 메서드가 호출되었을때, 그리고 readableFlowing이 false가 아닐때 발생됩니다.

Event: 'readable'

stream으로부터 데이터를 읽어올 수 있을 때 발생됩니다.

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while (data = this.read()) {
    console.log(data);
  }
});

readable 이벤트는 스트림 데이터의 끝에 도달하면 'end' 이벤트를 발생시키기 전에 한번 더 발생합니다.
readable 이벤트는 stream에 새로운 정보가 있다는 것을 알려줍니다. 두가지가 있습니다. 1. 새로운 데이터가 읽힐 준비가 되었다,/ 2. 데이터를 끝까지 읽었다.
전자는 stream.read() 메서드가 읽을 수 있는 데이터를 리턴합니다. 후자에서는 stream.read() 메서드가 null을 리턴하게 됩니다. 아래 예시의 "foo.text"는 빈 파일입니다.

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

위 코드의 결과입니다.

$ node test.js
readable: null
end

일반적으로 readable.pipe()data 이벤트 매커니즘은 readable 이벤트보다 쉽게 이해할 수 있습니다. 그러나, readable은 처리량이 더 많아집니다.

readabledata를 함께 이용한다면, readable이 흐름을 주도하게 됩니다. 예를들면 datastream.read()가 호출되었을 때만 발생하게 됩니다.

Event: 'destory'

stream을 파괴합니다. 선택적으로 error 이벤트와 close 이벤트 발생시킵니다. destory가 호출되면 readable stream은 내부적으로 이어질 작업들을 무시하게 만들기 위해 내부 리소스와 이어질 호출들을 모두 방출(해제)시킵니다.

readable.pipe(destination[, options])

readable.pipe() 메서드는 readable에 Writable을 붙여줍니다. 그리고 flowing mode로 변경시키고 Writable에 데이터를 보냅니다. readable로부터 writable의 데이터 흐름은 writable stream이 너무 빠른 readable stream의 속도를 감당할 수 있도록 자동으로 조절됩니다.

예시 pipe입니다.

const fs =require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// 모든 readable의 데이터는 'file.txt'로 가게 됩니다.
readable.pipe(writable);

한개의 Readable에 복수개의 Writable을 붙이는 것 또한 가능합니다.

readable.pipe() 메서드는 데이터 흐름의 도착지의 참조를 리턴해줍니다. 그 참조를 이용해서 piped streams을 연결할 수 있습니다.

const fs = require('fs');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

기본적으로 stream.end() 는 Readable stream이 'end' 이벤트를 발생시켰을 때에만 목적지 writable stream에서 호출됩니다. 그렇게 되면 목적지는 더이상 쓰기 작업을 진행할 수 없게 됩니다.

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

한가지 중요한 사실은 Readable stream이 과정중에 error을 발생시킨다면, Writable 목적지는 자동으로 닫히지 않습니다. 에러가 발생하면 직접 stream을 닫아주어야 메모리 누수가 발생하지 않습니다.

process.stderr와 process.stdout Writable stream은 node.js 프로세스가 종료될때까지 특정 옵션을 설정해놓지 않는한 절대 알아서 닫히지 않습니다.

readable.Read() 메서드는 내부 버퍼에서 일부 데이터를 가져 와서 반환합니다. 읽을 수있는 데이터가 없으면 null이 반환됩니다. 기본적으로 Readable.setEncoding() 메서드를 사용하여 인코딩이 지정되지 않았거나 스트림이 object mode에서 작동하지 않는 한 데이터는 Buffer 객체로 반환됩니다.

stream.pipeline(source[, ...tramsforms], destination, callback),
stream.pipeline(streams, callback)

stream과 생성기를 연결시켜주는 모듈 메서드입니다. 그리고 이 메서드는 error를 전달하고 파이프라인이 완료되면 적절히 정리하고 콜백을 제공합니다.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// pipeline API를 이용하면
// 여러가지 스트림을 쉽게 이어줄 수 있고,
// pipeline이 완료되면 알림을 받을 수 있습니다.

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

stream.pipeline() 메서드는 아래 몇가지를 제외한 상황에서 stream.destory(err)를 호출합니다.

  • Readable stream이 'end'혹은 'close' 이벤트를 발생시켰을 때.
  • Writable stream이 'finish' 혹은 'close' 이벤트를 발생시켰을 때.


중요한 것들만 번역해둔 글입니다. 디테일한 정보가 필요하시다면 nodejs.org 사이트의 원본을 보시길 바랍니다 !
https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback

profile
https://medium.com/nodejs-server

10개의 댓글

comment-user-thumbnail
2021년 1월 2일

잘 읽었습니다.. 많이 저한테는 어려워서 그러는데
stream이라는게 실제 코드(실무)에서는 어떤 역할을 할 수가 있나요?

1개의 답글
comment-user-thumbnail
2021년 1월 3일

Readable streams에서 flowing모드가 필요할 때, paused모드가 필요할 때 각각 어떤 상황에 필요한지 궁금합니다.

1개의 답글
comment-user-thumbnail
2021년 1월 3일

http request랑 response가 모두 스트림 객체라는 걸 처음 알았어요
노드js 공부는 많이 해보지 않아서 글을 다 이해하긴 어려웠는데
노드js에서 데이터를 읽고 쓰고 처리하는 방식에 대해서 자세하게 나와있어서 좋았네요~!

1개의 답글
comment-user-thumbnail
2021년 1월 3일

글을 작성하는 데 얼만큼의 시간이 걸리셨나요?

1개의 답글
comment-user-thumbnail
2021년 4월 28일

좋은 자료 감사합니다.
Node.JS 에서 가장 중요한 요소중 하나가 Stream 인거 같아요
최근의 코드 패턴을 보면 async await, iterable, promise 를 stream과 결합해서 가독성을 높히는 코드가 자주 보이는데 앞으로는 더욱더 Stream의 중요성이 높아질거같습니다.

답글 달기