zeppelin airline 데이터 탐색

이상민·2023년 4월 3일
0

SQL

목록 보기
5/8

setting

Temp View 등록

%pyspark

df.createOrReplaceTempView("df")

df = spark.read.csv("/skybluelee/dat/airline_on_time")
sql에서 사용하기 위해 tempview로 등록

Catalog 확인

%pyspark

print(spark.catalog.listDatabases())

print(spark.catalog.listTables())

[Database(name='default', description='default database', locationUri='file:/home/spark/spark-warehouse')]
[Table(name='df', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Cache 처리(Persist 처리)

df이 자주 사용될 예정이므로 캐시처리함

%pyspark

import pyspark

df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
df.count()

action을 위한 count

Cache 해제(Unpersist)

%pyspark

df.unpersist()

데이터 탐색....

  • [Q-01] 항공사 목록?
%pyspark
from pyspark.sql.functions import asc
df.select("UniqueCarrier").distinct()\
  .orderBy(asc("UniqueCarrier"))\
  .show(50)


%sql
SELECT	DISTINCT UniqueCarrier
FROM	df
ORDER	BY 1 asc
  • [Q-02] 항공사 개수?
%pyspark
from pyspark.sql.functions import count
df.select("UniqueCarrier").distinct().select(count("UniqueCarrier").alias("carrier count"))
  .show(50)

%sql
SELECT	COUNT(DISTINCT UniqueCarrier) AS `carrier count` -- 공백이 존재한다면 ``로 감싸준다
FROM	df
  • [Q-03] 항공사별 비행 횟수?
%pyspark
from pyspark.sql.functions import count
df.select("UniqueCarrier").agg(count("*").alias("flight_count"))
  .show(50)
-- 집계함수는 agg로 감싸주어야 함

%sql
SELECT	UniqueCarrier, COUNT(*) AS flight_count
FROM	df
GROUP	BY UniqueCarrier
  • [Q-04] 항공사별 계획된 비행 횟수 vs. 실제 비행 횟수 vs. 취소된 비행 횟수? + 취소율(%)?
비행 취소 attribute 확인
%pyspark

df.printSchema()

root
  ...
 |-- Cancelled: integer (nullable = true)
  ...

Cancelled의 type이 int임 -> NA값은 없을 것으로 추정
취소된 값을 파악하기 위해 Cancelled와 연관있는 attribute을 선택해 확인

%sql

SELECT	Cancelled, CancellationCode, DepDelay, ArrDelay
FROM	df
WHERE	TRUE -- and Cancelled = 1 // true로 먼저 설정하고 뒤의 값을 주석처리하여 where문을 더 빠르게 사용
ORDER	BY 1 desc

%sql

SELECT	Cancelled, CancellationCode, DepDelay, ArrDelay
FROM	df
WHERE	TRUE -- and Cancelled = 1 // true로 먼저 설정하고 뒤의 값을 주석처리하여 where문을 더 빠르게 사용
ORDER	BY 1 asc


CancellationCode의 경우 값이 제대로 나오지 않는 경우가 많을 것으로 추정
출발 지연과 도착 지연의 값을 토대로 취소된 경우는 Cancelled의 값이 1인 경우로 추정할 수 있음

%sql

SELECT	DISTINCT Cancelled
FROM	df

에서 Cancelled의 값이 1과 0으로만 이루어져 있는 것을 확인

실제 값 확인
%pyspark
from pyspark.sql.functions import count, sum, when, col, round
df.groupBy("UniqueCarrier")\
  .agg(count(*).alias("flight_count"),
  	   sum("Cancelled").alias("flight_cancel_count"),
       sum(when(col("Cancelled") == 0, 1).otherwise(0)).alias("flight_real_count"),
       (sum(when(df.Cancelled == 1, 1).otherwise(0)) + sum(when(df.Cancelled == 0, 1).otherwise(0))).alias("flight_total_count"))\
  .withColumn("flight_gap", col("flight_count") - col("flight_total_count"))\
  .withColumn("cancel_rate(%)", round(col("flight_cancel_count") / col("flight_count") * 100, 1)) \
  .orderBy(col("cancel_rate(%)").desc(), col("flight_cancel_count").desc(), col("flight_count").desc()) \
  .show(100)

%sql
WITH temp AS -- 각 항공사별 전체 비행 횟수, 실제 비행 횟수, 취소된 비행 횟수
(
	SELECT	UniqueCarrier, COUNT(*) AS flight_count, SUM(Cancelled) AS flight_cancel_count,
    		SUM(case when Cancelled = 0 then 1 else 0 end) AS flight_real_count,
    		SUM(case when Cancelled = 1 then 1 else 0 end) + 
            SUM(case when Cancelled = 0 then 1 else 0 end) as flight_total_count
            -- (실제 비행 + 비행 취소 = 전체 비행) 확인 용도
	FROM	df
    GROUP	BY UniqueCarrier
)      
SELECT 	*, (flight_count - flight_total_count) AS flight_gap,
		ROUND(flight_cancel_count / flight_count * 100, 1) as `cancel_rate(%)` -- 기호가 있다면 `사용
FROM	temp        
ORDER 	BY `cancel_rate(%)` desc, flight_cancel_count desc, flight_count desc

SELECT	DISTINCT(flight_count - flight_total_count) AS flight_gap
FROM	temp

에서 (실제 비행 + 비행 취소 = 전체 비행) 임을 확인할 수 있다.

  • [Q-05] 붐비는 공항?
  • [Q-06] 실제 비행시간 vs. 예상 비행시간.... 차이가 큰 노선은?
  • [Q-07] 각 공항별 노선별 운항 비중(%)은?
  • [Q-08] 각 공항별 출발만 하는 항공사 vs. 도착만 하는 항공사 vs. 출발/도착 모두 하는 항공사?
  • [Q-09] 각 노선별 항공사 운항 횟수?

0개의 댓글