diff --git a/.gitignore b/.gitignore index ac667e3be..491e872e2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ out/ .project .settings .vscode -bin/ \ No newline at end of file +bin/plugins/*.zip diff --git a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 5c749629f..4105f9c27 100644 --- a/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.function.Supplier; import org.opensearch.action.ActionRequest; +import org.opensearch.rest.RestHeaderDefinition; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; @@ -70,11 +71,27 @@ * Plugin class for Query Insights. */ public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin { + + /** + * Custom task header for tracking the application/source that initiated a query. + */ + public static final String APPLICATION_ID_HEADER = "X-QI-Application-Id"; + /** * Default constructor */ public QueryInsightsPlugin() {} + @Override + public Collection getRestHeaders() { + return List.of(new RestHeaderDefinition(APPLICATION_ID_HEADER, false)); + } + + @Override + public Collection getTaskHeaders() { + return List.of(APPLICATION_ID_HEADER); + } + @Override public Collection createComponents( final Client client, diff --git a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 83810be1f..b0856275b 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -42,11 +42,13 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.index.Index; import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.plugin.insights.QueryInsightsPlugin; import org.opensearch.plugin.insights.core.auth.UserPrincipalContext; import org.opensearch.plugin.insights.core.metrics.OperationalMetric; import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter; import org.opensearch.plugin.insights.core.service.FinishedQueriesCache; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.Measurement; @@ -223,7 +225,29 @@ public boolean isEnabled() { } @Override - public void onPhaseStart(SearchPhaseContext context) {} + public void onPhaseStart(SearchPhaseContext context) { + // Capture user identity at search phase start for live queries + try { + if (context.getTask() != null) { + long taskIdNum = context.getTask().getId(); + String nodeId = clusterService.localNode().getId(); + String taskId = nodeId + ":" + taskIdNum; + // Only capture once per task + if (queryInsightsService.getLiveQueryUserInfo(taskId) == null) { + UserPrincipalContext ctx = new UserPrincipalContext(threadPool); + String rawUserStr = ctx.getUserString(); + log.debug("onPhaseStart taskId={} rawUserStr={}", taskId, rawUserStr); + UserPrincipalContext.UserPrincipalInfo userInfo = ctx.extractUserInfo(); + if (userInfo != null) { + log.debug("Captured user for task {}: {}", taskId, userInfo.getUserName()); + queryInsightsService.putLiveQueryUserInfo(taskId, userInfo); + } + } + } + } catch (Exception e) { + log.debug("Failed to capture user info in onPhaseStart", e); + } + } @Override public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @@ -250,6 +274,14 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque private void addToFinishedCache(SearchPhaseContext context, SearchQueryRecord record) { try { + // Clean up live query user map + if (context.getTask() != null) { + long taskIdNum = context.getTask().getId(); + String nodeId = clusterService.localNode().getId(); + queryInsightsService.removeLiveQueryUserInfo(nodeId + ":" + taskIdNum); + } + // Extract user info into record attributes before caching + TopQueriesService.setUserInfo(record); FinishedQueriesCache cache = queryInsightsService.getFinishedQueriesCache(); if (cache == null || record == null) return; cache.capture(record, context.getTask().getId()); @@ -373,6 +405,11 @@ private SearchQueryRecord constructSearchQueryRecord( if (userProvidedLabel != null) { labels.put(Task.X_OPAQUE_ID, userProvidedLabel); } + // Retrieve custom source tracking headers + String applicationId = context.getTask().getHeader(QueryInsightsPlugin.APPLICATION_ID_HEADER); + if (applicationId != null) { + labels.put(QueryInsightsPlugin.APPLICATION_ID_HEADER, applicationId); + } attributes.put(Attribute.LABELS, labels); UserPrincipalContext userPrincipalContext = threadPool != null ? new UserPrincipalContext(threadPool) : null; diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 6a1ce0ad7..06eb3b274 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -66,6 +66,7 @@ import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; +import org.opensearch.plugin.insights.core.auth.UserPrincipalContext; /** * Service responsible for gathering, analyzing, storing and exporting @@ -143,6 +144,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent { private LocalIndexLifecycleManager localIndexLifecycleManager; private final FinishedQueriesCache finishedQueriesCache; + private final java.util.concurrent.ConcurrentHashMap liveQueryUserMap = new java.util.concurrent.ConcurrentHashMap<>(); SinkType sinkType; @@ -673,6 +675,18 @@ public FinishedQueriesCache getFinishedQueriesCache() { return finishedQueriesCache; } + public void putLiveQueryUserInfo(String taskId, UserPrincipalContext.UserPrincipalInfo userInfo) { + liveQueryUserMap.put(taskId, userInfo); + } + + public UserPrincipalContext.UserPrincipalInfo getLiveQueryUserInfo(String taskId) { + return liveQueryUserMap.get(taskId); + } + + public void removeLiveQueryUserInfo(String taskId) { + liveQueryUserMap.remove(taskId); + } + /** * Returns true if the finished queries cache is enabled (idle timeout is non-zero). */ diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/LiveQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/LiveQueryRecord.java index 8fafa9c50..995e24119 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/LiveQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/LiveQueryRecord.java @@ -31,6 +31,9 @@ public class LiveQueryRecord implements Writeable, ToXContentObject { private final long totalMemory; private final TaskDetails coordinatorTask; private final List shardTasks; + private final String username; + private final List userRoles; + private final List backendRoles; public LiveQueryRecord( String liveQueryRecordId, @@ -41,7 +44,10 @@ public LiveQueryRecord( long totalCpu, long totalMemory, TaskDetails coordinatorTask, - List shardTasks + List shardTasks, + String username, + List userRoles, + List backendRoles ) { this.liveQueryRecordId = liveQueryRecordId; this.status = status; @@ -52,6 +58,9 @@ public LiveQueryRecord( this.totalMemory = totalMemory; this.coordinatorTask = coordinatorTask; this.shardTasks = shardTasks != null ? shardTasks : new ArrayList<>(); + this.username = username; + this.userRoles = userRoles != null ? userRoles : List.of(); + this.backendRoles = backendRoles != null ? backendRoles : List.of(); } public LiveQueryRecord(StreamInput in) throws IOException { @@ -64,6 +73,9 @@ public LiveQueryRecord(StreamInput in) throws IOException { this.totalMemory = in.readLong(); this.coordinatorTask = in.readOptionalWriteable(TaskDetails::new); this.shardTasks = in.readList(TaskDetails::new); + this.username = in.readOptionalString(); + this.userRoles = in.readOptionalStringList(); + this.backendRoles = in.readOptionalStringList(); } @Override @@ -77,6 +89,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalMemory); out.writeOptionalWriteable(coordinatorTask); out.writeList(shardTasks); + out.writeOptionalString(username); + out.writeOptionalStringCollection(userRoles.isEmpty() ? null : userRoles); + out.writeOptionalStringCollection(backendRoles.isEmpty() ? null : backendRoles); } @Override @@ -100,6 +115,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws task.toXContent(builder, params); } builder.endArray(); + if (username != null) { + builder.field("username", username); + } + if (!userRoles.isEmpty()) { + builder.array("user_roles", userRoles.toArray(new String[0])); + } + if (!backendRoles.isEmpty()) { + builder.array("backend_roles", backendRoles.toArray(new String[0])); + } builder.endObject(); return builder; } @@ -139,4 +163,16 @@ public TaskDetails getCoordinatorTask() { public List getShardTasks() { return shardTasks; } + + public String getUsername() { + return username; + } + + public List getUserRoles() { + return userRoles; + } + + public List getBackendRoles() { + return backendRoles; + } } diff --git a/src/main/java/org/opensearch/plugin/insights/rules/transport/live_queries/TransportLiveQueriesAction.java b/src/main/java/org/opensearch/plugin/insights/rules/transport/live_queries/TransportLiveQueriesAction.java index 92919fdaa..af5f35f9a 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/transport/live_queries/TransportLiveQueriesAction.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/transport/live_queries/TransportLiveQueriesAction.java @@ -141,6 +141,17 @@ public void onResponse(ListTasksResponse taskResponse) { // Determine status based on coordinator cancellation String queryStatus = coordinatorInfo.isCancelled() ? "cancelled" : "running"; + // Look up user info captured by the listener at search start + String username = null; + List userRoles = List.of(); + List backendRoles = List.of(); + var userInfo = queryInsightsService.getLiveQueryUserInfo(queryId); + if (userInfo != null) { + username = userInfo.getUserName(); + userRoles = userInfo.getRoles(); + backendRoles = userInfo.getBackendRoles(); + } + LiveQueryRecord record = new LiveQueryRecord( queryId, queryStatus, @@ -150,7 +161,10 @@ public void onResponse(ListTasksResponse taskResponse) { totalCpu, totalMem, new TaskDetails(coordinatorInfo, queryStatus), - shardTasks + shardTasks, + username, + userRoles, + backendRoles ); allRecords.add(record);