[Big Data Analysis] Performing Predictions with Machine Learning in Spark
2024. 1. 5. 04:32
<Notes from Professor 임좌상's Big Data Analysis course at Sangmyung University>
Machine Learning¶
Let's take the data already transformed for ML and perform predictions through supervised learning with several models. This Jupyter Notebook covers the following:
- Support Vector Machine
- Logistic Regression
- Naive Bayes
- Decision Tree
In [1]:
import os, sys
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
23/12/19 04:43:55 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/12/19 04:43:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/19 04:43:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [2]:
import os
fsvm=os.path.join(os.getcwd(),'data','sample_libsvm_data.txt')
dfsvm = spark.read.format("libsvm").load(fsvm)
23/12/19 04:43:56 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
In [3]:
dfsvm.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
In [4]:
dfsvm.take(1)  # each row consists of a label plus sparse features (index: value pairs)
Out[4]:
[Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0, 356: 253.0, 357: 252.0, 358: 195.0, 371: 57.0, 372: 252.0, 373: 252.0, 374: 63.0, 384: 253.0, 385: 252.0, 386: 195.0, 399: 198.0, 400: 253.0, 401: 190.0, 412: 255.0, 413: 253.0, 414: 196.0, 426: 76.0, 427: 246.0, 428: 252.0, 429: 112.0, 440: 253.0, 441: 252.0, 442: 148.0, 454: 85.0, 455: 252.0, 456: 230.0, 457: 25.0, 466: 7.0, 467: 135.0, 468: 253.0, 469: 186.0, 470: 12.0, 482: 85.0, 483: 252.0, 484: 223.0, 493: 7.0, 494: 131.0, 495: 252.0, 496: 225.0, 497: 71.0, 510: 85.0, 511: 252.0, 512: 145.0, 520: 48.0, 521: 165.0, 522: 252.0, 523: 173.0, 538: 86.0, 539: 253.0, 540: 225.0, 547: 114.0, 548: 238.0, 549: 253.0, 550: 162.0, 566: 85.0, 567: 252.0, 568: 249.0, 569: 146.0, 570: 48.0, 571: 29.0, 572: 85.0, 573: 178.0, 574: 225.0, 575: 253.0, 576: 223.0, 577: 167.0, 578: 56.0, 594: 85.0, 595: 252.0, 596: 252.0, 597: 252.0, 598: 229.0, 599: 215.0, 600: 252.0, 601: 252.0, 602: 252.0, 603: 196.0, 604: 130.0, 622: 28.0, 623: 199.0, 624: 252.0, 625: 252.0, 626: 253.0, 627: 252.0, 628: 252.0, 629: 233.0, 630: 145.0, 651: 25.0, 652: 128.0, 653: 252.0, 654: 253.0, 655: 252.0, 656: 141.0, 657: 37.0}))]
Label to String¶
In [5]:
from pyspark.sql.types import StringType, IntegerType
_df=dfsvm.withColumn('labelStr', dfsvm.label.cast(IntegerType()).cast(StringType()))
_df.show(5)
+-----+--------------------+--------+
|label|            features|labelStr|
+-----+--------------------+--------+
|  0.0|(692,[127,128,129...|       0|
|  1.0|(692,[158,159,160...|       1|
|  1.0|(692,[124,125,126...|       1|
|  1.0|(692,[152,153,154...|       1|
|  1.0|(692,[151,152,153...|       1|
+-----+--------------------+--------+
only showing top 5 rows
Reading an RDD with MLUtils¶
In [6]:
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(spark.sparkContext, fsvm)
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
label.take(5)
Out[6]:
[0.0, 1.0, 1.0, 1.0, 1.0]
In [7]:
features.take(1)
Out[7]:
[SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0, 356: 253.0, 357: 252.0, 358: 195.0, 371: 57.0, 372: 252.0, 373: 252.0, 374: 63.0, 384: 253.0, 385: 252.0, 386: 195.0, 399: 198.0, 400: 253.0, 401: 190.0, 412: 255.0, 413: 253.0, 414: 196.0, 426: 76.0, 427: 246.0, 428: 252.0, 429: 112.0, 440: 253.0, 441: 252.0, 442: 148.0, 454: 85.0, 455: 252.0, 456: 230.0, 457: 25.0, 466: 7.0, 467: 135.0, 468: 253.0, 469: 186.0, 470: 12.0, 482: 85.0, 483: 252.0, 484: 223.0, 493: 7.0, 494: 131.0, 495: 252.0, 496: 225.0, 497: 71.0, 510: 85.0, 511: 252.0, 512: 145.0, 520: 48.0, 521: 165.0, 522: 252.0, 523: 173.0, 538: 86.0, 539: 253.0, 540: 225.0, 547: 114.0, 548: 238.0, 549: 253.0, 550: 162.0, 566: 85.0, 567: 252.0, 568: 249.0, 569: 146.0, 570: 48.0, 571: 29.0, 572: 85.0, 573: 178.0, 574: 225.0, 575: 253.0, 576: 223.0, 577: 167.0, 578: 56.0, 594: 85.0, 595: 252.0, 596: 252.0, 597: 252.0, 598: 229.0, 599: 215.0, 600: 252.0, 601: 252.0, 602: 252.0, 603: 196.0, 604: 130.0, 622: 28.0, 623: 199.0, 624: 252.0, 625: 252.0, 626: 253.0, 627: 252.0, 628: 252.0, 629: 233.0, 630: 145.0, 651: 25.0, 652: 128.0, 653: 252.0, 654: 253.0, 655: 252.0, 656: 141.0, 657: 37.0})]
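The rest of this notebook uses the DataFrame-based pyspark.ml API, so here is a minimal sketch (not part of the original notebook) of how the mllib LabeledPoint RDD loaded above could be converted into an equivalent DataFrame. It assumes the asML() conversion available on mllib vectors since Spark 2.0; dfFromRdd is a hypothetical name.
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType

# convert each LabeledPoint (mllib) into a (label, ml-vector) tuple
schema = StructType([
    StructField("label", DoubleType(), False),
    StructField("features", VectorUDT(), False)])
dfFromRdd = spark.createDataFrame(
    data.map(lambda lp: (float(lp.label), lp.features.asML())), schema)
dfFromRdd.printSchema()  # same label/features layout as dfsvm above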
Splitting into Train and Test Sets¶
In [8]:
train, test = dfsvm.randomSplit([0.6, 0.4])  # random 60/40 split
dfsvm.groupby('label').count().show()  # class distribution of the full dataset
+-----+-----+
|label|count|
+-----+-----+
|  0.0|   43|
|  1.0|   57|
+-----+-----+
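Because randomSplit samples each row independently, the 60/40 ratio and the class balance within each split are only approximate. A quick check, not in the original notebook, might look like this:
# label distribution within each split (counts vary from run to run)
train.groupBy('label').count().show()
test.groupBy('label').count().show()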
Applying Models¶
1. Support Vector Machine¶
In [9]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1)  # set model hyperparameters
In [10]:
lsvcModel = lsvc.fit(train)  # fit on the training set
23/12/19 04:44:04 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/12/19 04:44:04 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
In [11]:
testDf = lsvcModel.transform(test)  # generate predictions on the test set
In [12]:
testDf.select('label', 'prediction').show(100)
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
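The predictions above all match the labels, but it is convenient to score them with an evaluator as well. A minimal sketch (not in the original notebook), reusing the same MulticlassClassificationEvaluator that is applied to Naive Bayes later on; svmEvaluator is a hypothetical name:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

svmEvaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
# should print 1.0 given the predictions shown above
print("LinearSVC test accuracy =", svmEvaluator.evaluate(testDf))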
2. Logistic Regression¶
Let's perform binary classification.
In [13]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(train)
testDf = lrModel.transform(test)
testDf.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
In [14]:
testDf.select('label','rawPrediction','probability','prediction').show(100)
+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[0.97401772358881...|[0.72591959058934...|       0.0|
|  0.0|[0.60872904769431...|[0.64765082619373...|       0.0|
|  0.0|[0.99306355311509...|[0.72969260659674...|       0.0|
|  0.0|[1.00268802621131...|[0.73158674830029...|       0.0|
|  0.0|[0.91463076375201...|[0.71394682481350...|       0.0|
|  0.0|[0.99978333499413...|[0.73101597757179...|       0.0|
|  0.0|[0.81270585248836...|[0.69268580514083...|       0.0|
|  0.0|[0.99119625838965...|[0.72932414101630...|       0.0|
|  0.0|[0.94748028228275...|[0.72060816021990...|       0.0|
|  0.0|[0.95876875042810...|[0.72287522073720...|       0.0|
|  0.0|[0.82460695680365...|[0.69521339313490...|       0.0|
|  0.0|[0.95562130296550...|[0.72224426108532...|       0.0|
|  0.0|[0.96053938098069...|[0.72322978518508...|       0.0|
|  0.0|[0.87098315358903...|[0.70495023062137...|       0.0|
|  0.0|[0.77443835100467...|[0.68448021521658...|       0.0|
|  0.0|[0.53386977898391...|[0.63038522333004...|       0.0|
|  0.0|[0.16498869057610...|[0.54115385994965...|       0.0|
|  1.0|[-1.1075440807967...|[0.24832903127093...|       1.0|
|  1.0|[0.19258357005735...|[0.54799763770769...|       0.0|
|  1.0|[-1.3603911351253...|[0.20417673993183...|       1.0|
|  1.0|[-1.1373610788074...|[0.24280519951057...|       1.0|
|  1.0|[-1.4255535531960...|[0.19379243771284...|       1.0|
|  1.0|[-0.6975110683043...|[0.33236428684076...|       1.0|
|  1.0|[-1.0688016208052...|[0.25563104955259...|       1.0|
|  1.0|[-1.3304580361043...|[0.20908361076384...|       1.0|
|  1.0|[-1.4777620244050...|[0.18576569028698...|       1.0|
|  1.0|[-0.6527883422544...|[0.34236146614770...|       1.0|
|  1.0|[-1.2591878919041...|[0.22111372378445...|       1.0|
|  1.0|[-1.3297142200299...|[0.20920664049995...|       1.0|
|  1.0|[-1.1442313835498...|[0.24154432118967...|       1.0|
|  1.0|[-1.1515814813750...|[0.24020033758368...|       1.0|
|  1.0|[-1.2171965183349...|[0.22843018753402...|       1.0|
|  1.0|[-1.0647016435969...|[0.25641199019759...|       1.0|
|  1.0|[-1.0212191034092...|[0.26479000200155...|       1.0|
|  1.0|[-0.9644219447419...|[0.27599372195909...|       1.0|
|  1.0|[-1.2522155017153...|[0.22231686049978...|       1.0|
|  1.0|[-1.2909405855289...|[0.21569364951321...|       1.0|
+-----+--------------------+--------------------+----------+
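Since the label is binary, the rawPrediction column can also be scored with areaUnderROC. A small sketch, not part of the original notebook, using BinaryClassificationEvaluator (lrEvaluator is a hypothetical name):
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lrEvaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
print("LogisticRegression test AUC =", lrEvaluator.evaluate(testDf))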
3. Naive Bayes¶
A model that classifies using conditional probabilities via Bayes' theorem.
In [15]:
from pyspark.ml.classification import NaiveBayes
nb=NaiveBayes(featuresCol='features', labelCol='label', modelType='multinomial', predictionCol='prediction')
#nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
predictions=model.transform(test)
predictions.select('label', 'prediction').show(100)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
Test set accuracy = 1.0
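Accuracy alone hides which class any errors come from; a confusion matrix can be built directly from the predictions DataFrame. A minimal sketch, not in the original notebook:
# counts of (label, prediction) pairs; off-diagonal rows are misclassifications
predictions.groupBy('label', 'prediction').count() \
    .orderBy('label', 'prediction').show()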
4. Decision Tree¶
A decision model that branches according to feature conditions.
In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dfsvm = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dfsvm)
# features with more than maxCategories (4) distinct values are treated as continuous
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dfsvm)
23/12/19 04:44:06 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
In [17]:
(train, test) = dfsvm.randomSplit([0.7, 0.3])
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
model = pipeline.fit(train)
predictions = model.transform(test)
predictions.select("prediction", "indexedLabel", "features").show(100)
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
print(treeModel)
+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[100,101,102...|
|       1.0|         1.0|(692,[121,122,123...|
|       1.0|         1.0|(692,[122,123,124...|
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[126,127,128...|
|       1.0|         1.0|(692,[127,128,129...|
|       1.0|         1.0|(692,[128,129,130...|
|       1.0|         1.0|(692,[129,130,131...|
|       1.0|         1.0|(692,[152,153,154...|
|       0.0|         0.0|(692,[97,98,99,12...|
|       0.0|         0.0|(692,[119,120,121...|
|       0.0|         0.0|(692,[123,124,125...|
|       0.0|         0.0|(692,[124,125,126...|
|       0.0|         0.0|(692,[125,126,127...|
|       0.0|         0.0|(692,[125,126,153...|
|       0.0|         0.0|(692,[126,127,128...|
|       0.0|         0.0|(692,[127,128,129...|
|       0.0|         0.0|(692,[127,128,129...|
|       0.0|         0.0|(692,[128,129,130...|
|       0.0|         0.0|(692,[129,130,131...|
|       0.0|         0.0|(692,[129,130,131...|
|       0.0|         0.0|(692,[129,130,131...|
|       0.0|         0.0|(692,[156,157,158...|
|       0.0|         0.0|(692,[158,159,160...|
|       0.0|         0.0|(692,[158,159,160...|
+----------+------------+--------------------+
Test Error = 0
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_951a69c954b3, depth=2, numNodes=5, numClasses=2, numFeatures=692
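The summary above only reports depth and node count. To inspect the actual splits, the fitted tree exposes toDebugString; a short sketch, not in the original notebook:
# prints each internal node's feature index and threshold, plus leaf predictions
print(treeModel.toDebugString)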