import { Observable } from 'rxjs';
import type { Subscriber } from 'rxjs';
import { pick, isEqual } from 'lodash';
import memoizeOne from 'memoize-one';
import moment from 'moment';
import qs from 'query-string';
import { SearchService } from '@splunkdev/cloud-sdk/services/search';
import type {
    Message,
    SearchJob,
    UpdateJob,
} from '@splunkdev/cloud-sdk/services/search';
import {
    ServiceClient,
    RequestQueueManagerParams,
} from '@splunkdev/cloud-sdk/client';
import type { QueryArgs } from '@splunkdev/cloud-sdk/client';
import { DataSet } from '@splunk/datasource-utils';
import { _ } from '@splunk/ui-utils/i18n';
import type { SplunkAuthClient } from '@splunkdev/cloud-auth-client';
import type {
    DataSourceConfigMeta,
    FieldObj,
    JSONArray,
    RequestParams,
    DataSourceMeta,
    JobStatus,
} from '@splunk/dashboard-types';
import { getDefaultDataSourceName, wait } from '@splunk/dashboard-utils';
import DataSource from './DataSource';
import addLeadingSearchCommand from './utils/addLeadingSearchCommand';
import SimpleScheduler from './utils/SimpleScheduler';
import CloudSearchOptionsSchema from './CloudSearchOptionsSchema';
import { getDefaultOptionsForSearchQuery } from './utils/SearchConfigUtils';
import type { ObservableData } from './types';

// Retry/concurrency settings for one request queue of the cloud-sdk client.
// Declared locally because the equivalent type is not exported from
// `@splunkdev/cloud-sdk/client`.
interface RequestQueueParams {
    initialTimeout: number;
    exponent: number;
    retries: number;
    maxInFlight: number;
    // currently unused by udf
    enableRetryHeader?: boolean;
}

/**
 * Options accepted by the `CloudSearch` data source.
 * Either `query` or `sid` must be truthy; the constructor throws otherwise.
 */
export interface CloudSearchOptions {
    // search string; a leading search command is added in `setup()` via `addLeadingSearchCommand`
    query: string;
    // time range of the search; normalized with `normalizeQueryParameters` before the job is created
    queryParameters: {
        earliest: string;
        latest: string;
    };
    // forwarded as-is to job creation (listed in OTHER_SEARCH_PARAMS)
    requiredFreshness?: number;
    // search module; when `undefined`, `context.defaultModule` is used instead
    module?: string;
    // id of an existing search job; when set, `setup()` retrieves that job instead of creating one
    sid?: string;
    [key: string]: unknown;
}

/**
 * Context (environment) configuration for the `CloudSearch` data source.
 * `tenantId` and at least one of `accessToken` / `authClient` are required by
 * the constructor. A `cacheClient` flag (default `true`) is also read from the
 * context through the index signature in `getServiceClient()`.
 */
export interface CloudSearchContextConfig {
    // overrides the sdk's `api` url when provided
    cloudApiUrl?: string;
    // static token; takes precedence over `authClient.getAccessToken()` when truthy
    accessToken?: string;
    authClient: InstanceType<typeof SplunkAuthClient>;
    // used as the sdk client's default tenant
    tenantId: string;
    // overrides for the 'search-submit' request queue
    searchSubmitQueueParams?: Partial<RequestQueueParams>;
    // overrides for the 'search' request queue
    searchQueueParams?: Partial<RequestQueueParams>;
    // legacy name for `searchSubmitQueueParams`; only used when that one is not set
    requestQueueManagerParams?: Partial<RequestQueueParams>;
    // module used when `options.module` is `undefined`
    defaultModule?: string;
    [key: string]: unknown;
}

// Body of a paged results response as consumed by `fetchPagedData`.
interface PageData extends JSONArray {
    // relative url of the next page; when present another request is issued
    nextLink?: string;
    // delay before requesting `nextLink`, e.g. '5s' or '500ms' (parsed by `getWaitTimeInMs`)
    wait?: string;
}

// Query parameters as they may be supplied by callers. The earliest/latest
// bounds are accepted under three spellings each; the normalize* helpers pick
// the first truthy one in the order `earliest` / `earliestTime` / `earliest_time`
// (and the same order for latest).
interface QueryParameters {
    earliest: string;
    earliestTime: string;
    // eslint-disable-next-line camelcase
    earliest_time: string;
    latest: string;
    latestTime: string;
    // eslint-disable-next-line camelcase
    latest_time: string;
    timezone: string;
}

// Result of `normalizeQueryParameters`: a single spelling per time bound plus
// a timezone, all guaranteed to be present.
interface NormalizedQueryParameters {
    earliest: string;
    latest: string;
    timezone: string;
}

// The job object resolved by `SearchService.getJob`, or `null` while no job
// state has been retrieved yet.
type CurrentJob = Awaited<ReturnType<SearchService['getJob']>> | null;

// Type of the static `CloudSearch.config` object, parameterized by the
// options schema of this data source.
export type CloudSearchConfigMeta = DataSourceConfigMeta<
    typeof CloudSearchOptionsSchema
>;

/**
 * Build the user-facing status message for a search job.
 *
 * - no status at all: the search is still queued on the client
 * - `running` / `done` with a `totalCount` of exactly 0: explain why there is nothing to render
 * - anything else: empty string
 *
 * @param {Object} obj
 * @param {String} [obj.status] job status
 * @param {Number} [obj.totalCount] number of results available
 * @returns {String} localized message, or '' when there is nothing to report
 */
export const getStatusMessage = ({
    status,
    totalCount,
}: {
    status?: JobStatus | null;
    totalCount?: number;
}): string => {
    if (!status) {
        return _('Search is not yet running, queued on client');
    }

    // every remaining message only applies to an empty result set
    if (totalCount !== 0) {
        return '';
    }

    switch (status) {
        case 'running':
            return _(
                'Search is running, but not enough data to render visualization'
            );
        case 'done':
            return _('Search ran successfully, but no results were returned');
        default:
            return '';
    }
};

/**
 * Build a `SearchService` backed by a freshly configured `ServiceClient`.
 *
 * @param {Object} config
 * @param {String} [config.cloudApiUrl] overrides the sdk's `api` url when truthy
 * @param {String} [config.accessToken] static token; used instead of `authClient` when truthy
 * @param {CloudAuthClient} config.authClient asked for a token when no `accessToken` is given
 * @param {String} config.tenantId default tenant of the client
 * @param {Object} [config.searchSubmitQueueParams] overrides for the 'search-submit' queue
 * @param {Object} [config.searchQueueParams] overrides for the 'search' queue
 * @returns {SearchService}
 */
const getSdkClient = ({
    cloudApiUrl,
    accessToken,
    authClient,
    tenantId,
    searchSubmitQueueParams = {},
    searchQueueParams = {},
}: CloudSearchContextConfig) => {
    const urls = cloudApiUrl ? { api: cloudApiUrl } : undefined;
    const tokenSource = async () =>
        accessToken || authClient.getAccessToken();

    // These are taken from overrides in DefaultQueueManagerParams (https://github.com/splunk/splunk-cloud-sdk-js) (src/client.ts)
    const defaultQueue = {
        retries: 6,
        initialTimeout: 1000,
        exponent: 1.6,
        maxInFlight: 3,
    };
    // queue used to start search jobs
    const searchSubmitQueue = {
        retries: searchSubmitQueueParams?.retries ?? 10,
        initialTimeout: searchSubmitQueueParams?.initialTimeout ?? 1000,
        exponent: searchSubmitQueueParams?.exponent ?? 1.6,
        maxInFlight: searchSubmitQueueParams?.maxInFlight ?? 10,
    };
    // queue used for search status and results requests
    const searchQueue = {
        retries: searchQueueParams?.retries ?? 10,
        initialTimeout: searchQueueParams?.initialTimeout ?? 500,
        exponent: searchQueueParams?.exponent ?? 2,
        maxInFlight: searchQueueParams?.maxInFlight ?? 10,
    };

    const serviceClient = new ServiceClient({
        urls,
        tokenSource,
        defaultTenant: tenantId,
        requestQueueManagerParams: new RequestQueueManagerParams(
            defaultQueue,
            new Map([
                // SEARCH_SUBMIT_QUEUE is not exported (https://github.com/splunk/splunk-cloud-sdk-js) (src/client.ts)
                ['search-submit', searchSubmitQueue],
                ['search', searchQueue],
            ])
        ),
    });

    return new SearchService(serviceClient);
};

/**
 * Memoized version of `getSdkClient`. `memoizeOne` is given lodash `isEqual`
 * as its equality function, so a call whose config is deep-equal to the
 * previous call's config reuses the previously built `SearchService`.
 */
const getCachedSdkClient = memoizeOne(getSdkClient, isEqual);

/**
 * Polling intervals handed back to the schedulers:
 * JOB_POLLING_INTERVAL is returned by `updateJob()` between job status
 * refreshes, RESULTS_POLLING_INTERVAL by `computeNextFetchDelay()` between
 * result fetches. NOTE(review): presumably milliseconds — confirm against
 * SimpleScheduler.
 */
const JOB_POLLING_INTERVAL = 400;
const RESULTS_POLLING_INTERVAL = 500;
// Option keys that `setup()` copies verbatim from the datasource options into
// the job creation parameters.
const OTHER_SEARCH_PARAMS = [
    'extractAllFields',
    'timeFormat',
    'maxTime',
    'timeOfSearch',
    'enablePreview',
    'requiredFreshness',
];

/**
 * Select the job messages of a given type.
 * @param {Array} messages job messages, defaults to an empty list
 * @param {String} type message type to keep, defaults to 'INFO'
 * @returns {Array} the messages whose `type` equals `type`, in original order
 */
const findMessages = (messages: Message[] = [], type = 'INFO') => {
    const hasRequestedType = (message: Message): boolean =>
        message.type === type;
    return messages.filter(hasRequestedType);
};

/**
 * Collapse all 'ERROR' messages of a job into one comma-separated string.
 * @param {Array} messages job messages, defaults to an empty list
 * @returns {String} texts of the error messages joined by ',' ('' when there are none)
 */
const extractErrorMessage = (messages: Message[] = []) => {
    const errorTexts = findMessages(messages, 'ERROR').map(
        ({ text }: Message) => text
    );
    return errorTexts.join(',');
};

/**
 * Tell whether a job finished successfully.
 * @param {Object} [job] job properties
 * @returns {Boolean} true only when a job is given and its status is 'done'
 */
export const isJobDone = (job?: SearchJob | null): boolean => {
    const status = job?.status;
    return status === 'done';
};

/**
 * Tell whether a job ended in failure.
 * @param {Object} [job] job properties
 * @returns {Boolean} true only when a job is given and its status is 'failed'
 */
export const isJobFailed = (job?: SearchJob | null): boolean => {
    const status = job?.status;
    return status === 'failed';
};

/**
 * Tell whether a job has reached one of the two end states handled here
 * ('done' or 'failed').
 * @param {Object} [job] job properties
 * @returns {Boolean} false when no job is given
 */
export const isJobCompleted = (job?: SearchJob | null): boolean => {
    if (!job) {
        return false;
    }
    if (isJobDone(job)) {
        return true;
    }
    return isJobFailed(job);
};

/**
 * Tell whether the job status still needs to be refreshed.
 * @param {Object} [job] job properties
 * @returns {Boolean} true when there is no job yet or the job is neither done nor failed
 */
export const shouldUpdateJob = (job?: SearchJob | null): boolean =>
    !(job && isJobCompleted(job));

/**
 * Decide whether results should be fetched for the job right now.
 * With `progress` enabled intermediate results are always fetched; otherwise
 * results are only fetched once the job is done.
 * @param {Object} job job properties
 * @param {Boolean} [progress=true] whether fetching intermediate data is allowed
 * @private
 */
export const shouldFetch = (job: SearchJob, progress = true): boolean => {
    if (progress) {
        return progress;
    }
    // intermediate results are not wanted: wait for the final result set
    return isJobDone(job);
};

/**
 * Resolve the earliest time bound from any of its accepted spellings.
 * Falls back to '0' when none is set (keeps both customers and the search
 * service API happy).
 *
 * The SCP search API requires ISO-8601 absolute times to be expressed in UTC
 * with a trailing 'Z' and ignores any offset. SCP has no user timezone
 * setting, so ISO values without 'Z' are read by moment and re-emitted through
 * `toISOString()`.
 */
const normalizeEarliest = (
    queryParameters: Partial<QueryParameters>
): string => {
    const rawEarliest =
        queryParameters.earliest ||
        queryParameters.earliestTime ||
        queryParameters.earliest_time ||
        '0';

    const isIsoWithoutZulu =
        moment(rawEarliest, moment.ISO_8601, true).isValid() &&
        !rawEarliest.endsWith('Z');

    return isIsoWithoutZulu ? moment(rawEarliest).toISOString() : rawEarliest;
};

/**
 * Resolve the latest time bound from any of its accepted spellings.
 * Falls back to 'now' when none is set; the react time range picker relies on
 * that default, so do not remove it.
 *
 * The SCP search API requires ISO-8601 absolute times to be expressed in UTC
 * with a trailing 'Z' and ignores any offset. SCP has no user timezone
 * setting, so ISO values without 'Z' are read by moment and re-emitted through
 * `toISOString()`.
 */
const normalizeLatest = (queryParameters: Partial<QueryParameters>): string => {
    const rawLatest =
        queryParameters.latest ||
        queryParameters.latestTime ||
        queryParameters.latest_time ||
        'now';

    const isIsoWithoutZulu =
        moment(rawLatest, moment.ISO_8601, true).isValid() &&
        !rawLatest.endsWith('Z');

    return isIsoWithoutZulu ? moment(rawLatest).toISOString() : rawLatest;
};

/**
 * Resolve the timezone of the query: the caller-supplied one when truthy,
 * otherwise the timezone reported by `Intl.DateTimeFormat` for this runtime.
 */
const normalizeTimezone = (
    queryParameters: Partial<QueryParameters>
): string => {
    const { timezone } = queryParameters;
    if (timezone) {
        return timezone;
    }
    return Intl.DateTimeFormat().resolvedOptions().timeZone;
};

/**
 * Normalize caller-supplied query parameters into the shape sent to the
 * search API, so that the API does not reject them: one `earliest`, one
 * `latest` and a `timezone`, each with a fallback when missing.
 * @param {Object} [queryParameters] raw query parameters, defaults to {}
 * @returns {Object} { earliest, latest, timezone }
 */
export const normalizeQueryParameters = (
    queryParameters: Partial<QueryParameters> = {}
): NormalizedQueryParameters => {
    const earliest = normalizeEarliest(queryParameters);
    const latest = normalizeLatest(queryParameters);
    const timezone = normalizeTimezone(queryParameters);
    return { earliest, latest, timezone };
};

/**
 * an object with supported time durations and their multiplier
 * to convert to milliseconds. don't expect wait time to be > minutes
 */
const supportedTimeDurations: Record<string, number> = {
    ms: 1,
    s: 1000,
    m: 60000,
};

// A (possibly fractional) number followed by an optional alphabetic unit.
// The unit group must not accept digits: with `\w+` a value such as '50' was
// split into amount '5' and unit '0'.
const numberUnitRegex = /(\d+(?:\.\d+)?)\s*([a-zA-Z]*)/;
/**
 * Convert the wait string returned by the endpoint to milliseconds.
 *
 * Only the first number/unit pair found in the string is used. A missing or
 * unsupported unit leaves the number unscaled (i.e. it is treated as
 * milliseconds). The result is rounded to a whole number of milliseconds.
 *
 * @param {String} [waitStr] time as string e.g. '5s' is 5 seconds, '500ms' is 500 milliseconds
 * @returns {Number} wait time in milliseconds; 0 when the string is empty or contains no number
 */
export const getWaitTimeInMs = (waitStr?: string): number => {
    if (!waitStr) {
        return 0;
    }
    const matches = numberUnitRegex.exec(waitStr);
    if (!matches) {
        return 0;
    }
    const time = parseFloat(matches[1] ?? '0');
    const unit = matches[2] ?? '';
    // typeof guard: a plain-object lookup also finds inherited members such as
    // 'constructor', which must not be used as a multiplier
    const knownMultiplier = supportedTimeDurations[unit];
    const multiplier =
        typeof knownMultiplier === 'number' ? knownMultiplier : 1;
    return Math.round(time * multiplier);
};

/**
 * Split a relative url into its url part and its parsed query string,
 * using `query-string`'s `parseUrl`.
 * @param {String} relativeUrl A relative url
 * @returns {Object} { url, query }
 */
const parseNextLink = (relativeUrl: string) => {
    const parsedLink = qs.parseUrl(relativeUrl);
    return parsedLink;
};

/**
 * Fetch paged data, or data that is not ready yet, following `nextLink`
 * recursively until the last page and merging all pages into one result.
 *
 * @param {SDKClient} client sdk client
 * @param {String} link link to the page to be fetched
 * @param {Number} [timeToWait=0] time to wait before the request (if the result set is not ready yet)
 * @param {Object} [queryData={}] query parameters sent with the request
 * @returns {Promise<Object>} `{ fields, results }`: results of all pages concatenated in page
 *      order, fields taken from the first page that reports a non-empty field list
 */
export const fetchPagedData = async (
    client: ServiceClient,
    link: string,
    timeToWait = 0,
    queryData: QueryArgs = {}
): Promise<JSONArray> => {
    if (timeToWait) {
        await wait(timeToWait);
    }
    // the sdk expects query parameters under the `query` key of the options
    const pageResponse = await client.get('api', link, { query: queryData });
    // Casting here is sketchy as the client code doesn't really specify this very well.
    const pageData = pageResponse.body as PageData;
    let { fields = [], results = [] } = pageData;
    if (pageData.nextLink) {
        const { url, query } = parseNextLink(pageData.nextLink);
        const nextPageData = await fetchPagedData(
            client,
            url,
            getWaitTimeInMs(pageData.wait),
            query
        );

        results = results.concat(nextPageData.results);
        // `fields` defaults to [], which is truthy, so `fields || ...` could
        // never fall back to the next page; test the length instead.
        fields = fields.length ? fields : nextPageData.fields;
    }
    return {
        fields,
        results,
    };
};

/**
 * Fetch the current results of a search job and wrap them in a `DataSet`.
 *
 * Preview results are requested when the job has `enablePreview` set, final
 * results otherwise. When the response carries a `nextLink` the remaining
 * pages are fetched through `fetchPagedData` and merged in.
 *
 * @param {SearchService} searchServiceClient search service sdk client
 * @param {Object} requestParams requestParams from request function; only
 *      `offset`, `count`, `field` and `search` are forwarded
 * @param {Object} job search job
 * @returns {Promise<DataSet|null>} `null` when there is no job (sid) yet, or when the job is
 *      still in progress and the response holds no fields or no results
 */
export const fetchData = async (
    searchServiceClient: SearchService,
    requestParams: RequestParams,
    job: CurrentJob
): Promise<DataSet | null> => {
    // bail if we haven't started a job
    if (!job?.sid) {
        return null;
    }

    const fetchParams = pick(requestParams, [
        'offset',
        'count',
        'field',
        'search',
    ]);

    // Call the methods on the client itself rather than through a detached
    // reference, so they keep their receiver however the sdk defines them.
    const response = job.enablePreview
        ? await searchServiceClient.listPreviewResults(job.sid, fetchParams)
        : await searchServiceClient.listResults(job.sid, fetchParams);
    let { fields = [], results = [] } = response;
    // if there is a nextLink
    // the result set either exceeds the maximum page size or is not ready yet
    // https://confluence.splunk.com/display/PROD/Implementation+of+Server+Driven+Paging+for+Search+Service
    if (response.nextLink) {
        const { url, query } = parseNextLink(response.nextLink);
        const pageData = await fetchPagedData(
            searchServiceClient.client,
            url,
            getWaitTimeInMs(response.wait),
            query
        );

        fields = fields.length ? fields : pageData.fields;
        results = results.concat(pageData.results);
    }
    // We will emit data out when
    // 1) response contains data (fields.length && results.length)
    // 2) job is complete, in this case we need to emit dataset even it's empty
    if (isJobCompleted(job) || (fields.length && results.length)) {
        return DataSet.fromJSONArray(fields as FieldObj[], results);
    }
    return null;
};

/**
 * Derive the datasource metadata from the job properties.
 *
 * `status`, `sid` and `percentComplete` are copied from the job when present;
 * `totalCount`, `isRealTimeSearch`, `statusMessage` and `lastUpdated` are
 * computed here.
 * @param {Object} job job properties
 * @returns {Object} job metadata object normalized as per go/search-job-metadata
 */
export const fetchMetaData = (job: SearchJob): DataSourceMeta => {
    // we always include totalCount to show warning on viz if 0 results returned
    const totalCount = job.resultsAvailable ?? 0;

    const meta = pick(job, [
        'status',
        'sid',
        'percentComplete',
    ]) as DataSourceMeta;
    meta.totalCount = totalCount;
    // Enterprise has real-time searches, SCS does not so always set to false.
    meta.isRealTimeSearch = false;

    const statusInput = pick(meta, [
        'status',
        'totalCount',
        'isRealTimeSearch',
    ]);
    meta.statusMessage = getStatusMessage(statusInput);
    // prefer the completion time, fall back to the dispatch time
    meta.lastUpdated = job.completionTime || job.dispatchTime;
    return meta;
};

/**
 * `CloudSearch` datasource. A datasource is a module that provides data to a visualization:
 * one datasource can bind to multiple viz while one viz can be powered by multiple datasources.
 * This one runs searches against the cloud search service through `@splunkdev/cloud-sdk`.
 *
 * Lifecycle: `setup()` creates (or looks up) the search job and starts polling its status,
 * `request()` returns an observable that polls for results, `teardown()` stops the status
 * polling and cancels the job when it is still in progress.
 *
 * Examples:
 * ```js
 *      try {
 *          const datasource = new CloudSearch(options, context);
 *      } catch (e) {
 *          // Handle error here.
 *      }
 *```
 * @param {Object} options - Search parameters.
 * @param {String} options.query - The search string. Required unless `options.sid` is given.
 * @param {String} options.sid - Id of an existing search job to attach to instead of creating one.
 * @param {String} options.queryParameters.earliest - earliest time of the query
 * @param {String} options.queryParameters.latest - latest time of the query
 * @param {Number} options.requiredFreshness - freshness of search used for search job caching
 * @param {String} options.module - search module
 * @param {Object} context - config parameters.
 * @param {Object} context.authClient - auth client used to obtain access tokens
 * @param {String} context.accessToken - static access token, alternative to `authClient`
 * @param {String} context.tenantId - current user tenantId to make the request to the search service
 * @throws {Error} when neither query nor sid, no tenantId, or neither authClient nor accessToken is given
 * @returns {CloudSearch} A new CloudSearch datasource instance.
 * @public
 */
class CloudSearch extends DataSource<
    CloudSearchOptions,
    CloudSearchContextConfig
> {
    static schema = CloudSearchOptionsSchema;

    static config: CloudSearchConfigMeta;

    // lazily created by getServiceClient()
    private client: ReturnType<typeof getSdkClient> | null = null;

    // latest job state retrieved by updateJob(); null until the first poll succeeds
    private currentJob: CurrentJob;

    // message of the last error raised by setup() or updateJob()
    caughtError: string | null;

    // set by teardown(); makes the result polling loops stop
    isTeardown: boolean;

    private jobId: string | undefined;

    private updateJobScheduler: InstanceType<typeof SimpleScheduler> | null =
        null;

    constructor(
        options: CloudSearchOptions = {} as CloudSearchOptions,
        context: CloudSearchContextConfig = {} as CloudSearchContextConfig
    ) {
        super(options, context);
        this.currentJob = null;
        this.caughtError = null;
        this.isTeardown = false;
        this.updateJob = this.updateJob.bind(this);

        if (!this.options.query && !this.options.sid) {
            throw Error('search string or sid is required!');
        }
        if (!this.context.tenantId) {
            throw Error('tenantId is required');
        }
        if (
            this.context.accessToken == null &&
            this.context.authClient == null
        ) {
            throw Error('authClient or accessToken is required');
        }
    }

    /**
     * Return the sdk search client of this datasource, creating it on first use.
     * Unless `context.cacheClient` is set to a falsy value, the client is obtained
     * through the memoized factory.
     */
    getServiceClient(): ReturnType<typeof getSdkClient> {
        if (this.client === null) {
            const {
                cloudApiUrl,
                accessToken,
                authClient,
                tenantId,
                cacheClient = true,
                searchQueueParams,
            } = this.context;
            // For backwards compatibility, previously it was 'requestQueueManagerParams'. Now it's renamed since we have 2 queues with diff params.
            const searchSubmitQueueParams:
                | Partial<RequestQueueParams>
                | undefined =
                this.context.searchSubmitQueueParams ||
                this.context.requestQueueManagerParams;

            const getFn = cacheClient ? getCachedSdkClient : getSdkClient;
            this.client = getFn({
                cloudApiUrl,
                accessToken,
                authClient,
                tenantId,
                searchSubmitQueueParams,
                searchQueueParams,
            });
        }

        return this.client;
    }

    /**
     * Creates a search job (or retrieves the job identified by `options.sid`),
     * starts polling its status and returns its id.
     *
     * Failures are not thrown: they are logged, recorded in `caughtError`
     * (and later reported through the observable returned by `request()`),
     * and `null` is returned.
     *
     * Examples:
     * ```js
     *      const datasource = new CloudSearch(options, context);
     *      const sid = await datasource.setup();
     *```
     * @returns {Promise<string|null>} The SID of the search, or null on failure.
     * @public
     */
    async setup(): Promise<string | null> {
        // Must do an explicit undefined check to allow the user specified default module value of empty string ""
        const module =
            this.options.module === undefined
                ? this.context.defaultModule
                : this.options.module;

        const jobParams: SearchJob = {
            // we by default enable preview unless user set `enablePreview` to false in options
            enablePreview: true,
            ...pick(this.options, OTHER_SEARCH_PARAMS),
            module,
            // When a sid is passed, it may not have a query
            query: this.options.query
                ? addLeadingSearchCommand(this.options.query)
                : this.options.query,
        };

        if (this.options.queryParameters) {
            jobParams.queryParameters = normalizeQueryParameters(
                this.options.queryParameters
            );
        }

        try {
            const job = this.options.sid
                ? await this.getServiceClient().getJob(this.options.sid)
                : await this.getServiceClient().createJob(jobParams);
            if (!job) {
                throw new Error(
                    this.options.sid ? 'job not found' : 'unknown error'
                );
            }
            this.jobId = job.sid;
            this.updateJobScheduler = SimpleScheduler.createScheduler(
                this.updateJob
            );
            this.updateJobScheduler.start();
        } catch (ex) {
            if (this.options.sid) {
                // eslint-disable-next-line no-console
                console.error('failed to retrieve search job:', ex);
            } else {
                // eslint-disable-next-line no-console
                console.error('failed to create search job:', ex);
            }
            this.caughtError = (ex as Error).message;
            return null;
        }

        return this.jobId ?? null;
    }

    /**
     * Compute the next fetch interval, return false to terminate the loop.
     * @param {Object} [job] job state the decision is based on; defaults to the latest known job
     */
    computeNextFetchDelay = (
        job: CurrentJob = this.currentJob
    ): false | number => {
        if (isJobCompleted(job) || this.isTeardown) {
            return false; // no next fetch;
        }
        return RESULTS_POLLING_INTERVAL;
    };

    /**
     * Update observer based on a job state. Has side-effects due to api fetch.
     *
     * The job state is captured once and used for the whole call. The status
     * poller may replace `this.currentJob` while results are being fetched;
     * re-reading it afterwards could pair results fetched for a running job
     * with the metadata of the finished job and complete the observer before
     * the final results were ever fetched.
     *
     * @param {Subscriber<ObservableData>} obj.observer
     * @param {RequestParams} obj.requestParams
     * @param {Object} [obj.job] job state to act on; defaults to the latest known job
     */
    private async handleJobRequest({
        observer,
        requestParams,
        job = this.currentJob,
    }: {
        observer: Subscriber<ObservableData>;
        requestParams: RequestParams;
        job?: CurrentJob;
    }): Promise<void> {
        // Bail if there is no job to fetch data from
        if (!job) {
            return;
        }
        const params = {
            offset: 0,
            count: 0,
            progress: true,
            ...requestParams,
        };

        switch (job.status) {
            case 'failed':
                observer.error({
                    level: 'error',
                    message: extractErrorMessage(job.messages),
                });
                break;
            case 'running':
            case 'done':
                if (shouldFetch(job, params.progress)) {
                    const data = await fetchData(
                        this.getServiceClient(),
                        params,
                        job
                    );
                    if (data) {
                        observer.next({
                            data,
                            meta: fetchMetaData(job),
                        });
                    }
                }
                break;
            default:
                break;
        }
        if (isJobDone(job)) {
            observer.complete();
        }
    }

    /**
     * Returns an observable that polls the search job for results and emits
     * `{ data, meta }` objects until the job is done (complete), has failed
     * (error) or the datasource is torn down.
     *
     * Examples:
     * ```js
     *      const datasource = new CloudSearch(options, context);
     *      datasource.setup().then(() => {
     *          datasource.request().subscribe({
     *              complete() {
     *                 // search completed
     *              },
     *              next(obj) {
     *                  // Process results
     *              },
     *              error() {
     *                  // Handle error
     *              },
     *         });
     *     });
     * ```
     * @param {Object} requestParams search request parameters.
     * @param {Number} requestParams.offset data offset.
     * @param {Number} requestParams.count data count.
     * @param {Boolean} requestParams.progress whether to return intermediate search result
     * @returns {Observable<ObservableData>}
     * @public
     */
    request(requestParams: RequestParams = {}): Observable<ObservableData> {
        // `new Observable` replaces the deprecated `Observable.create`
        return new Observable<ObservableData>(
            (observer: Subscriber<ObservableData>) => {
                let resultScheduler: InstanceType<
                    typeof SimpleScheduler
                > | null = null;
                if (this.caughtError) {
                    observer.error({
                        level: 'error',
                        message: this.caughtError,
                    });
                } else {
                    resultScheduler = SimpleScheduler.createScheduler(
                        async () => {
                            // The status poller stops and records its error in
                            // `caughtError` when refreshing the job fails. Report
                            // it here, otherwise this loop would keep polling a
                            // job state that never changes again.
                            if (this.caughtError) {
                                observer.error({
                                    level: 'error',
                                    message: this.caughtError,
                                });
                                return false;
                            }
                            try {
                                /**
                                 * This function will perform following tasks in order:
                                 * 1. given the job state at the start of this tick, fetch results and compute metadata
                                 * 2. compute the next fetch delay or stop the scheduler, based on that same state
                                 */
                                const job = this.currentJob;
                                await this.handleJobRequest({
                                    observer,
                                    requestParams,
                                    job,
                                });
                                return this.computeNextFetchDelay(job);
                            } catch (ex) {
                                observer.error({
                                    level: 'error',
                                    message: (ex as Error).message,
                                });
                                return false; // stop next fetch once we have error
                            }
                        }
                    );
                    // kick off the loop
                    resultScheduler.start();
                }
                return () => {
                    // clean up function when no consumer subscribe to this observable
                    if (resultScheduler) {
                        resultScheduler.stop();
                        resultScheduler = null;
                    }
                };
            }
        );
    }

    /**
     * Stops the job status polling, flags the datasource as torn down (which
     * ends the result polling loops) and, when the job is still in progress,
     * asks the search service to cancel it.
     *
     * Examples:
     * ```js
     *   const response = await datasource.teardown().then(
     *      value => {
     *          console.log(`Teardown finished: ${value}`);
     *          },
     *      reason => {
     *          console.log(`Failed to cancel the job: ${reason}`);
     *         }
     *   );
     *```
     * @throws {error} If the cancel request fails
     * @returns {Promise<null>}
     * @public
     */
    async teardown(): Promise<null> {
        if (this.updateJobScheduler) {
            this.updateJobScheduler.stop();
        }
        // Flag first: if the cancel request below rejects, the result polling
        // loops must still stop.
        this.isTeardown = true;
        // Only cancel jobs that are still in progress: cancelling a
        // non-running search makes the search service respond with 400.
        if (this.jobId && !isJobCompleted(this.currentJob)) {
            await this.getServiceClient().updateJob(this.jobId, {
                status: 'canceled',
            } as UpdateJob);
        }
        return null;
    }

    /**
     * Given the job id, update the job stats and compute the next update delay.
     * Returns false (no further update) once the job is done or failed, when
     * there is no job id, or when the refresh fails; in the last case the
     * error message is recorded in `caughtError`.
     */
    async updateJob(): Promise<false | number> {
        try {
            if (this.jobId && shouldUpdateJob(this.currentJob)) {
                this.currentJob = (await this.getServiceClient().getJob(
                    this.jobId
                )) as CurrentJob;
                return JOB_POLLING_INTERVAL;
            }
        } catch (ex) {
            this.caughtError = (ex as Error).message;
        }
        return false;
    }
}

// Static metadata describing this data source type (title, naming, default
// options, options schema). Assigned after the class body because it refers
// to helpers imported by this module.
CloudSearch.config = {
    title: _('Search'),
    displayDataSourceItemListByDefault: true,
    canCreateDataSource: true,
    dataSourceRemoveVerb: 'delete',
    isDataSourceNameEditable: true,
    getDataSourceName: getDefaultDataSourceName,
    // default options are derived from the dashboard definition
    defaultOptions: ({ definition }) =>
        getDefaultOptionsForSearchQuery(definition),
    editorConfig: [],
    optionsSchema: CloudSearchOptionsSchema,
};

export default CloudSearch;
