Big Query_5

이지수·2023년 7월 31일
0

프로그래밍 방식을 활용

참고 : General하게 Dataset 관리 하는 여러가지 방법이 기재되어있는 페이지

1. REST API

1.General _ REST란

  1. Representational State Transfer
  2. 1) 자원을 2) 이름(자원의 표현)으로 구분하여 해당 3) 자원의 상태(정보)를 주고 받는 모든 것을 의미
    자원(resource)의 표현(representation)
  • 자원: 해당 소프트웨어가 관리하는 모든 것 (데이터, 문서 etc)
  • 자원의 표현: 그 자원을 표현하기 위한 이름 (학교 이름이 DB에 자원으로 있을 때, school이 자원의 표현이 됨)
  • 상태(정보) 전달 : 데이터가 요청되어지는 시점에서 자원의 상태(정보)를 (JSON | XML의 형식으로) 전달
  1. Client와 Server 사이의 통신 방식 중 하나

  2. HTTP URI(Uniform Resource Identifier)를 통해 자원(Resource)을 명시하고, HTTP Method(POST, GET, PUT, DELETE)를 통해 해당 자원에 대한 CRUD Operation을 적용

  3. CRUD Operation

  • Create : 생성(POST)
  • Read : 조회(GET)
  • Update : 수정(PUT / PATCH)
  • Delete : 삭제(DELETE)

2. Big Query REST API

빅쿼리 REST URL의 기반은 https://www.googleapis.com/bigquery/v2

그 뒤로 /projects/<PROJECT>/datasets/<DATASET>/tables/<TABLE>

Question > 이해가 잘 안감

2. Google Client Library > Python

참고페이지_클라이언트 라이브러리 사용

1. Google Cloud > Dashboard > Project 선택

2. BigQuery API 사용 설정 (Question. 여기 메뉴에서 어디인지 모르겠음)

3. IAM 및 관리자 - 서비스계정

  1. 서비스 계정 만들기

  2. 서비스 계정 생성 완료

  1. 서비스 계정 키 만들기
  • Google Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭
  • 키 메뉴 > 새 키 만들기 >
  • JSON 키 파일 생성
  1. 생성된 서비스 계정 키의 위치를 경로로 설정
  • GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정
  • 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정해아 함
# Linux & Mac
export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"
  
# Microsoft
## Powershell
$env:GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"

## 명령 프롬프트
set GOOGLE_APPLICATION_CREDENTIALS=KEY_PATH

4. Client Library 설치

  1. Window - Anaconda
conda install -c conda-forge google-cloud-bigquery
  1. Linux - Poetry
    - Question > Poetry add로 되지 않기 때문에 pip install한 후 가져와야 함

5. Library 가져오기

from google.cloud import bigquery

6. BigQuery 클라이언트 초기화

  • client 객체를 이용해 수행하는 모든 작업에 대한 비용이 청구됨
  • Project는 추후 Code로 전달할 수도 있음
client = bigquery.Client(project = <PROJECT>)

7. Query 및 Python Code

Google Cloud Bigquery API Reference
Python-Bigquery Github : 코드 참고

이는 스트리밍 삽입 방식임 기존 스트리밍 API 사용_공식 웹사이트

  • 스트리밍 삽입 방식은 빅쿼리 비용이 청구됨 > 이미 필요한 데이터를 갖고 있는 경우 로드 작업으로 대신할 수 있음, 이 경우 비용이 청구되지 않음

1. Dataset 정보 확인

  • get_dataset : 데이터셋에 대한 정보 확인
  • 프로젝트명을 명시하지 않을 경우, Client의 Instance가 생성될 때 참조했던 프로젝트가 전달됨
dsinfo = client.get_dataset(<Project>.<dataset>)

# dataset 정보 확인
print(dsinfo.dataset_id)
print(dsinfo.created)
print(dsinfo.location)

# dataset 접근 권한 확인 > READER 권한 가진 role 확인 
for access in dfinfo.access_entires:
	if access.role == 'READER':
    	print(access)

2. Dataset 생성

  • create_dataset : 데이터셋 생성
# 기본 Region : US인 경우
dataset_id = "{}.<dataset_이름>".format(<Project_이름>)
ds = client.create_dataset(dataset_id, exists_ok = True)

# Specific한 Region 지정시 Local Dataset 객체 생성 
# > dsinfo 변수에 저장 > location 값 지정 > Dataset 객체 전달 > create_dataset 메서드 호출 
dataset_id = "{}.<dataset_이름>".format(<Project_이름>)
dsinfo = bigquery.Dataset(dataset_id)
dsinfo.location = 'EU'
ds = client.create_dataset(ds_info, exists_ok = True)

3. Dataset 삭제

  • delete_dataset
client.delete_dataset('<dataset_이름>', not_found_ok = True)

# 특정 프로젝트의 데이터셋 삭제는 앞에 project명 추가하면 됨
client.delete_dataset('<Project_이름>.<dataset_이름>', not_found_ok = True)
# '{}.{}'.format(<Project_이름>.<dataset_이름>)도 가능 

4. Dataset 정보 수정

  • update_dataset : dataset 정보 update/수정시 사용
# 데이터셋 설명 업데이트
dsinfo = client.get_dataset("<dataset_이름>")
dsinfo.description = "설명 넣고 싶은 내용"
dsinfo = client.update_dataset(dsinfo, ['description'])
print(dsinfo.description)

# 데이터셋 access 업데이트 
dsinfo = bq.get_dataset("<dataset_이름>")
entry = bigquery.AccessEntry(
    role="READER",
    entity_type="userByEmail",
    entity_id="@gmail.com",
)
if entry not in dsinfo.access_entries:
  entries = list(dsinfo.access_entries)
  entries.append(entry)
  dsinfo.access_entries = entries
  dsinfo = bq.update_dataset(dsinfo, ["access_entries"])  # API request
else:
  print('{} already has access'.format(entry.entity_id))
print(dsinfo.access_entries)

5. Table 정보 확인

  • get_table : 기본 테이블 정보 확인
  • list_tables : 테이블 리스트 확인
  • list_tables 메소드를 이용해 얻은 tables의 정보는 하위 메소드를 이용하여 확인할 수 있음 : table_id, 행의 수, 설명, 태그, 스키마 등
tables = client.list_tables("<project_이름>.<dataset_이름>")
for table in tables:
	print(table.table_id)

COUNT() 쿼리 대신 table.num_rows, table.table_id를 이용하면 요금이 청구되지 않음

table = client.get_table("<Project>.<Dataset>.<Table>")
print(table.table_id)
print(table.num_rows)
print(table.schema) # 스키마 정보 확인

6. Table 삭제

  • delete_table
client.delete_table("<Dataset>.<Table>", not_found_ok = True)

삭제한 테이블 복구 가능 Max 2일 내

  • 타임스탬프를 명시해 복사본 만들면 됨
  • 그러나 테이블 삭제 후 데이터셋에 같은 ID를 가진 테이블을 생성 or 스냅샷을 보관하는 데이터셋도 삭제했다면 불가능
  • Therefore, 같은 이름의 테이블을 생성하는 것을 제한하고, dataset 이름을 결정할 때 조직 단위로 생성
  • Timestamp는 epoch로부터 해당 시간까지 경과한 초
bq --location=US cp <Dataset>.<Table>@<Timestamp정보> <Dataset>.<Table_2>

7. Table 생성

1. Table 생성 create_table

table_id = "<Project>.<Dataset>.<Table>"
table = client.create_table(table_id, exists_ok = True)

2. Table 생성 이후 메타데이터/스키마 update

schema = [
  bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = "<Project>.<Dataset>.<Table>"
table = client.get_table(table_id)
print(table.etag)

table.schema = schema
table = client.update_table(table, ["schema"])
print(table.schema)
print(table.etag)

BigQuery는 테이블에 갱신이있을 때마다 태그를 지정함 > update_table로 스키마를 업데이트하면 서버와 업로드한 스키마의 etag가 일치할 때만 실행됨 > update 이후에는 새로운 etag를 가짐

  • 테이블이 비어있으면 원하는대로 스키마 갱신이 가능하지만, 이미 데이터가 저장되어있다면 데이터와 호환되는 스키마로 변경만 가능함
  • Null 값 허용시, 컬럼 제약을 Required에서 Nullable로 완화도 가능

3. Table 생성 + 스키마 지정
테이블을 생성하는 시점에 스키마를 제공하는 것이 더 나은 선택지

schema = [
  bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]

table_id = '{}.ch05.temp_table2'.format(PROJECT)
table = bigquery.Table(table_id, schema)
table = bq.create_table(table, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))
print(table.schema)

4. New Table에 데이터 삽입

  • insert_rows(<넣을 table>, <넣을 내용>) 사용
  • 오류나지 않는 경우
    - clinet.insert_rows 메소드가 성공적으로 이뤄지면 해당 variable은 빈값을 가짐 BUT 데이터는 바로 Table에 추가되지 않음 > (어떤 내용이던 상관 없이)Table을 쿼리하면 Update됨
# 오류 없이 새로운 Table에 데이터 삽입한 경우 
# 오류나지 않은 데이터는 바로 Table에 추가되지 않음 > 빅쿼리 웹 UI > Streaming Buffer Statistics에서 확인 가능 > 
rows = [
  (1, u'What is BigQuery?'),
  (2, u'Query essentials'),
]
print(table.table_id, table.num_rows)
errors = bq.insert_rows(table, rows)
print(errors) # 아무것도 안나옴
  • 오류나는 경우
    - 오류가 일어나는 경우 한 ROW만 문제가 있더라도, 전체 INSERT하고자 하는 Row가 다 추가되지 않음
# 지정된 Schema에 적합하지 않은 데이터를 추가하는 try로 인해 오류 생기는 경우
# 현재 Schema는 첫번째 Col > INTEGER / 두번째 Col > STRING

rows = [
  ('3', u'Operating on data types'),
  ('wont work', u'This will fail'),
  ('4', u'Loading data into BigQuery'),
]
errors = bq.insert_rows(table, rows)
print(errors)

8. 데이터를 DF로 로드

  • load_table_from_dataframe(<df>, <table_id>)
# df > data
table_id = "<Project>.<Dataset>.<Table>"
job = client.load_table_from_dataframe(df, table_id)
job.result() # job 시간이 오래 걸리기 때문에 해당 코드로 중단 or 대기 할 수 있음

0개의 댓글