import logging
import time
from datetime import datetime as dt
from threading import Thread, Event
from omega_client.communication.omega_connection import OmegaConnection
from omega_client.messaging.common_types import RequestHeader, \
AuthorizationRefresh, AuthorizationGrant
logger = logging.getLogger(__name__)
class SingleClientSessionRefresher(Thread):
    """
    Background thread that keeps the session of a single client
    (1 client_id) alive.

    The thread waits until shortly before the stored token expires, then
    repeatedly sends an authorization refresh request through
    ``request_sender`` until :meth:`update_token` reports that a new token
    has been stored, or until the refresher is stopped.
    """

    def __init__(self,
                 request_sender: 'OmegaConnection',
                 client_id: int,
                 sender_comp_id: str,
                 refresh_margin: float = 100.):
        """
        Refreshes the session of a single client (1 client_id).

        :param request_sender: object used for sending messages to Omega;
            ``request_authorization_refresh`` and ``update_access_token``
            are the two methods called on it
        :param client_id: int client_id
        :param sender_comp_id: str
        :param refresh_margin: how many seconds before the token expiry
            the refresh request is sent (defaults to 100 seconds)
        """
        super().__init__()
        self.access_token = None
        self.refresh_token = None
        self.token_expire_time = None
        self.request_sender = request_sender
        self.client_id = client_id
        self.sender_comp_id = sender_comp_id
        self.refresh_margin = refresh_margin
        self.request_id = 0
        # Set by update_token() once a fresh token has been stored.
        self.waiting_for_new_token = Event()
        # Set for as long as the refresh loop in run() is active.
        self._is_running = Event()
        # Set by stop() so that a long wait in run() ends immediately.
        self._stop_requested = Event()

    def is_running(self):
        """
        Return True if the thread is running, False otherwise.
        """
        return self._is_running.is_set()

    def _seconds_until_refresh(self) -> float:
        """
        Return the number of seconds to wait before refreshing, never
        negative.
        """
        # time.time() is used instead of datetime.utcnow().timestamp():
        # the latter treats the naive UTC datetime as local time and is
        # therefore shifted by the UTC offset on hosts not running in UTC.
        # NOTE(review): assumes token_expire_time is Unix epoch seconds --
        # confirm against the AuthorizationGrant producer.
        remaining = (self.token_expire_time - time.time() -
                     self.refresh_margin)
        # A token that is already inside the margin is refreshed at once.
        return max(0., remaining)

    def _send_refresh_request(self):
        """
        Send one authorization refresh request for the current tokens.
        """
        self.request_sender.request_authorization_refresh(
            request_header=RequestHeader(
                client_id=self.client_id,
                sender_comp_id=self.sender_comp_id,
                access_token=self.access_token,
                request_id=self.request_id
            ),
            auth_refresh=AuthorizationRefresh(
                refresh_token=self.refresh_token)
        )

    def run(self):
        """
        Threaded implementation of automatic session refresh main loop
        for a single client.

        A token must have been stored with :meth:`update_token` before the
        thread is started; otherwise an error is logged and the loop exits.
        """
        # A stop() issued before the loop starts has no effect, as before.
        self._stop_requested.clear()
        self._is_running.set()
        try:
            while self.is_running():
                if self.token_expire_time is None:
                    logger.error('SessionRefresher has no token expire '
                                 'time; call update_token() before '
                                 'start(). Stopping.')
                    break
                # wait until refresh_margin seconds before the token expires
                time_until_session_refresh = self._seconds_until_refresh()
                logger.info('SessionRefresher sleeping %s seconds',
                            time_until_session_refresh)
                # Event.wait returns True as soon as stop() is called, so
                # stopping does not have to wait for the token to expire.
                if self._stop_requested.wait(
                        timeout=time_until_session_refresh):
                    break
                self.request_id += 1
                self.waiting_for_new_token.clear()
                # Re-checking is_running() lets a failed refresh (which
                # calls stop()) end this loop instead of re-sending the
                # request every second forever.
                while (self.is_running() and
                       not self.waiting_for_new_token.is_set()):
                    # send the authorization refresh request to Omega
                    self._send_refresh_request()
                    self.waiting_for_new_token.wait(timeout=1)
                    logger.info('waited for 1 seconds for new token')
        finally:
            # Keep is_running() truthful even if the loop exits on an error.
            self._is_running.clear()

    def stop(self):
        """
        Clear the _is_running Event, which terminates the refresh loop,
        and wake the loop up if it is currently waiting.
        """
        self._is_running.clear()
        self._stop_requested.set()

    def update_token(self, auth_grant: 'AuthorizationGrant'):
        """
        Store the tokens from an authorization grant.

        On success the access token, refresh token and expiry time are
        stored, the access token is passed on to ``request_sender`` and the
        refresh loop is told that the new token has arrived. On failure the
        refresher is stopped.

        :param auth_grant: grant whose ``success``, ``access_token``,
            ``refresh_token`` and ``expire_at`` attributes are read
        :return: True if the grant was successful, False otherwise
        """
        if auth_grant.success:
            self.access_token = str(auth_grant.access_token)
            self.refresh_token = str(auth_grant.refresh_token)
            self.token_expire_time = float(auth_grant.expire_at)
            self.request_sender.update_access_token(self.access_token)
            self.waiting_for_new_token.set()
            logger.info('SessionRefresher successfully updated access token')
            return True
        logger.info('SessionRefresher failed to successfully update access '
                    'token. Stopping.')
        self.stop()
        return False