import {
    Observable,
    ReplaySubject,
    type Subscription as ObservableSubscription,
} from 'rxjs';
import { get, once } from 'lodash';
import { _ } from '@splunk/ui-utils/i18n';
import type { DataSource, ObservableData } from '@splunk/datasources';
import type {
    DataSourceDefinition,
    DataSourceMeta,
    RequestParams,
    RootDataSourcesDefinition,
    BroadcastEventCallback,
    BroadcastEventArgs,
    PresetUtility,
    ErrorPayload,
} from '@splunk/dashboard-types';
import { isRiskyCommandError } from '@splunk/dashboard-utils';
import Subscription from './Subscription';
import RefreshScheduler, { parseExprToSeconds } from './RefreshScheduler';
import type RiskyCommandController from './RiskyCommandController';

/**
 * A `DataSource` plus the optional members that DataSourceController probes
 * for at runtime. Every extra member is optional, so a plain `DataSource`
 * still satisfies this type.
 *
 * - `completeSearchQuery`: optional query string carried by the data source.
 * - `getRefreshConfig`: when present, DataSourceController.setupAutoRefresh
 *   reads `refresh` / `refreshType` from its result instead of from the
 *   definition options.
 * - `getMetaData`: when present, DataSourceController.getDataSourceMetaData
 *   returns its result.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ExtendedDataSource = DataSource<any, any> & {
    completeSearchQuery?: string;
    getRefreshConfig?: () => {
        refresh?: string | number;
        refreshType?: string;
    };
    getMetaData?: () => DataSourceMeta;
};

/**
 * Context object handed to DataSourceController and forwarded unchanged to
 * `preset.createDataSource`. The controller itself only reads
 * `enableRiskyCommand`; any other keys are opaque to it.
 */
type DataSourceControllerContext = {
    enableRiskyCommand?: boolean;
    [key: string]: unknown;
};

/**
 * Shape of a status update emitted through DataSourceController.statusSubject
 * (meta and/or error). It is also intersected into the controller's `def`.
 */
type ReplaySubjectInterface = { meta?: DataSourceMeta; error?: ErrorPayload };

/**
 * A control wrapper on top of an actual DataSource instance.
 *
 * The controller creates the DataSource through the preset, keeps track of the
 * subscriptions opened by its consumers, can be chained to a parent controller,
 * sets up an optional auto-refresh scheduler and broadcasts lifecycle events
 * to a registered callback.
 */
class DataSourceController {
    /** DataSource id. */
    id: string;

    /** Base chain model forwarded to `preset.createDataSource`. */
    baseChainModel?: RootDataSourcesDefinition;

    /** Every consumer id that has ever subscribed; entries are never removed. */
    consumerIds: Set<string>;

    /** Context forwarded to `preset.createDataSource`. */
    context?: DataSourceControllerContext;

    /** The wrapped DataSource instance, created by `setup`. */
    dataSource?: ExtendedDataSource;

    /** Callback that receives every event passed to `broadcast`. */
    dataSourceEventCallback?: BroadcastEventCallback;

    /** Subset of `subs` that was created with `isDataSource: true`. */
    dataSourceSubs: Map<string, Subscription>;

    /** DataSource definition this controller was created from. */
    def: DataSourceDefinition & ReplaySubjectInterface;

    /** Whether risky command checking is enabled (taken from the context). */
    enableRiskyCommand: boolean;

    /**
     * When true, the next time the last subscription is cancelled the
     * teardown is skipped and this flag is reset to false.
     */
    handleSubscriptionCancelOverride?: boolean;

    /** Parent controller when this data source is chained to another one. */
    parent?: DataSourceController;

    /** Subscription this controller holds on its parent controller. */
    parentDataSourceSub: Subscription | null = null;

    /** Subscription on the parent's status observable. */
    parentStatusSub: ObservableSubscription | null = null;

    /** Preset used to create the DataSource instance. */
    preset: PresetUtility;

    /** The `query` option of this data source's own definition, if any. */
    query: string | undefined;

    /** True while `reset` is tearing down and setting up the DataSource. */
    refreshing?: boolean;

    riskyCommandController?: RiskyCommandController;

    /** Auto-refresh scheduler, created by `setupAutoRefresh`. */
    scheduler?: RefreshScheduler;

    /** Runs `setup` at most once and then marks the controller as set up. */
    setupOnce: () => Promise<void>;

    /** Message of the last setup failure, or null when setup succeeded. */
    setupError: string | null = null;

    /** All live subscriptions, keyed by consumer id. */
    subs: Map<string, Subscription>;

    /** Emits status updates (meta and/or error) for this data source. */
    statusSubject: ReplaySubject<ReplaySubjectInterface>;

    /** Invoked after the last subscription was cancelled and teardown started. */
    teardownCallback?: () => void;

    /** Extra DataSource ids that every broadcast event is re-sent for. */
    additionalDataSourceIds: Set<string>;

    private isSetup = false;

    /**
     * @param {Object} options
     * @param {string} options.id DataSource Id
     * @param {Object} options.dataSourceDef DataSource definition
     * @param {Object} options.dataSourceContext DataSource context
     * @param {Object} options.preset dashboard preset
     * @param {Object} options.riskyCommandController RiskyCommandController
     */
    constructor({
        id,
        dataSourceDef,
        dataSourceContext,
        preset,
        riskyCommandController,
    }: {
        id: string;
        dataSourceDef: DataSourceDefinition;
        dataSourceContext?: DataSourceControllerContext;
        preset: PresetUtility;
        riskyCommandController?: RiskyCommandController;
    }) {
        this.id = id;
        this.def = dataSourceDef;
        this.preset = preset;
        this.context = dataSourceContext;
        this.setupOnce = once(async () => {
            await this.setup();
            this.isSetup = true;
        });
        // subscriptions
        this.subs = new Map();
        // dataSourceSubs
        this.dataSourceSubs = new Map();
        this.consumerIds = new Set();
        this.query = this.def.options?.query as string | undefined;
        this.riskyCommandController = riskyCommandController;
        this.enableRiskyCommand = !!this.context?.enableRiskyCommand;
        // ReplaySubject used instead of a BehaviorSubject because we don't want the subject to emit when there
        // hasn't been a status update yet. A BehaviorSubject would require a default value and will immediately emit.
        this.statusSubject = new ReplaySubject(1);
        this.additionalDataSourceIds = new Set();
    }

    /**
     * Set up DataSource auto refresh.
     *
     * The refresh configuration is read from the DataSource's
     * `getRefreshConfig` when available, otherwise from the definition
     * options. Nothing happens (and an existing scheduler is left untouched)
     * when no usable refresh interval is configured.
     */
    setupAutoRefresh(): void {
        let refresh: string | number | undefined;
        let refreshType: string | undefined;
        if (this.dataSource && this.dataSource.getRefreshConfig) {
            const refreshConfig = this.dataSource.getRefreshConfig();
            refresh = refreshConfig?.refresh;
            refreshType = refreshConfig?.refreshType;
        } else {
            refresh = this.def.options?.refresh as string | number | undefined;
            refreshType = this.def.options?.refreshType as string | undefined;
        }

        if (!refresh) {
            return;
        }
        const refreshInterval = parseExprToSeconds(refresh);
        if (!refreshInterval) {
            return;
        }

        // replace a previous scheduler instead of running two side by side
        if (this.scheduler) {
            this.scheduler.stop();
        }
        this.scheduler = new RefreshScheduler({
            refreshFunc: this.refresh,
            refreshInterval,
            refreshType,
        });
    }

    /**
     * get existing additional datasource ids
     * @returns {Set} additional Datasource Ids
     */
    getAdditionalDataSourceIds(): DataSourceController['additionalDataSourceIds'] {
        return this.additionalDataSourceIds;
    }

    /**
     * get existing subscriptions
     * @returns {Map} subscriptions
     */
    getSubscriptions = (): DataSourceController['subs'] => {
        return this.subs;
    };

    /**
     * Get a specific subscription
     * @param {string} consumerId identifier for the consumer+binding
     * @returns {Subscription | undefined}
     */
    getSubscription(consumerId: string) {
        return this.subs.get(consumerId);
    }

    /**
     * get existing data source subscriptions
     * @returns {Map} data source subscriptions
     */
    getDataSourceSubscriptions = (): DataSourceController['dataSourceSubs'] => {
        return this.dataSourceSubs;
    };

    /**
     * The query of this data source prefixed, one per line, by the queries of
     * all of its ancestors. A missing own query contributes an empty string
     * rather than the text "undefined".
     */
    get completeSearchQuery(): string {
        const ownQuery = this.query ?? '';
        return this.parent
            ? `${this.parent.completeSearchQuery}\n${ownQuery}`
            : ownQuery;
    }

    /**
     * check if a consumer is subscribed to this data source controller
     * @param {string} consumerId
     * @returns {boolean}
     */
    isSubscribed(consumerId: string): boolean {
        return this.subs.has(consumerId);
    }

    /**
     * check if viz or input is attached to the data source, i.e. whether there
     * is at least one subscription that was not created by another data source
     * @returns {boolean}
     */
    isVizOrInputAttached = (): boolean => {
        return this.subs.size > this.dataSourceSubs.size;
    };

    /**
     * Set up the DataSource: create it through the preset, subscribe to the
     * parent controller (if any), run the DataSource's own setup, configure
     * auto refresh and broadcast `datasource.setup`.
     *
     * Failures are not rethrown; their message is stored in `setupError`.
     * Auto refresh is configured even when setup fails.
     */
    setup = async (): Promise<void> => {
        try {
            const { completeSearchQuery } = this;
            this.dataSource = this.preset?.createDataSource(
                this.def.type,
                {
                    ...this.def.options,
                    checkRiskyCommand:
                        this.enableRiskyCommand &&
                        this.riskyCommandController?.shouldCheckRiskyCommand(
                            completeSearchQuery
                        ),
                },
                this.context,
                this.def.meta,
                this.id,
                this.baseChainModel
            );
            this.setupError = null;

            if (this.parent) {
                await this.subscribeToParentDataSourceController();
            }

            if (this.dataSource) {
                await this.dataSource.setup();
            }

            this.setupAutoRefresh();
            this.broadcast({
                eventType: 'datasource.setup',
                targetId: this.id,
                payload: {},
            });
        } catch (ex) {
            this.setupAutoRefresh();
            this.setupError = DataSourceController.toErrorMessage(ex);
        }
    };

    /**
     * Tear down the DataSource: drop the subscriptions held on the parent,
     * run the DataSource's own teardown and broadcast `datasource.teardown`.
     * Errors are deliberately ignored (best effort).
     */
    teardown = async (): Promise<void> => {
        try {
            if (this.parentDataSourceSub) {
                this.parentDataSourceSub.cancel();
                this.parentDataSourceSub = null;
            }
            if (this.parentStatusSub) {
                this.parentStatusSub.unsubscribe();
                this.parentStatusSub = null;
            }
            if (this.dataSource) {
                await this.dataSource.teardown();
            }
            this.broadcast({
                eventType: 'datasource.teardown',
                targetId: this.id,
                payload: {},
            });
        } catch (ex) {
            // ignore teardown error
        }
    };

    /**
     * pause DataSource
     * temporarily use teardown datasources instead of pausing
     * this should be fixed when adding resume datasources function
     */
    pause = (): void => {
        if (this.scheduler) {
            this.scheduler.stop();
        }
        // fire and forget: teardown handles its own errors
        void this.teardown();
    };

    /**
     * Refresh this data source.
     *
     * When this controller holds a subscription on its parent the refresh is
     * delegated upwards (to the direct parent when the risky command check is
     * disabled, otherwise to the base of the chain); otherwise the current
     * data source is reset.
     *
     * @param {Object} options
     * @param {boolean} options.checkRiskyCommand optional, by default assuming all searches are risky
     */
    refresh = async ({ checkRiskyCommand = true } = {}): Promise<void> => {
        if (!checkRiskyCommand && this.enableRiskyCommand) {
            if (!this.parent || this.dataSource?.options?.sid) {
                this.riskyCommandController?.addToDisableCheckList(
                    this.completeSearchQuery
                );
            }
        }

        if (this.parentDataSourceSub) {
            if (!checkRiskyCommand && this.parent) {
                await this.parent.refresh({ checkRiskyCommand });
            } else {
                await this.getBase().refresh({ checkRiskyCommand });
            }
        } else {
            await this.reset();
        }
    };

    /**
     * Reset the current DataSource by tearing it down and setting it up
     * again, then refresh all subscriptions. A reset requested while another
     * one is still in progress is ignored.
     */
    reset = async (): Promise<void> => {
        if (!this.refreshing) {
            try {
                this.refreshing = true;
                // we need to wait till teardown completed here.
                await this.teardown();
                await this.setup();
            } catch (ex) {
                this.setupError = DataSourceController.toErrorMessage(ex);
            } finally {
                this.refreshing = false;
                // refresh all subscriptions
                this.refreshSubscriptions();
            }
        }
    };

    /**
     * refresh all subscriptions
     */
    refreshSubscriptions(): void {
        // refresh all subscriptions
        this.subs.forEach((sub) => {
            sub.refresh();
        });
    }

    /**
     * Register an additional DataSource id that broadcast events are also
     * sent for.
     * @param {string} dataSourceId
     */
    addAdditionalDataSourceId = (dataSourceId: string): void => {
        this.additionalDataSourceIds.add(dataSourceId);
    };

    /**
     * Broadcast a DataSource event to the registered event callback: once as
     * given and once per additional DataSource id with `targetId` replaced.
     * Does nothing when no callback is registered.
     * @param {Object} event DataSource lifecycle event
     */
    broadcast = (event: BroadcastEventArgs): void => {
        if (typeof this.dataSourceEventCallback !== 'function') {
            return;
        }

        const eventCallback = this.dataSourceEventCallback;

        eventCallback(event);
        this.additionalDataSourceIds.forEach((dataSourceId) => {
            eventCallback({
                ...event,
                targetId: dataSourceId,
            });
        });
    };

    /**
     * register teardown callback
     * @param {Function} teardownCallback teardown callback
     */
    onTeardown = (teardownCallback: () => void): void => {
        this.teardownCallback = teardownCallback;
    };

    /**
     * register DataSource event callback
     * @param {Function} eventCallback callback
     */
    onDataSourceEvent = (
        eventCallback: (args: BroadcastEventArgs) => void
    ): void => {
        this.dataSourceEventCallback = eventCallback;
    };

    /**
     * Handle subscription cancel: forget the subscription and, once no
     * subscriptions are left, tear the data source down (unless the cancel
     * override flag is set, in which case only the flag is cleared).
     * @param {string} consumerId DataSource consumerId, this will either be viz or input id
     */
    handleSubscriptionCancel = (consumerId: string): void => {
        this.subs.delete(consumerId);
        this.dataSourceSubs.delete(consumerId);

        // teardown datasource when no consumers
        if (this.subs.size === 0) {
            if (!this.handleSubscriptionCancelOverride) {
                // we don't need to wait till teardown complete
                void this.teardown();
                if (this.scheduler) {
                    this.scheduler.stop();
                }
                if (this.teardownCallback) {
                    this.teardownCallback();
                }
            } else {
                this.handleSubscriptionCancelOverride = false;
            }
        }
    };

    /**
     * Subscription start hook: schedules the next refresh for schedulers of
     * refresh type 'interval'.
     */
    handleSubscriptionStart = (): void => {
        if (this.scheduler && this.scheduler.refreshType === 'interval') {
            this.scheduler.scheduleNextRefresh();
        }
    };

    /**
     * Subscription complete hook: schedules the next refresh for schedulers of
     * refresh type 'delay'.
     */
    handleSubscriptionComplete = (): void => {
        if (this.scheduler && this.scheduler.refreshType === 'delay') {
            this.scheduler.scheduleNextRefresh();
        }
    };

    /**
     * request a new dataset
     * @param {Object} requestParams requestParams
     * @returns {Observable} Observable instance that will emit data or error
     * @throws {Error} when the DataSource has not been created yet
     */
    request = (requestParams?: RequestParams): Observable<ObservableData> => {
        if (!this.dataSource) {
            throw new Error(_('DataSource is not set up.'));
        }

        // while a reset is in progress no request is forwarded to the DataSource
        const result = this.refreshing
            ? Observable.of()
            : this.dataSource.request(requestParams);
        // results without a subscribe function are wrapped into an Observable
        return typeof (result as Observable<ObservableData>).subscribe ===
            'function'
            ? result
            : Observable.create(result);
    };

    /**
     * Create a new DataSource subscription. Runs the one-time setup first
     * when the controller has not been set up yet.
     * @param {Object} options
     * @param {string} options.consumerId DataSource consumerId, this will either be viz or input id
     * @param {Object} options.initialRequestParams initial requestParams
     * @param {boolean} options.isDataSource whether the consumer is another data source
     * @returns {Subscription} DataSource subscription
     */
    subscribe = async ({
        consumerId,
        initialRequestParams,
        isDataSource,
    }: {
        consumerId: string;
        initialRequestParams?: RequestParams;
        isDataSource?: boolean;
    }): Promise<Subscription> => {
        // always try to call setup first if this.isSetup is not true
        if (!this.isSetup) {
            await this.setupOnce();
        }

        const sub = new Subscription(this, consumerId, initialRequestParams);
        this.subs.set(consumerId, sub);
        this.consumerIds.add(consumerId);
        sub.onStart(this.handleSubscriptionStart);
        sub.onCancel(this.handleSubscriptionCancel);
        sub.onComplete(this.handleSubscriptionComplete);

        if (isDataSource) {
            this.dataSourceSubs.set(consumerId, sub);
        }

        return sub;
    };

    /**
     * get DataSource metadata
     * @returns {Object} DataSource meta data, or an empty object when the
     * DataSource is missing or does not expose `getMetaData`
     */
    getDataSourceMetaData = (): DataSourceMeta => {
        return this.dataSource?.getMetaData?.() ?? {};
    };

    /**
     * Subscribe to the parent data source controller.
     *
     * Any previous parent subscriptions are dropped first. The parent's status
     * updates are forwarded to this controller's status subject, and the data
     * and errors of the parent subscription are fed into this DataSource's
     * observer.
     */
    async subscribeToParentDataSourceController(): Promise<void> {
        if (this.parentDataSourceSub) {
            this.parentDataSourceSub.cancel();
            this.parentDataSourceSub = null;
        }

        if (this.parentStatusSub) {
            this.parentStatusSub.unsubscribe();
            this.parentStatusSub = null;
        }

        if (!this.dataSource) {
            return;
        }

        // status shown instead of a risky command error while the parent has
        // a viz or input attached
        const precedingSearchStatusObject = {
            meta: {
                statusMessage: _(
                    'Paused: Dependent on successful preceding search.'
                ),
            },
        };

        if (this.parent) {
            this.parentStatusSub = this.parent
                .getStatusObservable()
                .subscribe(({ error: statusError, meta }) => {
                    if (
                        statusError?.message &&
                        isRiskyCommandError(statusError.message)
                    ) {
                        if (this.parent?.isVizOrInputAttached()) {
                            this.statusSubject.next(
                                precedingSearchStatusObject
                            );
                            return;
                        }
                    }
                    this.statusSubject.next({ error: statusError, meta });
                });

            this.parentDataSourceSub = await this.parent.subscribe({
                consumerId: this.id,
                isDataSource: true,
                initialRequestParams: {
                    count: 1,
                    offset: 0,
                    progress: true,
                },
            });
        }

        const { next, error } = this.dataSource.getObserver();
        // this sends out results_preview requests
        this.parentDataSourceSub?.subscribeToData({
            onData: (args) => {
                const { completeSearchQuery } = this;
                const checkRiskyCommand =
                    this.enableRiskyCommand &&
                    this.riskyCommandController?.shouldCheckRiskyCommand(
                        completeSearchQuery
                    );

                // enrich the parent's meta before handing it to this DataSource
                const meta = get(args, 'meta', {});
                const newMeta = {
                    ...meta,
                    checkRiskyCommand,
                    completeSearchQuery,
                };
                next({ ...args, ...{ meta: newMeta } });
                if (this.dataSource?.shouldRefreshSubscription()) {
                    this.refreshSubscriptions();
                }
            },
            onError: (args) => {
                error(args);
                if (isRiskyCommandError(args.message)) {
                    if (this.parent?.isVizOrInputAttached()) {
                        this.statusSubject.next(precedingSearchStatusObject);
                    } else {
                        const { completeSearchQuery } = this;
                        this.statusSubject.next({
                            error: {
                                ...args,
                                meta: {
                                    ...args.meta,
                                    completeSearchQuery,
                                },
                            },
                        });
                    }
                }
                if (this.dataSource?.shouldRefreshSubscription()) {
                    this.refreshSubscriptions();
                }
            },
        });
    }

    /**
     * Generate the id and definition data
     */
    getIdAndDefinition(): {
        id: string;
        definition: DataSourceController['def'];
    } {
        return {
            id: this.id,
            definition: this.def,
        };
    }

    /**
     * Sets the current parent in the data source
     * @param {DataSourceController} parent
     */
    setParent(parent: DataSourceController): void {
        this.parent = parent;
    }

    /**
     * Get the subject that emits this data source's status updates
     */
    getStatusObservable(): DataSourceController['statusSubject'] {
        return this.statusSubject;
    }

    /**
     * Get the base data source for the current one, i.e. the topmost ancestor
     * in the parent chain (or this controller when it has no parent)
     */
    getBase(): DataSourceController {
        return this.parent ? this.parent.getBase() : this;
    }

    /**
     * Set the base chain model
     * @param {Object} baseChainModel
     */
    setBaseChainModel(baseChainModel: RootDataSourcesDefinition): void {
        this.baseChainModel = baseChainModel;
    }

    /**
     * Function to determine if consumer id has previously subscribed
     * @param {string} consumerId
     * @returns {boolean}
     */
    isConsumerSubscribedBefore(consumerId: string): boolean {
        return this.consumerIds.has(consumerId);
    }

    /**
     * Function to override handleSubscriptionCancel: the next cancel that
     * leaves no subscriptions will not tear the data source down
     */
    setHandleSubscriptionCancelOverride(): void {
        this.handleSubscriptionCancelOverride = true;
    }

    /**
     * Turn anything caught by a `catch` clause into a message string.
     * Uses the `message` of Error instances and of error-like objects, and
     * falls back to the string form of the thrown value.
     */
    private static toErrorMessage(error: unknown): string {
        if (error instanceof Error) {
            return error.message;
        }
        if (typeof error === 'object' && error !== null) {
            const { message } = error as { message?: unknown };
            if (message !== undefined && message !== null) {
                return String(message);
            }
        }
        return String(error);
    }
}

export default DataSourceController;
