Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
## New Features

* New utility methods `SftpClient.put(Path localFile, String remoteFileName)` and `SftpClient.put(InputStream in, String remoteFileName)` facilitate SFTP file uploading.
* [GH-539](https://github.com/apache/mina-sshd/issues/539) Implement no-flow-control extension

## Potential compatibility issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand All @@ -42,12 +43,14 @@
import org.apache.sshd.client.auth.AuthenticationIdentitiesProvider;
import org.apache.sshd.client.config.hosts.HostConfigEntry;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ReflectionUtils;
import org.apache.sshd.common.util.io.input.NoCloseInputStream;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.scp.client.ScpClient;
import org.apache.sshd.scp.client.ScpClient.Option;
import org.apache.sshd.scp.client.ScpClientCreator;
Expand All @@ -61,6 +64,8 @@
import org.apache.sshd.scp.common.helpers.ScpTimestampCommandDetails;
import org.slf4j.Logger;

import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;

/**
* @see <A HREF="https://man7.org/linux/man-pages/man1/scp.1.html">SCP(1) - manual page</A>
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
Expand Down Expand Up @@ -151,7 +156,7 @@ public static String[] normalizeCommandArguments(PrintStream stdout, PrintStream
return null;
}

return effective.toArray(new String[effective.size()]);
return effective.toArray(new String[0]);
}

/* -------------------------------------------------------------------------------- */
Expand Down Expand Up @@ -248,11 +253,11 @@ public static void showUsageMessage(PrintStream stderr) {
public static void xferLocalToRemote(
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
ScpLocation source, ScpLocation target, Collection<Option> options,
OutputStream logStream, Level level, boolean quiet)
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
throws Exception {
ScpClientCreator creator = resolveScpClientCreator(stderr, args);
ClientSession session = ((logStream == null) || (creator == null) || GenericUtils.isEmpty(args))
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (session == null) {
showUsageMessage(stderr);
System.exit(-1);
Expand Down Expand Up @@ -330,10 +335,10 @@ private void logEvent(
public static void xferRemoteToRemote(
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
ScpLocation source, ScpLocation target, Collection<Option> options,
OutputStream logStream, Level level, boolean quiet)
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
throws Exception {
ClientSession srcSession = ((logStream == null) || GenericUtils.isEmpty(args))
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (srcSession == null) {
showUsageMessage(stderr);
System.exit(-1);
Expand Down Expand Up @@ -444,6 +449,10 @@ public static void main(String[] args) throws Exception {
new InputStreamReader(new NoCloseInputStream(System.in), Charset.defaultCharset()))) {
args = normalizeCommandArguments(stdout, stderr, args);

PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);

Level level = Level.SEVERE;
int numArgs = GenericUtils.length(args);
// see the way normalizeCommandArguments works...
Expand Down Expand Up @@ -472,9 +481,11 @@ public static void main(String[] args) throws Exception {
}

if (threeWay) {
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
defaultOptions);
} else {
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
defaultOptions);
}
} finally {
if ((logStream != stdout) && (logStream != stderr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
Expand All @@ -47,6 +48,7 @@
import org.apache.sshd.client.ClientFactoryManager;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.ServiceFactory;
import org.apache.sshd.common.channel.ChannelFactory;
import org.apache.sshd.common.cipher.CipherFactory;
Expand All @@ -71,6 +73,7 @@
import org.apache.sshd.common.util.io.output.LineLevelAppenderStream;
import org.apache.sshd.common.util.io.output.NullOutputStream;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.server.config.SshServerConfigFileReader;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.Attributes;
Expand All @@ -91,6 +94,8 @@
import org.apache.sshd.sftp.common.extensions.openssh.StatVfsExtensionParser;
import org.slf4j.Logger;

import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;

/**
* TODO Add javadoc
*
Expand Down Expand Up @@ -366,9 +371,13 @@ public static void main(String[] args) throws Exception {
setupLogging(level, stdout, stderr, logStream);
}

PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);

ClientSession session = (logStream == null)
? null
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args);
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
if (session == null) {
System.err.println("usage: sftp [-v[v][v]] [-E logoutput] [-i identity] [-io nio2|mina|netty]"
+ " [-J proxyJump] [-l login] [" + SFTP_PORT_OPTION + " port] [-o option=value]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ public static boolean isArgumentedOption(String portOption, String argName) {
// CHECKSTYLE:OFF
public static ClientSession setupClientSession(
String portOption, BufferedReader stdin, Level level,
PrintStream stdout, PrintStream stderr, String... args)
PrintStream stdout, PrintStream stderr, String[] args)
throws Exception {
return setupClientSession(portOption, stdin, level, stdout, stderr, args, null);
}

public static ClientSession setupClientSession(
String portOption, BufferedReader stdin, Level level,
PrintStream stdout, PrintStream stderr, String[] args,
PropertyResolver defaultOptions)
throws Exception {
int port = -1;
String host = null;
Expand Down Expand Up @@ -240,7 +248,8 @@ public static ClientSession setupClientSession(
return null;
}

PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options);
PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options,
defaultOptions);
SshClient client = setupClient(
resolver, ciphers, macs, compressions, identities,
stdin, stdout, stderr, level, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
public class NoFlowControl extends AbstractKexExtensionParser<String> {
public static final String NAME = "no-flow-control";

public static final String SUPPORTED = "s";
public static final String PREFERRED = "p";

public static final NoFlowControl INSTANCE = new NoFlowControl();

public NoFlowControl() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public void consume(long len) throws IOException {
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
checkInitialized("consume");

if (noFlowControl) {
// flow control is disabled, so just bail out
return;
}

long remainLen;
synchronized (lock) {
remainLen = getSize() - len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public void consume(long len) {
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
checkInitialized("consume");

if (noFlowControl) {
// flow control is disabled, so just bail out
return;
}

long remainLen;
synchronized (lock) {
remainLen = getSize() - len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Predicate;

import org.apache.sshd.common.PropertyResolver;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
Expand All @@ -46,6 +47,8 @@ public abstract class Window extends AbstractLoggingBean implements ChannelHolde

protected final Object lock = new Object();

protected boolean noFlowControl;

private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final Channel channelInstance;
Expand Down Expand Up @@ -94,6 +97,8 @@ protected void init(long size, long packetSize, PropertyResolver resolver) {
}

synchronized (lock) {
Session session = channelInstance.getSession(); // this should only be null during tests
this.noFlowControl = session != null && session.isNoFlowControl();
this.maxSize = size;
this.packetSize = packetSize;
updateSize(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,26 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;

import org.apache.sshd.common.AttributeRepository.AttributeKey;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.kex.extension.parser.HostBoundPubkeyAuthentication;
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractSession;
import org.apache.sshd.common.signature.Signature;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
import org.apache.sshd.core.CoreModuleProperties;

/**
* Detects if the server sends a
Expand Down Expand Up @@ -88,6 +97,15 @@ public boolean handleKexExtensionRequest(
} else {
session.setAttribute(HOSTBOUND_AUTHENTICATION, version);
}
} else if (NoFlowControl.NAME.equals(name)) {
String o = NoFlowControl.INSTANCE.parseExtension(data);
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
AbstractSession abstractSession
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
abstractSession.activateNoFlowControl();
}
}
return true;
}
Expand Down Expand Up @@ -133,4 +151,39 @@ protected void handleServerSignatureAlgorithms(Session session, Collection<Strin
session.setSignatureFactories(clientAlgorithms);
}
}

@Override
public void sendKexExtensions(Session session, KexPhase phase) throws Exception {
Map<String, Object> extensions = new LinkedHashMap<>();
collectExtensions(session, phase, extensions::put);
if (!extensions.isEmpty()) {
Buffer buffer = session.createBuffer(KexExtensions.SSH_MSG_EXT_INFO);
KexExtensions.putExtensions(extensions.entrySet(), buffer);
if (log.isDebugEnabled()) {
log.debug("sendKexExtensions({})[{}]: sending SSH_MSG_EXT_INFO with {} info records", session, phase,
extensions.size());
}
session.writePacket(buffer);
}
}

/**
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
* <p>
* This default implementation marshals a {@link NoFlowControl} extension}.
* </p>
*
* @param session {@link Session} to send the KEX extension information for
* @param phase {@link KexPhase} of the SSH protocol
* @param marshaller {@link BiConsumer} writing the extensions into an SSH message
*/
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
// no-flow-control
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
if (nfc == null || nfc) {
marshaller.accept(NoFlowControl.NAME,
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,29 @@

package org.apache.sshd.common.kex.extension;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

import org.apache.sshd.common.AttributeRepository.AttributeKey;
import org.apache.sshd.common.kex.KexProposalOption;
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractSession;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
import org.apache.sshd.core.CoreModuleProperties;

/**
* A basic default implementation of a server-side {@link KexExtensionHandler} handling the
* {@link ServerSignatureAlgorithms} KEX extension.
* {@link ServerSignatureAlgorithms} KEX extension along with the {@link NoFlowControl} extension.
*
* @see <a href="https://tools.ietf.org/html/rfc8308">RFC 8308</a>
*/
Expand Down Expand Up @@ -130,6 +136,23 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
}
}

@Override
public boolean handleKexExtensionRequest(
Session session, int index, int count, String name, byte[] data)
throws IOException {
if (NoFlowControl.NAME.equals(name)) {
String o = NoFlowControl.INSTANCE.parseExtension(data);
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
AbstractSession abstractSession
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
abstractSession.activateNoFlowControl();
}
}
return true;
}

/**
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
Expand All @@ -144,6 +167,7 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
*/
@SuppressWarnings("javadoc")
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
// server-sig-algs
if (phase == KexPhase.NEWKEYS) {
Collection<String> algorithms = session.getSignatureFactoriesNames();
if (!GenericUtils.isEmpty(algorithms)) {
Expand All @@ -157,5 +181,11 @@ public void collectExtensions(Session session, KexPhase phase, BiConsumer<String
ServerSignatureAlgorithms.NAME);
}
}
// no-flow-control
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
if (nfc != Boolean.FALSE) {
marshaller.accept(NoFlowControl.NAME,
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
}
}
}
Loading