Skip to content

Instantly share code, notes, and snippets.

@almugabo
Created May 3, 2019 04:13
Show Gist options
  • Save almugabo/974824e1e00ee6d47bb90026a48d92ab to your computer and use it in GitHub Desktop.
Save almugabo/974824e1e00ee6d47bb90026a48d92ab to your computer and use it in GitHub Desktop.
open patent services
import requests
from base64 import b64encode
import requests
import json
UrlAuth = 'https://ops.epo.org/3.2/auth/accesstoken'
UrlServiceBase = 'https://ops.epo.org/3.2/rest-services/published-data'
UrlServiceBaseSearch = UrlServiceBase + '/search/biblio/?q='
class OpenPatentService():
# TO DO :
# [1] Parsing json responses
# [2] get equivalents
def __init__(self, consumer_key, consumer_secret):
#authorization credentials
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
def get_token(self):
'''
Get token for the Open Patent Service
Input:
KeyDict = a dictionary of credetianls to the service
example KeyDict = {'KeyConsumer': 'MxxxmkhgjhgVGAozoghsNka',
'KeySecret': 'uhkdscvhaxka'}
Please Note :
Token expiry every 20 minutes
'''
#Get Access Token
headers = {'Authorization': 'Basic {0}'.format(
b64encode(
'{0}:{1}'.format(self.consumer_key, self.consumer_secret).encode('ascii')
).decode('ascii')
),
'Content-Type': 'application/x-www-form-urlencoded',
}
payload = {'grant_type': 'client_credentials'}
Response = requests.post(UrlAuth, headers=headers, data=payload)
if Response.status_code == 200:
TokenResult = Response.json()['access_token']
else:
TokenResult = 'no token ! error ' + str(Response.status_code)
return TokenResult
def get_biblio(self,
PatentNr,
PatentNrType = 'publication',
PatentNrFormat= 'inpadoc',
Constituent = 'biblio'):
'''
#retrieve patent bibliographic data given a number
# Input
PatentNr e.g. 'EP1000000.A1'
PatentNrType can be 'publication', 'application' or 'priority'
PatentNrFormat can be 'epodoc' or 'inpadoc
Constituent = 'biblio'
'''
#get token
TokenId = self.get_token()
# create headers
RequestHeaders = {'Authorization' : 'Bearer {0}'.format(TokenId), 'Accept': 'application/json'}
UrlRequest = UrlServiceBase + '/' + '/'.join([PatentNrType,PatentNrFormat,PatentNr, Constituent ])
if xResponse.text.find('Access token has expired') != -1:
TokenId = get_token()
RequestHeaders = {'Authorization' : 'Bearer {0}'.format(TokenId), 'Accept': 'application/json'}
xResponse = requests.get(UrlRequest, headers = RequestHeaders)
if xResponse.status_code == 200:
return xResponse.content
else:
print('!!! ERROR')
def search_biblio(self,
xQueryText,
xRange = '1-10'):
'''
search functionality.
the Query text shuold follow Espacenet
Context Query Language
e.g.
inventor=ursula keller
xQueryText = "in='ursula keller'"
Output:
search results
'''
#get token
TokenId = self.get_token()
# create headers
RequestHeaders = {'Authorization' : 'Bearer {0}'.format(TokenId), 'Accept': 'application/json'}
UrlRequest = UrlServiceBaseSearch + xQueryText + '&Range=' + xRange
xResponse = requests.get(UrlRequest, headers = RequestHeaders)
if xResponse.text.find('Access token has expired') != -1:
TokenId = get_token()
RequestHeaders = {'Authorization' : 'Bearer {0}'.format(TokenId), 'Accept': 'application/json'}
xResponse = requests.get(UrlRequest, headers = RequestHeaders)
if xResponse.status_code == 200:
return xResponse.content
else:
print('!!! ERROR')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment