
지난 글에서는 PySpark를 설치하고 Pycharm과 연동하는 것까지 살펴보았습니다. 이번 글에서는 간단하게 예제 데이터를 생성하고, 해당 데이터를 다뤄보는 것을 통해 PySpark의 간단한 사용법을 파악하고자 합니다.
from pyspark.sql import SparkSession
# 스파크 세션 생성
spark = SparkSession.builder.master("local").appName("SparkSQL").getOrCreate()
# 로그 레벨 정의
spark.sparkContext.setLogLevel("ERROR")
우선 spark 세션을 생성하고 로그 레벨을 정의해줍니다. 로그 레벨을 정의하는 것은 필수는 아니지만, 많은 로그가 나올 경우 필요한 로그를 확인하기 어려울 수 있습니다. 따라서 로그 레벨을 정의하는 것을 통해 원하는 수준의 로그만 확인하는 것이 가능합니다.
# 예제
data = [('001','Smith','M',40,'DA',4000),
('002','Rose','M',35,'DA',3000),
('003','Williams','M',30,'DE',2500),
('004','Anne','F',30,'DE',3000),
('005','Mary','F',35,'BE',4000),
('006','James','M',30,'FE',3500)]
columns = ["cd","name","gender","age","div","salary"]
df = spark.createDataFrame(data = data, schema = columns)
다음은 예제 테이블을 생성하는 과정입니다. 다른 블로그에 있는 예제 테이블을 참고하여 나만의 테이블을 생성하였습니다.
>>> df.printSchema()
root
|-- cd: string (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: long (nullable = true)
|-- div: string (nullable = true)
|-- salary: long (nullable = true)
>>> df.show()
+---+--------+------+---+---+------+
| cd| name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001| Smith| M| 40| DA| 4000|
|002| Rose| M| 35| DA| 3000|
|003|Williams| M| 30| DE| 2500|
|004| Anne| F| 30| DE| 3000|
|005| Mary| F| 35| BE| 4000|
|006| James| M| 30| FE| 3500|
+---+--------+------+---+---+------+
생성한 테이블의 스키마와 출력하는 과정은 위와 같습니다. 출력 결과가 기존에 사용하던 Data Frame과 다른 것을 알 수 있습니다.
>>> df.createOrReplaceTempView("EMP_INFO")
>>> df2 = spark.sql("select name, gender, div, salary from emp_info")
>>> df2.show()
+--------+------+---+------+
| name|gender|div|salary|
+--------+------+---+------+
| Smith| M| DA| 4000|
| Rose| M| DA| 3000|
|Williams| M| DE| 2500|
| Anne| F| DE| 3000|
| Mary| F| BE| 4000|
| James| M| FE| 3500|
+--------+------+---+------+
>>> df.select('name', 'gender', 'div', 'salary').show()
+--------+------+---+------+
| name|gender|div|salary|
+--------+------+---+------+
| Smith| M| DA| 4000|
| Rose| M| DA| 3000|
|Williams| M| DE| 2500|
| Anne| F| DE| 3000|
| Mary| F| BE| 4000|
| James| M| FE| 3500|
+--------+------+---+------+
가장 간단한 select절부터 살펴보면 두 가지 방법이 존재합니다. 첫 번째로 SQL 구문 자체를 활용하는 방법과, 두 번째로 PySpark 함수를 사용하는 방법입니다. 아직 기초적인 단계이기에 실무에서는 어떠한 방법이 실용적인지는 판단하기 어려우나, 본 글에서는 SQL 구문 자체를 활용하는 방법보다는 PySpark 함수를 사용하는 방법을 위주로 설명하고자 합니다.
>>> df.filter("gender == 'F'").show()
+---+----+------+---+---+------+
| cd|name|gender|age|div|salary|
+---+----+------+---+---+------+
|004|Anne| F| 30| DE| 3000|
|005|Mary| F| 35| BE| 4000|
+---+----+------+---+---+------+
>>> from pyspark.sql import functions as F
>>> df.filter(
... (F.col("div") == "DA") &
... (F.col("salary") > 3500)
... ).count()
1
조건절은 filter 함수를 사용하며, 아래의 코드와 같이 여러 개의 조건을 사용하는 것도 가능합니다.
>>> df.groupby("div").count().sort("count", ascending=True).show()
+---+-----+
|div|count|
+---+-----+
| FE| 1|
| BE| 1|
| DE| 2|
| DA| 2|
+---+-----+
group by절은 groupby 함수를 사용하고, order by절은 sort 함수를 통해 가능합니다.
test_df = df.select('name', 'gender', 'div', 'salary').toPandas()
앞서 사용한 PySpark 함수들 뒤에 toPandas 함수를 사용하게 되면, 기존의 Pandas에서 사용하던 Data Frame 형태로 변형하는 것이 가능합니다. 따라서 PySpark 내에서 수행하기 어려운 분석의 경우에는 Data Frame으로 변형한 후에 수행할 수 있습니다. 이와 같이 데이터베이스에서 데이터를 추출할 수 있는 가장 기본적인 방법들을 탐색해봤으며, 추후 업무에서 활용하는 것에 따라 PySpark에 관한 내용들을 지속적으로 추가하고자 합니다.
'Data Engineering > PySpark' 카테고리의 다른 글
| [PySpark] PySpark 설치하기 (0) | 2022.10.21 |
|---|
댓글