Source code for cryptrality.core

# distutils: language=c++


from enum import Enum
from datetime import datetime, timedelta, timezone
from cryptrality.misc import str_to_minutes, candle_close_timestamp
from numpy import float64
from typing import Callable, Dict, Type, Union


[docs]class BaseLogger: """Basic Logger with methods to mimmics a logging class"""
[docs] @staticmethod def info(msg): """Basic logging printing INFO in front of a message""" print("INFO %s" % msg)
[docs] @staticmethod def debug(msg): """Basic logging printing DEBUG in front of a message""" print("DEBUG %s" % msg)
[docs] @staticmethod def warning(msg): """Basic logging printing WARNING in front of a message""" print("WARNING %s" % msg)
[docs] @staticmethod def error(msg): """Basic logging printing ERROR in front of a message""" print("ERROR %s" % msg)
[docs] @staticmethod def critical(msg): """Basic logging printing CRITICAL in front of a message""" print("CRITICAL %s" % msg)
[docs]class RunnerClass: """ This is a singleton pattern """ _instance = None messages = [] current_time = None positions = {} step_size = {} price_precision = {} portfolio = {} current_price = {} historical_klines = {} plot_data = {} plot_config = {} klines = {} schedule = [] trading_pairs = [] next_execution = None state = None handler_worker = None max_plot_data = None loggers = {"main": BaseLogger(), "exec": BaseLogger()} log_path = None def __new__(cls, *args, **kwargs): if cls._instance is None: print("Creating The Singleton Runner") cls._instance = super(RunnerClass, cls).__new__( cls, *args, **kwargs ) # Put any initialization here. return cls._instance def add_data_plot( self, symbol: str, x: Dict[str, Union[float64, float, int]] ) -> None: try: self.plot_data[symbol].append(x) except KeyError: self.plot_data[symbol] = [x] if self.max_plot_data: self.plot_data[symbol] = self.plot_data[symbol][ -self.max_plot_data : ]
[docs] def get_candles(self, start, end): """empty method, each exchange should implement its own version""" print("get klines from %s to %s" % (start, end)) self.klines = []
[docs] def update_executions(self, timestamp: int, now: bool = False) -> None: """ Reading the opening time of the receiving candles, it stores the earlier time in which a candle will close (eg in presence of multiple timeframe) and copute which of the timeframes used in the strategy will close at that given time. NOTE: this approach will prevent execution of the strategy is some missing data is encountered. Skipping an execution will stop the strategy to run """ date = datetime.utcfromtimestamp(timestamp / 1000) next_time = None intervals = [] if now: next_time = date else: for runtime_schedule in self.schedule: delta_minutes = timedelta( minutes=str_to_minutes(runtime_schedule["interval"]) ) if next_time is None: next_time = date + delta_minutes else: other_time = date + delta_minutes if other_time < next_time: next_time = other_time all_minutes = (next_time.hour * 60) + next_time.minute for runtime_schedule in self.schedule: if all_minutes % runtime_schedule["interval_minutes"] == 0: intervals.append(runtime_schedule["interval"]) self.next_execution = { "timestamp": int( next_time.replace(tzinfo=timezone.utc).timestamp() * 1000 ), "intervals": intervals, }
[docs] def check_executions(self, timestamp: int) -> bool: """ Check if the execution of handlers is possible by checking if the timestamp of the next scheduled execution is equal to the timestamp of the candles in the dataset """ if timestamp > self.next_execution["timestamp"]: self.update_executions(timestamp) responses = [] for runtime_schedule in self.schedule: if ( runtime_schedule["interval"] in self.next_execution["intervals"] ): for symbol in runtime_schedule["symbols"]: try: kline_data = self.historical_klines[ runtime_schedule["interval"] ][symbol] responses.append( self.next_execution["timestamp"] == candle_close_timestamp( kline_data["timestamp"][-1], runtime_schedule["interval"], ) ) except KeyError: pass if len(responses) > 0: return all(responses) else: return False
class State: pass
[docs]def makeScheduler() -> Callable: """define interval, symbol and number of cadles available""" handler = [] def schedule(interval, symbols, window_size=500): def handlers(fn): handler.append( { "name": fn.__name__, "fn": fn, "interval": interval, "symbols": symbols, "window_size": window_size, } ) return fn return handlers schedule.all = handler return schedule
scheduler = makeScheduler() class Order: def __init__( self, id: int, symbol: str, order_type: int, side: int, quantity: float, limit_price: None = None, status: int = 1, trigger_side: None = None, ) -> None: self.id = id self.link_id = None self.type = OrderType(order_type) self.symbol = symbol self.side = OrderSide(side) self.quantity = quantity self.filled_quantity = 0 self.status = OrderStatus(status) self.close_position = False self.limit_price = limit_price self.stop_price = None self.created_time = None self.cancel_time = None self.error = None self.fees_asset = None self.fills = None self.leverage = None self.executed_quantity = None self.executed_price = None self.executed_time = None self.fees = 0 if trigger_side is None: if self.side == OrderSide.Buy: self.trigger_side = -1 elif self.side == OrderSide.Sell: self.trigger_side = 1 else: self.trigger_side = trigger_side self.limit_fallback_action = None self.limit_fallback_start_from = "created" self.limit_timer_start = None self.limit_fallback_seconds = None self.fills_when_canceled = False def setup_fallback_action(self, action, start_from, fallback_seconds): self.limit_fallback_action = action self.limit_fallback_start_from = start_from self.limit_fallback_seconds = fallback_seconds def check_limit_timer(self, current_time): if self.limit_fallback_seconds: if self.limit_fallback_action: if self.limit_fallback_start_from == "created": self.limit_timer_start = self.created_time elif self.limit_fallback_start_from == "filled": if self.limit_timer_start is None: if self.status == OrderStatus.PartiallyFilled: self.limit_timer_start = RunnerClass.current_time if self.limit_timer_start: date_start = datetime.utcfromtimestamp( self.limit_timer_start / 1000 ) date_now = datetime.utcfromtimestamp(current_time / 1000) if (date_now - date_start) >= timedelta( seconds=self.limit_fallback_seconds ): return True return False def is_placed(self): return self.status == OrderStatus(1) def is_pending(self): return self.status == OrderStatus(2) def is_partially_filled(self): return self.status == OrderStatus(3) def is_filled(self): return self.status == OrderStatus(4) def is_canceled(self): return self.status == OrderStatus(5) def is_rejected(self): return self.status == OrderStatus(6) def is_error(self): return self.status == OrderStatus(8) def cancel(self, cancel_time=None): self.status = OrderStatus(5) if cancel_time: self.cancel_time = cancel_time @staticmethod def refresh(): print("legacy refresh method, it doesn't do anything") def update_leverage(self, leverage): self.leverage = leverage def update_trigger_mode(self, trigger_with): self.trigger_with = trigger_with def __str__(self): return str(self.__class__) + ": " + str(self.__dict__)
[docs]class PositionStatus(Enum): Created = 1 Open = 2 Close = 3
[docs]class OrderStatus(Enum): Created = 1 Pending = 2 PartiallyFilled = 3 Filled = 4 Canceled = 5 Rejected = 6 Expired = 7 Error = 8 BarrierTouched = 9 StopTriggered = 10
[docs]class OrderSide(Enum): Buy = 0 Sell = 1
[docs]class OrderType(Enum): Market = 0 Limit = 1 IfTouched = 2 StopMarket = 3 StopLimit = 4 MakerLimit = 5
class Position: def __init__(self, symbol: str, quantity: int, price: None) -> None: self.symbol = symbol self.quantity = quantity self.price = price self.exit_price = None self.entry_time = None self.exit_time = None self.pnl = 0 self.status = PositionStatus(1) self.is_closed = None self.is_open = None self.orders = [] def open(self, timestamp: int) -> None: self.status = PositionStatus(2) self.is_closed = False self.is_open = True self.entry_time = timestamp def close(self): self.status = PositionStatus(3) self.is_closed = True self.is_open = False """check the orders if any is left open""" def update(self, quantity: float, timestamp: int, price: float) -> None: self.quantity = quantity self.price = price if self.status == PositionStatus.Created and self.quantity != 0: self.open(timestamp) def add_order(self, order: Order) -> None: self.orders.append(order) def __str__(self): return str(self.__class__) + ": " + str(self.__dict__) def plot(symbol: str, x: Dict[str, Union[float64, float, int]]) -> None: RunnerClass.add_data_plot(RunnerClass, symbol, x) def plot_config(config: Dict[str, Dict[str, str]]) -> None: RunnerClass.plot_config = config