참고 : General하게 Dataset 관리 하는 여러가지 방법이 기재되어있는 페이지
Client와 Server 사이의 통신 방식 중 하나
HTTP URI(Uniform Resource Identifier)를 통해 자원(Resource)을 명시하고, HTTP Method(POST, GET, PUT, DELETE)를 통해 해당 자원에 대한 CRUD Operation을 적용
CRUD Operation
빅쿼리 REST URL의 기반은 https://www.googleapis.com/bigquery/v2
그 뒤로 /projects/<PROJECT>/datasets/<DATASET>/tables/<TABLE>
Question > 이해가 잘 안감
서비스 계정 만들기
서비스 계정 생성 완료
GOOGLE_APPLICATION_CREDENTIALS
환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정# Linux & Mac
export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"
# Microsoft
## Powershell
$env:GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"
## 명령 프롬프트
set GOOGLE_APPLICATION_CREDENTIALS=KEY_PATH
conda install -c conda-forge google-cloud-bigquery
from google.cloud import bigquery
client = bigquery.Client(project = <PROJECT>)
Google Cloud Bigquery API Reference
Python-Bigquery Github : 코드 참고
이는 스트리밍 삽입 방식임 기존 스트리밍 API 사용_공식 웹사이트
- 스트리밍 삽입 방식은 빅쿼리 비용이 청구됨 > 이미 필요한 데이터를 갖고 있는 경우 로드 작업으로 대신할 수 있음, 이 경우 비용이 청구되지 않음
get_dataset
: 데이터셋에 대한 정보 확인dsinfo = client.get_dataset(<Project>.<dataset>)
# dataset 정보 확인
print(dsinfo.dataset_id)
print(dsinfo.created)
print(dsinfo.location)
# dataset 접근 권한 확인 > READER 권한 가진 role 확인
for access in dfinfo.access_entires:
if access.role == 'READER':
print(access)
create_dataset
: 데이터셋 생성# 기본 Region : US인 경우
dataset_id = "{}.<dataset_이름>".format(<Project_이름>)
ds = client.create_dataset(dataset_id, exists_ok = True)
# Specific한 Region 지정시 Local Dataset 객체 생성
# > dsinfo 변수에 저장 > location 값 지정 > Dataset 객체 전달 > create_dataset 메서드 호출
dataset_id = "{}.<dataset_이름>".format(<Project_이름>)
dsinfo = bigquery.Dataset(dataset_id)
dsinfo.location = 'EU'
ds = client.create_dataset(ds_info, exists_ok = True)
delete_dataset
client.delete_dataset('<dataset_이름>', not_found_ok = True)
# 특정 프로젝트의 데이터셋 삭제는 앞에 project명 추가하면 됨
client.delete_dataset('<Project_이름>.<dataset_이름>', not_found_ok = True)
# '{}.{}'.format(<Project_이름>.<dataset_이름>)도 가능
update_dataset
: dataset 정보 update/수정시 사용 # 데이터셋 설명 업데이트
dsinfo = client.get_dataset("<dataset_이름>")
dsinfo.description = "설명 넣고 싶은 내용"
dsinfo = client.update_dataset(dsinfo, ['description'])
print(dsinfo.description)
# 데이터셋 access 업데이트
dsinfo = bq.get_dataset("<dataset_이름>")
entry = bigquery.AccessEntry(
role="READER",
entity_type="userByEmail",
entity_id="@gmail.com",
)
if entry not in dsinfo.access_entries:
entries = list(dsinfo.access_entries)
entries.append(entry)
dsinfo.access_entries = entries
dsinfo = bq.update_dataset(dsinfo, ["access_entries"]) # API request
else:
print('{} already has access'.format(entry.entity_id))
print(dsinfo.access_entries)
get_table
: 기본 테이블 정보 확인list_tables
: 테이블 리스트 확인list_tables
메소드를 이용해 얻은 tables의 정보는 하위 메소드를 이용하여 확인할 수 있음 : table_id, 행의 수, 설명, 태그, 스키마 등tables = client.list_tables("<project_이름>.<dataset_이름>")
for table in tables:
print(table.table_id)
COUNT()
쿼리 대신table.num_rows
,table.table_id
를 이용하면 요금이 청구되지 않음
table = client.get_table("<Project>.<Dataset>.<Table>")
print(table.table_id)
print(table.num_rows)
print(table.schema) # 스키마 정보 확인
delete_table
client.delete_table("<Dataset>.<Table>", not_found_ok = True)
삭제한 테이블 복구 가능 Max 2일 내
- 타임스탬프를 명시해 복사본 만들면 됨
- 그러나 테이블 삭제 후 데이터셋에 같은 ID를 가진 테이블을 생성 or 스냅샷을 보관하는 데이터셋도 삭제했다면 불가능
- Therefore, 같은 이름의 테이블을 생성하는 것을 제한하고, dataset 이름을 결정할 때 조직 단위로 생성
- Timestamp는 epoch로부터 해당 시간까지 경과한 초
bq --location=US cp <Dataset>.<Table>@<Timestamp정보> <Dataset>.<Table_2>
1. Table 생성 create_table
table_id = "<Project>.<Dataset>.<Table>"
table = client.create_table(table_id, exists_ok = True)
2. Table 생성 이후 메타데이터/스키마 update
schema = [
bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = "<Project>.<Dataset>.<Table>"
table = client.get_table(table_id)
print(table.etag)
table.schema = schema
table = client.update_table(table, ["schema"])
print(table.schema)
print(table.etag)
BigQuery는 테이블에 갱신이있을 때마다 태그를 지정함 >
update_table
로 스키마를 업데이트하면 서버와 업로드한 스키마의 etag가 일치할 때만 실행됨 > update 이후에는 새로운 etag를 가짐
- 테이블이 비어있으면 원하는대로 스키마 갱신이 가능하지만, 이미 데이터가 저장되어있다면 데이터와 호환되는 스키마로 변경만 가능함
- Null 값 허용시, 컬럼 제약을 Required에서 Nullable로 완화도 가능
3. Table 생성 + 스키마 지정
테이블을 생성하는 시점에 스키마를 제공하는 것이 더 나은 선택지
schema = [
bigquery.SchemaField("chapter", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
]
table_id = '{}.ch05.temp_table2'.format(PROJECT)
table = bigquery.Table(table_id, schema)
table = bq.create_table(table, exists_ok=True)
print('{} created on {}'.format(table.table_id, table.created))
print(table.schema)
4. New Table에 데이터 삽입
insert_rows(<넣을 table>, <넣을 내용>)
사용clinet.insert_rows
메소드가 성공적으로 이뤄지면 해당 variable은 빈값을 가짐 BUT 데이터는 바로 Table에 추가되지 않음 > (어떤 내용이던 상관 없이)Table을 쿼리하면 Update됨# 오류 없이 새로운 Table에 데이터 삽입한 경우
# 오류나지 않은 데이터는 바로 Table에 추가되지 않음 > 빅쿼리 웹 UI > Streaming Buffer Statistics에서 확인 가능 >
rows = [
(1, u'What is BigQuery?'),
(2, u'Query essentials'),
]
print(table.table_id, table.num_rows)
errors = bq.insert_rows(table, rows)
print(errors) # 아무것도 안나옴
# 지정된 Schema에 적합하지 않은 데이터를 추가하는 try로 인해 오류 생기는 경우
# 현재 Schema는 첫번째 Col > INTEGER / 두번째 Col > STRING
rows = [
('3', u'Operating on data types'),
('wont work', u'This will fail'),
('4', u'Loading data into BigQuery'),
]
errors = bq.insert_rows(table, rows)
print(errors)
load_table_from_dataframe(<df>, <table_id>
)# df > data
table_id = "<Project>.<Dataset>.<Table>"
job = client.load_table_from_dataframe(df, table_id)
job.result() # job 시간이 오래 걸리기 때문에 해당 코드로 중단 or 대기 할 수 있음