Skip to content
Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@

## [4.1.2] - TBD
### Changed
- Lazy loading database mappping create to avoid doing work when tcp connections are being opened (e.g. from LoadBalancers).
- Removed unnecessary logging in invocation logsfor setConf (it's a local call not a federated call).

## [4.1.1] - 2025-10-27
### Added
- Update Glue libs. See `libs/HOW_TO_INSTALL_MD`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,27 @@
public class ExceptionWrappingHMSHandler implements InvocationHandler {
private static Logger log = LoggerFactory.getLogger(ExceptionWrappingHMSHandler.class);

private final CloseableIHMSHandler baseHandler;
private final FederatedHMSHandlerFactory federatedHMSHandlerFactory;
private CloseableIHMSHandler baseHandler;
private String user = "";

public static CloseableIHMSHandler newProxyInstance(CloseableIHMSHandler baseHandler) {

public static CloseableIHMSHandler newProxyInstance(FederatedHMSHandlerFactory federatedHMSHandlerFactory) {
return (CloseableIHMSHandler) Proxy
.newProxyInstance(ExceptionWrappingHMSHandler.class.getClassLoader(),
new Class[] { CloseableIHMSHandler.class }, new ExceptionWrappingHMSHandler(baseHandler));
new Class[] { CloseableIHMSHandler.class }, new ExceptionWrappingHMSHandler(federatedHMSHandlerFactory));
}

public ExceptionWrappingHMSHandler(CloseableIHMSHandler baseHandler) {
this.baseHandler = baseHandler;
public ExceptionWrappingHMSHandler(FederatedHMSHandlerFactory federatedHMSHandlerFactory) {
this.baseHandler = null;
this.federatedHMSHandlerFactory = federatedHMSHandlerFactory;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (baseHandler == null) {
baseHandler = federatedHMSHandlerFactory.create();
}
if (method.getName().equals("set_ugi")) {
user = (String) args[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1556,13 +1556,11 @@ public fb_status getStatus() {
}

@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public Configuration getConf() {
return conf;
}

@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public void setConf(Configuration conf) {
this.conf = conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,18 @@ public TProcessor getProcessor(TTransport transport) {
Socket socket = ((TSocket) transport).getSocket();
log.debug("Received a connection from ip: {}", socket.getInetAddress().getHostAddress());
}
CloseableIHMSHandler baseHandler = federatedHMSHandlerFactory.create();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets created when a tcp connection is made (LoadBalancer might do this for livelyness check), the handles won't get invoked until a thrift client does a real call, so it makes sense to defer the creation of this when get a real invocation.
This will reduce the clutter in the logs where it looks like we are getting a lot of calls even if we aren't.

CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(federatedHMSHandlerFactory);
boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
if (useSASL) {
try {
baseHandler.getStatus();
handler.getStatus();
} catch (TException e) {
throw new RuntimeException("Error creating TProcessor. Could not get status.", e);
}
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
transportMonitor.monitor(transport, baseHandler);
return new TSetIpAddressProcessor<>(handler);
} else {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
transportMonitor.monitor(transport, baseHandler);
return new TSetIpAddressProcessor<>(handler);
}
transportMonitor.monitor(transport, handler);
TSetIpAddressProcessor<CloseableIHMSHandler> result = new TSetIpAddressProcessor<>(handler);
return result;
} catch (ReflectiveOperationException | RuntimeException e) {
throw new RuntimeException("Error creating TProcessor", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -32,39 +33,53 @@
public class ExceptionWrappingHMSHandlerTest {

private @Mock CloseableIHMSHandler baseHandler;
private @Mock FederatedHMSHandlerFactory factory;

@Before
public void setup() throws Exception {
when(factory.create()).thenReturn(baseHandler);
}

@Test
public void get_databaseNoExceptions() throws Exception {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(factory);
handler.get_database("bdp");
verify(baseHandler).get_database("bdp");
}

@Test
public void get_databaseWaggleDanceServerException() throws Exception {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(factory);
when(baseHandler.get_database("bdp")).thenThrow(new WaggleDanceServerException("waggle waggle!"));
assertThrows(MetaException.class, () -> { handler.get_database("bdp");});
assertThrows(MetaException.class, () -> {
handler.get_database("bdp");
});
}

@Test
public void get_databasNotAllowedException() throws Exception {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(factory);
when(baseHandler.get_database("bdp")).thenThrow(new NotAllowedException("waggle waggle!"));
assertThrows(MetaException.class, () -> { handler.get_database("bdp");});
assertThrows(MetaException.class, () -> {
handler.get_database("bdp");
});
}

@Test
public void get_databaseRunTimeExceptionIsNotWrapped() throws Exception {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(factory);
when(baseHandler.get_database("bdp")).thenThrow(new RuntimeException("generic non waggle dance exception"));
assertThrows("generic non waggle dance exception",RuntimeException.class, () -> { handler.get_database("bdp");});
assertThrows("generic non waggle dance exception", RuntimeException.class, () -> {
handler.get_database("bdp");
});
}

@Test
public void get_databaseCheckedExceptionIsNotWrapped() throws Exception {
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(baseHandler);
CloseableIHMSHandler handler = ExceptionWrappingHMSHandler.newProxyInstance(factory);
when(baseHandler.get_database("bdp")).thenThrow(new NoSuchObjectException("Does not exist!"));
assertThrows("Does not exist!",NoSuchObjectException.class, () -> { handler.get_database("bdp");});
assertThrows("Does not exist!", NoSuchObjectException.class, () -> {
handler.get_database("bdp");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.hotels.bdp.waggledance.server;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -66,7 +65,8 @@ public void connectionIsMonitored() throws Exception {
ArgumentCaptor<Closeable> handlerCaptor = ArgumentCaptor.forClass(Closeable.class);
verify(transportMonitor).monitor(transportCaptor.capture(), handlerCaptor.capture());
assertThat(transportCaptor.getValue(), is(transport));
assertThat(handlerCaptor.getValue(), is(instanceOf(FederatedHMSHandler.class)));
handlerCaptor.getValue().close();
verify(federatedHMSHandler).close();
}

@Test
Expand All @@ -78,6 +78,7 @@ public void connectionIsMonitoredSasl() throws Exception {
ArgumentCaptor<Closeable> handlerCaptor = ArgumentCaptor.forClass(Closeable.class);
verify(transportMonitor).monitor(transportCaptor.capture(), handlerCaptor.capture());
assertThat(transportCaptor.getValue(), is(transport));
assertThat(handlerCaptor.getValue(), is(instanceOf(FederatedHMSHandler.class)));
handlerCaptor.getValue().close();
verify(federatedHMSHandler).close();
}
}
Loading