Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ k8s {

| Attribute | Required | Explanation |
|:----------------------|----------|-------------------------------------------------------------------------------------------------|
| localPath | yes | Host path for the local mount
| localStorageMountPath | no | Container path for the local mount
| storage.copyStrategy | no | Strategy to copy the files between nodes - currently only supports 'ftp' (and its alias 'copy')
| storage.workdir | no | Working directory to use - must be inside of the locally mounted directory
| localPath | yes | Host path for the local mount |
| localStorageMountPath | no | Container path for the local mount |
| storage.copyStrategy | no | Strategy to copy the files between nodes - currently only supports 'ftp' (and its alias 'copy') |
| storage.workdir | no | Working directory to use - must be inside of the locally mounted directory |
| storage.cpu | no | CPU to use for daemons running on all nodes, default: empty - no limits |
| storage.memory | no | Memory to use for daemons running on all nodes, default: 256Mi |

### Tracing

Expand Down
12 changes: 12 additions & 0 deletions plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class CWSK8sClient extends K8sClient {
configCreate0(spec)
}

K8sResponseJson podDelete(String name) {
try {
return super.podDelete( name )
} catch ( Exception e ) {
log.warn "Failed to delete pod '$name' -- cause: ${e.message ?: e}"
if ( 'returned an error code=404' in e.message ) {
return null
}
throw e
}
}

/**
* Get the memory of a pod that has been adapted by the CWS
* @param podName The name of the pod
Expand Down
12 changes: 12 additions & 0 deletions plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,17 @@ class CWSK8sConfig extends K8sConfig {
String getCmd() {
target.cmd as String
}

Double getCpu() {
if ( target.cpu ) {
return target.cpu as double
}
return null
}

String getMemory() {
target.memory as String ?: '256Mi'
}

}
}
20 changes: 17 additions & 3 deletions plugins/nf-cws/src/main/nextflow/cws/k8s/CWSK8sExecutor.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,33 @@ class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint {
mounts << claim
}

CWSK8sConfig.Storage storage = (k8sConfig as CWSK8sConfig).getStorage()

String name = "mount-${session.runName.replace('_', '-')}"
def resources = [
limits: [
memory: storage.getMemory()
],
requests: [
memory: storage.getMemory()
]
]
if ( storage.getCpu() ) {
(resources.limits as Map).put( 'cpu', storage.getCpu() )
(resources.requests as Map).put( 'cpu', storage.getCpu() )
}
def spec = [
containers: [ [
name: name,
image: (k8sConfig as CWSK8sConfig).getStorage().getImageName(),
volumeMounts: mounts,
imagePullPolicy : 'IfNotPresent'
imagePullPolicy : 'IfNotPresent',
resources: resources
] ],
volumes: volumes,
serviceAccount: client.config.serviceAccount
]

CWSK8sConfig.Storage storage = (k8sConfig as CWSK8sConfig).getStorage()
if( storage.getNodeSelector() )
spec.put( 'nodeSelector', storage.getNodeSelector().toSpec() as Serializable )

Expand All @@ -299,8 +313,8 @@ class CWSK8sExecutor extends K8sExecutor implements ExtensionPoint {
namespace: k8sConfig.getNamespace() ?: 'default'
],
spec : [
restartPolicy: 'Always',
template: [
restartPolicy: 'Always',
metadata: [
labels: [
name : name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cws.CWSConfig
import nextflow.cws.SchedulerClient
import nextflow.exception.ProcessRetryableException
import nextflow.executor.BashWrapperBuilder
import nextflow.extension.GroupKey
import nextflow.file.FileHolder
Expand Down Expand Up @@ -198,7 +199,9 @@ class CWSK8sTaskHandler extends K8sTaskHandler {
try {
return super.checkIfRunning()
} catch ( Exception e) {
log.error("Error checking if task is running", e)
if ( e instanceof ProcessRetryableException ) {
throw new RuntimeException( "Exception while checking if task is running", e )
}
throw e
}
}
Expand Down