pyspark
PySpark agg
PySpark agg PySpark agg Groupby된 데이터프레임의 값을 집계할 때 사용하는 메서드 하나 이상의 값을 집계 가능 df.groupBy().agg(sum("col1"), sum("col2")) 와 같이 사용 가능 groupBy를 생략하고 df.agg()를 사용하면, df.groupBy().agg() 를 사용한 것으로 취급됨 References https://sparkbyexamples.com/pyspark/pyspark-groupby-agg-aggregate-explained/ https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.agg.html
read morepyspark
PySpark Drop Only True
PySpark Drop Only True PySpark Drop Only True PySpark에서 True, False, Null로 이루어진 열이 있을 수 있음 이 열에서 값이 오직 True인 행만 drop하고 싶은 경우, 다음과 같이 구현하면 됨 import pyspark.sql.functions as F df = df.filter((F.col("c1").isNull()) | (F.col("c1") == False)) # 아래와 같이 괄호를 생략하면 안 됨 df = df.filter(F.col("c1").isNull() | F.col("c1") == False)
read morepyspark
Spark Job 2
Spark Job 2 Spark Job 2 Spark는 RDD 혹은 데이터프레임에서 Spark Action을 호출하면 Spark Job이 생성됨 Spark는 lazy 하게 연산하므로, map, filter와 같은 코드는 바로 실행되지 않음 대신 DAG를 만들어서 논리적 실행 계획을 구축 결과를 외부에 저장하거나, 값을 반환해야 하는 등 실제 연산이 필요한 시점에, Action이 실행됨 이 Action이 Job을 생성하는 것 Spark Job을 생성할 수 있는 Action 예시 count() collect() saveAsTextFile() reduce() foreach() References https://sparkbyexamples.com/spark/what-is-spark-job/
read morepyspark
Spark Job 1
Spark Job 1 Spark Job 1 Spark Job이란 single machine에서 수행되기에는 너무 큰 task를 의미 Spakr job은 여러 spark stage로 구분되어서 수행 각 stage는 데이터의 각 파티션에서 병렬로 수행됨 빠른 처리와 병렬성 제공 Spakr Job은 다음과 같이 구성됨 데이터 소스에서 데이터를 로드 맵, 리듀스, 조인 등을 통해 데이터를 조작 및 변환 처리된 데이터를 다시 저장 References https://sparkbyexamples.com/spark/what-is-spark-job/
read morepyspark
PySpark datediff
PySpark datediff PySpark datediff PySpark에서 두 날짜의 차이를 계산하는 함수 다음과 같이 사용하면 날짜 차이만큼의 정수가 반환됨 from pyspark.sql.functions import datediff df = df.withColumn("diff", datediff(df.d2, df.d1)) Spark 3.5.0 에서는 date_diff 메소드를 사용하는 것 같은데, 그 전의 버전에서는 date_diff를 사용하면 에러가 남. 주의할 것 References https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.datediff.html https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_diff.html
read morepyspark
PySpark When
PySpark When PySpark When PySpark DataFrame에서 조건에 따라 다른 값을 내고 싶은 경우 사용하는 메서드. If-Else를 생각하면 된다 when(condition, "a").otherwise("b") 와 같이 사용 조건을 만족하지 않는데 otherwise 가 없는 경우, None을 반환 다음은 사용 예제 from pyspark.sql.functions import when # 아래와 같이 chaining도 가능 df = df.withColumn("class", when(df.score > 90, "S") .when(df.score > 75, "A") .when(df.score > 60, "B") .otherwise("C")) References https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.when.html https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/
read morepyspark
PySpark Broadcast Join
PySpark Broadcast Join PySpark Broadcast Join 스파크가 조인할 때 노드간 통신을 하는 방법에 따라 shuffle join과 broadcast join으로 나뉨 Broadcast join은 테이블이 단일 노드의 메모리에 들어갈 정도로 작다면, 사용 가능한 join 방법 작은 DataFrame을 전체 노드에 모두 복사함 조인 시작시 한 번만 데이터가 복제되지만, 통신 비용이 처음 말고는 발생하지 않음 각 노드가 다른 노드를 기다리지 않음 단일 노드에서 소화하기 힘든 큰 테이블의 경우 비효율적일 수 있음 CPU에서 병목 발생 가능 Driver의 메모리가 터지기도 쉬움 큰 테이블과 작은 테이블을 join할 때 가능 잘 사용한다면 속도가 굉장히 빠름 References https://jjaesang.
read morepyspark
PySpark Shuffle Join
PySpark Shuffle Join PySpark Shuffle Join 스파크가 조인할 때 노드간 통신을 하는 방법에 따라 shuffle join과 broadcast join으로 나뉨 Shuffle join은 전체 노드 간 통신을 통해 join을 수행하는 방법 모든 데이터를 지정된 노드로 이동시키며, 동일한 키를 가진 데이터를 동일한 노드로 이동하는 등의 방식 사용 이렇게 데이터를 섞기 때문에 shuffle join 통신 자원이 많이 소요됨 샤딩이 잘 되어 있지 않는 경우 비효율적 큰 테이블과 큰 테이블을 join할 때 사용 References https://jjaesang.github.io/spark/2018/12/23/Spark-join.html https://mjs1995.
read morepyspark
PySpark Certain Column Null Count
PySpark Certain Column Null Count PySpark Certain Column Null Count PySpark에서 특정 열의 null 갯수를 세려면 다음과 같이 하면 됨 다음과 같이 특정 column에 isNull 함수를 적용하는 조건을 filter 안에 두면 됨 from pyspark.sql.functions import col df.filter(col("state").isNull()).count() References https://sparkbyexamples.com/pyspark/pyspark-isnull/
read morepyspark
PySpark Distinct
PySpark Distinct PySpark Distinct pyspark에서 모든 열에서 중복된 행을 제거하는 메서드는 distinct df_distinct = df.distinct()와 같이 간단히 사용 특정 열을 선택하기 위해서는 dropDuplicates 메서드를 사용하면 됨 df_distinct = df.dropDuplicates("col1", "col2") References https://sparkbyexamples.com/pyspark/pyspark-select-distinct/
read more