import _ from "lodash-es";
import { Inject, Injectable, OnDestroy } from "@angular/core";
import { asyncScheduler, BehaviorSubject, Observable, of, Subscription, timer, zip } from "rxjs";
import { map, switchMap, takeWhile, throttleTime, withLatestFrom } from "rxjs/operators";

import { IMessageBusService, ISubscriptionTicket, LG_MESSAGE_BUS_SERVICE } from "@logex/framework/lg-application";
import { LgConsole } from "@logex/framework/core";

import { StaleDataServiceGateway } from "./gateways/stale-data-service-gateway";
import { JobStatus } from "./gateways/stale-data-service-gateway.types";
import { JobProgressInfo, WatchedJobs } from "./types/types";


/** Period, in milliseconds, of the fallback job status request. */
const STATUS_CHECK_INTERVAL = 60_000;
/** Period, in milliseconds, of the message history cleanup; also used as the age threshold for that cleanup. */
const REMOVE_OLD_MESSAGES_INTERVAL = 2 * 60_000;


/**
 * Decides whether `updated` carries more recent information than `existing`.
 *
 * @param existing Status currently stored for the job, or `null` when nothing is known yet.
 * @param updated Status that has just been received.
 * @returns `true` when `updated` should replace `existing`.
 */
function isJobStatusNewer( existing: JobStatus | null, updated: JobStatus ): boolean {
    // Anything is newer than "no status at all".
    if ( existing == null ) return true;

    // A finished job cannot go back to running, so an unfinished report for it is stale
    // even if it happens to carry a higher step counter.
    if ( existing.isFinished && !updated.isFinished ) return false;

    return !existing.isStarted && updated.isStarted
        || !existing.isFinished && updated.isFinished
        || existing.currentStep < updated.currentStep;
}

/**
 * Monitors calculation jobs and exposes their statuses as observables.
 *
 * Statuses are updated from message bus notifications (`OnJobProgress`, `OnJobComplete`,
 * `OnJobCancelled`, `OnJobError`). While at least one job is watched, the gateway is additionally
 * polled every `STATUS_CHECK_INTERVAL` milliseconds as a fallback.
 */
@Injectable( {
    providedIn: "root",
} )
export class CalculationJobsMonitoringService implements OnDestroy {

    constructor(
        private _gateway: StaleDataServiceGateway,
        @Inject( LG_MESSAGE_BUS_SERVICE ) private _messageBus: IMessageBusService,
        private _lgConsole: LgConsole,
    ) {
        this._lgConsole = this._lgConsole.withSource( "StaleDataService" );
    }


    /** Set once `ngOnDestroy` has run, so the cleanup is performed only once. */
    private _isDestroyed = false;
    /** Currently watched jobs keyed by job id. The stored object is mutated in place and re-emitted on changes. */
    private _watchedJobs: BehaviorSubject<WatchedJobs> = new BehaviorSubject<WatchedJobs>( {} );
    /** Subscription of the fallback polling; `null` while polling is not running. */
    private _statusRequestSub: Subscription | null = null;


    private _onJobProgressSubscription = this._messageBus.on( "OnJobProgress", ( info: JobProgressInfo ) => {
        this._onJobProgress( info );
    } ) as ISubscriptionTicket;

    private _onJobCompleteSubscription = this._messageBus.on( "OnJobComplete", ( jobId: string ) => {
        this._onJobComplete( jobId );
    } ) as ISubscriptionTicket;

    private _onJobCancelledSubscription = this._messageBus.on( "OnJobCancelled", ( jobId: string ) => {
        this._onJobCancelled( jobId );
    } ) as ISubscriptionTicket;

    private _onJobErrorSubscription = this._messageBus.on( "OnJobError", ( jobId: string, e: any ) => {
        this._onJobError( jobId, e );
    } ) as ISubscriptionTicket;


    /**
     * Final notifications (complete / cancelled / error) with the time they were received.
     * `watchJobs` uses it to seed the status of a job whose final notification arrived before
     * the job started being watched.
     */
    private _messageHistory: Array<[timeStamp: number, jobStatus: JobStatus]> = [];
    /** Periodically drops history entries older than `REMOVE_OLD_MESSAGES_INTERVAL`. */
    private _oldMessagesRemover = timer( 0, REMOVE_OLD_MESSAGES_INTERVAL )
        .subscribe( () => {
            if ( this._messageHistory.length === 0 ) return;

            const oldestAllowed = Date.now() - REMOVE_OLD_MESSAGES_INTERVAL;
            // Keep the messages that are still fresh; everything older than the threshold is removed.
            this._messageHistory = this._messageHistory.filter( ( [timeStamp] ) => timeStamp >= oldestAllowed );
        } );


    /**
     * Starts watching the given jobs.
     *
     * The jobs are registered immediately (at call time, not at subscription time) and the fallback
     * polling is started if it is not running yet.
     *
     * @param jobIds Ids of the jobs to watch.
     * @returns Observable emitting the statuses of the watched jobs (throttled to one emission per
     *     500 ms, leading and trailing). It completes after emitting the first set of statuses in
     *     which every job is finished.
     */
    watchJobs( jobIds: string[] ): Observable<JobStatus[]> {
        const watchedJobs = this._watchedJobs.getValue();

        const observable: Observable<JobStatus[]> = new Observable<JobStatus[]>( subscriber => {
            const subscription = this._jobStatusesByJobIds( jobIds )
                .pipe(
                    takeWhile( statuses => !_.every( statuses, x => x?.isFinished ), true ),
                )
                .subscribe( {
                    next: value => subscriber.next( value ),
                    complete: () => subscriber.complete(),
                    // Not expected to happen; forward it so the consumer is not left hanging.
                    error: e => subscriber.error( e ),
                } );

            // Teardown runs on completion, on error and on explicit unsubscribe. The inner subscription
            // has to be closed explicitly, otherwise an early unsubscribe would leave it attached
            // to `_watchedJobs`.
            return () => {
                subscription.unsubscribe();
                this._removeSubscriber( jobIds, observable );
            };
        } ).pipe(
            throttleTime( 500, asyncScheduler, { leading: true, trailing: true } ),
        );


        jobIds.forEach( jobId => {
            if ( watchedJobs[jobId] != null ) {
                watchedJobs[jobId].subscribers.add( observable );
            } else {
                watchedJobs[jobId] = {
                    id: jobId,
                    // The job may have finished just before it started being watched
                    status: this._messageHistory.find( x => x[1].jobId === jobId )?.[1] ?? null,
                    subscribers: new Set( [observable] )
                };
            }
        } );

        this._setupStatusCheckFallbackRequest();

        return observable;
    }


    /**
     * Starts the periodic fallback status request unless it is already running.
     *
     * Every `STATUS_CHECK_INTERVAL` ms the gateway is asked for the statuses of all watched jobs
     * that are not known to be finished, and the stored statuses are updated from the response.
     */
    private _setupStatusCheckFallbackRequest(): void {
        if ( this._statusRequestSub != null ) return;

        this._statusRequestSub = timer( STATUS_CHECK_INTERVAL, STATUS_CHECK_INTERVAL )
            .pipe(
                withLatestFrom( this._watchedJobs ),
                switchMap( ( [, jobs] ) => {
                    const jobIds = Object.keys( jobs ).filter( id => {
                        const job = jobs[id];
                        return job?.status == null || !job.status.isFinished;
                    } );

                    // Pair the response with the ids that were requested
                    return zip(
                        this._gateway.getJobsStatus( jobIds ),
                        of( jobIds )
                    );
                } )
            )
            .subscribe( {
                next: ( [statuses, jobIds] ) => {
                    const watchedJobs = this._watchedJobs.getValue();

                    let isUpdated = false;
                    jobIds.forEach( jobId => {
                        const job = watchedJobs[jobId];
                        const status = statuses.find( x => x.jobId === jobId );

                        if ( job == null ) {
                            // The job is no longer watched (it was removed while the request was in flight)
                            if ( status != null ) {
                                this._lgConsole.error( `Jobs status check returned status for unknown job ${status.jobId}` );
                            }
                            return;
                        }

                        if ( status == null ) {
                            // A message bus notification may have finished the job in the meantime;
                            // do not overwrite that result.
                            if ( job.status?.isFinished ) return;

                            // If we didn't receive a status for any task, it could be server restart.
                            // In this case we can consider the job cancelled
                            job.status = {
                                ...this._getJobStatusTemplate( jobId ),
                                isFinished: true,
                                isCancelled: true,
                            };
                            isUpdated = true;

                        } else if ( isJobStatusNewer( job.status, status ) ) {
                            // Fallback check response can arrive after message bus notification, so progress in this
                            // status can be already outdated. We need to update the existing progress only if it is older.
                            job.status = status;
                            isUpdated = true;
                        }
                    } );

                    if ( isUpdated ) {
                        this._watchedJobs.next( watchedJobs );
                    }
                },

                error: e => {
                    // The subscription is dead after an error. Reset the field so polling can be started again,
                    // and restart it right away (first request after STATUS_CHECK_INTERVAL) if jobs are still watched.
                    this._lgConsole.error( "Jobs status check failed", { e } );
                    this._statusRequestSub = null;

                    if ( !this._isDestroyed && !_.isEmpty( this._watchedJobs.getValue() ) ) {
                        this._setupStatusCheckFallbackRequest();
                    }
                },
            } );
    }


    /**
     * Projects the watched jobs onto the statuses of the given job ids.
     *
     * Jobs that are not watched are skipped; a watched job without a known status is represented
     * by the status template (not started, not finished).
     */
    private _jobStatusesByJobIds( jobIds: string[] ): Observable<JobStatus[]> {
        return this._watchedJobs.pipe(
            map( jobs => {
                const res: JobStatus[] = [];
                for ( const jobId of jobIds ) {
                    const job = jobs[jobId];
                    if ( job != null ) {
                        res.push( job.status ?? this._getJobStatusTemplate( jobId ) );
                    }
                }
                return res;
            } ),
        );
    }


    /**
     * Unregisters `subscriber` from the given jobs. Jobs left without subscribers are no longer
     * watched, and the fallback polling is stopped once no job is watched at all.
     */
    private _removeSubscriber( jobIds: string[], subscriber: Observable<JobStatus[]> ): void {
        const watchedJobs = this._watchedJobs.getValue();

        jobIds.forEach( jobId => {
            const watchedJob = watchedJobs[jobId];
            watchedJob?.subscribers?.delete( subscriber );
            if ( watchedJob?.subscribers?.size === 0 ) {
                delete watchedJobs[jobId];
            }
        } );

        // Stop fallback status check requests
        if ( _.isEmpty( watchedJobs ) && this._statusRequestSub != null ) {
            this._statusRequestSub.unsubscribe();
            this._statusRequestSub = null;
        }
    }


    /**
     * Handles a progress notification: stores the new step counters of a watched job and emits
     * the change. Notifications that do not advance the current step and do not change the total
     * number of steps are ignored.
     */
    private _onJobProgress( info: JobProgressInfo ): void {
        const watchedJobs = this._watchedJobs.getValue();
        const job = watchedJobs[info.jobId];
        if ( job == null ) return;

        if ( job.status == null ) {
            job.status = this._getJobStatusTemplate( info.jobId );
            job.status.isStarted = true;
        }

        if ( job.status.currentStep < info.currentStep ||
            job.status.totalSteps !== info.totalSteps
        ) {
            // this._lgConsole.debug( `Job ${info.jobId} progresses ${info.currentStep}/${info.totalSteps}` );

            job.status.currentStep = info.currentStep;
            job.status.totalSteps = info.totalSteps;
            this._watchedJobs.next( watchedJobs );
        }
    }


    /**
     * Handles a completion notification. The final status is always recorded in the message
     * history; a watched job is additionally marked as finished successfully.
     */
    private _onJobComplete( jobId: string ): void {
        const status = {
            ...this._getJobStatusTemplate( jobId ),
            isStarted: true,
            isFinished: true,
            isSuccess: true
        };

        this._messageHistory.push( [Date.now(), status] );

        const watchedJobs = this._watchedJobs.getValue();
        const job = watchedJobs[jobId];
        if ( job == null ) return;

        let isUpdated = false;

        if ( job.status == null ) {
            job.status = status;
            isUpdated = true;

        } else if ( !job.status.isFinished || !job.status.isSuccess ) {
            // Not a duplicate notification
            this._lgConsole.debug( `Calculation job complete: ${jobId}` );
            job.status.isFinished = true;
            job.status.isSuccess = true;
            isUpdated = true;
        }

        if ( isUpdated ) {
            this._watchedJobs.next( watchedJobs );
        }
    }


    /**
     * Handles a cancellation notification. The final status is always recorded in the message
     * history; a watched job is additionally marked as finished and cancelled.
     */
    private _onJobCancelled( jobId: string ): void {
        const status = {
            ...this._getJobStatusTemplate( jobId ),
            isStarted: true,
            isFinished: true,
            isCancelled: true,
        };

        this._messageHistory.push( [Date.now(), status] );

        const watchedJobs = this._watchedJobs.getValue();
        const job = watchedJobs[jobId];
        if ( !job ) return;

        let isUpdated = false;

        if ( job.status == null ) {
            job.status = status;
            isUpdated = true;

        } else if ( !job.status.isFinished || !job.status.isCancelled ) {
            // Not a duplicate notification
            this._lgConsole.debug( `Calculation job cancelled: ${jobId}` );

            job.status.isFinished = true;
            job.status.isCancelled = true;
            isUpdated = true;
        }
        if ( isUpdated ) {
            this._watchedJobs.next( watchedJobs );
        }
    }


    /**
     * Handles an error notification. The final status is always recorded in the message
     * history; a watched job is additionally marked as finished with the given error.
     */
    private _onJobError( jobId: string, e: any ): void {
        const status = {
            ...this._getJobStatusTemplate( jobId ),
            isStarted: true,
            isFinished: true,
            error: e
        };

        this._messageHistory.push( [Date.now(), status] );

        const watchedJobs = this._watchedJobs.getValue();
        const job = watchedJobs[jobId];
        if ( job == null ) return;

        let isUpdated = false;
        if ( job.status == null ) {
            job.status = status;
            isUpdated = true;

        } else if ( !job.status.isFinished || job.status.error == null ) {
            // Not a duplicate notification
            this._lgConsole.error( `Error executing job: ${jobId}`, { e } );

            job.status.isFinished = true;
            job.status.error = e;
            isUpdated = true;
        }

        if ( isUpdated ) {
            this._watchedJobs.next( watchedJobs );
        }
    }


    /** Creates a fresh status for `jobId` describing a job that has not started yet. */
    private _getJobStatusTemplate( jobId: string ): JobStatus {
        return {
            jobId,
            isStarted: false,
            currentStep: 0,
            totalSteps: 0,
            isFinished: false,
            isCancelled: false,
            isSuccess: false,
            error: null,
        };
    }


    /**
     * Completes the watched jobs subject, stops the timers and cancels the message bus
     * subscriptions. Repeated calls do nothing.
     */
    ngOnDestroy(): void {
        if ( this._isDestroyed ) return;

        this._watchedJobs?.complete();
        this._statusRequestSub?.unsubscribe();
        this._statusRequestSub = null;
        this._oldMessagesRemover?.unsubscribe();
        this._onJobProgressSubscription?.cancel();
        this._onJobCompleteSubscription?.cancel();
        this._onJobCancelledSubscription?.cancel();
        this._onJobErrorSubscription?.cancel();

        this._isDestroyed = true;
    }
}
