이전에 document들을 불러오고 변경하고 삭제하는 연산들을 하였다. 그런데 어떻게 elasticsearch가 document들이 어디에 있는 지 알 수 있는 것일까? 어떻게 색인된 document를 바로 찾을 수 있는 것일까??
바로 routing덕분이다. routing은 document에 대한 shard를 결정하는 과정을 말한다. (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html#mapping-routing-field)
routing_factor = num_routing_shards / num_primary_shards
shard_num = (hash(_routing) % num_routing_shards) / routing_factor
다음은 index에서 어떻게 document가 특정 shard를 라우팅하는 지에 대한 공식을 나타낸 것이다. default _routing
은 document _id
와 같다. 만약, document를 만들 때 _routing
을 특정하지 않으면 _id
로 자동으로 들어가는 것이다.
참고로, elasticsearch이 routing 방법은 투명하게 열려있고, custom하게 변경할 수 있다. 단, shard의 개수를 처음에 index를 설정할 때 변경할 수 없는데, 이는 routing 알고리즘과 관련이 깊다. 왜냐하면 위의 식에서 보듯이 routing_factor
에 num_primary_shards
가 있기 때문이다. primary shard수를 변경하면 routing알고리즘이 변경되며, 기존의 routing에서 re-indexing을 할 수 밖에 없다.
elasticsearch에서 query에 따른 shard를 찾는 routing을 간단하게 도식화면 다음과 같다.
GET /products/_doc/100
|
▼
--------
|Node X|
--------
|
▼
---------------------
|Replication Group B| (Routing)
---------------------
|
▼
------------------------------------------- (Adaptive replica selection)
| --------- --------- --------- --------- | ------------
| |Primary| |Replica| |Replica| |Replica| | --▶ | Replica B1|
| |shard | | B1 | |B2 | |B3 | | ------------
| --------- --------- --------- --------- |
-------------------------------------------
모든 query들이 primary shard에서만 데이터를 추출하는 것이 아니다. 그렇게 된다면 scaling하는 의미가 사라질 것이다. elasticsearch는 Adaptive Replica Selection(ARS)를 사용하는데, 이는 replica shear에도 query를 처리할 수 있도록 하는 것을 말한다. query들은 Replication Group에 요청을 라우팅하고 Replication Group에서 shard를 최적의 알고리즘으로 선출하도록 하는 것이다.
따라서, elasticsearch는 data를 읽을 때 Primary shard뿐만 아니라 ARS에 따라 replica shard에도 요청을 보내며, 최적의 shrad에게 요청을 보내는 것으로 생각하면 된다.
elasticsearch에서 데이터를 읽는 것은 ARS로 replica shard에도 요청이 갈수 있다고 했다. 그러나 data를 write하는 operation의 경우에는 무조건 Primary shard로 먼저간다.
PUT /products/_doc/100
|
▼
--------
|Node X|
--------
|
▼
--------------- 1. validate the data to write
|Primary shard| 2. spread data to replica shards on parallel
---------------
|
-------------------
| | |
▼ ▼ ▼
--------- --------- --------- ---------
|Primary| |Replica| |Replica| |Replica|
|shard | | B1 | |B2 | |B3 |
--------- --------- --------- ---------
data(document)쓰기 요청이 들어오면 Node에서 해당 primary shard로 바로 요청을 보낸다. 이때 primary shard는 data를 검증하고, 검증이 완료되면 document를 저장한 다음 replica들에게 병렬적으로 요청을 보내어 data를 복제하도록 한다.
그런데, 하드웨어 문제로 primary shard가 replica shard에게 data저장 요청을 보내기 전에 죽어버리면 어떻게 될까? 데이터 불일치 문제를 어떻게 해결하는가?
---------------
|Primary shard| (Dead)
---------------
|
-----------
| |
O X
| |
▼ ▼
--------- ---------
|Replica| |Replica|
| B1 | |B2 |
--------- ---------
Primary shard가 Replica B1에는 데이터 복제에 성공했지만, Replica B2에는 복제를 실패하고 죽었다고 하자. elasticsearch는 Primary shard가 죽었으므로, replication group 중에 하나를 primary shard로 promotion시키고, 죽은 이전의 primary shard를 되살려 replica shard로 만든다. 우리의 예제에서는 Replica B2
가 Primary shard가 되었다고 하자.
---------------
|Primary shard| (recover this shard as replica shard)
---------------
|
-----------
| |
O X
| |
▼ ▼
--------- ---------
|Replica| |Primary|
| B1 | |Shard |
--------- ---------
문제는 이전에 들어온 data쓰기 요청에 대해서 Replica B2에 반영이 안되었다는 것이다. 이 경우 primary shard와 replica shard간의 데이터 불일치가 발생한다.
이를 해결하기위해서 elasticsearch에서는 primary term과 sequence number를 사용한다.
1. primary term: counter로 primary shard가 변경되면 0에서 1씩 증가한다. 위의 예제에서 primary shard가 변경되었으므로 +1증가한다.
2. sequence number: write operation이 동작할 때 마다 증가하는 counter로 +1씩 증가한다. primary shard가 +1씩 sequence number를 증가시키고, 위의 예제에서 write operation이 동작하였으므로 +1이 증가한다. response에는 _seq
로 나와있다.
primary term과 sequence number를 이용해서 elasticsearch는 primary shard로부터 발생한 실패를 복구한다. primary term이 올라가면, primary shard가 바뀌었다는 것이고, replica shard와 primary shard의 sequence number가 어디까지 반영되었는 지를 기반으로 반영안된 write operation을 업데이트한다.
그러나, index가 커지면 이 과정은 매우 비용이 비싸진다. 따라서 elasticsearch는 속도를 올리기 위해 elasticsearch는 global, local checkpoint를 사용한다.
이들은 sequence number로 각 replication group이 global checkpoint를 갖는다. 각 replica shard는 local checkpoint를 가진다.
따라서, local checkpoints가 global checkpoints보다 낮으면 안되는 것이다. 이 경우에 해당 operation을 replica shard에 반영하도록 한다. 이렇게하면 처음부터 끝까지 어떤 operation이 반영이 안되었는 지 확인할 필요가 없고 전체 replication group안의 shard를 검사할 필요도 없다.
참고로 sequence number는 index에 write operation이 적용되어 하나씩 상승하는 것을 말한다. 이는 _version
과 약간 헷갈릴 수 있는데, _version
은 개별 document에게 발생한 write operation으로 update될 때마다 하나씩 상승한다.
따라서, _version
은 document가 update될 때마다 증가하는 것이다. version에는 두 가지 타입이 있다.
1. internal: elasticsearch 내부에서 document가 변경될 때를 기록하기 위해서 사용하는 것이다.
2. external: 사용자가 document의 schema가 바뀌었을 때, 이를 표시하고싶어 사용하는 것이다. 가령, DBMS에서 schema가 바뀌어서 document구조가 바뀌었다면, 이전 과의 schema구조 차이를 구분하기위해서 external versioning을 둔다. 다음과 같이 가능하다.
PUT /products/_doc/100?version=525&version_type=external
{
"name": "Toaster",
"price": 78,
"in_stock": 4
}
{
"_index" : "products",
"_type" : "_doc",
"_id" : "100",
"_version" : 525,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 25,
"_primary_term" : 1
}
_version
이 사용자가 지정한 verson으로 설정된 것을 볼 수 있다. 따라서, 해당 version부터 document를 수정하면 하나씩 증가하게 된다.
사실 그러나, _version
을 실용적으로 쓰는 일은 거의없다. 이전의 legacy system에서나 사용되었고, optimistic concurrency control을 위해서 사용되었다. optimistic concurrency control
는 동시성 상황에서 같은 document에 대한 update가 진행될 때, 이를 어떻게 할 것인가에 대한 이야기이다.
---------- -----------------
|client A| --PUT /products/_doc/100?version=1--> | elasticsearch |
---------- | |
| |
---------- | |
|client B| --PUT /products/_doc/100?version=1--> | |
---------- -----------------
client A, client B가 동시에 document id 100의 version 1에 대해서 write operation을 한다면, 어떻게 해야할까?? 이는 어쩔 수 없이 하나를 반영하고 _version
을 올린 뒤에 이전 version에 대해서 온 요청에 대해서는 에러 코드를 보내는 수 밖에 없다.
가령, client A의 요청을 수락하여 version을 2로 만들고 client B의 요청을 거절하여, version 2에 대해서 write operation을 수행하라고 하는 것이다. 이렇게 concurrency 이슈를 해결하기위해서 version을 도입했지만 이는 과거의 수행 방법이고, 최근에는 primary term
과 sequnce number
를 사용한다.
가령 document id 100을 retrive해보도록 하자.
GET /products/_doc/100
{
"_index" : "products",
"_type" : "_doc",
"_id" : "100",
"_version" : 530,
"_seq_no" : 30,
"_primary_term" : 1,
"found" : true,
"_source" : {
"name" : "Toaster",
"price" : 78,
"in_stock" : 4
}
}
_primary_term
이 1이고, _seq_no
가 30인 것을 알 수 있다. 이를 이용해 다음과 같이 업데이트 query를 보낼 수 있다.
PUT /products/_doc/100?if_primary_term=1&if_seq_no=30
{
"name": "Toaster",
"price": 78,
"in_stock": 5
}
{
"_index" : "products",
"_type" : "_doc",
"_id" : "100",
"_version" : 531,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 31,
"_primary_term" : 1
}
응답에 성공하여 _seq_no
가 30이 된 것을 볼 수 있다. 여기서 만약 위의 PUT
쿼리를 다시 요청하면 어떻게될까?
PUT /products/_doc/100?if_primary_term=1&if_seq_no=30
{
"name": "Toaster",
"price": 78,
"in_stock": 5
}
{
"error" : {
"root_cause" : [
{
"type" : "version_conflict_engine_exception",
"reason" : "[100]: version conflict, required seqNo [30], primary term [1]. current document has seqNo [31] and primary term [1]",
"index_uuid" : "psm_muz4T7ehqucZ9-De0w",
"shard" : "0",
"index" : "products"
}
],
"type" : "version_conflict_engine_exception",
"reason" : "[100]: version conflict, required seqNo [30], primary term [1]. current document has seqNo [31] and primary term [1]",
"index_uuid" : "psm_muz4T7ehqucZ9-De0w",
"shard" : "0",
"index" : "products"
},
"status" : 409
}
다음의 에러 메시지를 받는다. 이는 _seq_no
가 잘못되었다는 것이다. client는 이 에러메시지를 받고, 스스로 고쳐서 다시 요청을 보내는 방법 밖에 없다.
RDBMS에서 where절을 통해서 query를 호출하듯이, elasticsearch에서도 가능하다.
query
의 조건에 일치하면 script.source
을 호출한다.POST /products/_update_by_query
{
"script": {
"source": "ctx._source.in_stock--"
},
"query": {
"match_all": {}
}
}
{
"took" : 299,
"timed_out" : false,
"total" : 4,
"updated" : 4,
"deleted" : 0,
"batches" : 1,
"version_conflicts" : 0,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0,
"failures" : [ ]
}
총 4개의 document가 업데이트 되었다는 것을 말한다.
update by query는 query
에 매칭하는 다량의 data들을 수정하는데 굉장히 유용하다. 해당 operation이 동작하는 과정을 정리하면 다음과 같다.
조심할 것은 transactional한 연산이 아니므로 연산 도중에 실패하면 rollback되지 않는다는 것이다.
그런데, 왜 update by query이전에 snapshot을 도출하는 것일까?? 이는 snapshot에 있는 primary term
과 _seq_no
를 사용하려고 하기 때문이다. 즉, optimistic concurrency control
을 위해서인데 update by query
연산 도중에 다른 query들이 document들을 변경하면 문제가 동시성 문제가 발생할 수 있기 때문이다. 따라서, snapshot을 찍은 다음 연산의 대상이 되는 document가 수정되었다면 해당 query는 aborted되게 된다. 단, conflicts
option을 proceed
로 설정하면 snapshot과 현재 상태가 conflict가 발생해도 그대로 다음 document에 대해서 연산을 진행할 수 있도록 할 수 있다.
Update by query
와 마찬가지로 Delete by query
도 존재한다.
query
의 조건에 일치하면 삭제한다.POST /products/_delete_by_query
{
"query": {
"match_all": {}
}
}
다음의 명령어를 사용하면 query에 해당하는 document는 모두 삭제한다. query가 match_all
이기 때문에 모든 query를 삭제하게 된다.
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
bulk API를 사용하면 여러 개의 create, update, delete operation을 단일 API call로 처리할 수 있다. 이는 overhead를 줄이고 indexing speed를 늘릴 수 있다.
bulk API는 NDJSON specification data 포맷을 이용한다. NDJSON은 뭔가 특별한 것은 아니고, 한줄에 하나의 JSON객체를 써서 reuqest body에 한 줄짜리 JSON객체 여러 개를 보내는 것이다. elasticsearch에서 지원하는 NDJSON의 형식은 다음과 같다.
{action_and_metadata}\n
{optional_source}\n
{action_and_metadata}\n
{optional_source}\n
...
action을 먼저 정의하고 이에 대한 option information을 아래에 바로 적어주는 것이다. 필요에 따라서 선행 조건이 있을 수 있고, option도 생략할 수 있다.
create
, delete
, update
, index
등의 action들로 이루어져 있으며, 가령 create
, update
의 경우는 index
또는 write
, index
로 index를 명시해야한다. 다음은 index products
의 document
200, 201 두개를 bulk로 생성하는 문법이다.
POST /_bulk
{ "index": {"_index": "products", "_id": 200 }}
{ "name": "Espresso Machine", "price": 199, "in_stock": 5}
{ "create": {"_index": "products", "_id": 201}}
{ "name": "Milk Frother", "price": 149, "in_stock":14}
이렇게 index
라는 action을 한 줄의 JSON객체로 적고, 다음 줄에 index
action에 필요한 option을 써주는 것이다. 또 다음 create
라는 action을 한 줄의 JSON객체로 적고, 다음 줄에 create
action에 필요한 option을 써주는 것이다.
결과로 다음과 같이 나온다.
{
"took" : 6,
"errors" : false,
"items" : [
{
"index" : {
"_index" : "products",
"_type" : "_doc",
"_id" : "200",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 36,
"_primary_term" : 1,
"status" : 201
}
},
{
"create" : {
"_index" : "products",
"_type" : "_doc",
"_id" : "201",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 37,
"_primary_term" : 1,
"status" : 201
}
}
]
}
created
가 두개 있는 것을 알 수 있다. 즉 데이터가 products
index에 생성된 것을 알 수 있다.
다음으로 update
와 delete
를 해주도록 하자. _bulk
path에는 index를 설정할 수 있는데, index
action을 쓰지 않아도 path parameter로 쓸 수 있다.
POST /products/_bulk
{ "update": {"_id": 201}}
{ "doc": {"price": 129 }}
{ "delete": {"_id": 200}}
다음과 같이 products
라는 index를 path parameter로 지정할 수 있는 것이다. 그 아래 줄에 NDJSON 형식으로 action-option을 차례대로 적어주면 된다. 다만 delete
action의 경우는 option을 추가적으로 적어줄 필요가 없기 때문에 생략해도 된다. 위의 예제는 document _id
201의 price
를 129
로 수정하고 document _id
200을 삭제하라는 bulk연산이다.
다음은 bulk API를 사용할 때 조심해야할 사항이다.
1. HTTP Content-Type
header는 반드시 Content-Type: application/x-ndjson
이어야 한다. 따라서, console이나 SDK를 사용하면 문제 없지만 curl을 사용하거나 http requester를 직접 사용하면 반드시 지정해주어야 한다.
2. 반드시 각 라인은 newline으로 끝나야 한다. 즉, \n
으로 끝나야 한다는 것이다.
3. bulk API에 여러 개의 action이 있을 때, 하나가 실패한다해서 다른 action들이 실행되지 않는 것은 아니다. 따라서, items
key로 성공했는 지 안했는지 확인하도록 하자.
4. 개별적인 query를 사용하는 것보다 bulk API를 사용하는 것이 훨씬 더 효율적이고 빠르다. 왜냐하면 netowrk round trip을 줄일 수 있기 때문이다.