%pyspark
df.createOrReplaceTempView("df")
df = spark.read.csv("/skybluelee/dat/airline_on_time")
sql에서 사용하기 위해 tempview로 등록
%pyspark
print(spark.catalog.listDatabases())
print(spark.catalog.listTables())
[Database(name='default', description='default database', locationUri='file:/home/spark/spark-warehouse')]
[Table(name='df', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
df이 자주 사용될 예정이므로 캐시처리함
%pyspark
import pyspark
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
df.count()
action을 위한 count
%pyspark
df.unpersist()
%pyspark
from pyspark.sql.functions import asc
df.select("UniqueCarrier").distinct()\
.orderBy(asc("UniqueCarrier"))\
.show(50)
%sql
SELECT DISTINCT UniqueCarrier
FROM df
ORDER BY 1 asc
%pyspark
from pyspark.sql.functions import count
df.select("UniqueCarrier").distinct().select(count("UniqueCarrier").alias("carrier count"))
.show(50)
%sql
SELECT COUNT(DISTINCT UniqueCarrier) AS `carrier count` -- 공백이 존재한다면 ``로 감싸준다
FROM df
%pyspark
from pyspark.sql.functions import count
df.select("UniqueCarrier").agg(count("*").alias("flight_count"))
.show(50)
-- 집계함수는 agg로 감싸주어야 함
%sql
SELECT UniqueCarrier, COUNT(*) AS flight_count
FROM df
GROUP BY UniqueCarrier
%pyspark
df.printSchema()
root
...
|-- Cancelled: integer (nullable = true)
...
Cancelled의 type이 int임 -> NA값은 없을 것으로 추정
취소된 값을 파악하기 위해 Cancelled와 연관있는 attribute을 선택해 확인
%sql
SELECT Cancelled, CancellationCode, DepDelay, ArrDelay
FROM df
WHERE TRUE -- and Cancelled = 1 // true로 먼저 설정하고 뒤의 값을 주석처리하여 where문을 더 빠르게 사용
ORDER BY 1 desc
%sql
SELECT Cancelled, CancellationCode, DepDelay, ArrDelay
FROM df
WHERE TRUE -- and Cancelled = 1 // true로 먼저 설정하고 뒤의 값을 주석처리하여 where문을 더 빠르게 사용
ORDER BY 1 asc
CancellationCode의 경우 값이 제대로 나오지 않는 경우가 많을 것으로 추정
출발 지연과 도착 지연의 값을 토대로 취소된 경우는 Cancelled의 값이 1인 경우로 추정할 수 있음
%sql
SELECT DISTINCT Cancelled
FROM df
에서 Cancelled의 값이 1과 0으로만 이루어져 있는 것을 확인
%pyspark
from pyspark.sql.functions import count, sum, when, col, round
df.groupBy("UniqueCarrier")\
.agg(count(*).alias("flight_count"),
sum("Cancelled").alias("flight_cancel_count"),
sum(when(col("Cancelled") == 0, 1).otherwise(0)).alias("flight_real_count"),
(sum(when(df.Cancelled == 1, 1).otherwise(0)) + sum(when(df.Cancelled == 0, 1).otherwise(0))).alias("flight_total_count"))\
.withColumn("flight_gap", col("flight_count") - col("flight_total_count"))\
.withColumn("cancel_rate(%)", round(col("flight_cancel_count") / col("flight_count") * 100, 1)) \
.orderBy(col("cancel_rate(%)").desc(), col("flight_cancel_count").desc(), col("flight_count").desc()) \
.show(100)
%sql
WITH temp AS -- 각 항공사별 전체 비행 횟수, 실제 비행 횟수, 취소된 비행 횟수
(
SELECT UniqueCarrier, COUNT(*) AS flight_count, SUM(Cancelled) AS flight_cancel_count,
SUM(case when Cancelled = 0 then 1 else 0 end) AS flight_real_count,
SUM(case when Cancelled = 1 then 1 else 0 end) +
SUM(case when Cancelled = 0 then 1 else 0 end) as flight_total_count
-- (실제 비행 + 비행 취소 = 전체 비행) 확인 용도
FROM df
GROUP BY UniqueCarrier
)
SELECT *, (flight_count - flight_total_count) AS flight_gap,
ROUND(flight_cancel_count / flight_count * 100, 1) as `cancel_rate(%)` -- 기호가 있다면 `사용
FROM temp
ORDER BY `cancel_rate(%)` desc, flight_cancel_count desc, flight_count desc
SELECT DISTINCT(flight_count - flight_total_count) AS flight_gap
FROM temp
에서 (실제 비행 + 비행 취소 = 전체 비행) 임을 확인할 수 있다.