본문 바로가기
Data Engineering/PySpark

[PySpark] PySpark 기본 예제

by Toritol 2022. 10. 21.
728x90

 지난 글에서는 PySpark를 설치하고 Pycharm과 연동하는 것까지 살펴보았습니다. 이번 글에서는 간단하게 예제 데이터를 생성하고, 해당 데이터를 다뤄보는 것을 통해 PySpark의 간단한 사용법을 파악하고자 합니다.

 

from pyspark.sql import SparkSession

# 스파크 세션 생성
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
# 로그 레벨 정의
spark.sparkContext.setLogLevel("ERROR")

 

 우선 spark 세션을 생성하고 로그 레벨을 정의해줍니다. 로그 레벨을 정의하는 것은 필수는 아니지만, 많은 로그가 나올 경우 필요한 로그를 확인하기 어려울 수 있습니다. 따라서 로그 레벨을 정의하는 것을 통해 원하는 수준의 로그만 확인하는 것이 가능합니다.

 

# 예제
data = [('001','Smith','M',40,'DA',4000),
        ('002','Rose','M',35,'DA',3000),
        ('003','Williams','M',30,'DE',2500),
        ('004','Anne','F',30,'DE',3000),
        ('005','Mary','F',35,'BE',4000),
        ('006','James','M',30,'FE',3500)]

columns = ["cd","name","gender","age","div","salary"]
df = spark.createDataFrame(data = data, schema = columns)

 

 다음은 예제 테이블을 생성하는 과정입니다. 다른 블로그에 있는 예제 테이블을 참고하여 나만의 테이블을 생성하였습니다.

 

>>> df.printSchema()
root
 |-- cd: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: long (nullable = true)
 |-- div: string (nullable = true)
 |-- salary: long (nullable = true)
 
>>> df.show()
+---+--------+------+---+---+------+
| cd|    name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001|   Smith|     M| 40| DA|  4000|
|002|    Rose|     M| 35| DA|  3000|
|003|Williams|     M| 30| DE|  2500|
|004|    Anne|     F| 30| DE|  3000|
|005|    Mary|     F| 35| BE|  4000|
|006|   James|     M| 30| FE|  3500|
+---+--------+------+---+---+------+

 

 생성한 테이블의 스키마와 출력하는 과정은 위와 같습니다. 출력 결과가 기존에 사용하던 Data Frame과 다른 것을 알 수 있습니다.

 

>>> df.createOrReplaceTempView("EMP_INFO")
>>> df2 = spark.sql("select name, gender, div, salary from emp_info")
>>> df2.show()

+--------+------+---+------+
|    name|gender|div|salary|
+--------+------+---+------+
|   Smith|     M| DA|  4000|
|    Rose|     M| DA|  3000|
|Williams|     M| DE|  2500|
|    Anne|     F| DE|  3000|
|    Mary|     F| BE|  4000|
|   James|     M| FE|  3500|
+--------+------+---+------+

>>> df.select('name', 'gender', 'div', 'salary').show()
+--------+------+---+------+
|    name|gender|div|salary|
+--------+------+---+------+
|   Smith|     M| DA|  4000|
|    Rose|     M| DA|  3000|
|Williams|     M| DE|  2500|
|    Anne|     F| DE|  3000|
|    Mary|     F| BE|  4000|
|   James|     M| FE|  3500|
+--------+------+---+------+

 

 가장 간단한 select절부터 살펴보면 두 가지 방법이 존재합니다. 첫 번째로 SQL 구문 자체를 활용하는 방법과, 두 번째로 PySpark 함수를 사용하는 방법입니다. 아직 기초적인 단계이기에 실무에서는 어떠한 방법이 실용적인지는 판단하기 어려우나, 본 글에서는 SQL 구문 자체를 활용하는 방법보다는 PySpark 함수를 사용하는 방법을 위주로 설명하고자 합니다.

 

>>> df.filter("gender == 'F'").show()
+---+----+------+---+---+------+
| cd|name|gender|age|div|salary|
+---+----+------+---+---+------+
|004|Anne|     F| 30| DE|  3000|
|005|Mary|     F| 35| BE|  4000|
+---+----+------+---+---+------+

>>> from pyspark.sql import functions as F
>>> df.filter(
...     (F.col("div") == "DA") &
...     (F.col("salary") > 3500)
... ).count()
1

 

 조건절은 filter 함수를 사용하며, 아래의 코드와 같이 여러 개의 조건을 사용하는 것도 가능합니다.

 

>>> df.groupby("div").count().sort("count", ascending=True).show()
+---+-----+
|div|count|
+---+-----+
| FE|    1|
| BE|    1|
| DE|    2|
| DA|    2|
+---+-----+

 

 group by절은 groupby 함수를 사용하고, order by절은 sort 함수를 통해 가능합니다.

 

test_df = df.select('name', 'gender', 'div', 'salary').toPandas()

 

 앞서 사용한 PySpark 함수들 뒤에 toPandas 함수를 사용하게 되면, 기존의 Pandas에서 사용하던 Data Frame 형태로 변형하는 것이 가능합니다. 따라서 PySpark 내에서 수행하기 어려운 분석의 경우에는 Data Frame으로 변형한 후에 수행할 수 있습니다. 이와 같이 데이터베이스에서 데이터를 추출할 수 있는 가장 기본적인 방법들을 탐색해봤으며, 추후 업무에서 활용하는 것에 따라 PySpark에 관한 내용들을 지속적으로 추가하고자 합니다.

 

728x90

'Data Engineering > PySpark' 카테고리의 다른 글

[PySpark] PySpark 설치하기  (0) 2022.10.21

댓글