diff --git a/.github/workflows/reports-scheduler-test-and-build-workflow.yml b/.github/workflows/reports-scheduler-test-and-build-workflow.yml index 5148d31c..05d7ba99 100644 --- a/.github/workflows/reports-scheduler-test-and-build-workflow.yml +++ b/.github/workflows/reports-scheduler-test-and-build-workflow.yml @@ -65,6 +65,7 @@ jobs: uses: actions/upload-artifact@v4 with: name: reports-scheduler-linux + overwrite: true path: reports-scheduler-builds build-windows-macos: @@ -97,5 +98,6 @@ jobs: - name: Upload Artifacts uses: actions/upload-artifact@v4 with: - name: eports-scheduler-${{ matrix.os }} - path: eports-scheduler-builds + name: reports-scheduler-${{ matrix.os }} + overwrite: true + path: reports-scheduler-builds diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt index c69c32c5..15c9366d 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -19,11 +19,13 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment +import org.opensearch.identity.PluginSubject import org.opensearch.indices.SystemIndexDescriptor import org.opensearch.jobscheduler.spi.JobSchedulerExtension import org.opensearch.jobscheduler.spi.ScheduledJobParser import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.plugins.ActionPlugin +import org.opensearch.plugins.IdentityAwarePlugin import org.opensearch.plugins.Plugin import org.opensearch.plugins.SystemIndexPlugin import org.opensearch.reportsscheduler.action.CreateReportDefinitionAction @@ -48,6 +50,7 @@ import org.opensearch.reportsscheduler.resthandler.ReportInstanceRestHandler import org.opensearch.reportsscheduler.resthandler.ReportStatsRestHandler import org.opensearch.reportsscheduler.scheduler.ReportDefinitionJobParser import org.opensearch.reportsscheduler.scheduler.ReportDefinitionJobRunner +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.reportsscheduler.settings.PluginSettings import org.opensearch.repositories.RepositoriesService import org.opensearch.rest.RestController @@ -62,13 +65,14 @@ import java.util.function.Supplier * Entry point of the OpenSearch Reports scheduler plugin. * This class initializes the rest handlers. */ -class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSchedulerExtension { +class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, IdentityAwarePlugin, JobSchedulerExtension { companion object { const val PLUGIN_NAME = "opensearch-reports-scheduler" const val LOG_PREFIX = "reports" const val BASE_REPORTS_URI = "/_plugins/_reports" const val LEGACY_BASE_REPORTS_URI = "/_opendistro/_reports" + private lateinit var pluginClient: PluginClient } /** @@ -104,9 +108,16 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, SystemIndexPlugin, JobSch repositoriesServiceSupplier: Supplier ): Collection { PluginSettings.addSettingsUpdateConsumer(clusterService) - ReportDefinitionsIndex.initialize(client, clusterService) - ReportInstancesIndex.initialize(client, clusterService) - return emptyList() + pluginClient = PluginClient(client) + ReportDefinitionsIndex.initialize(pluginClient, clusterService) + ReportInstancesIndex.initialize(pluginClient, clusterService) + return listOf(pluginClient) + } + + override fun assignSubject(pluginSubject: PluginSubject) { + // When security is not installed, the pluginSubject will still be assigned. + requireNotNull(pluginSubject) + pluginClient.setSubject(pluginSubject) } /** diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/CreateReportDefinitionAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/CreateReportDefinitionAction.kt index 26b07126..e03394a5 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/CreateReportDefinitionAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/CreateReportDefinitionAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.CreateReportDefinitionRequest import org.opensearch.reportsscheduler.model.CreateReportDefinitionResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Create reportDefinition transport action */ internal class CreateReportDefinitionAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::CreateReportDefinitionRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/DeleteReportDefinitionAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/DeleteReportDefinitionAction.kt index bdbf09bc..97b7bf6c 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/DeleteReportDefinitionAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/DeleteReportDefinitionAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.DeleteReportDefinitionRequest import org.opensearch.reportsscheduler.model.DeleteReportDefinitionResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Delete reportDefinition transport action */ internal class DeleteReportDefinitionAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::DeleteReportDefinitionRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportDefinitionsAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportDefinitionsAction.kt index fa565b31..03f627d7 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportDefinitionsAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportDefinitionsAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.GetAllReportDefinitionsRequest import org.opensearch.reportsscheduler.model.GetAllReportDefinitionsResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Get all reportDefinitions transport action */ internal class GetAllReportDefinitionsAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::GetAllReportDefinitionsRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportInstancesAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportInstancesAction.kt index fa0f8588..3e3d9354 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportInstancesAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetAllReportInstancesAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.GetAllReportInstancesRequest import org.opensearch.reportsscheduler.model.GetAllReportInstancesResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Get all report instances transport action */ internal class GetAllReportInstancesAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::GetAllReportInstancesRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportDefinitionAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportDefinitionAction.kt index 7fac2a95..138b66df 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportDefinitionAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportDefinitionAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.GetReportDefinitionRequest import org.opensearch.reportsscheduler.model.GetReportDefinitionResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Get reportDefinition transport action */ internal class GetReportDefinitionAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::GetReportDefinitionRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportInstanceAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportInstanceAction.kt index 68349e82..b4309770 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportInstanceAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/GetReportInstanceAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.GetReportInstanceRequest import org.opensearch.reportsscheduler.model.GetReportInstanceResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Get report instance transport action */ internal class GetReportInstanceAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::GetReportInstanceRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/InContextReportCreateAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/InContextReportCreateAction.kt index c2757afb..97d97ebf 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/InContextReportCreateAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/InContextReportCreateAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.InContextReportCreateRequest import org.opensearch.reportsscheduler.model.InContextReportCreateResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * In-Context ReportCreate transport action */ internal class InContextReportCreateAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::InContextReportCreateRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/OnDemandReportCreateAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/OnDemandReportCreateAction.kt index 35bf77db..c8018686 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/OnDemandReportCreateAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/OnDemandReportCreateAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.OnDemandReportCreateRequest import org.opensearch.reportsscheduler.model.OnDemandReportCreateResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * On-Demand ReportCreate transport action */ internal class OnDemandReportCreateAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::OnDemandReportCreateRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt index b2400a6a..0a39df53 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt @@ -13,7 +13,6 @@ import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionRequest import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT import org.opensearch.commons.authuser.User import org.opensearch.core.action.ActionListener @@ -25,16 +24,16 @@ import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indices.InvalidIndexNameException import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX import org.opensearch.reportsscheduler.metrics.Metrics +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.reportsscheduler.util.logger import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client import java.io.IOException abstract class PluginBaseAction( name: String, transportService: TransportService, - val client: Client, + val pluginClient: PluginClient, actionFilters: ActionFilters, requestReader: Writeable.Reader ) : HandledTransportAction(name, transportService, actionFilters, requestReader) { @@ -53,15 +52,11 @@ abstract class PluginBaseAction ) { val userStr: String? = - client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + pluginClient.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) val user: User? = User.parse(userStr) - val storedThreadContext = client.threadPool().threadContext.newStoredContext(false) scope.launch { try { - client.threadPool().threadContext.stashContext().use { - storedThreadContext.restore() - listener.onResponse(executeRequest(request, user)) - } + listener.onResponse(executeRequest(request, user)) } catch (exception: OpenSearchStatusException) { Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment() log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}") @@ -113,43 +108,4 @@ abstract class PluginBaseAction T.use(block: (T) -> R): R { - var exception: Throwable? = null - try { - return block(this) - } catch (e: Throwable) { - exception = e - throw e - } finally { - closeFinally(exception) - } - } - - /** - * Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when - * it's being closed due to some other [cause] exception occurred. - * - * The suppressed exception is added to the list of suppressed exceptions of [cause] exception. - */ - @Suppress("TooGenericExceptionCaught") - private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) { - null -> close() - else -> try { - close() - } catch (closeException: Throwable) { - cause.addSuppressed(closeException) - } - } } diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportDefinitionAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportDefinitionAction.kt index c9bc5598..20250611 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportDefinitionAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportDefinitionAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.UpdateReportDefinitionRequest import org.opensearch.reportsscheduler.model.UpdateReportDefinitionResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Update reportDefinitions transport action */ internal class UpdateReportDefinitionAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::UpdateReportDefinitionRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportInstanceStatusAction.kt b/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportInstanceStatusAction.kt index acbf8255..eb44f400 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportInstanceStatusAction.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/action/UpdateReportInstanceStatusAction.kt @@ -12,21 +12,21 @@ import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse +import org.opensearch.reportsscheduler.security.PluginClient import org.opensearch.transport.TransportService -import org.opensearch.transport.client.Client /** * Update ReportInstance Status transport action */ internal class UpdateReportInstanceStatusAction @Inject constructor( transportService: TransportService, - client: Client, + pluginClient: PluginClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry ) : PluginBaseAction( NAME, transportService, - client, + pluginClient, actionFilters, ::UpdateReportInstanceStatusRequest ) { diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportDefinitionsIndex.kt b/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportDefinitionsIndex.kt index 40f7fa87..6a7062ef 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportDefinitionsIndex.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportDefinitionsIndex.kt @@ -12,6 +12,7 @@ import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest +import org.opensearch.action.support.WriteRequest import org.opensearch.action.update.UpdateRequest import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue @@ -27,7 +28,6 @@ import org.opensearch.reportsscheduler.model.RestTag.ACCESS_LIST_FIELD import org.opensearch.reportsscheduler.model.RestTag.TENANT_FIELD import org.opensearch.reportsscheduler.model.RestTag.UPDATED_TIME_FIELD import org.opensearch.reportsscheduler.settings.PluginSettings -import org.opensearch.reportsscheduler.util.SecureIndexClient import org.opensearch.reportsscheduler.util.logger import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.transport.client.Client @@ -51,7 +51,7 @@ internal object ReportDefinitionsIndex { * @param clusterService The ES cluster service */ fun initialize(client: Client, clusterService: ClusterService) { - this.client = SecureIndexClient(client) + this.client = client this.clusterService = clusterService } @@ -68,15 +68,13 @@ internal object ReportDefinitionsIndex { .mapping(indexMappingSource, XContentType.YAML) .settings(indexSettingsSource, XContentType.YAML) try { - client.threadPool().threadContext.stashContext().use { - val actionFuture = client.admin().indices().create(request) - val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) - if (response.isAcknowledged) { - log.info("$LOG_PREFIX:Index $REPORT_DEFINITIONS_INDEX_NAME creation Acknowledged") - } else { - Metrics.REPORT_DEFINITION_CREATE_SYSTEM_ERROR.counter.increment() - error("$LOG_PREFIX:Index $REPORT_DEFINITIONS_INDEX_NAME creation not Acknowledged") - } + val actionFuture = client.admin().indices().create(request) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (response.isAcknowledged) { + log.info("$LOG_PREFIX:Index $REPORT_DEFINITIONS_INDEX_NAME creation Acknowledged") + } else { + Metrics.REPORT_DEFINITION_CREATE_SYSTEM_ERROR.counter.increment() + error("$LOG_PREFIX:Index $REPORT_DEFINITIONS_INDEX_NAME creation not Acknowledged") } } catch (exception: ResourceAlreadyExistsException) { log.warn("message: ${exception.message}") @@ -108,6 +106,7 @@ internal object ReportDefinitionsIndex { createIndex() val indexRequest = IndexRequest(REPORT_DEFINITIONS_INDEX_NAME) .source(reportDefinitionDetails.toXContent()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .create(true) val actionFuture = client.index(indexRequest) val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) @@ -192,6 +191,7 @@ internal object ReportDefinitionsIndex { createIndex() val updateRequest = UpdateRequest() .index(REPORT_DEFINITIONS_INDEX_NAME) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .id(id) .doc(reportDefinitionDetails.toXContent()) .fetchSource(true) @@ -213,6 +213,7 @@ internal object ReportDefinitionsIndex { createIndex() val deleteRequest = DeleteRequest() .index(REPORT_DEFINITIONS_INDEX_NAME) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .id(id) val actionFuture = client.delete(deleteRequest) val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportInstancesIndex.kt b/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportInstancesIndex.kt index c09f3ba3..241a5424 100644 --- a/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportInstancesIndex.kt +++ b/src/main/kotlin/org/opensearch/reportsscheduler/index/ReportInstancesIndex.kt @@ -12,6 +12,7 @@ import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest +import org.opensearch.action.support.WriteRequest import org.opensearch.action.update.UpdateRequest import org.opensearch.cluster.service.ClusterService import org.opensearch.common.unit.TimeValue @@ -27,7 +28,6 @@ import org.opensearch.reportsscheduler.model.RestTag.ACCESS_LIST_FIELD import org.opensearch.reportsscheduler.model.RestTag.TENANT_FIELD import org.opensearch.reportsscheduler.model.RestTag.UPDATED_TIME_FIELD import org.opensearch.reportsscheduler.settings.PluginSettings -import org.opensearch.reportsscheduler.util.SecureIndexClient import org.opensearch.reportsscheduler.util.logger import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.transport.client.Client @@ -51,7 +51,7 @@ internal object ReportInstancesIndex { * @param clusterService The ES cluster service */ fun initialize(client: Client, clusterService: ClusterService) { - this.client = SecureIndexClient(client) + this.client = client this.clusterService = clusterService } @@ -68,14 +68,12 @@ internal object ReportInstancesIndex { .mapping(indexMappingSource, XContentType.YAML) .settings(indexSettingsSource, XContentType.YAML) try { - client.threadPool().threadContext.stashContext().use { - val actionFuture = client.admin().indices().create(request) - val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) - if (response.isAcknowledged) { - log.info("$LOG_PREFIX:Index $REPORT_INSTANCES_INDEX_NAME creation Acknowledged") - } else { - error("$LOG_PREFIX:Index $REPORT_INSTANCES_INDEX_NAME creation not Acknowledged") - } + val actionFuture = client.admin().indices().create(request) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (response.isAcknowledged) { + log.info("$LOG_PREFIX:Index $REPORT_INSTANCES_INDEX_NAME creation Acknowledged") + } else { + error("$LOG_PREFIX:Index $REPORT_INSTANCES_INDEX_NAME creation not Acknowledged") } } catch (exception: ResourceAlreadyExistsException) { log.warn("message: ${exception.message}") @@ -106,6 +104,7 @@ internal object ReportInstancesIndex { createIndex() val indexRequest = IndexRequest(REPORT_INSTANCES_INDEX_NAME) .source(reportInstance.toXContent()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .create(true) val actionFuture = client.index(indexRequest) val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) @@ -190,6 +189,7 @@ internal object ReportInstancesIndex { .index(REPORT_INSTANCES_INDEX_NAME) .id(reportInstance.id) .doc(reportInstance.toXContent()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .fetchSource(true) val actionFuture = client.update(updateRequest) val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/security/PluginClient.kt b/src/main/kotlin/org/opensearch/reportsscheduler/security/PluginClient.kt new file mode 100644 index 00000000..6f272969 --- /dev/null +++ b/src/main/kotlin/org/opensearch/reportsscheduler/security/PluginClient.kt @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.reportsscheduler.security + +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionType +import org.opensearch.common.CheckedRunnable +import org.opensearch.core.action.ActionListener +import org.opensearch.core.action.ActionResponse +import org.opensearch.identity.Subject +import org.opensearch.transport.client.Client +import org.opensearch.transport.client.FilterClient + +/** + * A special client for executing transport actions as this plugin's system subject. + */ +class PluginClient : FilterClient { + private var subject: Subject? = null + + constructor(delegate: Client) : super(delegate) + + constructor(delegate: Client, subject: Subject) : super(delegate) { + this.subject = subject + } + + fun setSubject(subject: Subject) { + this.subject = subject + } + + override fun doExecute( + action: ActionType?, + request: Request?, + listener: ActionListener? + ) { + checkNotNull(subject) { "PluginClient is not initialized." } + threadPool().getThreadContext().newStoredContext(false).use { ctx -> + subject!!.runAs( + CheckedRunnable { + Companion.logger.info( + "Running transport action with subject: {}", + subject!!.getPrincipal().getName() + ) + super.doExecute( + action, + request, + ActionListener.runBefore(listener, CheckedRunnable { ctx.restore() }) + ) + } + ) + } + } + + companion object { + private val logger: Logger = LogManager.getLogger(PluginClient::class.java) + } +} diff --git a/src/main/kotlin/org/opensearch/reportsscheduler/util/SecureIndexClient.kt b/src/main/kotlin/org/opensearch/reportsscheduler/util/SecureIndexClient.kt deleted file mode 100644 index 62edcc76..00000000 --- a/src/main/kotlin/org/opensearch/reportsscheduler/util/SecureIndexClient.kt +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.reportsscheduler.util - -import org.opensearch.action.ActionRequest -import org.opensearch.action.ActionType -import org.opensearch.action.bulk.BulkRequest -import org.opensearch.action.bulk.BulkResponse -import org.opensearch.action.delete.DeleteRequest -import org.opensearch.action.delete.DeleteResponse -import org.opensearch.action.explain.ExplainRequest -import org.opensearch.action.explain.ExplainResponse -import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest -import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse -import org.opensearch.action.get.MultiGetRequest -import org.opensearch.action.get.MultiGetResponse -import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse -import org.opensearch.action.search.ClearScrollRequest -import org.opensearch.action.search.ClearScrollResponse -import org.opensearch.action.search.MultiSearchRequest -import org.opensearch.action.search.MultiSearchResponse -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse -import org.opensearch.action.search.SearchScrollRequest -import org.opensearch.action.termvectors.MultiTermVectorsRequest -import org.opensearch.action.termvectors.MultiTermVectorsResponse -import org.opensearch.action.termvectors.TermVectorsRequest -import org.opensearch.action.termvectors.TermVectorsResponse -import org.opensearch.action.update.UpdateRequest -import org.opensearch.action.update.UpdateResponse -import org.opensearch.common.action.ActionFuture -import org.opensearch.common.util.concurrent.ThreadContext -import org.opensearch.core.action.ActionListener -import org.opensearch.core.action.ActionResponse -import org.opensearch.transport.client.Client - -/** - * Wrapper class on [Client] with security context removed. - */ -@Suppress("TooManyFunctions") -internal class SecureIndexClient(private val client: Client) : Client by client { - /** - * {@inheritDoc} - */ - override fun execute( - action: ActionType, - request: Request - ): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.execute(action, request) } - } - - /** - * {@inheritDoc} - */ - override fun execute( - action: ActionType, - request: Request, - listener: ActionListener - ) { - client.threadPool().threadContext.stashContext().use { return client.execute(action, request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun index(request: IndexRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.index(request) } - } - - /** - * {@inheritDoc} - */ - override fun index(request: IndexRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.index(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun update(request: UpdateRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.update(request) } - } - - /** - * {@inheritDoc} - */ - override fun update(request: UpdateRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.update(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun delete(request: DeleteRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.delete(request) } - } - - /** - * {@inheritDoc} - */ - override fun delete(request: DeleteRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.delete(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun bulk(request: BulkRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.bulk(request) } - } - - /** - * {@inheritDoc} - */ - override fun bulk(request: BulkRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.bulk(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun get(request: GetRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.get(request) } - } - - /** - * {@inheritDoc} - */ - override fun get(request: GetRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.get(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun multiGet(request: MultiGetRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.multiGet(request) } - } - - /** - * {@inheritDoc} - */ - override fun multiGet(request: MultiGetRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.multiGet(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun search(request: SearchRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.search(request) } - } - - /** - * {@inheritDoc} - */ - override fun search(request: SearchRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.search(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun searchScroll(request: SearchScrollRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.searchScroll(request) } - } - - /** - * {@inheritDoc} - */ - override fun searchScroll(request: SearchScrollRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.searchScroll(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun multiSearch(request: MultiSearchRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.multiSearch(request) } - } - - /** - * {@inheritDoc} - */ - override fun multiSearch(request: MultiSearchRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.multiSearch(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun termVectors(request: TermVectorsRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.termVectors(request) } - } - - /** - * {@inheritDoc} - */ - override fun termVectors(request: TermVectorsRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.termVectors(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun multiTermVectors(request: MultiTermVectorsRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.multiTermVectors(request) } - } - - /** - * {@inheritDoc} - */ - override fun multiTermVectors(request: MultiTermVectorsRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.multiTermVectors(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun explain(request: ExplainRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.explain(request) } - } - - /** - * {@inheritDoc} - */ - override fun explain(request: ExplainRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.explain(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun clearScroll(request: ClearScrollRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.clearScroll(request) } - } - - /** - * {@inheritDoc} - */ - override fun clearScroll(request: ClearScrollRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.clearScroll(request, listener) } - } - - /** - * {@inheritDoc} - */ - override fun fieldCaps(request: FieldCapabilitiesRequest): ActionFuture { - client.threadPool().threadContext.stashContext().use { return client.fieldCaps(request) } - } - - /** - * {@inheritDoc} - */ - override fun fieldCaps(request: FieldCapabilitiesRequest, listener: ActionListener) { - client.threadPool().threadContext.stashContext().use { return client.fieldCaps(request, listener) } - } - - /** - * Executes the given [block] function on this resource and then closes it down correctly whether an exception - * is thrown or not. - * - * In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception, - * the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former. - * - * @param block a function to process this [AutoCloseable] resource. - * @return the result of [block] function invoked on this resource. - */ - @Suppress("TooGenericExceptionCaught") - private inline fun T.use(block: (T) -> R): R { - var exception: Throwable? = null - try { - return block(this) - } catch (e: Throwable) { - exception = e - throw e - } finally { - closeFinally(exception) - } - } - - /** - * Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when - * it's being closed due to some other [cause] exception occurred. - * - * The suppressed exception is added to the list of suppressed exceptions of [cause] exception. - */ - @Suppress("TooGenericExceptionCaught") - private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) { - null -> close() - else -> try { - close() - } catch (closeException: Throwable) { - cause.addSuppressed(closeException) - } - } -}