diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 680cc42..7a76d1b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,6 +29,14 @@ jobs: fetch-depth: 1 submodules: true + - name: Install dependencies for C (cross-)compilation + run: | + sudo apt-get update + sudo apt-get install -y clang lld gcc-aarch64-linux-gnu libc6-dev-arm64-cross + + - name: Compile C script using make + run: make c_scripts + - name: Setup Java ${{ matrix.java_version }} uses: actions/setup-java@v3 with: diff --git a/.gitignore b/.gitignore index ff34859..fa803fb 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ out /gradle.properties /logs.tmp *.jar +/plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks_* diff --git a/Makefile b/Makefile index 97f0a81..b80f6ee 100644 --- a/Makefile +++ b/Makefile @@ -7,18 +7,52 @@ else mm = endif +C_SCRIPT_SRC = plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c +C_SCRIPT_AARCH64 = plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks_linux_aarch64 +C_SCRIPT_X86_64 = plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks_linux_x86_64 +C_SCRIPT_ALL_TARGETS = $(C_SCRIPT_X86_64) $(C_SCRIPT_AARCH64) + +arch:=$(shell uname -m) + +ifneq ($(arch),x86_64) +$(info ====================================================================================) +$(info This Makefile assumes to be run on an x86_64 build system. Found: $(arch)) +$(info As an alternative you can adjust the Makefile or build $(C_SCRIPT_SRC) yourself for the following targets:) +$(info - target aarch64-linux-gnu: saved to $(C_SCRIPT_AARCH64)) +$(info - target x86_64-linux-gnu: saved to $(C_SCRIPT_X86_64)) +$(error Aborting) +endif + +c_scripts: $(C_SCRIPT_ALL_TARGETS) + +$(C_SCRIPT_AARCH64): $(C_SRIPT_SRC) + clang -static \ + -target aarch64-linux-gnu \ + --sysroot=/usr/aarch64-linux-gnu \ + plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c \ + -fuse-ld=lld \ + -o $@ + + +$(C_SCRIPT_X86_64): $(C_SCRIPT_SRC) + clang -static \ + -target x86_64-linux-gnu \ + plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c \ + -fuse-ld=lld \ + -o $@ + clean: rm -rf .nextflow* rm -rf work rm -rf build rm -rf plugins/*/build + rm -f $(C_SCRIPT_ALL_TARGETS) ./gradlew clean compile: ./gradlew :nextflow:exportClasspath compileGroovy @echo "DONE `date`" - check: ./gradlew check @@ -55,7 +89,7 @@ assemble: # generate build zips under build/plugins # you can install the plugin copying manually these files to $HOME/.nextflow/plugins # -buildPlugins: +buildPlugins: $(C_SCRIPT_ALL_TARGETS) ./gradlew copyPluginZip # @@ -69,4 +103,4 @@ upload-plugins: ./gradlew plugins:upload publish-index: - ./gradlew plugins:publishIndex \ No newline at end of file + ./gradlew plugins:publishIndex diff --git a/README.md b/README.md index ceca44a..3290918 100644 --- a/README.md +++ b/README.md @@ -1,108 +1,195 @@ -# nf-cws plugin - -This plugin enables Nextflow to communicate with a Common Workflow Scheduler instance and transfer the required information. - -### Supported Executors - -- k8s - -### How to use - -To run Nextflow with this plugin, you need version >=`23.03.0-edge`. -To activate the plugin, add `-plugins nf-cws` to your `nextflow` call or add the following to your `nextflow.config`: -``` -plugins { - id 'nf-cws' -} -``` - -### Configuration - -| Attribute | Required | Explanation | -|:---------------:|---------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| dns | - | Provide the link to the running CWS instance.
NOTE: If you provide an address here, the `k8s` executor will not try to start a Common Workflow Scheduler instance on demand. | -| strategy | - | Which strategy should be used for scheduling; available strategies depend on the CWS instance | -| costFunction | - | Which cost function should be used for scheduling; available strategies depend on the CWS instance | -| batchSize | - | Number of tasks to submit together (only if more than this are ready to run); default: 1 | -| memoryPredictor | - | The memory predictor that shall be used for task scaling.
If not set, task scaling is disabled. See Common Workflow Scheduler for supported predictors. | -| minMemory | - | The minimum memory to size a task to. Only used if memory prediction is performed. | -| maxMemory | - | The maximum memory to size a task to. Only used if memory prediction is performed. | - -##### Example: -``` -cws { - dns = 'http://cws-scheduler/' - strategy = 'rank_max-fair' - costFunction = 'MinSize' - batchSize = 10 - memoryPredictor = '' - minMemory = 128.MB - maxMemory = 64.GB -} -``` - -#### K8s Executor - -The `k8s` executor allows starting a Common Workflow Scheduler instance on demand. This will happen if you do not define any CWS-related config. Otherwise, you can configure the following: - -``` -k8s { - scheduler { - name = 'workflow-scheduler' - serviceAccount = 'nextflowscheduleraccount' - imagePullPolicy = 'IfNotPresent' - cpu = '2' - memory = '1400Mi' - container = 'commonworkflowscheduler/kubernetesscheduler:v1.0' - command = null - port = 8080 - workDir = '/scheduler' - runAsUser = 0 - autoClose = false - nodeSelector = null - } -} -``` - -| Attribute | Required | Explanation | -|:----------------|----------|------------------------------------------------------------------------------------------------------------------------------| -| name | - | The name of the pod created | -| serviceAccount | - | Service account used by the scheduler | -| imagePullPolicy | - | Image pull policy for the created pod ([k8s docs](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy)) | -| cpu | - | Number of cores to use for the scheduler pod | -| memory | - | Memory to use for the scheduler pod | -| container | - | Container image to use for the scheduler pod | -| command | - | Command to start the CWS in the pod. If you need to overwrite the original ENTRYPOINT | -| port | - | Port where to reach the CWS Rest API | -| workDir | - | Workdir within the pod | -| runAsUser | - | Run the scheduler as a specific user | -| autoClose | - | Stop the pod after the workflow is finished | -| nodeSelector | - | A node selector for the CWS pod | - -### Tracing -This plugin adds additional fields to the trace report. Therefore, you have to add the required fields to the `trace.fields` field in your Nextflow config (also check the official [documentation](https://www.nextflow.io/docs/latest/tracing.html#trace-report)). -The following fields can be used: - -| Name | Description | -|:---------------------------------------|:-----------------------------------------------------------------------------------------------------------------:| -| input_size | The accumulated size of the input files | -| memory_adapted | The memory that was used after adaption by the scheduler | -| submit_to_scheduler_time | Time in ms to register the task at CWS | -| submit_to_k8s_time | Time to create and submit pod to k8s | -| scheduler_time_in_queue | How long was the task in the queue until it got scheduled | -| scheduler_place_in_queue | At which place was the task in the queue when it got scheduled | -| scheduler_tried_to_schedule | How often was a scheduling plan calculated until the task was assigned | -| scheduler_time_to_schedule | How long did it take to calculate the location for this task | -| scheduler_nodes_tried | How many nodes have been compared | -| scheduler_nodes_cost | Cost value to schedule on the different nodes (only available for some algorithms) | -| scheduler_could_stop_fetching | How often could the scheduler skip a node | -| scheduler_best_cost | Cost on the selected node (only available for some algorithms) | -| scheduler_delta_schedule_submitted | Time delta between starting to calculate the scheduling plan and submitting the task to the target node | -| scheduler_delta_schedule_alignment | Time delta between beginning to calculate the scheduling plan and finding the target node | -| scheduler_batch_id | The id of the batch the task belongs to | -| scheduler_delta_batch_start_submitted | Time delta between a batch was started, and the scheduler received this task from the workflow engine | -| scheduler_delta_batch_start_received | Time delta between a batch was started, and the scheduler received the pod from the k8s API | -| scheduler_delta_batch_closed_batch_end | Time delta between a batch was closed by the workflow engine, and the scheduler received the pod from the k8s API | -| scheduler_delta_submitted_batch_end | Time delta between a task was submitted, and the batch became schedulable | -| memory_adapted | The memory used for a task when sizing is active | -| input_size | The sum of the input size of all task inputs | +# nf-cws plugin + +This plugin enables Nextflow to communicate with a Common Workflow Scheduler instance and transfer the required +information. + +Together with the Common Workflow Scheduler, the plugin enables you: +- to use more sophisticated scheduling strategies [(More information)](https://arxiv.org/pdf/2302.07652.pdf) +- automatically resize the memory of your memory if your estimation is too high [(More information)](https://arxiv.org/pdf/2408.00047.pdf) +- keep your intermediate data locally at the worker node - this saves 18% of makespan for RNA-Seq, and 95% of makespan for I/O intensive task chaining [(More information)](https://arxiv.org/pdf/2503.13072.pdf) + +For more information on the scheduling, +see the [scheduler repository](https://github.com/CommonWorkflowScheduler/KubernetesScheduler). + +### Supported Executors + +- k8s + +### How to use + +To run Nextflow with this plugin, you need version >=`24.04.0` and <=`25.02.3-edge` +(because Nextflow's Kubernetes refactor in `25.03.0-edge` is not supported *yet*). +To activate the plugin, add `-plugins nf-cws` to your `nextflow` call or add the following to your `nextflow.config`: + +``` +plugins { + id 'nf-cws' +} +``` + +### Configuration + +| Attribute | Required | Explanation | +|:---------------:|---------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| dns | - | Provide the link to the running CWS instance.
NOTE: If you provide an address here, the `k8s` executor will not try to start a Common Workflow Scheduler instance on demand. | +| strategy | - | Which strategy should be used for scheduling; available strategies depend on the CWS instance | +| costFunction | - | Which cost function should be used for scheduling; available strategies depend on the CWS instance | +| batchSize | - | Number of tasks to submit together (only if more than this are ready to run); default: 1 | +| memoryPredictor | - | The memory predictor that shall be used for task scaling.
If not set, task scaling is disabled. See Common Workflow Scheduler for supported predictors. | +| minMemory | - | The minimum memory to size a task to. Only used if memory prediction is performed. | +| maxMemory | - | The maximum memory to size a task to. Only used if memory prediction is performed. | + +##### Example: + +``` +cws { + dns = 'http://cws-scheduler/' + strategy = 'rank_max-fair' + costFunction = 'MinSize' + batchSize = 10 + memoryPredictor = '' + minMemory = 128.MB + maxMemory = 64.GB +} +``` + +#### K8s Executor + +The `k8s` executor allows starting a Common Workflow Scheduler instance on demand. This will happen if you do not define +any CWS-related config. Otherwise, you can configure the following: + +``` +k8s { + scheduler { + name = 'workflow-scheduler' + serviceAccount = 'nextflowscheduleraccount' + imagePullPolicy = 'IfNotPresent' + cpu = '2' + memory = '1400Mi' + container = 'commonworkflowscheduler/kubernetesscheduler:v2.1' + command = null + port = 8080 + workDir = '/scheduler' + runAsUser = 0 + autoClose = false + nodeSelector = null + } +} +``` + +| Attribute | Required | Explanation | +|:----------------|----------|------------------------------------------------------------------------------------------------------------------------------| +| name | - | The name of the pod created | +| serviceAccount | - | Service account used by the scheduler | +| imagePullPolicy | - | Image pull policy for the created pod ([k8s docs](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy)) | +| cpu | - | Number of cores to use for the scheduler pod | +| memory | - | Memory to use for the scheduler pod | +| container | - | Container image to use for the scheduler pod | +| command | - | Command to start the CWS in the pod. If you need to overwrite the original ENTRYPOINT | +| port | - | Port where to reach the CWS Rest API | +| workDir | - | Workdir within the pod | +| runAsUser | - | Run the scheduler as a specific user | +| autoClose | - | Stop the pod after the workflow is finished | +| nodeSelector | - | A node selector for the CWS pod | + +#### WOW + +WOW is a new scheduling approach for dynamic scientific workflow systems that steers both data movement and task +scheduling to reduce network congestion and overall runtime. + +WOW requires some additional configuration due to its use of the local file system in addition to the distributed file +system. + +``` +k8s { + localPath = '/localdata' + localStorageMountPath = '/localdata' + storage { + copyStrategy = 'ftp' + workdir = '/localdata/localwork/' + } +} +``` + +| Attribute | Required | Explanation | +|:----------------------|----------|-------------------------------------------------------------------------------------------------| +| localPath | yes | Host path for the local mount +| localStorageMountPath | no | Container path for the local mount +| storage.copyStrategy | no | Strategy to copy the files between nodes - currently only supports 'ftp' (and its alias 'copy') +| storage.workdir | no | Working directory to use - must be inside of the locally mounted directory + +### Tracing + +This plugin adds additional fields to the trace report. Therefore, you have to add the required fields to +the `trace.fields` field in your Nextflow config (also check the +official [documentation](https://www.nextflow.io/docs/latest/tracing.html#trace-report)). +The following fields can be used: + +| Name | Description | +|:---------------------------------------|:-----------------------------------------------------------------------------------------------------------------:| +| input_size | The accumulated size of the input files | +| memory_adapted | The memory that was used after adaption by the scheduler | +| submit_to_scheduler_time | Time in ms to register the task at CWS | +| submit_to_k8s_time | Time to create and submit pod to k8s | +| scheduler_time_in_queue | How long was the task in the queue until it got scheduled | +| scheduler_place_in_queue | At which place was the task in the queue when it got scheduled | +| scheduler_tried_to_schedule | How often was a scheduling plan calculated until the task was assigned | +| scheduler_time_to_schedule | How long did it take to calculate the location for this task | +| scheduler_nodes_tried | How many nodes have been compared | +| scheduler_nodes_cost | Cost value to schedule on the different nodes (only available for some algorithms) | +| scheduler_could_stop_fetching | How often could the scheduler skip a node | +| scheduler_best_cost | Cost on the selected node (only available for some algorithms) | +| scheduler_delta_schedule_submitted | Time delta between starting to calculate the scheduling plan and submitting the task to the target node | +| scheduler_delta_schedule_alignment | Time delta between beginning to calculate the scheduling plan and finding the target node | +| scheduler_batch_id | The id of the batch the task belongs to | +| scheduler_delta_batch_start_submitted | Time delta between a batch was started, and the scheduler received this task from the workflow engine | +| scheduler_delta_batch_start_received | Time delta between a batch was started, and the scheduler received the pod from the k8s API | +| scheduler_delta_batch_closed_batch_end | Time delta between a batch was closed by the workflow engine, and the scheduler received the pod from the k8s API | +| scheduler_delta_submitted_batch_end | Time delta between a task was submitted, and the batch became schedulable | +| memory_adapted | The memory used for a task when sizing is active | +| input_size | The sum of the input size of all task inputs | +| infiles_time: | (WOW) Time to walk through and retrieve stats of all local (input) files at task start | +| outfiles_time: | (WOW) Time to walk through and retrieve stats of all local (output) files at task start | +| scheduler_time_delta_phase_three: | (WOW) List of time instances taken to calculcate step 3 of the WOW scheduling algorithm (see paper for details) | +| scheduler_copy_tasks: | (WOW) Number of times copy tasks were started for this task | + +--- + +## Citation + +If you use this software or artifacts in a publication, please cite it as: + +#### Text + +Lehmann Fabian, Jonathan Bader, Friedrich Tschirpke, Lauritz Thamsen, and Ulf Leser. **How Workflow Engines Should Talk +to Resource Managers: A Proposal for a Common Workflow Scheduling Interface**. In 2023 IEEE/ACM 23rd International +Symposium on Cluster, Cloud and Internet Computing (CCGrid). Bangalore, India, 2023. + +([https://arxiv.org/pdf/2302.07652.pdf](https://arxiv.org/pdf/2302.07652.pdf)) + +#### BibTeX + +``` +@inproceedings{lehmannHowWorkflowEngines2023, + author = {Lehmann, Fabian and Bader, Jonathan and Tschirpke, Friedrich and Thamsen, Lauritz and Leser, Ulf}, + booktitle = {2023 IEEE/ACM 23rd International Symposium on Cluster, Cloud and Internet Computing (CCGrid)}, + title = {How Workflow Engines Should Talk to Resource Managers: A Proposal for a Common Workflow Scheduling Interface}, + year = {2023}, + address = {{Bangalore, India}}, + doi = {10.1109/CCGrid57682.2023.00025} +} +``` + +#### Strategy-specific Citation + +Please note that the following strategies originated in individual papers: + +- PONDER: [https://arxiv.org/pdf/2408.00047.pdf](https://arxiv.org/pdf/2408.00047.pdf) +- WOW: [https://arxiv.org/pdf/2503.13072.pdf](https://arxiv.org/pdf/2503.13072.pdf) + +--- + +#### Acknowledgement: + +This work was funded by the German Research Foundation (DFG), CRC 1404: "FONDA: Foundations of Workflows for Large-Scale +Scientific Data Analysis." diff --git a/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle b/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle index 9798cbb..5b9a93b 100644 --- a/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle +++ b/buildSrc/src/main/groovy/io.nextflow.groovy-common-conventions.gradle @@ -17,6 +17,7 @@ java { toolchain { languageVersion = JavaLanguageVersion.of(21) } + sourceCompatibility = 17 targetCompatibility = 17 } diff --git a/plugins/nf-cws/build.gradle b/plugins/nf-cws/build.gradle index aa20f2f..6b524e6 100644 --- a/plugins/nf-cws/build.gradle +++ b/plugins/nf-cws/build.gradle @@ -34,7 +34,7 @@ sourceSets { } ext{ - nextflowVersion = '24.11.0-edge' + nextflowVersion = '25.01.0-edge' } dependencies { diff --git a/plugins/nf-cws/src/main/nextflow/cws/CWSConfig.groovy b/plugins/nf-cws/src/main/nextflow/cws/CWSConfig.groovy index e83e398..c445daf 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/CWSConfig.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/CWSConfig.groovy @@ -16,6 +16,14 @@ class CWSConfig { String getStrategy() { target.strategy as String ?: 'FIFO' } + boolean strategyIsLocationAware() { + switch(target.strategy) { + case "wow": return true + default: + return false + } + } + String getCostFunction() { target.costFunction as String } String getMemoryPredictor() { target.memoryPredictor as String } @@ -30,6 +38,10 @@ class CWSConfig { return s ? new MemoryUnit(s) : null } + Integer getMaxCopyTasksPerNode() { target.maxCopyTasksPerNode as Integer } + + Integer getMaxWaitingCopyTasksPerNode() { target.maxWaitingCopyTasksPerNode as Integer } + int getBatchSize() { String s = target.batchSize as String //Default: 1 -> No batching diff --git a/plugins/nf-cws/src/main/nextflow/cws/CWSPlugin.groovy b/plugins/nf-cws/src/main/nextflow/cws/CWSPlugin.groovy index 069c002..bc90ae9 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/CWSPlugin.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/CWSPlugin.groovy @@ -1,8 +1,15 @@ package nextflow.cws import groovy.transform.CompileStatic +import nextflow.cws.wow.file.LocalPath +import nextflow.cws.wow.file.OfflineLocalPath +import nextflow.cws.wow.file.WorkdirPath +import nextflow.cws.wow.filesystem.WOWFileSystemProvider +import nextflow.cws.wow.serializer.LocalPathSerializer +import nextflow.file.FileHelper import nextflow.plugin.BasePlugin import nextflow.trace.TraceRecord +import nextflow.util.KryoHelper import org.pf4j.PluginWrapper @CompileStatic @@ -14,6 +21,10 @@ class CWSPlugin extends BasePlugin { private static void registerTraceFields() { TraceRecord.FIELDS.putAll( [ + infiles_time: 'num', + outfiles_time: 'num', + create_bash_wrapper_time: 'num', + create_request_time: 'num', submit_to_scheduler_time: 'num', submit_to_k8s_time: 'num', scheduler_time_in_queue: 'num', @@ -33,6 +44,18 @@ class CWSPlugin extends BasePlugin { scheduler_delta_submitted_batch_end: 'num', memory_adapted: 'mem', input_size: 'num', + scheduler_files_bytes: 'num', + scheduler_files_node_bytes: 'num', + scheduler_files_node_other_task_bytes: 'num', + scheduler_files: 'num', + scheduler_files_node: 'num', + scheduler_files_node_other_task: 'num', + scheduler_depending_task: 'num', + scheduler_location_count: 'num', + scheduler_nodes_to_copy_from: 'num', + scheduler_no_alignment_found: 'num', + scheduler_time_delta_phase_three: 'str', + scheduler_copy_tasks: 'num', ] ) } @@ -40,6 +63,10 @@ class CWSPlugin extends BasePlugin { void start() { super.start() registerTraceFields() + KryoHelper.register( LocalPath, LocalPathSerializer ) + KryoHelper.register( OfflineLocalPath, LocalPathSerializer ) + KryoHelper.register( WorkdirPath, LocalPathSerializer ) + FileHelper.getOrInstallProvider(WOWFileSystemProvider) } } diff --git a/plugins/nf-cws/src/main/nextflow/cws/CWSSchedulerBatch.groovy b/plugins/nf-cws/src/main/nextflow/cws/CWSSchedulerBatch.groovy index 783fec3..94fc908 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/CWSSchedulerBatch.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/CWSSchedulerBatch.groovy @@ -1,7 +1,9 @@ package nextflow.cws +import groovy.transform.CompileStatic import nextflow.cws.processor.SchedulerBatch +@CompileStatic class CWSSchedulerBatch extends SchedulerBatch { private SchedulerClient schedulerClient diff --git a/plugins/nf-cws/src/main/nextflow/cws/CWSSession.groovy b/plugins/nf-cws/src/main/nextflow/cws/CWSSession.groovy index 1b4408c..984d36c 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/CWSSession.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/CWSSession.groovy @@ -1,8 +1,11 @@ package nextflow.cws +import groovy.transform.CompileStatic + /** * Central, global instance to manage all generated Executor instances */ +@CompileStatic class CWSSession { static final CWSSession INSTANCE = new CWSSession() diff --git a/plugins/nf-cws/src/main/nextflow/cws/SchedulerClient.groovy b/plugins/nf-cws/src/main/nextflow/cws/SchedulerClient.groovy index 20722a4..46151fb 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/SchedulerClient.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/SchedulerClient.groovy @@ -1,180 +1,288 @@ -package nextflow.cws - -import groovy.json.JsonOutput -import groovy.json.JsonSlurper -import groovy.util.logging.Slf4j -import nextflow.dag.DAG - -@Slf4j -class SchedulerClient { - - private final CWSConfig config - private final String runName - private boolean registered = false - private boolean closed = false - private int tasksInBatch = 0 - protected String dns - - SchedulerClient( CWSConfig config, String runName ) { - this.config = config - this.runName = runName - this.dns = config.dns?.endsWith('/') ? config.dns[0..-2] : config.dns - CWSSession.INSTANCE.addSchedulerClient( this ) - } - - protected String getDNS() { - return dns ? dns + "/v1" : null - } - - synchronized void registerScheduler( Map data ) { - if ( registered ) return - String url = "${getDNS()}/scheduler/$runName" - registered = true - int trials = 0 - while ( trials++ < 50 ) { - try { - HttpURLConnection post = new URL(url).openConnection() as HttpURLConnection - post.setRequestMethod( "POST" ) - post.setDoOutput(true) - post.setRequestProperty("Content-Type", "application/json") - data.strategy = config.getStrategy() - String message = JsonOutput.toJson( data ) - post.getOutputStream().write(message.getBytes("UTF-8")) - int responseCode = post.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from k8s scheduler while registering" ) - } - return - } catch ( UnknownHostException ignored ) { - throw new IllegalArgumentException("The scheduler was not found under '$url', is the url correct and the scheduler running?") - } catch ( ConnectException ignored ) { - Thread.sleep( 3000 ) - }catch (IOException e) { - throw new IllegalStateException("Cannot register scheduler under $url, got ${e.class.toString()}: ${e.getMessage()}", e) - } - } - throw new IllegalStateException("Cannot connect to scheduler under $url" ) - } - - synchronized void closeScheduler(){ - if ( closed ) return - closed = true - HttpURLConnection post = new URL("${getDNS()}/scheduler/$runName").openConnection() as HttpURLConnection - post.setRequestMethod( "DELETE" ) - int responseCode = post.getResponseCode() - log.trace "Delete scheduler code was: ${responseCode}" - } - - void submitMetrics( Map metrics, int id ){ - HttpURLConnection post = new URL("${getDNS()}/scheduler/$runName/metrics/task/$id").openConnection() as HttpURLConnection - post.setRequestMethod( "POST" ) - String message = JsonOutput.toJson( metrics ) - post.setDoOutput(true) - post.setRequestProperty("Content-Type", "application/json") - post.getOutputStream().write(message.getBytes("UTF-8")) - int responseCode = post.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting metrics" ) - } - } - - Map registerTask( Map config, int id ){ - - HttpURLConnection post = new URL("${getDNS()}/scheduler/$runName/task/$id").openConnection() as HttpURLConnection - post.setRequestMethod( "POST" ) - String message = JsonOutput.toJson( config ) - post.setDoOutput(true) - post.setRequestProperty("Content-Type", "application/json") - post.getOutputStream().write(message.getBytes("UTF-8")) - int responseCode = post.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while registering task: ${config.name}" ) - } - tasksInBatch++ - Map response = new JsonSlurper().parse(post.getInputStream()) as Map - return response - - } - - private void batch( String command ){ - HttpURLConnection put = new URL("${getDNS()}/scheduler/$runName/${command}Batch").openConnection() as HttpURLConnection - put.setRequestMethod( "PUT" ) - if ( command == 'end' ){ - put.setDoOutput(true) - put.setRequestProperty("Content-Type", "application/json") - put.getOutputStream().write("$tasksInBatch".getBytes("UTF-8")) - } - int responseCode = put.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while ${command}ing batch" ) - } - } - - void startBatch(){ - tasksInBatch = 0 - if ( !closed ) batch('start') - } - - void endBatch(){ - if ( !closed ) batch('end') - } - - Map getTaskState( int id ){ - - HttpURLConnection get = new URL("${getDNS()}/scheduler/$runName/task/$id").openConnection() as HttpURLConnection - get.setRequestMethod( "GET" ) - get.setDoOutput(true) - int responseCode = get.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while requesting task state: $id" ) - } - Map response = new JsonSlurper().parse(get.getInputStream()) as Map - return response - - } - - - ///* DAG */ - - void submitVertices( List vertices ){ - List> verticesToSubmit = vertices.collect { - [ - label : it.label, - type : it.type.toString(), - uid : it.getId() - ] as Map - } - HttpURLConnection put = new URL("${getDNS()}/scheduler/$runName/DAG/vertices").openConnection() as HttpURLConnection - put.setRequestMethod( "POST" ) - String message = JsonOutput.toJson( verticesToSubmit ) - put.setDoOutput(true) - put.setRequestProperty("Content-Type", "application/json") - put.getOutputStream().write(message.getBytes("UTF-8")) - int responseCode = put.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting vertices: ${vertices}" ) - } - } - - void submitEdges( List edges ){ - List> edgesToSubmit = edges.collect { - [ - uid : it.getId(), - label : it.getLabel(), - from : it.getFrom()?.getId(), - to : it.getTo()?.getId() - ] as Map - } - HttpURLConnection put = new URL("${getDNS()}/scheduler/$runName/DAG/edges").openConnection() as HttpURLConnection - put.setRequestMethod( "POST" ) - String message = JsonOutput.toJson( edgesToSubmit ) - put.setDoOutput(true) - put.setRequestProperty("Content-Type", "application/json") - put.getOutputStream().write(message.getBytes("UTF-8")) - int responseCode = put.getResponseCode() - if( responseCode != 200 ){ - throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting edges: ${edges}" ) - } - } - +package nextflow.cws + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.transform.CompileDynamic +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cws.wow.filesystem.WOWFileSystemProvider +import nextflow.dag.DAG + +import java.nio.file.Path + +@Slf4j +@CompileStatic +class SchedulerClient { + + private final CWSConfig config + + private final String runName + + private boolean registered = false + + private boolean closed = false + + private int tasksInBatch = 0 + + protected String dns + + SchedulerClient( CWSConfig config, String runName ) { + this.config = config + this.runName = runName + this.dns = config.dns?.endsWith('/') ? config.dns[0..-2] : config.dns + CWSSession.INSTANCE.addSchedulerClient( this ) + WOWFileSystemProvider.INSTANCE.registerSchedulerClient( this ) + } + + protected String getDNS() { + return dns ? dns + "/v1" : null + } + + synchronized void registerScheduler( Map data ) { + if ( registered ) return + String url = "${getDNS()}/scheduler/$runName" + registered = true + int trials = 0 + while ( trials++ < 50 ) { + try { + HttpURLConnection post = URI.create(url).toURL().openConnection() as HttpURLConnection + post.setRequestMethod( "POST" ) + post.setDoOutput(true) + post.setRequestProperty("Content-Type", "application/json") + data.strategy = config.getStrategy() + String message = JsonOutput.toJson( data ) + post.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = post.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from k8s scheduler while registering" ) + } + return + } catch ( UnknownHostException ignored ) { + throw new IllegalArgumentException("The scheduler was not found under '$url', is the url correct and the scheduler running?") + } catch ( ConnectException ignored ) { + Thread.sleep( 3000 ) + }catch (IOException e) { + throw new IllegalStateException("Cannot register scheduler under $url, got ${e.class.toString()}: ${e.getMessage()}", e) + } + } + throw new IllegalStateException("Cannot connect to scheduler under $url" ) + } + + synchronized void closeScheduler(){ + if ( closed ) return + closed = true + HttpURLConnection post = URI.create("${getDNS()}/scheduler/$runName").toURL().openConnection() as HttpURLConnection + post.setRequestMethod( "DELETE" ) + int responseCode = post.getResponseCode() + log.trace "Delete scheduler code was: ${responseCode}" + } + + void submitMetrics( Map metrics, int id ){ + HttpURLConnection post = URI.create("${getDNS()}/scheduler/$runName/metrics/task/$id").toURL().openConnection() as HttpURLConnection + post.setRequestMethod( "POST" ) + String message = JsonOutput.toJson( metrics ) + post.setDoOutput(true) + post.setRequestProperty("Content-Type", "application/json") + post.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = post.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting metrics" ) + } + } + + Map registerTask( Map config, int id ){ + + HttpURLConnection post = URI.create("${getDNS()}/scheduler/$runName/task/$id").toURL().openConnection() as HttpURLConnection + post.setRequestMethod( "POST" ) + String message = JsonOutput.toJson( config ) + post.setDoOutput(true) + post.setRequestProperty("Content-Type", "application/json") + post.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = post.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while registering task: ${config.name}" ) + } + tasksInBatch++ + Map response = new JsonSlurper().parse(post.getInputStream()) as Map + return response + + } + + private void batch( String command ){ + HttpURLConnection put = URI.create("${getDNS()}/scheduler/$runName/${command}Batch").toURL().openConnection() as HttpURLConnection + put.setRequestMethod( "PUT" ) + if ( command == 'end' ){ + put.setDoOutput(true) + put.setRequestProperty("Content-Type", "application/json") + put.getOutputStream().write("$tasksInBatch".getBytes("UTF-8")) + } + int responseCode = put.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while ${command}ing batch" ) + } + } + + void startBatch(){ + tasksInBatch = 0 + if ( !closed ) batch('start') + } + + void endBatch(){ + if ( !closed ) batch('end') + } + + Map getTaskState( int id ){ + + HttpURLConnection get = URI.create("${getDNS()}/scheduler/$runName/task/$id").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "GET" ) + get.setDoOutput(true) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while requesting task state: $id" ) + } + Map response = new JsonSlurper().parse(get.getInputStream()) as Map + return response + + } + + ///* DAG */ + + @CompileDynamic + void submitVertices( List vertices ){ + List> verticesToSubmit = vertices.collect { + [ + label : it.label, + type : it.type.toString(), + uid : it.getId() + ] as Map + } + HttpURLConnection put = URI.create("${getDNS()}/scheduler/$runName/DAG/vertices").toURL().openConnection() as HttpURLConnection + put.setRequestMethod( "POST" ) + String message = JsonOutput.toJson( verticesToSubmit ) + put.setDoOutput(true) + put.setRequestProperty("Content-Type", "application/json") + put.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = put.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting vertices: ${vertices}" ) + } + } + + @CompileDynamic + void submitEdges( List edges ){ + List> edgesToSubmit = edges.collect { + [ + uid : it.getId(), + label : it.getLabel(), + from : it.getFrom()?.getId(), + to : it.getTo()?.getId() + ] as Map + } + HttpURLConnection put = URI.create("${getDNS()}/scheduler/$runName/DAG/edges").toURL().openConnection() as HttpURLConnection + put.setRequestMethod( "POST" ) + String message = JsonOutput.toJson( edgesToSubmit ) + put.setDoOutput(true) + put.setRequestProperty("Content-Type", "application/json") + put.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = put.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while submitting edges: ${edges}" ) + } + } + + ///* File location */ + + Map getFileLocation( String path ){ + + String pathEncoded = URLEncoder.encode(path,'utf-8') + HttpURLConnection get = URI.create("${getDNS()}/file/$runName?path=$pathEncoded").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "GET" ) + get.setDoOutput(true) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while requesting file location: $path (${get.responseMessage})" ) + } + Map response = new JsonSlurper().parse(get.getInputStream()) as Map + return response + + } + + String getDaemonOnNode( String node ){ + + HttpURLConnection get = URI.create("${getDNS()}/daemon/$runName/$node").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "GET" ) + get.setDoOutput(true) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while requesting daemon on node: $node" ) + } + String response = new JsonSlurper().parse(get.getInputStream()) as String + return response + + } + + void addFileLocation( String path, long size, long timestamp, long locationWrapperID, boolean overwrite, String node = null ){ + + String method = overwrite ? 'overwrite' : 'add' + + HttpURLConnection get = URI.create("${getDNS()}/file/$runName/location/${method}${ node ? "/$node" : ''}").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "POST" ) + get.setDoOutput(true) + Map data = [ + path : path, + size : size, + timestamp : timestamp, + locationWrapperID : locationWrapperID + ] + if ( node ){ + data.node = node + } + String message = JsonOutput.toJson( data ) + get.setRequestProperty("Content-Type", "application/json") + get.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while updating file location: $path: $node (${get.responseMessage})" ) + } + + } + + void publish(Path source, Path destination, String mode ) { + HttpURLConnection get = URI.create("${getDNS()}/file/$runName/publish").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "PUT" ) + get.setDoOutput(true) + Map data = [ + 'source' : source.toString(), + 'destination' : destination.toString(), + 'mode' : mode.toUpperCase(), + ] + String message = JsonOutput.toJson( data ) + get.setRequestProperty("Content-Type", "application/json") + get.getOutputStream().write(message.getBytes("UTF-8")) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while publishing file: $source (${get.responseMessage}) -- ${get.getURL()}" ) + } + } + + void publishRemaining() { + HttpURLConnection get = URI.create("${getDNS()}/file/$runName/publish").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "POST" ) + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, when triggering publish (${get.responseMessage}) -- ${get.getURL()}" ) + } + } + + int getRemainingToPublish() { + HttpURLConnection get = URI.create("${getDNS()}/file/$runName/publish").toURL().openConnection() as HttpURLConnection + get.setRequestMethod( "GET" ) + get.setRequestProperty("Content-Type", "application/json") + int responseCode = get.getResponseCode() + if( responseCode != 200 ){ + throw new IllegalStateException( "Got code: ${responseCode} from nextflow scheduler, while getting remaining to publish (${get.responseMessage})" ) + } + get.getInputStream().text as int + } + } \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sClient.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sClient.groovy index 083149a..48b41a6 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sClient.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sClient.groovy @@ -1,15 +1,97 @@ package nextflow.cws.k8s +import groovy.json.JsonOutput +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.k8s.client.ClientConfig import nextflow.k8s.client.K8sClient import nextflow.k8s.client.K8sResponseJson import nextflow.util.MemoryUnit +import org.yaml.snakeyaml.Yaml + +import java.nio.file.Path @Slf4j +@CompileStatic class CWSK8sClient extends K8sClient { - CWSK8sClient( K8sClient k8sClient ) { - super( k8sClient.config ) + CWSK8sClient(ClientConfig config) { + super(config) + } + + static private void trace(String method, String path, String text) { + log.trace "[CWS-K8s] API response $method $path \n${prettyPrint(text).indent()}" + } + + /** + * Create a pod + * + * See + * https://v1-8.docs.kubernetes.io/docs/api-reference/v1.8/#create-55 + * https://v1-8.docs.kubernetes.io/docs/api-reference/v1.8/#pod-v1-core + * + * @param spec + * @return + */ + K8sResponseJson podCreate(String req, namespace = config.namespace) { + assert req + final action = "/api/v1/namespaces/$namespace/pods" + final resp = post(action, req) + trace('POST', action, resp.text) + return new K8sResponseJson(resp.text) + } + + K8sResponseJson podCreate(Map req, Path saveYamlPath=null, namespace = config.namespace) { + + if( saveYamlPath ) try { + saveYamlPath.text = new Yaml().dump(req).toString() + } + catch( Exception e ) { + log.debug "WARN: unable to save request yaml -- cause: ${e.message ?: e}" + } + + podCreate(JsonOutput.toJson(req), namespace) + } + + K8sResponseJson daemonSetCreate(Map req, Path saveYamlPath=null) { + if (saveYamlPath) { + try { + saveYamlPath.text = new Yaml().dump(req).toString() + } + catch (Exception e) { + log.debug "WARN: unable to save request yaml -- cause: ${e.message ?: e}" + } + } + daemonSetCreate(JsonOutput.toJson(req)) + } + + K8sResponseJson daemonSetCreate(String req) { + assert req + final action = "/apis/apps/v1/namespaces/$config.namespace/daemonsets" + log.debug "TRYING... $action" + final resp = post(action, req) + trace('POST', action, resp.text) + new K8sResponseJson(resp.text) + } + + K8sResponseJson daemonSetDelete(String name) { + assert name + final action = "/apis/apps/v1/namespaces/$config.namespace/daemonsets/$name" + final resp = delete(action) + trace('DELETE', action, resp.text) + new K8sResponseJson(resp.text) + } + + K8sResponseJson configCreateBinary(String name, Map data) { + + final spec = [ + apiVersion: 'v1', + kind: 'ConfigMap', + metadata: [ name: name, namespace: config.namespace ], + binaryData: data + ] + + configCreate0(spec) } /** @@ -22,7 +104,7 @@ class CWSK8sClient extends K8sClient { final K8sResponseJson resp = podStatus0(podName) //If this label is not set, the memory was not scaled - if ( resp?.metadata?.labels?."commonworkflowscheduler/memoryscaled" != 'true' ) { + if ( ((resp?.metadata as Map)?.labels as Map)?."commonworkflowscheduler/memoryscaled" != 'true' ) { return null } @@ -30,7 +112,7 @@ class CWSK8sClient extends K8sClient { if ( containers == null || containers.size() == 0 ) { return null } - String memory = containers[0]?.resources?.limits?.memory as String + String memory = ((containers[0]?.resources as Map)?.limits as Map)?.memory as String if ( memory == null ) { return null } else { diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sConfig.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sConfig.groovy index 964b561..39a2057 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sConfig.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sConfig.groovy @@ -1,94 +1,184 @@ -package nextflow.cws.k8s - -import groovy.transform.CompileStatic -import groovy.transform.Memoized -import groovy.transform.PackageScope -import nextflow.k8s.K8sConfig -import nextflow.k8s.model.PodNodeSelector - -import java.util.stream.Collectors - -class CWSK8sConfig extends K8sConfig { - - private Map target - - CWSK8sConfig(Map config) { - super(config) - this.target = config - } - - K8sScheduler getScheduler(){ - return target.scheduler ? new K8sScheduler( (Map)target.scheduler ) : null - } - - @CompileStatic - @PackageScope - static class K8sScheduler { - - Map target - - private final String[] fields = [ - 'name', - 'serviceAccount', - 'cpu', - 'memory', - 'container', - 'command', - 'port', - 'workDir', - 'runAsUser', - 'autoClose', - 'nodeSelector', - 'imagePullPolicy' - ] - - K8sScheduler(Map scheduler) { - this.target = scheduler - } - - String getName() { target.name as String ?: 'workflow-scheduler' } - - String getServiceAccount() { target.serviceAccount as String } - - // If no container is specified pull the latest image - String getImagePullPolicy() { target.container ? target.imagePullPolicy as String : "Always" } - - Integer getCPUs() { target.cpu as Integer ?: 1 } - - String getMemory() { target.memory as String ?: "1400Mi" } - - String getContainer() { target.container as String ?: 'commonworkflowscheduler/kubernetesscheduler:latest' } - - String getCommand() { target.command as String } - - Integer getPort() { target.port as Integer ?: 8080 } - - String getWorkDir() { target.workDir as String } - - Integer runAsUser() { target.runAsUser as Integer } - - Boolean autoClose() { target.autoClose == null ? true : target.autoClose as Boolean } - - PodNodeSelector getNodeSelector(){ - return target.nodeSelector ? new PodNodeSelector( target.nodeSelector ) : null - } - - Map getAdditional() { - return target.entrySet() - .stream() - .filter{!(it.getKey() in fields) } - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) - } - - @Memoized - static K8sScheduler defaultConfig( K8sConfig k8sConfig ){ - return new K8sScheduler([ - "serviceAccount" : k8sConfig.getServiceAccount(), - "runAsUser" : 0, - "autoClose" : true - ] as Map) - } - - } - -} +package nextflow.cws.k8s + +import groovy.transform.CompileStatic +import groovy.transform.Memoized +import groovy.transform.PackageScope +import nextflow.exception.AbortOperationException +import nextflow.k8s.K8sConfig +import nextflow.k8s.client.K8sClient +import nextflow.k8s.model.PodHostMount +import nextflow.k8s.model.PodNodeSelector + +import java.nio.file.Path +import java.util.stream.Collectors + +@CompileStatic +class CWSK8sConfig extends K8sConfig { + + private Map target + final boolean schedulingIsLocationAware + + CWSK8sConfig(Map config, boolean isLocationAware) { + super(config) + this.target = config + this.schedulingIsLocationAware = isLocationAware + if ( getLocalPath() ) { + final name = getLocalPath() + final mount = getLocalStorageMountPath() + getPodOptions().mountHostPaths.add(new PodHostMount(name, mount)) + } + } + + K8sScheduler getScheduler(){ + target.scheduler || schedulingIsLocationAware ? new K8sScheduler( (Map)target.scheduler ) : null + } + + Storage getStorage() { + schedulingIsLocationAware ? new Storage((Map) target.storage, getLocalClaimPaths()) : null + } + + String getArchitecture() { + target.architecture ?: System.getProperty("os.arch") + } + + String getLocalPath() { + target.localPath as String + } + + String getLocalStorageMountPath() { + target.localStorageMountPath ?: '/workspace' as String + } + + Collection getLocalClaimPaths() { + getPodOptions().mountHostPaths.collect { it.mountPath } + } + + String findLocalVolumeClaimByPath(String path) { + def result = getPodOptions().mountHostPaths.find { path.startsWith(it.mountPath) } + return result ? result.hostPath : null + } + + void checkStorageAndPaths(K8sClient client, String pipelineName) { + super.checkStorageAndPaths(client) + if ( schedulingIsLocationAware ) { + //The nextflow project/workflow has to be on a shared drive + if (pipelineName && pipelineName[0] == '/' && !findVolumeClaimByPath(pipelineName)) + throw new AbortOperationException("Kubernetes `pipelineName` must be a path mounted as a persistent volume -- projectDir=$pipelineName; volumes=${getClaimPaths().join(', ')}") + + if (getStorage() && !findLocalVolumeClaimByPath(getStorage().getWorkdir())) + throw new AbortOperationException("Kubernetes `storage.workdir` must be a path mounted as a local volume -- storage.workdir=${getStorage().getWorkdir()}; volumes=${getLocalClaimPaths().join(', ')}") + } + } + + @CompileStatic + @PackageScope + static class K8sScheduler { + + Map target + + private final String[] fields = [ + 'name', + 'serviceAccount', + 'cpu', + 'memory', + 'container', + 'command', + 'port', + 'workDir', + 'runAsUser', + 'autoClose', + 'nodeSelector', + 'imagePullPolicy' + ] + + K8sScheduler(Map scheduler) { + this.target = scheduler + } + + String getName() { target.name as String ?: 'workflow-scheduler' } + + String getServiceAccount() { target.serviceAccount as String } + + // If no container is specified pull the latest image + String getImagePullPolicy() { target.container ? target.imagePullPolicy as String : "Always" } + + Integer getCPUs() { target.cpu as Integer ?: 1 } + + String getMemory() { target.memory as String ?: "1400Mi" } + + String getContainer() { target.container as String ?: 'commonworkflowscheduler/kubernetesscheduler:v2.1' } + + String getCommand() { target.command as String } + + Integer getPort() { target.port as Integer ?: 8080 } + + String getWorkDir() { target.workDir as String } + + Integer runAsUser() { target.runAsUser as Integer } + + Boolean autoClose() { target.autoClose == null ? true : target.autoClose as Boolean } + + PodNodeSelector getNodeSelector(){ + return target.nodeSelector ? new PodNodeSelector( target.nodeSelector ) : null + } + + Map getAdditional() { + return target.entrySet() + .stream() + .filter{!(it.getKey() in fields) } + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + } + + @Memoized + static K8sScheduler defaultConfig( K8sConfig k8sConfig ){ + return new K8sScheduler([ + "serviceAccount" : k8sConfig.getServiceAccount(), + "runAsUser" : 0, + "autoClose" : true + ] as Map) + } + + } + + @CompileStatic + @PackageScope + static class Storage { + + @Delegate + Map target + Collection localClaims + + Storage(Map scheduler, Collection localClaims ) { + this.target = scheduler + this.localClaims = localClaims + } + + String getCopyStrategy() { + target.copyStrategy as String ?: 'ftp' + } + + String getWorkdir() { + Path workdir = (target.workdir ?: localClaims[0]) as Path + if( ! workdir.getName().equalsIgnoreCase('localWork') ){ + workdir = workdir.resolve( 'localWork' ) + } + return workdir.toString() + } + + PodNodeSelector getNodeSelector(){ + return target.nodeSelector ? new PodNodeSelector( target.nodeSelector ) : null + } + + boolean deleteIntermediateData(){ + target.deleteIntermediateData as Boolean ?: false + } + + String getImageName() { + target.imageName ?: 'commonworkflowscheduler/ftpdaemon:v2.0' + } + + String getCmd() { + target.cmd as String + } + } +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sExecutor.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sExecutor.groovy index c64026a..1c30f5d 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sExecutor.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sExecutor.groovy @@ -1,5 +1,6 @@ package nextflow.cws.k8s +import com.google.common.hash.Hashing import groovy.transform.CompileStatic import groovy.transform.Memoized import groovy.transform.PackageScope @@ -11,7 +12,11 @@ import nextflow.cws.processor.CWSTaskPollingMonitor import nextflow.k8s.K8sConfig import nextflow.k8s.K8sExecutor import nextflow.k8s.client.K8sClient +import nextflow.k8s.client.K8sResponseException +import nextflow.k8s.model.PodHostMount +import nextflow.k8s.model.PodMountConfig import nextflow.k8s.model.PodOptions +import nextflow.k8s.model.PodVolumeClaim import nextflow.processor.TaskHandler import nextflow.processor.TaskMonitor import nextflow.processor.TaskRun @@ -19,19 +24,31 @@ import nextflow.util.Duration import nextflow.util.ServiceName import org.pf4j.ExtensionPoint +import java.nio.file.Path +import java.nio.file.Paths + @Slf4j @CompileStatic @ServiceName('k8s') class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint { @PackageScope SchedulerClient schedulerClient + @PackageScope CWSSchedulerBatch schedulerBatch + protected CWSK8sClient client + /** + * Name of the created daemonSet + */ + private String daemonSet = null + + private String configMapName = null + @Override @Memoized protected K8sConfig getK8sConfig() { - return new CWSK8sConfig( (Map)session.config.k8s ) + return new CWSK8sConfig( (Map)session.config.k8s, getCWSConfig().strategyIsLocationAware() ) } @Memoized @@ -66,44 +83,67 @@ class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint { assert task assert task.workDir log.trace "[K8s] launching process > ${task.name} -- work folder: ${task.workDirStr}" - return new CWSK8sTaskHandler( task, this ) + return new CWSK8sTaskHandler( task, this, configMapName ) + } + + @Override + protected K8sClient getClient() { + return client } @Override protected void register() { super.register() - this.client = new CWSK8sClient(super.getClient()) + final k8sConfig = getK8sConfig() + final clientConfig = k8sConfig.getClient() + this.client = new CWSK8sClient(clientConfig) + log.debug "[K8s] config=$k8sConfig; API client config=$clientConfig" - CWSK8sConfig.K8sScheduler cwsK8sConfig = (k8sConfig as CWSK8sConfig).getScheduler() - CWSConfig cwsConfig = new CWSConfig(session.config.navigate('cws') as Map) + final CWSConfig cwsConfig = getCWSConfig() + final CWSK8sConfig cwsK8sConfig = k8sConfig as CWSK8sConfig + + if ( cwsConfig.strategyIsLocationAware() ) { + createDaemonSet() + registerGetStatsConfigMap( cwsK8sConfig.getArchitecture() ) + } + + CWSK8sConfig.K8sScheduler k8sSchedulerConfig = cwsK8sConfig.getScheduler() Map data - if ( !cwsK8sConfig && !cwsConfig.dns ) { + if ( !k8sSchedulerConfig && !cwsConfig.dns ) { //Use default configuration - cwsK8sConfig = CWSK8sConfig.K8sScheduler.defaultConfig( k8sConfig ) + k8sSchedulerConfig = CWSK8sConfig.K8sScheduler.defaultConfig( k8sConfig ) } - if( cwsK8sConfig ) { + if( k8sSchedulerConfig ) { + final PodOptions podOptions = cwsK8sConfig.getPodOptions() schedulerClient = new K8sSchedulerClient( cwsConfig, + k8sSchedulerConfig, cwsK8sConfig, - k8sConfig, - k8sConfig.getNamespace(), + cwsK8sConfig.getNamespace(), session.runName, client, - k8sConfig.getPodOptions().getVolumeClaims() + podOptions.getVolumeClaims(), + podOptions.getMountHostPaths() ) - final PodOptions podOptions = k8sConfig.getPodOptions() Boolean traceEnabled = session.config.navigate('trace.enabled') as Boolean + CWSK8sConfig.Storage storage = cwsK8sConfig.getStorage() data = [ - volumeClaims : podOptions.volumeClaims, + volumeClaims : podOptions.getVolumeClaims(), traceEnabled : traceEnabled, costFunction : cwsConfig.getCostFunction(), memoryPredictor : cwsConfig.getMemoryPredictor(), maxMemory : cwsConfig.getMaxMemory()?.toBytes(), minMemory : cwsConfig.getMinMemory()?.toBytes(), - additional : cwsK8sConfig.getAdditional() + additional : k8sSchedulerConfig.getAdditional(), + workDir : session.workDir as String, + localWorkDir : storage?.getWorkdir(), + copyStrategy : storage?.getCopyStrategy(), + locationAware : cwsConfig.strategyIsLocationAware(), + maxCopyTasksPerNode : cwsConfig.getMaxCopyTasksPerNode(), + maxWaitingCopyTasksPerNode : cwsConfig.getMaxWaitingCopyTasksPerNode() ] } else { data = [ @@ -119,7 +159,22 @@ class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint { @Override void shutdown() { - final CWSK8sConfig.K8sScheduler schedulerConfig = (k8sConfig as CWSK8sConfig).getScheduler() + final CWSK8sConfig cwsK8sConfig = k8sConfig as CWSK8sConfig + final CWSK8sConfig.K8sScheduler schedulerConfig = cwsK8sConfig.getScheduler() + + schedulerClient.publishRemaining(); + + if ( getCWSConfig().strategyIsLocationAware() ) { + int remaining + do { + remaining = schedulerClient.getRemainingToPublish() + if (remaining > 0) { + log.info "Waiting for $remaining files to be published" + sleep(1000) + } + } while (remaining > 0) + } + if( schedulerConfig ) { try{ schedulerClient.closeScheduler() @@ -127,6 +182,150 @@ class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint { log.error( "Error while closing scheduler", e) } } + + if( daemonSet ){ + try { + def result = client.daemonSetDelete( daemonSet ) + log.trace "$result" + } catch (K8sResponseException e){ + log.error("Couldn't delete daemonset: $daemonSet", e) + } + } + log.trace "Close K8s Executor" + } + + protected void registerGetStatsConfigMap( String architecture ) { + Map configMap = [:] + + String architectureStatFileName + if (architecture == "amd64" || architecture == "x86_64") { + architectureStatFileName = "/nf-cws/getStatsAndResolveSymlinks_linux_x86_64" + } else if (architecture == "aarch64" || architecture == "arm64") { + architectureStatFileName = "/nf-cws/getStatsAndResolveSymlinks_linux_aarch64" + } else { + throw new RuntimeException("The ${architecture} architecture is currently not supported for WOW. " + + "You may compile the getStatsAndResolveSymlinks.c yourself and add it to the resources directory.") + } + final contentStream = CWSK8sExecutor.class.getResourceAsStream(architectureStatFileName) + final content = contentStream.bytes.encodeBase64().toString() + configMap[WOWK8sWrapperBuilder.statFileName] = content + + configMapName = makeConfigMapName(content) + tryCreateConfigMap(configMapName, configMap) + log.debug "Created K8s configMap with name: $configMapName" + k8sConfig.getPodOptions().getMountConfigMaps().add( new PodMountConfig(configMapName, '/etc/nextflow') ) + } + + protected void tryCreateConfigMap(String name, Map data) { + try { + client.configCreateBinary(name, data) + } + catch( K8sResponseException e ) { + if( e.response.reason != 'AlreadyExists' ) + throw e + } + } + + protected static String makeConfigMapName(String content ) { + "nf-get-stat-${hash(content)}" + } + + protected static String hash(String text) { + def hasher = Hashing.murmur3_32_fixed().newHasher() + hasher.putUnencodedChars(text) + return hasher.hash().toString() + } + + private void createDaemonSet(){ + + final K8sConfig k8sConfig = getK8sConfig() + final PodOptions podOptions = (k8sConfig as CWSK8sConfig).getPodOptions() + final mounts = [] + final volumes = [] + int volume = 1 + + // host mounts + for( PodHostMount entry : podOptions.mountHostPaths ) { + final name = 'vol-' + volume++ + mounts << [name: name, mountPath: entry.mountPath] + volumes << [name: name, hostPath: [path: entry.hostPath]] + } + + final namesMap = [:] + + // creates a volume name for each unique claim name + for( String claimName : podOptions.volumeClaims.collect { it.claimName }.unique() ) { + final volName = 'vol-' + volume++ + namesMap[claimName] = volName + volumes << [name: volName, persistentVolumeClaim: [claimName: claimName]] + } + + // -- volume claims + for( PodVolumeClaim entry : podOptions.volumeClaims ) { + //check if we already have a volume for the pvc + final name = namesMap.get(entry.claimName) + final claim = [name: name, mountPath: entry.mountPath ] + if( entry.subPath ) + claim.subPath = entry.subPath + if( entry.readOnly ) + claim.readOnly = entry.readOnly + mounts << claim + } + + String name = "mount-${session.runName.replace('_', '-')}" + def spec = [ + containers: [ [ + name: name, + image: (k8sConfig as CWSK8sConfig).getStorage().getImageName(), + volumeMounts: mounts, + imagePullPolicy : 'IfNotPresent' + ] ], + volumes: volumes, + serviceAccount: client.config.serviceAccount + ] + + CWSK8sConfig.Storage storage = (k8sConfig as CWSK8sConfig).getStorage() + if( storage.getNodeSelector() ) + spec.put( 'nodeSelector', storage.getNodeSelector().toSpec() as Serializable ) + + def pod = [ + apiVersion: 'apps/v1', + kind: 'DaemonSet', + metadata: [ + labels: [ + app: 'nextflow' + ], + name: name, + namespace: k8sConfig.getNamespace() ?: 'default' + ], + spec : [ + restartPolicy: 'Always', + template: [ + metadata: [ + labels: [ + name : name, + "nextflow.io/app" : 'nextflow' + ] + ], + spec: spec, + ], + selector: [ + matchLabels: [ + name: name + ] + ] + ] + ] + + daemonSet = name + client.daemonSetCreate(pod, Paths.get('.nextflow-daemonset.yaml') ) + log.trace "Created daemonSet: $name" + } -} + @Override + boolean isForeignFile( Path path ) { + if ( path.getScheme() == 'wow' ) return false + return super.isForeignFile(path) + } +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sTaskHandler.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sTaskHandler.groovy index be46394..6ca3e0e 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sTaskHandler.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sTaskHandler.groovy @@ -1,7 +1,9 @@ package nextflow.cws.k8s import groovy.transform.CompileDynamic +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.cws.CWSConfig import nextflow.cws.SchedulerClient import nextflow.executor.BashWrapperBuilder import nextflow.extension.GroupKey @@ -15,6 +17,7 @@ import java.nio.file.NoSuchFileException import java.nio.file.Path @Slf4j +@CompileStatic class CWSK8sTaskHandler extends K8sTaskHandler { static final public String CMD_TRACE_SCHEDULER = '.command.scheduler.trace' @@ -37,12 +40,15 @@ class CWSK8sTaskHandler extends K8sTaskHandler { private boolean failedOOM = false - CWSK8sTaskHandler( TaskRun task, CWSK8sExecutor executor ) { + private final String configMapName + + CWSK8sTaskHandler( TaskRun task, CWSK8sExecutor executor, String configMapName ) { super( task, executor ) this.client = executor.getCWSK8sClient() this.schedulerClient = executor.schedulerClient this.executor = executor this.syntheticPodName = super.getSyntheticPodName(task) + this.configMapName = configMapName } @Override @@ -53,9 +59,28 @@ class CWSK8sTaskHandler extends K8sTaskHandler { @Override protected Map newSubmitRequest0(TaskRun task, String imageName) { Map pod = super.newSubmitRequest0(task, imageName) - if ( (k8sConfig as CWSK8sConfig)?.getScheduler() ){ - (pod.spec as Map).schedulerName = (k8sConfig as CWSK8sConfig).getScheduler().getName() + "-" + getRunName() + final CWSK8sConfig cwsK8sConfig = k8sConfig as CWSK8sConfig + if ( cwsK8sConfig?.getScheduler() ){ + (pod.spec as Map).schedulerName = cwsK8sConfig.getScheduler().getName() + "-" + getRunName() + } + + final CWSConfig cwsConfig = executor.getCWSConfig() + if ( cwsConfig.strategyIsLocationAware() ) { + //Set default mode for configMap + Map specs = pod.spec as Map + List volumes = specs?.volumes as List + if (volumes) { + for (Map vol : volumes) { + if (vol.configMap == null) continue + if ((vol.configMap as Map)?.name == configMapName) { + Map configMap = vol.configMap as Map + configMap.defaultMode = 0755 + break + } + } + } } + return pod } @@ -84,6 +109,8 @@ class CWSK8sTaskHandler extends K8sTaskHandler { booleanInputs.add( [ name : key, value : input] ) } else if ( input instanceof Number ) { numberInputs.add( [ name : key, value : input] ) + } else if ( input instanceof Character ) { + stringInputs.add( [ name : key, value : input as String] ) } else if ( input instanceof String ) { stringInputs.add( [ name : key, value : input] ) } else if ( input instanceof GStringImpl ) { @@ -94,7 +121,7 @@ class CWSK8sTaskHandler extends K8sTaskHandler { } } - private long calculateInputSize( List> fileInputs ){ + private static long calculateInputSize(List> fileInputs ){ return fileInputs .parallelStream() .mapToLong { @@ -137,7 +164,13 @@ class CWSK8sTaskHandler extends K8sTaskHandler { return schedulerClient.registerTask( config, task.id.intValue() ) } + @Override protected BashWrapperBuilder createBashWrapper(TaskRun task) { + final CWSConfig cwsConfig = executor.getCWSConfig() + final CWSK8sConfig cwsK8sConfig = k8sConfig as CWSK8sConfig + if ( cwsConfig.strategyIsLocationAware() ) { + return new WOWK8sWrapperBuilder( task , cwsK8sConfig.getStorage(), cwsConfig.memoryPredictor as boolean ) + } return fusionEnabled() ? fusionLauncher() : new CWSK8sWrapperBuilder( task, executor.getCWSConfig().memoryPredictor as boolean ) @@ -170,22 +203,30 @@ class CWSK8sTaskHandler extends K8sTaskHandler { } } + boolean schedulerPostProcessingHasFinished(){ + Map state = schedulerClient.getTaskState(task.id.intValue()) + return (!state.state) ?: ["FINISHED", "FINISHED_WITH_ERROR", "INIT_WITH_ERRORS", "DELETED"].contains( state.state.toString() ) + } + @Override boolean checkIfCompleted() { Map state = getState() - if( !state || !state.terminated ) { + final CWSConfig cwsConfig = executor.getCWSConfig() + if( !state || !state.terminated || ( cwsConfig.strategyIsLocationAware() && !schedulerPostProcessingHasFinished() ) ) { return false } if( executor.getCWSConfig().memoryPredictor ) { memoryAdapted = client.getAdaptedPodMemory( podName ) if ( memoryAdapted != null ) { //only use a special failure logic if the memory was adapted - if (state.terminated.exitCode == 128 - && (state.terminated.reason as String) == "StartError") { + + def terminated = state.terminated as Map + if (terminated.exitCode == 128 + && (terminated.reason as String) == "StartError") { failedOOM = true log.info("The memory was choosen too small for the pod ${podName} to be started. More memory is tried next time.") task.error = new MemoryScalingFailure() - } else if ((state.terminated.reason as String) == "OOMKilled") { + } else if ((terminated.reason as String) == "OOMKilled") { failedOOM = true log.info("The memory was choosen too small for the pod ${podName} to be executed. More memory is tried next time.") task.error = new MemoryScalingFailure() @@ -212,14 +253,17 @@ class CWSK8sTaskHandler extends K8sTaskHandler { if( value == null ) continue switch( name ) { - case "scheduler_nodes_cost" : - case "scheduler_init_throughput": + case 'scheduler_nodes_cost' : + case 'scheduler_time_delta_phase_three' : traceRecord.put( name, value ) break - case "scheduler_best_cost" : + case 'scheduler_best_cost' : double val = parseDouble( value, file, name ) traceRecord.put( name, val ) break + case 'input_size' : + traceRecord.put( name, inputSize ) + break default: long val = parseLong( value, file, name ) traceRecord.put( name, val ) @@ -227,7 +271,7 @@ class CWSK8sTaskHandler extends K8sTaskHandler { } } - private double parseDouble( String str, Path file , String row ) { + private static double parseDouble(String str, Path file, String row ) { try { return str.toDouble() } @@ -237,7 +281,7 @@ class CWSK8sTaskHandler extends K8sTaskHandler { } } - private long parseLong( String str, Path file , String row ) { + private static long parseLong(String str, Path file, String row ) { try { return str.toLong() } diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sWrapperBuilder.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sWrapperBuilder.groovy index a9f096b..c4f7dca 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sWrapperBuilder.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sWrapperBuilder.groovy @@ -1,8 +1,10 @@ package nextflow.cws.k8s +import groovy.transform.CompileStatic import nextflow.k8s.K8sWrapperBuilder import nextflow.processor.TaskRun +@CompileStatic class CWSK8sWrapperBuilder extends K8sWrapperBuilder { CWSK8sWrapperBuilder(TaskRun task, boolean memoryPredictorEnabled) { diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/K8sSchedulerClient.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/K8sSchedulerClient.groovy index 6aa3ac6..b507689 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/K8sSchedulerClient.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/K8sSchedulerClient.groovy @@ -1,12 +1,13 @@ package nextflow.cws.k8s +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.cws.CWSConfig import nextflow.cws.SchedulerClient import nextflow.exception.NodeTerminationException import nextflow.k8s.K8sConfig -import nextflow.k8s.client.K8sClient import nextflow.k8s.client.K8sResponseException +import nextflow.k8s.model.PodHostMount import nextflow.k8s.model.PodSecurityContext import nextflow.k8s.model.PodSpecBuilder import nextflow.k8s.model.PodVolumeClaim @@ -14,13 +15,21 @@ import nextflow.k8s.model.PodVolumeClaim import java.nio.file.Paths @Slf4j +@CompileStatic class K8sSchedulerClient extends SchedulerClient { private final CWSK8sConfig.K8sScheduler schedulerConfig - private final K8sClient k8sClient + + private final CWSK8sClient k8sClient + private final K8sConfig k8sConfig + private final String namespace + + private final Collection hostMounts + private final Collection volumeClaims + private String ip K8sSchedulerClient( @@ -29,10 +38,12 @@ class K8sSchedulerClient extends SchedulerClient { K8sConfig k8sConfig, String namespace, String runName, - K8sClient k8sClient, - Collection volumeClaims + CWSK8sClient k8sClient, + Collection volumeClaims, + Collection hostMounts ) { super( config, runName ) + this.hostMounts = hostMounts ?: [] this.volumeClaims = volumeClaims this.k8sClient = k8sClient this.k8sConfig = k8sConfig @@ -52,14 +63,13 @@ class K8sSchedulerClient extends SchedulerClient { } data.dns = getDNS() data.namespace = namespace + data.localClaims = hostMounts super.registerScheduler(data) } private void startScheduler(){ - boolean start = false Map state - try{ //If no pod with the name exists an exceptions is thrown state = k8sClient.podState( schedulerConfig.getName() ) @@ -69,7 +79,6 @@ class K8sSchedulerClient extends SchedulerClient { log.info "Scheduler ${schedulerConfig.getName()} is terminated" } else if( state.running || state.waiting ) log.trace "Scheduler ${schedulerConfig.getName()} is already running" else log.error "Unknown state for ${schedulerConfig.getName()}: ${state.toString()}" - } catch ( K8sResponseException e ) { log.error( "Got unexpected HTTP error ${e.response} while checking scheduler's state", e.message ) } catch ( NodeTerminationException ignored){ @@ -77,7 +86,6 @@ class K8sSchedulerClient extends SchedulerClient { start = true log.info "Scheduler ${schedulerConfig.getName()} can not be found and will be started..." } - if( start ){ log.trace "Scheduler ${schedulerConfig.getName()} is not running, let's start" final builder = new PodSpecBuilder() @@ -90,26 +98,22 @@ class K8sSchedulerClient extends SchedulerClient { .withNamespace( namespace ) .withLabel('component', 'scheduler') .withLabel('tier', 'control-plane') - .withLabel('app', 'nextflow') + .withLabel('nextflow.io/app', 'nextflow') + .withHostMounts( hostMounts ) .withVolumeClaims( volumeClaims ) if( schedulerConfig.getNodeSelector() ) builder.setNodeSelector( schedulerConfig.getNodeSelector() ) - if ( schedulerConfig.getWorkDir() ) builder.withWorkDir( schedulerConfig.getWorkDir() ) - if( schedulerConfig.getCommand() ) builder.withCommand( schedulerConfig.getCommand() ) - if( schedulerConfig.runAsUser() != null ){ builder.securityContext = new PodSecurityContext( schedulerConfig.runAsUser() ) } - //This is required to use the PodSpecBuilder as it is builder.command = [ "delete this" ] Map pod = builder.build() - List env = [[ name: 'SCHEDULER_NAME', value: schedulerConfig.getName() @@ -117,13 +121,12 @@ class K8sSchedulerClient extends SchedulerClient { name: 'AUTOCLOSE', value: schedulerConfig.autoClose() as String ]] - - Map container = pod.spec.containers.get(0) as Map + Map container = ((pod.spec as Map).containers as List).get(0) as Map container.put('env', env) container.remove( 'command' ) (container.resources as Map)?.remove( 'limits' ) - k8sClient.podCreate( pod, Paths.get('.nextflow-scheduler.yaml') ) + k8sClient.podCreate( pod, Paths.get('.nextflow-scheduler.yaml'), namespace) } //wait for scheduler to get ready diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/MemoryScalingFailure.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/MemoryScalingFailure.groovy index 937a023..96bbfe1 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/k8s/MemoryScalingFailure.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/MemoryScalingFailure.groovy @@ -1,8 +1,10 @@ package nextflow.cws.k8s +import groovy.transform.CompileStatic import groovy.transform.InheritConstructors import nextflow.exception.ProcessRetryableException @InheritConstructors +@CompileStatic class MemoryScalingFailure extends RuntimeException implements ProcessRetryableException { } diff --git a/plugins/nf-cws/src/main/nextflow/cws/k8s/WOWK8sWrapperBuilder.groovy b/plugins/nf-cws/src/main/nextflow/cws/k8s/WOWK8sWrapperBuilder.groovy new file mode 100644 index 0000000..e2eacbf --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/k8s/WOWK8sWrapperBuilder.groovy @@ -0,0 +1,112 @@ +package nextflow.cws.k8s + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.file.FileHelper +import nextflow.processor.TaskRun +import nextflow.util.Escape + +import java.nio.file.Path + +/** + * Implements a BASH wrapper for tasks executed by kubernetes cluster + * + * @author Paolo Di Tommaso + */ +@CompileStatic +@Slf4j +class WOWK8sWrapperBuilder extends CWSK8sWrapperBuilder { + + CWSK8sConfig.Storage storage + Path localWorkDir + + static String statFileName = "getStatsAndSymlinks" + + WOWK8sWrapperBuilder(TaskRun task, CWSK8sConfig.Storage storage, boolean memoryPredictorEnabled) { + super(task, memoryPredictorEnabled) + this.headerScript = "NXF_CHDIR=${Escape.path(task.workDir)}" + this.storage = storage + if( storage ){ + switch (storage.getCopyStrategy().toLowerCase()) { + case 'copy': + case 'ftp': + if ( this.scratch == null || this.scratch == true ){ + //Reduce amount of local data - only keep necessary outputs + this.scratch = (storage.getWorkdir() as Path).resolve( "scratch" ).toString() + this.stageOutMode = 'move' + } + break + default : + throw new IllegalArgumentException("Unsupported copy strategy: ${storage.getCopyStrategy()}") + } + if ( !this.targetDir || workDir == targetDir ) { + this.localWorkDir = FileHelper.getWorkFolder(storage.getWorkdir() as Path, task.getHash()) + this.targetDir = this.localWorkDir + } + } + } + + @Override + protected boolean shouldUnstageOutputs() { + assert localWorkDir + return super.shouldUnstageOutputs() + } + + private String getStorageLocalWorkDir() { + String localWorkDir = storage.getWorkdir() + if ( !localWorkDir.endsWith("/") ){ + localWorkDir += "/" + } + localWorkDir + } + + @Override + protected Map makeBinding() { + final Map binding = super.makeBinding() + + binding.K8sResolveSymlinks = null + + if ( binding.stage_inputs && storage && localWorkDir ) { + final String cmd = """\ + # create symlinks + if test -f "${workDir.toString()}/.command.symlinks"; then + bash "${workDir.toString()}/.command.symlinks" || true + fi + """.stripIndent() + binding.stage_inputs = cmd + binding.stage_inputs + } + + if ( localWorkDir ) { + binding.unstage_outputs = copyStrategy.getUnstageOutputFilesScript(outputFiles, localWorkDir) + } + + return binding + } + + @Override + protected String getLaunchCommand(String interpreter, String env) { + assert storage && localWorkDir + String cmd = '' + cmd += "local INFILESTIME=\$(\"/etc/nextflow/${statFileName}\" infiles \"${workDir.toString()}/.command.infiles\" \"${getStorageLocalWorkDir()}\" \"\$PWD/\" || true)\n" + cmd += super.getLaunchCommand(interpreter, env) + if( isTraceRequired() ){ + cmd += "\nlocal exitCode=\$?" + cmd += """\necho \"infiles_time=\${INFILESTIME}" >> ${workDir.resolve(TaskRun.CMD_TRACE)}\n""" + cmd += "return \$exitCode\n" + } + return cmd + } + + @Override + String getCleanupCmd(String scratch) { + assert storage && localWorkDir + String cmd = super.getCleanupCmd( scratch ) + cmd += "mkdir -p \"${localWorkDir.toString()}/\" || true\n" + cmd += "local OUTFILESTIME=\$(\"/etc/nextflow/${statFileName}\" outfiles \"${workDir.toString()}/.command.outfiles\" \"${getStorageLocalWorkDir()}\" \"${localWorkDir.toString()}/\" || true)\n" + if ( isTraceRequired() ) { + cmd += "echo \"outfiles_time=\${OUTFILESTIME}\" >> ${workDir.resolve(TaskRun.CMD_TRACE)}" + } + return cmd + } + +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/processor/CWSTaskPollingMonitor.groovy b/plugins/nf-cws/src/main/nextflow/cws/processor/CWSTaskPollingMonitor.groovy index 7784618..a908936 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/processor/CWSTaskPollingMonitor.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/processor/CWSTaskPollingMonitor.groovy @@ -1,11 +1,20 @@ package nextflow.cws.processor +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session +import nextflow.cws.CWSConfig +import nextflow.cws.wow.file.OfflineLocalPath +import nextflow.cws.wow.file.WOWFileAttributes +import nextflow.cws.wow.file.WorkdirPath +import nextflow.cws.wow.util.LocalFileWalker +import nextflow.processor.PublishDir +import nextflow.processor.TaskHandler import nextflow.processor.TaskPollingMonitor import nextflow.util.Duration @Slf4j +@CompileStatic class CWSTaskPollingMonitor extends TaskPollingMonitor { /** @@ -13,6 +22,8 @@ class CWSTaskPollingMonitor extends TaskPollingMonitor { */ private final SchedulerBatch schedulerBatch + private final CWSConfig cwsConfig + /** * Create the task polling monitor with the provided named parameters object. *

@@ -28,6 +39,7 @@ class CWSTaskPollingMonitor extends TaskPollingMonitor { protected CWSTaskPollingMonitor(Map params) { super(params) this.schedulerBatch = params.schedulerBatch as SchedulerBatch + cwsConfig = new CWSConfig(session.config.navigate('cws') as Map) } static TaskPollingMonitor create(Session session, String name, int defQueueSize, Duration defPollInterval, SchedulerBatch schedulerBatch ) { @@ -54,4 +66,38 @@ class CWSTaskPollingMonitor extends TaskPollingMonitor { return pendingTasks } + private static void checkPublishDirMode(TaskHandler handler ) { + def publishDirs = handler.task.config.get('publishDir') + if ( publishDirs && publishDirs instanceof List ) { + for( Object params : publishDirs ) { + if( !params ) continue + if( params instanceof Map ) { + def mode = PublishDir.create(params).getMode() + // We only support COPY and MOVE, if the user uses a different mode, we set it to COPY + if ( !(mode in [ PublishDir.Mode.COPY, PublishDir.Mode.MOVE]) ) { + params.mode = PublishDir.Mode.COPY + } + } + } + } + } + + @Override + protected void finalizeTask(TaskHandler handler) { + if (!cwsConfig.strategyIsLocationAware()) { + super.finalizeTask(handler) + return + } + def workDir = handler.task.workDir + def helper = LocalFileWalker.createWorkdirHelper( workDir ) + if ( helper ) { + def attributes = new WOWFileAttributes(workDir) + OfflineLocalPath path = new WorkdirPath( workDir, attributes, workDir, helper ) + handler.task.workDir = path + } + checkPublishDirMode(handler) + super.finalizeTask(handler) + helper?.validate() + } + } diff --git a/plugins/nf-cws/src/main/nextflow/cws/processor/SchedulerBatch.groovy b/plugins/nf-cws/src/main/nextflow/cws/processor/SchedulerBatch.groovy index ae46ddb..480e906 100644 --- a/plugins/nf-cws/src/main/nextflow/cws/processor/SchedulerBatch.groovy +++ b/plugins/nf-cws/src/main/nextflow/cws/processor/SchedulerBatch.groovy @@ -1,8 +1,12 @@ package nextflow.cws.processor +import groovy.transform.CompileStatic + +@CompileStatic abstract class SchedulerBatch { private final int batchSize + private int currentlySubmitted = 0 SchedulerBatch( int batchSize ) { diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalFile.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalFile.groovy new file mode 100644 index 0000000..730bd6a --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalFile.groovy @@ -0,0 +1,22 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic + +import java.nio.file.Path + +@CompileStatic +class LocalFile extends File { + + private final LocalPath localPath + + LocalFile( LocalPath localPath ){ + super( localPath.toString() ) + this.localPath = localPath + } + + @Override + Path toPath() { + return localPath + } + +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalPath.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalPath.groovy new file mode 100644 index 0000000..57cbe87 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/LocalPath.groovy @@ -0,0 +1,227 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cws.wow.filesystem.WOWFileSystem + +import java.nio.file.* + +@Slf4j +@CompileStatic +class LocalPath implements Path, Serializable { + + protected final Path path + + private transient final WOWFileAttributes attributes + + boolean createdSymlinks = false + + protected Path workDir + + protected LocalPath(Path path, WOWFileAttributes attributes, Path workDir ) { + this.path = path + this.attributes = attributes + this.workDir = workDir + } + + Path getInner() { + path + } + + LocalPath toLocalPath( Path path, WOWFileAttributes attributes = null ){ + toLocalPath( path, attributes, workDir ) + } + + static LocalPath toLocalPath( Path path, WOWFileAttributes attributes, Path workDir ){ + ( path instanceof LocalPath ) ? path as LocalPath : new LocalPath( path, attributes, workDir ) + } + + T asType( Class c ) { + if ( c.isAssignableFrom( getClass() ) ) return (T) this + if ( c.isAssignableFrom( LocalPath.class ) ) return (T) toFile() + if ( c == String.class ) return (T) toString() + log.debug("Invoke method asType $c on ${this.class}") + return super.asType( c ) + } + + String getBaseName() { + path.getBaseName() + } + + boolean isDirectory( LinkOption... options ) { + attributes ? attributes.isDirectory() : 0 + } + + long size() { + attributes ? attributes.size() : 0 + } + + boolean empty(){ + this.size() == 0 + } + + boolean asBoolean(){ + true + } + + @Override + FileSystem getFileSystem() { + WOWFileSystem.INSTANCE + } + + @Override + boolean isAbsolute() { + path.isAbsolute() + } + + @Override + Path getRoot() { + path.getRoot() + } + + @Override + Path getFileName() { + path.getFileName() + } + + @Override + Path getParent() { + toLocalPath( path.getParent() ) + } + + @Override + int getNameCount() { + path.getNameCount() + } + + @Override + Path getName(int index) { + path.getName( index ) + } + + @Override + Path subpath(int beginIndex, int endIndex) { + toLocalPath( path.subpath( beginIndex, endIndex ) ) + } + + @Override + boolean startsWith(Path other) { + path.startsWith( other ) + } + + @Override + boolean startsWith(String other) { + path.startsWith(other) + } + + @Override + boolean endsWith(Path other) { + path.endsWith( other ) + } + + @Override + boolean endsWith(String other) { + path.endsWith( other ) + } + + @Override + Path normalize() { + toLocalPath( path.normalize(), attributes ) + } + + @Override + Path resolve(Path other) { + toLocalPath( path.resolve( other ), new WOWFileAttributes( other ) ) + } + + @Override + Path resolve(String other) { + resolve( Path.of( other ) ) + } + + @Override + Path resolveSibling(Path other) { + path.resolveSibling( other ) + } + + @Override + Path resolveSibling(String other) { + path.resolveSibling( other ) + } + + @Override + Path relativize(Path other) { + if ( other instanceof LocalPath ){ + def localPath = (LocalPath) other + return toLocalPath( path.relativize( localPath.path), (WOWFileAttributes) localPath.attributes, localPath.workDir ) + } + path.relativize( other ) + } + + @Override + URI toUri() { + getFileSystem().provider().getScheme() + "://" + path.toAbsolutePath() as URI + } + + String toUriString() { + getFileSystem().provider().getScheme() + ":/" + path.toAbsolutePath() + } + + Path toAbsolutePath(){ + toLocalPath( path.toAbsolutePath(), attributes ) + } + + @Override + Path toRealPath(LinkOption... options) throws IOException { + attributes.destination ? toLocalPath( attributes.destination ) : toLocalPath( path.toRealPath( options ) ) + } + + @Override + File toFile() { + new LocalFile( this ) + } + + @Override + WatchKey register(WatchService watcher, WatchEvent.Kind[] events, WatchEvent.Modifier... modifiers) throws IOException { + path.register( watcher, events, modifiers ) + } + + @Override + WatchKey register(WatchService watcher, WatchEvent.Kind... events) throws IOException { + path.register( watcher, events ) + } + + @Override + int compareTo(Path other) { + if ( other instanceof LocalPath ){ + return path <=> ((LocalPath) other).path + } + path <=> other + } + + @Override + String toString() { + path.toString() + } + + WOWFileAttributes getAttributes(){ + attributes + } + + @Override + boolean equals(Object obj) { + if ( obj instanceof LocalPath ){ + return path == ((LocalPath) obj).path + } + if ( obj instanceof Path ){ + return path == obj + } + return false + } + + @Override + int hashCode() { + path.hashCode() * 2 + 1 + } + +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/OfflineLocalPath.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/OfflineLocalPath.groovy new file mode 100644 index 0000000..ae50514 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/OfflineLocalPath.groovy @@ -0,0 +1,20 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic + +import java.nio.file.Path + +/** + * We need this class to be able to iterate through a subdirectory of a workdir + */ +@CompileStatic +class OfflineLocalPath extends LocalPath { + + final WorkdirHelper workdirHelper + + OfflineLocalPath(Path path, WOWFileAttributes attributes, Path workDir, WorkdirHelper workdirHelper ) { + super(path, attributes, workDir) + this.workdirHelper = workdirHelper + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/WOWFileAttributes.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WOWFileAttributes.groovy new file mode 100644 index 0000000..939fc30 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WOWFileAttributes.groovy @@ -0,0 +1,174 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + +import java.nio.file.Path +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.attribute.FileTime + +@Slf4j +@CompileStatic +class WOWFileAttributes implements BasicFileAttributes { + + /** + * Helper to parse the file attributes returned by the getStatsAndResolveSymlinks.c + */ + static private final int FILE_EXISTS = 1 + static private final int REAL_PATH = 2 + static private final int SIZE = 3 + static private final int FILE_TYPE = 4 + static private final int CREATION_DATE = 5 + static private final int ACCESS_DATE = 6 + static private final int MODIFICATION_DATE = 7 + + private final boolean directory + + private final boolean link + + private final long size + + private final String fileType + + private final FileTime creationDate + + private final FileTime accessDate + + private final FileTime modificationDate + + private final Path destination + + private final boolean local + + private static FileTime fileTimeFromString( String date ) { + for (int i = 0; i < date.length(); i++) { + if ( !date.charAt(i).isDigit() ) { + return null + } + } + long nanos = Long.parseLong(date) + return FileTime.fromMillis(nanos / 1e6 as long) + } + + WOWFileAttributes( String[] data ) { + boolean fileExists = data[ FILE_EXISTS ] == "1" + if ( fileExists && data.length != 8 ) throw new RuntimeException( "Cannot parse row (8 columns required): ${data.join(',')}" ) + destination = data.length > REAL_PATH && data[ REAL_PATH ] ? data[ REAL_PATH ] as Path : null + if ( data.length != 8 ) { + this.directory = false + this.link = true + this.size = 0 + this.fileType = null + this.creationDate = null + this.accessDate = null + this.modificationDate = null + this.local = false + return + } + this.link = !data[ REAL_PATH ].isEmpty() + this.size = data[ SIZE ] as Long + String fileType = data[ FILE_TYPE ] + this.accessDate = fileTimeFromString(data[ ACCESS_DATE ]) + this.modificationDate = fileTimeFromString(data[ MODIFICATION_DATE ]) + this.creationDate = fileTimeFromString(data[ CREATION_DATE ]) ?: this.modificationDate + if ( fileType.startsWith("non-local ") ) { + this.local = false + this.fileType = fileType.substring( 10 ) + } else { + this.local = true + this.fileType = fileType + } + this.directory = this.fileType == 'directory' + if ( !directory && !this.fileType.contains( 'file' ) ){ + log.error( "Unknown type: ${this.fileType}" ) + } + } + + WOWFileAttributes(Path path ) { + if ( !path.exists() ) { + directory = true + link = false + size = 4096 + fileType = 'directory' + creationDate = FileTime.fromMillis( 0 ) + accessDate = FileTime.fromMillis( 0 ) + modificationDate = FileTime.fromMillis( 0 ) + destination = path + local = false + } else if ( path.isDirectory() ) { + directory = true + link = false + size = 4096 + fileType = 'directory' + creationDate = FileTime.fromMillis(0) + accessDate = FileTime.fromMillis(0) + modificationDate = FileTime.fromMillis(0) + destination = path + local = false + } else { + directory = false + link = path.isLink() + size = path.size() + fileType = link ? 'symbolic link' : 'regular file' + creationDate = null + accessDate = null + modificationDate = FileTime.fromMillis(path.lastModified()) + destination = link ? path.toRealPath() : path + local = !link + } + } + + @Override + FileTime lastModifiedTime() { + modificationDate + } + + @Override + FileTime lastAccessTime() { + accessDate + } + + @Override + FileTime creationTime() { + creationDate + } + + @Override + boolean isRegularFile() { + !directory + } + + @Override + boolean isDirectory() { + directory + } + + @Override + boolean isSymbolicLink() { + link + } + + @Override + boolean isOther() { + false + } + + @Override + long size() { + size + } + + @Override + Object fileKey() { + null + } + + Path getDestination(){ + destination + } + + boolean isLocal() { + local + } + +} \ No newline at end of file diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirHelper.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirHelper.groovy new file mode 100644 index 0000000..3b3420d --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirHelper.groovy @@ -0,0 +1,80 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + +import java.nio.file.DirectoryStream +import java.nio.file.Path + +@Slf4j +@CompileStatic +class WorkdirHelper { + + private final Map paths + + private final Path rootPath + + private boolean validated = false + + WorkdirHelper( Path rootPath, Map paths) { + this.rootPath = rootPath + this.paths = paths + } + + void validate() { + validated = true + } + + boolean isValidated() { + return validated + } + + LocalPath get( Path path ) { + if ( validated ) { + throw new IllegalStateException("WorkdirHelper validated") + } + paths.get( path ) + } + + Path relativeToWorkdir( LocalPath path ) { + new LocalPath( rootPath.relativize( path.path ), path.getAttributes(), path.workDir ) + } + + int getNameCount() { + rootPath.getNameCount() + } + + DirectoryStream getDirectoryStream(Path path) { + if( validated ) { + throw new IllegalStateException("WorkdirHelper validated") + } + // If this is a local path, we need to check if the path is relative to the rootPath + boolean useLocalPath = path.startsWith(rootPath) + return new DirectoryStream() { + @Override + Iterator iterator() { + final def cp = path as LocalPath + def all = paths.entrySet().findAll { + Path toCompareAgainst = useLocalPath ? it.value.getInner() : it.key + def result = toCompareAgainst.parent == cp.getInner() + return result + }.collect { + (Path) it.value + } + return all.iterator() + } + @Override + void close() throws IOException {} + } + } + + @Override + String toString() { + return "WorkdirHelper{" + + "paths=" + paths + + ", rootPath=" + rootPath + + ", validated=" + validated + + '}' + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirPath.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirPath.groovy new file mode 100644 index 0000000..43541a3 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/file/WorkdirPath.groovy @@ -0,0 +1,46 @@ +package nextflow.cws.wow.file + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j + +import java.nio.file.Path + +/** This special path is used to represent the workdir path in the local file system. + * This is required to calculate the correct relative path to the workdir. + */ +@Slf4j +@CompileStatic +class WorkdirPath extends OfflineLocalPath { + + WorkdirPath(Path path, WOWFileAttributes attributes, Path workDir, WorkdirHelper workdirHelper) { + super(path, attributes, workDir, workdirHelper) + } + + @Override + Path relativize(Path other) { + if ( other instanceof LocalPath ) { + def otherPath = ((LocalPath) other).path + if ( this.path == otherPath ) { + return Path.of("") + } + return workdirHelper.relativeToWorkdir( (LocalPath) other ) + } + return this.path.relativize( other ) + } + + /** + * This method considers that the current path != the local path. + * If a file is found in the local paths, it returns the local path. + * @param other + * @return + */ + Path resolve( String other ) { + def file = this.path.resolve( other ) + workdirHelper.get( file ) ?: file + } + + @Override + int getNameCount() { + return workdirHelper.getNameCount() + } +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystem.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystem.groovy new file mode 100644 index 0000000..ac0a3dd --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystem.groovy @@ -0,0 +1,82 @@ +package nextflow.cws.wow.filesystem + +import groovy.transform.CompileDynamic +import groovy.transform.CompileStatic + +import java.nio.file.* +import java.nio.file.attribute.UserPrincipalLookupService +import java.nio.file.spi.FileSystemProvider + +@CompileStatic +class WOWFileSystem extends FileSystem { + + static final WOWFileSystem INSTANCE = new WOWFileSystem() + + @Override + FileSystemProvider provider() { + WOWFileSystemProvider.INSTANCE + } + + @Override + void close() throws IOException { + } + + @Override + boolean isOpen() { + true + } + + @Override + boolean isReadOnly() { + false + } + + @Override + String getSeparator() { + "/" + } + + @Override + Iterable getRootDirectories() { + throw new UnsupportedOperationException("Root directories not supported by ${provider().getScheme().toUpperCase()} file system") + } + + @Override + Iterable getFileStores() { + throw new UnsupportedOperationException("File stores not supported by ${provider().getScheme().toUpperCase()} file system") + } + + @Override + Set supportedFileAttributeViews() { + new HashSet<>() + } + + @Override + Path getPath(String s, String... strings) { + throw new UnsupportedOperationException("Path get not supported by ${provider().getScheme().toUpperCase()} file system") + } + + @Override + @CompileDynamic + PathMatcher getPathMatcher(String s) { + new PathMatcher() { + private final def matcher = FileSystems.getDefault().getPathMatcher( s ) + @Override + boolean matches(Path path) { + // Make this a Unix Path and use the default matcher + return matcher.matches( Path.of(path.toString()) ) + } + } + } + + @Override + UserPrincipalLookupService getUserPrincipalLookupService() { + throw new UnsupportedOperationException("User principal lookup service not supported by ${provider().getScheme().toUpperCase()} file system") + } + + @Override + WatchService newWatchService() throws IOException { + throw new UnsupportedOperationException("Watch service not supported by ${provider().getScheme().toUpperCase()} file system") + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemPathFactory.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemPathFactory.groovy new file mode 100644 index 0000000..fe5731e --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemPathFactory.groovy @@ -0,0 +1,39 @@ +package nextflow.cws.wow.filesystem + +import groovy.transform.CompileStatic +import nextflow.cws.wow.file.LocalPath +import nextflow.file.FileSystemPathFactory + +import java.nio.file.Path + +@CompileStatic +class WOWFileSystemPathFactory extends FileSystemPathFactory { + + @Override + protected Path parseUri(String uri) { + if ( uri.startsWith("wow://") ) { + def of = Path.of(uri.substring(5)) + return LocalPath.toLocalPath(of, null, null ) + } + return null + } + + @Override + protected String toUriString(Path path) { + if ( path instanceof LocalPath ) { + return path.toUriString() + } + return null + } + + @Override + protected String getBashLib(Path target) { + null + } + + @Override + protected String getUploadCmd(String source, Path target) { + throw new UnsupportedOperationException("WOWFileSystemPathFactory does not support getUploadCmd") + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemProvider.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemProvider.groovy new file mode 100644 index 0000000..057da10 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWFileSystemProvider.groovy @@ -0,0 +1,237 @@ +package nextflow.cws.wow.filesystem + +import groovy.transform.CompileStatic +import groovy.transform.PackageScope +import groovy.util.logging.Slf4j +import nextflow.cws.SchedulerClient +import nextflow.cws.wow.file.LocalPath +import nextflow.cws.wow.file.OfflineLocalPath +import nextflow.file.FileSystemTransferAware +import sun.net.ftp.FtpClient + +import java.nio.channels.SeekableByteChannel +import java.nio.file.* +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.attribute.FileAttribute +import java.nio.file.attribute.FileAttributeView +import java.nio.file.spi.FileSystemProvider + +@Slf4j +@CompileStatic +class WOWFileSystemProvider extends FileSystemProvider implements FileSystemTransferAware { + + static final WOWFileSystemProvider INSTANCE = new WOWFileSystemProvider() + + protected SchedulerClient schedulerClient = null + + private transient final Object createSymlinkHelper = new Object() + + void registerSchedulerClient(SchedulerClient schedulerClient) throws UnsupportedOperationException { + if (this.schedulerClient != null) { + throw new UnsupportedOperationException("WOW file system does not support multiple scheduler clients") + } + this.schedulerClient = schedulerClient + } + + private FtpClient getConnection(final String node, String daemon ){ + int trial = 0 + while ( true ) { + try { + FtpClient ftpClient = FtpClient.create(daemon) + ftpClient.login("root", "password".toCharArray() ) + ftpClient.enablePassiveMode( true ) + ftpClient.setBinaryType() + return ftpClient + } catch ( IOException e ) { + if ( trial > 5 ) { + log.error("Cannot create FTP client: $daemon on $node", e) + throw e + } + sleep(Math.pow(2, trial++) as long) + daemon = schedulerClient.getDaemonOnNode(node) + } + } + } + + @PackageScope Map getLocation(LocalPath path ){ + String absolutePath = path.toAbsolutePath().toString() + Map response = schedulerClient.getFileLocation( absolutePath ) + synchronized ( createSymlinkHelper ) { + if ( !path.createdSymlinks ) { + for (Map link : (response.symlinks as List)) { + Path src = link.src as Path + Path dst = link.dst as Path + if (Files.exists(src, LinkOption.NOFOLLOW_LINKS)) { + try { + if (src.isDirectory()) src.deleteDir() + else Files.delete(src) + } catch (Exception ignored) { + log.warn("Unable to delete " + src) + } + } else { + src.parent.toFile().mkdirs() + } + try { + Files.createSymbolicLink(src, dst) + } catch (Exception ignored) { + log.warn("Unable to create symlink: " + src + " -> " + dst) + } + } + path.createdSymlinks = true + } + } + response + } + + @Override + InputStream newInputStream(Path path, OpenOption... options) { + if (schedulerClient == null) { + throw new RuntimeException("WOW file system has no registered scheduler client") + } + assert path instanceof LocalPath + Map location = getLocation( path ) + + if ( location?.sameAsEngine ) { + return Files.newInputStream(path.getInner(), options) + } + + FtpClient ftpClient = getConnection(location.node.toString(), location.daemon.toString()) + InputStream is = ftpClient.getFileStream(location.path.toString()) + return new WOWInputStream(is, schedulerClient, path, ftpClient) + } + + @Override + OutputStream newOutputStream(Path path, OpenOption... options) { + if (schedulerClient == null) { + throw new RuntimeException("WOW file system has no registered scheduler client") + } + assert path instanceof LocalPath + + OutputStream os = super.newOutputStream(path.getInner(), options) + return new WOWOutputStream(os, schedulerClient, path) + } + + @Override + String getScheme() { + "wow" + } + + @Override + FileSystem newFileSystem(URI uri, Map map) throws IOException { + getFileSystem(uri) + } + + @Override + FileSystem getFileSystem(URI uri) { + WOWFileSystem.INSTANCE + } + + @Override + Path getPath(URI uri) { + getFileSystem(uri).getPath(uri.path) + } + + @Override + SeekableByteChannel newByteChannel(Path path, Set options, FileAttribute... attrs) throws IOException { + throw new UnsupportedOperationException("New byte channel not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + DirectoryStream newDirectoryStream(Path path, DirectoryStream.Filter filter) throws IOException { + if (path instanceof OfflineLocalPath && !((OfflineLocalPath) path).workdirHelper.isValidated()) { + return ((OfflineLocalPath) path).workdirHelper.getDirectoryStream(path) + } + + throw new UnsupportedOperationException("Directory stream not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void createDirectory(Path path, FileAttribute... fileAttributes) throws IOException { + throw new UnsupportedOperationException("Create directory not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void delete(Path path) throws IOException { + throw new UnsupportedOperationException("Delete not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void copy(Path path, Path path1, CopyOption... copyOptions) throws IOException { + throw new UnsupportedOperationException("Copy not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void move(Path path, Path path1, CopyOption... copyOptions) throws IOException { + throw new UnsupportedOperationException("Move not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + boolean isSameFile(Path path1, Path path2) throws IOException { + path1 == path2 + } + + @Override + boolean isHidden(Path path) throws IOException { + path.getFileName().startsWith(".") + } + + @Override + FileStore getFileStore(Path path) throws IOException { + throw new UnsupportedOperationException("File store not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void checkAccess(Path path, AccessMode... accessModes) throws IOException { + // all access is allowed + } + + @Override + V getFileAttributeView(Path path, Class aClass, LinkOption... linkOptions) { + throw new UnsupportedOperationException("File attribute view not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + A readAttributes(Path path, Class aClass, LinkOption... linkOptions) throws IOException { + if (path instanceof LocalPath) { + return path.getAttributes() as A + } else { + return Files.readAttributes(path, BasicFileAttributes.class, linkOptions) as A + } + } + + @Override + Map readAttributes(Path path, String s, LinkOption... linkOptions) throws IOException { + throw new UnsupportedOperationException("Read attributes not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + void setAttribute(Path path, String s, Object o, LinkOption... linkOptions) throws IOException { + throw new UnsupportedOperationException("Set attribute not supported by ${getScheme().toUpperCase()} file system provider") + } + + @Override + boolean canUpload(Path source, Path target) { + false + } + + @Override + boolean canDownload(Path source, Path target) { + true + } + + @Override + void download(Path source, Path target, CopyOption... copyOptions) throws IOException { + try { + schedulerClient.publish( source, target, "COPY" ) + } catch ( Exception e ) { + log.error("Error downloading file from ${source} to ${target}", e) + throw e + } + } + + @Override + void upload(Path source, Path target, CopyOption... copyOptions) throws IOException { + throw new UnsupportedOperationException("Uploading not supported by ${getScheme().toUpperCase()} file system provider") + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWInputStream.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWInputStream.groovy new file mode 100644 index 0000000..6433da5 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWInputStream.groovy @@ -0,0 +1,73 @@ +package nextflow.cws.wow.filesystem + +import groovy.transform.CompileStatic +import nextflow.cws.SchedulerClient +import nextflow.cws.wow.file.LocalPath +import sun.net.ftp.FtpClient + +@CompileStatic +class WOWInputStream extends InputStream { + + private InputStream inner + private SchedulerClient schedulerClient + private LocalPath path + private boolean fullyRead + private FtpClient ftpClient + + private File temporaryFile + private OutputStream temporaryFileStream + private boolean transferredTemporaryFile + + WOWInputStream(InputStream inner, SchedulerClient schedulerClient, LocalPath path, FtpClient ftpClient) { + super() + this.inner = inner + this.schedulerClient = schedulerClient + this.path = path + this.fullyRead = false + this.ftpClient = ftpClient + this.temporaryFile = File.createTempFile("local", "buffer") + this.temporaryFileStream = temporaryFile.newOutputStream() + this.transferredTemporaryFile = false + } + + private void checkTemporaryFileTransferal() { + if (transferredTemporaryFile || !fullyRead) { + return + } + temporaryFileStream.flush() + temporaryFileStream.close() + + File file = path.getInner().toFile() + temporaryFile.moveTo(file) + transferredTemporaryFile = true + + Map location = WOWFileSystemProvider.INSTANCE.getLocation( path ) + schedulerClient.addFileLocation(path.toString(), file.size(), file.lastModified(), location.locationWrapperID as long, false) + } + + @Override + int available() throws IOException { + return inner.available() + } + + @Override + int read() throws IOException { + int b = inner.read() + if (b == -1) { + fullyRead = true + checkTemporaryFileTransferal() + } else { + temporaryFileStream.write(b) + } + return b + } + + @Override + void close() throws IOException { + fullyRead = inner.read() == -1 + inner.close() + ftpClient.close() + checkTemporaryFileTransferal() + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWOutputStream.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWOutputStream.groovy new file mode 100644 index 0000000..56e7e8d --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/filesystem/WOWOutputStream.groovy @@ -0,0 +1,36 @@ +package nextflow.cws.wow.filesystem + +import groovy.transform.CompileStatic +import nextflow.cws.SchedulerClient +import nextflow.cws.wow.file.LocalPath + +@CompileStatic +class WOWOutputStream extends OutputStream { + + private OutputStream inner + + private SchedulerClient client + + private LocalPath path + + WOWOutputStream(OutputStream inner, SchedulerClient client, LocalPath path) { + super() + this.inner = inner + this.client = client + this.path = path + } + + @Override + void write(int b) throws IOException { + inner.write(b) + } + + @Override + void close() throws IOException { + inner.close() + Map location = WOWFileSystemProvider.INSTANCE.getLocation( path ) + File file = path.getInner().toFile() + client.addFileLocation(path.toString(), file.size(), file.lastModified(), location.locationWrapperID as long, true) + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/serializer/LocalPathSerializer.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/serializer/LocalPathSerializer.groovy new file mode 100644 index 0000000..e06efb5 --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/serializer/LocalPathSerializer.groovy @@ -0,0 +1,32 @@ +package nextflow.cws.wow.serializer + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import groovy.transform.CompileStatic +import nextflow.cws.wow.file.LocalPath + +@CompileStatic +class LocalPathSerializer extends Serializer { + + private static final Map storage = [:] + + @Override + void write(Kryo kryo, Output output, LocalPath object) { + def content = object.getInner().toString() + synchronized (storage) { + storage.put(content, object) + } + output.writeString( content ) + } + + @Override + LocalPath read(Kryo kryo, Input input, Class type) { + def content = input.readString() + synchronized (storage) { + return storage.get(content) + } + } + +} diff --git a/plugins/nf-cws/src/main/nextflow/cws/wow/util/LocalFileWalker.groovy b/plugins/nf-cws/src/main/nextflow/cws/wow/util/LocalFileWalker.groovy new file mode 100644 index 0000000..4011bdf --- /dev/null +++ b/plugins/nf-cws/src/main/nextflow/cws/wow/util/LocalFileWalker.groovy @@ -0,0 +1,55 @@ +package nextflow.cws.wow.util + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.cws.wow.file.LocalPath +import nextflow.cws.wow.file.OfflineLocalPath +import nextflow.cws.wow.file.WOWFileAttributes +import nextflow.cws.wow.file.WorkdirHelper + +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.Paths + +@Slf4j +@CompileStatic +class LocalFileWalker { + + static private final int VIRTUAL_PATH = 0 + + static WorkdirHelper createWorkdirHelper(Path start){ + Map files = new HashMap<>() + Path rootPath = null + File file = new File( start.toString() + File.separatorChar + ".command.outfiles" ) + if ( !file.exists() ) { + log.warn( "File ${file} does not exist" ) + return null + } + String line + WorkdirHelper workdirHelper + file.withReader { reader -> + line = reader.readLine() + rootPath = Paths.get(line.split(';')[ VIRTUAL_PATH ]) + workdirHelper = new WorkdirHelper( rootPath, files ) + while ((line = reader.readLine()) != null) { + String[] data = line.split(';') + String path = data[ VIRTUAL_PATH ] + WOWFileAttributes attributes = new WOWFileAttributes( data ) + Path currentPath = Paths.get(path) + if ( !attributes.local ) { + //If task did not run on local machine, create symbolic link + if ( !Files.isSymbolicLink( currentPath ) ) { + Files.createDirectories( currentPath.getParent() ) + Files.createSymbolicLink( currentPath, attributes.destination ) + } + } else { + def localPath = new OfflineLocalPath(currentPath, attributes, start, workdirHelper) + def pathOnSharedFs = start.resolve(rootPath.relativize(currentPath)) + files.put( pathOnSharedFs, localPath ) + } + } + return workdirHelper + } + } + +} \ No newline at end of file diff --git a/plugins/nf-cws/src/resources/META-INF/MANIFEST.MF b/plugins/nf-cws/src/resources/META-INF/MANIFEST.MF index ace7073..9097f88 100644 --- a/plugins/nf-cws/src/resources/META-INF/MANIFEST.MF +++ b/plugins/nf-cws/src/resources/META-INF/MANIFEST.MF @@ -1,6 +1,6 @@ Manifest-Version: 1.0 Plugin-Id: nf-cws -Plugin-Version: 1.0.6 +Plugin-Version: 2.0.0 Plugin-Class: nextflow.cws.CWSPlugin Plugin-Provider: nextflow Plugin-Requires: >=23.05.0-edge diff --git a/plugins/nf-cws/src/resources/META-INF/extensions.idx b/plugins/nf-cws/src/resources/META-INF/extensions.idx index a8d9308..557340d 100644 --- a/plugins/nf-cws/src/resources/META-INF/extensions.idx +++ b/plugins/nf-cws/src/resources/META-INF/extensions.idx @@ -1,2 +1,3 @@ nextflow.cws.CWSFactory -nextflow.cws.k8s.CWSK8sExecutor \ No newline at end of file +nextflow.cws.k8s.CWSK8sExecutor +nextflow.cws.wow.filesystem.WOWFileSystemPathFactory \ No newline at end of file diff --git a/plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c b/plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c new file mode 100644 index 0000000..8c67994 --- /dev/null +++ b/plugins/nf-cws/src/resources/nf-cws/getStatsAndResolveSymlinks.c @@ -0,0 +1,425 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#define FULL_DESCR 0 +#define SHORT_DESCR_WITH_TIMESTAMP 1 + +#define INFILES_NAME "infiles" +#define OUTFILES_NAME "outfiles" + +#define STACK_MIN_SIZE 1 + +#define SYMLINK_TARGET_BUFFER_SIZE 500 + +struct symlink { + char * src; + char * dst; +}; + +struct stack { + int top; + int size; + struct symlink * items; +}; + +struct stack * newStack(int size) { + struct stack * ptr = (struct stack *) malloc(sizeof(struct stack *)); + ptr->top = -1; + ptr->items = (struct symlink *) malloc(sizeof(struct symlink) * size); + ptr->size = size; + return ptr; +} + +int isEmpty(struct stack * ptr) { + return (ptr->top == -1); +} + +struct stack * push_symlink(struct stack * ptr, char * const src, char * const dst) { + if (ptr->top + 1 == ptr->size) { + struct stack * new_ptr = newStack(ptr->size * 2); + new_ptr->items = (struct symlink *) realloc(ptr->items, sizeof(struct symlink) * ptr->size * 2); + new_ptr->top = ptr->top; + free(ptr); + ptr = new_ptr; + } + ptr->top++; + char * source = (char *) malloc(strlen(src) + 1); + strcpy(source, src); + char * destination = (char *) malloc(strlen(dst) + 1); + strcpy(destination, dst); + ptr->items[ptr->top].src = source; + ptr->items[ptr->top].dst = destination; + return ptr; +} + +const struct symlink * const head(struct stack * ptr) { + if (isEmpty(ptr)) { + fprintf(stderr, "Error: Stack is empty!"); + exit(EXIT_FAILURE); + } + return &(ptr->items[ptr->top]); +} + +void delete_top(struct stack * ptr) { + if (isEmpty(ptr)) { + fprintf(stderr, "Error: Stack is empty!"); + exit(EXIT_FAILURE); + } + free(ptr->items[ptr->top].src); + free(ptr->items[ptr->top].dst); + ptr->top--; +} + +void deleteStack(struct stack * ptr) { + while (!isEmpty(ptr)) { + delete_top(ptr); + } + ptr->size = -1; + free(ptr->items); + free(ptr); +} + +int collectFileInformation(char * const * dir_to_search, const int version, + const char * const local_dir, + const char * const result_filename); +int getFullDescr(char * const * dir, + const char * const local_dir, + FILE * file_ptr); +int getShortDescrAndTimestamp(char * const * dir, + const char * const local_dir, + FILE * file_ptr); + + +int main(int arc, char * const argv[]) { + + if (arc < 5) { + fprintf(stderr, "Error: too few arguments!\n"); + return -1; + } + argv++; + struct timespec start_time, end_time; + int gettimeofday_rc; + if ((gettimeofday_rc = clock_gettime(CLOCK_REALTIME, &start_time)) != 0) { + fprintf(stderr, "Error getting the time of day\n"); + return gettimeofday_rc; + } + + if (strcmp(argv[0], INFILES_NAME) != 0 && strcmp(argv[0], OUTFILES_NAME) != 0) { + fprintf(stderr, "Error: version must be '%s' or '%s'\n", INFILES_NAME, OUTFILES_NAME); + return -1; + } + const char * const name = argv++[0]; + const int version = strcmp(name, INFILES_NAME) == 0 + ? SHORT_DESCR_WITH_TIMESTAMP : FULL_DESCR; + const char * const result_filename = argv++[0]; + const char * const local_dir = argv++[0]; + int rc = collectFileInformation(argv, version, local_dir, result_filename); + + if ((gettimeofday_rc = clock_gettime(CLOCK_REALTIME, &end_time)) != 0) { + fprintf(stderr, "Error getting the time of day\n"); + return gettimeofday_rc; + } + long long int start = start_time.tv_sec * 1000000000 + start_time.tv_nsec; + long long int end = end_time.tv_sec * 1000000000 + end_time.tv_nsec; + printf("%lli\n", (end - start) / 1000000); + + return rc; +} + +int collectFileInformation(char * const * dir_to_search, const int version, + const char * const local_dir, + const char * const result_filename) { + + DIR* dir_ptr = opendir(local_dir); + if (dir_ptr) { + closedir(dir_ptr); + } else { + fprintf(stderr, "Error: the local directory '%s' does not exist.\n", local_dir); + return -1; + } + + dir_ptr = opendir(dir_to_search[0]); + if (dir_ptr) { + closedir(dir_ptr); + } else { + fprintf(stderr, "Error: the directory to search '%s' does not exist.\n", dir_to_search[0]); + return -1; + } + + FILE * file_ptr = fopen(result_filename, "w"); + if (file_ptr == NULL) { + fprintf(stderr, "Error opening the file %s\n", result_filename); + return -1; + } + int rc; + switch (version) { + case FULL_DESCR: + rc = getFullDescr(dir_to_search, local_dir, file_ptr); + break; + case SHORT_DESCR_WITH_TIMESTAMP: + rc = getShortDescrAndTimestamp(dir_to_search, local_dir, file_ptr); + break; + + default: + break; + } + fclose(file_ptr); + return rc; +} + +int getFullDescr(char * const * dir, + const char * const local_dir, + FILE * file_ptr) { + + FTS * fts_ptr; + FTSENT * ptr, * ch_ptr; + int fts_options = FTS_PHYSICAL; + + if ((fts_ptr = fts_open(dir, fts_options, NULL)) == NULL) { + fprintf(stderr, "Error traversing the directory %s\n", dir[0]); + return -1; + } + ch_ptr = fts_children(fts_ptr, 0); + if (ch_ptr == NULL) { + return 0; + } + int skip_next = 0; + struct stack * symlink_stack = newStack(STACK_MIN_SIZE); + char * symlink_target_path = (char *) malloc(SYMLINK_TARGET_BUFFER_SIZE); + while ((ptr = fts_read(fts_ptr)) != NULL) { + if (ptr->fts_info == FTS_DP) { + continue; + } + if (skip_next) { + skip_next = 0; + continue; + } + char * file_type; + strcpy(symlink_target_path, ""); + int exists; + switch (ptr->fts_info) { + case FTS_D: + file_type = "directory"; + exists = 1; + break; + case FTS_F: + file_type = "regular file"; + exists = 1; + break; + case FTS_SL: + file_type = "symbolic link"; + realpath(ptr->fts_path, symlink_target_path); + if (symlink_target_path == NULL) { + strcpy(symlink_target_path, ""); + } + exists = 1 + access(symlink_target_path, F_OK); // test for file existence + if (!exists) { + break; + } + // checking whether it is a directory: + struct stat target_file_stat; + int stat_rv; + if ((stat_rv = stat(symlink_target_path, &target_file_stat)) != 0) { + fprintf(stderr, "Error reading the file %s\n", symlink_target_path); + return stat_rv; + } + if (S_ISDIR(target_file_stat.st_mode)) { + // if target is not local, we skip it + if (strncmp(symlink_target_path, local_dir, strlen(local_dir)) != 0) { + file_type = "non-local directory"; + break; + } + file_type = "directory"; + // if target is within the directory we are searching, we skip it, + // to prevent searching a directory more than once + if (strncmp(symlink_target_path, dir[0], strlen(dir[0])) == 0) { + break; + } + fts_set(fts_ptr, ptr, FTS_FOLLOW); + symlink_stack = push_symlink(symlink_stack, ptr->fts_path, symlink_target_path); + skip_next = 1; + } else if (S_ISREG(target_file_stat.st_mode)) { + file_type = "regular file"; + } + break; + + default: + file_type = "unknown"; + exists = 1; + break; + } + if (!isEmpty(symlink_stack) && strcmp(file_type, "symbolic link") != 0) { + while (strncmp(head(symlink_stack)->src, ptr->fts_path, strlen(head(symlink_stack)->src)) != 0) { + delete_top(symlink_stack); + if (isEmpty(symlink_stack)) { + break; + } + } + if (!isEmpty(symlink_stack)) { + strcpy(symlink_target_path, head(symlink_stack)->dst); + int fts_len = strlen(ptr->fts_path); + int to_replace_len = strlen(head(symlink_stack)->src); + int rel_len = fts_len - to_replace_len; + char rel_path[rel_len+1]; + memcpy(rel_path, &ptr->fts_path[to_replace_len], rel_len+1); + strcat(symlink_target_path, rel_path); + } + } + fprintf( + file_ptr, + "%s;%i;%s;%li;%s;%li%09li;%li%09li;%li%09li\n", + ptr->fts_path, + exists, + symlink_target_path, + ptr->fts_statp->st_size, + file_type, + ptr->fts_statp->st_ctim.tv_sec, ptr->fts_statp->st_ctim.tv_nsec, + // ctim - time of last status change which is used as an approximation of the creation time + ptr->fts_statp->st_atim.tv_sec, ptr->fts_statp->st_atim.tv_nsec, + ptr->fts_statp->st_mtim.tv_sec, ptr->fts_statp->st_mtim.tv_nsec + ); + } + free(symlink_target_path); + deleteStack(symlink_stack); + fts_close(fts_ptr); + return 0; +} + +int getShortDescrAndTimestamp(char * const * dir, + const char * const local_dir, + FILE * file_ptr) { + + struct timespec time_now; + int gettimeofday_rc; + if ((gettimeofday_rc = clock_gettime(CLOCK_REALTIME, &time_now)) != 0) { + fprintf(stderr, "Error getting the time of day\n"); + return gettimeofday_rc; + } + fprintf(file_ptr, "%li%li\n", time_now.tv_sec, time_now.tv_nsec); + + fprintf(file_ptr, "%s\n", dir[0]); + + int prefix_len = strlen(dir[0]); + if (dir[0][prefix_len-1] != '/') { + prefix_len++; + } + + FTS * fts_ptr; + FTSENT * ptr, * ch_ptr; + int fts_options = FTS_PHYSICAL; + + if ((fts_ptr = fts_open(dir, fts_options, NULL)) == NULL) { + fprintf(stderr, "Error traversing the directory %s\n", dir[0]); + return -1; + } + ch_ptr = fts_children(fts_ptr, 0); + if (ch_ptr == NULL) { + return 0; + } + int skip_next = 0; + struct stack * symlink_stack = newStack(STACK_MIN_SIZE); + char * symlink_target_path = (char *) malloc(SYMLINK_TARGET_BUFFER_SIZE); + while ((ptr = fts_read(fts_ptr)) != NULL) { + if (ptr->fts_info == FTS_DP) { + continue; + } + if (strncmp(ptr->fts_path, dir[0], strlen(ptr->fts_path)) == 0) { + continue; + } + if (skip_next) { + skip_next = 0; + continue; + } + char * file_type; + strcpy(symlink_target_path, ""); + int exists; + switch (ptr->fts_info) { + case FTS_D: + file_type = "directory"; + exists = 1; + break; + case FTS_F: + file_type = "regular file"; + exists = 1; + break; + case FTS_SL: + file_type = "symbolic link"; + int bytes_written = readlink(ptr->fts_path, symlink_target_path, SYMLINK_TARGET_BUFFER_SIZE); + symlink_target_path[bytes_written] = '\0'; + // realpath(ptr->fts_path, symlink_target_path); + if (symlink_target_path == NULL) { + strcpy(symlink_target_path, ""); + } + exists = 1 + access(symlink_target_path, F_OK); // test for file existence + if (!exists) { + break; + } + // checking whether it is a directory: + struct stat target_file_stat; + int stat_rv; + if ((stat_rv = stat(symlink_target_path, &target_file_stat)) != 0) { + fprintf(stderr, "Error reading the file %s\n", symlink_target_path); + return stat_rv; + } + if (S_ISDIR(target_file_stat.st_mode)) { + // if target is not local, we skip it + if (strncmp(symlink_target_path, local_dir, strlen(local_dir)) != 0) { + file_type = "non-local directory"; + break; + } + file_type = "directory"; + // if target is within the directory we are searching, we skip it, + // to prevent searching a directory more than once + if (strncmp(symlink_target_path, dir[0], strlen(dir[0])) == 0) { + break; + } + fts_set(fts_ptr, ptr, FTS_FOLLOW); + symlink_stack = push_symlink(symlink_stack, ptr->fts_path, symlink_target_path); + skip_next = 1; + } else if (S_ISREG(target_file_stat.st_mode)) { + file_type = "regular file"; + } + break; + + default: + file_type = "unknown"; + exists = 1; + break; + } + if (!isEmpty(symlink_stack) && strcmp(file_type, "symbolic link") != 0) { + while (strncmp(head(symlink_stack)->src, ptr->fts_path, strlen(head(symlink_stack)->src)) != 0) { + delete_top(symlink_stack); + if (isEmpty(symlink_stack)) { + break; + } + } + if (!isEmpty(symlink_stack)) { + strcpy(symlink_target_path, head(symlink_stack)->dst); + int fts_len = strlen(ptr->fts_path); + int to_replace_len = strlen(head(symlink_stack)->src); + int rel_len = fts_len - to_replace_len; + char rel_path[rel_len+1]; + memcpy(rel_path, &ptr->fts_path[to_replace_len], rel_len+1); + strcat(symlink_target_path, rel_path); + } + } + fprintf( + file_ptr, + "%s;%i;%s;%s\n", + ptr->fts_path + prefix_len, + exists, + symlink_target_path, + file_type + ); + } + free(symlink_target_path); + deleteStack(symlink_stack); + fts_close(fts_ptr); + return 0; +} diff --git a/plugins/nf-cws/src/test/nextflow/cws/wow/file/WOWFileSystemTest.groovy b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WOWFileSystemTest.groovy new file mode 100644 index 0000000..8acc2a3 --- /dev/null +++ b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WOWFileSystemTest.groovy @@ -0,0 +1,20 @@ +package nextflow.cws.wow.file + +import nextflow.cws.wow.filesystem.WOWFileSystem +import spock.lang.Specification + +import java.nio.file.Path + +class OWFileSystemTest extends Specification { + + def "GetPathMatcher"() { + given: + def fileSystem = new WOWFileSystem() + + when: + def matcher = fileSystem.getPathMatcher("glob:*_data") + + then: + matcher.matches(Path.of("/localdata/localwork/c4/6274337c27f9979441ab60afdd145d/multiqc_data").getFileName()) + } +} diff --git a/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirHelperTest.groovy b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirHelperTest.groovy new file mode 100644 index 0000000..f45eefd --- /dev/null +++ b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirHelperTest.groovy @@ -0,0 +1,50 @@ +package nextflow.cws.wow.file + + +import spock.lang.Specification + +import java.nio.file.Path + +class WorkdirHelperTest extends Specification { + + def "GetDirectoryStream"() { + String localPath = "/localdata/localwork/be/8292aaebea2ddf9ae8ad4952882dcb" + String sharedPath = "/input/data/work/be/8292aaebea2ddf9ae8ad4952882dcb" + def localPath1 = LocalPath.toLocalPath(Path.of(localPath, "file.txt"), null, null) + def localPath2 = LocalPath.toLocalPath(Path.of(localPath, "a/file2.txt"), null, null) + def localPath3 = LocalPath.toLocalPath(Path.of(localPath, "b/file3.txt"), null, null) + def localPath4 = LocalPath.toLocalPath(Path.of(localPath, "b/c/"), null, null) + def localPath5 = LocalPath.toLocalPath(Path.of(localPath, "b/c/file4.txt"), null, null) + def localPath6 = LocalPath.toLocalPath(Path.of(localPath, "a/"), null, null) + def localPath7 = LocalPath.toLocalPath(Path.of(localPath, "b/"), null, null) + def sharedPath1 = LocalPath.toLocalPath(Path.of(sharedPath, "file.txt"), null, null) + def sharedPath2 = LocalPath.toLocalPath(Path.of(sharedPath, "a/file2.txt"), null, null) + def sharedPath3 = LocalPath.toLocalPath(Path.of(sharedPath, "b/file3.txt"), null, null) + def sharedPath4 = LocalPath.toLocalPath(Path.of(sharedPath, "b/c/"), null, null) + def sharedPath5 = LocalPath.toLocalPath(Path.of(sharedPath, "b/c/file4.txt"), null, null) + def sharedPath6 = LocalPath.toLocalPath(Path.of(sharedPath, "a/"), null, null) + def sharedPath7 = LocalPath.toLocalPath(Path.of(sharedPath, "b/"), null, null) + Map files = new HashMap<>() + files.put( sharedPath1, localPath1 ) + files.put( sharedPath2, localPath2 ) + files.put( sharedPath3, localPath3 ) + files.put( sharedPath4, localPath4 ) + files.put( sharedPath5, localPath5 ) + files.put( sharedPath6, localPath6 ) + files.put( sharedPath7, localPath7 ) + def helper = new WorkdirHelper( Path.of(localPath), files ) + def workDir = Path.of("/input/data/work/be/8292aaebea2ddf9ae8ad4952882dcb") + def attributes = new WOWFileAttributes(workDir) + WorkdirPath path = new WorkdirPath( workDir, attributes, workDir, helper ) + def stream = helper.getDirectoryStream( path ) + def result = stream.collect() + + expect: + result.size() == 3 + localPath1 in result + localPath6 in result + localPath7 in result + } + + +} diff --git a/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirPathTest.groovy b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirPathTest.groovy new file mode 100644 index 0000000..13abf10 --- /dev/null +++ b/plugins/nf-cws/src/test/nextflow/cws/wow/file/WorkdirPathTest.groovy @@ -0,0 +1,33 @@ +package nextflow.cws.wow.file + + +import spock.lang.Specification + +import java.nio.file.Path + +class WorkdirPathTest extends Specification { + + def "matches"() { + when: + def path = Path.of("file.txt").getFileName() + + then: + path.getFileSystem().getPathMatcher( "glob:*.txt" ).matches( path ) + } + + def "resolve"() { + when: + def workDir = Path.of("/input/data/work/be/8292aaebea2ddf9ae8ad4952882dcb") + def localPath = LocalPath.toLocalPath(Path.of("/localdata/localwork/be/8292aaebea2ddf9ae8ad4952882dcb/file.txt"), null, null) + Map files = new HashMap<>() + files.put(workDir.resolve("file.txt"), localPath) + def helper = new WorkdirHelper( Path.of("/localdata/localwork/be/8292aaebea2ddf9ae8ad4952882dcb/"), files ) + def attributes = new WOWFileAttributes(workDir) + WorkdirPath path = new WorkdirPath( workDir, attributes, workDir, helper ) + + then: + def resolve = path.resolve("file.txt") + resolve == localPath + } + +}