Skip to content

Instantly share code, notes, and snippets.

@gbraccialli
Last active March 25, 2020 05:23
Show Gist options
  • Save gbraccialli/827a2180ffbfecec594483fb3875cf8d to your computer and use it in GitHub Desktop.
Save gbraccialli/827a2180ffbfecec594483fb3875cf8d to your computer and use it in GitHub Desktop.
# Notebook cell: choose which project/branch to load, then run the loader
# script with these values as positional arguments.
GIT_PROJECT = "xxxx"  # repository name under https://github.com/xxx/
PROJECT = "aaaaa"  # part of the /tmp/code/{USERNAME}_{PROJECT}/ checkout path
USERNAME = "guilherme"  # part of the checkout path and the Spark app/queue name
BRANCH = "develop"  # branch checked out after cloning
SPARK_MODE = "local" # "yarn" submits to YARN; any other value runs local[*]
# IPython line magic: each $NAME is expanded to the notebook variable's value,
# and the script reads them back as sys.argv[1] .. sys.argv[5] in this order.
%run /home/jupyter/kedro_load.py $GIT_PROJECT $PROJECT $USERNAME $BRANCH $SPARK_MODE
######################################################################
def randomString(stringLength=10):
    """Generate a random string of fixed length.

    Args:
        stringLength: number of characters to produce (default 10). Zero or
            a negative value yields the empty string.

    Returns:
        A string of ``stringLength`` characters, each drawn from
        ``string.ascii_lowercase``.
    """
    # The docstring must be the first statement of the body to become
    # ``__doc__``; the imports therefore come after it.
    import random
    import string

    letters = string.ascii_lowercase
    return "".join(random.choice(letters) for _ in range(stringLength))
def init_spark(spark_hadoop_path, username, spark_mode=None):
    """Initialise findspark and return a SparkSession.

    Args:
        spark_hadoop_path: Spark installation directory passed to
            ``findspark.init`` (e.g. ``"/usr/lib/spark/"``).
        username: used to build the application name and, in YARN mode,
            the queue name.
        spark_mode: ``"yarn"`` to run on YARN; any other value builds a
            ``local[*]`` session. When omitted, the module-level
            ``SPARK_MODE`` global is used, so existing two-argument calls
            behave exactly as before.

    Returns:
        The session from ``SparkSession.builder...getOrCreate()``.
    """
    import os
    import findspark

    findspark.init(spark_hadoop_path)
    # pyspark is imported only after findspark.init() has located Spark.
    from pyspark.sql import SparkSession

    if spark_mode is None:
        # Backward compatibility: fall back to the global set by the script.
        spark_mode = SPARK_MODE

    # Setting shared by both modes.
    builder = SparkSession.builder.config(
        "spark.sql.execution.arrow.enabled", "false"
    )
    if spark_mode == "yarn":
        os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
        # Random suffix presumably keeps concurrent sessions of one user apart.
        queue = f"{username}_{randomString(5)}"
        builder = (
            builder.master("yarn")
            .config("spark.yarn.queue", queue)
            .appName(queue)
        )
    else:
        builder = (
            builder.master("local[*]")
            .config("spark.driver.memory", "4g")
            .appName(f"app-{username}")
        )
    return builder.getOrCreate()
import logging
import pathlib
import datetime
import os
import sys
import getpass
import importlib
# Positional arguments forwarded by the notebook's `%run kedro_load.py ...`.
_ARG_NAMES = ("GIT_PROJECT", "PROJECT", "USERNAME", "BRANCH", "SPARK_MODE")
if len(sys.argv) <= len(_ARG_NAMES):
    # Fail with a readable usage line instead of a bare IndexError.
    raise SystemExit(
        f"usage: {os.path.basename(sys.argv[0])} {' '.join(_ARG_NAMES)} "
        f"(got {len(sys.argv) - 1} argument(s))"
    )
GIT_PROJECT, PROJECT, USERNAME, BRANCH, SPARK_MODE = sys.argv[1:len(_ARG_NAMES) + 1]
if SPARK_MODE not in ("local", "yarn"):
    # init_spark treats anything other than "yarn" as local mode; surface
    # likely typos (e.g. "Yarn") instead of silently running locally.
    logging.warning("Unknown SPARK_MODE %r; falling back to local mode", SPARK_MODE)
spark = init_spark("/usr/lib/spark/", USERNAME)
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.types import *
import pandas as pd
import shutil
import subprocess

# Prepare a fresh clone of GIT_PROJECT under /tmp/code/{USERNAME}_{PROJECT}/.
# /tmp/code is made world-writable, presumably so several users on the same
# machine can each create their own checkout directory inside it.
_code_root = pathlib.Path("/tmp/code")
_checkout_parent = _code_root / f"{USERNAME}_{PROJECT}"
_code_root.mkdir(exist_ok=True)
try:
    _code_root.chmod(0o777)
except PermissionError:
    # Best effort: /tmp/code was created by another user, so its mode is
    # theirs to manage; our own subdirectory below is what matters here.
    pass

# Start from an empty directory so the clone below is always fresh.
shutil.rmtree(_checkout_parent, ignore_errors=True)
_checkout_parent.mkdir(parents=True, exist_ok=True)
_checkout_parent.chmod(0o777)

# Equivalent shell commands, kept as module-level strings for display/logging.
sync_command1 = f"cd /tmp/code/{USERNAME}_{PROJECT}/; git clone https://github.com/xxx/{GIT_PROJECT}.git;"
sync_command2 = f"cd /tmp/code/{USERNAME}_{PROJECT}/{GIT_PROJECT}/; git pull; git checkout {BRANCH};"

# git runs with argument lists (no shell), so values from sys.argv are never
# shell-parsed. check=True stops the load on a failed clone or an unknown
# BRANCH instead of silently continuing on the clone's default branch.
_repo_dir = _checkout_parent / GIT_PROJECT
subprocess.run(
    ["git", "clone", f"https://github.com/xxx/{GIT_PROJECT}.git"],
    cwd=_checkout_parent,
    check=True,
)
subprocess.run(["git", "pull"], cwd=_repo_dir, check=True)
subprocess.run(["git", "checkout", BRANCH], cwd=_repo_dir, check=True)
# Every project location is derived from the checkout's root directory.
# NOTE: LOCAL_BASE_PATH already ends with "/", so the derived paths contain
# "//" (harmless on POSIX).
LOCAL_BASE_PATH = f'/tmp/code/{USERNAME}_{PROJECT}/{GIT_PROJECT}/'
PROJECT_BASE_PATH = LOCAL_BASE_PATH
CONF_ROOT = "conf"
LOGS_DIR = LOCAL_BASE_PATH + '/logs/'
VIZUALIZATION_DIR = LOCAL_BASE_PATH + '/conf/visualization'

# Create the logs and visualization directories if they are missing.
for _needed_dir in (LOGS_DIR, VIZUALIZATION_DIR):
    pathlib.Path(_needed_dir).mkdir(parents=True, exist_ok=True)
del _needed_dir

# Work from the project root and make its src/ directory importable
# (prepended only once, so re-running does not stack duplicate entries).
os.chdir(PROJECT_BASE_PATH)
path_to_add = PROJECT_BASE_PATH + "/src/"
if path_to_add not in sys.path:
    sys.path.insert(0, path_to_add)
# Load the Kedro project from the checked-out repo and expose its io,
# catalog and pipeline as module-level names (under `%run`, these end up
# in the notebook's namespace).
#logging.getLogger().handlers[0].setLevel(logging.ERROR)
# Show only errors from the py4j.java_gateway logger.
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
from kedro.context import KedroContext, load_context
project_context = load_context(f"{PROJECT_BASE_PATH}")
# NOTE(review): `io` shadows the stdlib module name; it is kept because
# run_pipeline() below reads this global as its catalog.
io = project_context.io
catalog = project_context.catalog
pipeline = project_context.pipeline
# Quiet Kedro down to errors, but keep kedro.pipeline at INFO (presumably so
# pipeline progress stays visible). NOTE(review): these levels are set after
# load_context(), presumably because loading the context applies the
# project's own logging config, which would override earlier settings —
# confirm before moving them.
logging.getLogger("kedro").setLevel(logging.ERROR)
logging.getLogger("kedro.io").setLevel(logging.ERROR)
logging.getLogger("kedro.pipeline").setLevel(logging.INFO)
def run_pipeline(pipeline, catalog=None):
    """Run a Kedro pipeline with ``SequentialRunner`` and return the result.

    Args:
        pipeline: the Kedro pipeline to execute (e.g. the module-level
            ``pipeline`` loaded from the project context).
        catalog: catalog to load inputs from and save outputs to. Defaults
            to the module-level ``io`` from the project context, which is
            what this helper has always used.

    Returns:
        Whatever ``SequentialRunner.run`` returns. In Kedro this is
        presumably a dict of the pipeline's outputs that are not in the
        catalog; confirm for the installed version.
    """
    from kedro.runner import SequentialRunner

    if catalog is None:
        catalog = io
    return SequentialRunner().run(pipeline, catalog)
import pandas as pd
# Show every DataFrame column in notebook output, then announce completion.
pd.set_option('display.max_columns', None)
print("kedro loaded at", datetime.datetime.now())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment