import { Builder, ByteBuffer } from 'flatbuffers';

import { DeferredPromise } from '../DeferredPromise';
import { ErrorUtils } from '../ErrorUtils';
import type { ObjectSerializer } from '../ObjectSerializer';
import { ObjectUtils } from '../ObjectUtils';
import type { ScopedLogger } from '../ScopedLogger';
import { ErrorMsg } from '../wire/error-msg';
import { MsgType } from '../wire/msg-type';
import { ObjectState } from '../wire/object-state';
import { ObjectStateQuery } from '../wire/object-state-query';
import { ObjectUpdate } from '../wire/object-update';
import { SyncMessage } from '../wire/sync-message';
import { UpdateAccepted } from '../wire/update-accepted';
import type { EventStackFrame } from './EventsStackFrame';
import type { RemoteMsgHub } from './MsgHub';
import { ConnectionEventType } from './MsgHub';
import type { ObservableObject } from './ObservableObject';
import { requestExecutionFrame } from '../ExecutionFrame';

/**
 * Event identifiers attached to the state changes this module applies to
 * observables. The object is frozen, so the identifiers cannot be reassigned
 * at runtime.
 */
export const SyncEventIdents = Object.freeze({
    /** Common prefix shared by all identifiers below. */
    Prefix: 'synchub',
    /** Marks the state applied right after the initial query in `startSyncing`. */
    InitialQuery: 'synchub_initial_query',
    /** Marks the last confirmed state re-applied after a failed update push. */
    Rollback: 'synchub_rollback',
    /** Marks a confirmed remote state applied through `_notifyConfirmed`. */
    Notification: 'synchub_notification',
});

/**
 * Keeps registered {@link ObservableObject}s in sync with the remote side of a
 * {@link RemoteMsgHub} connection.
 *
 * For every registered observable the hub remembers the last state the remote
 * side confirmed (hash + deep-cloned value). Local changes are pushed as
 * updates carrying the last known hash; when a push fails the observable is
 * rolled back to the remembered state. Incoming notifications and the
 * re-queries issued whenever the connection opens are applied as confirmed
 * state.
 */
export class RemoteSyncHub {
    /** Logger used for all diagnostics emitted by this hub. */
    readonly _logger: ScopedLogger;

    /** Connection used to send requests and to receive notifications. */
    connection: RemoteMsgHub;

    /** Observables currently being synced, keyed by their identifier. */
    private _perIdentObservables: Map<string, ObservableObject<any>> = new Map();
    /** Last confirmed hash and deep-cloned value per identifier; the value is what a rollback restores. */
    private _knownVersions: Map<string, {hash: string, value: any}> = new Map();
    /** Bound `_loop` callback that is re-scheduled through `requestExecutionFrame`. */
    private _reqAnimCallback: (t: number) => void;
    private _isDisposed: boolean = false;

    // allow 1 throttled update at a time
    _throttleQueued: {
        obs: ObservableObject<any>,
        promise: DeferredPromise<void>,
        addedAtTime: number,
    } | null = null;

    /**
     * Installs the notification handler on `hub`, subscribes to its connection
     * events and starts the per-frame loop that flushes throttled updates.
     *
     * @param logger logger used for diagnostics
     * @param hub connection to the remote side
     */
    constructor(
        logger: ScopedLogger,
        hub: RemoteMsgHub,
    ) {
        this._logger = logger;
        this.connection = hub;

        this.connection.onNotification = (rawMsg) => {
            const wireMsg = SyncMessage.getRootAsSyncMessage(new ByteBuffer(rawMsg));
            // Check the payload type BEFORE reading the payload as ObjectState:
            // reading any other table through the ObjectState accessor yields
            // unrelated data instead of an identifier.
            const payloadType = wireMsg.payloadType();
            if (payloadType !== MsgType.ObjectState) {
                this._logger.error(
                    `unexpeсted notification msg type ${payloadType} - ${MsgType[payloadType]}`,
                    payloadType === MsgType.ErrorMsg ? WireErrorMsg.fromMsgSerialized(rawMsg) : undefined,
                );
                return;
            }
            const msg = wireMsg.payload(new ObjectState())!;
            const identifier = msg.identifier();
            const o = identifier == null ? undefined : this._perIdentObservables.get(identifier);
            if (!o) {
                logger.debug(`unregistered synced object ${msg.identifier()}, ignore update`);
                return;
            }
            const state = WireSyncedObjectState.fromMsgSerialized(rawMsg, o.serializer());
            // A state message without a value deserializes to `null`; there is
            // nothing to apply in that case.
            if (state.object != null) {
                this._notifyConfirmed(o.identifier, state.hash, state.object);
            }
        };
        this.connection.connectionEvents.subscribe({
            settings: {immediateMode: true},
            onNext: (evType: ConnectionEventType) => {
                if (evType !== ConnectionEventType.Open) {
                    return;
                }
                // Re-query every registered object when the connection opens.
                for (const ident of this._perIdentObservables.keys()) {
                    // Fire-and-forget: log failures instead of leaving an
                    // unhandled promise rejection behind.
                    this._query(ident).catch((e) => {
                        this._logger.warn(`error querying ${ident} after connection open`, e);
                    });
                }
            }
        });
        this._reqAnimCallback = (t: number) => this._loop(t);
        requestExecutionFrame(this._reqAnimCallback);
    }

    /**
     * Detaches every registered observable and disposes the connection.
     * Calling it more than once is a no-op.
     *
     * NOTE(review): an update still waiting in `_throttleQueued` is neither
     * pushed nor settled here, so its promise stays pending — confirm whether
     * callers rely on that.
     */
    dispose() {
        if (this._isDisposed) {
            return;
        }
        this._isDisposed = true;
        for (const obj of this._perIdentObservables.values()) {
            obj.detachFromSyncHub();
        }
        this.connection.dispose();
    }

    /**
     * Per-frame tick: re-schedules itself until the hub is disposed and gives
     * the queued throttled update a chance to be sent.
     *
     * @param t timestamp supplied by the frame scheduler (unused)
     */
    _loop(t: number) {
        if (this._isDisposed) {
            return;
        }
        requestExecutionFrame(this._reqAnimCallback);
        this._handleQueuedThrottled(false);
    }

    /** Remembers a deep clone of `value` together with `hash` as the last confirmed version of `identifier`. */
    _saveKnown(identifier: string, value: Object, hash: string) {
        this._knownVersions.set(identifier, { hash, value: ObjectUtils.deepCloneObj(value) });
    }

    /** Returns the last confirmed hash of `identifier`, or an empty string when none is known. */
    _knownHash(identifier: string): string {
        return this._knownVersions.get(identifier)?.hash ?? "";
    }

    /**
     * Registers `observable` and initialises it with the remote state.
     *
     * An observable already registered under the same identifier is detached
     * first. The current state is then queried, remembered as the confirmed
     * version and applied to the observable with the `InitialQuery` event.
     *
     * @param observable object to start syncing
     * @throws Error when the initial query returns no value; the returned
     *         promise also rejects when the query request itself fails
     */
    async startSyncing<T extends Object>(observable: ObservableObject<T>): Promise<void> {
        if (this._perIdentObservables.has(observable.identifier)) {
            this._logger.error(`${observable.identifier} is already registered in this hub, replacing it`);
            this._perIdentObservables.get(observable.identifier)!.detachFromSyncHub();
        }
        this._logger.debug(`start syncing ${observable.identifier}`);
        const [hash, value] = await this._queryHashValue(observable.identifier, "", observable.serializer());
        if (value == null) {
            throw new Error(`${observable.identifier} initial query returned 0`);
        }
        this._saveKnown(observable.identifier, value, hash);
        this._perIdentObservables.set(observable.identifier, observable);
        applyConfirmedState(observable, value, {identifier: SyncEventIdents.InitialQuery});
    }

    /**
     * Unregisters `observable` and forgets its confirmed version. Only the
     * exact instance that is registered is removed; anything else is logged
     * as a warning.
     */
    removeFromSync<T extends Object>(observable: ObservableObject<T>) {
        const known = this._perIdentObservables.get(observable.identifier);
        if (known === observable) {
            this._knownVersions.delete(observable.identifier);
            this._perIdentObservables.delete(observable.identifier);
        } else {
            this._logger.warn(`tried to detach unresigstered object ${observable.identifier}`);
        }
    }

    /**
     * Sends the queued throttled update when it is due.
     *
     * The update is due when `forcePush` is set, the connection reports that
     * the network is not busy, the observable no longer has throttle settings,
     * or one of its throttle timeouts has elapsed.
     *
     * @param forcePush send the queued update regardless of throttling
     * @returns the promise of the queued update when it was sent by this
     *          call; `undefined` when nothing is queued or it keeps waiting
     */
    _handleQueuedThrottled(forcePush: boolean): Promise<void> | undefined {
        const t = this._throttleQueued;
        if (!t) {
            return undefined;
        }
        const now = performance.now();
        const isNetworkFree = !this.connection.isNetworkBusy();
        const settings = t.obs.throttleSettings;
        // NOTE(review): both timeouts are measured from `addedAtTime`, so the
        // smaller one always wins. The "in between" timeout presumably should
        // be measured from the most recent update — confirm intended semantics.
        const isTimedOut = !settings
            || now - t.addedAtTime > settings.resetInBetweenTimeoutMs
            || now - t.addedAtTime > settings.resetTotalTimeoutMs;
        if (!(forcePush || isNetworkFree || isTimedOut)) {
            return undefined;
        }
        this._throttleQueued = null;
        this._pushUpdateOf(t.obs).then(() => t.promise.resolve(), (r) => t.promise.reject(r));
        return t.promise.promise;
    }

    /**
     * Pushes the current value of `obs` to the remote side, optionally
     * throttled.
     *
     * - Without throttle settings the update is sent immediately (after
     *   flushing whatever is queued, to keep the order of updates).
     * - `throttleStepN == 0` is always sent immediately as well.
     * - Otherwise the update is queued; at most one update is queued at a
     *   time and it is sent by `_handleQueuedThrottled`.
     *
     * @param obs observable whose current value should be pushed
     * @param throttleStepN position of this update in a throttled sequence
     * @returns promise that settles with the outcome of the push that carries
     *          this update
     */
    pushUpdateOf<T extends Object>(obs: ObservableObject<T>, throttleStepN: number): Promise<void> {
        if (!obs.throttleSettings) {
            // can't throttle, push if something is queued to not change order of updates
            this._handleQueuedThrottled(true);
            return this._pushUpdateOf(obs);
        }
        if (throttleStepN == 0) { // throttle 0 is also always pushed
            const isTheSameThrottled = this._throttleQueued?.obs == obs;
            const promiseOfThrottled = this._handleQueuedThrottled(true);
            // The value is polled at send time, so flushing a queued entry of
            // the same observable already sends its current value.
            if (isTheSameThrottled && promiseOfThrottled) {
                return promiseOfThrottled;
            }
            return this._pushUpdateOf(obs);
        }
        // throttle step n is > 0, may be possible to skip/throttle update
        const queued = this._throttleQueued;
        if (queued?.obs === obs) {
            // When the queued entry is not due yet it stays queued; hand out
            // its promise instead of `undefined`.
            return this._handleQueuedThrottled(false) ?? queued.promise.promise;
        }
        this._handleQueuedThrottled(true);
        this._throttleQueued = {
            obs,
            addedAtTime: performance.now(),
            promise: new DeferredPromise()
        };
        return this._throttleQueued.promise.promise;
    }

    /**
     * Sends the current value of `obs` as an update and waits for the answer.
     *
     * On `UpdateAccepted` the sent value becomes the confirmed version. On any
     * failure the observable is rolled back to the last confirmed version (if
     * one exists) and the error is re-thrown.
     *
     * @throws Error when `obs` is not registered, and whatever the request or
     *         response handling throws
     */
    private async _pushUpdateOf<T extends Object>(obs: ObservableObject<T>): Promise<void> {
        const identifier = obs.identifier;
        const newValue = ObjectUtils.deepCloneObj(obs.poll());
        const o = this._perIdentObservables.get(identifier);
        if (!o) {
            throw new Error(`cant push update for unregistered observable ${identifier}`);
        }
        this._logger.assert(o === obs, 'observable instance sanity check');
        try {
            const msgObj = new WireObjectUpdate(identifier, newValue, o.serializer(), this._knownHash(identifier));
            const response = await this.connection.request(msgObj.toMsgSerialized());
            const objResp = deserializeResponse(response, o.serializer());

            if (objResp instanceof WireUpdateAccepted) {
                this._logger.assert(objResp.identifier == identifier, 'response identifier sanity check', objResp.identifier, identifier);
                this._saveKnown(identifier, newValue, objResp.hash);
                this._logger.debug(`sucessfully patched`, identifier);
            } else {
                ErrorUtils.logThrow('unexpeced response', objResp);
            }
        } catch (e) {
            this._logger.warn(`error pushing ${identifier} update, rollback`, e);
            const known = this._knownVersions.get(identifier);
            if (known) {
                applyConfirmedState(o, known.value, {identifier: SyncEventIdents.Rollback});
            } else {
                this._logger.error(`no known version of ${identifier} exists to rollback after failed update`);
            }
            throw e;
        }
    }

    /**
     * Queries the remote state of `ident` and applies it when its hash
     * differs from the known one and a value was returned.
     *
     * @throws Error when no observable is registered under `ident`
     */
    async _query(ident: string) {
        const o = this._perIdentObservables.get(ident);
        if (!o) {
            throw new Error(`${ident} observable is not registered`);
        }
        const knownHash = this._knownVersions.get(ident);
        const [hash, value] = await this._queryHashValue(ident, knownHash?.hash ?? "", o.serializer());
        if ((knownHash?.hash != hash) && value) {
            this._notifyConfirmed(ident, hash, value);
        }
    }

    /**
     * Sends an object-state query and decodes the answer.
     *
     * @param ident identifier of the queried object
     * @param knownHash hash already known locally (empty string when none)
     * @param serializer serializer used to decode the returned value
     * @returns the remote hash and the decoded value, `null` when the answer
     *          carries no value
     */
    async _queryHashValue<T extends Object>(ident: string, knownHash: string, serializer: ObjectSerializer<T>): Promise<[string, T | null]> {
        const msgObj = new WireObjectStateQuery(ident, knownHash);
        const response = await this.connection.request(msgObj.toMsgSerialized());
        const objResp = WireSyncedObjectState.fromMsgSerialized(response, serializer);
        this._logger.assert(objResp.identifier == ident, 'response identifier sanity check', objResp.identifier, ident);
        return [objResp.hash, objResp.object];
    }

    /**
     * Stores `value`/`hash` as the confirmed version of `identifier` and
     * applies the value to the registered observable. Unregistered
     * identifiers are ignored; errors raised while applying are logged.
     */
    _notifyConfirmed(identifier: string, hash: string, value: any) {
        const o = this._perIdentObservables.get(identifier);
        if (o === undefined) {
            return;
        }
        this._saveKnown(identifier, value, hash);
        try {
            applyConfirmedState(o, value, {identifier:SyncEventIdents.Notification});
        } catch (e) {
            this._logger.error(`error during notificaiton ${identifier}`, e);
        }
    }
}

/**
 * Applies a confirmed `state` to `obj`.
 *
 * Observables that allow shape replacement receive it through
 * `applyPatchOrReplace`; all others receive it as a patch via `applyPatch`.
 *
 * @param obj observable to update
 * @param state confirmed state to apply
 * @param event event information forwarded together with the change
 */
function applyConfirmedState<T extends Object>(obj: ObservableObject<T>, state: T, event: Partial<EventStackFrame>) {
    if (!obj.isShapeReplacementAllowed()) {
        obj.applyPatch({patch: state, event});
        return;
    }
    obj.applyPatchOrReplace({value: state, event});
}

/**
 * Union of the message wrappers that `deserializeResponse` can return.
 * `T` is the type of the synced object carried by the state/update variants.
 */
export type SyncMessages<T> =
    | WireObjectStateQuery
    | WireObjectUpdate<T>
    | WireSyncedObjectState<T>
    | WireUpdateAccepted
    | WireErrorMsg;

/**
 * Decodes a serialized sync message into the wrapper class matching its
 * payload type.
 *
 * @param binary serialized `SyncMessage`
 * @param objSerializer serializer used for messages that carry an object value
 * @returns the decoded message wrapper
 * @throws Error when the payload type is none of the handled message types
 */
export function deserializeResponse<T>(binary: Uint8Array, objSerializer: ObjectSerializer<T>): SyncMessages<T> {
    const payloadType: MsgType = SyncMessage.getRootAsSyncMessage(new ByteBuffer(binary)).payloadType();
    if (payloadType === MsgType.ErrorMsg) {
        return WireErrorMsg.fromMsgSerialized(binary);
    }
    if (payloadType === MsgType.ObjectStateQuery) {
        return WireObjectStateQuery.fromMsgSerialized(binary);
    }
    if (payloadType === MsgType.ObjectState) {
        return WireSyncedObjectState.fromMsgSerialized(binary, objSerializer);
    }
    if (payloadType === MsgType.ObjectUpdate) {
        return WireObjectUpdate.fromMsgSerialized(binary, objSerializer);
    }
    if (payloadType === MsgType.UpdateAccepted) {
        return WireUpdateAccepted.fromMsgSerialized(binary);
    }
    throw new Error('unexpected msg type ' + payloadType);
}

/**
 * Wire wrapper for an `ObjectStateQuery` message: asks for the state of an
 * object, passing along the hash the sender already knows.
 */
export class WireObjectStateQuery {
    /** Identifier of the queried object. */
    identifier: string;
    /** Hash of the version already known to the sender. */
    knownHash: string;

    constructor(
        identifier: string,
        hash: string,
    ) {
        this.identifier = identifier;
        this.knownHash = hash;
    }

    /** Serializes this query as a `SyncMessage` with an `ObjectStateQuery` payload. */
    toMsgSerialized(): Uint8Array {
        const builder = new Builder();
        const identifierOffset = builder.createString(this.identifier);
        const knownHashOffset = builder.createString(this.knownHash);
        const payloadOffset = ObjectStateQuery.createObjectStateQuery(builder, identifierOffset, knownHashOffset);
        builder.finish(SyncMessage.createSyncMessage(builder, MsgType.ObjectStateQuery, payloadOffset));
        return builder.asUint8Array();
    }

    /**
     * Parses a serialized `SyncMessage` carrying an `ObjectStateQuery` payload.
     *
     * @throws Error when the message carries a different payload type
     */
    static fromMsgSerialized(binary: Uint8Array): WireObjectStateQuery {
        const envelope = SyncMessage.getRootAsSyncMessage(new ByteBuffer(binary));
        if (envelope.payloadType() != MsgType.ObjectStateQuery) {
            throw new Error('unexpeted msg type');
        }
        const query = envelope.payload(new ObjectStateQuery())!;
        const identifier = query.identifier()!;
        const knownHash = query.knownHash()!;
        return new WireObjectStateQuery(identifier, knownHash);
    }
}

/**
 * Wire wrapper for an `ObjectState` message: the identifier and hash of a
 * synced object plus, optionally, its value.
 *
 * On the wire a missing value is an empty value vector; in memory it is
 * `object === null`.
 */
export class WireSyncedObjectState<T> {
    /** Identifier of the object this state belongs to. */
    identifier: string;
    /** Decoded object value, `null` when the message carries no value. */
    object: T | null;
    /** Hash of the object version. */
    hash: string;
    /** Serializer used to encode/decode `object`. */
    serializer: ObjectSerializer<T>;

    constructor(
        identifier: string,
        object: T | null,
        hash: string,
        serializer: ObjectSerializer<T>,
    ) {
        this.identifier = identifier;
        this.object = object;
        this.hash = hash;
        this.serializer = serializer;
    }

    /**
     * Serializes this state as a `SyncMessage` with an `ObjectState` payload.
     * A `null` object is written as an empty value vector, which is what
     * `fromMsgSerialized` maps back to `null`.
     */
    toMsgSerialized(): Uint8Array {
        const fb = new Builder();
        // Do not hand `null` to the serializer: "no value" has its own wire
        // representation (empty vector).
        const serialized = this.object == null
            ? new Uint8Array(0)
            : this.serializer.serialize(this.object);
        const root = SyncMessage.createSyncMessage(
            fb,
            MsgType.ObjectState,
            ObjectState.createObjectState(
                fb,
                fb.createString(this.identifier),
                fb.createString(this.hash),
                ObjectState.createValueVector(fb, serialized)
            )
        );
        fb.finish(root);
        return fb.asUint8Array();
    }

    /**
     * Parses a serialized `SyncMessage` carrying an `ObjectState` payload.
     *
     * @param binary serialized message
     * @param serializer serializer used to decode a non-empty value
     * @throws Error when the message carries a different payload type
     */
    static fromMsgSerialized<T>(binary: Uint8Array, serializer: ObjectSerializer<T>): WireSyncedObjectState<any> {
        const bb = new ByteBuffer(binary);
        const wireMsg = SyncMessage.getRootAsSyncMessage(bb);
        if (wireMsg.payloadType() != MsgType.ObjectState) {
            throw new Error(`unexpeсted msg type ${wireMsg.payloadType()}`);
        }
        const msg = wireMsg.payload(new ObjectState())!;
        let obj: T | null = null;
        const value = msg.valueArray();
        // An absent or empty value vector means "no value".
        if (value && value.length > 0) {
            obj = serializer.deserialize(value);
        }
        return new WireSyncedObjectState(
            msg.identifier()!,
            obj,
            msg.hash()!,
            serializer
        );
    }
}


/**
 * Wire wrapper for an `ObjectUpdate` message: a new value for an object
 * together with the hash of the version the update is based on.
 *
 * On the wire a missing value is an empty value vector; in memory it is
 * `object === null`.
 */
export class WireObjectUpdate<T> {
    /** Identifier of the updated object. */
    identifier: string;
    /** New object value, `null` when the message carries no value. */
    object: T | null;
    /** Serializer used to encode/decode `object`. */
    serializer: ObjectSerializer<T>;
    /** Hash of the version this update is based on. */
    previousHash: string;

    constructor(
        identifier: string,
        object: T | null,
        serializer: ObjectSerializer<T>,
        previousHash: string
    ) {
        this.identifier = identifier;
        this.object = object;
        this.serializer = serializer;
        this.previousHash = previousHash;
    }

    /**
     * Serializes this update as a `SyncMessage` with an `ObjectUpdate`
     * payload. A `null` object is written as an empty value vector, which is
     * what `fromMsgSerialized` maps back to `null`.
     */
    toMsgSerialized(): Uint8Array {
        const fb = new Builder();
        // Do not hand `null` to the serializer: "no value" has its own wire
        // representation (empty vector).
        const serialized = this.object == null
            ? new Uint8Array(0)
            : this.serializer.serialize(this.object);
        const root = SyncMessage.createSyncMessage(
            fb,
            MsgType.ObjectUpdate,
            ObjectUpdate.createObjectUpdate(
                fb,
                fb.createString(this.identifier),
                ObjectUpdate.createValueVector(fb, serialized),
                fb.createString(this.previousHash),
            )
        );
        fb.finish(root);
        return fb.asUint8Array();
    }

    /**
     * Parses a serialized `SyncMessage` carrying an `ObjectUpdate` payload.
     *
     * @param binary serialized message
     * @param serializer serializer used to decode a non-empty value
     * @throws Error when the message carries a different payload type
     */
    static fromMsgSerialized<T>(binary: Uint8Array, serializer: ObjectSerializer<T>): WireObjectUpdate<any> {
        const bb = new ByteBuffer(binary);
        const wireMsg = SyncMessage.getRootAsSyncMessage(bb);
        if (wireMsg.payloadType() != MsgType.ObjectUpdate) {
            throw new Error(`unexpeсted msg type ${wireMsg.payloadType()}`);
        }
        const msg = wireMsg.payload(new ObjectUpdate())!;
        let obj: T | null = null;
        const value = msg.valueArray();
        // An absent or empty value vector means "no value".
        if (value && value.length > 0) {
            obj = serializer.deserialize(value);
        }
        return new WireObjectUpdate(
            msg.identifier()!,
            obj,
            serializer,
            msg.previousHash()!
        );
    }
}

/**
 * Wire wrapper for an `UpdateAccepted` message: confirms an update of an
 * object and reports the hash of the accepted version.
 */
export class WireUpdateAccepted {
    /** Identifier of the object whose update was accepted. */
    identifier: string;
    /** Hash of the accepted version. */
    hash: string;

    constructor(
        identifier: string,
        hash: string,
    ) {
        this.identifier = identifier;
        this.hash = hash;
    }

    /**
     * Serializes this message as a `SyncMessage` with an `UpdateAccepted`
     * payload, i.e. in the envelope that `fromMsgSerialized` expects.
     */
    toMsgSerialized(): Uint8Array {
        const fb = new Builder();
        // The root must be the SyncMessage envelope; the UpdateAccepted table
        // is only its payload.
        const root = SyncMessage.createSyncMessage(
            fb,
            MsgType.UpdateAccepted,
            UpdateAccepted.createUpdateAccepted(
                fb,
                fb.createString(this.identifier),
                fb.createString(this.hash),
            )
        );
        fb.finish(root);
        return fb.asUint8Array();
    }

    /**
     * Parses a serialized `SyncMessage` carrying an `UpdateAccepted` payload.
     *
     * @throws Error when the message carries a different payload type
     */
    static fromMsgSerialized(binary: Uint8Array): WireUpdateAccepted {
        const bb = new ByteBuffer(binary);
        const wireMsg = SyncMessage.getRootAsSyncMessage(bb);
        if (wireMsg.payloadType() != MsgType.UpdateAccepted) {
            throw new Error(`unexpeсted msg type ${wireMsg.payloadType()}`);
        }
        const msg = wireMsg.payload(new UpdateAccepted())!;
        return new WireUpdateAccepted(
            msg.identifier()!,
            msg.hash()!,
        );
    }
}

/**
 * Wire wrapper for an `ErrorMsg` message carrying a free-form debug comment.
 */
export class WireErrorMsg {

    /** Debug text transported by the message; empty string when absent on the wire. */
    debugComment: string;

    constructor(
        debugComment: string,
    ) {
        this.debugComment = debugComment;
    }

    /**
     * Serializes this message as a `SyncMessage` with an `ErrorMsg` payload,
     * i.e. in the envelope that `fromMsgSerialized` expects.
     */
    toMsgSerialized(): Uint8Array {
        const fb = new Builder();
        // The root must be the SyncMessage envelope; a bare ErrorMsg table
        // cannot be read back by fromMsgSerialized.
        const root = SyncMessage.createSyncMessage(
            fb,
            MsgType.ErrorMsg,
            ErrorMsg.createErrorMsg(
                fb,
                fb.createString(this.debugComment),
            )
        );
        fb.finish(root);
        return fb.asUint8Array();
    }

    /**
     * Parses a serialized `SyncMessage` carrying an `ErrorMsg` payload.
     *
     * @throws Error when the message carries a different payload type
     */
    static fromMsgSerialized(binary: Uint8Array): WireErrorMsg {
        const bb = new ByteBuffer(binary);
        const wireMsg = SyncMessage.getRootAsSyncMessage(bb);
        if (wireMsg.payloadType() != MsgType.ErrorMsg) {
            throw new Error(`unexpeсted msg type ${wireMsg.payloadType()}`);
        }
        const msg = wireMsg.payload(new ErrorMsg())!;
        return new WireErrorMsg(
            msg.debugComment() || "",
        );
    }
}