[Big Data Analysis] Working with Spark RDDs: Creating the Structure, Processing Data, and Map-Reduce
<Notes from Professor Lim Jwa-sang's Big Data Analysis course at Sangmyung University>
RDD
RDD (Resilient Distributed Dataset) is the basic structure Apache Spark uses to represent data. Spark provides three data structures: RDD, DataFrame, and Dataset. RDD is well suited to unstructured data.
Both RDD and DataFrame in Spark are immutable: once created, the original cannot be modified.
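Because an RDD is immutable, a transformation such as map() never changes the source RDD; it returns a new one. A minimal sketch, using the SparkSession that is created just below (the variable names are illustrative):
nums = spark.sparkContext.parallelize([1, 2, 3])
doubled = nums.map(lambda x: x * 2)  # a brand-new RDD; nums itself is unchanged
print(nums.collect())     # [1, 2, 3]
print(doubled.collect())  # [2, 4, 6]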
This Jupyter Notebook covers the following topics.
1. Creating an RDD and Basic Operations
- parallelize(list): creates an RDD from a list
- take(n) / collect(): prints the contents of an RDD
- collect(): an action that gathers the data from every partition and returns it as a local list
2. Reading and Structuring an RDD
- Partition: Spark's logical division of data, i.e. chunks of a fixed size
- Reading a text file into an RDD, reading a CSV into an RDD
3. Map-Reduce
- map(): applies a function to each element
- reduce(): repeatedly applies a function over the elements to produce a single result
4. Data Manipulation and Filtering
- filter(): selects elements
- flatMap(): flattens 2-D into 1-D
- foreach(): applies a function to each element (returns nothing)
5. Statistics
- statistical functions
import pyspark
spark = pyspark.sql.SparkSession.builder\
.master("local")\
.appName("myApp")\
.getOrCreate()
23/11/27 05:24:58 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:24:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/27 05:24:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
myList=[1,2,3,4,5,6,7]
parallelize(list): creates an RDD from a list
myRdd1 = spark.sparkContext.parallelize(myList) #RDD
type(myRdd1)
pyspark.rdd.RDD
take(n) / collect(): prints the contents of an RDD
myRdd1.take(3) #List
[1, 2, 3]
collect(): an action that gathers the data from every partition and returns it as a local list
spark.sparkContext.parallelize([0, 2, 3, 4, 6], 2).collect() #List
[0, 2, 3, 4, 6]
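collect() brings every partition back to the driver, so on a large RDD it can exhaust driver memory. A small sketch of actions that avoid materializing the whole dataset locally (the rdd variable here is illustrative):
rdd = spark.sparkContext.parallelize(range(1000))
print(rdd.count())   # 1000 -- only the count is sent to the driver
print(rdd.take(3))   # [0, 1, 2] -- only the first few elements
print(rdd.first())   # 0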
Partition
A partition is a logical division of the data, i.e. a chunk of data cut into a fixed size.
An RDD is therefore made up of partitions, and the number of partitions can be set to whatever is wanted. Splitting the data this way lets several partitions be processed in parallel on multiple nodes/threads at the same time.
rdd = spark.sparkContext.parallelize([0, 2, 3, 4, 6], 3) # split into 3 partitions
glom(): keeps the partition structure (elements grouped by partition)
rdd.glom().collect()
[[0], [2, 3], [4, 6]]
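To check how an RDD is split without collecting the data itself, getNumPartitions() can be combined with glom(); a short sketch using the rdd created above:
rdd.getNumPartitions()           # 3
rdd.glom().map(len).collect()    # [1, 2, 2] -- number of elements per partition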
Reading data into an RDD
Reading a text file into an RDD
%%writefile data/ds_spark_wiki.txt
Wikipedia
Apache Spark is an open source cluster computing framework.
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.
Apache Spark Apache Spark Apache Spark Apache Spark
아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크
Originally developed at the University of California, Berkeley's AMPLab,
the Spark codebase was later donated to the Apache Software Foundation,
which has maintained it since.
Spark provides an interface for programming entire clusters with
implicit data parallelism and fault-tolerance.
Overwriting data/ds_spark_wiki.txt
import os
myRdd2=spark.sparkContext\
.textFile(os.path.join("data","ds_spark_wiki.txt"))
myRdd2.first() # view only the first element -> textFile() turns each line into one row
# myRdd2.collect()
'Wikipedia'
Reading a CSV into an RDD
%%writefile ./data/ds_spark_2cols.csv
35, 2
40, 27
12, 38
15, 31
21, 1
14, 19
46, 1
10, 34
28, 3
48, 1
16, 2
30, 3
32, 2
48, 1
31, 2
22, 1
12, 3
39, 29
19, 37
25, 2
Overwriting ./data/ds_spark_2cols.csv
myRdd4 = spark.sparkContext\
.textFile(os.path.join("data","ds_spark_2cols.csv"))
myList=myRdd4.take(5)
print(myList)
['35, 2', '40, 27', '12, 38', '15, 31', '21, 1']
myRdd4.map(lambda x: x.split(",")).collect()
# split each row on ',' with a lambda
[['35', ' 2'], ['40', ' 27'], ['12', ' 38'], ['15', ' 31'], ['21', ' 1'], ['14', ' 19'], ['46', ' 1'], ['10', ' 34'], ['28', ' 3'], ['48', ' 1'], ['16', ' 2'], ['30', ' 3'], ['32', ' 2'], ['48', ' 1'], ['31', ' 2'], ['22', ' 1'], ['12', ' 3'], ['39', ' 29'], ['19', ' 37'], ['25', ' 2']]
myRdd4.map(lambda x: x.split(","))\
.map(lambda x: int(x[0])+int(x[1]))\
.collect()
# add the 1st and 2nd values of each list
[37, 67, 50, 46, 22, 33, 47, 44, 31, 49, 18, 33, 34, 49, 33, 23, 15, 68, 56, 27]
Exercise: creating an RDD from a file
Read the files from the links below into RDDs and print 5 lines of each.
Population status of Uijeongbu-si, Gyeonggi-do (file: 경기도 의정부시_인구현황_20230731) https://www.data.go.kr/data/15009613/fileData.do
Yearly population aged 65 and over, aging ratio, and aging index for Seogwipo-si, Jeju (file: 제주특별자치도 서귀포시_고령화비율및노령화지수현황_20200623) https://www.data.go.kr/data/15051545/fileData.do
When reading a file, pay attention to how the text is encoded (Korean or English). If the result looks garbled, write down why.
popRdd = spark.sparkContext\
.textFile(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"), use_unicode=True)
popRdd.take(5) # The Korean comes out garbled even with use_unicode=True: the downloaded file is encoded in EUC-KR/CP949 while textFile() assumes UTF-8, so it has to be re-saved as UTF-8 or decoded explicitly (see binaryFiles below).
['�������,�α���(��),�α���(��),�α���(��),������(��),������(��),������(��),����,�����,������α�,���������,�����μ���,�μ���ȭ��ȣ,�����ͱ�������', '������1��,37557 ,19039 ,18518 ,8.08,4.1,3.98,102.81,22514 ,1.67,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30', '������2��,29729 ,14817 ,14912 ,6.4,3.19,3.21,99.36,16007 ,1.86,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30', 'ȣ��1��,34658 ,16762 ,17896 ,7.46,3.61,3.85,93.66,15296 ,2.27,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30', 'ȣ��2��,33400 ,16088 ,17312 ,7.19,3.46,3.72,92.93,13422 ,2.49,��\u2d75 �����ν�û,�ο����ǰ�,031-828-2466,2023-06-30']
agedRdd = spark.sparkContext\
.textFile(os.path.join("data","제주특별자치도 서귀포시_고령화비율및노령화지수현황_20230324.csv"), use_unicode=True)
for i in agedRdd.take(5)[1:]:
j=i.split(",")
j.append(int(j[2])-int(j[3])-int(j[4]))
print(j)
['2008', '12', '153120', '22241', '26792', '14.52', '83.01', '2023-03-24', 104087]
['2009', '12', '152285', '23031', '25504', '15.12', '90.3', '2023-03-24', 103750]
['2010', '12', '153716', '23990', '24633', '15.6', '97.38', '2023-03-24', 105093]
['2011', '12', '153366', '24839', '23686', '16.2', '104.86', '2023-03-24', 104841]
binaryFiles
binaryFiles() reads a file as binary. We can then try to decode the bytes with the file's original encoding (cp949, euc-kr, etc.) to recover the Korean text.
popRddBin = spark.sparkContext.binaryFiles(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))
_my = popRddBin.map(lambda x :x[1].decode('euc-kr'))
_my.take(1) # Reading with binaryFiles gives the whole file as a single value, and even mapped into a 2-D list it carries no row/column structure, which makes it hard to work with.
['행정기관,인구수(계),인구수(남),인구수(여),구성비(계),구성비(남),구성비(여),성비,세대수,세대당인구,관리기관명,관리부서명,부서전화번호,데이터기준일자\r\n의정부1동,37557 ,19039 ,18518 ,8.08,4.1,3.98,102.81,22514 ,1.67,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n의정부2동,29729 ,14817 ,14912 ,6.4,3.19,3.21,99.36,16007 ,1.86,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n호원1동,34658 ,16762 ,17896 ,7.46,3.61,3.85,93.66,15296 ,2.27,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n호원2동,33400 ,16088 ,17312 ,7.19,3.46,3.72,92.93,13422 ,2.49,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n장암동,19197 ,9156 ,10041 ,4.13,1.97,2.16,91.19,8324 ,2.31,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n신곡1동,39848 ,19475 ,20373 ,8.57,4.19,4.38,95.59,17028 ,2.34,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n신곡2동,45593 ,22018 ,23575 ,9.81,4.74,5.07,93.4,19005 ,2.4,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n송산1동,56595 ,28012 ,28583 ,12.18,6.03,6.15,98,23887 ,2.37,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n송산2동,32054 ,15897 ,16157 ,6.9,3.42,3.48,98.39,13103 ,2.45,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n송산3동,45618 ,22119 ,23499 ,9.82,4.76,5.06,94.13,18032 ,2.53,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n자금동,26064 ,12780 ,13284 ,5.61,2.75,2.86,96.21,11901 ,2.19,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n가능동,24476 ,12238 ,12238 ,5.27,2.63,2.63,100,12165 ,2.01,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n흥선동,19543 ,9935 ,9608 ,4.2,2.14,2.07,103.4,9735 ,2.01,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n녹양동,20446 ,10148 ,10298 ,4.4,2.18,2.22,98.54,9356 ,2.19,경기도 의정부시청,민원여권과,031-828-2466,2023-06-30\r\n\r\n']
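Since binaryFiles() returns the whole file as a single (path, bytes) pair, one way to get back to one row per record is to decode the bytes and split the string into lines; a sketch reusing the popRddBin variable above:
popRddRows = popRddBin.flatMap(lambda x: x[1].decode('euc-kr').splitlines())
popRddRows.take(2)   # header row followed by the first data row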
popList = _my.map(lambda x: x.split()).take(3)
print("---00: ", popList[0][0])
print("---01: ", popList[0][1])
---00:  행정기관,인구수(계),인구수(남),인구수(여),구성비(계),구성비(남),구성비(여),성비,세대수,세대당인구,관리기관명,관리부서명,부서전화번호,데이터기준일자
---01:  의정부1동,37557
Structured data -> DataFrame
popDf = spark\
.read.option("charset", "euc-kr")\
.option("header", "true")\
.csv(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))
popDf.show(5)
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
| 행정기관|인구수(계)|인구수(남)|인구수(여)|구성비(계)|구성비(남)|구성비(여)| 성비|세대수|세대당인구| 관리기관명|관리부서명|부서전화번호|데이터기준일자|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
|의정부1동| 37557 | 19039 | 18518 | 8.08| 4.1| 3.98|102.81|22514 | 1.67|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
|의정부2동| 29729 | 14817 | 14912 | 6.4| 3.19| 3.21| 99.36|16007 | 1.86|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원1동| 34658 | 16762 | 17896 | 7.46| 3.61| 3.85| 93.66|15296 | 2.27|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원2동| 33400 | 16088 | 17312 | 7.19| 3.46| 3.72| 92.93|13422 | 2.49|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 장암동| 19197 | 9156 | 10041 | 4.13| 1.97| 2.16| 91.19| 8324 | 2.31|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
only showing top 5 rows
Running with spark-submit
The program above can be saved as a .py file and run as a batch job with the spark-submit command (executed on a cluster).
%%writefile src/ds3_popCsvRead.py
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import os
import pyspark
def doIt():
print ("---------RESULT-----------")
popDf = spark\
.read.option("charset", "euc-kr")\
.option("header", "true")\
.csv(os.path.join("data","경기도 의정부시_인구현황_20230831.csv"))
popDf.show(5)
agedDf = spark\
.read.option("charset", "euc-kr")\
.option("header", "true")\
.csv(os.path.join("data","제주특별자치도 서귀포시_고령화비율및노령화지수현황_20230324.csv"))
agedDf.show(5)
if __name__ == "__main__":
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
doIt()
spark.stop()
Overwriting src/ds3_popCsvRead.py
!spark-submit src/ds3_popCsvRead.py
23/11/27 05:26:07 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:26:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/27 05:26:08 INFO SparkContext: Running Spark version 3.4.1
(Spark INFO log output omitted)
---------RESULT-----------
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
| 행정기관|인구수(계)|인구수(남)|인구수(여)|구성비(계)|구성비(남)|구성비(여)| 성비|세대수|세대당인구| 관리기관명|관리부서명|부서전화번호|데이터기준일자|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
|의정부1동| 37557 | 19039 | 18518 | 8.08| 4.1| 3.98|102.81|22514 | 1.67|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
|의정부2동| 29729 | 14817 | 14912 | 6.4| 3.19| 3.21| 99.36|16007 | 1.86|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원1동| 34658 | 16762 | 17896 | 7.46| 3.61| 3.85| 93.66|15296 | 2.27|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원2동| 33400 | 16088 | 17312 | 7.19| 3.46| 3.72| 92.93|13422 | 2.49|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 장암동| 19197 | 9156 | 10041 | 4.13| 1.97| 2.16| 91.19| 8324 | 2.31|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
only showing top 5 rows
+----+---+---------------+----------------+---------------+----------+----------+--------------+
|년도| 월|서귀포시 인구수|65세이상 인구수 |14세이하 인구수|고령화비율|노령화지수|데이터기준일자|
+----+---+---------------+----------------+---------------+----------+----------+--------------+
|2008| 12| 153120| 22241| 26792| 14.52| 83.01| 2023-03-24|
|2009| 12| 152285| 23031| 25504| 15.12| 90.3| 2023-03-24|
|2010| 12| 153716| 23990| 24633| 15.6| 97.38| 2023-03-24|
|2011| 12| 153366| 24839| 23686| 16.2| 104.86| 2023-03-24|
|2012| 12| 154057| 25826| 22861| 16.76| 112.97| 2023-03-24|
+----+---+---------------+----------------+---------------+----------+----------+--------------+
only showing top 5 rows
23/11/27 05:26:12 INFO SparkContext: Successfully stopped SparkContext
23/11/27 05:26:12 INFO ShutdownHookManager: Shutdown hook called
!which python
/Users/sojaehwi/opt/anaconda3/bin/python
!python src/ds3_popCsvRead.py
23/11/27 05:26:17 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/27 05:26:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/27 05:26:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/27 05:26:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
---------RESULT-----------
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
| 행정기관|인구수(계)|인구수(남)|인구수(여)|구성비(계)|구성비(남)|구성비(여)| 성비|세대수|세대당인구| 관리기관명|관리부서명|부서전화번호|데이터기준일자|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
|의정부1동| 37557 | 19039 | 18518 | 8.08| 4.1| 3.98|102.81|22514 | 1.67|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
|의정부2동| 29729 | 14817 | 14912 | 6.4| 3.19| 3.21| 99.36|16007 | 1.86|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원1동| 34658 | 16762 | 17896 | 7.46| 3.61| 3.85| 93.66|15296 | 2.27|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 호원2동| 33400 | 16088 | 17312 | 7.19| 3.46| 3.72| 92.93|13422 | 2.49|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
| 장암동| 19197 | 9156 | 10041 | 4.13| 1.97| 2.16| 91.19| 8324 | 2.31|경기도 의정부시청|민원여권과|031-828-2466| 2023-06-30|
+---------+----------+----------+----------+----------+----------+----------+------+------+----------+-----------------+----------+------------+--------------+
only showing top 5 rows
+----+---+---------------+----------------+---------------+----------+----------+--------------+
|년도| 월|서귀포시 인구수|65세이상 인구수 |14세이하 인구수|고령화비율|노령화지수|데이터기준일자|
+----+---+---------------+----------------+---------------+----------+----------+--------------+
|2008| 12| 153120| 22241| 26792| 14.52| 83.01| 2023-03-24|
|2009| 12| 152285| 23031| 25504| 15.12| 90.3| 2023-03-24|
|2010| 12| 153716| 23990| 24633| 15.6| 97.38| 2023-03-24|
|2011| 12| 153366| 24839| 23686| 16.2| 104.86| 2023-03-24|
|2012| 12| 154057| 25826| 22861| 16.76| 112.97| 2023-03-24|
+----+---+---------------+----------------+---------------+----------+----------+--------------+
only showing top 5 rows
Map-Reduce
1. Trying it in plain Python
1-1. map(): applies a function to each element (the result is materialized with list())
def c2f(c):
return (float(9)/5)*c + 32
f= map(c2f, [39.2, 36.5, 37.3, 37.8])
print (list(f))
[102.56, 97.7, 99.14, 100.03999999999999]
x = lambda x, y : x + y
print(x(1, 2))
3
# lambda func
celsius = [10, 20, 30, 40]
f=map(lambda c:(float(9)/5)*c + 32, celsius)
print(f, list(f))
<map object at 0x7fc548f56250> [50.0, 68.0, 86.0, 104.0]
list(map(lambda x:x.split(), "Hello World"))
[['H'], ['e'], ['l'], ['l'], ['o'], [], ['W'], ['o'], ['r'], ['l'], ['d']]
1-2. filter(): selects elements
fib = [0,1,1,2,3,5,8,13,21,34,55]
result = filter(lambda x: x % 2, fib)
print (list(result))
[1, 1, 3, 5, 13, 21, 55]
1-3. reduce(): repeatedly applies a function over the data to produce a single result
reduce(), too, takes two arguments: a function and the data.
The example below repeatedly adds the two arguments x and y over the numbers 1 to 100. x carries the running subtotal to which each y is added, until the final sum is reached.
from functools import reduce
reduce(lambda x, y: x + y, range(1,101))
5050
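To see how the running subtotal is carried along, the same reduce can be traced on a short list; a brief sketch:
from functools import reduce
reduce(lambda x, y: x + y, [1, 2, 3, 4])       # ((1 + 2) + 3) + 4 = 10
reduce(lambda x, y: x + y, [1, 2, 3, 4], 100)  # optional initial value: 100 + 1 + 2 + 3 + 4 = 110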
2. Using RDDs
2-1. map(): applies a function to each element
nRdd = spark.sparkContext.parallelize([1, 2, 3, 4])
squared = nRdd.map(lambda x: x * x)
print (squared)
PythonRDD[37] at RDD at PythonRDD.scala:53
squared.collect() # see the actual result of the transformation
[1, 4, 9, 16]
Converting strings to integers
myRdd4.take(5)
['35, 2', '40, 27', '12, 38', '15, 31', '21, 1']
myRdd5 = myRdd4.map(lambda x : x.split(','))
myRdd5.take(5)
[['35', ' 2'], ['40', ' 27'], ['12', ' 38'], ['15', ' 31'], ['21', ' 1']]
myRdd6 = myRdd5.map(lambda x: [int(i) for i in x])
myRdd6.take(5)
[[35, 2], [40, 27], [12, 38], [15, 31], [21, 1]]
Splitting into words
myRdd2=spark.sparkContext\
.textFile(os.path.join("data","ds_spark_wiki.txt"))
sentences=myRdd2.map(lambda x:x.split()) # split on whitespace
sentences.count()
10
sentences.take(3)
[['Wikipedia'], ['Apache', 'Spark', 'is', 'an', 'open', 'source', 'cluster', 'computing', 'framework.'], ['아파치', '스파크는', '오픈', '소스', '클러스터', '컴퓨팅', '프레임워크이다.']]
for line in sentences.collect():
for word in line:
print (word, end=" ")
print ("\n-----")
Wikipedia 
-----
Apache Spark is an open source cluster computing framework. 
-----
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. 
-----
Apache Spark Apache Spark Apache Spark Apache Spark 
-----
아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크 
-----
Originally developed at the University of California, Berkeley's AMPLab, 
-----
the Spark codebase was later donated to the Apache Software Foundation, 
-----
which has maintained it since. 
-----
Spark provides an interface for programming entire clusters with 
-----
implicit data parallelism and fault-tolerance. 
-----
myRdd2.map(lambda s:len(s)).collect() # length of each line
[9, 59, 32, 51, 31, 72, 71, 30, 64, 46]
's'.upper() # upper/lower case conversion
'S'
2-2. reduce(): repeatedly applies a function over the elements to produce a single value
myRdd100 = spark.sparkContext.parallelize(range(1,101))
myRdd100.reduce(lambda subtotal, x: subtotal + x)
5050
Fold
Similar to reduce, but an initial value is applied per partition (rarely used).
spark.sparkContext.parallelize(range(1,11),2).glom().collect()
[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
# 2 partitions * initial value 10 + (initial value 10 + 55) = 85
spark.sparkContext.parallelize(range(1,11),2).fold(10, lambda subtotal, x: subtotal + x)
85
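Because fold() applies the initial value once per partition and once more when the partial results are merged, the answer depends on the partition count; a short sketch illustrating this:
spark.sparkContext.parallelize(range(1,11),1).fold(10, lambda subtotal, x: subtotal + x)  # (10 + 55) + 10 = 75
spark.sparkContext.parallelize(range(1,11),4).fold(10, lambda subtotal, x: subtotal + x)  # 4 * 10 + 55 + 10 = 105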
Statistical functions
print ("sum: ", myRdd100.sum())
print ("min: ", myRdd100.min())
print ("max: ", myRdd100.max())
print ("count: ", myRdd100.count())
print ("standard deviation:", myRdd100.stdev())
print ("variance: ", myRdd100.variance())
sum:  5050
min:  1
max:  100
count:  100
standard deviation: 28.86607004772212
variance:  833.25
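The same figures can also be computed in a single pass with stats(), which returns a StatCounter; a brief sketch:
st = myRdd100.stats()          # one pass over the data
print(st.count(), st.mean(), st.stdev(), st.variance())
print(myRdd100.mean())         # mean() is also available directly on the RDD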
filter
myRdd2.collect()
['Wikipedia', 'Apache Spark is an open source cluster computing framework.', '아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.', 'Apache Spark Apache Spark Apache Spark Apache Spark', '아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크', "Originally developed at the University of California, Berkeley's AMPLab,", 'the Spark codebase was later donated to the Apache Software Foundation,', 'which has maintained it since.', 'Spark provides an interface for programming entire clusters with', 'implicit data parallelism and fault-tolerance.']
myRdd_spark=myRdd2.filter(lambda line: "Spark" in line)
print ("How many lines having 'Spark': ", myRdd_spark.count())
How many lines having 'Spark': 4
myRdd_unicode = myRdd2.filter(lambda line: u"스파크" in line) # the u prefix marks a unicode literal; it is not needed from Python 3 on
print (myRdd_unicode.first())
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.
flatMap: flattening 2-D into 1-D
# Stopwords embedded inside a sentence are not removed at the sentence level, so flatMap() is applied first and the stopwords are then filtered at the word level.
# Stopwords are words we want to drop while counting word frequencies: pronouns (this, that, ...) or single-character words whose counts carry no meaning.
# Korean has to be handled as unicode. For English, the words would be lowercased so that upper and lower case are treated alike (see the sketch after the output below).
stopwords = ['is','am','are','the','for','a', 'an', 'at']
myRdd_stop = myRdd2.flatMap(lambda x:x.split())\
.filter(lambda x: x not in stopwords)
for words in myRdd_stop.collect():
print (words, end=' ')
Wikipedia Apache Spark open source cluster computing framework. 아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. Apache Spark Apache Spark Apache Spark Apache Spark 아파치 스파크 아파치 스파크 아파치 스파크 아파치 스파크 Originally developed University of California, Berkeley's AMPLab, Spark codebase was later donated to Apache Software Foundation, which has maintained it since. Spark provides interface programming entire clusters with implicit data parallelism and fault-tolerance.
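The comment above mentions lowercasing English words, but the chain itself does not do it yet, so a capitalized stopword such as 'The' would slip through; a sketch that adds the missing step (myRdd_stop2 is an illustrative name):
myRdd_stop2 = myRdd2.flatMap(lambda x: x.split())\
    .map(lambda w: w.lower())\
    .filter(lambda w: w not in stopwords)
myRdd_stop2.take(10)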
foreach(): unlike map(), it returns nothing
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: x + 1)
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).map(lambda x: x + 1).collect()
[2, 3, 4, 5, 6]
def f(x):
print(x)
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).foreach(f)
1
2
3
4
5
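Because foreach() runs on the executors and returns nothing, on a real cluster the printed values may show up in executor logs rather than the driver console; a more typical use is a side effect such as an accumulator, sketched below:
acc = spark.sparkContext.accumulator(0)
spark.sparkContext.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: acc.add(x))
print(acc.value)   # 15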
Pipeline Chaining
A style in which functions are applied one after another in a chain.
wordsLength = myRdd_stop\
.map(len)\
.collect()
print (wordsLength)
[9, 6, 5, 4, 6, 7, 9, 10, 3, 4, 2, 2, 4, 3, 8, 6, 5, 6, 5, 6, 5, 6, 5, 3, 3, 3, 3, 3, 3, 3, 3, 10, 9, 10, 2, 11, 10, 7, 5, 8, 3, 5, 7, 2, 6, 8, 11, 5, 3, 10, 2, 6, 5, 8, 9, 11, 6, 8, 4, 8, 4, 11, 3, 16]
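The same chaining style can combine several transformations with a final action in one expression; for instance, the average length of the stopword-filtered words (a small sketch):
avgWordLength = myRdd_stop.map(len).mean()   # chain a transformation and an action
print(avgWordLength)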