import logging
from queue import Empty, Queue
from threading import Event, Thread
import time
from typing import List
import capnp
import zmq
from omega_client.messaging.common_types import AccountBalancesReport, \
AccountCredentials, AccountInfo, AuthorizationGrant, AuthorizationRefresh, \
CompletedOrdersReport, ExchangePropertiesReport, \
ExecutionReport, OpenPositionsReport, Order, OrderInfo, \
OrderType, RequestHeader, TimeInForce, WorkingOrdersReport
from omega_client.messaging.message_factory import cancel_all_orders_capnp, \
cancel_order_capnp, heartbeat_capnp, logoff_capnp, logon_capnp, \
place_order_capnp, replace_order_capnp, request_account_balances_capnp, \
request_account_data_capnp, request_auth_refresh_capnp, \
request_completed_orders_capnp, request_exchange_properties_capnp, \
request_open_positions_capnp, request_order_status_capnp, \
request_server_time_capnp, request_working_orders_capnp
logger = logging.getLogger(__name__)
# TODO: Remove return types after adding easy conversion and access to
# message body for debugging and testing
class RequestSender(Thread):
    """
    Runs as an individual thread to send requests to TesConnection,
    which then gets routed to Omega.  The motivation of the design is that
    different threads should not share zmq sockets, and that the
    TesConnection event loop should not be blocked.

    When a request is "sent" from this class, it is placed into an internal
    thread-safe queue.  The request sender loop checks if the queue has a
    message, and if there is one, sends it to TesConnection through the
    zmq endpoint given at construction time.

    NOTE: run() sets the _is_running Event when it starts, so a stop()
    issued before the thread has reached that point is overwritten and
    the loop will keep running until stop() is called again.

    Attributes:
        _ZMQ_CONTEXT: (zmq.Context) Required to create sockets.  It is
            recommended that one application use one shared zmq context
            for all sockets.
        _ZMQ_ENDPOINT: (str)
            The zmq endpoint to connect to.
        _QUEUE_POLLING_TIMEOUT_SECONDS: (int)
            The polling timeout for the internal queue.
        _outgoing_message_queue: (Queue) Internal message queue for
            outgoing Omega Messages.
        _is_running: (Event) Event object that indicates on/off behavior
            for the request sender loop.
    """

    def __init__(self,
                 zmq_context: zmq.Context,
                 zmq_endpoint: str,
                 outgoing_message_queue: Queue = None,
                 queue_polling_timeout_seconds: int = 1,
                 name: str = 'TesRequestSender'):
        """
        Store the connection settings; no socket is created until run().

        :param zmq_context: (zmq.Context) context used to create the
            socket inside run().
        :param zmq_endpoint: (str) endpoint the socket connects to.
        :param outgoing_message_queue: (Queue) optional queue to use for
            outgoing messages; a new Queue is created when None.
        :param queue_polling_timeout_seconds: (int) how long each
            queue.get() in the sender loop blocks before re-checking the
            running flag.
        :param name: (str) thread name passed to Thread.__init__.
        """
        assert zmq_context
        assert zmq_endpoint

        self._ZMQ_CONTEXT = zmq_context
        self._ZMQ_ENDPOINT = zmq_endpoint
        self._QUEUE_POLLING_TIMEOUT_SECONDS = queue_polling_timeout_seconds
        # Explicit None check so that a caller-supplied queue is never
        # discarded merely because it evaluates as falsy.
        if outgoing_message_queue is None:
            outgoing_message_queue = Queue()
        self._outgoing_message_queue = outgoing_message_queue

        self._is_running = Event()
        super().__init__(name=name)

    def _queue_message(self, omega_message_capnp: capnp._DynamicStructBuilder):
        """
        Put a capnp message into the internal queue for sending to
        TesConnection.

        :param omega_message_capnp: (capnp._DynamicStructBuilder) message
            to be sent by the sender loop.
        """
        self._outgoing_message_queue.put(omega_message_capnp)

    def cleanup(self):
        """
        Stop the request sender gracefully and join the thread.
        """
        self.stop()
        self.join()

    def stop(self):
        """
        Clear the _is_running Event, which terminates the request sender
        loop.
        """
        self._is_running.clear()

    def is_running(self):
        """
        Return True if the _is_running Event is set, False otherwise.
        """
        return self._is_running.is_set()

    def run(self):
        """
        Message sending loop.

        Create the request_socket as a zmq.DEALER socket and then connect
        to the provided _ZMQ_ENDPOINT.
        Try to get a message for _QUEUE_POLLING_TIMEOUT_SECONDS and then
        send it out to TesConnection.

        The socket is always closed and _is_running is always cleared when
        this method exits, including when it exits because of an
        exception.
        """
        request_socket = self._ZMQ_CONTEXT.socket(zmq.DEALER)
        try:
            request_socket.connect(self._ZMQ_ENDPOINT)
            self._is_running.set()
            while self._is_running.is_set():
                try:
                    # Block for at most the polling timeout so that the
                    # running flag is re-checked periodically.
                    capnp_request = self._outgoing_message_queue.get(
                        timeout=self._QUEUE_POLLING_TIMEOUT_SECONDS)
                except Empty:
                    continue
                request_socket.send(capnp_request.to_bytes())
            # Pause before closing on a clean shutdown; presumably this
            # gives zmq time to flush already-sent messages -- confirm.
            time.sleep(2.)
        finally:
            # Keep is_running() truthful if the loop died on an error,
            # and never leak the socket.
            self._is_running.clear()
            request_socket.close()

    ###########################################################################
    #                                                                         #
    # ~~~~~~~~~~~~~~~~~~~~~~~~ Outgoing OmegaMessages ~~~~~~~~~~~~~~~~~~~~~~~ #
    # ---------------- Public Methods to be called by client----------------- #
    #                                                                         #
    ###########################################################################

    def logon(self,
              request_header: RequestHeader,
              client_secret: str,
              credentials: List[AccountCredentials]):
        """
        Logon to Omega for a specific client_id and set of credentials.

        :param request_header: Header parameter object for requests.
        :param client_secret: (str) client_secret key assigned by Fund3.
        :param credentials: (List[AccountCredentials]) List of exchange
            credentials in the form of AccountCredentials.
        :return: (capnp._DynamicStructBuilder) Logon capnp object.
        """
        omega_message, logon = logon_capnp(
            request_header=request_header,
            client_secret=client_secret,
            credentials=credentials
        )
        self._queue_message(omega_message)
        return logon

    def logoff(self, request_header: RequestHeader):
        """
        Logoff Omega for a specific client_id.

        :param request_header: Header parameter object for requests.
        :return: (capnp._DynamicStructBuilder) Logoff capnp object.
        """
        omega_message, body = logoff_capnp(request_header=request_header)
        self._queue_message(omega_message)
        return body

    def send_heartbeat(self, request_header: RequestHeader):
        """
        Sends a heartbeat to Omega for maintaining and verifying connection.
        Only clients that are logged on will receive heartbeat back from
        Omega.

        :param request_header: Header parameter object for requests.
        :return: (capnp._DynamicStructBuilder) heartbeat capnp object.
        """
        omega_message, body = heartbeat_capnp(request_header=request_header)
        self._queue_message(omega_message)
        return body

    def request_server_time(self, request_header: RequestHeader):
        """
        Request Omega server time for syncing client and server timestamps.

        :param request_header: Header parameter object for requests.
        :return: (capnp._DynamicStructBuilder) server time request capnp
            object.
        """
        omega_message, body = request_server_time_capnp(
            request_header=request_header)
        self._queue_message(omega_message)
        return body

    def place_order(self, request_header: RequestHeader, order: Order):
        """
        Sends a request to Omega to place an order.

        :param request_header: Header parameter object for requests.
        :param order: (Order) Python object containing all required fields.
        :return: (capnp._DynamicStructBuilder) place_order capnp object.
        """
        omega_message, place_order = place_order_capnp(
            request_header=request_header, order=order)
        self._queue_message(omega_message)
        return place_order

    def replace_order(self,
                      request_header: RequestHeader,
                      account_info: AccountInfo,
                      order_id: str,
                      # pylint: disable=E1101
                      order_type: str = OrderType.undefined.name,
                      quantity: float = 0.0,
                      price: float = 0.0,
                      stop_price: float = 0.0,
                      time_in_force: str = TimeInForce.gtc.name,
                      # pylint: enable=E1101
                      expire_at: float = 0.0):
        """
        Sends a request to Omega to replace an order.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account on which to replace
            order.
        :param order_id: (str) order_id as returned from the
            ExecutionReport.
        :param order_type: (str) (optional) defaults to the name of
            OrderType.undefined.
        :param quantity: (float) (optional)
        :param price: (float) (optional)
        :param stop_price: (float) (optional)
        :param time_in_force: (str) (optional) defaults to the name of
            TimeInForce.gtc.
        :param expire_at: (float) (optional)
        :return: (capnp._DynamicStructBuilder) replaceOrder capnp object.
        """
        omega_message, replace_order = replace_order_capnp(
            request_header=request_header,
            account_info=account_info,
            order_id=order_id,
            order_type=order_type,
            quantity=quantity,
            price=price,
            stop_price=stop_price,
            time_in_force=time_in_force,
            expire_at=expire_at
        )
        self._queue_message(omega_message)
        return replace_order

    def cancel_order(self,
                     request_header: RequestHeader,
                     account_info: AccountInfo,
                     order_id: str):
        """
        Sends a request to Omega to cancel an order.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account on which to cancel order.
        :param order_id: (str) order_id as returned from the
            ExecutionReport.
        :return: (capnp._DynamicStructBuilder) cancel_order object.
        """
        omega_message, cancel_order = cancel_order_capnp(
            request_header=request_header,
            account_info=account_info,
            order_id=order_id
        )
        self._queue_message(omega_message)
        return cancel_order

    def cancel_all_orders(self,
                          request_header: RequestHeader,
                          account_info: AccountInfo,
                          symbol: str = None,
                          side: str = None):
        """
        Sends a request to Omega to cancel all orders. Optionally including
        side and/or symbol.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account on which to cancel order.
        :param symbol: str (optional)
        :param side: str (optional)
        :return: (capnp._DynamicStructBuilder) cancel_all_orders object.
        """
        omega_message, cancel_all_orders = cancel_all_orders_capnp(
            request_header=request_header,
            account_info=account_info,
            symbol=symbol,
            side=side)
        logger.debug('Cancelling All Orders.', extra={'symbol': symbol,
                                                      'side': side})
        self._queue_message(omega_message)
        return cancel_all_orders

    def request_account_data(self,
                             request_header: RequestHeader,
                             account_info: AccountInfo):
        """
        Sends a request to Omega for full account snapshot including
        balances, open positions, and working orders on specified account.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :return: (capnp._DynamicStructBuilder) get_account_data capnp
            object.
        """
        omega_message, get_account_data = request_account_data_capnp(
            request_header=request_header, account_info=account_info)
        self._queue_message(omega_message)
        return get_account_data

    def request_open_positions(self,
                               request_header: RequestHeader,
                               account_info: AccountInfo):
        """
        Sends a request to Omega for open positions on an Account.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :return: (capnp._DynamicStructBuilder) get_open_positions capnp
            object.
        """
        omega_message, get_open_positions = request_open_positions_capnp(
            request_header=request_header, account_info=account_info)
        self._queue_message(omega_message)
        return get_open_positions

    def request_account_balances(self,
                                 request_header: RequestHeader,
                                 account_info: AccountInfo):
        """
        Sends a request to Omega for full account balances snapshot on an
        Account.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :return: (capnp._DynamicStructBuilder) get_account_balances capnp
            object.
        """
        omega_message, get_account_balances = request_account_balances_capnp(
            request_header=request_header, account_info=account_info)
        self._queue_message(omega_message)
        return get_account_balances

    def request_working_orders(self,
                               request_header: RequestHeader,
                               account_info: AccountInfo):
        """
        Sends a request to Omega for all working orders snapshot on an
        Account.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :return: (capnp._DynamicStructBuilder) get_working_orders capnp
            object.
        """
        omega_message, get_working_orders = request_working_orders_capnp(
            request_header=request_header, account_info=account_info)
        self._queue_message(omega_message)
        return get_working_orders

    def request_order_status(self,
                             request_header: RequestHeader,
                             account_info: AccountInfo,
                             order_id: str):
        """
        Sends a request to Omega to request status of a specific order.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :param order_id: (str) The id of the order of interest.
        :return: (capnp._DynamicStructBuilder) get_order_status capnp
            object.
        """
        omega_message, get_order_status = request_order_status_capnp(
            request_header=request_header,
            account_info=account_info,
            order_id=order_id
        )
        self._queue_message(omega_message)
        return get_order_status

    def request_completed_orders(self,
                                 request_header: RequestHeader,
                                 account_info: AccountInfo,
                                 count: int = None,
                                 since: float = None):
        """
        Sends a request to Omega for all completed orders on specified
        account.  If both 'count' and 'since' are None, returns orders
        for last 24h.

        :param request_header: Header parameter object for requests.
        :param account_info: (AccountInfo) Account from which to retrieve
            data.
        :param count: (int) optional, number of returned orders (most
            recent ones).
        :param since: (float) optional, returns all orders from provided
            unix timestamp to present.
        :return: (capnp._DynamicStructBuilder) get_completed_orders capnp
            object.
        """
        omega_message, get_completed_orders = request_completed_orders_capnp(
            request_header=request_header,
            account_info=account_info,
            count=count,
            since=since
        )
        self._queue_message(omega_message)
        return get_completed_orders

    def request_exchange_properties(self,
                                    request_header: RequestHeader,
                                    exchange: str):
        """
        Sends a request to Omega for supported currencies, symbols and
        their associated properties, timeInForces, and orderTypes on an
        exchange.

        :param request_header: Header parameter object for requests.
        :param exchange: (str) The exchange of interest.
        :return: (capnp._DynamicStructBuilder) get_exchange_properties
            capnp object.
        """
        omega_message, get_exchange_properties = (
            request_exchange_properties_capnp(
                request_header=request_header, exchange=exchange)
        )
        self._queue_message(omega_message)
        return get_exchange_properties

    def request_authorization_refresh(self,
                                      request_header: RequestHeader,
                                      auth_refresh: AuthorizationRefresh):
        """
        Sends a request to Omega to refresh the session.

        :param request_header: Header parameter object for requests.
        :param auth_refresh: AuthorizationRefresh python object
        :return: (capnp._DynamicStructBuilder) authorization_refresh capnp
            object.
        """
        omega_message, authorization_refresh = (
            request_auth_refresh_capnp(
                request_header=request_header, auth_refresh=auth_refresh)
        )
        self._queue_message(omega_message)
        return authorization_refresh