[Big Data Analysis] Creating Spark DataFrames and Using the DataFrame API
<Notes from Professor 임좌상's Big Data Analysis course at Sangmyung University>
DataFrame
A DataFrame is a data structure organized into rows and columns, much like a table in a relational database (RDB) or an Excel sheet. If you have used Pandas or R, it is also similar to the DataFrames they provide.
A DataFrame is conceptually an RDD with a schema laid on top. Spark's DataFrame is built for processing large volumes of data and is designed to be used in a distributed fashion.
Unlike the RDD we used earlier, which has no schema, a DataFrame is set up with a schema: every column has a name and a data type, and the data is stored according to that schema.
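This relationship can be seen directly in code: every DataFrame carries an RDD of Row objects underneath, with the schema added on top. A minimal sketch (assuming the SparkSession named spark created below):
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['n', 's'])
print(df.schema)       # the schema the DataFrame adds: a StructType of named, typed fields
print(df.rdd.take(1))  # the underlying RDD of Row objects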
Schema
A Row is a row of a DataFrame, grouping the data elements of one record. A Row can be built from a Python list or dict.
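For example, a Row can be constructed positionally or from a dict unpacked into keyword arguments (a minimal sketch; the field names are illustrative):
from pyspark.sql import Row

# positional: declare the field names once, then fill in values
Person = Row('name', 'age')
r1 = Person('kim', 10)

# from a dict, unpacked into keyword arguments
r2 = Row(**{'name': 'lee', 'age': 20})

print(r1.name, r2.age)  # fields are accessible by name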
This notebook covers the following:
- Creating DataFrames
- Defining schemas
- Pandas and DataFrames
- Building DataFrames from CSV, TSV, and JSON
- The DataFrame API
- Adding columns
- Renaming columns
- Using UDFs
- Adding rows
- Selecting rows and columns (select, when, otherwise)
- GroupBy & aggregate functions
- Aggregate functions
import os, sys
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
23/11/29 06:12:13 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0)
23/11/29 06:12:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/29 06:12:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Creating a DataFrame
myList=[
('1','kim, js', 170),
('1','lee, sm', 175),
('2','lim, yg',180),
('2','lee', 170)
]
myDf=spark.createDataFrame(myList)
myDf.columns # no schema was given, so columns are auto-named '_1', '_2', ...
['_1', '_2', '_3']
myDf.printSchema() # print the schema
print (myDf.take(1)) # print one Row
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
[Row(_1='1', _2='kim, js', _3=170)]
Setting Column Names
# setting column names
cols = ['year','name','height']
_myDf = spark.createDataFrame(myList, cols)
_myDf.columns
['year', 'name', 'height']
_myDf.printSchema() # print the schema
print (_myDf.take(1))
root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

[Row(year='1', name='kim, js', height=170)]
Creating from Row Objects
from pyspark.sql import Row
Person = Row('year','name', 'height')
row1=Person('1','kim, js', 170)
myRows = [
row1,
Person('1', 'lee, sm', 175),
Person('2', 'lim, yg', 180),
Person('2', 'lee', 170)
]
myDf=spark.createDataFrame(myRows)
myDf.printSchema()
myDf.show()
root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+
# (creating an RDD from Row objects)
from pyspark.sql import Row
r1=Row(name="js1", age=10)
r2=Row(name="js2", age=20)
_myRdd=spark.sparkContext.parallelize( [r1,r2] )
Creating After Defining a Schema
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
schema=StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
#StructField("created", TimestampType(), True)
])
_myDf=spark.createDataFrame(_myRdd, schema)
_myDf.printSchema()
_myDf.show()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+----+---+
|name|age|
+----+---+
| js1| 10|
| js2| 20|
+----+---+
Pandas and DataFrames
myDf.toPandas() # convert the Spark DataFrame to Pandas
|   | year | name    | height |
|---|------|---------|--------|
| 0 | 1    | kim, js | 170    |
| 1 | 1    | lee, sm | 175    |
| 2 | 2    | lim, yg | 180    |
| 3 | 2    | lee     | 170    |
import os
# _myDf=spark.createDataFrame([('1','kim, js', 170),('1','lee, sm', 175)])
# _myDf.write.format('com.databricks.spark.csv').save(os.path.join('data','_myDf.csv')) # exporting the DataFrame to CSV
# os.listdir("data/_myDf.csv")
_myDf.toPandas().to_csv(os.path.join('data','myDf.csv')) # exporting via Pandas
import pandas as pd
icc = pd.DataFrame( { 'country': ['South Korea','Japan','Hong Kong'],'codes': [82, 81, 852] })
icc
|   | country     | codes |
|---|-------------|-------|
| 0 | South Korea | 82    |
| 1 | Japan       | 81    |
| 2 | Hong Kong   | 852   |
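The reverse direction also works: a Pandas DataFrame can be passed straight to createDataFrame(). A short sketch using the icc frame above:
iccDf = spark.createDataFrame(icc)  # Pandas -> Spark
iccDf.printSchema()
iccDf.show()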
Creating a DataFrame from CSV
Reading into an RDD, Then Converting to a DataFrame
from pyspark.sql import Row
cfile= os.path.join("data", "ds_spark_2cols.csv")
lines = spark.sparkContext.textFile(cfile)
type(lines)
pyspark.rdd.RDD
_col12 = lines.map(lambda l: l.split(","))
col12 = _col12.map(lambda p: Row(col1=int(p[0].strip()), col2=int(p[1].strip())))
_myDf = spark.createDataFrame(col12)
_myDf.printSchema()
_myDf.collect()
root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
[Row(col1=35, col2=2), Row(col1=40, col2=27), Row(col1=12, col2=38), Row(col1=15, col2=31), Row(col1=21, col2=1), Row(col1=14, col2=19), Row(col1=46, col2=1), Row(col1=10, col2=34), Row(col1=28, col2=3), Row(col1=48, col2=1), Row(col1=16, col2=2), Row(col1=30, col2=3), Row(col1=32, col2=2), Row(col1=48, col2=1), Row(col1=31, col2=2), Row(col1=22, col2=1), Row(col1=12, col2=3), Row(col1=39, col2=29), Row(col1=19, col2=37), Row(col1=25, col2=2)]
Loading in DataFrame Format
%%writefile data/ds_spark.csv
1,2,3,4
11,22,33,44
111,222,333,444
Overwriting data/ds_spark.csv
# note: with header='true' the first line (1,2,3,4) becomes the column names,
# which is why only two data rows appear in the output below
df = spark\
.read\
.format('csv')\
.options(header='true', inferschema='true', delimiter=',')\
.load(os.path.join('data','ds_spark.csv'))
df.show()
df.printSchema()
+---+---+---+---+
|  1|  2|  3|  4|
+---+---+---+---+
| 11| 22| 33| 44|
|111|222|333|444|
+---+---+---+---+

root
 |-- 1: integer (nullable = true)
 |-- 2: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 4: integer (nullable = true)
df = spark\
.read\
.options(header='true', inferschema='true', delimiter=',')\
.csv(os.path.join('data', 'ds_spark.csv'))
df.show()
df.printSchema()
+---+---+---+---+
|  1|  2|  3|  4|
+---+---+---+---+
| 11| 22| 33| 44|
|111|222|333|444|
+---+---+---+---+

root
 |-- 1: integer (nullable = true)
 |-- 2: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 4: integer (nullable = true)
Reading TSV Files
TSV (tab-separated values) files use the tab character as the delimiter. In older Spark versions, when '\t' was embedded in the data, the column type sometimes had to be set to string.
%%writefile data/ds_spark_heightweight.txt
1 65.78 112.99
2 71.52 136.49
3 69.40 153.03
4 68.22 142.34
5 67.79 144.30
6 68.70 123.30
7 69.80 141.49
8 70.01 136.46
9 67.90 112.37
10 66.78 120.67
11 66.49 127.45
12 67.62 114.14
13 68.30 125.61
14 67.12 122.46
15 68.28 116.09
16 71.09 140.00
17 66.46 129.50
18 68.65 142.97
19 71.23 137.90
20 67.13 124.04
21 67.83 141.28
22 68.88 143.54
23 63.48 97.90
24 68.42 129.50
25 67.63 141.85
26 67.21 129.72
27 70.84 142.42
28 67.49 131.55
29 66.53 108.33
30 65.44 113.89
31 69.52 103.30
32 65.81 120.75
33 67.82 125.79
34 70.60 136.22
35 71.80 140.10
36 69.21 128.75
37 66.80 141.80
38 67.66 121.23
39 67.81 131.35
40 64.05 106.71
41 68.57 124.36
42 65.18 124.86
43 69.66 139.67
44 67.97 137.37
45 65.98 106.45
46 68.67 128.76
47 66.88 145.68
48 67.70 116.82
49 69.82 143.62
50 69.09 134.93
Overwriting data/ds_spark_heightweight.txt
from pyspark.sql.types import *
_tRdd=spark.sparkContext\
.textFile(os.path.join('data','ds_spark_heightweight.txt'))
# split on tab, or
tRdd=_tRdd.map(lambda x:x.split('\t'))
tRdd.take(1)
[['1', '65.78', '112.99']]
# split() with no argument splits on any whitespace anyway
_tRddSplitted = _tRdd.map(lambda x:x.split())
_tRddSplitted.take(1)
[['1', '65.78', '112.99']]
# split on tab while converting the type
tRdd=_tRdd.map(lambda line:[float(x) for x in line.split('\t')])
tRdd.take(1)
[[1.0, 65.78, 112.99]]
TSV to DataFrame
tDfNamed = spark.createDataFrame(tRdd, ["id","height","weight"])
tDfNamed.printSchema()
tDfNamed.take(1)
root
 |-- id: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)

[Row(id=1.0, height=65.78, weight=112.99)]
# specifying the schema explicitly
mySchema = StructType([
    StructField("id", DoubleType(), True),
    StructField("height", DoubleType(), True),
    StructField("weight", DoubleType(), True)
])
myDf=spark.createDataFrame(tRdd, mySchema)
myDf.printSchema()
myDf.take(1)
root
 |-- id: double (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: double (nullable = true)

[Row(id=1.0, height=65.78, weight=112.99)]
Reading TSV as Text to Build a DataFrame
tDftxt = spark.read.text(os.path.join('data','ds_spark_heightweight.txt'))
tDftxt.printSchema()
root
 |-- value: string (nullable = true)
from pyspark.sql.functions import split
split_col = split(tDftxt['value'], '\t')
tDftxt = tDftxt.withColumn('height', split_col.getItem(1))
tDftxt = tDftxt.withColumn('weight', split_col.getItem(2))
tDftxt.show()
+-----------------+------+------+
|            value|height|weight|
+-----------------+------+------+
| 1\t65.78\t112.99| 65.78|112.99|
| 2\t71.52\t136.49| 71.52|136.49|
| 3\t69.40\t153.03| 69.40|153.03|
| 4\t68.22\t142.34| 68.22|142.34|
| 5\t67.79\t144.30| 67.79|144.30|
| 6\t68.70\t123.30| 68.70|123.30|
| 7\t69.80\t141.49| 69.80|141.49|
| 8\t70.01\t136.46| 70.01|136.46|
| 9\t67.90\t112.37| 67.90|112.37|
|10\t66.78\t120.67| 66.78|120.67|
|11\t66.49\t127.45| 66.49|127.45|
|12\t67.62\t114.14| 67.62|114.14|
|13\t68.30\t125.61| 68.30|125.61|
|14\t67.12\t122.46| 67.12|122.46|
|15\t68.28\t116.09| 68.28|116.09|
|16\t71.09\t140.00| 71.09|140.00|
|17\t66.46\t129.50| 66.46|129.50|
|18\t68.65\t142.97| 68.65|142.97|
|19\t71.23\t137.90| 71.23|137.90|
|20\t67.13\t124.04| 67.13|124.04|
+-----------------+------+------+
only showing top 20 rows
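Note that the columns produced by split() are still strings. If numeric values are needed, they can be cast afterwards, e.g. (a sketch):
from pyspark.sql.types import DoubleType

# cast the string columns extracted by split() into doubles
tDftxt = tDftxt.withColumn('height', tDftxt['height'].cast(DoubleType()))
tDftxt = tDftxt.withColumn('weight', tDftxt['weight'].cast(DoubleType()))
tDftxt.printSchema()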
Reading TSV with the CSV Reader
tDf = spark\
.read\
.options(header='false', inferschema='true', delimiter='\t')\
.csv(os.path.join('data', 'ds_spark_heightweight.txt'))
tDf.show()
+---+-----+------+
|_c0|  _c1|   _c2|
+---+-----+------+
|  1|65.78|112.99|
|  2|71.52|136.49|
|  3| 69.4|153.03|
|  4|68.22|142.34|
|  5|67.79| 144.3|
|  6| 68.7| 123.3|
|  7| 69.8|141.49|
|  8|70.01|136.46|
|  9| 67.9|112.37|
| 10|66.78|120.67|
| 11|66.49|127.45|
| 12|67.62|114.14|
| 13| 68.3|125.61|
| 14|67.12|122.46|
| 15|68.28|116.09|
| 16|71.09| 140.0|
| 17|66.46| 129.5|
| 18|68.65|142.97|
| 19|71.23| 137.9|
| 20|67.13|124.04|
+---+-----+------+
only showing top 20 rows
Creating a DataFrame from a JSON File
import pandas as pd
jfile= os.path.join('src','ds_twitter_seoul_3.json')
tweetDf= spark.read.json(jfile)
23/11/29 06:12:20 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
tweetDf.printSchema()
root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |-- extended_entities: struct (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- favorited: boolean (nullable = true)
 |-- geo: struct (nullable = true)
 |-- id: long (nullable = true)
 |-- id_str: string (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- in_reply_to_status_id_str: string (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_user_id_str: string (nullable = true)
 |-- is_quote_status: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |-- place: struct (nullable = true)
 |-- possibly_sensitive: boolean (nullable = true)
 |-- quoted_status: struct (nullable = true)
 |-- quoted_status_id: long (nullable = true)
 |-- quoted_status_id_str: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- retweeted: boolean (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- truncated: boolean (nullable = true)
 |-- user: struct (nullable = true)
(nested struct fields omitted; the full inferred tweet schema runs to several hundred lines)
tweetDf.select('id', 'text').show(10)
+------------------+--------------------+
|                id|                text|
+------------------+--------------------+
|801657325836763136|RT @soompi: #SEVE...|
|801657325677400064|RT @hobuing: also...|
|801657307637678080|Retweeted The Seo...|
|801657305628430336|RT @heochan_th: 1...|
|801657297449586688|RT @heochan_th: 1...|
|801657287697895424|go to seoul, sout...|
|801657280760397824|RT @heochan_th: [...|
|801657276788523008|RT @lesliehung: @...|
|801657268177604608|RT @almightykeybe...|
|801657258400616449|RT @always_yena: ...|
+------------------+--------------------+
only showing top 10 rows
Exercise: Creating a DataFrame from World Cup JSON at a URL
Reading data directly from a URL into a DataFrame is not supported. Fetch the data with the requests library first, then create the DataFrame from it.
import requests
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")
print("Type of Response: ", type(r))
Type of Response: <class 'requests.models.Response'>
wc=r.json()
print (type(wc), type(wc[0]))
<class 'list'> <class 'dict'>
wcDf=spark.createDataFrame(wc)
wcDf.take(1)
[Row(Club='Club Atlético Talleres de Remedios de Escalada', ClubCountry='Argentina', Competition='World Cup', DateOfBirth='1905-5-5', FullName='Ángel Bossio', IsCaptain=False, Number='', Position='GK', Team='Argentina', Year=1930)]
Handling Missing Values in a DataFrame
cols = wcDf.columns
cols.remove('IsCaptain') # exclude the boolean column; isnan() below does not apply to booleans
from pyspark.sql.functions import isnan, isnull, when, count, col
wcDf.select("Competition", isnan("Competition")).show(2)
+-----------+------------------+
|Competition|isnan(Competition)|
+-----------+------------------+
|  World Cup|             false|
|  World Cup|             false|
+-----------+------------------+
only showing top 2 rows
wcDf.select("Competition", isnull("Competition").alias("Competition aliased")).show(2)
+-----------+-------------------+
|Competition|Competition aliased|
+-----------+-------------------+
|  World Cup|              false|
|  World Cup|              false|
+-----------+-------------------+
only showing top 2 rows
from pyspark.sql.functions import isnan, isnull, when, count, col
wcDf.select([count(when(isnan(c) | isnull(c), c)).alias(c) for c in cols]).show()
+----+-----------+-----------+-----------+--------+------+--------+----+----+
|Club|ClubCountry|Competition|DateOfBirth|FullName|Number|Position|Team|Year|
+----+-----------+-----------+-----------+--------+------+--------+----+----+
|   0|          0|          0|          0|       0|     0|       0|   0|   0|
+----+-----------+-----------+-----------+--------+------+--------+----+----+
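Once located, missing values can also be handled with the DataFrame's na functions, for example by filling or dropping them. A sketch (the column choices are illustrative; this particular dataset has no nulls, as shown above):
# replace nulls in 'Number' with '0', or drop rows where 'DateOfBirth' is null
wcDfFilled = wcDf.na.fill({'Number': '0'})
wcDfDropped = wcDf.na.drop(subset=['DateOfBirth'])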
Casting to Datetime
from datetime import datetime
print (datetime.strptime("11/01/2023", '%m/%d/%Y'))
2023-11-01 00:00:00
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
toDate = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
wcDf = wcDf.withColumn('date1', toDate(wcDf['DateOfBirth']))
# printing the result with wcDf.take(1) raises a ValueError, since '%m/%d/%Y' does not match dates like '1905-5-5'
# pyspark to_date()
wcDf = wcDf.drop('date1')
from pyspark.sql.functions import to_date
_wcDfCasted=wcDf.withColumn('date2', to_date(wcDf['DateOfBirth'], 'yyyy-MM-dd'))
from pyspark.sql.types import DateType
wcDfCasted = _wcDfCasted.withColumn('date3', _wcDfCasted['DateOfBirth'].cast(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', wcDfCasted['Number'].cast("integer"))
from pyspark.sql import functions as F
wcDfCasted = _wcDfCasted.withColumn('date3', F.col('DateOfBirth').astype(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', F.col('Number').astype("integer"))
wcDfCasted.printSchema()
root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- date2: date (nullable = true)
 |-- date3: date (nullable = true)
 |-- NumberInt: integer (nullable = true)
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
wcDfCasted.take(1)
[Row(Club='Club Atlético Talleres de Remedios de Escalada', ClubCountry='Argentina', Competition='World Cup', DateOfBirth='1905-5-5', FullName='Ángel Bossio', IsCaptain=False, Number='', Position='GK', Team='Argentina', Year=1930, date2=datetime.date(1905, 5, 5), date3=datetime.date(1905, 5, 5), NumberInt=None)]
Putting the Code Together
import os, sys
import requests
from pyspark.sql import Row
from pyspark.sql.types import DateType
import pyspark
os.environ["PYSPARK_PYTHON"]=sys.executable #"/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]=sys.executable #"/usr/bin/python3"
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
# read url json
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")
wc=r.json()
# read dictionary into Row
wcDf = spark.createDataFrame(Row(**x) for x in wc)
# cast DoB string into date, Number string into integer
wcDfCasted = wcDf.withColumn('date3', wcDf['DateOfBirth'].cast(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', wcDfCasted['Number'].cast("integer"))
wcDfCasted.take(1)
[Row(Competition='World Cup', Year=1930, Team='Argentina', Number='', Position='GK', FullName='Ángel Bossio', Club='Club Atlético Talleres de Remedios de Escalada', ClubCountry='Argentina', DateOfBirth='1905-5-5', IsCaptain=False, date3=datetime.date(1905, 5, 5), NumberInt=None)]
wcDfCasted.show(1)
+-----------+----+---------+------+--------+------------+--------------------+-----------+-----------+---------+----------+---------+
|Competition|Year|     Team|Number|Position|    FullName|                Club|ClubCountry|DateOfBirth|IsCaptain|     date3|NumberInt|
+-----------+----+---------+------+--------+------------+--------------------+-----------+-----------+---------+----------+---------+
|  World Cup|1930|Argentina|      |      GK|Ángel Bossio|Club Atlético Ta...|  Argentina|   1905-5-5|    false|1905-05-05|     null|
+-----------+----+---------+------+--------+------------+--------------------+-----------+-----------+---------+----------+---------+
only showing top 1 row
from pyspark.sql import Row
cfile= os.path.join("data", "ds_spark_2cols.csv")
lines = spark.sparkContext.textFile(cfile)
_col12 = lines.map(lambda l: l.split(","))
col12 = _col12.map(lambda p: Row(col1=int(p[0].strip()), col2=int(p[1].strip())))
_myDf = spark.createDataFrame(col12)
_myDf.printSchema()
_myDf.collect()
root |-- col1: long (nullable = true) |-- col2: long (nullable = true)
[Row(col1=35, col2=2), Row(col1=40, col2=27), Row(col1=12, col2=38), Row(col1=15, col2=31), Row(col1=21, col2=1), Row(col1=14, col2=19), Row(col1=46, col2=1), Row(col1=10, col2=34), Row(col1=28, col2=3), Row(col1=48, col2=1), Row(col1=16, col2=2), Row(col1=30, col2=3), Row(col1=32, col2=2), Row(col1=48, col2=1), Row(col1=31, col2=2), Row(col1=22, col2=1), Row(col1=12, col2=3), Row(col1=39, col2=29), Row(col1=19, col2=37), Row(col1=25, col2=2)]
# Reading/writing Parquet files
# _myDf.write.parquet(os.path.join("data","people.parquet"))
# _pDf=spark.read.parquet(os.path.join("data","people.parquet"))
# _pDf.show()
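A small sketch of the Parquet round trip with an explicit save mode, so re-running the cell does not fail once the path already exists (same path as the commented lines above):
_myDf.write.mode("overwrite").parquet(os.path.join("data","people.parquet"))
spark.read.parquet(os.path.join("data","people.parquet")).show(3)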
Using the DataFrame API¶
# create an empty DataFrame
from pyspark.sql.types import StructType
schema = StructType([])
emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
# range(): create a DataFrame holding a sequence of numbers
tmp = spark.range(0, 10, 2)
tmp.show()
+---+ | id| +---+ | 0| | 2| | 4| | 6| | 8| +---+
# create a column and show current_date()
from pyspark.sql import functions as F
spark.range(1).select(F.current_date()).show()
+--------------+ |current_date()| +--------------+ | 2023-11-29| +--------------+
# rename the column and show the Unix timestamp
spark.range(1).select(F.unix_timestamp().alias("current_timestamp")).show()
+-----------------+ |current_timestamp| +-----------------+ | 1701205944| +-----------------+
spark.range(1).select(F.unix_timestamp().alias("current_timestamp"))\
.rdd.collect()[0]['current_timestamp']
1701205944
Adding Columns¶
tDf = spark\
.read\
.options(header='false', inferschema='true', delimiter='\t')\
.csv(os.path.join('data', 'ds_spark_heightweight.txt'))
tDf.columns
['_c0', '_c1', '_c2']
Two Ways to Add a Column¶
tDf = tDf.withColumn("id", tDf._c0.cast("integer"))
tDf = tDf.withColumn("height", tDf['_c1'].cast("double"))
tDf = tDf.withColumn("weight", tDf['_c2'].cast("double"))
from pyspark.sql import functions as F
tDf = tDf.withColumn("id", F.col('_c0').astype("integer"))
tDf = tDf.withColumn("height", F.col('_c1').astype("double"))
tDf = tDf.withColumn("weight", F.col('_c2').astype("double"))
tDf.printSchema()
root |-- _c0: integer (nullable = true) |-- _c1: double (nullable = true) |-- _c2: double (nullable = true) |-- id: integer (nullable = true) |-- height: double (nullable = true) |-- weight: double (nullable = true)
tDf = tDf.drop('_c0').drop('_c1').drop('_c2')  # drop the original (now duplicated) columns
tDf.printSchema()
root |-- id: integer (nullable = true) |-- height: double (nullable = true) |-- weight: double (nullable = true)
tDf.take(1)
[Row(id=1, height=65.78, weight=112.99)]
User-Defined Functions (UDF)¶
A UDF (user-defined function) is useful together with DataFrame's withColumn() for creating new columns.
As with any function, the name and return type are defined up front, and the body can be a lambda or another function.
A plain Python function cannot be applied to a column directly; it has to be wrapped with udf() and called through that.
When the code gets complex, factoring the logic into a separate function and wrapping it with udf() keeps things tidy.
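Besides udf(f, returnType), PySpark also accepts the return type as a DDL type string and supports a decorator form. A minimal sketch (exclaim and someDf are illustrative names, not from the original notebook):
from pyspark.sql.functions import udf
@udf("string")  # the return type may be given as a DDL type string instead of StringType()
def exclaim(s):
    return s + "!" if s is not None else None  # null-safe
# usage sketch: someDf.withColumn("loud", exclaim(someDf["name"]))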
myDf=spark.createDataFrame([('1','kim, js',170),('1','lee, sm',175), ('2','lim, yg',180), ('2','lee',170)])
myDf.printSchema()
root |-- _1: string (nullable = true) |-- _2: string (nullable = true) |-- _3: long (nullable = true)
myDf.show()
+---+-------+---+ | _1| _2| _3| +---+-------+---+ | 1|kim, js|170| | 1|lee, sm|175| | 2|lim, yg|180| | 2| lee|170| +---+-------+---+
# rename columns
myDf=myDf.withColumnRenamed('_1','year').withColumnRenamed('_2','name')\
.withColumnRenamed('_3','height')
myDf.printSchema()
root |-- year: string (nullable = true) |-- name: string (nullable = true) |-- height: long (nullable = true)
# convert to uppercase with a UDF
def uppercase(s):
return s.upper()
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
upperUdf = udf(uppercase, StringType())
myDf = myDf.withColumn("nameUpper", upperUdf(myDf['name']))
myDf.show()
+----+-------+------+---------+ |year| name|height|nameUpper| +----+-------+------+---------+ | 1|kim, js| 170| KIM, JS| | 1|lee, sm| 175| LEE, SM| | 2|lim, yg| 180| LIM, YG| | 2| lee| 170| LEE| +----+-------+------+---------+
# cast with a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
toDoublefunc = udf(lambda x: float(x), DoubleType())
myDf = myDf.withColumn("heightD", toDoublefunc(myDf.height))
myDf.dtypes
[('year', 'string'), ('name', 'string'), ('height', 'bigint'), ('nameUpper', 'string'), ('heightD', 'double')]
# cast with a UDF (string to integer)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
toint=udf(lambda x:int(x), IntegerType())
myDf=myDf.withColumn("yearI", toint(myDf['year']))
myDf.printSchema()
root |-- year: string (nullable = true) |-- name: string (nullable = true) |-- height: long (nullable = true) |-- nameUpper: string (nullable = true) |-- heightD: double (nullable = true) |-- yearI: integer (nullable = true)
# select the new column
myDf.select('yearI').show()
+-----+ |yearI| +-----+ | 1| | 1| | 2| | 2| +-----+
# UDF with withColumn: create a column from a condition
# classify into two groups: height 175 and above vs. below
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
height_udf = udf(lambda height: "taller" if height >=175 else "shorter", StringType())
heightDf=myDf.withColumn("height>175", height_udf(myDf.heightD))
heightDf.show()
+----+-------+------+---------+-------+-----+----------+ |year| name|height|nameUpper|heightD|yearI|height>175| +----+-------+------+---------+-------+-----+----------+ | 1|kim, js| 170| KIM, JS| 170.0| 1| shorter| | 1|lee, sm| 175| LEE, SM| 175.0| 1| taller| | 2|lim, yg| 180| LIM, YG| 180.0| 2| taller| | 2| lee| 170| LEE| 170.0| 2| shorter| +----+-------+------+---------+-------+-----+----------+
Renaming Columns¶
tDf=tDf.withColumnRenamed('id','ID')
tDf.show(3)
+---+------+------+ | ID|height|weight| +---+------+------+ | 1| 65.78|112.99| | 2| 71.52|136.49| | 3| 69.4|153.03| +---+------+------+ only showing top 3 rows
Graphs¶
_height = tDf.rdd.map(lambda fields: fields[1]).collect()  # column 1 is height
_weight = tDf.rdd.map(lambda fields: fields[2]).collect()  # column 2 is weight (the original variable names were swapped)
import numpy as np
print (np.array(_height)[:5])
print (np.array(_weight)[:5])
[65.78 71.52 69.4 68.22 67.79] [112.99 136.49 153.03 142.34 144.3 ]
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
plt.plot(np.array(_height), np.array(_weight), 'o')
plt.show()
height = tDf.select("height").toPandas()
height.describe()
|       | height   |
|-------|----------|
| count | 50.00000 |
| mean  | 68.05240 |
| std   | 1.82398  |
| min   | 63.48000 |
| 25%   | 66.94000 |
| 50%   | 67.86500 |
| 75%   | 69.18000 |
| max   | 71.80000 |
type(height)  # height is a pandas DataFrame
pandas.core.frame.DataFrame
import matplotlib.pyplot as plt
import seaborn as sns
fig = plt.figure(figsize=(20, 10))
ax1 = fig.add_subplot(1, 2, 1) # subplot location
ax1 = plt.boxplot(height)  # box plot
ax2 = fig.add_subplot(1, 2, 2)
ax2 = sns.violinplot(data=height)  # violin plot
Aggregate Functions¶
tDf.agg({"height":"count"}).show()
tDf.agg({"height":"kurtosis"}).show()
tDf.agg({"height":"avg"}).show()
+-------------+ |count(height)| +-------------+ | 50| +-------------+ +--------------------+ | kurtosis(height)| +--------------------+ |-0.00944222604387468| +--------------------+ +-----------------+ | avg(height)| +-----------------+ |68.05240000000002| +-----------------+
# aggregating via the PySpark functions module (F)
from pyspark.sql import functions as F
tDf.agg(F.min("height")).show()
+-----------+ |min(height)| +-----------+ | 63.48| +-----------+
Selecting Columns: select¶
# a Column object cannot be shown directly
myDf['name']# .show()
Column<'name'>
F.col('name')  # col() is another way to reference a column
Column<'name'>
# select() returns a new DataFrame
_name=myDf.select('name')
_name.show()
+-------+ | name| +-------+ |kim, js| |lee, sm| |lim, yg| | lee| +-------+
# select multiple columns
_name=myDf.select('name', 'height').show()
+-------+------+ | name|height| +-------+------+ |kim, js| 170| |lee, sm| 175| |lim, yg| 180| | lee| 170| +-------+------+
# select by unpacking (*) a list of column names
cols = ['name', 'height']
myDf.select(*cols).show()
+-------+------+ | name|height| +-------+------+ |kim, js| 170| |lee, sm| 175| |lim, yg| 180| | lee| 170| +-------+------+
# Select LIKE
myDf.select("name", "height", myDf.name.like("%lee%")).show()
+-------+------+---------------+ | name|height|name LIKE %lee%| +-------+------+---------------+ |kim, js| 170| false| |lee, sm| 175| true| |lim, yg| 180| false| | lee| 170| true| +-------+------+---------------+
# Select startswith
myDf.select("name", "height", myDf.name.startswith("kim")).show()
from pyspark.sql import functions as F
myDf.filter(F.col('name').startswith("kim")).show()
+-------+------+---------------------+ | name|height|startswith(name, kim)| +-------+------+---------------------+ |kim, js| 170| true| |lee, sm| 175| false| |lim, yg| 180| false| | lee| 170| false| +-------+------+---------------------+ +----+-------+------+---------+-------+-----+ |year| name|height|nameUpper|heightD|yearI| +----+-------+------+---------+-------+-----+ | 1|kim, js| 170| KIM, JS| 170.0| 1| +----+-------+------+---------+-------+-----+
# Select endswith
myDf.select("name", "height", myDf.name.endswith("lee")).show()
from pyspark.sql import functions as F
myDf.filter(F.col('name').endswith("lee")).show()
+-------+------+-------------------+ | name|height|endswith(name, lee)| +-------+------+-------------------+ |kim, js| 170| false| |lee, sm| 175| false| |lim, yg| 180| false| | lee| 170| true| +-------+------+-------------------+ +----+----+------+---------+-------+-----+ |year|name|height|nameUpper|heightD|yearI| +----+----+------+---------+-------+-----+ | 2| lee| 170| LEE| 170.0| 2| +----+----+------+---------+-------+-----+
Renaming Columns: alias¶
myDf1 = myDf.alias("myDf1")
myDf1.select(myDf1.name.substr(1,3).alias("short name")).show(3)
myDf1.select(F.col("name").substr(1,3).alias("short name")).show(3)
+----------+ |short name| +----------+ | kim| | lee| | lim| +----------+ only showing top 3 rows +----------+ |short name| +----------+ | kim| | lee| | lim| +----------+ only showing top 3 rows
Selecting Rows and Columns: select, when, otherwise¶
This can be used much like a UDF; pick whichever is more convenient.
from pyspark.sql.functions import when
myDf.select("height", when(myDf.height < 175, 1).otherwise(0)).show()
+------+------------------------------------------+ |height|CASE WHEN (height < 175) THEN 1 ELSE 0 END| +------+------------------------------------------+ | 170| 1| | 175| 0| | 180| 0| | 170| 1| +------+------------------------------------------+
# multiple conditions by chaining when()
from pyspark.sql.functions import when
myDf.select("height",
when(myDf.height <= 170, 1)
.when((myDf.height > 170) & (myDf.height <= 175), 2)
.otherwise(0)
)\
.show()
+------+--------------------------------------------------------------------------------------------+ |height|CASE WHEN (height <= 170) THEN 1 WHEN ((height > 170) AND (height <= 175)) THEN 2 ELSE 0 END| +------+--------------------------------------------------------------------------------------------+ | 170| 1| | 175| 2| | 180| 0| | 170| 1| +------+--------------------------------------------------------------------------------------------+
# rename the computed column with alias
from pyspark.sql.functions import when
myDf.select("height", (when(myDf.height < 175, 1).otherwise(0)).alias('<175')).show()
+------+----+ |height|<175| +------+----+ | 170| 1| | 175| 0| | 180| 0| | 170| 1| +------+----+
# add a conditionally computed column with withColumn
_myDf = myDf.withColumn('how tall', when(myDf['heightD'] >175.0, ">175").otherwise("<175"))
_myDf.show()
+----+-------+------+---------+-------+-----+--------+ |year| name|height|nameUpper|heightD|yearI|how tall| +----+-------+------+---------+-------+-----+--------+ | 1|kim, js| 170| KIM, JS| 170.0| 1| <175| | 1|lee, sm| 175| LEE, SM| 175.0| 1| <175| | 2|lim, yg| 180| LIM, YG| 180.0| 2| >175| | 2| lee| 170| LEE| 170.0| 2| <175| +----+-------+------+---------+-------+-----+--------+
Selecting Rows and Columns: where, select¶
A DataFrame closely resembles a table in a relational database, so you can use where(), select(), and groupBy() much as you would write SQL.
myDf.where(myDf['height'] < 175).show()
myDf.where(myDf['height'] < 175)\
.select(myDf['name'], myDf['height']).show()
+----+-------+------+---------+-------+-----+ |year| name|height|nameUpper|heightD|yearI| +----+-------+------+---------+-------+-----+ | 1|kim, js| 170| KIM, JS| 170.0| 1| | 2| lee| 170| LEE| 170.0| 2| +----+-------+------+---------+-------+-----+ +-------+------+ | name|height| +-------+------+ |kim, js| 170| | lee| 170| +-------+------+
# filter: select rows by condition
myDf.filter(myDf['height'] > 175).show()
myDf.filter(F.col('height') > 175).show()  # F.col(), since a bare col() has not been imported yet
+----+-------+------+---------+-------+-----+ |year| name|height|nameUpper|heightD|yearI| +----+-------+------+---------+-------+-----+ | 2|lim, yg| 180| LIM, YG| 180.0| 2| +----+-------+------+---------+-------+-----+ +----+-------+------+---------+-------+-----+ |year| name|height|nameUpper|heightD|yearI| +----+-------+------+---------+-------+-----+ | 2|lim, yg| 180| LIM, YG| 180.0| 2| +----+-------+------+---------+-------+-----+
# replace with a regular expression (regexp_replace)
from pyspark.sql.functions import *
_heightDf = myDf.withColumn('nameNew', regexp_replace('name', 'lee', 'lim'))
_heightDf.show()
+----+-------+------+---------+-------+-----+-------+ |year| name|height|nameUpper|heightD|yearI|nameNew| +----+-------+------+---------+-------+-----+-------+ | 1|kim, js| 170| KIM, JS| 170.0| 1|kim, js| | 1|lee, sm| 175| LEE, SM| 175.0| 1|lim, sm| | 2|lim, yg| 180| LIM, YG| 180.0| 2|lim, yg| | 2| lee| 170| LEE| 170.0| 2| lim| +----+-------+------+---------+-------+-----+-------+
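A companion function, regexp_extract(), pulls a matched group out instead of replacing it. A sketch that extracts the part of name before the first comma (the pattern is an assumption about the name format):
from pyspark.sql.functions import regexp_extract
# group 1 = everything up to the first comma; 'lee' has no comma, so the whole name matches
myDf.select('name', regexp_extract('name', r'^([^,]+)', 1).alias('family')).show()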
GroupBy¶
# groupBy
myDf.groupby(myDf['year'])
# group by the first column (year) and take the max()
myDf.groupby(myDf['year']).max().show()
+----+-----------+------------+----------+ |year|max(height)|max(heightD)|max(yearI)| +----+-----------+------------+----------+ | 1| 175| 175.0| 1| | 2| 180| 180.0| 2| +----+-----------+------------+----------+
# groupBy with aggregate functions
# supported aggregates include avg, max, min, sum, count
# pivot: cross-tabulate, e.g. per position
myDf.groupBy('year').agg({"heightD":"avg"}).show()  # average height per year
wcDf.groupBy(wcDf.ClubCountry).count().show()  # players per club country
wcDf.groupBy('ClubCountry').pivot('Position').count().show()  # players per country and position
+----+------------+ |year|avg(heightD)| +----+------------+ | 1| 172.5| | 2| 175.0| +----+------------+ +-----------+-----+ |ClubCountry|count| +-----------+-----+ | England | 4| | Paraguay| 93| | Russia| 51| | POL| 11| | BRA| 27| | Senegal| 1| | Sweden| 154| | Colombia| 1| | FRA| 155| | ALG| 8| | England | 1| | RUS | 1| | Turkey| 65| | Zaire| 22| | Iraq| 22| | Germany| 206| | RSA| 16| | ITA| 224| | UKR| 38| | GHA| 8| +-----------+-----+ only showing top 20 rows +-----------+----+----+----+----+----+ |ClubCountry| | DF| FW| GK| MF| +-----------+----+----+----+----+----+ | England |null|null| 2|null| 2| | Paraguay|null| 26| 37| 10| 20| | Russia|null| 20| 11| 4| 16| | POL|null| 2| 2| 3| 4| | BRA|null| 7| 5| 4| 11| | Senegal|null|null|null| 1|null| | Sweden|null| 40| 47| 25| 42| | Colombia|null|null| 1|null|null| | ALG|null| 2|null| 6|null| | FRA|null| 46| 41| 18| 50| | England |null|null|null|null| 1| | RUS |null|null|null| 1|null| | Turkey|null| 20| 13| 12| 20| | Zaire|null| 6| 5| 3| 8| | Iraq|null| 6| 4| 3| 9| | Germany|null| 64| 51| 16| 75| | RSA|null| 5| 2| 3| 6| | UKR|null| 13| 7| 4| 14| | ITA|null| 74| 42| 19| 89| | CMR|null| 1| 1| 1|null| +-----------+----+----+----+----+----+ only showing top 20 rows
# aggregate with F functions
from pyspark.sql import functions as F
myDf.agg(F.min(myDf.heightD),F.max(myDf.heightD),F.avg(myDf.heightD)\
,F.sum(myDf.heightD)).show()
+------------+------------+------------+------------+ |min(heightD)|max(heightD)|avg(heightD)|sum(heightD)| +------------+------------+------------+------------+ | 170.0| 180.0| 173.75| 695.0| +------------+------------+------------+------------+
Appending Rows¶
Rows are appended by combining DataFrames: build a DataFrame from the rows to add, then merge it with union().
myDf = myDf.drop('nameUpper').drop('heightD').drop('yearI')
toAppendDf = spark.createDataFrame([Row(4, 1, "choi, js")])
myDf.show()
+----+-------+------+ |year| name|height| +----+-------+------+ | 1|kim, js| 170| | 1|lee, sm| 175| | 2|lim, yg| 180| | 2| lee| 170| +----+-------+------+
_myDf = myDf.union(toAppendDf)
_myDf.show()
+----+-------+--------+ |year| name| height| +----+-------+--------+ | 1|kim, js| 170| | 1|lee, sm| 175| | 2|lim, yg| 180| | 2| lee| 170| | 4| 1|choi, js| +----+-------+--------+
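Note what happened above: union() matches columns by position, not by name, so Row(4, 1, "choi, js") landed with "choi, js" in the height column. A corrected sketch with the values in (year, name, height) order; the height value 170 is an assumption, since the original row's intent is ambiguous:
toAppendDf = spark.createDataFrame([('4', 'choi, js', 170)], myDf.columns)
myDf.union(toAppendDf).show()
# Spark 2.3+ also provides unionByName(), which matches columns by name instead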
# number of partitions
myDf.rdd.getNumPartitions()
# set the number of partitions
_myDf = myDf.repartition(4)
print(_myDf.rdd.getNumPartitions())
# reduce the number of partitions
_myDf2 = _myDf.coalesce(2)
print(_myDf2.rdd.getNumPartitions())
4 2
# summary statistics
myDf.describe().show()
+-------+------------------+-------+------------------+ |summary| year| name| height| +-------+------------------+-------+------------------+ | count| 4| 4| 4| | mean| 1.5| null| 173.75| | stddev|0.5773502691896257| null|4.7871355387816905| | min| 1|kim, js| 170| | max| 2|lim, yg| 180| +-------+------------------+-------+------------------+
Problem: Rental Counts by Year and Quarter¶
Let's analyze 서울특별시공공자전거 일별 대여건수(2018~2019.03).csv (daily public-bicycle rental counts) from the Seoul Open Data Plaza. Each record is a date and a rental count; the first few rows look like this.
# data with a header row
_bicycle = spark.read.format('csv')\
.options(header='true', inferschema='true').load('data/seoulBicycleDailyCount_2018_201903.csv')
_bicycle.printSchema()
root |-- date: string (nullable = true) |-- count: integer (nullable = true)
_bicycle.show(5)
_bicycle.count()
+----------+------+ | date| count| +----------+------+ |2018-01-01| 4950| |2018-01-02| 7136| |2018-01-03| 7156| |2018-01-04| 7102| |2018-01-05| 7705| +----------+------+ only showing top 5 rows
455
1. Data Preprocessing¶
bicycle=_bicycle\
.withColumnRenamed("date", "Date")\
.withColumnRenamed(" count", "Count")
bicycle=bicycle.withColumn("year", bicycle.Date.substr(1, 4))
bicycle.printSchema()
root |-- Date: string (nullable = true) |-- Count: integer (nullable = true) |-- year: string (nullable = true)
bicycle=bicycle.withColumn("month",bicycle.Date.substr(6, 2))
bicycle.show(5)
+----------+-----+----+-----+ | Date|Count|year|month| +----------+-----+----+-----+ |2018-01-01| 4950|2018| 01| |2018-01-02| 7136|2018| 01| |2018-01-03| 7156|2018| 01| |2018-01-04| 7102|2018| 01| |2018-01-05| 7705|2018| 01| +----------+-----+----+-----+ only showing top 5 rows
# the same preprocessing with F functions
# first drop the columns created above
columns_to_drop = ['year','month']
df = bicycle.drop(*columns_to_drop)
# recreate them with F.year() / F.month()
import pyspark.sql.functions as F
bicycle = bicycle\
.withColumn('year', F.year('date'))\
.withColumn('month', F.month('date'))
bicycle.printSchema()
bicycle.show(5)
root |-- Date: string (nullable = true) |-- Count: integer (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) +----------+-----+----+-----+ | Date|Count|year|month| +----------+-----+----+-----+ |2018-01-01| 4950|2018| 1| |2018-01-02| 7136|2018| 1| |2018-01-03| 7156|2018| 1| |2018-01-04| 7102|2018| 1| |2018-01-05| 7705|2018| 1| +----------+-----+----+-----+ only showing top 5 rows
bicycle.filter(bicycle.month == 2).show(3)
+----------+-----+----+-----+ | Date|Count|year|month| +----------+-----+----+-----+ |2018-02-01| 5821|2018| 2| |2018-02-02| 6557|2018| 2| |2018-02-03| 3499|2018| 2| +----------+-----+----+-----+ only showing top 3 rows
Total Rentals by Year¶
bicycle.groupBy('year').agg({"count":"sum"}).show()
+----+----------+ |year|sum(count)| +----+----------+ |2018| 10124874| |2019| 1871935| +----+----------+
Total Rentals by Quarter¶
# define a UDF that classifies a month into a quarter
def classifyQuarter(s):
q=""
s=int(s)
if 1<=s and s< 4:
q="Q1"
elif 4<=s and s<7:
q="Q2"
elif 7<=s and s<10:
q="Q3"
elif 10<=s and s<=12:
q="Q4"
else:
q="no"
return q
quarter_udf = udf(classifyQuarter, StringType())
# classify each date into a quarter, then see how many days fall into each
bicycle = bicycle.withColumn("quarter", quarter_udf(bicycle.month))
bicycle.groupBy('quarter').count().show()
bicycle.show(5)
+-------+-----+ |quarter|count| +-------+-----+ | Q2| 91| | Q1| 180| | Q3| 92| | Q4| 92| +-------+-----+ +----------+-----+----+-----+-------+ | Date|Count|year|month|quarter| +----------+-----+----+-----+-------+ |2018-01-01| 4950|2018| 1| Q1| |2018-01-02| 7136|2018| 1| Q1| |2018-01-03| 7156|2018| 1| Q1| |2018-01-04| 7102|2018| 1| Q1| |2018-01-05| 7705|2018| 1| Q1| +----------+-----+----+-----+-------+ only showing top 5 rows
# sum Count per quarter
bicycle.groupBy('quarter').agg({"count":"sum"}).show()
+-------+----------+ |quarter|sum(count)| +-------+----------+ | Q2| 2860617| | Q1| 2667704| | Q3| 3585513| | Q4| 2882975| +-------+----------+
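PySpark also ships a built-in quarter() function, so the UDF above can be skipped entirely. A sketch, assuming the Date strings are parsed as dates the same way F.year()/F.month() handled them earlier:
from pyspark.sql import functions as F
# quarter() returns 1-4 for a date/timestamp column
bicycle.withColumn('quarterN', F.quarter('Date'))\
    .groupBy('quarterN').agg({"count": "sum"}).show()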
Rental Totals by Year and Month (Quarter)¶
bicycle.groupBy('year').pivot('quarter').agg({"count":"sum"}).show()
+----+-------+-------+-------+-------+ |year| Q1| Q2| Q3| Q4| +----+-------+-------+-------+-------+ |2018| 795769|2860617|3585513|2882975| |2019|1871935| null| null| null| +----+-------+-------+-------+-------+
bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+ |year| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| +----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+ |2018|164367|168741|462661|687885|965609|1207123|1100015|1037505|1447993|1420621|961532|500822| |2019|495573|471543|904819| null| null| null| null| null| null| null| null| null| +----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
Pivoting in Pandas¶
import pandas as pd
import numpy as np
bicycleP = bicycle.toPandas()
bicycleP.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 455 entries, 0 to 454 Data columns (total 5 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 Date 455 non-null object 1 Count 455 non-null int32 2 year 455 non-null int32 3 month 455 non-null int32 4 quarter 455 non-null object dtypes: int32(3), object(2) memory usage: 12.6+ KB
#bicycleP.groupby('year').aggregate({'Count':np.sum})
bicycleP.groupby('year').aggregate({'Count':'sum'})
| year | Count    |
|------|----------|
| 2018 | 10124874 |
| 2019 | 1871935  |
pd.pivot_table(bicycleP, values = 'Count', index = ['year'], columns = ['month'], aggfunc= 'sum')
| year \ month | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2018 | 164367.0 | 168741.0 | 462661.0 | 687885.0 | 965609.0 | 1207123.0 | 1100015.0 | 1037505.0 | 1447993.0 | 1420621.0 | 961532.0 | 500822.0 |
| 2019 | 495573.0 | 471543.0 | 904819.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
bicycleP2018=bicycleP[bicycleP['year']==2018]
bicycleP2018byQ = pd.pivot_table(bicycleP2018, values = 'Count', index = ['year'], columns = ['quarter'], aggfunc= 'sum')
bicycleP2018byQ.iloc[:,0:4]
| year \ quarter | Q1 | Q2 | Q3 | Q4 |
|---|---|---|---|---|
| 2018 | 795769 | 2860617 | 3585513 | 2882975 |
Graphing Rentals by Year and Month¶
sumMonthly=bicycle.groupBy('year').pivot('month').agg({"count":"sum"})
pdf=sumMonthly.toPandas()
pdf.head()
|   | year | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018 | 164367 | 168741 | 462661 | 687885.0 | 965609.0 | 1207123.0 | 1100015.0 | 1037505.0 | 1447993.0 | 1420621.0 | 961532.0 | 500822.0 |
| 1 | 2019 | 495573 | 471543 | 904819 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
# the year column must be dropped before plotting
my=pdf.drop(columns='year').transpose()  # keyword form avoids the pandas FutureWarning
my
| month | 0 | 1 |
|---|---|---|
| 1 | 164367.0 | 495573.0 |
| 2 | 168741.0 | 471543.0 |
| 3 | 462661.0 | 904819.0 |
| 4 | 687885.0 | NaN |
| 5 | 965609.0 | NaN |
| 6 | 1207123.0 | NaN |
| 7 | 1100015.0 | NaN |
| 8 | 1037505.0 | NaN |
| 9 | 1447993.0 | NaN |
| 10 | 1420621.0 | NaN |
| 11 | 961532.0 | NaN |
| 12 | 500822.0 | NaN |
my.columns=[2018, 2019]
my.plot(kind='line')
<AxesSubplot:>
Putting It All Together¶
%%writefile src/ds_spark_bicycle.py
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def classifyQuarter(s):
q=""
s=int(s)
if 1<=s and s< 4:
q="Q1"
elif 4<=s and s<7:
q="Q2"
elif 7<=s and s<10:
q="Q3"
elif 10<=s and s<=12:
q="Q4"
else:
q="no"
return q
def doIt():
# create dataframe
_bicycle = spark.read.format('csv')\
.options(header='true', inferschema='true').load('data/seoulBicycleDailyCount_2018_201903.csv')
# columns Date, Count
bicycle=_bicycle\
.withColumnRenamed("date", "Date")\
.withColumnRenamed(" count", "Count")
#bicycle=bicycle.withColumn("year", bicycle.Date.substr(1, 4))
#bicycle=bicycle.withColumn("month",bicycle.Date.substr(6, 2))
bicycle = bicycle\
.withColumn('year', F.year('date'))\
.withColumn('month', F.month('date'))
# column quarter
quarter_udf = udf(classifyQuarter, StringType())
bicycle=bicycle.withColumn("quarter", quarter_udf(bicycle.month))
# groupBy
bicycle.groupBy('quarter').count().show()
bicycle.groupBy('year').agg({"count":"sum"}).show()
bicycle.groupBy('quarter').agg({"count":"sum"}).show()
bicycle.groupBy('quarter').agg({"count":"avg"}).show()
bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()
bicycle.groupBy('year').pivot('quarter').agg({"count":"sum"}).show()
sumMonthly=bicycle.groupBy('year').pivot('month').agg({"count":"sum"})
# graph
pdf=sumMonthly.toPandas()
my=pdf.drop(columns='year').transpose()
my.columns=[2018, 2019]
my.plot(kind='line')
if __name__ == "__main__":
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
doIt()
spark.stop()
Overwriting src/ds_spark_bicycle.py
Spark SQL¶
You can query data in a DataFrame using SQL, just as in a relational database (RDB). The DataFrame is first registered as a table (view) and then queried with SQL; an RDD, being unstructured, would have to be converted into a DataFrame first.
wcDf.printSchema()
root |-- Competition: string (nullable = true) |-- Year: long (nullable = true) |-- Team: string (nullable = true) |-- Number: string (nullable = true) |-- Position: string (nullable = true) |-- FullName: string (nullable = true) |-- Club: string (nullable = true) |-- ClubCountry: string (nullable = true) |-- DateOfBirth: string (nullable = true) |-- IsCaptain: boolean (nullable = true)
wcDf.createOrReplaceTempView("wc")
spark.sql("select Club,Team,Year from wc").show(1)
+--------------------+---------+----+ | Club| Team|Year| +--------------------+---------+----+ |Club Atlético Ta...|Argentina|1930| +--------------------+---------+----+ only showing top 1 row
wcPlayers=spark.sql("select FullName,Club,Team,Year from wc")
wcPlayers.show(1)
+------------+--------------------+---------+----+ | FullName| Club| Team|Year| +------------+--------------------+---------+----+ |Ãngel Bossio|Club Atlético Ta...|Argentina|1930| +------------+--------------------+---------+----+ only showing top 1 row
spark.catalog.listTables()
[Table(name='wc', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
# convert to an RDD
namesRdd=wcPlayers.rdd.map(lambda x: "Full name: "+x[0])
for e in namesRdd.take(5):
print (e)
Full name: Ãngel Bossio Full name: Juan Botasso Full name: Roberto Cherro Full name: Alberto Chividini Full name:
SQL Functions and Joins¶
bucketDf=spark.createDataFrame([[1,["orange", "apple", "pineapple"]],
[2,["watermelon","apple","bananas"]]],
["bucketId","items"])
bucketDf.show(bucketDf.count(), truncate=False)  # truncate=False: show full cell values
+--------+----------------------------+ |bucketId|items | +--------+----------------------------+ |1 |[orange, apple, pineapple] | |2 |[watermelon, apple, bananas]| +--------+----------------------------+
fDf=spark.createDataFrame([["orange", "F1"],
["", "F2"],
["pineapple","F3"],
["watermelon","F4"],
["bananas","F5"]],
["item","itemId"])
fDf.show()
+----------+------+ | item|itemId| +----------+------+ | orange| F1| | | F2| | pineapple| F3| |watermelon| F4| | bananas| F5| +----------+------+
# explode
# when a column holds a list or array, explode() flattens it, producing a new column with one row per element
from pyspark.sql.functions import explode
bDf=bucketDf.select(bucketDf.bucketId, explode(bucketDf.items).alias('item'))
bDf.show()
+--------+----------+ |bucketId| item| +--------+----------+ | 1| orange| | 1| apple| | 1| pineapple| | 2|watermelon| | 2| apple| | 2| bananas| +--------+----------+
joinDf=fDf.join(bDf, fDf.item==bDf.item, "inner")
joinDf.select(fDf.itemId,fDf.item,bDf.bucketId).show()
+------+----------+--------+ |itemId| item|bucketId| +------+----------+--------+ | F5| bananas| 2| | F1| orange| 1| | F3| pineapple| 1| | F4|watermelon| 2| +------+----------+--------+
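An outer join shows what the inner join silently drops: 'apple' has no itemId in fDf, so a left join from bDf keeps it with a null itemId (a sketch):
bDf.join(fDf, bDf.item == fDf.item, "left_outer")\
    .select(bDf.bucketId, bDf.item, fDf.itemId)\
    .show()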
Problem S-1: Analyzing Illegal Network Intrusions¶
# the dataset file (the URL below is its source; here it is read from the local data/ directory)
import os
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz')
_rdd = spark.sparkContext.textFile(_fname)
_rdd.count()
494021
_rdd.take(1)
['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']
_allRdd=_rdd.map(lambda x: x.split(','))  # split each line on commas
_allRdd.take(1)
[['0', 'tcp', 'http', 'SF', '181', '5450', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '8', '8', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '9', '9', '1.00', '0.00', '0.11', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']]
Normal vs. Attack Counts¶
Let's split the data into normal records and the rest. filter() separates the records by a condition on column 41 (the label). Counting each subset with count() gives 97,278 'normal' and 396,743 'attack' records.
_normalRdd=_allRdd.filter(lambda x: x[41]=="normal.")
_attackRdd=_allRdd.filter(lambda x: x[41]!="normal.")
_normalRdd.count()
97278
_attackRdd.count()
396743
Counts per Attack Type¶
The attack type is recorded in column 41. Of the 494,021 records in total, 'normal' is separated out and everything else counts as an attack. Attacks fall into four broad categories: DOS (denial of service), R2L (remote-to-local intrusion), U2R (user-to-root privilege escalation), and probing (reconnaissance).
_41 = _allRdd.map(lambda x: (x[41], 1))
_41.reduceByKey(lambda x,y: x+y).collect()
[('normal.', 97278), ('buffer_overflow.', 30), ('loadmodule.', 9), ('perl.', 3), ('neptune.', 107201), ('smurf.', 280790), ('guess_passwd.', 53), ('pod.', 264), ('teardrop.', 979), ('portsweep.', 1040), ('ipsweep.', 1247), ('land.', 21), ('ftp_write.', 8), ('back.', 2203), ('imap.', 12), ('satan.', 1589), ('phf.', 4), ('nmap.', 231), ('multihop.', 7), ('warezmaster.', 20), ('warezclient.', 1020), ('spy.', 2), ('rootkit.', 10)]
# the same count using groupByKey instead of reduceByKey
_41 = _allRdd.map(lambda x: (x[41], 1))
def f(x): return len(x)
_41.groupByKey().mapValues(f).collect()
[('normal.', 97278), ('buffer_overflow.', 30), ('loadmodule.', 9), ('perl.', 3), ('neptune.', 107201), ('smurf.', 280790), ('guess_passwd.', 53), ('pod.', 264), ('teardrop.', 979), ('portsweep.', 1040), ('ipsweep.', 1247), ('land.', 21), ('ftp_write.', 8), ('back.', 2203), ('imap.', 12), ('satan.', 1589), ('phf.', 4), ('nmap.', 231), ('multihop.', 7), ('warezmaster.', 20), ('warezclient.', 1020), ('spy.', 2), ('rootkit.', 10)]
Creating a DataFrame¶
Select columns 0, 1, 2, 3, 4, 5, and 41, define the field names, and build an RDD of Rows.
from pyspark.sql import Row
_csv = _rdd.map(lambda l: l.split(","))
_csvRdd = _csv.map(lambda p:
Row(
duration=int(p[0]),
protocol=p[1],
service=p[2],
flag=p[3],
src_bytes=int(p[4]),
dst_bytes=int(p[5]),
attack=p[41]
)
)
_df=spark.createDataFrame(_csvRdd)
_df.printSchema()
_df.show(5)
root |-- duration: long (nullable = true) |-- protocol: string (nullable = true) |-- service: string (nullable = true) |-- flag: string (nullable = true) |-- src_bytes: long (nullable = true) |-- dst_bytes: long (nullable = true) |-- attack: string (nullable = true)
+--------+--------+-------+----+---------+---------+-------+ |duration|protocol|service|flag|src_bytes|dst_bytes| attack| +--------+--------+-------+----+---------+---------+-------+ | 0| tcp| http| SF| 181| 5450|normal.| | 0| tcp| http| SF| 239| 486|normal.| | 0| tcp| http| SF| 235| 1337|normal.| | 0| tcp| http| SF| 219| 1337|normal.| | 0| tcp| http| SF| 217| 2032|normal.| +--------+--------+-------+----+---------+---------+-------+ only showing top 5 rows
23/11/29 06:12:47 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 207 (TID 162): Attempting to kill Python Worker
Classifying Attacks¶
Create an attackB column that labels each record 'attack' or 'normal' depending on whether it is an intrusion.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
attack_udf = udf(lambda x: "normal" if x =="normal." else "attack", StringType())
myDf=_df.withColumn("attackB", attack_udf(_df.attack))
myDf.printSchema()
root |-- duration: long (nullable = true) |-- protocol: string (nullable = true) |-- service: string (nullable = true) |-- flag: string (nullable = true) |-- src_bytes: long (nullable = true) |-- dst_bytes: long (nullable = true) |-- attack: string (nullable = true) |-- attackB: string (nullable = true)
Using udf() with an if-chain, classify records into five classes: 'normal' plus the four attack categories.¶
The return type is StringType().
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def classify41(s):
_5=""
if s=="normal.":
_5="normal"
elif s=="back." or s=="land." or s=="neptune." or s=="pod." or s=="smurf." or s=="teardrop.":
_5="dos"
elif s=="ftp_write." or s=="guess_passwd." or s=="imap." or s=="multihop." or s=="phf." or\
s=="spy." or s=="warezclient." or s=="warezmaster.":
_5="r2l"
elif s=="buffer_overflow." or s=="loadmodule." or s=="perl." or s=="rootkit.":
_5="u2r"
elif s=="ipsweep." or s=="nmap." or s=="portsweep." or s=="satan.":
_5="probing"
return _5
attack5_udf = udf(classify41, StringType())
myDf=myDf.withColumn("attack5", attack5_udf(_df.attack))
myDf.printSchema()
root |-- duration: long (nullable = true) |-- protocol: string (nullable = true) |-- service: string (nullable = true) |-- flag: string (nullable = true) |-- src_bytes: long (nullable = true) |-- dst_bytes: long (nullable = true) |-- attack: string (nullable = true) |-- attackB: string (nullable = true) |-- attack5: string (nullable = true)
myDf.show(5)
+--------+--------+-------+----+---------+---------+-------+-------+-------+ |duration|protocol|service|flag|src_bytes|dst_bytes| attack|attackB|attack5| +--------+--------+-------+----+---------+---------+-------+-------+-------+ | 0| tcp| http| SF| 181| 5450|normal.| normal| normal| | 0| tcp| http| SF| 239| 486|normal.| normal| normal| | 0| tcp| http| SF| 235| 1337|normal.| normal| normal| | 0| tcp| http| SF| 219| 1337|normal.| normal| normal| | 0| tcp| http| SF| 217| 2032|normal.| normal| normal| +--------+--------+-------+----+---------+---------+-------+-------+-------+ only showing top 5 rows
23/11/29 06:12:51 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 208 (TID 163): Attempting to kill Python Worker
Analyzing attack vs. normal Characteristics¶
Let's count the records in each attack5 class.
myDf.groupBy('attack5').count().show()
+-------+------+ |attack5| count| +-------+------+ |probing| 4107| | u2r| 52| | normal| 97278| | r2l| 1126| | dos|391458| +-------+------+
myDf.groupBy("protocol").count().show()
+--------+------+ |protocol| count| +--------+------+ | tcp|190065| | udp| 20354| | icmp|283602| +--------+------+
myDf.groupBy('attackB','protocol').count().show()
+-------+--------+------+ |attackB|protocol| count| +-------+--------+------+ | normal| udp| 19177| | normal| icmp| 1288| | normal| tcp| 76813| | attack| icmp|282314| | attack| tcp|113252| | attack| udp| 1177| +-------+--------+------+
myDf.groupBy('attackB').pivot('protocol').count().show()
+-------+------+------+-----+ |attackB| icmp| tcp| udp| +-------+------+------+-----+ | normal| 1288| 76813|19177| | attack|282314|113252| 1177| +-------+------+------+-----+
Let's profile each attack5 class: we can compute how protocol, src_bytes, and duration behave per class.¶
myDf.groupBy('attack5').pivot('protocol').avg('src_bytes').show()
+-------+------------------+------------------+------------------+ |attack5| icmp| tcp| udp| +-------+------------------+------------------+------------------+ |probing|10.700793650793651| 261454.6003016591|25.235897435897435| | u2r| null| 960.8979591836735|13.333333333333334| | normal| 91.47049689440993|1439.3120305156679| 98.01220211711947| | r2l| null|271972.57460035523| null| | dos| 936.2672084368129| 1090.303422435458| 28.0| +-------+------------------+------------------+------------------+
myDf.groupBy('attack5').avg('duration').show()
+-------+--------------------+ |attack5| avg(duration)| +-------+--------------------+ |probing| 485.0299488677867| | u2r| 80.9423076923077| | normal| 216.65732231336992| | r2l| 559.7522202486679| | dos|7.254929008986916E-4| +-------+--------------------+
from pyspark.sql import functions as F
myDf.groupBy('attackB').pivot('protocol').agg(F.max('dst_bytes')).show()
+-------+----+-------+---+ |attackB|icmp| tcp|udp| +-------+----+-------+---+ | normal| 0|5134218|516| | attack| 0|5155468| 74| +-------+----+-------+---+
myDf.select("protocol", "duration", "dst_bytes")\
.filter(_df.duration>1000)\
.filter(_df.dst_bytes==0)\
.groupBy("protocol")\
.count()\
.show()
+--------+-----+ |protocol|count| +--------+-----+ | tcp| 139| +--------+-----+
Analyzing Attacks with SQL¶
_df.createOrReplaceTempView("_tab")  # register the DataFrame as a Spark temporary view
tcp_interactions = spark.sql(
"""
SELECT duration, dst_bytes FROM _tab
WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions_out = tcp_interactions.rdd\
.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for i, ti_out in enumerate(tcp_interactions_out.collect()):
if(i%10==0):
print (ti_out)
Duration: 5057, Dest. bytes: 0 Duration: 5043, Dest. bytes: 0 Duration: 5046, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5057, Dest. bytes: 0 Duration: 5063, Dest. bytes: 0 Duration: 42448, Dest. bytes: 0 Duration: 40121, Dest. bytes: 0 Duration: 31709, Dest. bytes: 0 Duration: 30619, Dest. bytes: 0 Duration: 22616, Dest. bytes: 0 Duration: 21455, Dest. bytes: 0 Duration: 13998, Dest. bytes: 0 Duration: 12933, Dest. bytes: 0
Problem S-2: Reading Twitter JSON Data¶
Tweets contain text written by users; we extract just that field and analyze co-occurring words or word frequencies (a word-count sketch follows the first query below).
t2df= spark.read.json(os.path.join("src","ds_twitter_seoul_3.json"))
res=t2df.select('id','lang','text').take(1)
for e in res:
print (e['id'],e['lang'],e['text'])
801657325836763136 en RT @soompi: #SEVENTEEN’s Mingyu, Jin Se Yeon, And Leeteuk To MC For 2016 Super Seoul Dream Concert https://t.co/1XRSaRBbE0 https://t.co/fi…
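A sketch of the word-frequency idea described above: split each tweet's text on whitespace, explode into one row per word, and count. The tokenization here is deliberately naive (no lowercasing or punctuation handling):
from pyspark.sql import functions as F
t2df.select(F.explode(F.split('text', r'\s+')).alias('word'))\
    .groupBy('word').count()\
    .orderBy(F.desc('count'))\
    .show(10)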
twitterDF= spark.read.json(os.path.join("src","ds_twitter_1_noquote.json"))
twitterDF.printSchema()
root |-- contributors: string (nullable = true) |-- coordinates: string (nullable = true) |-- created_at: string (nullable = true) |-- entities: struct (nullable = true) | |-- hashtags: array (nullable = true) | | |-- element: string (containsNull = true) | |-- symbols: array (nullable = true) | | |-- element: string (containsNull = true) | |-- urls: array (nullable = true) | | |-- element: string (containsNull = true) | |-- user_mentions: array (nullable = true) | | |-- element: string (containsNull = true) |-- favorite_count: long (nullable = true) |-- favorited: boolean (nullable = true) |-- geo: string (nullable = true) |-- id: long (nullable = true) |-- id_str: string (nullable = true) |-- in_reply_to_screen_name: string (nullable = true) |-- in_reply_to_status_id: string (nullable = true) |-- in_reply_to_status_id_str: string (nullable = true) |-- in_reply_to_user_id: string (nullable = true) |-- in_reply_to_user_id_str: string (nullable = true) |-- is_quote_status: boolean (nullable = true) |-- lang: string (nullable = true) |-- place: string (nullable = true) |-- retweet_count: long (nullable = true) |-- retweeted: boolean (nullable = true) |-- source: string (nullable = true) |-- text: string (nullable = true) |-- truncated: boolean (nullable = true) |-- user: struct (nullable = true) | |-- contributors_enabled: boolean (nullable = true) | |-- created_at: string (nullable = true) | |-- default_profile: boolean (nullable = true) | |-- default_profile_image: boolean (nullable = true) | |-- description: string (nullable = true) | |-- entities: struct (nullable = true) | | |-- description: struct (nullable = true) | | | |-- urls: array (nullable = true) | | | | |-- element: string (containsNull = true) | |-- favourites_count: long (nullable = true) | |-- follow_request_sent: boolean (nullable = true) | |-- followers_count: long (nullable = true) | |-- following: boolean (nullable = true) | |-- friends_count: long (nullable = true) | |-- geo_enabled: boolean (nullable = true) | |-- has_extended_profile: boolean (nullable = true) | |-- id: long (nullable = true) | |-- id_str: string (nullable = true) | |-- is_translation_enabled: boolean (nullable = true) | |-- is_translator: boolean (nullable = true) | |-- lang: string (nullable = true) | |-- listed_count: long (nullable = true) | |-- location: string (nullable = true) | |-- name: string (nullable = true) | |-- notifications: boolean (nullable = true) | |-- profile_background_color: string (nullable = true) | |-- profile_background_image_url: string (nullable = true) | |-- profile_background_image_url_https: string (nullable = true) | |-- profile_background_tile: boolean (nullable = true) | |-- profile_image_url: string (nullable = true) | |-- profile_image_url_https: string (nullable = true) | |-- profile_link_color: string (nullable = true) | |-- profile_sidebar_border_color: string (nullable = true) | |-- profile_sidebar_fill_color: string (nullable = true) | |-- profile_text_color: string (nullable = true) | |-- profile_use_background_image: boolean (nullable = true) | |-- protected: boolean (nullable = true) | |-- screen_name: string (nullable = true) | |-- statuses_count: long (nullable = true) | |-- time_zone: string (nullable = true) | |-- translator_type: string (nullable = true) | |-- url: string (nullable = true) | |-- utc_offset: string (nullable = true) | |-- verified: boolean (nullable = true)
twitterDF.select('text').show()
+---------------+ | text| +---------------+ |Hello 21 160924| +---------------+
twitterDF.createOrReplaceTempView("twitter")
spark.sql("select text from twitter").show()
+---------------+ | text| +---------------+ |Hello 21 160924| +---------------+
Problem S-5: Reading Data via JDBC¶
Spark supports JDBC (Java DataBase Connectivity) for talking to databases. (Not on the exam, but worth knowing how to connect!)
import pyspark
import os
# add the JDBC driver jar with .set("spark.driver.extraClassPath", <path to jar>)
# (alternatively, add 'spark.driver.extraClassPath' to the config file conf/spark-defaults.conf)
myConf=pyspark.SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0,com.databricks:spark-csv_2.11:1.5.0")\
.set("spark.driver.extraClassPath", os.path.join(os.getcwd(),"lib","sqlite-jdbc-3.41.0.1.jar"))
spark = pyspark.sql.SparkSession\
.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
23/11/29 06:13:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark.conf.get('spark.driver.extraClassPath')
'/Users/sojaehwi/Documents/GitHub/LECTURE_Bigdata/lib/sqlite-jdbc-3.41.0.1.jar'
import json
info = json.loads( '''[
{ "id" : "001", "x" : "2", "name" : "Chuck"},
{ "id" : "009", "x" : "7", "name" : "Brent" }
]''')
myDf= spark.read.json(spark.sparkContext.parallelize(info))
myDf.show()
+---+-----+---+ | id| name| x| +---+-----+---+ |001|Chuck| 2| |009|Brent| 7| +---+-----+---+
# how to connect: write to and read from SQLite over JDBC
# myDf.write.format('jdbc')\
# .options(
# url="jdbc:sqlite:"+os.path.join(os.getcwd(),"customer1.db"),
# dbtable="customers",
# driver="org.sqlite.JDBC"
# ).mode('overwrite').save()
# df=spark.read.format('jdbc')\
# .options(
# url="jdbc:sqlite:"+os.path.join(os.getcwd(),"customer1.db"),
# dbtable="customers",
# driver="org.sqlite.JDBC"
# ).load()
MongoDB Spark Connector¶
Earlier we connected to MongoDB with pymongo.
Another option is to connect to MongoDB directly from Spark, without any separate client library.
MongoDB then acts as the remote server, and Spark as the client.
Spark can only reach MongoDB through the connector library, so that library must be configured as shown below.
import pyspark
# restart the kernel before running this
myConf=pyspark.SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0")
spark = pyspark.sql.SparkSession\
.builder\
.master("local")\
.appName("myApp")\
.config(conf=myConf)\
.getOrCreate()
23/11/29 06:17:53 WARN Utils: Your hostname, sojaehwiui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.29 instead (on interface en0) 23/11/29 06:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Ivy Default Cache set to: /Users/sojaehwi/.ivy2/cache The jars for the packages stored in: /Users/sojaehwi/.ivy2/jars org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-57826270-2557-484f-ae1a-633e59d06748;1.0 confs: [default]
:: loading settings :: url = jar:file:/Users/sojaehwi/opt/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
found org.mongodb.spark#mongo-spark-connector_2.12;10.2.0 in central found org.mongodb#mongodb-driver-sync;4.8.2 in central [4.8.2] org.mongodb#mongodb-driver-sync;[4.8.1,4.8.99) found org.mongodb#bson;4.8.2 in central found org.mongodb#mongodb-driver-core;4.8.2 in central found org.mongodb#bson-record-codec;4.8.2 in central :: resolution report :: resolve 2796ms :: artifacts dl 5ms :: modules in use: org.mongodb#bson;4.8.2 from central in [default] org.mongodb#bson-record-codec;4.8.2 from central in [default] org.mongodb#mongodb-driver-core;4.8.2 from central in [default] org.mongodb#mongodb-driver-sync;4.8.2 from central in [default] org.mongodb.spark#mongo-spark-connector_2.12;10.2.0 from central in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 5 | 1 | 0 | 0 || 5 | 0 | --------------------------------------------------------------------- :: retrieving :: org.apache.spark#spark-submit-parent-57826270-2557-484f-ae1a-633e59d06748 confs: [default] 0 artifacts copied, 5 already retrieved (0kB/3ms) 23/11/29 06:17:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
spark.conf.get('spark.jars.packages')
'org.mongodb.spark:mongo-spark-connector_2.12:10.2.0'
Writing to & Reading from MongoDB¶
import os
from src import mylib
import pymongo
keyPath=os.path.join(os.getcwd(), 'src', 'key.properties')
key=mylib.getKey(keyPath)
#uriLocal='mongodb://localhost:27017' # URI for connecting to a local MongoDB
uriCloud='mongodb+srv://sjh9708:'+key['mongo']+'@mycluster.tfiobq6.mongodb.net/?retryWrites=true&w=majority' # the database name may be omitted here
myDf = spark.createDataFrame([("kim",10),("lee",20),("choi",30),("park",40)],["name", "age"])
myDf.show()
+----+---+ |name|age| +----+---+ | kim| 10| | lee| 20| |choi| 30| |park| 40| +----+---+
uriCloud
'mongodb+srv://sjh9708:<password redacted>@mycluster.tfiobq6.mongodb.net/?retryWrites=true&w=majority'
# write the DataFrame to MongoDB
myDf.write.format("mongodb")\
.mode("append")\
.option("connection.uri", uriCloud)\
.option("database", "myDB4")\
.option("collection", "myDf4Collection")\
.save()
# read it back (note: the database here is 'myDB4_', not the 'myDB4' written above)
myDf2 = spark.read.format("mongodb")\
.option("connection.uri", uriCloud)\
.option("database", "myDB4_")\
.option("collection", "myDf4Collection")\
.load()
myDf2.printSchema()
myDf2.show()
root
23/11/29 06:18:02 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition. 23/11/29 06:18:02 WARN Partitioner: Unable to get collection stats (collstats) returning a single partition.
++ || ++ ++
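The empty result above comes from a database-name mismatch: the write targeted 'myDB4' but the read asked for 'myDB4_'. Reading the database that was actually written should return the rows (a sketch reusing the options above; myDf3 is an illustrative name):
myDf3 = spark.read.format("mongodb")\
    .option("connection.uri", uriCloud)\
    .option("database", "myDB4")\
    .option("collection", "myDf4Collection")\
    .load()
myDf3.show()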
Fetching Public Subway Data¶
subwayDf=spark.read.format("mongodb")\
.option("connection.uri", uriCloud)\
.option("database", "ds_open_subwayPassengersDb")\
.option("collection", "db_open_subwayTable")\
.load()
23/11/29 06:18:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
subwayDf.select('CardSubwayTime.row.LINE_NUM').show(5)
+--------------------------+ | LINE_NUM| +--------------------------+ |[1호선, 1호선, 1호선, 1...| |[1호선, 1호선, 1호선, 1...| |[1호선, 1호선, 1호선, 1...| |[1호선, 1호선, 1호선, 1...| |[1호선, 1호선, 1호선, 1...| +--------------------------+ only showing top 5 rows
subwayDf.select('CardSubwayTime.row.LINE_NUM', 'CardSubwayTime.row.SUB_STA_NM',
'CardSubwayTime.row.FOUR_RIDE_NUM', 'CardSubwayTime.row.FOUR_ALIGHT_NUM')\
.show(5)
+--------------------------+------------------------------+--------------------+--------------------+ | LINE_NUM| SUB_STA_NM| FOUR_RIDE_NUM| FOUR_ALIGHT_NUM| +--------------------------+------------------------------+--------------------+--------------------+ |[1호선, 1호선, 1호선, 1...|[서울역, 동묘앞, 시청, 종각...|[654.0, 51.0, 37....|[17.0, 1.0, 0.0, ...| |[1호선, 1호선, 1호선, 1...| [종로5가, 동대문, 신설동, ...|[71.0, 715.0, 343...|[0.0, 14.0, 3.0, ...| |[1호선, 1호선, 1호선, 1...|[서울역, 동묘앞, 시청, 종각...|[654.0, 51.0, 37....|[17.0, 1.0, 0.0, ...| |[1호선, 1호선, 1호선, 1...| [종로5가, 동대문, 신설동, ...|[71.0, 715.0, 343...|[0.0, 14.0, 3.0, ...| |[1호선, 1호선, 1호선, 1...|[서울역, 동묘앞, 시청, 종각...|[654.0, 51.0, 37....|[17.0, 1.0, 0.0, ...| +--------------------------+------------------------------+--------------------+--------------------+ only showing top 5 rows
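Each row above holds parallel arrays. A sketch that flattens them into one row per station with arrays_zip() and explode(), assuming the arrays are aligned position by position as the output suggests:
from pyspark.sql import functions as F
zipped = subwayDf.select(
    F.col('CardSubwayTime.row.LINE_NUM').alias('line'),
    F.col('CardSubwayTime.row.SUB_STA_NM').alias('station'))
zipped.select(F.explode(F.arrays_zip('line', 'station')).alias('z'))\
    .select('z.line', 'z.station')\
    .show(5)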