168 lines
6.6 KiB
Python
168 lines
6.6 KiB
Python
import cherrypy
|
|
import requests
|
|
import threading
|
|
import time
|
|
|
|
from oauthlib.oauth2 import BackendApplicationClient
|
|
from requests_oauthlib import OAuth2Session
|
|
from urllib.parse import quote
|
|
|
|
|
|
class TwitchClient:
    """
    Rate-limited client for the Twitch Helix API (called "v6" in this file).

    All HTTP queries are serialized through a lock and spaced at least
    ``1 / freq`` seconds apart. The bearer token is fetched with the OAuth2
    client-credentials flow and refreshed once whenever a query fails.
    """

    # Number of streams requested per page from the /streams endpoint.
    PAGE_SIZE = 100
    # Seconds to wait for the server; without a limit a stalled connection
    # would keep the query lock held and block every other caller.
    REQUEST_TIMEOUT = 30
    # Game/category ids that get_irl_live_streams_v6 asks for.
    IRL_GAME_IDS = ("509660", "509673", "509667", "509669", "509670", "509658",
                    "509672", "509671", "509664", "509663", "417752", "509659")

    def __init__(self, token, secret, freq=2):
        """
        Create the client and fetch the first OAuth token.

        :param token: string with the application client id
        :param secret: string with the application client secret
        :param freq: maximum number of queries per second (must not be zero)
        """
        self.token = token
        self.lock = threading.Lock()

        self.header_v6 = {'Client-ID': self.token}
        self.urlbase_v6 = 'https://api.twitch.tv/helix'

        self.last_q = time.time()
        self.delay = 1 / freq

        # authentication
        self.oauth = ''
        self.oauth_token_url = 'https://id.twitch.tv/oauth2/token'
        self.auth_secret = secret
        oauth_client = BackendApplicationClient(client_id=self.token)
        self.oauth_session = OAuth2Session(client=oauth_client)
        self.update_oauth()

    def update_oauth(self):
        """
        Update self.oauth token based on client id and secret.

        :return: nothing
        """
        token = self.oauth_session.fetch_token(token_url=self.oauth_token_url,
                                               client_secret=self.auth_secret,
                                               include_client_id=True)
        self.oauth = token['access_token']

    def _auth_header(self, header):
        """Return a copy of ``header`` with the current bearer token added."""
        return {**header, 'Authorization': 'Bearer ' + self.oauth}

    def do_q_auth_v6(self, base, header):
        """
        Do query with v6 authentication header and single retry.

        :param base: string with requesting URL
        :param header: dictionary of http headers
        :return: decoded JSON response or None
        """
        result = self.do_q(base, self._auth_header(header))
        if result is not None:
            return result
        # The failure may be caused by an expired token: refresh it and
        # try exactly once more.
        self.update_oauth()
        return self.do_q(base, self._auth_header(header))

    def do_q(self, base, header):
        """
        Do query for twitch server.

        Only one query runs at a time, and consecutive queries are kept at
        least ``self.delay`` seconds apart.

        :param base: string with requesting URL
        :param header: dictionary of http headers
        :return: decoded JSON response, or None on a transport error, a
                 non-JSON body or a response carrying an "error" field
        """
        with self.lock:  # Lock for 1 at time query
            cherrypy.log('Request: %s' % base)
            # Sleep only for what is left of the minimum interval.
            remaining = self.delay - (time.time() - self.last_q)
            if remaining > 0:
                time.sleep(remaining)
            try:
                response = requests.get(base, headers=header,
                                        timeout=self.REQUEST_TIMEOUT).json()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                cherrypy.log('Request: FAIL')
                cherrypy.log('Error: {}'.format(e))
                return None
            finally:
                # Failed attempts count towards the query frequency as well.
                self.last_q = time.time()

            error_message = response.get("error", "")
            if error_message:
                cherrypy.log(f'Request: fail with error "{error_message}"')
                return None
            cherrypy.log('Request: OK')
            return response

    def get_base(self, ver):
        """
        Get base which is depended on API version.

        :param ver: string with API version (only 'v6' is supported)
        :return: tuple with dictionary of headers and URL string
        :raises: value error on incorrect API version
        """
        if ver == 'v6':
            return self.header_v6, self.urlbase_v6
        raise ValueError('Not supported API version')

    # - # - #

    def raw_query_v6(self, q):
        """
        Do a query with API v6.

        :param q: query string, appended verbatim to the API base URL
        :return: decoded JSON response or None
        """
        header, base = self.get_base('v6')
        return self.do_q_auth_v6(base + q, header)

    def get_game_id_v6(self, name):
        """
        Getting game id with API v6.

        :param name: string with the name of game; it is inserted into the
                     URL as is, so the caller must URL-encode it
        :return: tuple of game id and game name as found in the response,
                 or None when the query fails or matches nothing
        """
        header, base = self.get_base('v6')
        r = self.do_q_auth_v6('{}/games?name={}'.format(base, name), header)
        if r and r.get('data'):
            game = r['data'][0]
            return game['id'], game['name']
        return None

    def _fetch_streams_v6(self, lang, game_ids):
        """
        Collect every page of live streams for the given game ids.

        Paging stops on a failed query, on an empty page, or when the
        response carries no cursor for a following page.

        :param lang: string with the shortcut of language
        :param game_ids: iterable of game ids to filter by
        :return: list with the stream entries of all fetched pages
        """
        header, base = self.get_base('v6')
        game_filter = ''.join('&game_id={}'.format(g) for g in game_ids)
        first_url = '{}/streams?language={}&first={}{}'.format(
            base, lang, self.PAGE_SIZE, game_filter)
        next_url = '{}/streams?language={}&first={}&after={}{}'

        streams = []
        page = self.do_q_auth_v6(first_url, header)
        while page:
            batch = page.get('data') or []
            if not batch:
                break
            streams.extend(batch)
            cursor = (page.get('pagination') or {}).get('cursor')
            if not cursor:
                break  # last page: nothing to continue from
            page = self.do_q_auth_v6(
                next_url.format(base, lang, self.PAGE_SIZE, cursor, game_filter),
                header)
        return streams

    def get_live_streams_v6(self, name, lang):
        """
        Getting list of livestreams with API v6.

        :param name: string with the name of game (not URL-encoded)
        :param lang: string with the shortcut of language
        :return: dictionary ``{'_total': <count>, 'streams': [...]}`` where
                 the streams are the entries returned by the /streams
                 endpoint, one per user name; empty when the game is unknown
        """
        result = {'_total': 0, 'streams': []}
        game = self.get_game_id_v6(quote(name))
        if game is None:
            return result

        result['streams'] = self._fetch_streams_v6(lang, [game[0]])
        return self.unique_streams_v6(result)

    def get_irl_live_streams_v6(self, lang):
        """
        Getting list of livestreams for the ids in IRL_GAME_IDS.

        :param lang: string with the shortcut of language
        :return: dictionary ``{'_total': <count>, 'streams': [...]}`` in the
                 same format as get_live_streams_v6
        """
        result = {'_total': 0, 'streams': []}
        result['streams'] = self._fetch_streams_v6(lang, self.IRL_GAME_IDS)
        return self.unique_streams_v6(result)

    def unique_streams_v6(self, result):
        """
        Sort streams by viewer count and drop repeated user names.

        The streams are sorted by ascending 'viewer_count'; for a repeated
        'user_name' only the first entry in that order is kept. ``result``
        is modified in place.

        :param result: dictionary with a 'streams' list of stream entries
        :return: the same dictionary with 'streams' and '_total' updated
        """
        seen_users = set()
        unique = []
        for stream in sorted(result['streams'], key=lambda k: k['viewer_count']):
            user = stream['user_name']
            if user not in seen_users:
                seen_users.add(user)
                unique.append(stream)
        result['streams'] = unique
        result['_total'] = len(unique)
        return result
|