import { map as ldMap, reduce as ldReduce, includes as ldIncludes } from "lodash-es";
import { inject, Injectable } from "@angular/core";
import { combineLatest, EMPTY, filter, Observable, of, Subject, takeUntil, timer } from "rxjs";
import { finalize, first, map, pairwise, shareReplay, switchMap } from "rxjs/operators";

import { LgConsole } from "@logex/framework/core";
import { ISubscriptionTicket, LG_MESSAGE_BUS_SERVICE } from "@logex/framework/lg-application";

import { ILoadersCollection, LoaderConfiguration, LoadManager } from "../load-manager";

import { DatasetInfoV2 } from "./dataset-info-v2";
import { StaleDataServiceGatewayV2 } from "./gateways/stale-data-service-gateway-v2";
import { MaybeStaleData, CalculationError, CalculationProgress, IStaleDataService , CalculationErrorDto, CalculationProgressDto, TableStatusDto } from "./types";
import { combineCalculationProgress } from "./helpers/combineCalculationProgress";
import { combineCalculationErrors } from "./helpers/combineCalculationErrors";


/** Period, in milliseconds (30 seconds), of the fallback status polling timer. */
const STATUS_CHECK_INTERVAL = 30_000;

/**
 * Returns the value stored in `cache` under `key`; on a miss the value is produced by
 * `getValue`, stored in the cache and returned.
 *
 * A stored `null` or `undefined` is treated as a miss, so `getValue` is invoked again
 * for such entries. Other falsy values (`0`, `""`, `false`) are regular cache hits.
 *
 * @param cache Map used as the backing store.
 * @param key Key under which the value is looked up and stored.
 * @param getValue Factory invoked only when no usable value is cached.
 * @returns The cached or freshly created value.
 */
function memoizeByKey<TRes>(
    cache: Map<string, TRes>,
    key: string,
    getValue: () => TRes
): TRes {
    const cached = cache.get(key);
    if (cached != null) return cached;

    const created = getValue();
    cache.set(key, created);
    return created;
}

/**
 * Keeps one `DatasetInfoV2` per dataset id, routes message-bus notifications
 * ("OnTablesChange", "OnCalculationProgress", "OnCalculationFinished", "OnCalculationError")
 * to the dataset owning the reported subscription id, and exposes combined
 * stale / calculating / progress / error state for arbitrary groups of datasets.
 *
 * While the message-bus connection is reported as not established, subscription states are
 * polled through `StaleDataServiceGatewayV2` every `STATUS_CHECK_INTERVAL` milliseconds.
 *
 * The owner must call `release()` when the service is no longer needed; that is the only
 * place where handlers, timers and shared observables are torn down.
 */
@Injectable()
export class StaleDataServiceV2 implements IStaleDataService {
    // Not readonly: can be replaced through `setLoadersOverride`.
    private _loaders = inject(LoadManager) as ILoadersCollection;
    private readonly _lgConsole = inject(LgConsole).withSource("StaleDataServiceV2");
    private readonly _messageBus = inject(LG_MESSAGE_BUS_SERVICE);
    private readonly _staleDataServiceGatewayV2 = inject(StaleDataServiceGatewayV2);

    // ----------------------------------------------------------------------------------
    // Fields

    /** Dataset state holders, keyed by dataset id; entries are created lazily. */
    private readonly _datasets = new Map<string, DatasetInfoV2>();

    // Shared combined observables, keyed by the comma-joined list of dataset ids.
    private readonly _isStaleCache = new Map<string, Observable<boolean>>();
    private readonly _staleTablesCache = new Map<string, Observable<Set<string>>>();
    private readonly _isCalculatingCache = new Map<string, Observable<boolean>>();
    private readonly _calculatingProgressCache = new Map<string, Observable<CalculationProgress | null>>();
    private readonly _calculatingErrorCache = new Map<string, Observable<CalculationError[] | null>>();

    // Message-bus handler tickets; they stay undefined until the first successful connection.
    private _onTablesChangeSubscription: ISubscriptionTicket | undefined;
    private _onCalculationProgressSubscription: ISubscriptionTicket | undefined;
    private _onCalculationFinishedSubscription: ISubscriptionTicket | undefined;
    private _onCalculationErrorSubscription: ISubscriptionTicket | undefined;

    /** Emits once and completes in `release()`; every internal stream is bound to it. */
    private readonly _destroyed$ = new Subject<void>();

    // ----------------------------------------------------------------------------------
    constructor() {
        this._setupStatusCheckFallbackRequest();

        // On first connection with the server, establish message handlers.
        // `takeUntil` is placed after `first` so that a release before the connection is
        // established simply drops the pending registration (and `first` does not raise
        // an EmptyError); otherwise the handlers would be registered after `release()`
        // and never cancelled.
        this._messageBus.connectionEstablished$
            .pipe(
                first(connectionEstablishedOk => connectionEstablishedOk),
                takeUntil(this._destroyed$)
            )
            .subscribe(() => this._registerMessageHandlers());

        // On every reconnection (a false -> true transition) reload datasets
        this._messageBus.connectionEstablished$
            .pipe(
                pairwise(),
                filter(([prev, curr]) => prev === false && curr === true),
                takeUntil(this._destroyed$)
            )
            .subscribe(() => {
                this._datasets.forEach(dataset => dataset.reload());
            });
    }

    // ----------------------------------------------------------------------------------
    /**
     * Wraps a loader configuration so that loading of `datasetId` is tracked by this service.
     *
     * The wrapped loader notifies the dataset that loading started, calls `cfg.loader` with
     * the original arguments followed by the dataset's current subscriptions, forwards every
     * non-null result to the dataset and maps it through `project`.
     *
     * @param datasetId Dataset id; also the single key of the returned record.
     * @param cfg Loader configuration whose loader emits `MaybeStaleData<TData>`.
     * @param project Maps loaded data, its stale flag and the loader arguments to the value
     *   emitted by the wrapped loader. Called with `(null, false, args)` when the loader
     *   emits `null`/`undefined`. Defaults to producing `{ data, isStale }`.
     * @param queryReload When provided, stored on the dataset as `queryReloadCallback`.
     * @returns Record with one entry: `cfg` with `loader` and `onCancel` replaced
     *   (an `onCancel` present on `cfg` is overridden).
     */
    configureLoader<TData>(
        datasetId: string,
        cfg: LoaderConfiguration<MaybeStaleData<TData>>,
        project: (data: TData | null, isStale: boolean, args: any[]) => unknown = (data, isStale) =>
            ({ data, isStale } as MaybeStaleData<TData>),
        queryReload?: (wasStale: boolean, isStale: boolean) => boolean
    ): Record<string, LoaderConfiguration> {
        const dataset = this._getDatasetInfo(datasetId);

        if (queryReload !== undefined) dataset.queryReloadCallback = queryReload;

        return {
            [datasetId]: {
                ...cfg,

                loader: (...args) => {
                    dataset.onLoadingStart(args);

                    // Subscriptions are appended as the last argument: undefined when there
                    // are none, a single id when there is one, an array otherwise.
                    // The loader function must recognize it and handle it properly.
                    return cfg.loader(...args, this.getDataSetSubscriptions(datasetId)).pipe(
                        map(data => {
                            if (data == null) return project(null, false, args);

                            dataset.onDataReceived(data);

                            return project(data.data, data.isStale, args);
                        })
                    );
                },

                onCancel: () => {
                    dataset.cancel();
                }
            }
        };
    }

    /**
     * Emits `true` while at least one of the given datasets reports itself as stale.
     * Without arguments a single `false` is emitted.
     */
    isStale$(...datasets: string[]): Observable<boolean> {
        if (datasets.length === 0) return of(false);

        return this._cachedCombination(
            this._isStaleCache,
            datasets,
            dataset => dataset.isStale$,
            values => values.some(value => value)
        );
    }

    /**
     * Emits the union of the stale-table sets of the given datasets.
     * Without arguments a single empty set is emitted.
     */
    staleTables$(...datasets: string[]): Observable<Set<string>> {
        if (datasets.length === 0) return of(new Set<string>());

        return this._cachedCombination(
            this._staleTablesCache,
            datasets,
            dataset => dataset.staleTables$,
            values => {
                const staleTables = ldReduce(
                    values,
                    (merged, tables) => {
                        tables.forEach(table => merged.add(table));
                        return merged;
                    },
                    new Set<string>()
                );
                this._lgConsole.debug("Stale tables", { datasets, values, staleTables });
                return staleTables;
            }
        );
    }

    /**
     * Synchronous check: `true` when at least one of the given datasets is currently
     * calculating, `false` when none is or when no dataset is given.
     */
    isCalculating(...datasets: string[]): boolean {
        if (datasets.length === 0) return false;

        // Map first (not a short-circuiting `some` over the ids) so that a DatasetInfoV2
        // is looked up / created for every requested dataset.
        return ldMap(datasets, x => this._getDatasetInfo(x).isCalculating).some(value => value);
    }

    /**
     * Emits `true` while at least one of the given datasets is calculating.
     * Without arguments a single `false` is emitted.
     */
    isCalculating$(...datasets: string[]): Observable<boolean> {
        if (datasets.length === 0) return of(false);

        return this._cachedCombination(
            this._isCalculatingCache,
            datasets,
            dataset => dataset.isCalculating$,
            values => values.some(value => value)
        );
    }

    /**
     * Emits the calculation progress of the given datasets merged by
     * `combineCalculationProgress`. Without arguments a single `null` is emitted.
     */
    calculationProgress$(...datasets: string[]): Observable<CalculationProgress | null> {
        if (datasets.length === 0) return of(null);

        return this._cachedCombination(
            this._calculatingProgressCache,
            datasets,
            dataset => dataset.calculationProgress$,
            values => combineCalculationProgress(values)
        );
    }

    /**
     * Emits the calculation errors of the given datasets merged by
     * `combineCalculationErrors`. Without arguments a single `null` is emitted.
     */
    calculationError$(...datasets: string[]): Observable<CalculationError[] | null> {
        if (datasets.length === 0) return of(null);

        return this._cachedCombination(
            this._calculatingErrorCache,
            datasets,
            dataset => dataset.calculationError$,
            values => combineCalculationErrors(values)
        );
    }

    /**
     * Replaces the loaders collection handed to `DatasetInfoV2` instances.
     * Only datasets created after this call receive the new collection; already created
     * ones keep the collection they were constructed with.
     */
    setLoadersOverride(value: ILoadersCollection) {
        this._loaders = value;
    }

    /**
     * Builds (or reuses) the shared observable that combines one stream per dataset.
     *
     * The result is memoized in `cache` under the comma-joined dataset ids, so the same ids
     * in a different order yield a separate entry. The cache entry is removed when the last
     * subscriber leaves (`finalize` sits before the ref-counted `shareReplay`), and every
     * subscriber is completed when the service is released.
     *
     * @param cache Cache dedicated to the kind of combination being built.
     * @param datasets Non-empty list of dataset ids.
     * @param select Picks the per-dataset source stream.
     * @param combine Reduces the latest values of all datasets to a single result.
     */
    private _cachedCombination<TItem, TRes>(
        cache: Map<string, Observable<TRes>>,
        datasets: string[],
        select: (dataset: DatasetInfoV2) => Observable<TItem>,
        combine: (values: TItem[]) => TRes
    ): Observable<TRes> {
        const cacheKey = datasets.join(",");

        return memoizeByKey(cache, cacheKey, () =>
            combineLatest(
                ldMap(datasets, x => select(this._getDatasetInfo(x)))
            ).pipe(
                map(values => combine(values)),
                finalize(() => cache.delete(cacheKey)),
                shareReplay({ bufferSize: 1, refCount: true }),
                takeUntil(this._destroyed$)
            )
        );
    }

    /** Registers the four message-bus handlers and stores their tickets for `release()`. */
    private _registerMessageHandlers(): void {
        this._onTablesChangeSubscription = this._messageBus.on(
            "OnTablesChange",
            (subscriptionId: string, changes: TableStatusDto[]) => {
                this._onTablesChange(subscriptionId, changes);
            }) as ISubscriptionTicket;

        this._onCalculationProgressSubscription = this._messageBus.on(
            "OnCalculationProgress",
            (subscriptionId: string, progress: CalculationProgressDto) => {
                this._onCalculationProgress(subscriptionId, progress);
            }) as ISubscriptionTicket;

        this._onCalculationFinishedSubscription = this._messageBus.on(
            "OnCalculationFinished",
            (subscriptionId: string) => {
                this._onCalculationFinished(subscriptionId);
            }) as ISubscriptionTicket;

        this._onCalculationErrorSubscription = this._messageBus.on(
            "OnCalculationError",
            (subscriptionId: string, error: CalculationErrorDto) => {
                this._onCalculationError(subscriptionId, error);
            }) as ISubscriptionTicket;
    }

    /** Forwards a table change to the owning dataset; unknown subscription ids are ignored. */
    private _onTablesChange(subscriptionId: string, changes: TableStatusDto[]): void {
        const dataset = this._findDatasetBySubscriptionId(subscriptionId);
        if (dataset == null) return;

        dataset.onDataChanged(subscriptionId, changes);
    }

    /** Forwards calculation progress to the owning dataset; unknown subscription ids are ignored. */
    private _onCalculationProgress(subscriptionId: string, progress: CalculationProgressDto): void {
        const dataset = this._findDatasetBySubscriptionId(subscriptionId);
        if (dataset == null) return;

        dataset.onCalculationProgress(subscriptionId, progress);
    }

    /** Forwards calculation completion to the owning dataset; unknown subscription ids are ignored. */
    private _onCalculationFinished(subscriptionId: string): void {
        const dataset = this._findDatasetBySubscriptionId(subscriptionId);
        if (dataset == null) return;

        dataset.onCalculationFinished(subscriptionId);
    }

    /** Forwards a calculation error to the owning dataset; unknown subscription ids are ignored. */
    private _onCalculationError(subscriptionId: string, error: CalculationErrorDto): void {
        const dataset = this._findDatasetBySubscriptionId(subscriptionId);
        if (dataset == null) return;

        dataset.onCalculationError(subscriptionId, error);
    }

    /** Returns the first dataset whose subscriptions contain `subscriptionId`, if any. */
    private _findDatasetBySubscriptionId(subscriptionId: string): DatasetInfoV2 | undefined {
        for (const dataset of this._datasets.values()) {
            if (ldIncludes(dataset.subscriptions, subscriptionId)) return dataset;
        }
        return undefined;
    }

    /** Returns the state holder of `datasetId`, creating and registering it on first use. */
    private _getDatasetInfo(datasetId: string): DatasetInfoV2 {
        let dataSet = this._datasets.get(datasetId);

        if (dataSet == null) {
            dataSet = new DatasetInfoV2(
                datasetId,
                this._loaders,
                this._lgConsole
            );
            this._datasets.set(datasetId, dataSet);
        }

        return dataSet;
    }

    /**
     * Returns the subscriptions of a dataset in the shape passed to loaders:
     * `undefined` when there are none (or the dataset is unknown), the single id when
     * there is exactly one, the whole array otherwise.
     */
    private getDataSetSubscriptions(dataSetName: string): string | string[] | undefined {
        const subscriptions = this._datasets.get(dataSetName)?.subscriptions ?? [];

        if (subscriptions.length === 0) return undefined;
        if (subscriptions.length === 1) return subscriptions[0];
        return subscriptions;
    }

    /**
     * Polls subscription states through the gateway while the message-bus connection is
     * reported as not established: immediately and then every `STATUS_CHECK_INTERVAL` ms.
     * Polling pauses while the connection is established and stops for good on `release()`.
     */
    private _setupStatusCheckFallbackRequest(): void {
        this._messageBus.connectionEstablished$
            .pipe(
                switchMap(connectionEstablishedOk =>
                    connectionEstablishedOk
                        ? EMPTY
                        : timer(0, STATUS_CHECK_INTERVAL).pipe(
                            map(() => this._getSubscriptionIdsFromDataSets())
                        )
                ),
                // Without this the timer would keep polling after the service was released.
                takeUntil(this._destroyed$)
            )
            .subscribe(subscriptionIds => {
                if (subscriptionIds == null || subscriptionIds.length === 0) return;

                this._staleDataServiceGatewayV2
                    .checkSubscriptions(subscriptionIds)
                    // Drop a response that arrives after release.
                    .pipe(takeUntil(this._destroyed$))
                    .subscribe(subscriptionStates => {
                        for (const [subscriptionId, status] of Object.entries(subscriptionStates)) {
                            // NOTE(review): only subscriptions reported as NOT stale are
                            // forwarded, with an empty change list - presumably meaning
                            // "no table is stale any more"; confirm against
                            // DatasetInfoV2.onDataChanged.
                            if (status.isStale) continue;
                            this._onTablesChange(subscriptionId, []);
                        }
                    });
            });
    }

    /** Collects the subscription ids of all known datasets into one flat array. */
    private _getSubscriptionIdsFromDataSets(): string[] {
        const acc: string[][] = [];
        for (const dataset of this._datasets.values()) {
            acc.push(dataset.subscriptions);
        }
        return acc.flat();
    }

    /**
     * Releases everything held by the service: cancels all datasets, asks the gateway to
     * cancel their subscriptions, cancels the message-bus handlers and completes all
     * internal and shared observables.
     *
     * This method is called from `useStaleData`. It is deliberately not `ngOnDestroy`
     * because we don't want to get double destruction.
     */
    release(): void {
        // NOTE(review): subscription ids are collected after the datasets were cancelled;
        // verify that `DatasetInfoV2.cancel()` keeps `subscriptions` intact, otherwise the
        // gateway is never asked to cancel them.
        this._datasets.forEach((dataset) => dataset.cancel());
        const subscriptionIds = this._getSubscriptionIdsFromDataSets();
        if (subscriptionIds.length > 0) {
            this._staleDataServiceGatewayV2.cancelSubscriptions(subscriptionIds).subscribe();
        }

        this._onTablesChangeSubscription?.cancel();
        this._onCalculationProgressSubscription?.cancel();
        this._onCalculationFinishedSubscription?.cancel();
        this._onCalculationErrorSubscription?.cancel();

        // Completes every stream bound to `_destroyed$`; the shared observables then lose
        // their subscribers and remove themselves from the caches.
        this._destroyed$.next();
        this._destroyed$.complete();
    }
}
