Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dokka = "2.1.0"
hiddenapibypass = "6.1"
conscrypt-java = "2.5.2"
compose = "1.9.3"
junixsocket = "2.10.1"

[libraries]

Expand All @@ -37,3 +38,4 @@ okio = { module = "com.squareup.okio:okio", version.ref = "okio" }
documentfile = { group = "androidx.documentfile", name = "documentfile", version.ref = "documentfile" }
hiddenapibypass = { module = "org.lsposed.hiddenapibypass:hiddenapibypass", version.ref = "hiddenapibypass" }
conscrypt-java = { module = "org.conscrypt:conscrypt-openjdk-uber", version.ref = "conscrypt-java" }
junixsocket-core = { module = "com.kohlschutter.junixsocket:junixsocket-core", version.ref = "junixsocket" }
1 change: 1 addition & 0 deletions kadb-test-app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bin/
1 change: 1 addition & 0 deletions kadb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build/
1 change: 1 addition & 0 deletions kadb/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ kotlin {
jvmMain.dependencies {
implementation(libs.spake2)
implementation(libs.jmdns)
implementation(libs.junixsocket.core)
}

commonTest.dependencies {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.flyfishxu.kadb.forwarding

import android.net.LocalServerSocket
import android.net.LocalSocket
import okio.BufferedSink
import okio.Source
import okio.buffer
import okio.sink
import okio.source

internal actual class LocalAbstractServer actual constructor(name: String) : AutoCloseable {
private val server = LocalServerSocket(name)

actual fun accept(): LocalAbstractClient = LocalAbstractClient(server.accept())

actual override fun close() {
runCatching { server.close() }
}
}

internal actual class LocalAbstractClient internal constructor(
private val socket: LocalSocket
) : AutoCloseable {

actual val source: Source = socket.inputStream.source()
actual val sink: BufferedSink = socket.outputStream.sink().buffer()

actual override fun close() {
runCatching { socket.close() }
}
}
20 changes: 19 additions & 1 deletion kadb/src/commonMain/kotlin/com/flyfishxu/kadb/Kadb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.flyfishxu.kadb
import com.flyfishxu.kadb.cert.CertUtils.loadKeyPair
import com.flyfishxu.kadb.cert.platform.defaultDeviceName
import com.flyfishxu.kadb.core.AdbConnection
import com.flyfishxu.kadb.forwarding.LocalAbstractForwarder
import com.flyfishxu.kadb.forwarding.TcpForwarder
import com.flyfishxu.kadb.pair.PairingConnectionCtx
import com.flyfishxu.kadb.shell.AdbShellResponse
Expand Down Expand Up @@ -191,7 +192,24 @@ class Kadb(

@Throws(InterruptedException::class)
fun tcpForward(hostPort: Int, targetPort: Int): AutoCloseable {
val forwarder = TcpForwarder(this, hostPort, targetPort)
return tcpForward(hostPort, "tcp:$targetPort")
}

@Throws(InterruptedException::class)
fun tcpForward(hostPort: Int, remote: String): AutoCloseable {
val forwarder = TcpForwarder(this, hostPort, remote)
forwarder.start()
return forwarder
}

/**
* Forward a local abstract unix domain socket (host side) to a remote ADB destination.
*
* Example: `localabstract:my-sock` -> `tcp:7001`
*/
@Throws(InterruptedException::class)
fun localAbstractForward(localName: String, remote: String): AutoCloseable {
val forwarder = LocalAbstractForwarder(this, localName, remote)
forwarder.start()
return forwarder
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package com.flyfishxu.kadb.forwarding

import com.flyfishxu.kadb.Kadb
import com.flyfishxu.kadb.debug.log
import com.flyfishxu.kadb.stream.AdbStream
import okio.BufferedSink
import okio.Source
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.concurrent.thread

internal interface ForwardingClient : AutoCloseable {
val source: Source
val sink: BufferedSink
override fun close()
}

internal interface ForwardingServer : AutoCloseable {
fun accept(): ForwardingClient
override fun close()
}

internal interface ForwardingDuplex : AutoCloseable {
val source: Source
val sink: BufferedSink
override fun close()
}

internal object StreamForwardStrategy {
fun transfer(source: Source, sink: BufferedSink) {
try {
while (!Thread.currentThread().isInterrupted) {
try {
if (source.read(sink.buffer, 256) >= 0) {
sink.flush()
} else {
return
}
} catch (_: IOException) {
return
}
}
} catch (_: InterruptedException) {
// Do nothing
} catch (_: InterruptedIOException) {
// Do nothing
}
}
}

internal abstract class BaseForwarder(
private val kadb: Kadb,
private val remoteDestination: String,
private val endpointDescription: String,
private val forwardingType: String,
private val remoteChannelFactory: (String) -> ForwardingDuplex = { destination ->
AdbForwardingDuplex(kadb.open(destination))
},
) : AutoCloseable {

private var serverThread: Thread? = null
private var server: ForwardingServer? = null
private var clientExecutor: ExecutorService? = null
private val stateController = ForwarderStateController(endpointDescription)

fun start() {
stateController.prepareStart()

clientExecutor = createClientExecutor()
serverThread = thread {
try {
handleForwarding()
} catch (_: SocketException) {
// Do nothing
} catch (e: IOException) {
log { "could not start $forwardingType port forwarding: ${e.message}" }
} catch (e: Throwable) {
log { "could not start $forwardingType port forwarding: ${e.message}" }
} finally {
stateController.markStopped()
}
}

stateController.awaitStarted()
}

private fun handleForwarding() {
val serverRef = createServer()
server = serverRef

stateController.markStarted()

while (!Thread.currentThread().isInterrupted) {
val client = serverRef.accept()

clientExecutor?.execute {
try {
handleClient(client)
} catch (e: Throwable) {
log { "Forwarder client handler failed: ${e.message}" }
runCatching { client.close() }
}
}
}
}

private fun handleClient(client: ForwardingClient) {
val remoteChannel = remoteChannelFactory(remoteDestination)

val readerThread = thread {
StreamForwardStrategy.transfer(client.source, remoteChannel.sink)
}

try {
StreamForwardStrategy.transfer(remoteChannel.source, client.sink)
} finally {
runCatching { remoteChannel.close() }
runCatching { client.close() }
readerThread.interrupt()
}
}

protected abstract fun createServer(): ForwardingServer

override fun close() {
val shouldStop = stateController.shouldStop()
if (!shouldStop) return

stateController.awaitStartedOrStopped()
stateController.markStopping()

server?.close()
server = null
serverThread?.interrupt()
serverThread = null
clientExecutor?.shutdown()
clientExecutor?.awaitTermination(5, TimeUnit.SECONDS)
clientExecutor = null

stateController.awaitStopped()
}

private fun createClientExecutor(): ExecutorService {
return ThreadPoolExecutor(
1,
MAX_CLIENT_THREADS,
60,
TimeUnit.SECONDS,
LinkedBlockingQueue(MAX_CLIENT_QUEUE_SIZE),
ThreadPoolExecutor.CallerRunsPolicy(),
)
}

private companion object {
const val MAX_CLIENT_THREADS = 32
const val MAX_CLIENT_QUEUE_SIZE = 256
}
}

private class AdbForwardingDuplex(
private val stream: AdbStream,
) : ForwardingDuplex {
override val source: Source = stream.source
override val sink: BufferedSink = stream.sink

override fun close() {
stream.close()
}
}

private class ForwarderStateController(
private val endpointDescription: String,
) {
@Volatile
private var state: State = State.STOPPED
private val lock = Any()
private var startedSignal = CountDownLatch(0)
private var stoppedSignal = CountDownLatch(0)

fun prepareStart() {
synchronized(lock) {
check(state == State.STOPPED) { "Forwarder is already started at $endpointDescription" }
startedSignal = CountDownLatch(1)
stoppedSignal = CountDownLatch(1)
state = State.STARTING
}
}

fun shouldStop(): Boolean = synchronized(lock) {
state != State.STOPPED && state != State.STOPPING
}

fun markStarted() {
state = State.STARTED
startedSignal.countDown()
}

fun markStopping() {
state = State.STOPPING
}

fun markStopped() {
state = State.STOPPED
stoppedSignal.countDown()
}

fun awaitStarted() {
awaitStateTransition(startedSignal, "start")
}

fun awaitStartedOrStopped() {
if (state == State.STOPPED) return
awaitStarted()
}

fun awaitStopped() {
awaitStateTransition(stoppedSignal, "stop")
}

private enum class State {
STARTING, STARTED, STOPPING, STOPPED
}

private fun awaitStateTransition(signal: CountDownLatch, action: String) {
if (!signal.await(5, TimeUnit.SECONDS)) {
throw TimeoutException("Timed out waiting for forwarder to $action")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.flyfishxu.kadb.forwarding

import com.flyfishxu.kadb.Kadb

internal class LocalAbstractForwarder(
private val kadb: Kadb,
private val localName: String,
private val remote: String,
) : BaseForwarder(
kadb = kadb,
remoteDestination = remote,
endpointDescription = "localabstract:$localName",
forwardingType = "localabstract",
) {

override fun createServer(): ForwardingServer = LocalAbstractServerAdapter(localName)

private class LocalAbstractServerAdapter(name: String) : ForwardingServer {
private val delegate = LocalAbstractServer(name)

override fun accept(): ForwardingClient = LocalAbstractClientAdapter(delegate.accept())

override fun close() {
delegate.close()
}
}

private class LocalAbstractClientAdapter(
private val delegate: LocalAbstractClient,
) : ForwardingClient {
override val source = delegate.source
override val sink = delegate.sink

override fun close() {
delegate.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.flyfishxu.kadb.forwarding

import okio.BufferedSink
import okio.Source

/**
* Platform-specific local abstract unix domain socket listener.
*
* Mirrors adb's `localabstract:<name>` behavior for the local endpoint.
*/
internal expect class LocalAbstractServer(name: String) : AutoCloseable {
fun accept(): LocalAbstractClient
override fun close()
}

internal expect class LocalAbstractClient : AutoCloseable {
val source: Source
val sink: BufferedSink
override fun close()
}
Loading