import type { VersionedValue } from '../';
import type { Observer } from './Observables';
import type { ObservableStream } from './ObservableStream';

/**
 * Contract for an object that buffers items of type `T` and hands them out
 * in batches, while also exposing a version number via {@link VersionedValue}.
 *
 * The behavioural notes on each member describe what the implementations in
 * this file do; the interface itself only fixes the signatures.
 */
export interface IStreamAccumulator<T> extends VersionedValue {
    /**
     * Current version number. The implementations in this file increase it by
     * one for every item they add to their buffer.
     */
    version(): number;

    /** `true` when no items are currently buffered. */
    isEmpty(): boolean;

    /**
     * Hands over the buffered items and leaves the buffer empty.
     *
     * @returns The buffered items, or `undefined` when nothing is buffered.
     */
    consume(): T[] | undefined;

    /**
     * Drains the buffer and passes the drained batch through `mergeFn`.
     *
     * @param mergeFn Reducer applied to the drained batch.
     * @returns The result of `mergeFn`, or `undefined` when nothing was
     *          buffered (in which case `mergeFn` is not called).
     */
    mergeAndConsume<R>(mergeFn: (updates: T[]) => R): R | undefined;

    /** Releases whatever resources the accumulator holds. */
    dispose(): void;
}

/**
 * Buffers every item emitted by an {@link ObservableStream}, optionally
 * keeping only those accepted by a predicate, until a caller drains the
 * buffer with {@link StreamAccumulator.consume} or
 * {@link StreamAccumulator.mergeAndConsume}.
 */
export class StreamAccumulator<T> implements IStreamAccumulator<T> {

    /** Items accepted since the last drain, in arrival order. */
    _events: T[] = [];

    /** Number of items accepted so far; draining does not reset it. */
    _version: number = 0;

    private _subscription: Observer;
    private _filterFn: ((newObj: T) => boolean) | null;

    /**
     * Subscribes to `stream` and starts buffering.
     *
     * @param stream   Source of items.
     * @param filterFn Optional predicate; when supplied, items for which it
     *                 returns a falsy value are ignored (neither buffered
     *                 nor counted in the version).
     */
    constructor(
        stream: ObservableStream<T>,
        filterFn?: (newObj: T) => boolean,
    ) {
        this._filterFn = filterFn ?? null;
        this._subscription = stream.subscribe({
            // NOTE(review): presumably requests synchronous delivery —
            // confirm against ObservableStream's subscribe settings.
            settings: { immediateMode: true },
            onNext: (item) => {
                const accepted = !this._filterFn || this._filterFn(item);
                if (accepted) {
                    this._events.push(item);
                    this._version += 1;
                }
            },
        });
    }

    /** @returns The count of items accepted since construction. */
    version(): number {
        return this._version;
    }

    /** Disposes the subscription obtained from the stream. */
    dispose(): void {
        this._subscription.dispose();
    }

    /** @returns `true` when the buffer currently holds no items. */
    isEmpty(): boolean {
        return !this._events.length;
    }

    /**
     * Hands the current buffer to the caller and starts a fresh one.
     *
     * @returns The buffered items, or `undefined` if the buffer is empty.
     */
    consume(): T[] | undefined {
        if (this._events.length === 0) {
            return undefined;
        }
        // Swap in a new array so the caller owns the returned batch.
        const drained = this._events;
        this._events = [];
        return drained;
    }

    /**
     * Drains the buffer and reduces the drained batch with `mergeFn`.
     *
     * @param mergeFn Applied to the drained items; not called when the
     *                buffer is empty.
     * @returns `mergeFn`'s result, or `undefined` if nothing was buffered.
     */
    mergeAndConsume<R>(mergeFn: (updates: T[]) => R): R | undefined {
        const batch = this.consume();
        return batch === undefined ? undefined : mergeFn(batch);
    }
}

/**
 * Buffers the transformed form of every item emitted by an
 * {@link ObservableStream}. Each incoming `From` is passed through a
 * transform; results that are `null` or `undefined` are dropped, everything
 * else is buffered until drained.
 */
export class StreamTransformedAccumulator<From, To> implements IStreamAccumulator<To> {

    /** Transformed items accepted since the last drain, in arrival order. */
    _events: To[] = [];

    /** Number of items buffered so far; draining does not reset it. */
    _version: number = 0;

    private _subscription: Observer;
    private _transformFn: (newObj: From) => To | null;

    /**
     * Subscribes to `stream` and starts buffering transformed items.
     *
     * @param stream      Source of raw items.
     * @param transformFn Maps a raw item to the value to buffer; returning
     *                    `null` (or `undefined`) skips the item entirely.
     */
    constructor(
        stream: ObservableStream<From>,
        transformFn: (newObj: From) => To | null,
    ) {
        this._transformFn = transformFn;
        this._subscription = stream.subscribe({
            // NOTE(review): presumably requests synchronous delivery —
            // confirm against ObservableStream's subscribe settings.
            settings: { immediateMode: true },
            onNext: (raw) => {
                const mapped = this._transformFn(raw);
                // Loose comparison on purpose: both null and undefined are skipped.
                if (mapped != null) {
                    this._events.push(mapped);
                    this._version += 1;
                }
            },
        });
    }

    /** @returns The count of items buffered since construction. */
    version(): number {
        return this._version;
    }

    /** Disposes the subscription obtained from the stream. */
    dispose(): void {
        this._subscription.dispose();
    }

    /** @returns `true` when the buffer currently holds no items. */
    isEmpty(): boolean {
        return !this._events.length;
    }

    /**
     * Hands the current buffer to the caller and starts a fresh one.
     *
     * @returns The buffered items, or `undefined` if the buffer is empty.
     */
    consume(): To[] | undefined {
        if (this._events.length === 0) {
            return undefined;
        }
        // Swap in a new array so the caller owns the returned batch.
        const drained = this._events;
        this._events = [];
        return drained;
    }

    /**
     * Drains the buffer and reduces the drained batch with `mergeFn`.
     *
     * @param mergeFn Applied to the drained items; not called when the
     *                buffer is empty.
     * @returns `mergeFn`'s result, or `undefined` if nothing was buffered.
     */
    mergeAndConsume<R>(mergeFn: (updates: To[]) => R): R | undefined {
        const batch = this.consume();
        return batch === undefined ? undefined : mergeFn(batch);
    }
}
