@@ -21,13 +21,17 @@ bool capiocl::Parser::firstIsSubpathOfSecond(const std::filesystem::path &path,
2121}
2222
2323std::tuple<std::string, capiocl::Engine *>
24- capiocl::Parser::parse (const std::filesystem::path &source,
25- const std::filesystem::path &resolve_prefix, bool store_only_in_memory) {
24+ capiocl::Parser::parse (const std::filesystem::path &source, std::filesystem::path &resolve_prefix,
25+ bool store_only_in_memory) {
2626
2727 std::string workflow_name = CAPIO_CL_DEFAULT_WF_NAME;
2828 auto locations = new Engine ();
2929 START_LOG (gettid (), " call(config_file='%s')" , source.c_str ());
3030
31+ if (resolve_prefix.empty ()) {
32+ resolve_prefix = " ." ;
33+ }
34+
3135 locations->newFile (" *" );
3236 locations->setDirectory (" *" );
3337 if (store_only_in_memory) {
@@ -73,6 +77,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,
7377
7478 for (const auto &app : doc[" IO_Graph" ]) {
7579 if (!app.contains (" name" ) || !app[" name" ].is_string ()) {
80+ print_message (CLI_LEVEL_ERROR, " Missing IO_Graph name or name is not a valid string!" );
7681 ERR_EXIT (" Error: app name is mandatory" );
7782 }
7883
@@ -87,14 +92,15 @@ capiocl::Parser::parse(const std::filesystem::path &source,
8792 ERR_EXIT (msg.c_str ());
8893 }
8994
95+ print_message (CLI_LEVEL_JSON, " Parsing input_stream for app " + app_name);
9096 for (const auto &itm : app[" input_stream" ]) {
9197 std::filesystem::path file_path (itm.get <std::string>());
9298 if (file_path.is_relative ()) {
9399 print_message (CLI_LEVEL_WARNING,
94100 " Path : " + file_path.string () + " IS RELATIVE! resolving..." );
95101 file_path = resolve_prefix / file_path;
96102 }
97- locations->newFile (file_path. c_str () );
103+ locations->newFile (file_path);
98104 locations->addConsumer (file_path, app_name);
99105 }
100106
@@ -104,7 +110,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,
104110 print_message (CLI_LEVEL_ERROR, msg);
105111 ERR_EXIT (msg.c_str ());
106112 }
107-
113+ print_message (CLI_LEVEL_JSON, " Parsing output_stream for app " + app_name);
108114 for (const auto &itm : app[" output_stream" ]) {
109115 std::filesystem::path file_path (itm.get <std::string>());
110116 if (file_path.is_relative ()) {
@@ -118,6 +124,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,
118124
119125 // ---- streaming ----
120126 if (app.contains (" streaming" ) && app[" streaming" ].is_array ()) {
127+ print_message (CLI_LEVEL_JSON, " Parsing streaming for app " + app_name);
121128 for (const auto &stream_item : app[" streaming" ]) {
122129 bool is_file = true ;
123130 std::vector<std::filesystem::path> streaming_names;
@@ -145,51 +152,79 @@ capiocl::Parser::parse(const std::filesystem::path &source,
145152 streaming_names.push_back (p);
146153 }
147154 } else {
155+ print_message (
156+ CLI_LEVEL_ERROR,
157+ " Missing streaming name/dirname, or name/dirname is not an array for app " +
158+ app_name);
148159 ERR_EXIT (" error: either name or dirname in streaming section is required" );
149160 }
150161
151- // committed
152- if (!stream_item.contains (" committed" ) || !stream_item[" committed" ].is_string ()) {
153- ERR_EXIT (" commit rule is mandatory in streaming section" );
154- }
155-
156- std::string committed = stream_item[" committed" ].get <std::string>();
157- auto pos = committed.find (' :' );
158- if (pos != std::string::npos) {
159- commit_rule = committed.substr (0 , pos);
160- std::string count_str = committed.substr (pos + 1 );
161- if (!isInteger (count_str)) {
162- ERR_EXIT (" invalid number in commit rule" );
162+ // Commit rule. Optional in nature, hence no check required!
163+ if (stream_item.contains (" committed" )) {
164+ if (!stream_item[" committed" ].is_string ()) {
165+ print_message (CLI_LEVEL_ERROR, " Error: invalid type for commit rule!" );
166+ ERR_EXIT (" Error: invalid type for commit rule!" );
163167 }
164- if (commit_rule == COMMITTED_ON_CLOSE) {
165- n_close = std::stol (count_str);
166- } else if (commit_rule == COMMITTED_N_FILES) {
167- n_files = std::stol (count_str);
168+
169+ std::string committed = stream_item[" committed" ].get <std::string>();
170+ auto pos = committed.find (' :' );
171+ if (pos != std::string::npos) {
172+ commit_rule = committed.substr (0 , pos);
173+ std::string count_str = committed.substr (pos + 1 );
174+ if (!isInteger (count_str)) {
175+ ERR_EXIT (" invalid number in commit rule" );
176+ }
177+ if (commit_rule == COMMITTED_ON_CLOSE) {
178+ n_close = std::stol (count_str);
179+ } else if (commit_rule == COMMITTED_N_FILES) {
180+ n_files = std::stol (count_str);
181+ } else {
182+ ERR_EXIT (" invalid commit rule type" );
183+ }
168184 } else {
169- ERR_EXIT ( " invalid commit rule type " ) ;
185+ commit_rule = committed ;
170186 }
171- } else {
172- commit_rule = committed;
173- }
174187
175- // file_deps
176- if (commit_rule == COMMITTED_ON_FILE) {
177- if (!stream_item.contains (" file_deps" ) ||
178- !stream_item[" file_deps" ].is_array ()) {
179- ERR_EXIT (" commit rule is on_file but no file_deps section found" );
180- }
181- for (const auto &dep : stream_item[" file_deps" ]) {
182- std::filesystem::path p (dep.get <std::string>());
183- if (p.is_relative ()) {
184- p = resolve_prefix / p;
188+ // file_deps
189+ if (commit_rule == COMMITTED_ON_FILE) {
190+ if (!stream_item.contains (" file_deps" ) ||
191+ !stream_item[" file_deps" ].is_array ()) {
192+ ERR_EXIT (" commit rule is on_file but no file_deps section found" );
185193 }
186- file_deps.push_back (p);
194+ for (const auto &dep : stream_item[" file_deps" ]) {
195+ std::filesystem::path p (dep.get <std::string>());
196+ if (p.is_relative ()) {
197+ p = resolve_prefix / p;
198+ }
199+ file_deps.push_back (p);
200+ }
201+ }
202+
203+ // check commit rule is one of the available
204+ if (commit_rule != capiocl::COMMITTED_N_FILES &&
205+ commit_rule != capiocl::COMMITTED_ON_CLOSE &&
206+ commit_rule != capiocl::COMMITTED_ON_FILE &&
207+ commit_rule != capiocl::COMMITTED_ON_TERMINATION) {
208+ print_message (CLI_LEVEL_ERROR, " Error: commit rule " + commit_rule +
209+ " is not one of the allowed one!" );
210+ ERR_EXIT (" Unknown commit rule %s" , commit_rule.c_str ());
187211 }
188212 }
189213
190- // mode
191- if (stream_item.contains (" mode" ) && stream_item[" mode" ].is_string ()) {
214+ // Firing rule. Optional in nature, hence no check required!
215+ if (stream_item.contains (" mode" )) {
216+ if (!stream_item[" mode" ].is_string ()) {
217+ print_message (CLI_LEVEL_ERROR, " Error: invalid type for mode" );
218+ ERR_EXIT (" Error: invalid type for mode" );
219+ }
192220 mode = stream_item[" mode" ].get <std::string>();
221+
222+ if (mode != capiocl::MODE_UPDATE && mode != capiocl::MODE_NO_UPDATE) {
223+ print_message (CLI_LEVEL_ERROR,
224+ " Error: invalid firing rule provided for app: " + app_name);
225+ ERR_EXIT (" Error: invalid firing rule provided for app: %s" ,
226+ app_name.c_str ());
227+ }
193228 }
194229
195230 // n_files (optional)
@@ -206,6 +241,14 @@ capiocl::Parser::parse(const std::filesystem::path &source,
206241 } else {
207242 locations->setDirectory (path);
208243 }
244+
245+ print_message (CLI_LEVEL_INFO,
246+ " App: " + app_name + " - " + " path: " + path.string () + " - " +
247+ " committed: " + commit_rule + " - " + " mode: " + mode +
248+ " - " + " n_files: " + std::to_string (n_files) + " - " +
249+ " n_close: " + std::to_string (n_close));
250+ print_message (CLI_LEVEL_INFO, " " );
251+
209252 locations->setCommitRule (path, commit_rule);
210253 locations->setFireRule (path, mode);
211254 locations->setCommitedCloseNumber (path, n_close);
0 commit comments