Created
July 11, 2021 16:52
-
-
Save kurochan/f8f826559a0ed2d33facb0282619358d to your computer and use it in GitHub Desktop.
Investigation of the Snowpark runtime environment.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.kurochan.snowpark_test

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import collection.JavaConverters._
object Main {

  /** Builds a Snowpark session from hard-coded connection settings.
    *
    * NOTE(review): credentials here are redacted placeholders; real code
    * should load them from the environment or a secrets store rather than
    * committing them to source.
    */
  def createSession(): Session = {
    val configs = Map(
      "URL" -> "https://xxxxxxx.ap-northeast-1.aws.snowflakecomputing.com:443",
      "USER" -> "TEST_USER1",
      "PASSWORD" -> "xxxxxxxxxxxxxxxxxxxxxxxxxx",
      "ROLE" -> "SYSADMIN",
      "WAREHOUSE" -> "COMPUTE_WH",
      "DB" -> "SANDBOX",
      "SCHEMA" -> "PUBLIC"
    )
    Session.builder.configs(configs).create
  }

  /** Renders every JVM system property as a single "key: value, key: value"
    * string.
    *
    * The `s` parameter is unused; it exists only so the method has the
    * `String => String` shape needed to register it as a Snowpark UDF.
    */
  def getProps(s: String): String = {
    val props = System.getProperties
    val names = props.propertyNames().asScala
    // mkString joins with the separator only *between* elements, fixing the
    // spurious leading ", " the previous foldLeft("")(...) produced.
    names
      .map(n => s"${n.toString}: ${props.getProperty(n.toString)}")
      .mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    val session = createSession()
    // Registering the method as a UDF makes its body execute inside
    // Snowflake's warehouse JVM, so the printed properties describe the
    // remote runtime, not this client.
    val getPropsUdf = udf(getProps _)
    val table = session.range(1)
    val res = table.select(getPropsUdf(col("ID"))).collect().head
    println(res.toString())
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.kurochan.snowpark_test

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
object Main {

  /** Opens a Snowpark session against the test account (hard-coded config). */
  def createSession(): Session = {
    val configs = Map(
      "URL" -> "https://xxxxxxx.ap-northeast-1.aws.snowflakecomputing.com:443",
      "USER" -> "TEST_USER1",
      "PASSWORD" -> "xxxxxxxxxxxxxxxxxxxxxxxxxx",
      "ROLE" -> "SYSADMIN",
      "WAREHOUSE" -> "COMPUTE_WH",
      "DB" -> "SANDBOX",
      "SCHEMA" -> "PUBLIC"
    )
    Session.builder.configs(configs).create
  }

  // Initialized once per JVM instance. If many rows report the same value,
  // they were processed by the same JVM (or the same serialized state).
  val randomId: String = scala.util.Random.alphanumeric.take(10).mkString

  /** UDF body: returns this JVM's random id; the input row value is ignored. */
  def getRandomId(s: String): String = randomId

  // Deliberately mutable: tallies how many rows one JVM instance handled.
  var counter = 0

  /** UDF body: bumps and returns the per-JVM row counter (input ignored). */
  def incrementAndGet(s: String): Long = {
    counter += 1
    counter
  }

  /** UDF body: processor count reported by the executing JVM. */
  def getProcessors(s: String): Long = Runtime.getRuntime.availableProcessors()

  /** UDF body: max heap (Runtime.maxMemory) of the executing JVM, in bytes. */
  def getMemory(s: String): Long = Runtime.getRuntime.maxMemory()

  /** UDF body: current heap (Runtime.totalMemory) of the executing JVM, bytes. */
  def getCurrentMemory(s: String): Long = Runtime.getRuntime.totalMemory()

  def main(args: Array[String]): Unit = {
    val session = createSession()

    // Register each probe method as a UDF so its body runs on Snowflake's
    // warehouse JVMs rather than locally.
    val udfStaticId = udf(getRandomId _)
    val udfCounter = udf(incrementAndGet _)
    val udfCpus = udf(getProcessors _)
    val udfMaxMem = udf(getMemory _)
    val udfCurMem = udf(getCurrentMemory _)

    val sample = session.table("large_log_table").limit(1000000)
    val probed = sample.select(
      udfStaticId(col("name")).as("static_id"),
      udfCounter(col("name")).as("counter_value"),
      udfCpus(col("name")).as("processors"),
      udfMaxMem(col("name")).as("memory"),
      udfCurMem(col("name")).as("current_memory")
    )

    // Stage 1: one row per (JVM id, counter value) pair, raw byte figures.
    val perCounterValue = probed
      .groupBy(col("static_id"), col("counter_value"))
      .agg(
        min(col("processors")).as("min_processors"),
        max(col("processors")).as("max_processors"),
        min(col("memory")).as("min_memory"),
        max(col("memory")).as("max_memory"),
        max(col("current_memory")).as("current_memory"),
        count(lit(1)).as("count")
      )

    // Stage 2: collapse to one row per JVM id; memory converted to GiB.
    // "conflict_count" > 1 means the same counter value was observed more
    // than once under one static_id.
    val summary = perCounterValue
      .groupBy(col("static_id"))
      .agg(
        min(col("min_processors")).as("min_processors"),
        max(col("max_processors")).as("max_processors"),
        (min(col("min_memory")) / 1024 / 1024 / 1024).as("min_memory"),
        (max(col("max_memory")) / 1024 / 1024 / 1024).as("max_memory"),
        (max(col("current_memory")) / 1024 / 1024 / 1024).as("current_memory"),
        max(col("count")).as("conflict_count"),
        sum(col("count")).as("exec_count")
      )
      .sort(col("exec_count").desc)

    summary.show(100)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment