import { SampleContextService } from '../sample-context.service';
import { BehaviorSubject, Subject, Observable, EMPTY, merge, of, combineLatest, ReplaySubject } from 'rxjs';
import { Card, CardResult, DataSpec, Insight, Variable, Sample, TaggableObjectsService } from 'src/generated-sources';
import { switchMap, map, shareReplay, filter, distinctUntilChanged, tap, take, startWith } from 'rxjs/operators';
import { DataikuAPIService } from '@core/dataiku-api/dataiku-api.service';
import { catchAPIError, ErrorContext, APIError } from '@core/dataiku-api/api-error';
import { OnDestroy, Injectable } from '@angular/core';
import { getInsightObjectRef, resolveSmartName } from '../utils';
import { ComputeService } from '../compute.service';
import { untilDestroyed, UntilDestroy } from '@ngneat/until-destroy';
import { columnsToVariables } from '../schema-utils';
import { auditMap, FutureWatcherService } from 'dku-frontend-core';
import { WaitingService } from '@core/overlays/waiting.service';
import deepEqual from 'fast-deep-equal';

interface InsightData {
    params: Card;
    results: CardResult;
    dataSpec: DataSpec;
    insight: Insight;
}

@UntilDestroy()
@Injectable()
export class InsightContextService extends SampleContextService implements ErrorContext, OnDestroy {
    private insight$ = new Subject<Insight>();
    private updateRequests$ = new Subject<{ params: Card, dataSpec: DataSpec }>();
    private updateDataSpecRequests$ = new ReplaySubject<{ dataSpec: DataSpec, refreshOnDataSpecChange: boolean }>(1);
    private error$ = new BehaviorSubject<APIError | undefined>(undefined);

    private insightRef: TaggableObjectsService.TaggableObjectRef;

    private insightData$ = this.insight$.pipe(
        switchMap(insight => {
            this.insightRef = getInsightObjectRef(insight);
            return this.DataikuAPI.dashboards.getInsightWithPayload(insight.projectKey, insight.id!)
                .pipe(
                    map(insightWithPayload => ({
                        insight,
                        params: insightWithPayload.insight.params.card,
                        results: JSON.parse(insightWithPayload.payload),
                        dataSpec: insightWithPayload.insight.params.dataSpec
                    })),
                    catchAPIError(this)
                );
        }),
        shareReplay(1)
    );

    private sample$: Observable<Sample> = this.updateDataSpecRequests$.pipe(
        distinctUntilChanged((prev, cur) => deepEqual(prev.dataSpec, cur.dataSpec)),
    ).pipe(
        // Sample needs to be retrieved or rebuilt
        switchMap(curRequest => {
            return this.DataikuAPI.statistics.currentSample(this.insightRef!, curRequest.dataSpec!)
                .pipe(
                    switchMap(currentSampleResponse => {
                        // Sample was already built
                        if (currentSampleResponse) {
                            return of(currentSampleResponse);
                        }

                        // Otherwise build a new sample
                        return this.DataikuAPI.statistics.rebuildSample(this.insightRef!, curRequest.dataSpec!)
                            .pipe(
                                this.waitingService.bindOverlayAndWaitForResult<Sample>(),
                            );
                    }),
                    this.waitingService.bindStaticOverlay('Preparing sample...'),
                    tap(() => {
                        if (curRequest.refreshOnDataSpecChange) {
                            this.insightData$.pipe(take(1)).subscribe(
                                insightData => this.updateInsight({
                                    params: insightData.params,
                                    dataSpec: curRequest.dataSpec
                                })
                            );
                        }
                    }
                    )
                );
        }),
        catchAPIError(this),
        untilDestroyed(this),
        shareReplay(1)
    );

    private availableVariables$ = combineLatest([this.insightData$.pipe(
        filter(insightData => !!insightData),
        switchMap(insightData => {
            if (!insightData) {
                return EMPTY;
            }
            const datasetLoc = resolveSmartName(
                insightData.insight.projectKey,
                insightData.dataSpec.inputDatasetSmartName
            );
            return this.DataikuAPI.datasets.get(
                datasetLoc.projectKey,
                datasetLoc.id,
                insightData.insight.projectKey
            ).pipe(catchAPIError(this));
        })), this.sample$.pipe(startWith(undefined))]).pipe(
        map(([dataset, sample]) => {
            const schemaSource = (sample === undefined) ? dataset : sample;
            if (!schemaSource) {
                return [];
            }
            return columnsToVariables(schemaSource.schema!.columns || []);
        }),
        startWith([]),
        shareReplay(1)
    );

    private noComputationUpdated$ = new Subject<{ card: Card, result: CardResult, dataSpec: DataSpec }>();
    private updated$ = merge(combineLatest([this.updateRequests$, this.sample$, this.updateDataSpecRequests$]).pipe(
        filter(([updateReq, sample, updateDataSpecRequest]) => deepEqual(updateReq.dataSpec, sample.dataSpec) && deepEqual(updateDataSpecRequest.dataSpec, sample.dataSpec)),
        auditMap(([changes, sample]) => {
                return this.DataikuAPI.statistics.fixCard(changes.params!)
                    .pipe(
                        switchMap(fixedCard =>
                            this.computeService.computeCard(fixedCard, sample.id, false
                            ).pipe(
                                map(result => ({ card: fixedCard, result, dataSpec: changes.dataSpec }))
                            )
                        ),
                        catchAPIError(this)
                    );
        })), this.noComputationUpdated$.asObservable())
        .pipe(
            shareReplay(1)
        );

    constructor(
        DataikuAPI: DataikuAPIService,
        futureWatcherService: FutureWatcherService,
        private computeService: ComputeService,
        private waitingService: WaitingService,
    ) {
        super(DataikuAPI, futureWatcherService);

        // Tie observables lifetime to service lifetime
        // Note: do not start availableVariables$ by default because it may be unnecessary (in view only mode)
        merge(this.updated$, this.insightData$)
            .pipe(untilDestroyed(this)).subscribe();
    }

    getError() {
        return this.error$;
    }

    getUpdatedInsight() {
        return this.updated$;
    }

    pushError(error?: APIError) {
        this.error$.next(error);
    }

    forceLoadSample() {
        this.insightData$.pipe(
            take(1)
        ).subscribe(insightData => {
                this.updateDataSpec({ dataSpec: insightData.dataSpec, refreshOnDataSpecChange: false })
            });
    }

    setInsight(insight: Insight) {
        this.insight$.next(insight);
    }

    getInsightData(): Observable<InsightData | undefined> {
        return this.insightData$;
    }

    getSample(): Observable<Sample> {
        return this.sample$;
    }

    getDataSpecRequest(): Observable<DataSpec> {
        return this.updateDataSpecRequests$.pipe(
            map((d) => d.dataSpec)
        );
    }

    availableVariables(): Observable<Variable[]> {
        return this.availableVariables$;
    }

    updateDataSpec(changes: { dataSpec: DataSpec, refreshOnDataSpecChange: boolean }) {
        this.updateDataSpecRequests$.next(changes);
    }

    updateInsight(changes: { params: Card, dataSpec: DataSpec }) {
        this.updateRequests$.next(changes);
    }

    updateInsightParamsResults(changes: { card: Card, result: CardResult, dataSpec: DataSpec }) {
        this.noComputationUpdated$.next(changes);
    }

    ngOnDestroy() {
    }
}
