Created October 25, 2022 14:39
-
-
Save joanteixi/1a82c1d6b6116ed80d8a96bcbfcf9a98 to your computer and use it in GitHub Desktop.
Spark read csv file from line N
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Read a delimited text file whose header row is not on the first line.
def read_csv_from_rown_number(path, row_number=1, sep=';', schema=None):
    """Read a delimited file, using the line at index ``row_number`` as header.

    Lines are numbered from 0 in file order. With the default
    ``row_number=1``, the first physical line is skipped and the second line
    is the header. Every line before the header is discarded, and every line
    after it is parsed as data.

    All double quotes are removed from the header and from the data lines.
    The data is then parsed with quoting disabled, so quoted fields that
    contain ``sep`` are NOT supported.

    Relies on the session/notebook globals ``spark`` (a SparkSession) and
    ``f`` (presumably ``pyspark.sql.functions`` — TODO confirm) being in scope.

    Args:
        path: File path accepted by ``spark.read.text``.
        row_number: 0-based index of the header line.
        sep: Field separator.
        schema: Optional schema for the data columns. When given, Spark names
            the columns from the schema, and the header names are not applied
            (the ``_c<i>`` renames below become no-ops).

    Returns:
        A DataFrame with one column per field. Its data is read lazily from a
        uniquely named directory under ``/tmp``. That directory is not removed
        here, because the returned DataFrame still depends on it.

    Raises:
        ValueError: if the input has no line at index ``row_number``.
    """
    import uuid

    idx_col = 'idRowNbField'

    # monotonically_increasing_id() grows with file order but is NOT
    # consecutive across partitions (the partition id sits in its upper bits).
    # Turn it into a dense 0-based line index. The window has no PARTITION BY,
    # so Spark moves all rows into one partition: slower, but the line number
    # is then reliable.
    lines = spark.read.text(path).withColumn('_mono_id', f.monotonically_increasing_id())
    lines = lines.withColumn(
        idx_col, f.expr('row_number() OVER (ORDER BY _mono_id) - 1')
    ).drop('_mono_id')

    # Pick the header by its exact index; limit(1) gives no ordering guarantee.
    header_rows = lines.where(f.col(idx_col) == row_number).select('value').collect()
    if not header_rows:
        raise ValueError('%s has no line at index %s to use as header' % (path, row_number))
    # Clean the header the same way as the data: drop quotes, trim spaces.
    header_list = [name.replace('"', '').strip() for name in header_rows[0]['value'].split(sep)]

    # Keep only the lines after the header (relative to row_number, not a fixed
    # index), with every double quote removed.
    data = lines.where(f.col(idx_col) > row_number).select(
        f.regexp_replace('value', '"', '').alias('value')
    )

    # Write the raw lines as plain text. A CSV writer would re-quote values
    # containing its own separator and turn empty lines into "".
    tmp_path = '/tmp/%s.csv' % uuid.uuid4().hex
    data.write.mode('overwrite').text(tmp_path)

    # Re-read the lines as delimited data with quoting disabled.
    reader = (
        spark.read.format('csv')
        .option('sep', sep)
        .option('mode', 'PERMISSIVE')
        .option('quote', '')
    )
    if schema:
        reader = reader.schema(schema)
    df = reader.load(tmp_path)

    # Apply header names positionally. withColumnRenamed is a no-op for
    # columns that do not exist (e.g. when a schema already named them).
    for i, column in enumerate(header_list):
        df = df.withColumnRenamed('_c%d' % i, column)
    return df
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment