diff --git a/.gitignore b/.gitignore index 0e6ae28..7efd2f1 100644 --- a/.gitignore +++ b/.gitignore @@ -152,3 +152,4 @@ docs/.astro # Examples - ignore lock files since they use local file: links examples/*/package-lock.json +examples/pumpfun-server/ diff --git a/Cargo.lock b/Cargo.lock index 3ffd04d..a36ed44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1294,6 +1294,7 @@ version = "0.5.3" dependencies = [ "bs58", "dashmap", + "futures", "hex", "hyperstack-macros", "lru", diff --git a/hyperstack-macros/src/ast/types.rs b/hyperstack-macros/src/ast/types.rs index c4ef465..1ef3441 100644 --- a/hyperstack-macros/src/ast/types.rs +++ b/hyperstack-macros/src/ast/types.rs @@ -251,6 +251,27 @@ pub struct ComputedFieldSpec { #[serde(rename_all = "lowercase")] pub enum ResolverType { Token, + Url(UrlResolverConfig), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] +#[serde(rename_all = "lowercase")] +pub enum HttpMethod { + #[default] + Get, + Post, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct UrlResolverConfig { + /// Field path to get the URL from (e.g., "info.uri") + pub url_path: String, + /// HTTP method to use (default: GET) + #[serde(default)] + pub method: HttpMethod, + /// JSON path to extract from response (None = full payload) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub extract_path: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/hyperstack-macros/src/codegen/vixen_runtime.rs b/hyperstack-macros/src/codegen/vixen_runtime.rs index ead3aea..52c5333 100644 --- a/hyperstack-macros/src/codegen/vixen_runtime.rs +++ b/hyperstack-macros/src/codegen/vixen_runtime.rs @@ -259,6 +259,7 @@ pub fn generate_vm_handler( health_monitor: Option, slot_tracker: hyperstack::runtime::hyperstack_server::SlotTracker, resolver_client: Option>, + url_resolver_client: std::sync::Arc, } impl std::fmt::Debug for VmHandler { @@ -278,6 +279,7 @@ pub fn generate_vm_handler( health_monitor: Option, slot_tracker: hyperstack::runtime::hyperstack_server::SlotTracker, resolver_client: Option>, + url_resolver_client: std::sync::Arc, ) -> Self { Self { vm, @@ -286,6 +288,7 @@ pub fn generate_vm_handler( health_monitor, slot_tracker, resolver_client, + url_resolver_client, } } @@ -347,13 +350,18 @@ pub fn generate_vm_handler( }; let mut token_requests = Vec::new(); + let mut url_requests = Vec::new(); let mut other_requests = Vec::new(); for request in requests { - match request.resolver { + match &request.resolver { hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Token => { token_requests.push(request) } + hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Url(_) => { + url_requests.push(request) + } + #[allow(unreachable_patterns)] _ => other_requests.push(request), } } @@ -429,6 +437,66 @@ pub fn generate_vm_handler( } } + // Process URL resolver requests + if !url_requests.is_empty() { + let url_client = self.url_resolver_client.clone(); + + for request in url_requests { + if let hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Url(config) = &request.resolver { + // Get the URL from the input value + let url = match &request.input { + hyperstack::runtime::serde_json::Value::String(s) => s.clone(), + _ => { + hyperstack::runtime::tracing::warn!( + "URL resolver input is not a string: {:?}", + request.input + ); + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + continue; + } + }; + + if url.is_empty() { + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + continue; + } + + match url_client.resolve(&url, &config.method).await { + Ok(resolved_value) => { + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + match vm.apply_resolver_result( + self.bytecode.as_ref(), + &request.cache_key, + resolved_value, + ) { + Ok(mut new_mutations) => { + mutations.append(&mut new_mutations); + } + Err(err) => { + hyperstack::runtime::tracing::warn!( + url = %url, + "Failed to apply URL resolver result: {}", + err + ); + } + } + } + Err(err) => { + hyperstack::runtime::tracing::warn!( + url = %url, + "URL resolver request failed, re-queuing: {}", + err + ); + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + } + } + } + } + } + if !other_requests.is_empty() { let other_count = other_requests.len(); let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); @@ -858,6 +926,8 @@ pub fn generate_spec_function( } }; + let url_resolver_client = Arc::new(hyperstack::runtime::hyperstack_interpreter::resolvers::UrlResolverClient::new()); + let slot_tracker = hyperstack::runtime::hyperstack_server::SlotTracker::new(); let mut attempt = 0u32; let mut backoff = reconnection_config.initial_delay; @@ -899,6 +969,7 @@ pub fn generate_spec_function( health_monitor.clone(), slot_tracker.clone(), resolver_client.clone(), + url_resolver_client.clone(), ); let account_parser = parsers::AccountParser; @@ -1167,6 +1238,7 @@ pub fn generate_vm_handler_struct() -> TokenStream { health_monitor: Option, slot_tracker: hyperstack::runtime::hyperstack_server::SlotTracker, resolver_client: Option>, + url_resolver_client: std::sync::Arc, } impl std::fmt::Debug for VmHandler { @@ -1186,6 +1258,7 @@ pub fn generate_vm_handler_struct() -> TokenStream { health_monitor: Option, slot_tracker: hyperstack::runtime::hyperstack_server::SlotTracker, resolver_client: Option>, + url_resolver_client: std::sync::Arc, ) -> Self { Self { vm, @@ -1194,6 +1267,7 @@ pub fn generate_vm_handler_struct() -> TokenStream { health_monitor, slot_tracker, resolver_client, + url_resolver_client, } } @@ -1255,13 +1329,18 @@ pub fn generate_vm_handler_struct() -> TokenStream { }; let mut token_requests = Vec::new(); + let mut url_requests = Vec::new(); let mut other_requests = Vec::new(); for request in requests { - match request.resolver { + match &request.resolver { hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Token => { token_requests.push(request) } + hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Url(_) => { + url_requests.push(request) + } + #[allow(unreachable_patterns)] _ => other_requests.push(request), } } @@ -1337,6 +1416,66 @@ pub fn generate_vm_handler_struct() -> TokenStream { } } + // Process URL resolver requests + if !url_requests.is_empty() { + let url_client = self.url_resolver_client.clone(); + + for request in url_requests { + if let hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Url(config) = &request.resolver { + // Get the URL from the input value + let url = match &request.input { + hyperstack::runtime::serde_json::Value::String(s) => s.clone(), + _ => { + hyperstack::runtime::tracing::warn!( + "URL resolver input is not a string: {:?}", + request.input + ); + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + continue; + } + }; + + if url.is_empty() { + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + continue; + } + + match url_client.resolve(&url, &config.method).await { + Ok(resolved_value) => { + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + match vm.apply_resolver_result( + self.bytecode.as_ref(), + &request.cache_key, + resolved_value, + ) { + Ok(mut new_mutations) => { + mutations.append(&mut new_mutations); + } + Err(err) => { + hyperstack::runtime::tracing::warn!( + url = %url, + "Failed to apply URL resolver result: {}", + err + ); + } + } + } + Err(err) => { + hyperstack::runtime::tracing::warn!( + url = %url, + "URL resolver request failed, re-queuing: {}", + err + ); + let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); + vm.restore_resolver_requests(vec![request]); + } + } + } + } + } + if !other_requests.is_empty() { let other_count = other_requests.len(); let mut vm = self.vm.lock().unwrap_or_else(|e| e.into_inner()); @@ -1834,6 +1973,8 @@ pub fn generate_multi_pipeline_spec_function( } }; + let url_resolver_client = Arc::new(hyperstack::runtime::hyperstack_interpreter::resolvers::UrlResolverClient::new()); + let slot_tracker = hyperstack::runtime::hyperstack_server::SlotTracker::new(); let mut attempt = 0u32; let mut backoff = reconnection_config.initial_delay; @@ -1875,6 +2016,7 @@ pub fn generate_multi_pipeline_spec_function( health_monitor.clone(), slot_tracker.clone(), resolver_client.clone(), + url_resolver_client.clone(), ); if attempt == 0 { diff --git a/hyperstack-macros/src/lib.rs b/hyperstack-macros/src/lib.rs index 34cf2ad..8e2fab9 100644 --- a/hyperstack-macros/src/lib.rs +++ b/hyperstack-macros/src/lib.rs @@ -103,6 +103,7 @@ pub fn hyperstack(attr: TokenStream, item: TokenStream) -> TokenStream { /// - `#[aggregate(...)]` - Aggregate field values /// - `#[computed(...)]` - Computed fields from other fields /// - `#[derive_from(...)]` - Derive values from instructions +/// - `#[resolve(...)]` - Resolve external data (token metadata via DAS API or data from URLs) #[proc_macro_derive( Stream, attributes( diff --git a/hyperstack-macros/src/parse/attributes.rs b/hyperstack-macros/src/parse/attributes.rs index 2419669..564b86e 100644 --- a/hyperstack-macros/src/parse/attributes.rs +++ b/hyperstack-macros/src/parse/attributes.rs @@ -1004,6 +1004,8 @@ pub fn parse_aggregate_attribute( pub struct ResolveAttribute { pub from: Option, pub address: Option, + pub url: Option, + pub method: Option, pub extract: Option, pub target_field_name: String, pub resolver: Option, @@ -1023,6 +1025,8 @@ pub struct ResolveSpec { struct ResolveAttributeArgs { from: Option, address: Option, + url: Option, + method: Option, extract: Option, resolver: Option, strategy: Option, @@ -1032,6 +1036,8 @@ impl Parse for ResolveAttributeArgs { fn parse(input: ParseStream) -> syn::Result { let mut from = None; let mut address = None; + let mut url = None; + let mut method = None; let mut extract = None; let mut resolver = None; let mut strategy = None; @@ -1048,6 +1054,29 @@ impl Parse for ResolveAttributeArgs { } else if ident_str == "address" { let lit: syn::LitStr = input.parse()?; address = Some(lit.value()); + } else if ident_str == "url" { + // Parse as dotted path (e.g., info.uri) - handle both dot-separated and single identifiers + let mut parts = Vec::new(); + let first: syn::Ident = input.parse()?; + parts.push(first.to_string()); + + // Parse any additional .identifier segments + while input.peek(Token![.]) { + input.parse::()?; + let next: syn::Ident = input.parse()?; + parts.push(next.to_string()); + } + + url = Some(parts.join(".")); + } else if ident_str == "method" { + let method_ident: syn::Ident = input.parse()?; + match method_ident.to_string().to_lowercase().as_str() { + "get" | "post" => method = Some(method_ident), + _ => return Err(syn::Error::new( + method_ident.span(), + "Invalid HTTP method. Only 'GET' or 'POST' are supported.", + )), + } } else if ident_str == "extract" { let lit: syn::LitStr = input.parse()?; extract = Some(lit.value()); @@ -1077,6 +1106,8 @@ impl Parse for ResolveAttributeArgs { Ok(ResolveAttributeArgs { from, address, + url, + method, extract, resolver, strategy, @@ -1094,6 +1125,25 @@ pub fn parse_resolve_attribute( let args: ResolveAttributeArgs = attr.parse_args()?; + // Check for mutually exclusive parameters: url vs (from/address) + let has_url = args.url.is_some(); + let has_token_source = args.from.is_some() || args.address.is_some(); + + if has_url && has_token_source { + return Err(syn::Error::new_spanned( + attr, + "#[resolve] cannot specify 'url' together with 'from' or 'address'", + )); + } + + if !has_url && !has_token_source { + return Err(syn::Error::new_spanned( + attr, + "#[resolve] requires either 'url' or 'from'/'address' parameter", + )); + } + + // Token resolvers: cannot have both from and address if args.from.is_some() && args.address.is_some() { return Err(syn::Error::new_spanned( attr, @@ -1101,10 +1151,11 @@ pub fn parse_resolve_attribute( )); } - if args.from.is_none() && args.address.is_none() { + // URL resolvers require extract parameter + if has_url && args.extract.is_none() { return Err(syn::Error::new_spanned( attr, - "#[resolve] requires either 'from' or 'address' parameter", + "#[resolve] with 'url' requires 'extract' parameter", )); } @@ -1113,6 +1164,8 @@ pub fn parse_resolve_attribute( Ok(Some(ResolveAttribute { from: args.from, address: args.address, + url: args.url, + method: args.method.map(|m| m.to_string()), extract: args.extract, target_field_name: target_field_name.to_string(), resolver: args.resolver, @@ -1146,7 +1199,6 @@ pub fn parse_computed_attribute( target_field_name: target_field_name.to_string(), })) } - pub fn has_entity_attribute(attrs: &[Attribute]) -> bool { attrs.iter().any(|attr| attr.path().is_ident("entity")) } diff --git a/hyperstack-macros/src/stream_spec/ast_writer.rs b/hyperstack-macros/src/stream_spec/ast_writer.rs index ed53dec..ef6e751 100644 --- a/hyperstack-macros/src/stream_spec/ast_writer.rs +++ b/hyperstack-macros/src/stream_spec/ast_writer.rs @@ -208,9 +208,11 @@ fn build_resolver_specs(resolve_specs: &[parse::ResolveSpec]) -> Vec ResolveStrategy { } } -fn resolver_type_key(resolver: &ResolverType) -> &'static str { +fn resolver_type_key(resolver: &ResolverType) -> String { match resolver { - ResolverType::Token => "token", + ResolverType::Token => "token".to_string(), + ResolverType::Url(config) => format!("url:{}", config.url_path), } } diff --git a/hyperstack-macros/src/stream_spec/entity.rs b/hyperstack-macros/src/stream_spec/entity.rs index 46212e1..efa608b 100644 --- a/hyperstack-macros/src/stream_spec/entity.rs +++ b/hyperstack-macros/src/stream_spec/entity.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; use quote::{format_ident, quote}; use syn::{Fields, GenericArgument, ItemStruct, PathArguments, Type}; -use crate::ast::{EntitySection, FieldTypeInfo, ResolverHook, ResolverType}; +use crate::ast::{EntitySection, FieldTypeInfo, HttpMethod, ResolverHook, ResolverType, UrlResolverConfig}; use crate::codegen; use crate::event_type_helpers::IdlLookup; use crate::parse; @@ -424,16 +424,34 @@ pub fn process_entity_struct_with_idl( pub #field_name: #field_type }); - let resolver = if let Some(name) = resolve_attr.resolver.as_deref() { + // Determine resolver type: URL resolver if url is present, otherwise Token resolver + let resolver = if let Some(url_path) = resolve_attr.url.clone() { + // URL resolver + let method = resolve_attr.method.as_deref().map(|m| { + match m.to_lowercase().as_str() { + "post" => HttpMethod::Post, + _ => HttpMethod::Get, + } + }).unwrap_or(HttpMethod::Get); + + ResolverType::Url(UrlResolverConfig { + url_path, + method, + extract_path: resolve_attr.extract.clone(), + }) + } else if let Some(name) = resolve_attr.resolver.as_deref() { + // Token resolver with explicit type parse_resolver_type_name(name, field_type) + .unwrap_or_else(|err| panic!("{}", err)) } else { + // Token resolver with inferred type infer_resolver_type(field_type) - } - .unwrap_or_else(|err| panic!("{}", err)); + .unwrap_or_else(|err| panic!("{}", err)) + }; resolve_specs.push(parse::ResolveSpec { resolver, - from: resolve_attr.from, + from: resolve_attr.url.clone().or(resolve_attr.from), address: resolve_attr.address, extract: resolve_attr.extract, target_field_name: resolve_attr.target_field_name, diff --git a/hyperstack-macros/src/stream_spec/proto_struct.rs b/hyperstack-macros/src/stream_spec/proto_struct.rs index 842f106..6302e71 100644 --- a/hyperstack-macros/src/stream_spec/proto_struct.rs +++ b/hyperstack-macros/src/stream_spec/proto_struct.rs @@ -448,6 +448,30 @@ pub fn process_struct_with_context( crate::ast::ResolverType::Token => quote! { hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Token }, + crate::ast::ResolverType::Url(config) => { + let url_path = &config.url_path; + let method_code = match config.method { + crate::ast::HttpMethod::Get => quote! { + hyperstack::runtime::hyperstack_interpreter::ast::HttpMethod::Get + }, + crate::ast::HttpMethod::Post => quote! { + hyperstack::runtime::hyperstack_interpreter::ast::HttpMethod::Post + }, + }; + let extract_path_code = match &config.extract_path { + Some(path) => quote! { Some(#path.to_string()) }, + None => quote! { None }, + }; + quote! { + hyperstack::runtime::hyperstack_interpreter::ast::ResolverType::Url( + hyperstack::runtime::hyperstack_interpreter::ast::UrlResolverConfig { + url_path: #url_path.to_string(), + method: #method_code, + extract_path: #extract_path_code, + } + ) + } + }, }; let strategy_code = match strategy.as_str() { "LastWrite" => quote! { diff --git a/hyperstack-macros/src/stream_spec/sections.rs b/hyperstack-macros/src/stream_spec/sections.rs index 089a655..5d719a1 100644 --- a/hyperstack-macros/src/stream_spec/sections.rs +++ b/hyperstack-macros/src/stream_spec/sections.rs @@ -633,21 +633,46 @@ pub fn process_nested_struct( } else if let Ok(Some(resolve_attr)) = parse::parse_resolve_attribute(attr, &field_name.to_string()) { - let resolver = if let Some(name) = resolve_attr.resolver.as_deref() { + // Determine resolver type: URL resolver if url is present, otherwise Token resolver + let qualified_url = resolve_attr.url.as_deref().map(|url_path_raw| { + if url_path_raw.contains('.') { + url_path_raw.to_string() + } else { + format!("{}.{}", section_name, url_path_raw) + } + }); + + let resolver = if let Some(ref url_path) = qualified_url { + let method = resolve_attr.method.as_deref().map(|m| { + match m.to_lowercase().as_str() { + "post" => crate::ast::HttpMethod::Post, + _ => crate::ast::HttpMethod::Get, + } + }).unwrap_or(crate::ast::HttpMethod::Get); + + crate::ast::ResolverType::Url(crate::ast::UrlResolverConfig { + url_path: url_path.clone(), + method, + extract_path: resolve_attr.extract.clone(), + }) + } else if let Some(name) = resolve_attr.resolver.as_deref() { super::entity::parse_resolver_type_name(name, field_type) + .unwrap_or_else(|err| panic!("{}", err)) } else { super::entity::infer_resolver_type(field_type) - } - .unwrap_or_else(|err| panic!("{}", err)); + .unwrap_or_else(|err| panic!("{}", err)) + }; let mut target_field_name = resolve_attr.target_field_name; if !target_field_name.contains('.') { target_field_name = format!("{}.{}", section_name, target_field_name); } + let from = qualified_url.or(resolve_attr.from); + resolve_specs.push(parse::ResolveSpec { resolver, - from: resolve_attr.from, + from, address: resolve_attr.address, extract: resolve_attr.extract, target_field_name, diff --git a/interpreter/Cargo.toml b/interpreter/Cargo.toml index 5de6876..bdd92d4 100644 --- a/interpreter/Cargo.toml +++ b/interpreter/Cargo.toml @@ -26,6 +26,7 @@ lru = "0.12" sha2 = "0.10" tracing = "0.1" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +futures = "0.3" hyperstack-macros = { version = "0.5.3", path = "../hyperstack-macros" } # OpenTelemetry for distributed tracing and metrics (optional, behind 'otel' feature) diff --git a/interpreter/src/ast.rs b/interpreter/src/ast.rs index 7cd0da4..5036a9a 100644 --- a/interpreter/src/ast.rs +++ b/interpreter/src/ast.rs @@ -367,6 +367,27 @@ pub struct ComputedFieldSpec { #[serde(rename_all = "lowercase")] pub enum ResolverType { Token, + Url(UrlResolverConfig), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] +#[serde(rename_all = "lowercase")] +pub enum HttpMethod { + #[default] + Get, + Post, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct UrlResolverConfig { + /// Field path to get the URL from (e.g., "info.uri") + pub url_path: String, + /// HTTP method to use (default: GET) + #[serde(default)] + pub method: HttpMethod, + /// JSON path to extract from response (None = full payload) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub extract_path: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/interpreter/src/resolvers.rs b/interpreter/src/resolvers.rs index 3e5f05e..ebe2289 100644 --- a/interpreter/src/resolvers.rs +++ b/interpreter/src/resolvers.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet}; use std::sync::OnceLock; +use futures::future::join_all; + use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -494,6 +496,137 @@ impl TokenMetadataResolverClient { } } +// ============================================================================ +// URL Resolver Client - Fetch and parse data from external URLs +// ============================================================================ + +const DEFAULT_URL_TIMEOUT_SECS: u64 = 30; + +pub struct UrlResolverClient { + client: reqwest::Client, +} + +impl Default for UrlResolverClient { + fn default() -> Self { + Self::new() + } +} + +impl UrlResolverClient { + pub fn new() -> Self { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(DEFAULT_URL_TIMEOUT_SECS)) + .build() + .expect("Failed to create HTTP client for URL resolver"); + + Self { client } + } + + pub fn with_timeout(timeout_secs: u64) -> Self { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(timeout_secs)) + .build() + .expect("Failed to create HTTP client for URL resolver"); + + Self { client } + } + + /// Resolve a URL and return the parsed JSON response + pub async fn resolve( + &self, + url: &str, + method: &crate::ast::HttpMethod, + ) -> Result> { + if url.is_empty() { + return Err("URL is empty".into()); + } + + let response = match method { + crate::ast::HttpMethod::Get => self.client.get(url).send().await?, + crate::ast::HttpMethod::Post => self.client.post(url).send().await?, + }; + + let response = response.error_for_status()?; + let value = response.json::().await?; + + Ok(value) + } + + /// Resolve a URL and extract a specific JSON path from the response + pub async fn resolve_with_extract( + &self, + url: &str, + method: &crate::ast::HttpMethod, + extract_path: Option<&str>, + ) -> Result> { + let response = self.resolve(url, method).await?; + + if let Some(path) = extract_path { + Self::extract_json_path(&response, path) + } else { + Ok(response) + } + } + + /// Extract a value from a JSON object using dot-notation path + /// e.g., "data.image" extracts response["data"]["image"] + pub fn extract_json_path( + value: &Value, + path: &str, + ) -> Result> { + if path.is_empty() { + return Ok(value.clone()); + } + + let mut current = value; + for segment in path.split('.') { + // Try as object key first + if let Some(next) = current.get(segment) { + current = next; + } else if let Ok(index) = segment.parse::() { + // Try as array index + if let Some(next) = current.get(index) { + current = next; + } else { + return Err(format!("Index '{}' out of bounds in path '{}'", index, path).into()); + } + } else { + return Err(format!("Key '{}' not found in path '{}'", segment, path).into()); + } + } + + Ok(current.clone()) + } + + /// Batch resolve multiple URLs in parallel + pub async fn resolve_batch( + &self, + urls: &[(String, crate::ast::HttpMethod, Option)], + ) -> HashMap { + let futures = urls + .iter() + .filter(|(url, _, _)| !url.is_empty()) + .map(|(url, method, extract_path)| async move { + let result = self + .resolve_with_extract(url, method, extract_path.as_deref()) + .await; + (url.clone(), result) + }); + + join_all(futures) + .await + .into_iter() + .filter_map(|(url, result)| match result { + Ok(value) => Some((url, value)), + Err(e) => { + tracing::warn!(url = %url, error = %e, "Failed to resolve URL"); + None + } + }) + .collect() + } +} + struct TokenMetadataResolver; const TOKEN_METADATA_METHODS: &[ResolverComputedMethod] = &[ diff --git a/interpreter/src/vm.rs b/interpreter/src/vm.rs index 45c2085..0b35226 100644 --- a/interpreter/src/vm.rs +++ b/interpreter/src/vm.rs @@ -376,9 +376,10 @@ fn value_to_cache_key(value: &Value) -> String { } } -fn resolver_type_key(resolver: &ResolverType) -> &'static str { +fn resolver_type_key(resolver: &ResolverType) -> String { match resolver { - ResolverType::Token => "token", + ResolverType::Token => "token".to_string(), + ResolverType::Url(config) => format!("url:{}", config.url_path), } } diff --git a/stacks/pumpfun/.hyperstack/PumpfunStream.stack.json b/stacks/pumpfun/.hyperstack/PumpfunStream.stack.json index 1570dc7..aa54f77 100644 --- a/stacks/pumpfun/.hyperstack/PumpfunStream.stack.json +++ b/stacks/pumpfun/.hyperstack/PumpfunStream.stack.json @@ -5348,6 +5348,16 @@ "inner_type": "bool", "source_path": null, "resolved_type": null + }, + { + "field_name": "resolved_image", + "rust_type_name": "Option < String >", + "base_type": "String", + "is_optional": true, + "is_array": false, + "inner_type": "String", + "source_path": null, + "resolved_type": null } ], "is_nested_struct": false, @@ -6689,6 +6699,16 @@ "source_path": null, "resolved_type": null }, + "info.resolved_image": { + "field_name": "resolved_image", + "rust_type_name": "Option < String >", + "base_type": "String", + "is_optional": true, + "is_array": false, + "inner_type": "String", + "source_path": null, + "resolved_type": null + }, "info.symbol": { "field_name": "symbol", "rust_type_name": "Option < String >", @@ -7262,7 +7282,24 @@ } } ], - "resolver_specs": [], + "resolver_specs": [ + { + "resolver": { + "url": { + "url_path": "info.uri", + "method": "get", + "extract_path": "image" + } + }, + "input_path": "info.uri", + "strategy": "SetOnce", + "extracts": [ + { + "target_path": "info.resolved_image" + } + ] + } + ], "computed_fields": [ "trading.last_trade_price", "trading.total_volume", @@ -7409,7 +7446,7 @@ "result_type": "Option < f64 >" } ], - "content_hash": "bb7da3279a1983c38e3e2d84c91babe722c3e4d9e32963b5d26cba8be745e7bd", + "content_hash": "1bc6f8116859f576b2730b2de2bf9fab4090643faa9bf7bf814d31642ff7fc38", "views": [] } ], @@ -17939,5 +17976,5 @@ "program_id": "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P" } ], - "content_hash": "9edc87a2beb6a7fc98dc3a46d6fe0cbd9acd61f71acaf36818669bcd7bd7a8a1" + "content_hash": "f7a952f5de70efe998814a8d532f022a21724f1ca47d5dcc1d72ffdcc99a6578" } \ No newline at end of file diff --git a/stacks/pumpfun/src/stack.rs b/stacks/pumpfun/src/stack.rs index 7d59c63..b7490a0 100644 --- a/stacks/pumpfun/src/stack.rs +++ b/stacks/pumpfun/src/stack.rs @@ -59,6 +59,10 @@ pub mod pumpfun_stream { #[map(pump_sdk::accounts::BondingCurve::complete, strategy = LastWrite)] pub is_complete: Option, + + // URL resolver: fetch and extract image from metadata URI + #[resolve(url = info.uri, extract = "image")] + pub resolved_image: Option, } // ReserveState section: All fields come from BondingCurve account updates diff --git a/stacks/sdk/typescript/src/pumpfun/index.ts b/stacks/sdk/typescript/src/pumpfun/index.ts index 8f94044..695d17a 100644 --- a/stacks/sdk/typescript/src/pumpfun/index.ts +++ b/stacks/sdk/typescript/src/pumpfun/index.ts @@ -17,6 +17,7 @@ export interface PumpfunTokenId { export interface PumpfunTokenInfo { is_complete?: boolean | null; name?: string | null; + resolved_image?: string | null; symbol?: string | null; uri?: string | null; } @@ -278,6 +279,7 @@ export const PumpfunTokenIdSchema = z.object({ export const PumpfunTokenInfoSchema = z.object({ is_complete: z.boolean().nullable().optional(), name: z.string().nullable().optional(), + resolved_image: z.string().nullable().optional(), symbol: z.string().nullable().optional(), uri: z.string().nullable().optional(), });