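# PubMed bulk loader -- parse gzipped PubMed baseline XML files with lxml and
# batch-insert documents, abstracts, authors, venues and keywords into MySQL
# through a DBUtils/pymysql connection pool, with one parser thread per slice
# of the file list. (Gist by @MarioZZJ, created August 19, 2022.)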
from glob import glob
from lxml import etree
import gc
import gzip
from dbutils.pooled_db import PooledDB
import pymysql
import threading
import logging
import time
import pandas as pd
import numpy as np
local_obj = threading.local()
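# Each worker thread gets its own pending-row buffers: threading.local() storage
# means the lists assigned in parse_files below are per-thread, so workers can
# batch rows independently without any locking.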
sql_document = """
INSERT INTO documents
(document_id, external_id, external_id_type, title, authors_name_str, document_type, venue_str, abstract_short, doi, publish_date, publish_year, keywords_str, data_source)
VALUES
(%s, %s,'pmid',%s,%s,'Journal',%s,%s,%s,%s,%s,%s,%s)
"""
sql_document_abstract = """
INSERT INTO document_abstract
(document_id, abstract_full, data_source)
VALUES
(%s,%s,%s)
"""
sql_document_authors = """
INSERT INTO document_authors
(document_id, author_display_name, author_rank, author_last_name, author_fore_name, author_initials, author_affiliation_name, data_source)
VALUES
(%s, %s, %s, %s, %s, %s, %s,%s)
"""
sql_document_venues = """
INSERT INTO document_venues
(document_id, venue_display_name, venue_name, venue_year, venue_volume, venue_issue, venue_page, venue_type,data_source,issn,domain,field,subfield)
VALUES
(%s, %s, %s, %s, %s, %s, %s, 'Journal',%s,%s,%s,%s,%s)
"""
sql_document_keywords = """
INSERT INTO document_keywords
(document_id, keyword_str, data_source)
VALUES
(%s, %s,%s)
"""
class Base:
    def __init__(self):
        self.pool = self.create_pool()

    def create_pool(self):
        """
        Create the database connection pool.
        :return: the connection pool
        """
        pool = PooledDB(creator=pymysql,
                        maxconnections=12,  # maximum connections the pool allows; 0/None means unlimited
                        mincached=0,        # idle connections opened at startup; 0 means none
                        maxcached=None,     # maximum idle connections kept in the pool; 0/None means unlimited
                        maxusage=None,      # maximum reuses of a single connection; None means unlimited
                        blocking=True,      # True: block and wait when no connection is free; False: raise instead
                        host='127.0.0.1',   # must be 127.0.0.1 here
                        port=3306,
                        user='cradmin',
                        passwd='CRadmin.888',
                        db='creativity_web',
                        charset='utf8mb4')
        return pool
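    # Note on DBUtils: pool.connection() hands out a wrapper around a pooled
    # pymysql connection, and calling .close() on the wrapper returns the
    # connection to the pool rather than closing the socket, which is why the
    # save_* methods below can cheaply "open" and "close" on every call.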
    def save_mysql(self, sql, arg, filename):
        """
        Insert a single row into the database.
        :param sql: SQL statement to execute (str)
        :param arg: parameters for the SQL statement (list)
        """
        id = None
        db = None
        cursor = None
        try:
            db = self.pool.connection()  # get a connection from the pool
            cursor = db.cursor()         # obtain a cursor
            cursor.execute(sql, arg)
            # id = db.insert_id()
            id = cursor.lastrowid
            db.commit()
        except Exception:
            logging.exception("error in save_mysql at file " + filename)
            if db is not None:
                db.rollback()
        finally:
            if cursor is not None:
                cursor.close()
            if db is not None:
                db.close()
        return id
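    # save_mysql (single-row, returns the AUTO_INCREMENT id via lastrowid) is
    # not called by parse_files below, which supplies document_id itself as
    # 100000000 + pmid; it is kept for one-off inserts where the new id matters.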
    def save_mysql_many(self, sql, args, filename):
        """
        Insert a batch of rows into the database.
        :param sql: SQL statement to execute (str)
        :param args: parameters for the SQL statement (list[tuple])
        """
        db = None
        cursor = None
        try:
            db = self.pool.connection()  # get a connection from the pool
            cursor = db.cursor()         # obtain a cursor
            cursor.executemany(sql, args)
            db.commit()
        except Exception:
            logging.exception("error in save_mysql_many at file " + filename)
            if db is not None:
                db.rollback()
        finally:
            if cursor is not None:
                cursor.close()
            if db is not None:
                db.close()
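    # pymysql's executemany recognises plain "INSERT ... VALUES (%s, ...)"
    # statements and rewrites each batch into a single multi-row INSERT, so
    # flushing ~950 buffered rows costs one round trip rather than 950.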
    def parse_files(self, filepaths):
        for filepath in filepaths:
            filename = filepath[-20:-7]  # '/.../pubmed22n0001.xml.gz' -> 'pubmed22n0001'
            with gzip.open(XML_DIRECTORY + filename + ".xml.gz", 'rb') as f:
                gc.collect()
                root = etree.fromstring(f.read())
                articleSet = root.xpath('/PubmedArticleSet/PubmedArticle')
                # Fresh per-file (and per-thread) insert buffers.
                local_obj.abstract_insert = []
                local_obj.venues_insert = []
                local_obj.keywords_insert = []
                local_obj.author_insert = []
                local_obj.document_insert = []
                for article in articleSet:
                    pmid = int(article.xpath('MedlineCitation/PMID[@Version="1"]/text()')[0]) if article.xpath('MedlineCitation/PMID[@Version="1"]/text()') else -1
                    journal_volume = article.xpath('MedlineCitation/Article/Journal/JournalIssue/Volume/text()')[0] if article.xpath('MedlineCitation/Article/Journal/JournalIssue/Volume/text()') else ''
                    journal_issue = article.xpath('MedlineCitation/Article/Journal/JournalIssue/Issue/text()')[0] if article.xpath('MedlineCitation/Article/Journal/JournalIssue/Issue/text()') else ''
                    j_p_year = article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Year/text()')[0] if article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Year/text()') else ''
                    j_p_month = article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Month/text()')[0] if article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Month/text()') else ''
                    j_p_day = article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Day/text()')[0] if article.xpath('MedlineCitation/Article/Journal/JournalIssue/PubDate/Day/text()') else ''
                    journal_pubdate = j_p_year + '.' + j_p_month + '.' + j_p_day  # e.g. '2020.05.12'; missing parts leave empty segments
                    if j_p_year == '':
                        j_p_year = -1
                    issn = article.xpath('MedlineCitation/Article/Journal/ISSN/text()')[0].replace('-', '') if article.xpath('MedlineCitation/Article/Journal/ISSN/text()') else ''
                    domain = ''
                    field = ''
                    subfield = ''
                    if issn in issn_match.keys():
                        domain = issn_match[issn]["domain"]
                        field = issn_match[issn]["field"]
                        subfield = issn_match[issn]["subfield"]
                    journal_title = article.xpath('MedlineCitation/Article/Journal/Title/text()')[0] if article.xpath('MedlineCitation/Article/Journal/Title/text()') else ''
                    pagination = article.xpath('MedlineCitation/Article/Pagination/MedlinePgn/text()')[0] if article.xpath('MedlineCitation/Article/Pagination/MedlinePgn/text()') else ''
                    if journal_volume != '':
                        venue_str = journal_title + '. ' + journal_pubdate + '. Vol ' + journal_volume + ', Issue ' + journal_issue
                    else:
                        venue_str = journal_title + '. ' + journal_pubdate
                    article_title = article.xpath('MedlineCitation/Article/ArticleTitle/text()')[0] if article.xpath('MedlineCitation/Article/ArticleTitle/text()') else ''
                    authors = article.xpath('MedlineCitation/Article/AuthorList/Author')
                    authors_name = ""
                    author_info_list = []
                    author_index = 0
                    for author in authors:
                        author_index += 1
                        authors_name += "; "
                        fn = author.xpath('ForeName/text()')[0] if author.xpath('ForeName/text()') else ''
                        authors_name += fn
                        authors_name += ' '
                        lf = author.xpath('LastName/text()')[0] if author.xpath('LastName/text()') else ''
                        authors_name += lf
                        ini = author.xpath('Initials/text()')[0] if author.xpath('Initials/text()') else ''
                        af = author.xpath('AffiliationInfo/Affiliation/text()')[0] if author.xpath('AffiliationInfo/Affiliation/text()') else ''
                        author_info = [fn + ' ' + lf, author_index, lf, fn, ini, af]
                        author_info_list.append(author_info)
                    authors_name = authors_name[2:]
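                    # authors_name is now a '; '-separated display string such as
                    # 'Jane Doe; John Q Public' (the leading '; ' was sliced off).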
                    abstract_text = article.xpath('MedlineCitation/Article/Abstract/AbstractText/text()')[0] if article.xpath('MedlineCitation/Article/Abstract/AbstractText/text()') else ''
                    KeywordList = article.xpath('MedlineCitation/KeywordList/Keyword')
                    keywords_str = ""
                    keywords_list = []
                    for keyword in KeywordList:
                        keywords_str += "; "
                        keywords_str += keyword.text if keyword.text else ""
                        if keyword.text:
                            keywords_list.append(keyword.text[:255])
                    keywords_str = keywords_str[2:] if len(keywords_str) > 1 else ""
                    articleIds = article.xpath('PubmedData/ArticleIdList/ArticleId')
                    doi = ''
                    for aId in articleIds:
                        if aId.get('IdType') == 'doi':
                            doi = str(aId.text) if aId.text else ''
                    if pmid == -1:
                        continue
                    else:
                        id = 100000000 + pmid
                    try:
                        data = [id, pmid,
                                article_title[:255],
                                authors_name[:511],
                                venue_str[:255],
                                abstract_text[:511],
                                doi[:64],
                                journal_pubdate,
                                j_p_year,
                                keywords_str[:511], filename]
                        local_obj.document_insert.append(data)
                        if len(local_obj.document_insert) >= 950:
                            self.save_mysql_many(sql_document, local_obj.document_insert, filename)
                            local_obj.document_insert = []
                        local_obj.abstract_insert.append((id, abstract_text, filename))
                        if len(local_obj.abstract_insert) >= 950:
                            self.save_mysql_many(sql_document_abstract, local_obj.abstract_insert, filename)
                            local_obj.abstract_insert = []
                        for author_info in author_info_list:
                            local_obj.author_insert.append((id, author_info[0], author_info[1], author_info[2], author_info[3], author_info[4], author_info[5][:511], filename))
                            if len(local_obj.author_insert) >= 950:
                                self.save_mysql_many(sql_document_authors, local_obj.author_insert, filename)
                                local_obj.author_insert = []
                        local_obj.venues_insert.append((id, venue_str[:511], journal_title[:255], j_p_year, journal_volume[:16], journal_issue[:16], pagination[:32], filename, issn, domain, field, subfield))
                        if len(local_obj.venues_insert) >= 950:
                            self.save_mysql_many(sql_document_venues, local_obj.venues_insert, filename)
                            local_obj.venues_insert = []
                        for keyword in keywords_list:
                            local_obj.keywords_insert.append((id, keyword, filename))
                            if len(local_obj.keywords_insert) >= 950:
                                self.save_mysql_many(sql_document_keywords, local_obj.keywords_insert, filename)
                                local_obj.keywords_insert = []
                    except Exception:
                        logging.exception("error in for_article at file " + filename)
                        continue
                # After the last article in the file, flush the partially filled buffers.
                self.save_mysql_many(sql_document_abstract, local_obj.abstract_insert, filename)
                self.save_mysql_many(sql_document_authors, local_obj.author_insert, filename)
                self.save_mysql_many(sql_document_venues, local_obj.venues_insert, filename)
                self.save_mysql_many(sql_document_keywords, local_obj.keywords_insert, filename)
                self.save_mysql_many(sql_document, local_obj.document_insert, filename)
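                # These trailing flushes may receive an empty list; pymysql's
                # executemany returns immediately when args is empty, so that
                # is effectively a no-op.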
    def multi_worker(self, filepaths, workers):
        threads = []
        for i in range(workers):
            # Round-robin the file list so each worker gets every `workers`-th file.
            threads.append(threading.Thread(target=self.parse_files, args=[filepaths[i::workers]], daemon=False))
        print(threads)
        for thread in threads:
            thread.start()
        while True:
            for i in range(len(threads) - 1, -1, -1):
                if threads[i].is_alive():
                    time.sleep(60)
                else:
                    print(threads[i].name + " ends at " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
                    threads.pop(i)
            if len(threads) == 0:
                break
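    # Design note: the polling loop above could be replaced with a plain join
    # per thread, at the cost of the once-a-minute liveness wakeups. Sketch:
    #     for thread in threads:
    #         thread.join()
    #         print(thread.name + " ends at " + time.strftime('%Y-%m-%d %H:%M:%S'))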
if __name__ == '__main__':
    XML_DIRECTORY = r'/home/mariozzj/data/pubmed/'
    filepaths = sorted(list(glob(XML_DIRECTORY + r'*.gz')))
    issn_classification = pd.read_csv("/home/mariozzj/data/issn_classification.csv")
    issn_match = {}
    for index, row in issn_classification.iterrows():
        issn = str(row.issn1)
        domain = str(row.domain)
        field = str(row.field)
        subfield = str(row.subfield)
        issn_match[issn] = {
            "domain": domain,
            "field": field,
            "subfield": subfield,
        }
        if str(row.issn2) != 'nan':
            issn2 = str(row.issn2)
            issn_match[issn2] = {
                "domain": domain,
                "field": field,
                "subfield": subfield,
            }
        if str(row.issn3) != 'nan':
            issn3 = str(row.issn3)
            issn_match[issn3] = {
                "domain": domain,
                "field": field,
                "subfield": subfield,
            }
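    # issn_classification.csv is expected to have columns issn1, issn2, issn3,
    # domain, field and subfield, with missing issn2/issn3 read by pandas as NaN
    # (hence the str(...) != 'nan' checks). Since the XML lookup key has hyphens
    # stripped, the CSV presumably stores ISSNs without hyphens too.
    # Hypothetical example row:
    #   issn1,issn2,issn3,domain,field,subfield
    #   00012345,15731234,,Physical Sciences,Chemistry,Organic Chemistry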
    b = Base()
    sem = threading.Semaphore(12)  # note: defined but never used below
    b.multi_worker(filepaths, 12)