Skip to content

Commit 3bf8971

Browse files
Merge pull request #325 from originalworks/owen-output-generator-refactor
Owen output generator refactor
2 parents 58d51d4 + 1a0586e commit 3bf8971

File tree

7 files changed

+321
-313
lines changed

7 files changed

+321
-313
lines changed

owen/src/aws-lambda/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,11 @@ async fn function_handler(
8888
println!("synced directories: {s3_message_folders:?}");
8989

9090
match owen::run_with_sentry(&owen_config).await {
91-
Ok(message_processing_context) => {
91+
Ok(ddex_messages) => {
9292
queue
9393
.sync_message_folder_statuses(
9494
local_to_s3_folder_mapping,
95-
message_processing_context,
95+
ddex_messages,
9696
s3_message_folders,
9797
)
9898
.await

owen/src/aws-lambda/message_queue.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use aws_sdk_dynamodb::types::AttributeValue;
3-
use owen::output_generator::MessageDirProcessingContext;
3+
use owen::output_generator::DdexMessage;
44
use std::{collections::HashMap, env};
55

66
pub struct MessageQueue {
@@ -156,29 +156,27 @@ impl MessageQueue {
156156
pub async fn sync_message_folder_statuses(
157157
&self,
158158
local_to_s3_folder_mapping: HashMap<String, String>,
159-
message_processing_context_vec: Vec<MessageDirProcessingContext>,
159+
ddex_messages: Vec<DdexMessage>,
160160
message_folders: Vec<String>,
161161
) -> Result<()> {
162-
let mut s3_folder_to_processing_context_map: HashMap<String, MessageDirProcessingContext> =
163-
HashMap::new();
162+
let mut s3_folder_to_ddex_message_map: HashMap<String, DdexMessage> = HashMap::new();
164163

165-
for message_processing_context in message_processing_context_vec {
164+
for ddex_message in ddex_messages {
166165
let s3_path = local_to_s3_folder_mapping
167-
.get(&message_processing_context.message_dir_path)
166+
.get(&ddex_message.message_dir_path)
168167
.expect(
169168
format!(
170169
"Could not retrieve s3 path from mapping to local folder. Local folder: {}",
171-
message_processing_context.message_dir_path
170+
ddex_message.message_dir_path
172171
)
173172
.as_str(),
174173
);
175174

176-
s3_folder_to_processing_context_map.insert(s3_path.clone(), message_processing_context);
175+
s3_folder_to_ddex_message_map.insert(s3_path.clone(), ddex_message);
177176
}
178177
for folder in message_folders {
179-
let message_processing_context = s3_folder_to_processing_context_map.get(&folder);
180-
if let Some(message_processing_context) = message_processing_context {
181-
if message_processing_context.excluded {
178+
if let Some(ddex_message) = s3_folder_to_ddex_message_map.get(&folder) {
179+
if ddex_message.excluded {
182180
self.set_single_message_folder_status(
183181
folder.clone(),
184182
self.rejected_status_value.to_string(),

owen/src/lib.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ use contracts::ContractsManager;
1515
use ddex_parser::ParserError;
1616
pub use log;
1717
use log_macros::{format_error, log_error};
18-
use output_generator::MessageDirProcessingContext;
1918
use sentry::User;
2019
use serde_json::json;
2120
use std::env;
2221
use std::str::FromStr;
2322

23+
use crate::ipfs::IpfsManager;
24+
use crate::output_generator::{DdexMessage, OutputFilesGenerator};
2425
use crate::wallet::OwenWallet;
2526

2627
#[cfg(any(feature = "aws-integration", feature = "local-s3"))]
@@ -45,7 +46,7 @@ pub enum IpfsInterface {
4546
pub struct Config {
4647
pub rpc_url: String,
4748
pub private_key: Option<String>,
48-
pub folder_path: String,
49+
pub input_files_dir: String,
4950
pub local_ipfs: bool,
5051
pub output_files_dir: String,
5152
pub username: String,
@@ -75,7 +76,7 @@ impl Config {
7576
let mut args = std::env::args();
7677
args.next();
7778

78-
let folder_path = args
79+
let input_files_dir = args
7980
.next()
8081
.unwrap_or_else(|| Config::get_env_var("INPUT_FILES_DIR").to_string());
8182

@@ -137,7 +138,7 @@ impl Config {
137138
let config = Config {
138139
rpc_url,
139140
private_key,
140-
folder_path,
141+
input_files_dir,
141142
local_ipfs,
142143
ipfs_api_base_url,
143144
output_files_dir,
@@ -179,13 +180,14 @@ impl Config {
179180
}
180181
}
181182

182-
pub async fn run(config: &Config) -> anyhow::Result<Vec<MessageDirProcessingContext>> {
183+
pub async fn run(config: &Config) -> anyhow::Result<Vec<DdexMessage>> {
183184
let owen_wallet = OwenWallet::build(&config).await?;
184185
let contracts_manager = ContractsManager::build(&config, &owen_wallet).await?;
185186
contracts_manager.check_image_compatibility().await?;
186187

187-
let message_dir_processing_log =
188-
output_generator::create_output_files(&config, &owen_wallet).await?;
188+
let ipfs_manager = IpfsManager::build(&config, &owen_wallet).await?;
189+
let output_files_generator = OutputFilesGenerator::build(&config, &ipfs_manager)?;
190+
let ddex_messages = output_files_generator.generate_files().await?;
189191

190192
let blob_transaction_data = BlobTransactionData::build(&config.output_files_dir)?;
191193

@@ -205,19 +207,18 @@ pub async fn run(config: &Config) -> anyhow::Result<Vec<MessageDirProcessingCont
205207
} else {
206208
contracts_manager.send_blob(blob_transaction_data).await?;
207209
}
208-
209-
Ok(message_dir_processing_log)
210+
Ok(ddex_messages)
210211
}
211212

212-
pub async fn run_with_sentry(config: &Config) -> anyhow::Result<Vec<MessageDirProcessingContext>> {
213+
pub async fn run_with_sentry(config: &Config) -> anyhow::Result<Vec<DdexMessage>> {
213214
sentry::configure_scope(|scope| {
214215
scope.set_user(Some(User {
215216
username: Some(config.username.to_owned()),
216217
..Default::default()
217218
}));
218219
});
219220

220-
let message_dir_processing_context = run(&config).await.map_err(|e| {
221+
let ddex_messages = run(&config).await.map_err(|e| {
221222
sentry::configure_scope(|scope| {
222223
scope.set_tag("error_type", {
223224
if e.is::<ParserError>() {
@@ -232,5 +233,5 @@ pub async fn run_with_sentry(config: &Config) -> anyhow::Result<Vec<MessageDirPr
232233
log_error!("{e}")
233234
})?;
234235

235-
anyhow::Ok(message_dir_processing_context)
236+
anyhow::Ok(ddex_messages)
236237
}

owen/src/local-s3/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ async fn main() -> Result<()> {
3434
let s3_message_folders = storage.s3_message_folders.clone();
3535

3636
match owen::run_with_sentry(&owen_config).await {
37-
Ok(message_processing_context) => {
38-
let s3_folder_to_processing_context_map = database.save_message_folders(
37+
Ok(ddex_messages) => {
38+
let s3_folder_to_ddex_message_map = database.save_message_folders(
3939
local_to_s3_folder_mapping,
40-
message_processing_context,
40+
ddex_messages,
4141
&s3_message_folders,
4242
)?;
4343

4444
storage
45-
.clear_s3_folders(s3_folder_to_processing_context_map, &s3_message_folders)
45+
.clear_s3_folders(s3_folder_to_ddex_message_map, &s3_message_folders)
4646
.await?;
4747
}
4848
Err(e)

owen/src/local-s3/message_database.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::Result;
22
use owen::{
33
constants::{DEFAULT_DATABASE_NAME, DEFAULT_TABLE_NAME},
4-
output_generator::MessageDirProcessingContext,
4+
output_generator::DdexMessage,
55
};
66
use rusqlite::{params, Connection};
77
use std::collections::HashMap;
@@ -48,34 +48,28 @@ impl MessageDatabase {
4848
pub fn save_message_folders(
4949
&self,
5050
local_to_s3_folder_mapping: HashMap<String, String>,
51-
message_processing_context_vec: Vec<MessageDirProcessingContext>,
51+
ddex_messages: Vec<DdexMessage>,
5252
message_folders: &Vec<String>,
53-
) -> Result<HashMap<String, MessageDirProcessingContext>> {
54-
let mut s3_folder_to_processing_context_map: HashMap<String, MessageDirProcessingContext> =
55-
HashMap::new();
53+
) -> Result<HashMap<String, DdexMessage>> {
54+
let mut s3_folder_to_ddex_message_map: HashMap<String, DdexMessage> = HashMap::new();
5655

57-
for message_processing_context in message_processing_context_vec {
56+
for ddex_message in ddex_messages {
5857
let s3_path = local_to_s3_folder_mapping
59-
.get(&message_processing_context.message_dir_path)
58+
.get(&ddex_message.message_dir_path)
6059
.expect(
6160
format!(
6261
"Could not retrieve s3 path from mapping to local folder. Local folder: {}",
63-
message_processing_context.message_dir_path
62+
ddex_message.message_dir_path
6463
)
6564
.as_str(),
6665
);
6766

68-
s3_folder_to_processing_context_map.insert(s3_path.clone(), message_processing_context);
67+
s3_folder_to_ddex_message_map.insert(s3_path.clone(), ddex_message);
6968
}
7069
for s3_message_folder in message_folders {
71-
let message_processing_context =
72-
s3_folder_to_processing_context_map.get(s3_message_folder);
73-
if let Some(message_processing_context) = message_processing_context {
74-
if &message_processing_context.excluded == &true {
75-
let reason = message_processing_context
76-
.reason
77-
.as_ref()
78-
.expect("Could not retrieve exclusion reason from processing context");
70+
if let Some(ddex_message) = s3_folder_to_ddex_message_map.get(s3_message_folder) {
71+
if &ddex_message.excluded == &true {
72+
let reason = ddex_message.reason.as_deref().unwrap_or("");
7973
self.connection.execute(
8074
format!(
8175
"INSERT INTO {} (message_folder, status, reason) VALUES (?1, ?2, ?3)",
@@ -105,7 +99,7 @@ impl MessageDatabase {
10599
)?;
106100
}
107101
}
108-
Ok(s3_folder_to_processing_context_map)
102+
Ok(s3_folder_to_ddex_message_map)
109103
}
110104

111105
pub fn save_message_folders_with_status(

0 commit comments

Comments
 (0)