Created
January 24, 2023 06:30
-
-
Save harry-stark/0a0315d9ac918f5db6b7ed2c9c62d972 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Near-duplicate detection over a parquet text dataset with Spark MinHashLSH.

Pipeline: tokenize the ``text`` column, hash tokens into fixed-size
term-frequency vectors, fit MinHash signatures, then self-join to report
pairs whose Jaccard *distance* is at most ``threshold``.
"""
import time

from pyspark.ml.feature import HashingTF, MinHashLSH, MinHashLSHModel, Tokenizer
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

from spark_session_builder import build_spark_session

# Cluster sizing for a single 128-core r6i.32xlarge node.
spark = build_spark_session(
    master="spark://cpu128-dy-r6i-32xlarge-3:7077", num_cores=128, mem_gb=999
)

# Dimensionality of the hashed term-frequency vectors (HashingTF numFeatures).
num_features = 100
# NOTE(review): approxSimilarityJoin thresholds on Jaccard *distance*, so 0.8
# keeps pairs with similarity >= 0.2. For "80%-similar" dedup the usual cutoff
# would be a distance of 0.2 — confirm which behavior is intended.
threshold = 0.8

start = time.time()

# .limit(1000) keeps this run a smoke test; drop it to process the full set.
data = spark.read.parquet(
    "/fsx/shared/pilev2_parquet/StackExchange_ver4/dataset.parquet"
).limit(1000)
print(data.count())

# Tokenize the raw text and hash tokens into sparse count vectors.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsData = tokenizer.transform(data)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=num_features)
featurizedData = hashingTF.transform(wordsData)

# Fit MinHash signatures over the hashed vectors.
mh = MinHashLSH(inputCol="rawFeatures", outputCol="minhash")
model = mh.fit(featurizedData)
data_transformed = model.transform(featurizedData)
print(data_transformed.count())
print("hashed")
data_transformed.show()

# Self-join: every pair of rows within `threshold` Jaccard distance.
duplicates = model.approxSimilarityJoin(data_transformed, data_transformed, threshold)
print(duplicates.count())
end1 = time.time()

duplicates.write.parquet("./results/outs.parquet")
end2 = time.time()

print("Dedup time without writing", end1 - start)
print("Dedup time with writing", end2 - start)
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment