Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--
-- Add supervisor_endpoint table to maintain supervisor agent info, this table will bind to resource_resource
--
CREATE TABLE `supervisor_endpoint` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`host` varchar(256) NOT NULL COMMENT 'host of supervisor',
`port` int(11) NOT NULL COMMENT 'port of supervisor',
`status` varchar(64) NOT NULL COMMENT 'status of supervisor, contains PREPAREING,AVAILABLE,DESTROYED,UNAVAILABLE,ABANDON',
`loads` int(11) NOT NULL COMMENT 'load of supervisor',
`resource_id` bigint(20) NOT NULL COMMENT 'resource id related to resource_resource table, -1 means not related',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE KEY host_and_port (`host`, `port`),
KEY `status_index` (`status`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--
-- Add resource_allocate_info table to maintain task resource allocate info
--
CREATE TABLE `resource_allocate_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`task_id` bigint(20) NOT NULL COMMENT 'task id allocate to this allocate info',
`resource_allocate_state` varchar(20) NOT NULL COMMENT 'resource allocate state, update by resource allocator, including PREPARING, AVAILABLE, FAILED, FINISHED',
`resource_usage_state` varchar(20) NOT NULL COMMENT 'resource usage state update by resource user, including PREPARING, USING, FINISHED',
`endpoint` varchar(512) DEFAULT NULL COMMENT 'supervisor endpoint, in format host:port',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE KEY `task_index` (`task_id`),
INDEX `usage_state_index` (`resource_usage_state`, `resource_allocate_state`),
INDEX `allocate_state_index` (`resource_allocate_state`, `resource_usage_state`)
);
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ public static void main(String[] args) {
log.error("Task existed abnormal", e);
}
log.info("Task executor exit.");
System.exit(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,14 @@
*/
package com.oceanbase.odc.agent.runtime;

import java.net.InetSocketAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.service.common.util.UrlUtils;
import com.oceanbase.odc.service.task.executor.TraceDecoratorThreadFactory;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.task.executor.TraceDecoratorUtils;
import com.oceanbase.odc.service.task.net.HttpServerContainer;
import com.oceanbase.odc.service.task.net.RequestHandler;
import com.oceanbase.odc.service.task.util.JobUtils;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -65,186 +31,36 @@
* @since 4.2.4
*/
@Slf4j
class EmbedServer {

private ExecutorRequestHandler requestHandler;
private Thread thread;

public void start() {
requestHandler = new ExecutorRequestHandler();
thread = new Thread(TraceDecoratorUtils.decorate(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
128,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(64),
new TraceDecoratorThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "odc-job, EmbedServer bizThreadPool-" + r.hashCode());
}
}),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("odc-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N,
// close if
// idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request &
// reponse to FULL
.addLast(new EmbedHttpServerHandler(requestHandler, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future;
int port;
if (JobUtils.getExecutorPort().isPresent()) {
// start with assigned port
future = bootstrap.bind(JobUtils.getExecutorPort().get()).sync();
port = JobUtils.getExecutorPort().get();
} else {
// start with random port
future = bootstrap.bind(0).sync();
InetSocketAddress localAddress = (InetSocketAddress) future.channel().localAddress();
// save port to system properties
JobUtils.setExecutorPort(localAddress.getPort());
port = localAddress.getPort();
}
log.info("odc-job remoting server start success, nettype = {}, port = {}",
EmbedServer.class, port);

// wait util stop
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
log.info("odc-job remoting server stop.");
} catch (Exception e) {
log.error("odc-job remoting server error.", e);
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}));
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}

public void stop() throws Exception {
// destroy server thread
if (thread != null && thread.isAlive()) {
thread.interrupt();
class EmbedServer extends HttpServerContainer<SuccessResponse<Object>> {
@Override
protected int getPort() {
int port;
if (JobUtils.getExecutorPort().isPresent()) {
// start with assigned port
port = JobUtils.getExecutorPort().get();
} else {
port = 0;
}

log.info("odc-job remoting server destroy success.");
return port;
}

@Override
protected RequestHandler<SuccessResponse<Object>> getRequestHandler() {
return new ExecutorRequestHandler();
}


public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

private final ThreadPoolExecutor bizThreadPool;
private final ExecutorRequestHandler requestHandler;

public EmbedHttpServerHandler(ExecutorRequestHandler executorRequestHandler, ThreadPoolExecutor bizThreadPool) {
this.requestHandler = executorRequestHandler;
this.bizThreadPool = bizThreadPool;
}

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
// final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); //
// byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = UrlUtils.decode(msg.uri());
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
if (StringUtils.isNotBlank(uri)) {
logger.info("odc-job get uri {}", uri);
}
if (StringUtils.isNotBlank(requestData)) {
logger.info("odc-job get requestData {}", requestData);
}

// invoke
bizThreadPool.execute(TraceDecoratorUtils.decorate(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = requestHandler.process(httpMethod, uri, requestData);

// to json
String responseJson = JsonUtils.toJson(responseObj);

// write response
writeResponse(ctx, keepAlive, responseJson);
}
}));
}

/**
* write response
*/
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
// write response
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.writeAndFlush(response);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("odc-job provider netty_http server caught exception", cause);
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.channel().close(); // beat 3N, close if idle
logger.debug("odc-job provider netty_http server close an idle channel.");
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
protected String getModuleName() {
return "odc-job";
}

@Override
protected Thread createThread(Runnable r) {
return new Thread(TraceDecoratorUtils.decorate(r));
}

@Override
protected Consumer<Integer> portConsumer() {
return JobUtils::setExecutorPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.oceanbase.odc.service.task.executor.logger.LogBiz;
import com.oceanbase.odc.service.task.executor.logger.LogBizImpl;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
import com.oceanbase.odc.service.task.net.RequestHandler;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
import com.oceanbase.odc.service.task.util.JobUtils;

Expand All @@ -42,7 +43,7 @@
* @since 4.2.4
*/
@Slf4j
class ExecutorRequestHandler {
class ExecutorRequestHandler implements RequestHandler<SuccessResponse<Object>> {

private final Pattern queryLogUrlPattern = Pattern.compile(String.format(JobExecutorUrls.QUERY_LOG, "([0-9]+)"));
private final Pattern stopTaskPattern = Pattern.compile(String.format(JobExecutorUrls.STOP_TASK, "([0-9]+)"));
Expand Down Expand Up @@ -86,7 +87,7 @@ public SuccessResponse<Object> process(HttpMethod httpMethod, String uri, String
if (matcher.find()) {
JobIdentity ji = getJobIdentity(matcher);
TaskRuntimeInfo runtimeInfo = ThreadPoolTaskExecutor.getInstance().getTaskRuntimeInfo(ji);
boolean result = runtimeInfo.getTaskContainer().modify(JobUtils.fromJsonToMap(requestData));
boolean result = runtimeInfo.getTaskContainer().modifyTask(JobUtils.fromJsonToMap(requestData));
return Responses.ok(result);
}

Expand All @@ -109,11 +110,16 @@ public SuccessResponse<Object> process(HttpMethod httpMethod, String uri, String

return Responses.single("invalid request, uri-mapping(" + uri + ") not found.");
} catch (Exception e) {
log.error(e.getMessage(), e);
return Responses.single("request error:" + ExceptionUtils.getRootCauseReason(e));
return processException(e);
}
}

@Override
public SuccessResponse<Object> processException(Throwable e) {
log.error(e.getMessage(), e);
return Responses.single("request error:" + ExceptionUtils.getRootCauseReason(e));
}

private static JobIdentity getJobIdentity(Matcher matcher) {
return JobIdentity.of(Long.parseLong(matcher.group(1)));
}
Expand Down
Loading