diff --git a/pom.xml b/pom.xml
index da6e146..913dbfb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,9 +24,9 @@
shunting-yard-common
shunting-yard-emitter
- shunting-yard-receiver
+
diff --git a/shunting-yard-common/derby.log b/shunting-yard-common/derby.log
new file mode 100644
index 0000000..72eb23b
--- /dev/null
+++ b/shunting-yard-common/derby.log
@@ -0,0 +1,13 @@
+----------------------------------------------------------------
+Fri Jul 26 13:27:25 BST 2019:
+Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-016c-2e40-0e8a-00000a772ec8
+on database directory /Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common/metastore_db with class loader sun.misc.Launcher$AppClassLoader@3b192d32
+Loaded from file:/Users/abhgupta/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
+java.vendor=AdoptOpenJDK
+java.runtime.version=1.8.0_212-b03
+user.dir=/Users/abhgupta/Desktop/workspace/shunting-yard/shunting-yard-common
+os.name=Mac OS X
+os.arch=x86_64
+os.version=10.13.6
+derby.system.home=null
+Database Class Loader started - derby.database.classpath=''
diff --git a/shunting-yard-common/pom.xml b/shunting-yard-common/pom.xml
index cbc8a51..07dc7ef 100644
--- a/shunting-yard-common/pom.xml
+++ b/shunting-yard-common/pom.xml
@@ -1,4 +1,5 @@
-
+
4.0.0
@@ -24,12 +25,24 @@
org.apache.hadoop
hadoop-common
+
+
+ asm
+ asm
+
+
org.apache.hive
hive-metastore
+
+
+ asm
+ asm
+
+
org.apache.hive.hcatalog
@@ -40,6 +53,13 @@
hcommon-hive-metastore
+
+
+ com.esotericsoftware
+ kryo
+ 4.0.2
+
+
junit
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java
index 377417e..fc9e2e4 100644
--- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/emitter/AbstractMetaStoreEventListener.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,7 +15,10 @@
*/
package com.hotels.shunting.yard.common.emitter;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
import static com.hotels.shunting.yard.common.emitter.EmitterUtils.error;
+import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION;
import java.util.concurrent.ExecutorService;
@@ -37,11 +40,13 @@
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hive.common.util.HiveVersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.hotels.shunting.yard.common.event.SerializableListenerEvent;
+import com.hotels.shunting.yard.common.event.EventType;
import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory;
import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe;
import com.hotels.shunting.yard.common.messaging.Message;
@@ -72,11 +77,13 @@ private MessageTask message(Message message) {
return new WrappingMessageTask(getMessageTaskFactory().newTask(message));
}
- private Message withPayload(SerializableListenerEvent event) throws MetaException {
+ private Message withPayload(ListenerEvent event, String dbName, String tableName, EventType eventType)
+ throws MetaException {
return Message
.builder()
- .database(event.getDatabaseName())
- .table(event.getTableName())
+ .database(dbName)
+ .table(tableName)
+ .eventType(eventType)
.payload(getMetaStoreEventSerDe().marshal(event))
.build();
}
@@ -85,7 +92,12 @@ private Message withPayload(SerializableListenerEvent event) throws MetaExceptio
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
log.info("Create table event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
+ tableEvent.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion());
+ tableEvent.putParameter(METASTOREURIS.varname, super.getConf().get(METASTOREURIS.varname));
+
+ executorService
+ .submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(),
+ tableEvent.getTable().getTableName(), EventType.ON_CREATE_TABLE)));
} catch (Exception e) {
error(e);
}
@@ -95,7 +107,9 @@ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
log.info("Drop table event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
+ executorService
+ .submit(message(withPayload(tableEvent, tableEvent.getTable().getDbName(),
+ tableEvent.getTable().getTableName(), EventType.ON_DROP_TABLE)));
} catch (Exception e) {
error(e);
}
@@ -105,7 +119,9 @@ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
log.info("Alter table event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(tableEvent))));
+ executorService
+ .submit(message(withPayload(tableEvent, tableEvent.getNewTable().getDbName(),
+ tableEvent.getNewTable().getTableName(), EventType.ON_ALTER_TABLE)));
} catch (Exception e) {
error(e);
}
@@ -115,7 +131,9 @@ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
log.info("Add partition event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
+ executorService
+ .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
+ partitionEvent.getTable().getTableName(), EventType.ON_ADD_PARTITION)));
} catch (Exception e) {
error(e);
}
@@ -125,7 +143,9 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
log.info("Drop partition event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
+ executorService
+ .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
+ partitionEvent.getTable().getTableName(), EventType.ON_DROP_PARTITION)));
} catch (Exception e) {
error(e);
}
@@ -135,7 +155,9 @@ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaExcept
public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
log.info("Alter partition event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(partitionEvent))));
+ executorService
+ .submit(message(withPayload(partitionEvent, partitionEvent.getTable().getDbName(),
+ partitionEvent.getTable().getTableName(), EventType.ON_ALTER_PARTITION)));
} catch (Exception e) {
error(e);
}
@@ -145,7 +167,8 @@ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaExce
public void onInsert(InsertEvent insertEvent) throws MetaException {
log.info("Insert event received");
try {
- executorService.submit(message(withPayload(serializableListenerEventFactory.create(insertEvent))));
+ executorService
+ .submit(message(withPayload(insertEvent, insertEvent.getTable(), insertEvent.getDb(), EventType.ON_INSERT)));
} catch (Exception e) {
error(e);
}
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java
index 5df4bab..054721e 100644
--- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/EventType.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,32 +15,41 @@
*/
package com.hotels.shunting.yard.common.event;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
/**
* To make processing event in the receiver easier.
*/
public enum EventType {
- ON_CREATE_TABLE(SerializableCreateTableEvent.class),
- ON_ALTER_TABLE(SerializableAlterTableEvent.class),
- ON_DROP_TABLE(SerializableDropTableEvent.class),
- ON_ADD_PARTITION(SerializableAddPartitionEvent.class),
- ON_ALTER_PARTITION(SerializableAlterPartitionEvent.class),
- ON_DROP_PARTITION(SerializableDropPartitionEvent.class),
- ON_INSERT(SerializableInsertEvent.class);
+ ON_CREATE_TABLE(CreateTableEvent.class),
+ ON_ALTER_TABLE(AlterTableEvent.class),
+ ON_DROP_TABLE(DropTableEvent.class),
+ ON_ADD_PARTITION(AddPartitionEvent.class),
+ ON_ALTER_PARTITION(AlterPartitionEvent.class),
+ ON_DROP_PARTITION(DropPartitionEvent.class),
+ ON_INSERT(InsertEvent.class);
- private final Class extends SerializableListenerEvent> eventClass;
+ private final Class extends ListenerEvent> eventClass;
- private EventType(Class extends SerializableListenerEvent> eventClass) {
+ private EventType(Class extends ListenerEvent> eventClass) {
if (eventClass == null) {
throw new NullPointerException("Parameter eventClass is required");
}
this.eventClass = eventClass;
}
- public Class extends SerializableListenerEvent> eventClass() {
+ public Class extends ListenerEvent> eventClass() {
return eventClass;
}
- public static EventType forClass(Class extends SerializableListenerEvent> clazz) {
+ public static EventType forClass(Class extends ListenerEvent> clazz) {
for (EventType e : values()) {
if (e.eventClass().equals(clazz)) {
return e;
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java
new file mode 100644
index 0000000..f31082f
--- /dev/null
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/ListenerEventFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2016-2019 Expedia Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hotels.shunting.yard.common.event;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import static com.hotels.shunting.yard.common.event.CustomEventParameters.HIVE_VERSION;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hive.common.util.HiveVersionInfo;
+
+public class ListenerEventFactory {
+
+ private final Configuration config;
+
+ public ListenerEventFactory(Configuration config) {
+ this.config = config;
+ }
+
+ private T addParams(T event) {
+ event.putParameter(HIVE_VERSION.varname(), HiveVersionInfo.getVersion());
+ event.putParameter(METASTOREURIS.varname, config.get(METASTOREURIS.varname));
+ return event;
+ }
+
+ public SerializableCreateTableEvent create(CreateTableEvent event) {
+ return new SerializableCreateTableEvent(addParams(event));
+ }
+
+ public SerializableAlterTableEvent create(AlterTableEvent event) {
+ return new SerializableAlterTableEvent(addParams(event));
+ }
+
+ public SerializableDropTableEvent create(DropTableEvent event) {
+ return new SerializableDropTableEvent(addParams(event));
+ }
+
+ public SerializableAddPartitionEvent create(AddPartitionEvent event) {
+ return new SerializableAddPartitionEvent(addParams(event));
+ }
+
+ public SerializableAlterPartitionEvent create(AlterPartitionEvent event) {
+ return new SerializableAlterPartitionEvent(addParams(event));
+ }
+
+ public SerializableDropPartitionEvent create(DropPartitionEvent event) {
+ return new SerializableDropPartitionEvent(addParams(event));
+ }
+
+ public SerializableInsertEvent create(InsertEvent event) {
+ return new SerializableInsertEvent(addParams(event));
+ }
+
+}
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java
index 3f059a2..c07a806 100644
--- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/event/SerializableListenerEvent.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -55,9 +55,9 @@ protected SerializableListenerEvent(ListenerEvent event) {
environmentContext = event.getEnvironmentContext();
}
- public EventType getEventType() {
- return EventType.forClass(this.getClass());
- }
+ // public EventType getEventType() {
+ // return EventType.forClass(this.getClass());
+ // }
public abstract String getDatabaseName();
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java
index e86da3c..a7d8df4 100644
--- a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/MetaStoreEventSerDe.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,9 @@
package com.hotels.shunting.yard.common.io;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import com.hotels.shunting.yard.common.ShuntingYardException;
-import com.hotels.shunting.yard.common.event.SerializableListenerEvent;
public interface MetaStoreEventSerDe {
@@ -31,8 +31,8 @@ static T serDeForClassName(String className) {
}
}
- byte[] marshal(SerializableListenerEvent listenerEvent) throws MetaException;
+ byte[] marshal(ListenerEvent listenerEvent) throws MetaException;
- T unmarshal(byte[] payload) throws MetaException;
+ T unmarshal(byte[] payload) throws MetaException;
}
diff --git a/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java
new file mode 100644
index 0000000..4d3226e
--- /dev/null
+++ b/shunting-yard-common/src/main/java/com/hotels/shunting/yard/common/io/jackson/ImmutableMapSerializer.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2016-2019 Expedia Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hotels.shunting.yard.common.io.jackson;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Maps;
+
+public class ImmutableMapSerializer extends Serializer> {
+ private static final boolean DOES_NOT_ACCEPT_NULL = true;
+ private static final boolean IMMUTABLE = true;
+
+ public ImmutableMapSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableMap
diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml b/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml
index c3dbf24..3edd19e 100644
--- a/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml
+++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/pom.xml
@@ -74,6 +74,7 @@
com.hotels:shunting-yard-emitter*
com.hotels:hcommon-hive-metastore
org.apache.kafka:*
+ com.esotericsoftware:*
diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java
index 08cf52e..d2bd3ca 100644
--- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java
+++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/KafkaProducerProperty.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package com.hotels.shunting.yard.emitter.kafka;
import com.hotels.shunting.yard.common.Property;
-import com.hotels.shunting.yard.common.io.jackson.JsonMetaStoreEventSerDe;
+import com.hotels.shunting.yard.common.io.jackson.KryoMetaStoreEventSerDe;
public enum KafkaProducerProperty implements Property {
TOPIC("topic", null),
@@ -28,7 +28,7 @@ public enum KafkaProducerProperty implements Property {
BATCH_SIZE("batch.size", 16384),
LINGER_MS("linger.ms", 1L),
BUFFER_MEMORY("buffer.memory", 33554432L),
- SERDE_CLASS("serde.class", JsonMetaStoreEventSerDe.class.getName());
+ SERDE_CLASS("serde.class", KryoMetaStoreEventSerDe.class.getName());
private static final String PROPERTY_PREFIX = "com.hotels.shunting.yard.event.emitter.kafka.";
diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java
index 9a73161..e10f326 100644
--- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java
+++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/main/java/com/hotels/shunting/yard/emitter/kafka/messaging/KafkaMessageTask.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
import com.hotels.shunting.yard.common.messaging.Message;
import com.hotels.shunting.yard.common.messaging.MessageTask;
@@ -36,7 +37,13 @@ class KafkaMessageTask implements MessageTask {
@Override
public void run() {
- producer.send(new ProducerRecord<>(topic, partition(), message.getTimestamp(), message.getPayload()));
+ ProducerRecord pr = new ProducerRecord(topic, partition(), message.getTimestamp(),
+ message.getPayload());
+
+ Headers headers = pr.headers();
+ headers.add("eventType", message.getEventType().name().getBytes());
+ System.out.println("*********EVENT TYPE=" + message.getEventType().name());
+ producer.send(pr);
}
private int partition() {
diff --git a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java
index d62e0a6..aa03fd9 100644
--- a/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java
+++ b/shunting-yard-emitter/shunting-yard-emitter-kafka/src/test/java/com/hotels/shunting/yard/emitter/kafka/listener/KafkaMetaStoreEventListenerTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.junit.Before;
import org.junit.Test;
@@ -56,7 +57,6 @@
import com.hotels.shunting.yard.common.event.SerializableDropPartitionEvent;
import com.hotels.shunting.yard.common.event.SerializableDropTableEvent;
import com.hotels.shunting.yard.common.event.SerializableInsertEvent;
-import com.hotels.shunting.yard.common.event.SerializableListenerEvent;
import com.hotels.shunting.yard.common.event.SerializableListenerEventFactory;
import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe;
import com.hotels.shunting.yard.common.messaging.Message;
@@ -73,7 +73,7 @@ public class KafkaMetaStoreEventListenerTest {
private @Mock MetaStoreEventSerDe eventSerDe;
private @Mock MessageTask messageTask;
private @Mock MessageTaskFactory messageTaskFactory;
- private @Mock SerializableListenerEventFactory serializableListenerEventFactory;
+ private @Mock SerializableListenerEventFactory ListenerEventFactory;
private @Mock ExecutorService executorService;
private final Configuration config = new Configuration();
@@ -81,7 +81,7 @@ public class KafkaMetaStoreEventListenerTest {
@Before
public void init() throws Exception {
- when(eventSerDe.marshal(any(SerializableListenerEvent.class))).thenReturn(PAYLOAD);
+ when(eventSerDe.marshal(any(ListenerEvent.class))).thenReturn(PAYLOAD);
when(messageTaskFactory.newTask(any(Message.class))).thenReturn(messageTask);
doAnswer(new Answer() {
@Override
@@ -90,7 +90,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(executorService).submit(any(Runnable.class));
- listener = new KafkaMetaStoreEventListener(config, serializableListenerEventFactory, eventSerDe, messageTaskFactory,
+ listener = new KafkaMetaStoreEventListener(config, ListenerEventFactory, eventSerDe, messageTaskFactory,
executorService);
}
@@ -100,7 +100,7 @@ public void onCreateTable() throws Exception {
SerializableCreateTableEvent serializableEvent = mock(SerializableCreateTableEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onCreateTable(event);
verify(messageTask).run();
}
@@ -111,7 +111,7 @@ public void onAlterTable() throws Exception {
SerializableAlterTableEvent serializableEvent = mock(SerializableAlterTableEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onAlterTable(event);
verify(messageTask).run();
}
@@ -122,7 +122,7 @@ public void onDropTable() throws Exception {
SerializableDropTableEvent serializableEvent = mock(SerializableDropTableEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onDropTable(event);
verify(messageTask).run();
}
@@ -133,7 +133,7 @@ public void onAddPartition() throws Exception {
SerializableAddPartitionEvent serializableEvent = mock(SerializableAddPartitionEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onAddPartition(event);
verify(messageTask).run();
}
@@ -144,7 +144,7 @@ public void onAlterPartition() throws Exception {
SerializableAlterPartitionEvent serializableEvent = mock(SerializableAlterPartitionEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onAlterPartition(event);
verify(messageTask).run();
}
@@ -155,7 +155,7 @@ public void onDropPartition() throws Exception {
SerializableDropPartitionEvent serializableEvent = mock(SerializableDropPartitionEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onDropPartition(event);
verify(messageTask).run();
}
@@ -166,7 +166,7 @@ public void onInsert() throws Exception {
SerializableInsertEvent serializableEvent = mock(SerializableInsertEvent.class);
when(serializableEvent.getDatabaseName()).thenReturn(DATABASE);
when(serializableEvent.getTableName()).thenReturn(TABLE);
- when(serializableListenerEventFactory.create(event)).thenReturn(serializableEvent);
+ when(ListenerEventFactory.create(event)).thenReturn(serializableEvent);
listener.onInsert(event);
verify(messageTask).run();
}
@@ -175,63 +175,63 @@ public void onInsert() throws Exception {
public void onConfigChange() throws Exception {
listener.onConfigChange(mock(ConfigChangeEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onCreateDatabase() throws Exception {
listener.onCreateDatabase(mock(CreateDatabaseEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onDropDatabase() throws Exception {
listener.onDropDatabase(mock(DropDatabaseEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onLoadPartitionDone() throws Exception {
listener.onLoadPartitionDone(mock(LoadPartitionDoneEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onAddIndex() throws Exception {
listener.onAddIndex(mock(AddIndexEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onDropIndex() throws Exception {
listener.onDropIndex(mock(DropIndexEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onAlterIndex() throws Exception {
listener.onAlterIndex(mock(AlterIndexEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onCreateFunction() throws Exception {
listener.onCreateFunction(mock(CreateFunctionEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
@Test
public void onDropFunction() throws Exception {
listener.onDropFunction(mock(DropFunctionEvent.class));
verify(executorService, never()).submit(any(Runnable.class));
- verify(eventSerDe, never()).marshal(any(SerializableListenerEvent.class));
+ verify(eventSerDe, never()).marshal(any(ListenerEvent.class));
}
}
diff --git a/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java b/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java
index efe3584..d764627 100644
--- a/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java
+++ b/shunting-yard-emitter/shunting-yard-emitter-sqs/src/main/java/com/hotels/shunting/yard/emitter/sqs/messaging/SqsMessageTask.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,7 +15,6 @@
*/
package com.hotels.shunting.yard.emitter.sqs.messaging;
-import org.datanucleus.util.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +42,12 @@ class SqsMessageTask implements MessageTask {
@Override
public void run() {
LOG.info("Sending message to topic {} and group ID {}", topic, messageGroupId);
- producer.sendMessage(new SendMessageRequest()
- .withQueueUrl(topic)
- .withMessageGroupId(messageGroupId)
- .withMessageBody(new String(Base64.encode(payload)))
- .withDelaySeconds(0));
+ producer
+ .sendMessage(new SendMessageRequest()
+ .withQueueUrl(topic)
+ .withMessageGroupId(messageGroupId)
+ .withMessageBody(new String(payload))
+ .withDelaySeconds(0));
}
}
diff --git a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java
index ffad984..e7cc36e 100644
--- a/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java
+++ b/shunting-yard-receiver/shunting-yard-receiver-kafka/src/main/java/com/hotels/shunting/yard/receiver/kafka/messaging/KafkaMessageReader.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2018 Expedia Inc.
+ * Copyright (C) 2016-2019 Expedia Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,12 +24,14 @@
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import com.google.common.annotations.VisibleForTesting;
-import com.hotels.shunting.yard.common.event.SerializableListenerEvent;
import com.hotels.shunting.yard.common.io.MetaStoreEventSerDe;
import com.hotels.shunting.yard.common.io.SerDeException;
import com.hotels.shunting.yard.common.messaging.MessageReader;
@@ -75,7 +77,7 @@ public boolean hasNext() {
}
@Override
- public SerializableListenerEvent next() {
+ public ListenerEvent next() {
readRecordsIfNeeded();
return eventPayLoad(records.next());
}
@@ -86,8 +88,16 @@ private void readRecordsIfNeeded() {
}
}
- private SerializableListenerEvent eventPayLoad(ConsumerRecord message) {
+ private ListenerEvent eventPayLoad(ConsumerRecord message) {
try {
+ Headers headers = message.headers();
+ Iterator itr = headers.headers("eventType").iterator();
+
+ while (itr.hasNext()) {
+ Header header = itr.next();
+ System.out.println(new String(header.value()));
+ }
+
return eventSerDe.unmarshal(message.value());
} catch (Exception e) {
throw new SerDeException("Unable to unmarshall event", e);
diff --git a/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java b/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java
index e8db65f..0fee8a8 100644
--- a/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java
+++ b/shunting-yard-replicator/src/test/java/com/hotels/shunting/yard/replicator/exec/messaging/MessageReaderAdapterTest.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright (C) 2016-2019 Expedia Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.hotels.shunting.yard.replicator.exec.messaging;
import static org.assertj.core.api.Assertions.assertThat;