[빅데이터분석] ML을 위한 Data Transform과 단어빈도
<상명대학교 빅데이터분석 임좌상 교수님 수업을 듣고>
데이터 변환¶
ML을 위해서는 원천데이터는 그대로는 사용하기 어렵고, 타겟으로 하는 모델의 입력으로 어떻게든 변환되어야 한다.
Label과 features를 가지고 있는 Labeled Point로 구성해보자
해당 Jupyter Notebook에서는 다음 내용들을 다룬다.
- Vector와 Metrix
- Label Point(Label과 Features)
- Dataframe 단어 빈도(TF-IDF)
- 연속데이터의 변환
- VectorAssembler와 Pipeline
import os, sys
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
23/12/20 12:45:41 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0) 23/12/20 12:45:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/12/20 12:45:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 23/12/20 12:45:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 23/12/20 12:45:42 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 23/12/20 12:45:42 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
Vectors¶
행렬 Vector는 dense와 sparse로 구분할 수 있다.
밀집벡터 dense vector : 빈 값이 별로 없이 모든 행렬이 값을 가지고 있다. dense vectors는 numpy array와 같은 특징을 가진다
희소벡터 sparse vector : 빈 값이 많아서, 값이 있는 경우 그 값이 있는 인덱스로 표현해 배열을 축약하게 된다.
# Dense Vectors 밀집벡터
import numpy as np
dv = np.array([1.0, 2.1, 3])
from pyspark.mllib.linalg import Vectors
dv1mllib=Vectors.dense([1.0, 2.1, 3])
print ("Dense vector: {}\nType: {}".format(dv1mllib, type(dv1mllib)))
Dense vector: [1.0,2.1,3.0] Type: <class 'pyspark.mllib.linalg.DenseVector'>
from pyspark.ml.linalg import Vectors
dv1ml=Vectors.dense([1.0, 2.1, 3])
print ("ml의 dense vector: {}".format(dv1ml))
ml의 dense vector: [1.0,2.1,3.0]
벡터 연산¶
dv1mllib.dot(dv1mllib)
# np.dot(dv,dv)
14.41
dv1mllib*dv1mllib
DenseVector([1.0, 4.41, 9.0])
# Sparse Vectors 희소행렬
# sparse는 실제 값이 없는 요소, '0'을 제거하여 만든 vector
# 5는 컬럼 갯수, 0, 1, 4는 값이 있는 컬럼, [160.0, 69.0, 24.0]는 실제 값을 의미한다.
sv1 = Vectors.sparse(5, [0,1,4],[160.0, 69.0, 24.0])
type(sv1)
sv1.toArray()
array([160., 69., 0., 0., 24.])
Matrix (2차원 Vector)¶
from pyspark.mllib.linalg import Matrices
Matrices.dense(3, 2, [1,2,3,4,5,6])
DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)
Matrices.dense(3, 2, [1,2,3,4,5,6]).toArray()
array([[1., 4.], [2., 5.], [3., 6.]])
# 밀집행렬 -> 열기반 희소행렬
from scipy.sparse import csc_matrix #csc : 컬럼 별로
sparse_csc = csc_matrix([[1, 0, 2],
[0, 0, 3],
[4, 5, 6]])
print(sparse_csc)
(0, 0) 1 (2, 0) 4 (2, 1) 5 (0, 2) 2 (1, 2) 3 (2, 2) 6
from scipy.sparse import csr_matrix #csr : Row 별로
sparse_csr = csr_matrix([[1, 0, 2],
[0, 0, 3],
[4, 5, 6]])
print(sparse_csr)
(0, 0) 1 (0, 2) 2 (1, 2) 3 (2, 0) 4 (2, 1) 5 (2, 2) 6
분산 Matrix¶
매트릭스 역시 로컬과 분산으로 구분할 수 있다
p = [[1.0,2.0,3.0],[1.1,2.1,3.1],[1.2,2.2,3.3]]
my=spark.sparkContext.parallelize(p)
from pyspark.mllib.linalg.distributed import RowMatrix
rm=RowMatrix(my)
rm.rows.collect()
[DenseVector([1.0, 2.0, 3.0]), DenseVector([1.1, 2.1, 3.1]), DenseVector([1.2, 2.2, 3.3])]
Indexed Row Matrix¶
계열 데이터와 같이 순서가 있는 데이터를 저장하기에 적합.
Row Matrix과 유사하지만, 파티션으로 나누어, 그러나 순서를 지켜서 저장
from pyspark.mllib.linalg.distributed import IndexedRow
irRdd = spark.sparkContext.parallelize([
IndexedRow(1, [3, 1, 2]),
IndexedRow(2, [1, 3, 2]),
IndexedRow(3, [5, 4, 3]),
IndexedRow(4, [6, 7, 4]),
IndexedRow(5, [8, 9, 2]),
])
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
irm = IndexedRowMatrix(irRdd)
print(irm.numRows())
print(irm.numCols())
print(irm.rows.collect())
6 3
[Stage 94:> (0 + 1) / 1]
[IndexedRow(1, [3.0,1.0,2.0]), IndexedRow(2, [1.0,3.0,2.0]), IndexedRow(3, [5.0,4.0,3.0]), IndexedRow(4, [6.0,7.0,4.0]), IndexedRow(5, [8.0,9.0,2.0])]
Matrix to Dataframe¶
from pyspark.mllib.linalg import Matrices
my = Matrices.dense(3, 2, [1,2,3,4,5,6])
my = Matrices.dense(3, 2, [1,2,3,4,5,6]).toArray().tolist()
df = spark.createDataFrame(my, ['c1','c2'])
df.show()
+---+---+ | c1| c2| +---+---+ |1.0|4.0| |2.0|5.0| |3.0|6.0| +---+---+
Labeled point¶
label, features로 구성 분류 및 회귀분석에 사용되는 데이터 타잎이다. 'label'과 'features'로 구성된다.
Labeled Point는 로컬벡터로 레이블을 가지고 있는 밀집 또는 희소 행렬을 말한다. 레이블이 있으므로, supervised learning에 요구되는 형식이다. 레이블은 double형식으로 저장되어야 한다. 분류에 사용되려면 예를 들어 긍정, 부정인 경우 정수 1, 0으로 하지 않고 double 형식으로 저장
from pyspark.mllib.regression import LabeledPoint
LabeledPoint(1.0, [1.0, 2.0, 3.0]) #Label, [Features]
LabeledPoint(1.0, [1.0,2.0,3.0])
#희소 행렬 -> Features
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
LabeledPoint(1992, Vectors.sparse(10, {0: 3.0, 1:5.5, 2: 10.0}))
LabeledPoint(1992.0, (10,[0,1,2],[3.0,5.5,10.0]))
ML Vector to mllib Labled Point¶
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
LabeledPoint(1.0, Vectors.fromML(dv1ml))
LabeledPoint(1.0, [1.0,2.1,3.0])
문제 S-1: RDD 데이터를 LabeledPoint로 변환하기¶
머신러닝은 사람이 경험을 통해 배우는 것과 비슷하게 과거 데이터로부터 학습을 한다. 학습이란 어렵게 생각할 필요 없이, 과거 데이터에서 수학적이나 알고리즘을 활용하여 어떤 패턴을 찾아내는 것이다. spark에서 제공한 데이터 파일 data/mllib/sample_svm_data.txt을 읽어서 훈련데이터를 만들어 보자.
데이터를 읽어 보면, 맨 처음 값은 label에 해당하고, 다음은 일련의 수로 구성된다. 이로부터 RDD를 생성하고, label, features를 구성하여 Labeled Point로 만든다.
1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0 ...
import os
_fsvm=os.path.join(os.getcwd(),'data','sample_svm_data.txt')
# try:
# _f=open(_fsvm,'r')
# _lines=_f.readlines()
# _f.close()
# except:
# print("An exception occurred")
# _lines[0]
Spark에서 RDD 생성¶
_rdd=spark.sparkContext.textFile(_fsvm)\
.map(lambda line: [float(x) for x in line.split()])
_rdd.take(2)[0]
[1.0, 0.0, 2.52078447201548, 0.0, 0.0, 0.0, 2.004684436494304, 2.000347299268466, 0.0, 2.228387042742021, 2.228387042742023, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
LabeledPoint 생성¶
from pyspark.mllib.regression import LabeledPoint
_trainRdd0=_rdd.map(lambda line:LabeledPoint(line[0], line[1:]))
_trainRdd0.take(1)
[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]
_trainRdd=spark.sparkContext.textFile(_fsvm)\
.map(lambda line: [float(x) for x in line.split()])\
.map(lambda p:LabeledPoint(p[0], p[1:]))
_trainRdd.take(1)
[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]
TF ( Term Frequency)¶
단어 빈도를 분석해주는 라이브러리이다.
wikiRdd3 = spark.sparkContext\
.textFile(os.path.join("data","ds_spark_wiki.txt"))\
.map(lambda line: line.split())
wikiRdd3.take(3)
[['Wikipedia'], ['Apache', 'Spark', 'is', 'an', 'open', 'source', 'cluster', 'computing', 'framework.'], ['아파치', '스파크는', '오픈', '소스', '클러스터', '컴퓨팅', '프레임워크이다.']]
from pyspark.mllib.feature import HashingTF
hashingTF = HashingTF()
tf = hashingTF.transform(wikiRdd3)
tf.collect()
# tf는 Value가 1 또는 2 따위의 정수이다.
[SparseVector(1048576, {1026674: 1.0}), SparseVector(1048576, {148618: 1.0, 183975: 1.0, 216207: 1.0, 261052: 1.0, 617454: 1.0, 696349: 1.0, 721336: 1.0, 816618: 1.0, 897662: 1.0}), SparseVector(1048576, {60386: 1.0, 177421: 1.0, 568609: 1.0, 569458: 1.0, 847171: 1.0, 850510: 1.0, 1040679: 1.0}), SparseVector(1048576, {261052: 4.0, 816618: 4.0}), SparseVector(1048576, {60386: 4.0, 594754: 4.0}), SparseVector(1048576, {21980: 1.0, 70882: 1.0, 274690: 1.0, 357784: 1.0, 549790: 1.0, 597434: 1.0, 804583: 1.0, 829803: 1.0, 935701: 1.0}), SparseVector(1048576, {154253: 1.0, 261052: 1.0, 438276: 1.0, 460085: 1.0, 585459: 1.0, 664288: 1.0, 816618: 1.0, 935701: 2.0, 948143: 1.0, 1017889: 1.0}), SparseVector(1048576, {270017: 1.0, 472985: 1.0, 511771: 1.0, 718483: 1.0, 820917: 1.0}), SparseVector(1048576, {34116: 1.0, 87407: 1.0, 276491: 1.0, 348943: 1.0, 482882: 1.0, 549350: 1.0, 721336: 1.0, 816618: 1.0, 1025622: 1.0}), SparseVector(1048576, {1769: 1.0, 151357: 1.0, 500659: 1.0, 547760: 1.0, 979482: 1.0})]
# TF-IDF : IDF는 전체에서 몇 개의 문서에 씌였는지를 반대로 계산한 값
from pyspark.mllib.feature import HashingTF, IDF
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
tfidf.collect()
#idf는 Value가 소수점이다. 주요도에 따라서 가중치를 부여하기 때문
[SparseVector(1048576, {1026674: 1.7047}), SparseVector(1048576, {148618: 1.7047, 183975: 1.7047, 216207: 1.7047, 261052: 1.0116, 617454: 1.7047, 696349: 1.7047, 721336: 1.2993, 816618: 0.7885, 897662: 1.7047}), SparseVector(1048576, {60386: 1.2993, 177421: 1.7047, 568609: 1.7047, 569458: 1.7047, 847171: 1.7047, 850510: 1.7047, 1040679: 1.7047}), SparseVector(1048576, {261052: 4.0464, 816618: 3.1538}), SparseVector(1048576, {60386: 5.1971, 594754: 6.819}), SparseVector(1048576, {21980: 1.7047, 70882: 1.7047, 274690: 1.7047, 357784: 1.7047, 549790: 1.7047, 597434: 1.7047, 804583: 1.7047, 829803: 1.7047, 935701: 1.2993}), SparseVector(1048576, {154253: 1.7047, 261052: 1.0116, 438276: 1.7047, 460085: 1.7047, 585459: 1.7047, 664288: 1.7047, 816618: 0.7885, 935701: 2.5986, 948143: 1.7047, 1017889: 1.7047}), SparseVector(1048576, {270017: 1.7047, 472985: 1.7047, 511771: 1.7047, 718483: 1.7047, 820917: 1.7047}), SparseVector(1048576, {34116: 1.7047, 87407: 1.7047, 276491: 1.7047, 348943: 1.7047, 482882: 1.7047, 549350: 1.7047, 721336: 1.2993, 816618: 0.7885, 1025622: 1.7047}), SparseVector(1048576, {1769: 1.7047, 151357: 1.7047, 500659: 1.7047, 547760: 1.7047, 979482: 1.7047})]
# StandardScaler -> 스케일을 표준화해주는 수단( (ex) 키와 몸무게의 스케일이 다를 때 )
# 데이터를 표준화하려면 1) 평균과 표준편차를 계산하고, 2) 측정값에서 평균을 빼고, 표준편차로 나누어 주면 된다. 즉 zscore를 계산하는 것과 같다.
tRdd = spark.sparkContext\
.textFile(os.path.join('data', 'ds_spark_heightweight.txt'))
tRdd.map(lambda x: x.split('\t')).take(1)
[['1', '65.78', '112.99']]
tRdd.map(lambda x: x.split('\t')).map(lambda x: [str(x[0]), float(x[1]), float(x[2])]).take(1)
[['1', 65.78, 112.99]]
tRdd.map(lambda x: x.split('\t'))\
.map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
.take(1)
[['1', 65.78, 112.99]]
# Dense Vectors에 별도로 저장하기
from pyspark.mllib.linalg import Vectors
_tRdd =tRdd\
.map(lambda x: x.split('\t'))\
.map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
.map(lambda x: Vectors.dense([x[1], x[2]]))
# 리스트로 저장하기
from pyspark.mllib.linalg import Vectors
_tRdd =tRdd\
.map(lambda x: x.split('\t'))\
.map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
.map(lambda x: [x[1], x[2]])
# 표준화
from pyspark.mllib.feature import StandardScaler
scaler1 = StandardScaler().fit(_tRdd)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(_tRdd)
scaler2.transform(_tRdd).take(5)
[DenseVector([-1.2458, -1.2299]), DenseVector([1.9011, 0.5934]), DenseVector([0.7388, 1.8767]), DenseVector([0.0919, 1.0473]), DenseVector([-0.1439, 1.1993])]
DataFrame 변환¶
기계학습에 넘겨줄 입력데이터를 형식에 맞추어야 한다.
데이터로부터 특징을 추출하여 feature vectors를 구성한다. 지도학습을 하는 경우에는 class 또는 label 값이 필요
# 레이블이 있는 Python List에서 DataFrame 생성
p = [[1, [1.0, 2.0, 3.0]], [1, [1.1, 2.1, 3.1]], [0, [1.2, 2.2, 3.3]]]
trainDf=spark.createDataFrame(p)
trainDf.collect()
#컬럼이 자동 명명되어서 만족스럽지 못함(_1, _2)
[Row(_1=1, _2=[1.0, 2.0, 3.0]), Row(_1=1, _2=[1.1, 2.1, 3.1]), Row(_1=0, _2=[1.2, 2.2, 3.3])]
LabeledPoint에서 DataFrame 생성¶
LabeledPoint는 RDD에서 사용하는 구조
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1, [1.0,2.0,3.0]),
LabeledPoint(1, [1.1,2.1,3.1]),
LabeledPoint(0, [1.2,2.2,3.3])]
trainDf=spark.createDataFrame(p)
trainDf.collect()
[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0), Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0), Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]
mllib Vectors를 사용하여 DataFrame을 생성¶
from pyspark.mllib.linalg import Vectors
trainDf = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, 1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, 0.5]))], ["label", "features"])
trainDf.collect()
[Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])), Row(label=0.0, features=DenseVector([2.0, 1.0, 1.0])), Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])), Row(label=1.0, features=DenseVector([0.0, 1.2, 0.5]))]
RDD에서 DataFrame 생성¶
rdd에서 DataFrame을 생성하면 labe, features이 당연히 생성이 되지 않는다.
from pyspark.ml.linalg import SparseVector # ml ok
_rdd = spark.sparkContext.parallelize([
(0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
(1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])
_df=_rdd.toDF()
_df=_df.withColumnRenamed('_1', 'label').withColumnRenamed('_2', 'features')
_df.show()
+-----+--------------------+ |label| features| +-----+--------------------+ | 0.0| (4,[1,3],[1.0,5.5])| | 1.0|(4,[0,2],[-1.0,0.5])| +-----+--------------------+
Dataframe 단어 빈도¶
Bag of Words 모델 : 자연어처리 NLP에서 사용하는 모델로, 텍스트를 단어의 집합(Set!)¶
텍스트 변환 단계¶
텍스트를 변환하는 단계를 보자. 순서는 변경될 수 있다.
1단계 : 단어로 분할 Tokenization¶
- 그, 영화는, 매우, 강렬했다, 그냥, 좋았다, 영화관에서, 보는, 동안, 긴장을, 늦출, 수, 없었다, 갑돌이가, 분장한, 악당의, 케릭터가, 만들어지는, 과정은, 흥미롭지, 않을, 수가, 없었다, 무비의, 이야기, 전개는, 빠르고, 무엇이, 진실이고, 거짓인지, 판단할, 수, 없었다, 누가, 이런, 영화를, 좋아, 하지, 않을, 수가, 있겠는가, 이모티콘
2단계 : 정리¶
- 불필요, 오타 등
3단계 : 불용어 stopwords 제거¶
- 그, 수, 수가, 수, 이런, 하지, 수가 등
4단계 : 어간 추출¶
- stemming 영화는, 영화의는 다른 단어지만 조사를 제거하면 동일한 단어
- 좋았다, 좋아 단어들은 어근을 판별하면 동일한 단어이다.
- 영화, 무비의 단어는 이음동의
5단계 : 계량화¶
- word vector로 만든다.
- 있다-없다, 단어빈도, TF-IDF 사용할 수 있다.
- dense, sparse 모두 가능하다.
[0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 1 1 0 0 0]
[0 0 1 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[1 0 0 1 0 0 1 1 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 1 0 1 1 0 0 0 1 0 0 0 1 1 1 0 1 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 1 1]
doc2d=[
["When I find myself in times of trouble"],
["Mother Mary comes to me"],
["Speaking words of wisdom, let it be"],
["And in my hour of darkness"],
["She is standing right in front of me"],
["Speaking words of wisdom, let it be"],
[u"우리 Let it be"],
[u"나 Let it be"],
[u"너 Let it be"],
["Let it be"],
["Whisper words of wisdom, let it be"]
]
Spark의 transformer, estimator -> RDD의 map-reduce!¶
Dataframe에서는 Map-Reduce를 사용할 수 없다!¶
RDD를 만들고 나서도 데이터를 변환하기 위해 map-reduce와 같은 함수 또는 transform(), fit()을 사용하는 것과 같이, DataFrame도 역시 Transformer, Estimator를 사용할 수 있다
- Estimator: fit() 함수를 제공하는 객체이다. Estimator.fit()함수는 DataFrame에 적용되는 알고리즘을 적용하여, 모델인 Transformer를 생성
- Transformer: transform() 함수를 통해 위 모델을 적용하여 데이터를 변환하여 DataFrame을 생성
- Evaluator: 모델의 정확성을 측정
myDf=spark.createDataFrame(doc2d, ['sent']) # DataFrame을 생성. schema는 만들어 주지 않고 컬럼명을 sent로 한다.
myDf.show(truncate=True) # truncate : 출력을 줄임
+--------------------+ | sent| +--------------------+ |When I find mysel...| |Mother Mary comes...| |Speaking words of...| |And in my hour of...| |She is standing r...| |Speaking words of...| | 우리 Let it be| | 나 Let it be| | 너 Let it be| | Let it be| |Whisper words of ...| +--------------------+
1-1. Tokenizer¶
white space로 단어를 분리
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="sent", outputCol="words") #입력 컬럼/출력 컬럼 지정
tokDf = tokenizer.transform(myDf) #Transform
tokDf.show(3)
+--------------------+--------------------+ | sent| words| +--------------------+--------------------+ |When I find mysel...|[when, i, find, m...| |Mother Mary comes...|[mother, mary, co...| |Speaking words of...|[speaking, words,...| +--------------------+--------------------+ only showing top 3 rows
for r in tokDf.select("sent", "words").take(3):
print (r)
Row(sent='When I find myself in times of trouble', words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble']) Row(sent='Mother Mary comes to me', words=['mother', 'mary', 'comes', 'to', 'me']) Row(sent='Speaking words of wisdom, let it be', words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'])
1-2. RegTokenizer¶
Tokenizer는 white space로 분리하지만, RegexTokenizer는 단어를 분리하기 위해 정규표현식을 적용할 수 있다.
from pyspark.ml.feature import RegexTokenizer
re = RegexTokenizer(inputCol="sent", outputCol="wordsReg", pattern="\\s+")
reDf=re.transform(myDf)
reDf.show()
+--------------------+--------------------+ | sent| wordsReg| +--------------------+--------------------+ |When I find mysel...|[when, i, find, m...| |Mother Mary comes...|[mother, mary, co...| |Speaking words of...|[speaking, words,...| |And in my hour of...|[and, in, my, hou...| |She is standing r...|[she, is, standin...| |Speaking words of...|[speaking, words,...| | 우리 Let it be| [우리, let, it, be]| | 나 Let it be| [나, let, it, be]| | 너 Let it be| [너, let, it, be]| | Let it be| [let, it, be]| |Whisper words of ...|[whisper, words, ...| +--------------------+--------------------+
2. 불용어 제거(Stopwords)¶
별 의미가 없거나 쓸모가 없는 단어들이 존재한다.
예를 들어 이, 그, 저와 같은 한 단어 또는 있다 등과 같은 일부 동사, 그래서, 그러나 등과 같은 접속사 등
from pyspark.ml.feature import StopWordsRemover
stop = StopWordsRemover(inputCol="wordsReg", outputCol="nostops")
# 불용어 리스트 가져오기
stopwords=list()
_stopwords=stop.getStopWords()
for e in _stopwords:
stopwords.append(e)
# 자신의 불용어 리스트 추가
_mystopwords=[u"나",u"너", u"우리"] #u는 유니코드(Python3에서는 안적어도 된다)
for e in _mystopwords:
stopwords.append(e)
stop.setStopWords(stopwords)
StopWordsRemover_ccd7e9a257e2
for e in stop.getStopWords():
print (e, end="/")
i/me/my/myself/we/our/ours/ourselves/you/your/yours/yourself/yourselves/he/him/his/himself/she/her/hers/herself/it/its/itself/they/them/their/theirs/themselves/what/which/who/whom/this/that/these/those/am/is/are/was/were/be/been/being/have/has/had/having/do/does/did/doing/a/an/the/and/but/if/or/because/as/until/while/of/at/by/for/with/about/against/between/into/through/during/before/after/above/below/to/from/up/down/in/out/on/off/over/under/again/further/then/once/here/there/when/where/why/how/all/any/both/each/few/more/most/other/some/such/no/nor/not/only/own/same/so/than/too/very/s/t/can/will/just/don/should/now/i'll/you'll/he'll/she'll/we'll/they'll/i'd/you'd/he'd/she'd/we'd/they'd/i'm/you're/he's/she's/it's/we're/they're/i've/we've/you've/they've/isn't/aren't/wasn't/weren't/haven't/hasn't/hadn't/don't/doesn't/didn't/won't/wouldn't/shan't/shouldn't/mustn't/can't/couldn't/cannot/could/here's/how's/let's/ought/that's/there's/what's/when's/where's/who's/why's/would/나/너/우리/
# Tokenizer한 단어들에서 불용어 제거하기
stopDf=stop.transform(reDf)
stopDf.show()
+--------------------+--------------------+--------------------+ | sent| wordsReg| nostops| +--------------------+--------------------+--------------------+ |When I find mysel...|[when, i, find, m...|[find, times, tro...| |Mother Mary comes...|[mother, mary, co...|[mother, mary, co...| |Speaking words of...|[speaking, words,...|[speaking, words,...| |And in my hour of...|[and, in, my, hou...| [hour, darkness]| |She is standing r...|[she, is, standin...|[standing, right,...| |Speaking words of...|[speaking, words,...|[speaking, words,...| | 우리 Let it be| [우리, let, it, be]| [let]| | 나 Let it be| [나, let, it, be]| [let]| | 너 Let it be| [너, let, it, be]| [let]| | Let it be| [let, it, be]| [let]| |Whisper words of ...|[whisper, words, ...|[whisper, words, ...| +--------------------+--------------------+--------------------+
3. 빈도 계산 (CountVectorizer)¶
단어 빈도를 ML을 위해서 Word Vector로 표현해야 한다.
3-1. ScikitLearn 사용¶
# 2차원 -> 1차원
from functools import reduce
doc = reduce(lambda x,y: x+y, doc2d)
from sklearn.feature_extraction.text import CountVectorizer
vectorizer = CountVectorizer(stop_words='english')
print (vectorizer.fit_transform(doc)) # 괄호(문서번호, 단어번호)의 빈도
(0, 9) 1 (0, 10) 1 (1, 5) 1 (1, 4) 1 (1, 0) 1 (2, 7) 1 (2, 13) 1 (2, 12) 1 (2, 3) 1 (3, 2) 1 (3, 1) 1 (4, 8) 1 (4, 6) 1 (5, 7) 1 (5, 13) 1 (5, 12) 1 (5, 3) 1 (6, 3) 1 (6, 14) 1 (7, 3) 1 (8, 3) 1 (9, 3) 1 (10, 13) 1 (10, 12) 1 (10, 3) 1 (10, 11) 1
print (vectorizer.fit_transform(doc))
(0, 9) 1 (0, 10) 1 (1, 5) 1 (1, 4) 1 (1, 0) 1 (2, 7) 1 (2, 13) 1 (2, 12) 1 (2, 3) 1 (3, 2) 1 (3, 1) 1 (4, 8) 1 (4, 6) 1 (5, 7) 1 (5, 13) 1 (5, 12) 1 (5, 3) 1 (6, 3) 1 (6, 14) 1 (7, 3) 1 (8, 3) 1 (9, 3) 1 (10, 13) 1 (10, 12) 1 (10, 3) 1 (10, 11) 1
vectorizer.vocabulary_
{'times': 9, 'trouble': 10, 'mother': 5, 'mary': 4, 'comes': 0, 'speaking': 7, 'words': 13, 'wisdom': 12, 'let': 3, 'hour': 2, 'darkness': 1, 'standing': 8, 'right': 6, '우리': 14, 'whisper': 11}
vectorizer.fit_transform(doc).todense()
matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0], [1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0], [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0], [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0]])
3-2 Spark CountVectorizer¶
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="nostops", outputCol="cv", vocabSize=30, minDF=1.0) #vocabSize : 최대 단어 수 지정
# CountVectorizerModel은 fit()하고 나면 얻어진다. 다음에 사용하는 HashingTF는 fit()하지 않는다는 점에서 차이
cvModel = cv.fit(stopDf)
cvDf = cvModel.transform(stopDf)
cvDf.show(3) #희소벡터(Sparse Vectors) 형태
+--------------------+--------------------+--------------------+--------------------+ | sent| wordsReg| nostops| cv| +--------------------+--------------------+--------------------+--------------------+ |When I find mysel...|[when, i, find, m...|[find, times, tro...|(16,[5,7,9],[1.0,...| |Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|(16,[6,12,13],[1....| |Speaking words of...|[speaking, words,...|[speaking, words,...|(16,[0,1,2,3],[1....| +--------------------+--------------------+--------------------+--------------------+ only showing top 3 rows
cvDf.select('sent','nostops','cv').show()
+--------------------+--------------------+--------------------+ | sent| nostops| cv| +--------------------+--------------------+--------------------+ |When I find mysel...|[find, times, tro...|(16,[5,7,9],[1.0,...| |Mother Mary comes...|[mother, mary, co...|(16,[6,12,13],[1....| |Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....| |And in my hour of...| [hour, darkness]|(16,[8,11],[1.0,1...| |She is standing r...|[standing, right,...|(16,[4,10,14],[1....| |Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....| | 우리 Let it be| [let]| (16,[0],[1.0])| | 나 Let it be| [let]| (16,[0],[1.0])| | 너 Let it be| [let]| (16,[0],[1.0])| | Let it be| [let]| (16,[0],[1.0])| |Whisper words of ...|[whisper, words, ...|(16,[0,1,2,15],[1...| +--------------------+--------------------+--------------------+
cvModel.vocabulary #CountVectorizer에서 사용된 단어 목록
['let', 'wisdom,', 'words', 'speaking', 'right', 'find', 'mother', 'trouble', 'hour', 'times', 'front', 'darkness', 'mary', 'comes', 'standing', 'whisper']
4. TF-IDF (단어 빈도 분석)¶
CountVectorizer는 텍스트를 단어의 빈도로 변환해주어, 문서 x 단어 표를 출력할 수 있다
그 다음으로, TF-IDF를 계산할 수 있다. 이 때 (문서id, 단어id) 별로 결과가 출력된다.
4-1. ScikitLearn에서의 TF-IDF 분석¶
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer(max_df=1.0, stop_words='english',norm = None)
print (vectorizer.fit_transform(doc)) # (Doc Index, Word Index) : TF-IDF
(0, 10) 2.791759469228055 (0, 9) 2.791759469228055 (1, 0) 2.791759469228055 (1, 4) 2.791759469228055 (1, 5) 2.791759469228055 (2, 3) 1.4054651081081644 (2, 12) 2.09861228866811 (2, 13) 2.09861228866811 (2, 7) 2.386294361119891 (3, 1) 2.791759469228055 (3, 2) 2.791759469228055 (4, 6) 2.791759469228055 (4, 8) 2.791759469228055 (5, 3) 1.4054651081081644 (5, 12) 2.09861228866811 (5, 13) 2.09861228866811 (5, 7) 2.386294361119891 (6, 14) 2.791759469228055 (6, 3) 1.4054651081081644 (7, 3) 1.4054651081081644 (8, 3) 1.4054651081081644 (9, 3) 1.4054651081081644 (10, 11) 2.791759469228055 (10, 3) 1.4054651081081644 (10, 12) 2.09861228866811 (10, 13) 2.09861228866811
vectorizer.vocabulary_
{'times': 9, 'trouble': 10, 'mother': 5, 'mary': 4, 'comes': 0, 'speaking': 7, 'words': 13, 'wisdom': 12, 'let': 3, 'hour': 2, 'darkness': 1, 'standing': 8, 'right': 6, '우리': 14, 'whisper': 11}
vectorizer.idf_
array([2.79175947, 2.79175947, 2.79175947, 1.40546511, 2.79175947, 2.79175947, 2.79175947, 2.38629436, 2.79175947, 2.79175947, 2.79175947, 2.79175947, 2.09861229, 2.09861229, 2.79175947])
4-2. Spark를 사용한 TF-IDF¶
from pyspark.ml.feature import HashingTF, IDF
hashTF = HashingTF(inputCol="nostops", outputCol="hash")
hashDf = hashTF.transform(stopDf) #fit() 대신 transform()을 사용
hashDf.select("nostops", "hash").show(truncate=False)
# 262144는 해시 개수 (앞서 CountVectorizer의 경우에서와 같이 전체 단어의 개수가 아니다),
# 그리고 다음 [64317,91878,152481]은 값이 있는 해시 컬럼 번호, 1.0,1.0,1.0은 그 값
+-------------------------------+--------------------------------------------------------+ |nostops |hash | +-------------------------------+--------------------------------------------------------+ |[find, times, trouble] |(262144,[64317,91878,152481],[1.0,1.0,1.0]) | |[mother, mary, comes] |(262144,[24657,63767,245426],[1.0,1.0,1.0]) | |[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) | |[hour, darkness] |(262144,[74517,98431],[1.0,1.0]) | |[standing, right, front] |(262144,[84798,218360,229166],[1.0,1.0,1.0]) | |[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) | |[let] |(262144,[173339],[1.0]) | |[let] |(262144,[173339],[1.0]) | |[let] |(262144,[173339],[1.0]) | |[let] |(262144,[173339],[1.0]) | |[whisper, words, wisdom,, let] |(262144,[151864,173339,175131,188139],[1.0,1.0,1.0,1.0])| +-------------------------------+--------------------------------------------------------+
4-3. Word2Vec¶
Word2Vec은 단어의 문맥을 표현하는 방법.
Word2Vec은 2013년 구글 Tomas Mikolov가 고안한 방법으로, 단어를 벡터로 변환하는 방법을 말하고, 그 벡터에 따라 단어 간의 의미적 유사성을 계산할 수 있다.
단어가 주변의 단어들과 어떻게 관련되어 있는지 서로의 맥락 또는 연관성 Word Embedding을 신경망으로 학습하여 Word2Vec을 계산
비슷한 의미를 가진 단어들의 벡터는 유사한 방향으로 향하게 되고, 벡터 간의 내적이나 코사인 유사도를 통해 단어 간의 유사성을 계산할 수 있다.
벡터('king') - 벡터('man') + 벡터('woman') = 벡터('queen) 이런 연산이 가능해진다. 즉 king 단어벡터에서 man 단어백터를 빼고, woman 단어백터를 더하면, queen 단어백터를 구할 수 있다는 의미이다.
_tigerDf=spark.createDataFrame([["호랑이는 매우 용맹하다"], ["호랑이는 무섭다고 하더라"], ["호랑이는 사납고 날쌔다"]], ['sent'])
from pyspark.ml.feature import Tokenizer
tigerTokenizer = Tokenizer(inputCol="sent", outputCol="words")
tigerDf=tigerTokenizer.transform(_tigerDf)
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="w2v")
model=word2Vec.fit(tigerDf) #모델 학습
23/12/20 12:45:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS 23/12/20 12:45:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
w2vDf = model.transform(tigerDf) #모델 돌려서 결과 얻어냄
for e in w2vDf.select("w2v").take(3):
print(e)
Row(w2v=DenseVector([-0.0498, 0.0245, 0.0924])) Row(w2v=DenseVector([-0.1594, 0.0053, 0.0547])) Row(w2v=DenseVector([-0.1166, -0.0009, 0.021]))
model.getVectors().show(truncate=False)
+--------+-----------------------------------------------------------------+ |word |vector | +--------+-----------------------------------------------------------------+ |무섭다고|[-0.16658440232276917,0.14524580538272858,-0.05602681636810303] | |용맹하다|[-0.027605753391981125,-0.037912942469120026,0.14706382155418396]| |매우 |[0.027643119916319847,0.07871874421834946,0.03055230900645256] | |사납고 |[-0.08473973721265793,-0.04772188514471054,0.0029143476858735085]| |하더라 |[-0.16238440573215485,-0.16205385327339172,0.12059701234102249] | |날쌔다 |[-0.11577261984348297,0.012463564053177834,-0.03944210335612297] | |호랑이는|[-0.14930878579616547,0.032565854489803314,0.09943961352109909] | +--------+-----------------------------------------------------------------+
model.findSynonyms("용맹하다", 2).show() #Word Token을 유사도 순 출력
+--------+------------------+ | word| similarity| +--------+------------------+ | 하더라|0.7089282274246216| |호랑이는|0.6222048401832581| +--------+------------------+
4-4. N-gram¶
텍스트를 대상으로 하면, n-gram은 연속된 n개의 토큰으로 구성된 순열을 말한다. unigram은 한 단어로, bigram은 두 단어로 구성한다.
from pyspark.ml.feature import NGram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDf = ngram.transform(tokDf)
ngramDf.show()
+--------------------+--------------------+----------------------+ | sent| words| ngrams| +--------------------+--------------------+----------------------+ |When I find mysel...|[when, i, find, m...| [when i, i find, ...| |Mother Mary comes...|[mother, mary, co...| [mother mary, mar...| |Speaking words of...|[speaking, words,...| [speaking words, ...| |And in my hour of...|[and, in, my, hou...| [and in, in my, m...| |She is standing r...|[she, is, standin...| [she is, is stand...| |Speaking words of...|[speaking, words,...| [speaking words, ...| | 우리 Let it be| [우리, let, it, be]|[우리 let, let it, ...| | 나 Let it be| [나, let, it, be]| [나 let, let it, i...| | 너 Let it be| [너, let, it, be]| [너 let, let it, i...| | Let it be| [let, it, be]| [let it, it be]| |Whisper words of ...|[whisper, words, ...| [whisper words, w...| +--------------------+--------------------+----------------------+
for e in ngramDf.select("words","ngrams").take(3):
print (e)
Row(words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble'], ngrams=['when i', 'i find', 'find myself', 'myself in', 'in times', 'times of', 'of trouble']) Row(words=['mother', 'mary', 'comes', 'to', 'me'], ngrams=['mother mary', 'mary comes', 'comes to', 'to me']) Row(words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'], ngrams=['speaking words', 'words of', 'of wisdom,', 'wisdom, let', 'let it', 'it be'])
4-5. StringIndexer¶
문자열 컬럼을 인덱스 컬럼으로 변환한다. 빈도가 제일 높은 순서로 0.0부터 인덱스 값이 주어진다
from pyspark.ml.feature import StringIndexer
labelIndexer = StringIndexer(inputCol="sent", outputCol="sentLabel")
model=labelIndexer.fit(myDf)
siDf=model.transform(myDf)
siDf.orderBy('sentLabel').show()
+--------------------+---------+ | sent|sentLabel| +--------------------+---------+ |Speaking words of...| 0.0| |Speaking words of...| 0.0| |And in my hour of...| 1.0| | Let it be| 2.0| |Mother Mary comes...| 3.0| |She is standing r...| 4.0| |When I find mysel...| 5.0| |Whisper words of ...| 6.0| | 나 Let it be| 7.0| | 너 Let it be| 8.0| | 우리 Let it be| 9.0| +--------------------+---------+
4-6. One-Hot Encoding¶
Dummy 변수. 명목변수는 ML을 수행할 수 없으므로 변환하는 방법
One-Hot Encoding은 명목변수 인덱스를 이진벡터로 변환하여, 서로 순서가 없도록 한다.
사자, 호랑이, 사람에게 인덱스 0, 1, 2가 배정되었다고 하자. 그렇다고 해서 사자가 호랑이보다 실제로는 그런 순서가 있지 않다.
ex) 사자 -> 10, 호랑이 -> 01, 사람 -> 00..
df = spark.createDataFrame([
(1, "B"),
(2, "C"),
(3, "A"),
(4, "B"),
(5, "C"),
(6, "A")
], ["id", "grade"])
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="grade", outputCol="gradeIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gradeIndex", outputCol="gradeVec")
encoded = encoder.fit(indexed)
encoded.transform(indexed).show()
+---+-----+----------+-------------+ | id|grade|gradeIndex| gradeVec| +---+-----+----------+-------------+ | 1| B| 1.0|(2,[1],[1.0])| | 2| C| 2.0| (2,[],[])| | 3| A| 0.0|(2,[0],[1.0])| | 4| B| 1.0|(2,[1],[1.0])| | 5| C| 2.0| (2,[],[])| | 6| A| 0.0|(2,[0],[1.0])| +---+-----+----------+-------------+
from pyspark.sql.types import *
rdd=spark.sparkContext\
.textFile(os.path.join('data','ds_spark_heightweight.txt'))
myRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])
myDf=spark.createDataFrame(myRdd,["id","weight","height"])
myDf.printSchema()
root |-- id: double (nullable = true) |-- weight: double (nullable = true) |-- height: double (nullable = true)
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=68.0, inputCol="weight", outputCol="weight2") # 68을 임계값으로 두고 이진분류
binDf = binarizer.transform(myDf)
binDf.show(10)
+----+------+------+-------+ | id|weight|height|weight2| +----+------+------+-------+ | 1.0| 65.78|112.99| 0.0| | 2.0| 71.52|136.49| 1.0| | 3.0| 69.4|153.03| 1.0| | 4.0| 68.22|142.34| 1.0| | 5.0| 67.79| 144.3| 0.0| | 6.0| 68.7| 123.3| 1.0| | 7.0| 69.8|141.49| 1.0| | 8.0| 70.01|136.46| 1.0| | 9.0| 67.9|112.37| 0.0| |10.0| 66.78|120.67| 0.0| +----+------+------+-------+ only showing top 10 rows
from pyspark.ml.feature import QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="height", outputCol="height3") #3분위로 키를 분류
qdDf = discretizer.fit(binDf).transform(binDf)
qdDf.show(10)
+----+------+------+-------+-------+ | id|weight|height|weight2|height3| +----+------+------+-------+-------+ | 1.0| 65.78|112.99| 0.0| 0.0| | 2.0| 71.52|136.49| 1.0| 1.0| | 3.0| 69.4|153.03| 1.0| 2.0| | 4.0| 68.22|142.34| 1.0| 2.0| | 5.0| 67.79| 144.3| 0.0| 2.0| | 6.0| 68.7| 123.3| 1.0| 0.0| | 7.0| 69.8|141.49| 1.0| 2.0| | 8.0| 70.01|136.46| 1.0| 1.0| | 9.0| 67.9|112.37| 0.0| 0.0| |10.0| 66.78|120.67| 0.0| 0.0| +----+------+------+-------+-------+ only showing top 10 rows
VectorAssembler¶
열을 묶어서 Vector열로 만든다. features 컬럼을 생성할 경우에 사용한다. 단 문자열 string은 묶을 수 없다.
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=["weight2","height3"],outputCol="features")
vaDf = va.transform(qdDf)
vaDf.printSchema()
vaDf.show(5)
root |-- id: double (nullable = true) |-- weight: double (nullable = true) |-- height: double (nullable = true) |-- weight2: double (nullable = true) |-- height3: double (nullable = true) |-- features: vector (nullable = true) +---+------+------+-------+-------+---------+ | id|weight|height|weight2|height3| features| +---+------+------+-------+-------+---------+ |1.0| 65.78|112.99| 0.0| 0.0|(2,[],[])| |2.0| 71.52|136.49| 1.0| 1.0|[1.0,1.0]| |3.0| 69.4|153.03| 1.0| 2.0|[1.0,2.0]| |4.0| 68.22|142.34| 1.0| 2.0|[1.0,2.0]| |5.0| 67.79| 144.3| 0.0| 2.0|[0.0,2.0]| +---+------+------+-------+-------+---------+ only showing top 5 rows
Pipeline¶
Pipeline은 여러 Estimator를 묶은 Estimator를 반환한다. Pipeline은 여러 작업을 묶어, 순서대로 단계적으로 Estimator를 적용하기 위해 사용한다.
df = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "my dog has flea problems. help please.",0.0)
], ["id", "text", "label"])
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) #Tokenizer, HashingTF, 선형회귀를 함께 적용
model = pipeline.fit(df)
myDf = model.transform(df)
myDf.select('label', 'features').show()
+-----+--------------------+ |label| features| +-----+--------------------+ | 1.0|(262144,[74920,89...| | 0.0|(262144,[89530,14...| | 1.0|(262144,[36803,17...| | 0.0|(262144,[132966,1...| | 0.0|(262144,[1074,389...| +-----+--------------------+
문제 S-4: 연설문을 기계학습하기 위해 변환¶
2019.10.21일 '제74주년 경찰의 날 기념식 축사' 전문을 변환하세요. 전문은 http://www.korea.kr/archive/speechView.do?newsId=132031636 에서 읽을 수 있고, 해당 사이트에서 텍스트만 파일로 저장해서 사용한다.
- DataFrame 생성
- 단어로 분리해서, 출력
- 정리 strip, replace
- 불용어 구성, 출력 - 축사 전문에서 한글자로 된 단어를 찾아내 스스로 구성
- 불용어 제거하고, 출력
- TF-IDF를 계산하고, 출력
- TF-IDF 컬럼을 features로 구성, 출력
1. Dataframe 생성¶
import os
from pyspark.sql.types import StructType, StructField, StringType
police=spark.read\
.options(header="true", delimiter=" ", inferSchema="true")\
.schema(
StructType([
StructField("sent",StringType()),
])
)\
.text(os.path.join("data", "20191021_policeAddress.txt"))
police.take(5)
[Row(sent='존경하는 국민 여러분, 경찰관 여러분, 일흔네 돌 ‘경찰의 날’입니다.'), Row(sent=' '), Row(sent='국민의 안전을 위해 밤낮없이 애쓰시는 전국의 15만 경찰관 여러분께 먼저 감사를 드립니다. 전몰·순직 경찰관들의 고귀한 희생에 경의를 표합니다. 유가족 여러분께 위로의 마음을 전합니다.'), Row(sent=' '), Row(sent='오늘 홍조근정훈장을 받으신 중앙경찰학교장 이은정 치안감님, 근정포장을 받으신 광주남부경찰서 김동현 경감님을 비롯한 수상자 여러분께 각별한 축하와 감사를 드립니다. 또한 경찰 영웅으로 추서되신 차일혁, 최중락님께 국민의 사랑을 전해드립니다.')]
_police=police.filter("sent != ' '") #빈 줄 제거
_police.show(3)
+----------------------------------+ | sent| +----------------------------------+ | 존경하는 국민 여러분, 경찰관 ...| | 국민의 안전을 위해 밤낮없이 애...| |오늘 홍조근정훈장을 받으신 중앙...| +----------------------------------+ only showing top 3 rows
2. 단어 분리¶
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="sent", outputCol="tokens")
tokDf = tokenizer.transform(police)
tokDf.show(3)
+---------------------------------+------------------------------+ | sent| tokens| +---------------------------------+------------------------------+ | 존경하는 국민 여러분, 경찰관 ...| [존경하는, 국민, 여러분,, ...| | | []| |국민의 안전을 위해 밤낮없이 애...|[국민의, 안전을, 위해, 밤낮...| +---------------------------------+------------------------------+ only showing top 3 rows
for r in tokDf.select("sent").take(3):
print (r[0])
존경하는 국민 여러분, 경찰관 여러분, 일흔네 돌 ‘경찰의 날’입니다. 국민의 안전을 위해 밤낮없이 애쓰시는 전국의 15만 경찰관 여러분께 먼저 감사를 드립니다. 전몰·순직 경찰관들의 고귀한 희생에 경의를 표합니다. 유가족 여러분께 위로의 마음을 전합니다.
3. 텍스트 정리¶
컴마, 따옴표, 마침표, 숫자 등을 제거하자.
wordList=['존경하는', '국민', '여러분,', '경찰관', '여러분,', '일흔네', '‘경찰의', '날’입니다.']
cleaned=list()
for w in wordList:
cleaned.append(w.lstrip('‘').rstrip("’").rstrip(',').rstrip('.').replace("’","").replace("”",""))
cleaned
['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다']
3-1. udf 함수¶
컴마, 따옴표, 마침표, 숫자 제거
import re
def trim(wordList):
regex = re.compile('\d+') #숫자가 하나 이상 있는 경우의 정규식 패턴
cleaned=list()
for w in wordList:
if not regex.match(w):
cleaned.append(w.lstrip('‘').rstrip("’").rstrip(',').rstrip('.').replace("’","").replace("”",""))
return cleaned
myList=["1", "123", "15만", "2015년에", "15.1%", "74.5점", "8,572명을", "Seoul1", "Seoul"]
trim(myList)
['Seoul1', 'Seoul']
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
trimUdf=f.udf(trim, ArrayType(StringType()))
wordsDf = tokDf.withColumn('words', trimUdf(f.col('tokens')))
wordsDf.show(1, False)
[Stage 74:> (0 + 1) / 1]
+-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+ |sent |tokens |words | +-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+ |존경하는 국민 여러분, 경찰관 여러분, 일흔네 돌 ‘경찰의 날’입니다.|[존경하는, 국민, 여러분,, 경찰관, 여러분,, 일흔네, 돌, ‘경찰의, 날’입니다.]|[존경하는, 국민, 여러분, 경찰관, 여러분, 일흔네, 돌, 경찰의, 날입니다]| +-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+ only showing top 1 row
4. 불용어 제거¶
from pyspark.ml.feature import StopWordsRemover
#stop = StopWordsRemover(inputCol="tokens", outputCol="nostops")
stop = StopWordsRemover(inputCol="words", outputCol="nostops")
stop.setStopWords([u"돌", u"너", u"우리", u'있습니다', u'더', u'합니다', u'그', u'드립니다', u'것입니다'])
stopDf=stop.transform(wordsDf)
stopDf.show(1, False)
+-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+------------------------------------------------------------------+ |sent |tokens |words |nostops | +-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+------------------------------------------------------------------+ |존경하는 국민 여러분, 경찰관 여러분, 일흔네 돌 ‘경찰의 날’입니다.|[존경하는, 국민, 여러분,, 경찰관, 여러분,, 일흔네, 돌, ‘경찰의, 날’입니다.]|[존경하는, 국민, 여러분, 경찰관, 여러분, 일흔네, 돌, 경찰의, 날입니다]|[존경하는, 국민, 여러분, 경찰관, 여러분, 일흔네, 경찰의, 날입니다]| +-----------------------------------------------------------------+---------------------------------------------------------------------------+----------------------------------------------------------------------+------------------------------------------------------------------+ only showing top 1 row
5. 전체 단어빈도¶
Row 단위 연산 -> RDD로 변환해야 한다.
stopDf.select("nostops").rdd.take(3)
[Row(nostops=['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다']), Row(nostops=[]), Row(nostops=['국민의', '안전을', '위해', '밤낮없이', '애쓰시는', '전국의', '경찰관', '여러분께', '먼저', '감사를', '전몰·순직', '경찰관들의', '고귀한', '희생에', '경의를', '표합니다', '유가족', '여러분께', '위로의', '마음을', '전합니다'])]
stopDf.select("nostops").rdd.flatMap(lambda x:x).take(2) #RDD 변환 -> Flatmap(Row 제거)
[['존경하는', '국민', '여러분', '경찰관', '여러분', '일흔네', '경찰의', '날입니다'], []]
stopDf.select("nostops").rdd.flatMap(lambda x:x).flatMap(lambda x:x).take(3) #Flatmap 한번 더 -> 1차원
['존경하는', '국민', '여러분']
stopDf.select("nostops").rdd\
.flatMap(lambda x:x).flatMap(lambda x:x)\
.map(lambda x: x.replace("경찰은","경찰"))\
.map(lambda x: x.replace("경찰의","경찰"))\
.map(lambda x:(x,1))\
.reduceByKey(lambda x,y:x+y)\
.map(lambda x:(x[1],x[0]))\
.sortByKey(False)\
.take(20)
#경찰은, 경찰의 -> 이음동의어 처리
[(18, '경찰'), (7, '국민의'), (6, '여러분'), (5, '경찰관'), (4, '우리의'), (3, '여러분께'), (3, '역대'), (3, '가장'), (3, '함께'), (2, '안전을'), (2, '위해'), (2, '먼저'), (2, '감사를'), (2, '받으신'), (2, '비롯한'), (2, '또한'), (2, '외국'), (2, '경찰을'), (2, '경찰헌장은'), (2, '겨레를')]
6. TF-IDF 계산, features로 구성¶
from pyspark.ml.feature import HashingTF, IDF
hashTF = HashingTF(inputCol="nostops", outputCol="hash")
hashDf = hashTF.transform(stopDf)
idf = IDF(inputCol="hash", outputCol="idf")
idfModel = idf.fit(hashDf)
idfDf = idfModel.transform(hashDf)
idfDf.select("nostops", "hash").show(1, truncate=False)
+------------------------------------------------------------------+----------------------------------------------------------------------------------+ |nostops |hash | +------------------------------------------------------------------+----------------------------------------------------------------------------------+ |[존경하는, 국민, 여러분, 경찰관, 여러분, 일흔네, 경찰의, 날입니다]|(262144,[162,62257,80697,80732,89874,127225,160086],[1.0,1.0,1.0,1.0,2.0,1.0,1.0])| +------------------------------------------------------------------+----------------------------------------------------------------------------------+ only showing top 1 row
7. Word2Vec¶
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=20, minCount=1, inputCol="tokens", outputCol="w2v")
model = word2Vec.fit(tokDf)
w2vDf = model.transform(tokDf)
model.getVectors().show(1, truncate=False)
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |word |vector | +--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |공정해야|[0.00914848130196333,-0.016886720433831215,-0.006619544699788094,-0.010087245143949986,-0.007846644148230553,-0.020566700026392937,0.024165349081158638,-0.007920094765722752,0.022425495088100433,-0.0029738033190369606,0.014662348665297031,0.01732100173830986,-0.017258305102586746,-0.008982984349131584,0.006310200318694115,0.014133486896753311,-0.00215858849696815,0.004804850555956364,-0.02151719108223915,-0.023807846009731293]| +--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ only showing top 1 row
model.findSynonyms("치안이", 5).show()
+-------------+------------------+ | word| similarity| +-------------+------------------+ | 국민이|0.6197648644447327| | 감사의|0.5400314331054688| |경찰위원장님,|0.5180076956748962| | 갖추고|0.5122513771057129| | 국민께|0.5062524080276489| +-------------+------------------+
'Data Science > 데이터분석 (Spark)' 카테고리의 다른 글
[빅데이터분석] Spark에서 머신러닝을 통한 예측 수행 (1) | 2024.01.05 |
---|---|
[빅데이터분석] 통계 이론과 지표 계산하기 (0) | 2023.12.11 |
[빅데이터분석] Spark Dataframe 생성과 Dataframe API 사용하기 (1) | 2023.11.29 |
[빅데이터분석] Spark RDD 다루기 : 데이터 집계와 Paired-RDD (0) | 2023.11.27 |
[빅데이터분석] Spark RDD 다루기 : 구조 생성, 데이터 처리 및 Map-Reduce (0) | 2023.11.27 |