PySpark SQL comparison
# DATA ########################################################
''' test table
['str_col', 'int_col']
('abc', 1)
('def1', 2)
('def1', 3)
('def1', 3)
+-------+-------+
|str_col|int_col|
+-------+-------+
|    abc|      1|
|   def1|      2|
|   def1|      3|
|   def1|      3|
+-------+-------+
test_2 table:
['int_col', 'dt_col', 'str_col']
(3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
(4, datetime.datetime(2020, 1, 1, 0, 0), 'kil')
+-------+-------------------+------------+
|int_col|             dt_col|     str_col|
+-------+-------------------+------------+
|      3|2019-01-01 00:00:00|non-matching|
|      3|2019-01-01 00:00:00|        def1|
|      4|2020-01-01 00:00:00|         kil|
+-------+-------------------+------------+
'''
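# SETUP (sketch, not part of the original comparison) #########
# The snippets below assume a SparkSession named `spark`, the
# DataFrames `df` (test) and `df_2` (test_2), and the usual F/T
# aliases; the `stat` strings are written in PostgreSQL dialect
# and are shown for comparison only. Roughly:
import datetime
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('abc', 1), ('def1', 2), ('def1', 3), ('def1', 3)]
    , ['str_col', 'int_col']
)
df_2 = spark.createDataFrame(
    [
        (3, datetime.datetime(2019, 1, 1), 'non-matching')
        , (3, datetime.datetime(2019, 1, 1), 'def1')
        , (4, datetime.datetime(2020, 1, 1), 'kil')
    ]
    , ['int_col', 'dt_col', 'str_col']
)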
# SELECT DISTINCT #############################################
stat = (
    'select distinct str_col '
    + 'from test '
    #+ 'where test.str_col ~ \'[0-9]\''
)
df_res = df.select('str_col').distinct()
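# alternative (sketch): dropDuplicates gives the same result and
# also accepts a subset of columns to deduplicate on
df_res = df.select('str_col').dropDuplicates()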
# REGEX #######################################################
stat = (
    'select * '
    + 'from test '
    + 'where test.str_col ~ \'[0-9]\''
)
df_res = df.filter(df['str_col'].rlike('[0-9]'))
# LIMIT #######################################################
stat = (
    'select * '
    + 'from test '
    + 'limit 1'
)
df_res = df.limit(1)
# GROUPING AND AGGREGATING ####################################
# grouping computes some aggregate F over each group
# a groupBy must be followed by an aggregation F of the group
# group by A, aggregate F over A, show (A, F)
# showing other columns requires grouping by them as well
# (to display the aggregate next to all columns, see the
# window-function sketch after the group-by examples below)
stat = (
    'select round(avg(int_col), 2) as avg '
    + 'from test '
)
df_res = df.agg({"int_col": "avg"})
df_res = df.agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
stat = (
    'select round(avg(int_col), 2) as avg '
    + 'from test '
    + 'group by str_col'
)
df_res = (
    df
    .groupBy('str_col')
    .agg(F.round(F.avg(df['int_col']), 2).alias('avg'))
)
stat = (
    'select str_col '
    + 'from test '
    + 'group by str_col '
)
df_res = (
    df
    .groupBy('str_col')
    .agg(F.count(df['int_col']))  # an aggregate function must be specified
    .select('str_col')
)
stat = (
    'select * '
    + 'from test '
    + 'group by str_col, int_col '
)
df_res = (
    df
    .groupBy('str_col', 'int_col')
    .agg(F.count(df['int_col']))  # an aggregate function must be specified
    .select('str_col', 'int_col')
)
''' result
['str_col', 'int_col'] [('def1', 3), ('abc', 1), ('def1', 2)]
+-------+-------+
|str_col|int_col|
+-------+-------+
|   def1|      2|
|    abc|      1|
|   def1|      3|
+-------+-------+
'''
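# Answering the question above (sketch): a window function shows the
# aggregate next to every column without collapsing the rows, roughly
# 'select *, round(avg(int_col) over (partition by str_col), 2) as avg from test'
from pyspark.sql.window import Window
df_res = df.withColumn(
    'avg'
    , F.round(F.avg('int_col').over(Window.partitionBy('str_col')), 2)
)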
stat = (
    'select max(int_col) '
    + 'from test '
)
df_res = df.agg({"int_col": "max"})
stat = (
    'select max(int_col) '
    + 'from test '
    + 'group by str_col'
)
df_res = df.groupBy('str_col').agg({"int_col": "max"})
stat = (
    'select max(int_col) as max_int '
    + 'from test '
    + 'group by str_col '
    + 'having max(int_col) < 3 '  # the alias max_int cannot be used in HAVING
)
df_res = (
    df
    .groupBy('str_col')
    .agg(F.max(df['int_col']).alias('max_int'))
    .filter(F.col('max_int') < 3)  # df['max_int'] would fail: the column only exists on the aggregated result
)
# ORDER/SORT ################################################
stat = (
    'select * '
    + 'from test '
    + 'order by int_col'
)
df_res = df.sort(df['int_col'])
# CAST ######################################################
stat = (
    'select dt_col::date '
    + 'from test_2 '
)
df_res = (
    df
    .withColumn(
        'new_col'
        , F.date_format('dt_col', "yyyy-MM-dd")
    )
    .select('int_col', 'new_col')
    .withColumnRenamed('new_col', 'dt_col')
)
''' result
['dt_col'] [(datetime.date(2019, 1, 1),)]
+-------+----------+
|int_col|    dt_col|
+-------+----------+
|      3|2019-01-01|
+-------+----------+
'''
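# alternative (sketch): a real DateType cast instead of a formatted
# string; F.to_date('dt_col') would also work
df_res = (
    df
    .withColumn('dt_col', df['dt_col'].cast(T.DateType()))
    .select('int_col', 'dt_col')
)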
# JOIN ######################################################
# A.B (on 1 column)
stat = (
    'select * '
    + 'from test as t '
    + 'join test_2 as t2 '  # defaults to inner join
    + 'on t.int_col = t2.int_col;'
)
df_res = (
    df
    .join(df_2, 'int_col')  # defaults to inner join
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
+-------+-------+-------------------+------------+
|int_col|str_col|             dt_col|     str_col|
+-------+-------+-------------------+------------+
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-01 00:00:00|        def1|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-01 00:00:00|        def1|
+-------+-------+-------------------+------------+
'''
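# With an explicit join condition instead of a column name, both
# int_col columns are kept (sketch); selecting through the parent
# DataFrames avoids the ambiguous column references:
df_res = (
    df
    .join(df_2, df['int_col'] == df_2['int_col'])
    .select(df['str_col'], df['int_col'], df_2['dt_col'], df_2['str_col'])
)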
# A.B (on 2 columns)
stat = (
    'select * '
    + 'from test as t '
    + 'join test_2 as t2 '
    + 'on t.int_col = t2.int_col '
    + 'and t.str_col = t2.str_col'
)
df_res = (
    df
    .join(df_2, ['int_col', 'str_col'], how = 'inner')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'def1')
+-------+-------+-------------------+
|int_col|str_col|             dt_col|
+-------+-------+-------------------+
|      3|   def1|2019-01-01 00:00:00|
|      3|   def1|2019-01-01 00:00:00|
+-------+-------+-------------------+
'''
# A+A.B
stat = (
    'select * '
    + 'from test as t '
    + 'left join test_2 as t2 '
    + 'on t.int_col = t2.int_col;'
)
df_res = (
    df
    .join(df_2, 'int_col', 'left')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
+-------+-------+-------------------+------------+
|int_col|str_col|             dt_col|     str_col|
+-------+-------+-------------------+------------+
|      1|    abc|               null|        null|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      2|   def1|               null|        null|
+-------+-------+-------------------+------------+
'''
# A-B
stat = (
    'select * '
    + 'from test as t '
    + 'left join test_2 as t2 '
    + 'on t.int_col = t2.int_col '
    + 'where dt_col is null;'  # no need to qualify the column with the table
)
df_res = (
    df
    .join(df_2, 'int_col', 'left')
    .filter(F.col('dt_col').isNull())  # F.col refers to a column of the joined result, unless the name is ambiguous
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
+-------+-------+------+-------+
|int_col|str_col|dt_col|str_col|
+-------+-------+------+-------+
|      1|    abc|  null|   null|
|      2|   def1|  null|   null|
+-------+-------+------+-------+
'''
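# alternative (sketch): a left anti join expresses A-B directly,
# keeping only df's columns
df_res = df.join(df_2, 'int_col', 'left_anti')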
# B+A.B
stat = (
    'select * '
    + 'from test as t '
    + 'right join test_2 as t2 '
    + 'on t.int_col = t2.int_col;'
)
df_res = (
    df
    .join(df_2, 'int_col', 'right')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+------------+
|int_col|str_col|             dt_col|     str_col|
+-------+-------+-------------------+------------+
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      4|   null|2020-01-02 00:00:00|         kil|
+-------+-------+-------------------+------------+
'''
# B-A
stat = (
    'select * '
    + 'from test as t '
    + 'right join test_2 as t2 '
    + 'on t.int_col = t2.int_col '
    + 'where str_col is null;'
)
df_res = (
    df
    .join(df_2, 'int_col', 'right')
    .filter(df['str_col'].isNull())
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+-------+
|int_col|str_col|             dt_col|str_col|
+-------+-------+-------------------+-------+
|      4|   null|2020-01-02 00:00:00|    kil|
+-------+-------+-------------------+-------+
'''
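# alternative (sketch): B-A as an anti join with the sides swapped,
# keeping only df_2's columns
df_res = df_2.join(df, 'int_col', 'left_anti')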
# A+B
stat = (
    'select * '
    + 'from test as t '
    + 'full join test_2 as t2 '
    + 'on t.int_col = t2.int_col;'
)
df_res = (
    df
    .join(df_2, 'int_col', 'full')
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
('def1', 3, 3, datetime.datetime(2019, 1, 21, 0, 0), 'def1')
('def1', 3, 3, datetime.datetime(2019, 1, 1, 0, 0), 'non-matching')
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+------------+
|int_col|str_col|             dt_col|     str_col|
+-------+-------+-------------------+------------+
|      1|    abc|               null|        null|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      3|   def1|2019-01-01 00:00:00|non-matching|
|      3|   def1|2019-01-21 00:00:00|        def1|
|      2|   def1|               null|        null|
|      4|   null|2020-01-02 00:00:00|         kil|
+-------+-------+-------------------+------------+
'''
# (A-B)+(B-A)
stat = (
    'select * '
    + 'from test as t '
    + 'full join test_2 as t2 '
    + 'on t.int_col = t2.int_col '
    + 'where str_col is null '
    + 'or dt_col is null;'  # mind the OR instead of AND
)
df_res = (
    df
    .join(df_2, 'int_col', 'full')
    .where(df['str_col'].isNull() | df_2['dt_col'].isNull())
)
''' result
['str_col', 'int_col', 'int_col', 'dt_col', 'str_col']
('abc', 1, None, None, None)
('def1', 2, None, None, None)
(None, None, 4, datetime.datetime(2020, 1, 2, 0, 0), 'kil')
+-------+-------+-------------------+-------+
|int_col|str_col|             dt_col|str_col|
+-------+-------+-------------------+-------+
|      1|    abc|               null|   null|
|      2|   def1|               null|   null|
|      4|   null|2020-01-02 00:00:00|    kil|
+-------+-------+-------------------+-------+
'''
# UNION/INTERSECTION/DIFFERENCE ###############################
stat = (
    'select t.int_col, t.str_col '
    + 'from test as t '
    + 'union all '  # all == include duplicate rows
    + 'select t2.int_col, t2.str_col '
    + 'from test_2 as t2 '
)
df_res = (
    df.select('int_col', 'str_col')
    .union(df_2.select('int_col', 'str_col'))
)
''' result
['int_col', 'str_col']
(1, 'abc')
(2, 'def1')
(3, 'def1')
(3, 'def1')
(3, 'non-matching')
(3, 'def1')
(4, 'kil')
+-------+------------+
|int_col|     str_col|
+-------+------------+
|      1|         abc|
|      2|        def1|
|      3|        def1|
|      3|        def1|
|      3|non-matching|
|      3|        def1|
|      4|         kil|
+-------+------------+
'''
stat = (
    'select t.int_col, t.str_col '
    + 'from test as t '
    + 'union '
    + 'select t2.int_col, t2.str_col '
    + 'from test_2 as t2 '
)
df_res = (
    df.select('int_col', 'str_col')
    .union(df_2.select('int_col', 'str_col'))
    .distinct()  # remove duplicate rows
)
''' result
['int_col', 'str_col']
(3, 'def1')
(4, 'kil')
(2, 'def1')
(1, 'abc')
(3, 'non-matching')
+-------+------------+
|int_col|     str_col|
+-------+------------+
|      3|        def1|
|      4|         kil|
|      1|         abc|
|      3|non-matching|
|      2|        def1|
+-------+------------+
'''
stat = (
    'select t.int_col, t.str_col '
    + 'from test as t '
    + 'intersect '
    + 'select t2.int_col, t2.str_col '
    + 'from test_2 as t2 '
)
df_res = (
    df.select('int_col', 'str_col')
    .intersect(df_2.select('int_col', 'str_col'))
)
'''result
['str_col', 'int_col']
('def1', 3)
+-------+-------+
|str_col|int_col|
+-------+-------+
|   def1|      3|
+-------+-------+
'''
stat = (
    'select t.int_col, t.str_col '
    + 'from test as t '
    + 'except '
    + 'select t2.int_col, t2.str_col '
    + 'from test_2 as t2 '
)
df_res = (
    df
    .join(df_2, ['int_col', 'str_col'], 'left')
    .filter(df_2['dt_col'].isNull())  # filter on a column that only df_2 has
    .select('str_col', 'int_col')
)
'''result
['str_col', 'int_col']
('abc', 1)
('def1', 2)
+-------+-------+
|str_col|int_col|
+-------+-------+
|   def1|      2|
|    abc|      1|
+-------+-------+
'''
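# alternative (sketch): DataFrame.subtract is EXCEPT directly
# (exceptAll, on newer Spark versions, keeps duplicates)
df_res = (
    df.select('str_col', 'int_col')
    .subtract(df_2.select('str_col', 'int_col'))
)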
# DATA TYPES ##################################################
timestamp = 'https://www.postgresql.org/docs/9.1/functions-datetime.html'
# EXERCISES ###################################################
stat = (
    'select '
    + 'extract(year from dt_col) as year'
    + ', sum(int_col) '  # aggregate function
    + 'from test_2 '
    + 'group by year '
    + 'order by year desc'  # the alias year is recognized here
)
df_res = (
    df
    .select(
        'int_col'  # selected explicitly because the aggregation below needs it
        , F.year('dt_col').alias('year'))
    .groupBy('year')
    .agg(F.sum('int_col').alias('sum'))  # aggregate function
    .sort(F.col('year').desc())  # the alias is referenced via F.col
)
''' result
['year', 'sum']
(2020.0, 4)
(2019.0, 6)
+----+---+
|year|sum|
+----+---+
|2020|  4|
|2019|  6|
+----+---+
'''
# CURRYING
stat = (
    'select * '
    + 'from test as t '
    + 'where t.int_col in ('
    + 'select extract(day from dt_col) as day '
    + 'from test_2 as t2'
    + ')'
)
# the outer function may take arguments of any type,
# but it must return a pyspark.sql.functions.udf
def in_days(col_name, days_df):
    days = list(
        map(
            lambda _: _[col_name]
            , days_df.collect()
        )
    )
    # currying: return a function that takes the remaining,
    # not-yet-specified arguments, which must be of type
    # pyspark.sql.Column
    @F.udf(returnType=T.BooleanType())
    def in_days_curry(col_value):
        return col_value in days
    return in_days_curry
df_res = (
    df
    .filter(
        in_days('day', df_2.select(F.dayofmonth('dt_col').alias('day')))  # curried with:
        (df['int_col'])  # the int_col column
    )
)
'''result
['str_col', 'int_col']
('abc', 1)
('def1', 2)
+-------+-------+
|str_col|int_col|
+-------+-------+
|    abc|      1|
|   def1|      2|
+-------+-------+
'''
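# alternative (sketch): Column.isin avoids the UDF; collect the day
# values first, then filter
days = [
    row['day']
    for row in df_2.select(F.dayofmonth('dt_col').alias('day')).collect()
]
df_res = df.filter(df['int_col'].isin(days))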
# get all df.str_col, search them in df_2.str_col, get sum S of df_2.int_col,
# filter df by int_col, searching for divisors of S
stat = (
    'with S as ('
    + 'select sum(t2.int_col) as S '
    + 'from test_2 as t2 '
    + 'where t2.str_col not in ('
    # all distinct test strings
    + 'select distinct t.str_col '
    + 'from test as t '
    + ')'
    + ') '
    + 'select * '
    + 'from test as t, S '
    + 'where ((S.S * 2) % t.int_col) = 0 '  # S doubled so that 1 and the even values divide it
)
def filt_str(df):
    # collect the str_col values of df
    df_col_vals = list(map(lambda _: _.str_col, df.collect()))
    @F.udf(returnType = T.BooleanType())
    def filt_str_udf(str_col_val):
        return str_col_val not in df_col_vals
    return filt_str_udf
def filt_by_divisors(dividend):
    @F.udf(returnType = T.BooleanType())
    def filt_by_divisors_udf(col_val):
        return dividend % col_val == 0
    return filt_by_divisors_udf
df_res = (
    df
    .filter(
        filt_by_divisors(  # currying
            df_2
            .filter(  # currying
                filt_str(df.select('str_col'))
                (df_2['str_col'])
            )
            .select(F.sum('int_col').alias('sum'))
            .head().sum * 2  # doubled so that 1 and the even values divide it
        )
        (df['int_col'])
    )
)
'''result
['str_col', 'int_col', 's']
('abc', 1, 7)
('def1', 2, 7)
+-------+-------+
|str_col|int_col|
+-------+-------+
|    abc|      1|
|   def1|      2|
+-------+-------+
'''
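# alternative (sketch): the same result without UDFs -- a left anti
# join for the NOT IN, a collected scalar for S, and a plain modulo
s = (
    df_2
    .join(df.select('str_col').distinct(), 'str_col', 'left_anti')
    .agg(F.sum('int_col').alias('s'))
    .head()['s']
)
df_res = df.filter((F.lit(s * 2) % df['int_col']) == 0)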