Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ out/
.project
.settings
.vscode
bin/
bin/plugins/*.zip
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.function.Supplier;
import org.opensearch.action.ActionRequest;
import org.opensearch.rest.RestHeaderDefinition;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -70,11 +71,27 @@
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin {

/**
* Custom task header for tracking the application/source that initiated a query.
*/
public static final String APPLICATION_ID_HEADER = "X-QI-Application-Id";

/**
* Default constructor
*/
public QueryInsightsPlugin() {}

@Override
public Collection<RestHeaderDefinition> getRestHeaders() {
return List.of(new RestHeaderDefinition(APPLICATION_ID_HEADER, false));
}

@Override
public Collection<String> getTaskHeaders() {
return List.of(APPLICATION_ID_HEADER);
}

@Override
public Collection<Object> createComponents(
final Client client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.index.Index;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.plugin.insights.QueryInsightsPlugin;
import org.opensearch.plugin.insights.core.auth.UserPrincipalContext;
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
import org.opensearch.plugin.insights.core.service.FinishedQueriesCache;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
Expand Down Expand Up @@ -223,7 +225,29 @@ public boolean isEnabled() {
}

@Override
public void onPhaseStart(SearchPhaseContext context) {}
public void onPhaseStart(SearchPhaseContext context) {
// Capture user identity at search phase start for live queries
try {
if (context.getTask() != null) {
long taskIdNum = context.getTask().getId();
String nodeId = clusterService.localNode().getId();
String taskId = nodeId + ":" + taskIdNum;
// Only capture once per task
if (queryInsightsService.getLiveQueryUserInfo(taskId) == null) {
UserPrincipalContext ctx = new UserPrincipalContext(threadPool);
String rawUserStr = ctx.getUserString();
log.debug("onPhaseStart taskId={} rawUserStr={}", taskId, rawUserStr);
UserPrincipalContext.UserPrincipalInfo userInfo = ctx.extractUserInfo();
if (userInfo != null) {
log.debug("Captured user for task {}: {}", taskId, userInfo.getUserName());
queryInsightsService.putLiveQueryUserInfo(taskId, userInfo);
}
}
}
} catch (Exception e) {
log.debug("Failed to capture user info in onPhaseStart", e);
}
}

@Override
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
Expand All @@ -250,6 +274,14 @@ public void onRequestFailure(final SearchPhaseContext context, final SearchReque

private void addToFinishedCache(SearchPhaseContext context, SearchQueryRecord record) {
try {
// Clean up live query user map
if (context.getTask() != null) {
long taskIdNum = context.getTask().getId();
String nodeId = clusterService.localNode().getId();
queryInsightsService.removeLiveQueryUserInfo(nodeId + ":" + taskIdNum);
}
// Extract user info into record attributes before caching
TopQueriesService.setUserInfo(record);
FinishedQueriesCache cache = queryInsightsService.getFinishedQueriesCache();
if (cache == null || record == null) return;
cache.capture(record, context.getTask().getId());
Expand Down Expand Up @@ -373,6 +405,11 @@ private SearchQueryRecord constructSearchQueryRecord(
if (userProvidedLabel != null) {
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
}
// Retrieve custom source tracking headers
String applicationId = context.getTask().getHeader(QueryInsightsPlugin.APPLICATION_ID_HEADER);
if (applicationId != null) {
labels.put(QueryInsightsPlugin.APPLICATION_ID_HEADER, applicationId);
}
attributes.put(Attribute.LABELS, labels);

UserPrincipalContext userPrincipalContext = threadPool != null ? new UserPrincipalContext(threadPool) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.plugin.insights.core.auth.UserPrincipalContext;

/**
* Service responsible for gathering, analyzing, storing and exporting
Expand Down Expand Up @@ -143,6 +144,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
private LocalIndexLifecycleManager localIndexLifecycleManager;

private final FinishedQueriesCache finishedQueriesCache;
private final java.util.concurrent.ConcurrentHashMap<String, UserPrincipalContext.UserPrincipalInfo> liveQueryUserMap = new java.util.concurrent.ConcurrentHashMap<>();

SinkType sinkType;

Expand Down Expand Up @@ -673,6 +675,18 @@ public FinishedQueriesCache getFinishedQueriesCache() {
return finishedQueriesCache;
}

public void putLiveQueryUserInfo(String taskId, UserPrincipalContext.UserPrincipalInfo userInfo) {
liveQueryUserMap.put(taskId, userInfo);
}

public UserPrincipalContext.UserPrincipalInfo getLiveQueryUserInfo(String taskId) {
return liveQueryUserMap.get(taskId);
}

public void removeLiveQueryUserInfo(String taskId) {
liveQueryUserMap.remove(taskId);
}

/**
* Returns true if the finished queries cache is enabled (idle timeout is non-zero).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class LiveQueryRecord implements Writeable, ToXContentObject {
private final long totalMemory;
private final TaskDetails coordinatorTask;
private final List<TaskDetails> shardTasks;
private final String username;
private final List<String> userRoles;
private final List<String> backendRoles;

public LiveQueryRecord(
String liveQueryRecordId,
Expand All @@ -41,7 +44,10 @@ public LiveQueryRecord(
long totalCpu,
long totalMemory,
TaskDetails coordinatorTask,
List<TaskDetails> shardTasks
List<TaskDetails> shardTasks,
String username,
List<String> userRoles,
List<String> backendRoles
) {
this.liveQueryRecordId = liveQueryRecordId;
this.status = status;
Expand All @@ -52,6 +58,9 @@ public LiveQueryRecord(
this.totalMemory = totalMemory;
this.coordinatorTask = coordinatorTask;
this.shardTasks = shardTasks != null ? shardTasks : new ArrayList<>();
this.username = username;
this.userRoles = userRoles != null ? userRoles : List.of();
this.backendRoles = backendRoles != null ? backendRoles : List.of();
}

public LiveQueryRecord(StreamInput in) throws IOException {
Expand All @@ -64,6 +73,9 @@ public LiveQueryRecord(StreamInput in) throws IOException {
this.totalMemory = in.readLong();
this.coordinatorTask = in.readOptionalWriteable(TaskDetails::new);
this.shardTasks = in.readList(TaskDetails::new);
this.username = in.readOptionalString();
this.userRoles = in.readOptionalStringList();
this.backendRoles = in.readOptionalStringList();
}

@Override
Expand All @@ -77,6 +89,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(totalMemory);
out.writeOptionalWriteable(coordinatorTask);
out.writeList(shardTasks);
out.writeOptionalString(username);
out.writeOptionalStringCollection(userRoles.isEmpty() ? null : userRoles);
out.writeOptionalStringCollection(backendRoles.isEmpty() ? null : backendRoles);
}

@Override
Expand All @@ -100,6 +115,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
task.toXContent(builder, params);
}
builder.endArray();
if (username != null) {
builder.field("username", username);
}
if (!userRoles.isEmpty()) {
builder.array("user_roles", userRoles.toArray(new String[0]));
}
if (!backendRoles.isEmpty()) {
builder.array("backend_roles", backendRoles.toArray(new String[0]));
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -139,4 +163,16 @@ public TaskDetails getCoordinatorTask() {
public List<TaskDetails> getShardTasks() {
return shardTasks;
}

public String getUsername() {
return username;
}

public List<String> getUserRoles() {
return userRoles;
}

public List<String> getBackendRoles() {
return backendRoles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ public void onResponse(ListTasksResponse taskResponse) {
// Determine status based on coordinator cancellation
String queryStatus = coordinatorInfo.isCancelled() ? "cancelled" : "running";

// Look up user info captured by the listener at search start
String username = null;
List<String> userRoles = List.of();
List<String> backendRoles = List.of();
var userInfo = queryInsightsService.getLiveQueryUserInfo(queryId);
if (userInfo != null) {
username = userInfo.getUserName();
userRoles = userInfo.getRoles();
backendRoles = userInfo.getBackendRoles();
}

LiveQueryRecord record = new LiveQueryRecord(
queryId,
queryStatus,
Expand All @@ -150,7 +161,10 @@ public void onResponse(ListTasksResponse taskResponse) {
totalCpu,
totalMem,
new TaskDetails(coordinatorInfo, queryStatus),
shardTasks
shardTasks,
username,
userRoles,
backendRoles
);

allRecords.add(record);
Expand Down
Loading