import * as flatbuffers from 'flatbuffers';

import { ObservableStream, ResolvedPromise } from '../';
import { DeferredPromise } from '../DeferredPromise';
import { FetchUtils } from '../FetchUtils';
import type { ScopedLogger } from '../ScopedLogger';
import { HubMessage } from '../wire/hub-message';
import { HubMessages } from '../wire/hub-messages';
import type { IncomingNotificationsHandler, RemoteMsgHub} from './MsgHub';
import { ConnectionEventType } from './MsgHub';

const ScheduledMessagesLimit = 1000;
const ReconnectionAttemptsLimit = 500;

// const PingIntervalSeconds = 60;
//TODO ping every minute to protect from disconnecting

export class WebSocketHub implements RemoteMsgHub {

    connectionEvents: ObservableStream<ConnectionEventType>;

    logger: ScopedLogger;
    url: string;
    onNotification: IncomingNotificationsHandler;

    _connectionsAttemptsCounter: number = 0;

    private _socket: WebSocket | undefined;

    _connectionPromise: Promise<Event> | null = null;

    private _scheduledMessages: [string, Uint8Array][] = [];
    private _awaitingResponses: Map<string, DeferredPromise<Uint8Array>> = new Map();

    private _sendScheduled = false;
    private _isDisposed: boolean = false;
    private _disposeListeners: ((self: WebSocketHub) => void)[] = [];

    constructor(
        logger: ScopedLogger,
        url: string,
        notificationsHandler?: (msg: Uint8Array) => void
    ) {
        this.logger = logger;
        this.url = url;
        this.connectionEvents = new ObservableStream({identifier: `${url}-connection-events`});
        this.onNotification =
            notificationsHandler ??
            ((_data) => this.logger.info(`websocket msg received, override notification callback and handle these`));
        
        this._setupConnection();
        this.connectionEvents.subscribe({
            settings: {immediateMode: true},
            onNext: (etype) => {
                if (etype === ConnectionEventType.Closed) {
                    this.rejectAwaitingRequests();
                    DeferredPromise.delay(this._connectionsAttemptsCounter * 100).promise.then(() => this._setupConnection());
                }
            }
        })
    }

    dispose() {
        if (this._isDisposed) {
            return;
        }
        this.logger.debug('disposing hub');
        this._isDisposed = true;
        this.clearScheduled()
        this._socket?.close();
        while (this._disposeListeners?.length) {
            const l = this._disposeListeners.shift()!;
            try {
                l(this);
            } catch (e) {
                console.error('error in observable_object dispose listener', e);
            }
        }
    }
        
    withDisposeListener(disposeListener: (self: WebSocketHub) => void) {
        this._disposeListeners.push(disposeListener);
    }

    private _setupConnection() {
        if (this._isDisposed) {
            return;
        }
        if (this._socket != undefined && (this._socket.readyState == WebSocket.OPEN || this._socket.readyState == WebSocket.CONNECTING)) {
            return;
        }
        if (this._connectionsAttemptsCounter > ReconnectionAttemptsLimit) {
            this.logger.error('too many websocket reconnection attempts, stop');
            return;
        }
        this._connectionsAttemptsCounter += 1;

        const fullUrl = new URL(this.url);
        fullUrl.protocol = fullUrl.protocol === "https:" ? "wss" : "ws";
        const socket = new WebSocket(fullUrl);
        
        const defferedOpen = new DeferredPromise<Event>();

        socket.onopen = (event) => {
            defferedOpen.resolve(event);
            this.connectionEvents.pushNext(ConnectionEventType.Open);
        };
        socket.onclose = (_event) => {
            defferedOpen.reject('closed');
            this.connectionEvents.pushNext(ConnectionEventType.Closed);
        };
        socket.onerror = (e) => {
            defferedOpen.reject('closed on error');
            this.logger.error('websocket error', e);
            this.connectionEvents.pushNext(ConnectionEventType.Closed);
        };
        socket.onmessage = (event) => {
            this._handleServerMessage(event);
        };
        if (this._socket) {
            this._socket.close();
        }
        this._socket = socket;
        this._connectionPromise = defferedOpen.promise;
        this._connectionPromise.finally(() => this._connectionPromise = null);
    }

    rejectAwaitingRequests() {
        for (const pr of this._awaitingResponses.values()) {
            pr.reject('websocket closed');
        }
        this._awaitingResponses.clear();
    }


    clearScheduled() {
        this._scheduledMessages.length = 0;
    }


    isNetworkBusy(): boolean {
        return (this._scheduledMessages.length + this._awaitingResponses.size) / ScheduledMessagesLimit > 0.02;
    }

    request(data: Uint8Array): Promise<Uint8Array> {
        if (this._scheduledMessages.length > ScheduledMessagesLimit) {
            return Promise.reject('scheduling queue limit reached');
        }
        const msgGuid = FetchUtils.generateGuid();
        console.assert(this._awaitingResponses.get(msgGuid) == undefined, 'msg guid sanity check');
        const defferedResponse = new DeferredPromise<Uint8Array>(60_000);
        this._awaitingResponses.set(msgGuid, defferedResponse);
        this._scheduledMessages.push([msgGuid, data]);
        this._scheduleSend();
        return defferedResponse.promise;
    }

    private _scheduleSend() {
        if (!this._sendScheduled) {
            ResolvedPromise.then(() => this._sendQueued());
            this._sendScheduled = true;
        }
    }

    private _sendQueued() {
        this._sendScheduled = false;
        if (this._scheduledMessages.length == 0) {
            return;
        }
        this._setupConnection();
        if (this._connectionPromise) {
            this._connectionPromise.then(() => this._scheduleSend());
            return;
        }

        const msgs = this._scheduledMessages;
        let binaryToSend: Uint8Array;
        {
            const fb = new flatbuffers.Builder(msgs.length * 1000);
            const root = HubMessages.createHubMessages(
                fb,
                HubMessages.createMessagesVector(
                    fb,
                    msgs.map(([guid, data]) => HubMessage.createHubMessage(
                        fb,
                        fb.createString(guid),
                        HubMessage.createPayloadVector(fb, data),
                        0
                    ))
                )
            );
            fb.finish(root);
            binaryToSend = fb.asUint8Array();
        }
        this._socket!.send(binaryToSend);
        this._scheduledMessages.length = 0;
    }

    private async _handleServerMessage(event: MessageEvent) {
        const responseArrayBuffer = await new Response(event.data).arrayBuffer()
        const responseBytes = new Uint8Array(responseArrayBuffer);
        const serverMsgs = new Map<string, Uint8Array>();
        {
            const buffer = new flatbuffers.ByteBuffer(new Uint8Array(responseBytes));
            const root = HubMessages.getRootAsHubMessages(buffer);
            for (let i = 0, il = root.messagesLength(); i < il; ++i) {
                const msg = root.messages(i)!;
                serverMsgs.set(msg.guid()!, msg.payloadArray()!);
            }
        }

        // to keep requests responses order, iterate awaiting events
        for (const [guid, responseHandler] of this._awaitingResponses) {
            const serverResponse = serverMsgs.get(guid);
            if (serverResponse) {
                serverMsgs.delete(guid);
                this._awaitingResponses.delete(guid);
                responseHandler.resolve(serverResponse);
            }
        }

        // now, the rest are not responses, but notifications from server
        for (const notificationBinary of serverMsgs.values()) {
            try {
                this.onNotification(notificationBinary);
            } catch (e) {
                this.logger.error('error duing notification handling', e);
            }
        }
    }
}


