Set up environment variables for Hadoop.
# Hadoop environment: pin the version, locate the install under $HOME,
# point at its config directory, and expose its binaries on PATH.
export HADOOP_VERSION="2.8.5"
export HADOOP_HOME="${HOME}/hadoop-${HADOOP_VERSION}"
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
export PATH="${HADOOP_HOME}/bin:${PATH}"
Download Hadoop files.
-- NOTE(review): this fragment is truncated mid-statement (the CREATE TEMP TABLE
-- paren is never closed) and the trailing '| |' tokens are markdown-table
-- extraction artifacts, not SQL — confirm against the original document.
-- Visible intent: rebuild a temp table of distinct tables that received inserts,
-- resolving table ids via the Redshift SVV_TABLE_INFO system view.
DROP TABLE IF EXISTS target_tables; | |
CREATE TEMP TABLE target_tables AS ( | |
SELECT | |
-- stl_insert.tbl is the numeric table id of the insert target.
DISTINCT tbl AS target_table_id, | |
sti.schema AS target_schema, | |
sti.table AS target_table, | |
sti.database AS cluster, | |
query | |
FROM stl_insert | |
JOIN SVV_TABLE_INFO sti ON sti.table_id = tbl |
-- NOTE(review): truncated fragment — alias pgcd (presumably pg_catalog column
-- descriptions) is referenced but its JOIN is missing; trailing '| |' tokens
-- are table-extraction artifacts. Visible intent: list column-level metadata
-- (name, type, description, ordinal position) from INFORMATION_SCHEMA.COLUMNS.
SELECT | |
c.table_catalog as cluster, | |
c.table_schema as schema_name, | |
c.table_name as table_name, | |
c.column_name as col_name, | |
c.data_type as col_type , | |
pgcd.description as col_description, | |
ordinal_position as col_sort_order | |
FROM | |
INFORMATION_SCHEMA.COLUMNS c |
-- NOTE(review): truncated fragment — cut off mid-JOIN, and aliases pt / pgtd
-- have no visible join clauses; trailing '| |' tokens are table-extraction
-- artifacts. Visible intent: list table-level metadata (type, owner,
-- description) from information_schema.tables joined with pg_catalog views.
SELECT | |
t.table_catalog as cluster, | |
t.table_schema as schema_name, | |
t.table_name as table_name, | |
t.table_type as table_type, | |
pt.tableowner as table_owner, | |
pgtd.description as description | |
FROM | |
information_schema.tables as t | |
INNER JOIN pg_catalog.pg_statio_all_tables as st on |
def personality_udf(personality_mapping: dict):
    """Build a Spark UDF that maps a category value to an integer code.

    The original text carried '| |' table-extraction artifacts and lost its
    indentation; this is the reconstructed, syntactically valid form.

    Args:
        personality_mapping: mapping from category value to integer code.

    Returns:
        A pyspark ``udf`` (IntegerType) that looks the category up in
        ``personality_mapping`` and yields -1 for unknown categories.
    """
    @udf(returnType=IntegerType())
    def inner(cat):
        # Unknown categories map to -1 rather than raising.
        return personality_mapping.get(cat, -1)
    return inner
# NOTE(review): truncated fragment — inner's body is missing and the trailing
# '| |' tokens are table-extraction artifacts (indentation was also stripped).
# Visible intent: a UDF factory returning an array-of-double Spark UDF that
# presumably combines the n/c g-vectors weighted by suma — confirm in original.
def compute_gvector_udf(personality_mapping: dict): | |
@udf(returnType=ArrayType(DoubleType())) | |
def inner(ngvector, cgvector, suma): |
# NOTE(review): truncated fragment — neither inner nor with_idx has a visible
# return, and trailing '| |' tokens are table-extraction artifacts.
# Visible intent: a curried transform that assigns a dense 1-based index
# (F.row_number over an ordering on id_col) to the distinct values of id_col.
# Install quinn>=0.3.1 | |
from quinn.extensions.dataframe_ext import DataFrame | |
def with_idx(id_col: str, output_col: str) -> Callable[[DataFrame], DataFrame]: | |
def inner(df: DataFrame) -> DataFrame: | |
# Window ordered by the id column; row_number then yields sequential ids.
window = Window.orderBy(id_col) | |
unique_activity_ids = df \ | |
.select(id_col).distinct() \ | |
.withColumn(output_col, F.row_number().over(window)) |
/**
 * Abstraction over persisting a DataFrame.
 *
 * Implementations decide the target format/location; callers may override
 * the save mode, which defaults to Overwrite.
 * (Reconstructed: the original text carried '| |' extraction artifacts.)
 */
trait SparkWriter {
  def write(df: DataFrame, mode: WriteMode = Overwrite): Unit
}
// NOTE(review): truncated fragment — the write chain is cut after .mode(mode)
// and the class is never closed; trailing '| |' tokens are table-extraction
// artifacts. Visible intent: a SparkWriter that coalesces to `partitions`
// partitions before writing (presumably JSON to `path` — confirm in original).
class SparkJsonWriter(path: String, partitions: Int = 1) extends SparkWriter { | |
def write(df: DataFrame, mode: WriteMode = Overwrite): Unit = { | |
df | |
.coalesce(partitions) | |
.write | |
.mode(mode) |
/**
 * Applies a sequence of column predicates to a DataFrame.
 * (Reconstructed: the original text carried '| |' extraction artifacts.)
 */
object Filters {
  /** Folds each predicate into df.filter; an empty Seq returns df unchanged. */
  def filter(filters: Seq[Column])(df: DataFrame): DataFrame = {
    filters.foldLeft(df)((acc, predicate) => acc.filter(predicate))
  }
}
// NOTE(review): truncated fragment — read's body and the trait's closing brace
// are missing; trailing '| |' tokens are table-extraction artifacts.
// Visible intent: a reader template where subclasses implement execute() and
// read() presumably applies the optional schema and the filters — confirm.
trait SparkReader { | |
protected def execute(reader: DataFrameReader): DataFrame | |
def read(schema: Option[StructType] = None, filters: Seq[Column] = Seq.empty)(implicit sparkSession: SparkSession): DataFrame = { |
// NOTE(review): truncated fragment — run() is cut mid-chain and the class is
// never closed; trailing '| |' tokens are table-extraction artifacts.
// Visible intent: a job wiring two readers and a writer; run() reads analytics
// events with AnalyticsSchema and pipes them through Events transforms
// (impression/view/bookmark filters — presumably; confirm in original).
class ActivityInsightsJob(activityReader: SparkReader, | |
analyticsReader: SparkReader, | |
insightsWriter: SparkWriter | |
)(implicit val sparkSession: SparkSession) extends SparkTask { | |
def run(): Unit = { | |
val metricsDF = analyticsReader.read(Some(AnalyticsSchema)) | |
.transform(Events.isActivityImpression) | |
.transform(Events.isActivityView) | |
.transform(Events.isBookmarked) |
# Launch pyspark with the S3A connector matching the Hadoop version (2.8.5),
# authenticating via temporary (session) AWS credentials.
# Fix: the original lines ended in stray '| |' / '|' table-extraction
# artifacts, which are shell syntax errors (pipe into an empty command).
export SPARK_OPTS='--packages org.apache.hadoop:hadoop-aws:2.8.5 --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider'
pyspark
Set up environment variables for Hadoop.
# Export the Hadoop version, install root, config dir, and prepend bin to PATH.
export HADOOP_VERSION="2.8.5"
export HADOOP_HOME="${HOME}/hadoop-${HADOOP_VERSION}"
export HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
export PATH="${HADOOP_HOME}/bin:${PATH}"
Download Hadoop files.