"use strict";

/**
 * Downlevel helper that drives a generator as if it were an `async` function.
 *
 * Every value the generator yields is treated as an awaited value: it is
 * wrapped in a promise, and the generator is resumed with the fulfilment
 * value (via `next`) or with the rejection reason (via `throw`).
 *
 * @param {*} thisArg - `this` value used when invoking `generator`.
 * @param {Array|undefined} _arguments - Arguments passed to `generator` (defaults to none).
 * @param {Function|undefined} P - Promise constructor to use; falls back to the global `Promise`.
 * @param {Function} generator - Function returning an iterator with `next`/`throw`.
 * @returns {Promise} Settles with the generator's return value, or rejects with
 *   whatever it throws.
 */
var __awaiter = this && this.__awaiter || function (thisArg, _arguments, P, generator) {
  // Coerce a yielded value into an instance of P, leaving existing P instances untouched.
  var toPromise = function (candidate) {
    if (candidate instanceof P) {
      return candidate;
    }

    return new P(function (settle) {
      settle(candidate);
    });
  };

  if (!P) {
    P = Promise;
  }

  return new P(function (resolve, reject) {
    // Either finish the outer promise or wait on the value that was just yielded.
    var advance = function (outcome) {
      if (outcome.done) {
        resolve(outcome.value);
      } else {
        toPromise(outcome.value).then(onFulfilled, onRejected);
      }
    };

    // Resume the generator through `method`; anything it throws rejects the outer promise.
    var resume = function (method, payload) {
      try {
        advance(generator[method](payload));
      } catch (err) {
        reject(err);
      }
    };

    var onFulfilled = function (value) {
      resume("next", value);
    };

    var onRejected = function (reason) {
      resume("throw", reason);
    };

    // From here on `generator` names the iterator rather than the factory function.
    generator = generator.apply(thisArg, _arguments || []);
    advance(generator.next());
  });
};

/**
 * Downlevel helper that emulates a generator with an explicit state machine.
 *
 * `body` is called repeatedly with the shared state object `_` and returns an
 * instruction tuple `[opcode, operand]` each time. `_.label` is the resume
 * position that `body` switches on; `_.sent()` hands back the value most
 * recently passed in (or throws it when the resume was a `throw`).
 *
 * Opcodes as used in this file: 0 = next, 1 = throw (the resume verbs),
 * 2 = return, 3 = break/jump to label, 4 = yield, 6 = an exception raised by
 * `body`. Opcode 5 installs a delegate iterator in `y`, and opcode 7 pops a
 * pending operation together with the current try entry (presumably
 * `yield*` and end-of-finally respectively — confirm against tslib).
 *
 * @param {*} thisArg - `this` value used for every call to `body`.
 * @param {Function} body - State machine step function, called as `body.call(thisArg, _)`.
 * @returns {Object} Iterator-like object exposing `next`, `throw` and `return`
 *   (and `Symbol.iterator` when `Symbol` exists).
 */
var __generator = this && this.__generator || function (thisArg, body) {
  // _: shared state handed to `body`; f: "currently executing" flag;
  // y: delegate iterator (if any); t: scratch slot; g: the returned iterator object.
  var _ = {
    label: 0,
    sent: function sent() {
      if (t[0] & 1) throw t[1];
      return t[1];
    },
    trys: [],
    ops: []
  },
      f,
      y,
      t,
      g;
  return g = {
    next: verb(0),
    "throw": verb(1),
    "return": verb(2)
  }, typeof Symbol === "function" && (g[Symbol.iterator] = function () {
    return this;
  }), g;

  // Builds the iterator method for resume opcode `n` (0 next, 1 throw, 2 return).
  function verb(n) {
    return function (v) {
      return step([n, v]);
    };
  }

  // Runs the state machine until it yields (opcode 4) or finishes (`_` set to 0).
  function step(op) {
    // Re-entrant calls (resuming while `body` is running) are rejected.
    if (f) throw new TypeError("Generator is already executing.");

    while (_) {
      try {
        // While a delegate iterator is installed, forward the resume to it and
        // return its result directly as long as it is not done.
        if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
        // The delegate finished (or there was none): drop it and feed its final value back in.
        if (y = 0, t) op = [op[0] & 2, t.value];

        switch (op[0]) {
          case 0:
          case 1:
            // next/throw: stash the op so `_.sent()` can return or throw its operand.
            t = op;
            break;

          case 4:
            // yield: advance the label and hand the value to the caller.
            _.label++;
            return {
              value: op[1],
              done: false
            };

          case 5:
            // Install op[1] as the delegate iterator and restart the loop with a plain `next`.
            _.label++;
            y = op[1];
            op = [0];
            continue;

          case 7:
            // Resume the operation that was parked on `_.ops` and leave the current try entry.
            op = _.ops.pop();

            _.trys.pop();

            continue;

          default:
            // No enclosing try entry: an exception (6) or return (2) terminates the machine.
            if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) {
              _ = 0;
              continue;
            }

            // Jump (3) with no try entry, or with a target strictly between t[0] and t[3]: just move the label.
            if (op[0] === 3 && (!t || op[1] > t[0] && op[1] < t[3])) {
              _.label = op[1];
              break;
            }

            // Exception raised before label t[1]: continue at t[1] with the error available via `_.sent()`.
            if (op[0] === 6 && _.label < t[1]) {
              _.label = t[1];
              t = op;
              break;
            }

            // Otherwise, if label t[2] has not been reached yet, park the op and continue at t[2] first.
            if (t && _.label < t[2]) {
              _.label = t[2];

              _.ops.push(op);

              break;
            }

            // This try entry is exhausted: discard it and re-process the op against the next one out.
            if (t[2]) _.ops.pop();

            _.trys.pop();

            continue;
        }

        op = body.call(thisArg, _);
      } catch (e) {
        // Anything thrown by `body` (or the delegate) is re-dispatched as opcode 6.
        op = [6, e];
        y = 0;
      } finally {
        f = t = 0;
      }
    }

    // Machine finished: rethrow an uncaught exception, otherwise report completion.
    if (op[0] & 5) throw op[1];
    return {
      value: op[0] ? op[1] : void 0,
      done: true
    };
  }
};

var _a;

Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.unsubscribe = exports.subscribe = exports.connectStreaming = exports.streamedDataStore = exports.DISABLE_STREAMING = void 0;

var utils_1 = require("./utils");

var atom_types_1 = require("@atom-finance/atom-types");

var riverWebServerURL = (_a = process.env.REACT_APP_RIVER_URL) !== null && _a !== void 0 ? _a : 'wss://staging-river-server.atom.finance';
exports.DISABLE_STREAMING = !!process.env.REACT_APP_DISABLE_STREAMING || !!localStorage.getItem("disableStreaming");
exports.streamedDataStore = new Map();
// The single websocket shared by this module; stays undefined until a connection is created.
var socket;

// Numeric ready-state codes with a reverse lookup, so both SocketState.OPEN === 1
// and SocketState[1] === "OPEN" hold.
var SocketState = {};

["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function (stateName, code) {
  SocketState[stateName] = code;
  SocketState[code] = stateName;
});

/**
 * Opens the River websocket, installs its handlers and starts a ping/pong
 * watchdog that reconnects when the connection closes or stops answering.
 *
 * The module keeps a single active connection in `socket`. Calling this
 * function again replaces it: the previous socket is closed and its watchdog
 * stops at its next check, so only the newest connection delivers trades and
 * triggers reconnects.
 *
 * @param {number} [nthTry=1] - Attempt counter; scales the wait while the
 *   socket is still connecting (3s per try, capped at 10s).
 * @returns {WebSocket|undefined} The new socket, or undefined when streaming
 *   is disabled.
 */
var connectStreaming = function connectStreaming(nthTry) {
  if (nthTry === void 0) {
    nthTry = 1;
  }

  if (exports.DISABLE_STREAMING) {
    return;
  }

  console.debug("River: Connecting to river. (Try # " + nthTry + ")");
  var previous = socket;
  var ws = new WebSocket(riverWebServerURL);
  socket = ws;

  // Retire the connection being replaced so it cannot keep delivering duplicate
  // trades. close() is a no-op on a socket that is already closing or closed.
  if (previous) {
    previous.close();
  }

  var hasOpened = false;
  var hasReceivedPong = false;

  ws.onmessage = function (msg) {
    try {
      var data = JSON.parse(msg.data);

      if (data.messageType === atom_types_1.RiverServerMessageType.Pong) {
        hasReceivedPong = true;
        return;
      }

      if (data.messageType === atom_types_1.RiverServerMessageType.Trades) {
        data.trades.forEach(function (trade) {
          var savedSymbol = exports.streamedDataStore.get(trade.symbol);

          if (!savedSymbol) {
            return;
          }

          savedSymbol.updateComponentFunctions.forEach(function (notify) {
            // Isolate each listener: one throwing callback must not starve the
            // remaining listeners or the remaining trades in this message.
            try {
              notify(trade);
            } catch (e) {
              console.error(e);
            }
          });
          savedSymbol.streamedData = trade;
        });
      }
    } catch (e) {
      console.error(e);
    }
  };

  ws.onopen = function () {
    hasOpened = true;

    try {
      // (Re)subscribe to everything currently tracked, on the socket that just opened.
      var tickers = Array.from(exports.streamedDataStore.keys());

      if (!tickers.length) {
        return;
      }

      console.debug("River: sending subscriptions on socket open. Number of subscriptions: ", tickers.length);
      ws.send(JSON.stringify({
        messageType: atom_types_1.RiverClientMessageType.Sub,
        tickers: tickers
      }));
    } catch (e) {
      console.error(e);
    }
  };

  ws.onerror = function (error) {
    console.error("River: Error from Websocket", error);
  };

  // True once a later connectStreaming call has replaced this connection.
  var isSuperseded = function isSuperseded() {
    return socket !== ws;
  };

  // Runs `task` after `ms` milliseconds; failures are logged instead of
  // surfacing as unhandled promise rejections.
  var runAfter = function runAfter(ms, task) {
    new Promise(function (resolve) {
      resolve((0, utils_1.sleep)(ms));
    }).then(task).catch(function (e) {
      console.error("River: Error in connection watchdog", e);
    });
  };

  // Second half of a ping round: keep watching if the pong arrived, otherwise reconnect.
  var verifyPong = function verifyPong() {
    if (isSuperseded()) {
      return;
    }

    if (hasReceivedPong) {
      checkForPong();
      return;
    }

    console.debug("River: Ping failed. Starting a new connection.");
    ws.close();
    (0, exports.connectStreaming)();
  };

  // One watchdog step for this connection, chosen by the socket's ready state.
  var checkForPong = function checkForPong() {
    try {
      if (isSuperseded()) {
        console.debug("River: Connection superseded. Stopping its pong check.");
        return;
      }

      switch (ws.readyState) {
        case SocketState.CLOSED:
        case SocketState.CLOSING:
          console.debug("River: Connection closed. Starting a new connection.");
          // Reset the attempt counter if this connection had opened at least once.
          (0, exports.connectStreaming)(hasOpened ? 1 : nthTry + 1);
          return;

        case SocketState.CONNECTING:
          console.debug('River: Connecting, waiting to check for pong.');
          runAfter(Math.min(3000 * nthTry, 10000), checkForPong);
          return;

        case SocketState.OPEN:
          hasReceivedPong = false;
          ws.send(JSON.stringify({
            messageType: 'Ping'
          }));
          runAfter(3000, verifyPong);
          return;

        default:
          console.debug("River: Unhandled socket ready state. (" + ws.readyState + ")");
          // Wait before re-checking so an unexpected state cannot become a busy loop.
          runAfter(3000, checkForPong);
      }
    } catch (e) {
      console.error("River: Error in connection watchdog", e);
    }
  };

  checkForPong();
  return ws;
};

exports.connectStreaming = connectStreaming;

/**
 * Sends `msg` as JSON over the shared socket, but only when that is possible
 * and meaningful. The message is dropped when no socket exists, when a
 * non-Ping message carries an empty `tickers` array, or when the socket is not
 * currently open.
 *
 * @param {Object} msg - Client message with `messageType` and, for non-Ping
 *   messages, a `tickers` array.
 */
var sendIfOpen = function sendIfOpen(msg) {
  if (!socket) {
    return;
  }

  // Ping messages carry no tickers; every other message must name at least one.
  var isPing = msg.messageType === atom_types_1.RiverClientMessageType.Ping;

  if (!isPing && !msg.tickers.length) {
    console.debug("River: ignoring message because empty tickers array.");
    return;
  }

  var isOpen = socket.readyState === SocketState.OPEN;

  if (!isOpen) {
    console.debug("River: ignoring message because socket is not currently open");
    return;
  }

  var payload = JSON.stringify(msg);
  socket.send(payload);
};

// Subscribe requests are collected for 50ms before being sent as one message.
var SUB_BATCH_TIME = 50;

// Unsubscribe requests are collected for 1s; unsubscribing does not need to be fast.
var UNSUB_BATCH_TIME = 1000;

// Tickers queued for the next subscribe message.
var subBatch = new Set();

// Tickers queued for the next unsubscribe message.
var unSubBatch = new Set();

/**
 * Registers `action` as a listener for streamed data on `symbol`.
 *
 * A symbol that is already tracked just gains another listener. A new symbol
 * is added to the data store and queued in the subscribe batch, which is sent
 * as one message SUB_BATCH_TIME milliseconds after its first entry was queued.
 *
 * @param {string} symbol - Ticker symbol to stream.
 * @param {Function} action - Callback invoked with each streamed element for the symbol.
 */
var subscribe = function subscribe(symbol, action) {
  // A queued unsubscribe for this symbol is obsolete now that someone wants it again.
  unSubBatch.delete(symbol);
  var existingEntry = exports.streamedDataStore.get(symbol);

  if (existingEntry) {
    existingEntry.updateComponentFunctions.push(action);
    return;
  }

  // Sends whatever is queued at the time the timer fires, then empties the queue.
  var flushSubBatch = function flushSubBatch() {
    if (subBatch.size === 0) {
      return;
    }

    var queuedTickers = Array.from(subBatch);
    subBatch.clear();
    sendIfOpen({
      messageType: atom_types_1.RiverClientMessageType.Sub,
      tickers: queuedTickers
    });
  };

  // Only the first symbol of an empty batch arms the flush timer.
  var startsNewBatch = subBatch.size === 0;

  if (startsNewBatch) {
    setTimeout(flushSubBatch, SUB_BATCH_TIME);
  }

  subBatch.add(symbol);
  console.debug("River: adding to streamed data store", symbol);
  exports.streamedDataStore.set(symbol, {
    streamedData: null,
    updateComponentFunctions: [action]
  });
};

exports.subscribe = subscribe;

/**
 * Removes `action` from the listeners of `symbol` and queues the symbol for a
 * batched unsubscribe.
 *
 * The symbol is only dropped when the unsubscribe batch is flushed
 * (UNSUB_BATCH_TIME milliseconds after its first entry was queued) and only if
 * it has no listeners left at that moment. A pending subscribe for the symbol
 * is deliberately NOT cancelled here: another listener may still depend on it,
 * or the symbol may be re-subscribed before the subscribe batch is sent (for
 * example a mount -> cleanup -> mount sequence). Cancelling eagerly would
 * leave the symbol in the data store without a Sub message ever being sent.
 * The pending subscribe is cancelled at flush time instead, once the symbol is
 * really being dropped.
 *
 * @param {string} symbol - Ticker symbol to stop listening to.
 * @param {Function} action - The callback previously passed to `subscribe`.
 */
var unsubscribe = function unsubscribe(symbol, action) {
  var savedSymbol = exports.streamedDataStore.get(symbol);

  if (!savedSymbol) {
    return;
  }

  savedSymbol.updateComponentFunctions = savedSymbol.updateComponentFunctions.filter(function (x) {
    return x !== action;
  });

  // Only the first symbol of an empty batch arms the flush timer.
  if (!unSubBatch.size) {
    setTimeout(function () {
      Array.from(unSubBatch).forEach(function (ticker) {
        var savedTicker = exports.streamedDataStore.get(ticker);

        if (savedTicker && savedTicker.updateComponentFunctions.length) {
          // Do not unsub, another component is still using this streaming data.
          console.debug("River: Symbol still in use by other component", ticker);
          unSubBatch.delete(ticker);
          return;
        }

        exports.streamedDataStore.delete(ticker);
        // The symbol is gone for good, so a Sub still waiting in the queue must
        // not be sent after the Unsub below.
        subBatch.delete(ticker);
      });

      if (!unSubBatch.size) return;
      var unSubTickers = Array.from(unSubBatch);
      console.debug("River: removing from streamed data store", unSubTickers);
      unSubBatch.clear();
      sendIfOpen({
        messageType: atom_types_1.RiverClientMessageType.Unsub,
        tickers: unSubTickers
      });
    }, UNSUB_BATCH_TIME);
  }

  unSubBatch.add(symbol);
};

exports.unsubscribe = unsubscribe;