fleast/twitch.py

168 lines
6.6 KiB
Python

import cherrypy
import requests
import threading
import time
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
from urllib.parse import quote
class TwitchClient:
def __init__(self, token, secret, freq=2):
self.token = token
self.lock = threading.Lock()
self.header_v6 = {'Client-ID': self.token}
self.urlbase_v6 = 'https://api.twitch.tv/helix'
self.last_q = time.time()
self.delay = 1 / freq
# authentication
self.oauth = ''
self.oauth_token_url = 'https://id.twitch.tv/oauth2/token'
self.auth_secret = secret
oauth_clint = BackendApplicationClient(client_id=self.token)
self.oauth_session = OAuth2Session(client=oauth_clint)
self.update_oauth()
def update_oauth(self):
"""
Update self.oauth token based on client id and secret.
:return: nothing
"""
token = self.oauth_session.fetch_token(token_url=self.oauth_token_url,
client_secret=self.auth_secret,
include_client_id=True)
self.oauth = token['access_token']
def do_q_auth_v6(self, base, header):
"""
Do query with v6 authentication header and single retry.
:param base: string with requesting URL
:param header: dictionary of http headers
:return: string with response or None
"""
result = self.do_q(base, header | {'Authorization': 'Bearer ' + self.oauth})
if result is not None:
return result
self.update_oauth()
return self.do_q(base, header | {'Authorization': 'Bearer ' + self.oauth})
def do_q(self, base, header):
"""
Do query for twitch server
:param base: string with requesting URL
:param header: dictionary of http headers
:return: string with response or None
"""
self.lock.acquire() # Lock for 1 at time query
try:
cherrypy.log('Request: %s' % base)
delta = time.time() - self.last_q # Delta for correct query freq
if delta < self.delay:
time.sleep(delta) # Sleep remaining time
r = requests.get(base, headers=header).json()
error_message = r.get("error", "")
if len(error_message) > 0:
cherrypy.log('Request: fail with error "%s"' % error_message)
r = None
else:
cherrypy.log('Request: OK')
self.last_q = time.time()
except requests.exceptions.RequestException as e:
cherrypy.log('Request: FAIL')
cherrypy.log('Error: {}'.format(e))
r = None
finally:
self.lock.release() # Do not forget to release lock
return r
def get_base(self, ver):
"""
Get base which is depended on API version
:param ver: string with API version ('v5' or 'v6')
:return: tuple with list of headers and URL string
:raises: value error on incorrect API version
"""
if ver == 'v6':
return self.header_v6, self.urlbase_v6
else:
raise ValueError('Not supported API version')
# - # - #
def raw_query_v6(self, q):
"""
Do a query with API v6
:param q: query string
:return: string with get query result or None
"""
header, base = self.get_base('v6')
return self.do_q_auth_v6(base + q, header)
def get_game_id_v6(self, name):
"""
Getting game id with API v6
:param name: string with the name of game
:return: tuple of integer with game id and string with game name or (None,None)
"""
header, base = self.get_base('v6')
r = self.do_q_auth_v6('{}/games?name={}'.format(base, name), header)
if r and r.get('data'):
return r['data'][0]['id'], r['data'][0]['name']
def get_live_streams_v6(self, name, lang):
"""
Getting list of livestreams with API v5
:param name: string with the name of game
:param lang: string with the shortcut of language
:return: list of all streams which are live with this format -
https://dev.twitch.tv/docs/v5/reference/search/#search-streams
"""
result = {'_total': 0, 'streams': []}
game_id = self.get_game_id_v6(quote(name))
if game_id is None:
return result
header, base = self.get_base('v6')
init_q_template = "{}/streams?language={}&first={}&game_id={}"
q_template = "{}/streams?language={}&first={}&after={}&game_id={}"
data = self.do_q_auth_v6(init_q_template.format(base, lang, 100, game_id[0]), header)
result['streams'].extend(data['data'])
while len(data.get('data', [])) > 0: # there must be non zero value, but search is kinda broken now
result['streams'].extend(data['data'])
data = self.do_q_auth_v6(q_template.format(base, lang, 100, data['pagination']['cursor'], game_id[0]), header)
return self.unique_streams_v6(result)
def get_irl_live_streams_v6(self, lang):
header, base = self.get_base('v6')
init_q_template = "{}/streams?language={}&first={}{}"
q_template = "{}/streams?language={}&first={}&after={}{}"
game_id = ''
irl_ids = ["509660", "509673", "509667", "509669", "509670", "509658",
"509672", "509671", "509664", "509663", "417752", "509659"]
for irl_id in irl_ids:
game_id += '&game_id={}'.format(irl_id)
result = {'_total': 0, 'streams': []}
data = self.do_q_auth_v6(init_q_template.format(base, lang, 100, game_id), header)
result['streams'].extend(data['data'])
while len(data.get('data', [])) > 0: # there must be non zero value, but search is kinda broken now
result['streams'].extend(data['data'])
data = self.do_q_auth_v6(q_template.format(base, lang, 100, data['pagination']['cursor'], game_id), header)
return self.unique_streams_v6(result)
def unique_streams_v6(self, result):
uniq_streams = []
streams = sorted(result['streams'], key=lambda k: k['viewer_count'])
result['streams']=[]
for s in streams:
if s['user_name'] not in uniq_streams:
uniq_streams.append(s['user_name'])
result['streams'].append(s)
result['_total'] = len(result['streams'])
return result