Last active
January 19, 2020 19:16
-
-
Save hadoopsters/63a455b9cdd533dd026bcdfeb396c424 to your computer and use it in GitHub Desktop.
Loading Data from Cassandra into Hadoop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.datastax.spark.connector.cql.CassandraConnectorConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

/** Loads a table from a Cassandra cluster into Spark and writes it to HDFS as ORC. */
object CassandraLoader extends Serializable {

  /** Row shape of some_keyspace.some_table_in_that_keyspace.
    * `type` is backtick-escaped because it is a Scala keyword.
    */
  case class MyCassandraTable(user_id: String, `type`: Int, key: String, value: String)

  // `: Unit =` replaces the deprecated procedure syntax `def main(...) {`.
  def main(args: Array[String]): Unit = { // scalastyle:off method.length
    /**************************************************************************************
     * INITIATE SPARK SESSION
     * This session will be used to ingest data from a Cassandra cluster.
     *************************************************************************************/
    val spark = SparkSession
      .builder()
      .config("hive.merge.orcfile.stripe.level", "false")
      .appName("Cassandra Data Loader")
      .enableHiveSupport()
      .getOrCreate()

    // Implicits allow us to use .as[CaseClass] on the loaded DataFrame.
    import spark.implicits._

    /**************************************************************************************
     * SETUP CASSANDRA CONNECTION
     * These settings determine which environment, keyspace and table to download.
     *************************************************************************************/
    // NOTE(review): credentials are hard-coded here for illustration; in production,
    // supply them via spark-submit --conf or a secrets store instead of source code.
    val user = Map("spark.cassandra.auth.username" -> "some_username")
    val pwd = Map("spark.cassandra.auth.password" -> "some_password")

    // Contact points into the Cassandra cluster, plus the CQL native-protocol port.
    val hosts = "cassandra001.yourcompany.com,cassandra002.yourcompany.com,cassandra003.yourcompany.com"
    val port = "9042"

    // Register the Cassandra connection configuration on the Spark session.
    spark.setCassandraConf(
      CassandraConnectorConf.KeepAliveMillisParam.option(1000) ++
      CassandraConnectorConf.ConnectionHostParam.option(hosts) ++
      CassandraConnectorConf.ReadTimeoutParam.option(240000) ++
      CassandraConnectorConf.ConnectionPortParam.option(port) ++
      user ++ pwd)

    // Which keyspace.table to consume from Cassandra.
    val table = Map("keyspace" -> "some_keyspace", "table" -> "some_table_in_that_keyspace")

    /**************************************************************************************
     * CONSUME DATA FROM CASSANDRA
     * Use the connector, via the format() method, to pull the data and write it.
     *************************************************************************************/
    val data = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(table)
      .load()
      .as[MyCassandraTable]

    // Write to HDFS as snappy-compressed ORC, replacing any previous output.
    data
      .write
      .option("orc.compress", "snappy")
      .mode(SaveMode.Overwrite)
      .orc("/some/location/in/hdfs/")

    // Release the session's cluster resources once the job completes
    // (previously missing — the session was never stopped).
    spark.stop()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment