import numpy as np
from pyspark.sql import Row, SQLContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
import MeCab
# POS tags to drop: conjunctions, particles, auxiliary verbs, and symbols.
default_stop_pos = ['接続詞', '助詞', '助動詞', '記号']
def mecab_analyzer(text, stop_pos=default_stop_pos):
    """Tokenize Japanese text with MeCab, returning base forms of content words."""
    mecab = MeCab.Tagger('-Ochasen')
    encoded_text = text.encode('utf-8')  # the Python 2 MeCab binding expects bytes
    node = mecab.parseToNode(encoded_text)
    node = node.next  # skip the BOS node
    words = []
    while node:
        surface = node.surface
        feature_array = node.feature.split(',')
        if feature_array[0] == 'BOS/EOS' or feature_array[0] in stop_pos:
            node = node.next
            continue
        # Prefer the base form (feature index 6); fall back to the surface form.
        if feature_array[6] == '*':
            w = surface
        else:
            w = feature_array[6]
        words.append(w.decode('utf-8'))
        node = node.next
    return words
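
# A quick sanity check of the analyzer (a sketch only: the exact tokens depend
# on the installed MeCab dictionary; this assumes the default IPA dictionary).
#   >>> mecab_analyzer(u'今日はとても良い天気です')
#   [u'今日', u'とても', u'良い', u'天気']
# The particle は and the auxiliary です are filtered by default_stop_pos.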
# Classification (Spark MLlib)
pos_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/all/1")
neg_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/all/0")
# Tokenize each document and label it (1.0 = positive, 0.0 = negative).
xy = pos_files.map(lambda x: (mecab_analyzer(x[1]), 1.0)).union(neg_files.map(lambda x: (mecab_analyzer(x[1]), 0.0)))
# Shuffle by attaching a random key, sorting on it, then dropping it.
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])

fold = 3
hashingTF = HashingTF()
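
# What HashingTF does, in brief (a sketch; assumes the default 2**20 feature
# dimensions): each token is hashed to a fixed index and term frequencies are
# accumulated, so no vocabulary needs to be built or broadcast to workers.
#   hashingTF.transform([u'天気', u'天気', u'今日'])
#   # -> SparseVector(1048576, ...) with u'天気' counted at 2.0 and u'今日' at 1.0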
# Split the shuffled data into `fold` roughly equal chunks for cross-validation.
data = []
as_list = xy.collect()
size = len(as_list) / fold  # integer division under Python 2
for i in range(fold - 1):
    data.append(as_list[size * i:size * (i + 1)])
data.append(as_list[size * (fold - 1):])  # last chunk takes any remainder
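
# An alternative sketch: the split could stay distributed via RDD.randomSplit
# (in the Spark 1.3+ API), at the cost of folds of slightly varying size:
#   folds = xy.randomSplit([1.0] * fold, seed=42)  # list of `fold` RDDs
# collect() is used above so the chunks can be freely recombined per iteration.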
# Cross-validation: rotate which chunk is held out as the test set.
cv_accuracy = []
cv_precision = []
cv_recall = []
for i in range(fold):
    print 'iteration', i
    train = []
    for k in range(fold):
        data_idx = (k + i) % fold  # rotate the chunk index
        if k < fold - 1:
            print data_idx, 'train data',
            train.extend(data[data_idx])
        else:
            print data_idx, 'test data'
            test = data[data_idx]
    ## HashingTF model
    train_xy = sc.parallelize(train).map(lambda x: LabeledPoint(x[1], hashingTF.transform(x[0])))
    test_xy = sc.parallelize(test).map(lambda x: LabeledPoint(x[1], hashingTF.transform(x[0])))
    ## HashingTF + IDF model
    #train_xy = sc.parallelize(train).map(lambda x: (x[1], hashingTF.transform(x[0])))
    #test_xy = sc.parallelize(test).map(lambda x: (x[1], hashingTF.transform(x[0])))
    #x_tf = train_xy.map(lambda x: x[1]).union(test_xy.map(lambda x: x[1]))
    #idf_model = IDF().fit(x_tf)
    #tmp = []
    #for x in train_xy.collect():
    #    tmp.append((x[0], idf_model.transform(x[1])))
    #train_xy = sc.parallelize(tmp).map(lambda x: LabeledPoint(x[0], x[1]))
    #del tmp[:]
    #for x in test_xy.collect():
    #    tmp.append((x[0], idf_model.transform(x[1])))
    #test_xy = sc.parallelize(tmp).map(lambda x: LabeledPoint(x[0], x[1]))
    ### Error when applying IDF inside map():
    ### It appears that you are attempting to reference SparkContext from a broadcast
    ### variable, action, or transformation. SparkContext can only be used on the driver,
    ### not in code that is run on workers. For more information, see SPARK-5063.
    ##train_xy = train_xy.map(lambda x: LabeledPoint(x[0], idf_model.transform(x[1])))
    ##test_xy = test_xy.map(lambda x: LabeledPoint(x[0], idf_model.transform(x[1])))
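
    # A hedged alternative to the collect()-based workaround above: in this MLlib
    # API, IDFModel.transform also accepts a whole RDD of vectors, so labels and
    # features can be transformed as parallel RDDs and zipped back together,
    # avoiding any idf_model call inside a map() closure (sketch, untested here):
    #   train_x = idf_model.transform(train_xy.map(lambda x: x[1]))
    #   train_xy = train_xy.map(lambda x: x[0]).zip(train_x) \
    #                      .map(lambda x: LabeledPoint(x[0], x[1]))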
    # train
    model = SVMWithSGD.train(train_xy, iterations=100)
    #model = NaiveBayes.train(train_xy, 0.05)
    # test data: count the confusion-matrix cells for the positive class
    labelsAndPreds = test_xy.map(lambda p: (p.label, model.predict(p.features)))
    data_size = test_xy.count()
    pos_size = float(test_xy.filter(lambda p: p.label == 1.0).count())
    neg_size = data_size - pos_size
    pos_pred_size = labelsAndPreds.filter(lambda (v, p): p == 1.0).count()
    neg_pred_size = data_size - pos_pred_size
    pos_acc_size = labelsAndPreds.filter(lambda (v, p): v == 1.0 and v == p).count()  # true positives
    neg_acc_size = labelsAndPreds.filter(lambda (v, p): v == 0.0 and v == p).count()  # true negatives
    acc_size = pos_acc_size + neg_acc_size
    # metrics (note: precision divides by pos_pred_size, assumed to be > 0)
    accuracy = acc_size / float(data_size)
    precision = pos_acc_size / float(pos_pred_size)
    recall = pos_acc_size / float(pos_size)
    f1 = 2 * (precision * recall) / (precision + recall)
    score = {'accuracy': accuracy, 'recall': recall, 'precision': precision, 'f1': f1}
    print score
    cv_accuracy.append(score['accuracy'])
    cv_precision.append(score['precision'])
    cv_recall.append(score['recall'])
# Average the per-fold metrics; f1 is recomputed from the averaged precision
# and recall rather than averaged per fold.
accuracy = sum(cv_accuracy) / len(cv_accuracy)
precision = sum(cv_precision) / len(cv_precision)
recall = sum(cv_recall) / len(cv_recall)
cv_score = {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': 2 * (precision * recall) / (precision + recall)}
print cv_score
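
# Follow-up sketch (assumptions: Spark 1.4+, hypothetical HDFS path): the
# otherwise unused SVMModel import suggests persisting the final model, e.g.
#   model.save(sc, "hdfs://hdp1.containers.dev:9000/user/root/models/svm")
#   loaded = SVMModel.load(sc, "hdfs://hdp1.containers.dev:9000/user/root/models/svm")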