Databricks란
- apache spark를 만든 사람들이 모여 만든 플랫폼
- Delta Lake, mlflow 등을 이용해 통합 클라우드 분석 플랫폼을 제공하는 Saas 회사
- 이것을 LakeHouse라고 부른다.
해결하려고 한 것
- 사일로화된 기술 스택으로 복잡해진 데이터 아키텍처를 해결하고자 함.
- Data Lake
- 무제한적인 확장성
- 저비용, 개방성
- 성능이나 관리성 편의성이 좋지 못함.
- Data Warehouse
- 성능, 관리성, 편의성이 좋음
- 비용 문제, 비정형 데이터 관리 힘듬
- Data Lake 위에 Warehouse의 장점을 얹은다면 좋은 플랫폼을 만들 수 있지 않을까?
핵심 기술
구조
- Data Lake
- 레이크하우스 플랫폼
- 데이터 관리 및 거버넌스
- 오픈 데이터 레이크
- 플랫폼 보안 및 관리
- 단순성
- 사일로화 되어 있는 여러가지 솔루션들을 하나의 플랫폼에서 관리하기 때문에 유지보수 관리가 쉽다.
- 개방성
- 특정 DB에 종속되어 있지않고 오픈 소스 프로젝트를 기반으로 여러가지 혁신을 이루어가는중
- 여러가지 solution과 통합될 수 있다.
- 협업
- 데이터 팀을 통합하고 전체 데이터 및 AI 워크플로에서 공동 작업이 가능한 협업 플랫폼이다.
적용 사례
- 페타바이트 급 Dataset에 빠른 속도로 쿼리 실행, 데이터를 배분하고 비용 절약
- BI와 ML을 Delta Lake에 통합하고 70여 건의 usecase에 IoT와 스트리밍 활용
- 예측기반 유지보수와 인벤토리 관리를 통해 수백만 달러 절약 가능
- 유전체학, 임상 데이터를 분석하여 새로운 치료법 개발 속도 가속.
- 협업 방식을 개선하고 파이프라인 속도를 빠르게 하여 데이터 준비 기간을 기존 3주에서 2일로 단축
- 자사 Dataset에 쿼리 성능 600배 향상
- 데이터와 AI를 활용하여 대량의 뱅킹 정보 구동, 매달 새로운 GTM 역량 도입 가능
- 서로 다른 150종의 소스에서 수집한 수백 TB 규모의 데이터를 하나의 플랫폼에 통합
- 100여 종의 ML 모델을 이전보다 10배 빠릴 출시, 사기 행위 탐지부터 고객 서비스, 마케팅과 물류에 이르기까지 50가지 usecase 지원, 100여 가지 usecase 적용 예정
Databrick Architecture and Service
- Apache spark의 배경
- hadoop의 성능 개선
- 빅데이터 프로세싱의 사실상 표준 통합 분석 엔진
- 데이터 프로세싱 영역에서 가장 큰 오픈 소스 프로젝트
- Spark API
- SQL 코드만 짜면 Spark가 알아서 처리하는 구조
- spark의 구조
- Spark cluster
- Driver Node가 worker 노드를 관리한다.
- 요청을 받게 되면 각각의 worker node에 떠 있는 executor process들에 분산을 하게 된다.
- executor process들은 각 node들에 있는 core에게 큰 작업을 task 단위로 나누어 분산하여 병렬처리를 한다.
- Databricks Architecture
- Control Plane (Databricks가 관리하는 영역)
- Web Application
- Repos / Notebooks
- Job Scheduling
- Cluster Management
- Data Plane (ex. 고객의 AWS 계정)
- Clusters
- Executor node 하나하나가 모두 VM으로 구성
- Exectuor node들의 coordination을 담당하는 Driver가 생긴다.
- Cluster Types
- All-Purpose Cluster
- 여러 분석가들이 모여서 분석할 때 사용하는 cluster
- Jobs Cluster
- ETL, 주기적인 job을 수행할 때 사용하는 cluster
- 훨씬 저렴하다.
Demo 훑어보기
- Photon
- jvm이 갖는 garbage collector 같은 한계를 해결하여 벡터라이징 엔진을 c++로 새로 짜 기존보다 3,4배 빠르고 성능이 최적화되어 있는 런타임을 선택할 수 있음
- Workspace
- Users에 사용자가 있으며 사용자에게 권한 부여가 가능하다.
- compute를 생성
- notebook 생성
- 특정 cluster에 attach하여 실행 시킨다.
- jupyter notebook과 거의 동일해 보임
- 여러 언어로 섞어 쓸 수 있다.
- Repos
DELTA LAKE
- 기존의 스토리지 시스템 위에 데이터 레이크 하우스를 구축할 수 있게 도와주는 오픈 소스 프레임워크
- ACID를 보장하는 object storage에 제공
- ACID 지원으로 해결되는 많은 문제들
- 단일 테이블에 여러 트렌젝션이 동시 Write 처리가 힘들다.
- 기존 데이터의 수정이 힘들다.
- 수정, 삭제 등을 위해 전체 partition을 overwrite 하는 경우도 있었음
- 중간에 실패한 Job - Corrupt Data
- 파일이 정상적으로 쓰였는지 안 쓰였는지도 확인 할 수 없었음
- 실시간 운영 힘듬(Batch중심)
- 하나의 테이블에 업데이트나 write가 힘드니 lambda라는 복잡한 아키텍처로 데이터를 이원화 하거나 해야했다.
- 과거 데이터 스냅샷 유지 비용 증가
Parquet
- 오픈소스 데이터 파일 포맷
- column 기반으로 통계 작업에 최적화 되어 있음.
- 분산 태스크를 할 때 굉장히 유용함.
- 데이터 append만 지원하는 문제점 발생
- Parquet의 파일 처리 문제점
- stream성 데이터를 처리하는 경우 조그마한 파일들이 많이 생길 것이다.
- 데이터가 한쪽으로 치우치는 skew의 문제점
- fail이 되었을 때 corrupt된 데이터를 처리 하기 힘듬
- 해결을 위해선 비슷한 파일 크기로 조정이 되어야 함.
- corrupt data에 대해 atomicity가 보장 되어야함.
Delta가 가져오는 안정성과 성능 향상 기능
- Consistencty
- Optimizations on the fly
- Direct updates and deletes
- Time travel
- 옛날 데이터에 대한 version 정보를 갖고 있음.
Delta Tables
- 실제 데이터는 parquet으로 되어 있다.
- parquet 파일에 있는 디렉토리에 hidden 디렉토리가 생기게된다.
- transaction에 대한 log정보와 파일에 대한 통계 정보를 갖고 있는 metadata를 갖고 있다.
- trasaction이 많아질수록 metadata가 많아지고 metadata를 관리하는 것들이 성능이 느려지지 않을까?
- delta는 transaction log도 spark에서 처리한다.
- spark를 이용하여 처리하는 metadata 엔진임.
Delta Engine
- Optimize
- where data >= current_timestamp() - INTERVAL 1 day ZORDER BY (eventType)
- optimize를 통해 skew를 해결한다.
- Auto Optimize
- stream성 데이터도 data가 write 되는 순간 Bin Packing을 통해 small file problem을 해결 할 수 있음
- Data skipping
- 불필요한 데이터를 읽지 않는다.
- metadata 통계 정보를 활용하여 필요한 데이터만 읽는다.
- Caching
Delta demo
%python
databricks_user = spark.sql("SELECT current_user()").collect()[0][0].split('@')[0].replace(".","_")
print(databricks_user)
spark.sql("DROP DATABASE IF EXISTS delta_{}_db CASCADE".format(str(databricks_user)))
spark.sql("CREATE DATABASE IF NOT EXISTS delta_{}_db".format(str(databricks_user)))
spark.sql("USE delta_{}_db".format(str(databricks_user)))
%sql
CREATE TABLE IF NOT EXISTS students (id INT, name STRING, value DOUBLE);
INSERT INTO students VALUES (1, "Yve", 1.0);
UPDATE students SET value = value + 1 WHERE name LIKE "T%";
- delete, update가 가능해짐.
- merge를 통해 여러개의 command를 처리할 수 있다.
CREATE OR REPLACE TEMP VIEW updates(id, name, value, type) AS VALUES
(2, "Omar", 15.2, "update"),
(3, "", null, "delete"),
(7, "Blue", 7.7, "insert"),
(11, "Diya", 8.8, "update");
SELECT * FROM updates;
MERGE문을 사용해 upsert 데이터의 update,insert 및 기타 데이터 조작을 하나의 명령어로 수행.
변경사항을 기록하는 CDC(Change Data Capture) 로그데이터를 updates라는 임시뷰로 생성.
MERGE INTO students b USING updates u
on b.id=u.id
WHEN MATCHED AND u.type = "update"
THEN UPDATE SET *
WHEN MATCHED AND u.type = "delete"
THEN DELETE
WHEN NOT MATCHED AND u.type = "insert"
THEN INSERT *
3개의 트렌젝션이 되고 만일 이중에 하나라도 실패하게 된다면 invalid한 상태가 될 수 있다.
이 3가지 action을 하나의 atomic 트렌젝션으로 묶어서 한꺼번에 적용되도록 한다.
DESCRIBE EXTENDED students
location 행에서 테이블을 이루는 파일의 위치 정보를 확인
%python
display(dbutils.fs.ls(f"/user/hive/warehouse/delta_{databricks_user}_db.db/students"))
delta Lake file 조사
- 실제 데이터들은 parquet 파일로 존재하고 _delta_log 파일에 transaction log를 갖고 있음
- _delta_log 폴더에는 log들이 json 파일 형태로 되어 있다.
%python
display(spark.sql(f"SELECT * FROM json.`dbfs:/user/hive/warehouse/delta_{databricks_user}_db.db/students/_delta_log/00000000000000000001.json`"))
위와 같이 json 파일을 열어 볼 수 있음
DESCRIBE HISTORY students
Table History를 볼 수 있음
SELECT * FROM students VERSION AS OF 2;
-- SELECT * FROM students@v2;
과거 버전 조회
-- RESTORE TABLE students TO VERSION AS OF 2;
과거 버전으로 돌아가기
Delta Live Tables