import type { ResultAsync, JobExecutor, LazyVersioned, Result, PollWithVersionResult } from "..";
import { IterUtils, ResolvedPromise, Success, RecalcScheduleType, Failure, ExecutionThreadPreference, InProgress, WorkerPool } from "..";
import type { ScopedLogger } from "../ScopedLogger";
import type { BasicCollectionUpdates} from "./BasicCollectionUpdates";
import { Allocated, Deleted, Updated } from "./BasicCollectionUpdates";
import type { BasicDataSource } from "./BasicDataSource";
import { ObservableStream } from "./ObservableStream";
import { StreamAccumulator } from "./StreamAccumulator";


/**
 * Collection that maps every item of a source collection through a
 * {@link JobExecutor} and exposes the per-item outcome as a `ResultAsync<T>`.
 *
 * For each dirty id the source object is optionally converted to job arguments
 * (`toJobArgsConverter`), then the executor decides via `executionPreference`
 * whether the job runs synchronously on the main thread or is handed to
 * `WorkerPool`. Worker jobs are stored as `InProgress` until their promise
 * settles; the settled result is first parked in `_asyncResultNotSignaledYet`
 * and becomes visible (together with an `Updated` event) on the next recalc.
 *
 * Values held in `Success` results are passed to the optional `disposeFn`
 * whenever they leave the collection: on removal, on replacement by a newer
 * result, and on `dispose()`.
 */
export class MappedCollectionParallel<T, IdT, Source, JobArgs = Source, JobOutput = T>
	implements LazyVersioned<ReadonlyMap<IdT, ResultAsync<T>>>, BasicDataSource<ResultAsync<T>, IdT>
{
	/** Update events of this collection. Does not preserve the order of updates of the source collection. */
	readonly updatesStream: ObservableStream<BasicCollectionUpdates<IdT>>;

	readonly logger: ScopedLogger;
	readonly dataSource: BasicDataSource<Source, IdT>;

	readonly jobExecutorCtor: new () => JobExecutor<JobArgs, JobOutput>;
	readonly toJobArgsConverter: ((it: Source) => Result<JobArgs>) | undefined;
	readonly fromJobResultConverter: ((jobOutput: JobOutput) => T) | undefined;
	readonly scheduleType: RecalcScheduleType;

	/** Settled worker results that are not yet written to `_results` nor announced on `updatesStream`. */
	private _asyncResultNotSignaledYet = new Map<IdT, Failure | Success<T>>();
	/** Current, externally visible state per id. */
	private _results = new Map<IdT, ResultAsync<T>>();
	private _disposeFn: ((sourceObj: T) => void) | null;
	private _updatesAccumulator: StreamAccumulator<BasicCollectionUpdates<IdT>>;

	/** Ids waiting for recalculation; may contain duplicates until processed. */
	_dirtyIds: IdT[] = [];

	/** True while a microtask recalculation is queued, so that only one is queued at a time. */
	private _updateScheduled = false;

	constructor(args: {
		identifier: string,
		logger: ScopedLogger,
		dataSource: BasicDataSource<Source, IdT>,
		mapExecutorCtor: { new(): JobExecutor<JobArgs, JobOutput> },
		toJobArgsConverter?: (it: Source) => Result<JobArgs>,
		fromJobResultConverter?: (jobOutput: JobOutput) => T,
		disposeFn?: (obj: T) => void,
		schedule?: RecalcScheduleType,
	}) {
		this.logger = args.logger.newScope(args.identifier);
		this.dataSource = args.dataSource;
		this.jobExecutorCtor = args.mapExecutorCtor;
		this.toJobArgsConverter = args.toJobArgsConverter;
		this.fromJobResultConverter = args.fromJobResultConverter;
		this._disposeFn = args.disposeFn ?? null;
		this.scheduleType = args.schedule ?? RecalcScheduleType.Microtask;

		this.updatesStream = new ObservableStream({
			identifier: args.identifier,
			holdLastValueForNewSubscribers: false,
			// New subscribers get every currently known id reported as allocated.
			defaultValueForNewSubscribersFactory: () => {
				if (this._results.size === 0) {
					return undefined;
				}
				return new Allocated<IdT>(Array.from(this._results.keys()));
			},
		});
		// NOTE(review): the meaning of the `true` return value is defined by
		// StreamAccumulator (presumably "keep this update") — confirm there.
		this._updatesAccumulator = new StreamAccumulator(this.dataSource.updatesStream, (_update) => {
			this._scheduleRecalc();
			return true;
		});
	}

	/**
	 * Stops listening to the source, disposes every held `Success` value
	 * (including settled-but-not-yet-signaled ones) and drops all pending state.
	 */
	dispose() {
		this._updatesAccumulator.dispose();
		if (this._disposeFn !== null) {
			for (const [id, result] of this._results) {
				this._disposeResultValue(id, result);
			}
			for (const [id, result] of this._asyncResultNotSignaledYet) {
				this._disposeResultValue(id, result);
			}
		}
		this._results.clear();
		this._asyncResultNotSignaledYet.clear();
		this._dirtyIds = [];
	}

	/**
	 * @returns true when there are no unconsumed source updates, no dirty ids,
	 * no unsignaled worker results and no result that is still `InProgress`.
	 */
	isSynced() {
		if (!this._updatesAccumulator.isEmpty()) {
			return false;
		}
		if (this._asyncResultNotSignaledYet.size > 0) {
			return false;
		}
		if (this._dirtyIds.length > 0) {
			return false;
		}
		for (const value of this._results.values()) {
			if (value instanceof InProgress) {
				return false;
			}
		}
		return true;
	}

	/** Processes pending work and returns the live results map (not a copy). */
	poll(): ReadonlyMap<IdT, ResultAsync<T>> {
		this._handleCalcsIfNecessary();
		return this._results;
	}

	pollWithVersion(): PollWithVersionResult<
		 Readonly<ReadonlyMap<IdT, ResultAsync<T>>>
	> {
		return { value: this.poll(), version: this.version() };
	}

	/** Sum of the versions of the own updates stream and of the source updates accumulator. */
	version(): number {
		return this.updatesStream.version() + this._updatesAccumulator.version();
	}

	/**
	 * Processes pending work and returns the results for the requested ids.
	 * Ids without a result are omitted from the returned map.
	 */
	peekByIds(ids: Iterable<IdT>): Map<IdT, ResultAsync<T>> {
		this._handleCalcsIfNecessary();
		const res = new Map<IdT, ResultAsync<T>>();
		for (const id of ids) {
			const obj = this._results.get(id);
			if (obj !== undefined) {
				res.set(id, obj);
			}
		}
		return res;
	}

	allIds(): IterableIterator<IdT> {
		return this.poll().keys();
	}

	/**
	 * Queues `id` for recalculation. This does not schedule a recalculation by
	 * itself: the id is processed the next time pending work is handled
	 * (`poll`, `peekByIds`, or a recalc scheduled for another reason).
	 */
	markDirty(id: IdT) {
		this._dirtyIds.push(id);
	}

	/** Triggers processing of pending work according to `scheduleType`. */
	private _scheduleRecalc() {
		if (this.scheduleType === RecalcScheduleType.Immidiate) {
			this._handleCalcsIfNecessary();
		} else if (this.scheduleType === RecalcScheduleType.Microtask) {
			if (!this._updateScheduled) {
				this._updateScheduled = true;
				ResolvedPromise.then(() => {
					this._updateScheduled = false;
					this._handleCalcsIfNecessary();
				});
			}
		} else if (this.scheduleType === RecalcScheduleType.OnDemand) {
			// call poll to get synced results
		} else {
			this.logger.error('unknown schedule type', this.scheduleType);
		}
	}

	/**
	 * Consumes accumulated source updates, recalculates all dirty ids and then
	 * publishes worker results that settled since the previous call.
	 */
	private _handleCalcsIfNecessary() {
		const updates = this._updatesAccumulator.consume();
		if (updates) {
			for (const update of updates) {
				IterUtils.extendArray(this._dirtyIds, update.ids);
			}
		}
		if (this._dirtyIds.length > 0) {
			let dirtyIdsUnique: Iterable<IdT>;
			if (typeof this._dirtyIds[0] === 'number') {
				// numeric ids are deduplicated in place
				dirtyIdsUnique = this._dirtyIds;
				IterUtils.sortDedupNumbers(dirtyIdsUnique as unknown[] as number[]);
			} else {
				dirtyIdsUnique = new Set(this._dirtyIds);
			}
			// reset before processing so ids marked dirty during processing are kept for the next run
			this._dirtyIds = [];
			this._handleDirtyIds(dirtyIdsUnique);
		}
		this._flushAsyncResults();
	}

	/**
	 * Moves settled worker results into `_results` and announces them with a
	 * single `Updated` event. Results whose id is no longer tracked are dropped
	 * (and disposed) instead of being announced.
	 */
	private _flushAsyncResults() {
		if (this._asyncResultNotSignaledYet.size === 0) {
			return;
		}
		const updatedIds: IdT[] = [];
		for (const [id, result] of this._asyncResultNotSignaledYet) {
			const currState = this._results.get(id);
			if (!(currState instanceof InProgress)) {
				this.logger.batchedError('unexpected state for not signaled caclulation', [id, currState]);
			}
			if (currState === undefined) {
				// the entry was removed meanwhile, nothing to update
				this._disposeResultValue(id, result);
				continue;
			}
			this._storeResult(id, result);
			updatedIds.push(id);
		}
		this._asyncResultNotSignaledYet.clear();
		if (updatedIds.length) {
			this.updatesStream.pushNext(new Updated(updatedIds));
		}
	}

	/**
	 * Recalculates the given ids against the current source state and emits
	 * `Allocated` / `Updated` / `Deleted` events for them.
	 *
	 * An id counts as removed when the source yields `undefined` for it or does
	 * not yield an entry for it at all.
	 *
	 * @param dirtyIdsUnique deduplicated ids; must be iterable more than once
	 * (the caller passes an array or a Set).
	 */
	private _handleDirtyIds(dirtyIdsUnique: Iterable<IdT>) {
		const newIds: IdT[] = [];
		const updatedIds: IdT[] = [];
		const removedIds: IdT[] = [];
		const toCalculate: [IdT, JobArgs][] = [];
		const seenIds = new Set<IdT>();

		for (const [id, sourceObj] of this.dataSource.peekByIds(dirtyIdsUnique)) {
			seenIds.add(id);
			// whatever was pending for this id is superseded by this recalculation
			this._dropPendingAsyncResult(id);

			if (sourceObj === undefined) {
				if (this._removeResult(id)) {
					removedIds.push(id);
				}
				continue;
			}

			(this._results.has(id) ? updatedIds : newIds).push(id);

			try {
				if (this.toJobArgsConverter) {
					const converted = this.toJobArgsConverter(sourceObj);
					if (converted instanceof Success) {
						toCalculate.push([id, converted.value]);
					} else {
						// Store the failure right away: the id is announced by the
						// events pushed below, so it must be present in the results.
						this._storeResult(id, converted);
					}
				} else {
					toCalculate.push([id, sourceObj as unknown as JobArgs]);
				}
			} catch (e) {
				this.logger.error('error converting args for job executor', id, e);
				this._storeResult(id, new Failure({msg: this._errorMessage(e)}));
			}
		}

		// ids the source did not report at all no longer exist there
		for (const id of dirtyIdsUnique) {
			if (seenIds.has(id)) {
				continue;
			}
			this._dropPendingAsyncResult(id);
			if (this._removeResult(id)) {
				removedIds.push(id);
			}
		}

		if (toCalculate.length > 0) {
			const executor = new this.jobExecutorCtor();
			for (const [id, args] of toCalculate) {
				this._storeResult(id, this._startCalculation(executor, id, args));
			}
		}

		if (newIds.length) {
			this.updatesStream.pushNext(new Allocated(newIds));
		}
		if (updatedIds.length) {
			this.updatesStream.pushNext(new Updated(updatedIds));
		}
		if (removedIds.length) {
			this.updatesStream.pushNext(new Deleted(removedIds));
		}
	}

	/**
	 * Runs the job for one id, either synchronously or through `WorkerPool`.
	 *
	 * @returns the final result for main-thread jobs, or an `InProgress` marker
	 * for worker jobs. The worker outcome is later delivered through
	 * `_setAsyncNotSignaledResult`, unless the marker has been replaced by then.
	 */
	private _startCalculation(
		executor: JobExecutor<JobArgs, JobOutput>,
		id: IdT,
		args: JobArgs,
	): ResultAsync<T> {
		const pref = executor.executionPreference(args);
		if (pref === ExecutionThreadPreference.MainThread) {
			try {
				const jobOutput = executor.execute(args) as JobOutput;
				return new Success(this._toResultValue(jobOutput));
			} catch (e) {
				this.logger.error('job result conversion error', id, e);
				return new Failure({msg: this._errorMessage(e)});
			}
		}

		const inProgress: ResultAsync<T> = new InProgress(id + '');
		// a settled job is only relevant while its own marker is still the current state
		const isSuperseded = () => this._results.get(id) !== inProgress;

		WorkerPool.execute(
			this.jobExecutorCtor,
			args,
			WorkerPool.createWorkGuidForInCollectionId(this, id),
		).then(
			(jobOutput) => {
				if (isSuperseded()) {
					return;
				}
				let settled: Success<T> | Failure;
				try {
					settled = new Success(this._toResultValue(jobOutput));
				} catch (e) {
					settled = new Failure({msg: this._errorMessage(e)});
				}
				// kept outside of the try block so that a failure while publishing
				// is not misreported as a failed job
				this._setAsyncNotSignaledResult(id, settled);
			},
			(e) => {
				if (isSuperseded()) {
					return;
				}
				this._setAsyncNotSignaledResult(id, new Failure({msg: this._errorMessage(e)}));
			}
		);
		return inProgress;
	}

	/** Applies `fromJobResultConverter` when present, otherwise passes the job output through. */
	private _toResultValue(jobOutput: JobOutput): T {
		return this.fromJobResultConverter
			? this.fromJobResultConverter(jobOutput)
			: jobOutput as unknown as T;
	}

	/** Parks a settled worker result and schedules the recalc that will publish it. */
	private _setAsyncNotSignaledResult(id: IdT, result: Success<T> | Failure) {
		this._asyncResultNotSignaledYet.set(id, result);
		this._scheduleRecalc();
	}

	/** Forgets a settled-but-unpublished result for `id`, disposing its value if it was a success. */
	private _dropPendingAsyncResult(id: IdT) {
		const pending = this._asyncResultNotSignaledYet.get(id);
		if (pending === undefined) {
			return;
		}
		this._asyncResultNotSignaledYet.delete(id);
		this._disposeResultValue(id, pending);
	}

	/**
	 * Writes `next` as the current state of `id` and disposes the value it
	 * replaces, unless the very same value is carried over.
	 */
	private _storeResult(id: IdT, next: ResultAsync<T>) {
		const prev = this._results.get(id);
		this._results.set(id, next);
		if (prev instanceof Success && !(next instanceof Success && next.value === prev.value)) {
			this._disposeResultValue(id, prev);
		}
	}

	/**
	 * Removes `id` from the results, disposing its value if it was a success.
	 * @returns true when an entry existed and was removed.
	 */
	private _removeResult(id: IdT): boolean {
		const curr = this._results.get(id);
		if (curr === undefined) {
			return false;
		}
		this._results.delete(id);
		this._disposeResultValue(id, curr);
		return true;
	}

	/** Calls `disposeFn` for the value of a `Success` result; errors are logged, never thrown. */
	private _disposeResultValue(id: IdT, result: ResultAsync<T> | undefined) {
		if (this._disposeFn === null || !(result instanceof Success)) {
			return;
		}
		try {
			this._disposeFn(result.value);
		} catch (e) {
			this.logger.error('error during object disposal', id, e);
		}
	}

	/** Extracts a human readable message from anything that was thrown or rejected. */
	private _errorMessage(e: unknown): string {
		return e instanceof Error ? e.message : String(e);
	}
}
