From 07a2078094bf34b823e064cdaf88fd0ff02f075c Mon Sep 17 00:00:00 2001 From: ARUN Sai Date: Wed, 9 Sep 2020 14:47:30 -0700 Subject: [PATCH 1/3] Basic implementation of MySqlAccountService with following functionalities. a) Fetch all accounts and container metadata from DB and initialize cache on boot-up, b) Implement AccountService interface update APIs to write changes to mysql database and getter APIs to read changes from cache. --- .../ambry/account/AbstractAccountService.java | 183 +----------- .../github/ambry/account/AccountInfoMap.java | 267 +++++++++++++++++ .../ambry/account/MySqlAccountService.java | 278 ++++++++++++++++++ .../ambry/account/MySqlAccountStore.java | 103 +++++++ .../ambry/account/mysql/AccountDao.java | 47 ++- .../ambry/account/mysql/ContainerDao.java | 21 +- .../account/mysql/MySqlDataAccessor.java | 2 +- .../com/github/ambry/account/Account.java | 16 +- .../ambry/account/MySqlAccountsDBTool.java | 19 +- 9 files changed, 730 insertions(+), 206 deletions(-) create mode 100644 ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java create mode 100644 ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java create mode 100644 ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java diff --git a/ambry-account/src/main/java/com/github/ambry/account/AbstractAccountService.java b/ambry-account/src/main/java/com/github/ambry/account/AbstractAccountService.java index 17d5ad05ee..906c900148 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/AbstractAccountService.java +++ b/ambry-account/src/main/java/com/github/ambry/account/AbstractAccountService.java @@ -13,15 +13,10 @@ */ package com.github.ambry.account; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -102,8 +97,8 @@ boolean hasConflictingAccount(Collection accountsToSet) { Account accountInMap = getAccountById(account.getId()); if (accountInMap != null && account.getSnapshotVersion() != accountInMap.getSnapshotVersion()) { logger.error( - "Account to update (accountId={} accountName={}) has an unexpected snapshot version in zk (expected={}, encountered={})", - account.getId(), account.getName(), account.getSnapshotVersion(), accountInMap.getSnapshotVersion()); + "Account to update (accountId={} accountName={}) has an unexpected snapshot version in zk (expected={}, encountered={})", + account.getId(), account.getName(), account.getSnapshotVersion(), accountInMap.getSnapshotVersion()); return true; } // check that there are no other accounts that conflict with the name of the account to update @@ -111,8 +106,8 @@ boolean hasConflictingAccount(Collection accountsToSet) { Account potentialConflict = getAccountByName(account.getName()); if (potentialConflict != null && potentialConflict.getId() != account.getId()) { logger.error( - "Account to update (accountId={} accountName={}) conflicts with an existing record (accountId={} accountName={})", - account.getId(), account.getName(), potentialConflict.getId(), potentialConflict.getName()); + "Account to update (accountId={} accountName={}) conflicts with an existing record (accountId={} accountName={})", + account.getId(), account.getName(), potentialConflict.getId(), potentialConflict.getName()); return true; } } @@ -126,7 +121,7 @@ boolean hasConflictingAccount(Collection accountsToSet) { * @param isCalledFromListener {@code true} if the caller is the account update listener, {@@code false} otherwise. */ protected void notifyAccountUpdateConsumers(AccountInfoMap newAccountInfoMap, AccountInfoMap oldAccountInfoMap, - boolean isCalledFromListener) { + boolean isCalledFromListener) { Map idToUpdatedAccounts = new HashMap<>(); for (Account newAccount : newAccountInfoMap.getAccounts()) { if (!newAccount.equals(oldAccountInfoMap.getAccountById(newAccount.getId()))) { @@ -135,7 +130,7 @@ protected void notifyAccountUpdateConsumers(AccountInfoMap newAccountInfoMap, Ac } if (idToUpdatedAccounts.size() > 0) { logger.info("Received updates for {} accounts. Received from listener={}. Account IDs={}", - idToUpdatedAccounts.size(), isCalledFromListener, idToUpdatedAccounts.keySet()); + idToUpdatedAccounts.size(), isCalledFromListener, idToUpdatedAccounts.keySet()); // @todo In long run, this metric is not necessary. if (isCalledFromListener) { accountServiceMetrics.accountUpdatesCapturedByScheduledUpdaterCount.inc(); @@ -147,7 +142,7 @@ protected void notifyAccountUpdateConsumers(AccountInfoMap newAccountInfoMap, Ac accountUpdateConsumer.accept(updatedAccounts); long consumerExecutionTimeInMs = System.currentTimeMillis() - startTime; logger.trace("Consumer={} has been notified for account change, took {} ms", accountUpdateConsumer, - consumerExecutionTimeInMs); + consumerExecutionTimeInMs); accountServiceMetrics.accountUpdateConsumerTimeInMs.update(consumerExecutionTimeInMs); } catch (Exception e) { logger.error("Exception occurred when notifying accountUpdateConsumer={}", accountUpdateConsumer, e); @@ -159,167 +154,3 @@ protected void notifyAccountUpdateConsumers(AccountInfoMap newAccountInfoMap, Ac } } -/** - *

- * A helper class that represents a collection of {@link Account}s, where the ids and names of the - * {@link Account}s are one-to-one mapped. An {@code AccountInfoMap} guarantees no duplicated account - * id or name, nor conflict among the {@link Account}s within it. - *

- *

- * Based on the properties, a {@code AccountInfoMap} internally builds index for {@link Account}s using both - * {@link Account}'s id and name as key. - *

- */ -class AccountInfoMap { - private final Map nameToAccountMap; - private final Map idToAccountMap; - private final static Logger logger = LoggerFactory.getLogger(AccountInfoMap.class); - - /** - * Constructor for an empty {@code AccountInfoMap}. - */ - AccountInfoMap(AccountServiceMetrics accountServiceMetrics) { - this(accountServiceMetrics, new HashMap<>()); - } - - /** - *

- * Constructs an {@code AccountInfoMap} from a group of {@link Account}s. The {@link Account}s exists - * in the form of a string-to-string map, where the key is the string form of an {@link Account}'s id, - * and the value is the string form of the {@link Account}'s JSON string. - *

- *

- * The source {@link Account}s in the {@code accountMap} may duplicate account ids or names, or corrupted - * JSON strings that cannot be parsed as valid {@link JSONObject}. In such cases, construction of - * {@code AccountInfoMap} will fail. - *

- * @param accountMap A map of {@link Account}s in the form of (accountIdString, accountJSONString). - * @throws JSONException If parsing account data in json fails. - */ - AccountInfoMap(AccountServiceMetrics accountServiceMetrics, Map accountMap) throws JSONException { - nameToAccountMap = new HashMap<>(); - idToAccountMap = new HashMap<>(); - for (Map.Entry entry : accountMap.entrySet()) { - String idKey = entry.getKey(); - String valueString = entry.getValue(); - Account account; - JSONObject accountJson = new JSONObject(valueString); - if (idKey == null) { - accountServiceMetrics.remoteDataCorruptionErrorCount.inc(); - throw new IllegalStateException( - "Invalid account record when reading accountMap in ZNRecord because idKey=null"); - } - account = Account.fromJson(accountJson); - if (account.getId() != Short.valueOf(idKey)) { - accountServiceMetrics.remoteDataCorruptionErrorCount.inc(); - throw new IllegalStateException( - "Invalid account record when reading accountMap in ZNRecord because idKey and accountId do not match. idKey=" - + idKey + " accountId=" + account.getId()); - } - if (idToAccountMap.containsKey(account.getId()) || nameToAccountMap.containsKey(account.getName())) { - throw new IllegalStateException( - "Duplicate account id or name exists. id=" + account.getId() + " name=" + account.getName()); - } - idToAccountMap.put(account.getId(), account); - nameToAccountMap.put(account.getName(), account); - } - } - - /** - *

- * Constructs an AccountInfoMap instance from one single JSON file which contains multiple accounts. - *

- *

- * This constructor is primarily useful if the source of the accounts is a flat storage for example a local - * JSON file. - *

- * - * @param accountsJsonString JSON data containing an array of all accounts. - * @throws JSONException If parsing account data in json fails. - */ - protected AccountInfoMap(String accountsJsonString) throws JSONException { - nameToAccountMap = new HashMap<>(); - idToAccountMap = new HashMap<>(); - - JSONArray accountArray = new JSONArray(accountsJsonString); - - for (int i = 0; i < accountArray.length(); i++) { - JSONObject accountJson = accountArray.getJSONObject(i); - Account account = Account.fromJson(accountJson); - - if (idToAccountMap.containsKey(account.getId()) || nameToAccountMap.containsKey(account.getName())) { - throw new IllegalStateException( - "Duplicate account id or name exists. id=" + account.getId() + " name=" + account.getName()); - } - - idToAccountMap.put(account.getId(), account); - nameToAccountMap.put(account.getName(), account); - } - } - - /** - * Gets {@link Account} by its id. - * @param id The id to get the {@link Account}. - * @return The {@link Account} with the given id, or {@code null} if such an {@link Account} does not exist. - */ - Account getAccountById(Short id) { - return idToAccountMap.get(id); - } - - /** - * Gets {@link Account} by its name. - * @param name The id to get the {@link Account}. - * @return The {@link Account} with the given name, or {@code null} if such an {@link Account} does not exist. - */ - Account getAccountByName(String name) { - return nameToAccountMap.get(name); - } - - /** - * Gets all the {@link Account}s in this {@code AccountInfoMap} in a {@link Collection}. - * @return A {@link Collection} of all the {@link Account}s in this map. - */ - Collection getAccounts() { - return Collections.unmodifiableCollection(idToAccountMap.values()); - } - - /** - * Return true if there is no accounts in this info map. - * @return True when there is no accounts. - */ - boolean isEmpty() { - return idToAccountMap.isEmpty(); - } - - /** - * Checks if there is any {@link Account} in a given collection of {@link Account}s conflicts against any {@link Account} - * in a {@link AccountInfoMap}, according to the Javadoc of {@link AccountService}. Two {@link Account}s can be - * conflicting with each other if they have different account Ids but the same account name. - * - * @param accountsToSet The collection of {@link Account}s to check conflict. - * @return {@code true} if there is at least one {@link Account} in {@code accountPairs} conflicts with the existing - * {@link Account}s in {@link AccountInfoMap}, {@code false} otherwise. - */ - boolean hasConflictingAccount(Collection accountsToSet) { - for (Account account : accountsToSet) { - // if the account already exists, check that the snapshot version matches the expected value. - Account accountInMap = getAccountById(account.getId()); - if (accountInMap != null && account.getSnapshotVersion() != accountInMap.getSnapshotVersion()) { - logger.error( - "Account to update (accountId={} accountName={}) has an unexpected snapshot version in zk (expected={}, encountered={})", - account.getId(), account.getName(), account.getSnapshotVersion(), accountInMap.getSnapshotVersion()); - return true; - } - // check that there are no other accounts that conflict with the name of the account to update - // (case D and E from the javadoc) - Account potentialConflict = getAccountByName(account.getName()); - if (potentialConflict != null && potentialConflict.getId() != account.getId()) { - logger.error( - "Account to update (accountId={} accountName={}) conflicts with an existing record (accountId={} accountName={})", - account.getId(), account.getName(), potentialConflict.getId(), potentialConflict.getName()); - return true; - } - } - return false; - } -} diff --git a/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java b/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java new file mode 100644 index 0000000000..3aee5fc9be --- /dev/null +++ b/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java @@ -0,0 +1,267 @@ +package com.github.ambry.account; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + *

+ * A class that represents a collection of {@link Account}s, where the ids and names of the + * {@link Account}s are one-to-one mapped. An {@code AccountInfoMap} guarantees no duplicated account + * id or name, nor conflict among the {@link Account}s within it. + *

+ *

+ * Based on the properties, a {@code AccountInfoMap} internally builds index for {@link Account}s using both + * {@link Account}'s id and name as key. + *

+ */ +class AccountInfoMap { + private final Map nameToAccountMap; + private final Map idToAccountMap; + // used to track last modified time of the accounts and containers in this cache + private long lastModifiedTime = 0; + private final static Logger logger = LoggerFactory.getLogger(AccountInfoMap.class); + + /** + * Constructor for an empty {@code AccountInfoMap}. + */ + AccountInfoMap(AccountServiceMetrics accountServiceMetrics) { + this(accountServiceMetrics, new HashMap<>()); + } + + /** + *

+ * Constructs an {@code AccountInfoMap} from a group of {@link Account}s. The {@link Account}s exists + * in the form of a string-to-string map, where the key is the string form of an {@link Account}'s id, + * and the value is the string form of the {@link Account}'s JSON string. + *

+ *

+ * The source {@link Account}s in the {@code accountMap} may duplicate account ids or names, or corrupted + * JSON strings that cannot be parsed as valid {@link JSONObject}. In such cases, construction of + * {@code AccountInfoMap} will fail. + *

+ * @param accountMap A map of {@link Account}s in the form of (accountIdString, accountJSONString). + * @throws JSONException If parsing account data in json fails. + */ + AccountInfoMap(AccountServiceMetrics accountServiceMetrics, Map accountMap) throws JSONException { + nameToAccountMap = new HashMap<>(); + idToAccountMap = new HashMap<>(); + for (Map.Entry entry : accountMap.entrySet()) { + String idKey = entry.getKey(); + String valueString = entry.getValue(); + Account account; + JSONObject accountJson = new JSONObject(valueString); + if (idKey == null) { + accountServiceMetrics.remoteDataCorruptionErrorCount.inc(); + throw new IllegalStateException( + "Invalid account record when reading accountMap in ZNRecord because idKey=null"); + } + account = Account.fromJson(accountJson); + if (account.getId() != Short.valueOf(idKey)) { + accountServiceMetrics.remoteDataCorruptionErrorCount.inc(); + throw new IllegalStateException( + "Invalid account record when reading accountMap in ZNRecord because idKey and accountId do not match. idKey=" + + idKey + " accountId=" + account.getId()); + } + if (idToAccountMap.containsKey(account.getId()) || nameToAccountMap.containsKey(account.getName())) { + throw new IllegalStateException( + "Duplicate account id or name exists. id=" + account.getId() + " name=" + account.getName()); + } + idToAccountMap.put(account.getId(), account); + nameToAccountMap.put(account.getName(), account); + } + } + + /** + *

+ * Constructs an AccountInfoMap instance from one single JSON file which contains multiple accounts. + *

+ *

+ * This constructor is primarily useful if the source of the accounts is a flat storage for example a local + * JSON file. + *

+ * + * @param accountsJsonString JSON data containing an array of all accounts. + * @throws JSONException If parsing account data in json fails. + */ + protected AccountInfoMap(String accountsJsonString) throws JSONException { + nameToAccountMap = new HashMap<>(); + idToAccountMap = new HashMap<>(); + + JSONArray accountArray = new JSONArray(accountsJsonString); + + for (int i = 0; i < accountArray.length(); i++) { + JSONObject accountJson = accountArray.getJSONObject(i); + Account account = Account.fromJson(accountJson); + + if (idToAccountMap.containsKey(account.getId()) || nameToAccountMap.containsKey(account.getName())) { + throw new IllegalStateException( + "Duplicate account id or name exists. id=" + account.getId() + " name=" + account.getName()); + } + + idToAccountMap.put(account.getId(), account); + nameToAccountMap.put(account.getName(), account); + } + } + + /** + *

+ * Constructs an {@code AccountInfoMap} from a group of {@link Account}s. + *

+ *

+ * This is used when source {@link Account}s is a relational database. We won't need to check for duplicate IDs or names + * since they are ensured by applying unique key constraints in the DB. + *

+ * @param accounts A list of {@link Account}s. + */ + AccountInfoMap(List accounts) { + nameToAccountMap = new HashMap<>(); + idToAccountMap = new HashMap<>(); + updateAccounts(accounts); + } + + /** + * Gets {@link Account} by its id. + * @param id The id to get the {@link Account}. + * @return The {@link Account} with the given id, or {@code null} if such an {@link Account} does not exist. + */ + Account getAccountById(Short id) { + return idToAccountMap.get(id); + } + + /** + * Gets {@link Account} by its name. + * @param name The id to get the {@link Account}. + * @return The {@link Account} with the given name, or {@code null} if such an {@link Account} does not exist. + */ + Account getAccountByName(String name) { + return nameToAccountMap.get(name); + } + + /** + * Gets all the {@link Account}s in this {@code AccountInfoMap} in a {@link Collection}. + * @return A {@link Collection} of all the {@link Account}s in this map. + */ + Collection getAccounts() { + return Collections.unmodifiableCollection(idToAccountMap.values()); + } + + /** + * Return true if there is no accounts in this info map. + * @return True when there is no accounts. + */ + boolean isEmpty() { + return idToAccountMap.isEmpty(); + } + + /** + * Checks if there is any {@link Account} in a given collection of {@link Account}s conflicts against any {@link Account} + * in a {@link AccountInfoMap}, according to the Javadoc of {@link AccountService}. Two {@link Account}s can be + * conflicting with each other if they have different account Ids but the same account name. + * + * @param accountsToSet The collection of {@link Account}s to check conflict. + * @return {@code true} if there is at least one {@link Account} in {@code accountPairs} conflicts with the existing + * {@link Account}s in {@link AccountInfoMap}, {@code false} otherwise. + */ + boolean hasConflictingAccount(Collection accountsToSet) { + for (Account account : accountsToSet) { + // if the account already exists, check that the snapshot version matches the expected value. + Account accountInMap = getAccountById(account.getId()); + if (accountInMap != null && account.getSnapshotVersion() != accountInMap.getSnapshotVersion()) { + logger.error( + "Account to update (accountId={} accountName={}) has an unexpected snapshot version in zk (expected={}, encountered={})", + account.getId(), account.getName(), account.getSnapshotVersion(), accountInMap.getSnapshotVersion()); + return true; + } + // check that there are no other accounts that conflict with the name of the account to update + // (case D and E from the javadoc) + Account potentialConflict = getAccountByName(account.getName()); + if (potentialConflict != null && potentialConflict.getId() != account.getId()) { + logger.error( + "Account to update (accountId={} accountName={}) conflicts with an existing record (accountId={} accountName={})", + account.getId(), account.getName(), potentialConflict.getId(), potentialConflict.getName()); + return true; + } + } + return false; + } + + /** + * Updates the {@code AccountInfoMap} with the input {@link Collection} of {@link Account}s. + * @param accounts collection of {@link Account}s to be added. + */ + void updateAccounts(Collection accounts) { + for (Account account : accounts) { + Account accountToUpdate = idToAccountMap.get(account.getId()); + if (accountToUpdate == null) { + accountToUpdate = account; + } else { + accountToUpdate.setStatus(account.getStatus()); + accountToUpdate.setSnapshotVersion(account.getSnapshotVersion()); + accountToUpdate.updateContainers(account.getAllContainers()); + } + idToAccountMap.put(account.getId(), accountToUpdate); + nameToAccountMap.put(account.getName(), accountToUpdate); + } + } + + /** + * Updates the {@code AccountInfoMap} with the input {@link Collection} of {@link Container}s. + * @param containers collection of {@link Container}s to be added. + */ + void updateContainers(Collection containers) { + for (Container container : containers) { + addContainer(container.getParentAccountId(), container); + } + } + + /** + * Adds a {@link Container} to the {@link Account}. + * @param accountId The id of the parent {@link Account} for this {@link Container}. + * @param container {@link Container} to be added. + * @throws IllegalArgumentException if {@link Account} with provided id doesn't exist. + */ + void addContainer(short accountId, Container container) { + Account parentAccount = idToAccountMap.get(accountId); + if (parentAccount == null) { + throw new IllegalArgumentException("Account with ID " + accountId + "doesn't exist"); + } + idToAccountMap.get(accountId).updateContainers(Collections.singletonList(container)); + } + + /** + * Gets {@link Container} by its Parent Account id and id. + * @param accountId The id of the parent {@link Account} for this {@link Container}. + * @param id The id to get the {@link Container}. + * @return The {@link Container} with the given id within the parent Account Id, or {@code null} if + * such a parent {@link Account} or {@link Container} does not exist. + */ + Container getContainerByIdForAccount(Short accountId, short id) { + Account parentAccount = idToAccountMap.get(accountId); + return parentAccount == null ? null : parentAccount.getContainerById(id); + } + + /** + * Gets the last modified time of accounts and containers in this in-memory cache + * @return the last modified time of accounts and containers + */ + long getLastModifiedTime() { + return lastModifiedTime; + } + + /** + * Sets the last modified time of accounts and containers in this in-memory cache + * @param lastModifiedTime time when the accounts and containers were last updated + */ + void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } +} diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java new file mode 100644 index 0000000000..f84623e571 --- /dev/null +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java @@ -0,0 +1,278 @@ +package com.github.ambry.account; + +import com.github.ambry.account.mysql.MySqlConfig; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.github.ambry.account.AccountUtils.*; + + +public class MySqlAccountService implements AccountService { + + private MySqlAccountStore mySqlAccountStore = null; + private final AccountServiceMetrics accountServiceMetrics; + private final MySqlConfig mySqlConfig; + // in-memory cache for storing account and container metadata + private final AccountInfoMap accountInfoMap; + private static final Logger logger = LoggerFactory.getLogger(MySqlAccountService.class); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + public MySqlAccountService(AccountServiceMetrics accountServiceMetrics, MySqlConfig mySqlConfig) { + this.accountServiceMetrics = accountServiceMetrics; + this.mySqlConfig = mySqlConfig; + accountInfoMap = new AccountInfoMap(accountServiceMetrics); + try { + createMySqlAccountStore(); + } catch (SQLException e) { + logger.error("MySQL account store creation failed", e); + // Continue account service creation. Cache will initialized with metadata from backup copy on local disk to serve read requests. + // Write requests will be blocked until MySql DB is up. Connection to MySql DB will be retried in polling thread that fetches new accounts. + } + + // TODO: create backup manager to manage local back up copies of Account and container metadata and lastModifiedTime + + initializeCache(); + + // TODO: Start background thread for periodically querying MYSql DB for added/modified accounts and containers. Also, retry creation of MySqlAccountStore if it failed above. + + // TODO: Subscribe to notifications from ZK + } + + /** + * creates MySql Account store which establishes connection to database + * @throws SQLException + */ + private void createMySqlAccountStore() throws SQLException { + if (mySqlAccountStore == null) { + try { + mySqlAccountStore = new MySqlAccountStore(this.mySqlConfig); + } catch (SQLException e) { + logger.error("MySQL account store creation failed", e); + throw e; + } + } + } + + /** + * Call to initialize in-memory cache by fetching all the {@link Account}s and {@link Container}s metadata records. + * It consists of 2 steps: + * 1. Check local disk for back up copy and load metadata and last modified time of Accounts/Containers into cache. + * 2. Fetch added/modified accounts and containers from mysql database since the last modified time (found in step 1) + * and load into cache. + */ + void initializeCache() { + // TODO: Check local disk for back up copy and load metadata and last modified time into cache. + fetchAndUpdateCache(); + } + + /** + * Fetches all the accounts and containers that have been created or modified since the last sync time and loads into + * cache. + */ + void fetchAndUpdateCache() { + try { + // Retry connection to mysql if we couldn't set up previously + createMySqlAccountStore(); + } catch (SQLException e) { + logger.error("Fetching Accounts from MySql DB failed", e); + return; + } + + // get the last sync time of accounts and containers in cache + long lastModifiedTime = accountInfoMap.getLastModifiedTime(); + + try { + // Fetch all added/modified accounts from MySql database since LMT + List accounts = mySqlAccountStore.getNewAccounts(lastModifiedTime); + rwLock.writeLock().lock(); + try { + accountInfoMap.updateAccounts(accounts); + } finally { + rwLock.writeLock().unlock(); + } + + // Fetch all added/modified containers from MySql database since LMT + List containers = mySqlAccountStore.getNewContainers(lastModifiedTime); + rwLock.writeLock().lock(); + try { + accountInfoMap.updateContainers(containers); + } finally { + rwLock.writeLock().unlock(); + } + + // TODO: Find the max LMT in the fetched accounts and containers and update the cache + + } catch (SQLException e) { + logger.error("Fetching Accounts from MySql DB failed", e); + } + } + + @Override + public Account getAccountById(short accountId) { + rwLock.readLock().lock(); + try { + return accountInfoMap.getAccountById(accountId); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public Account getAccountByName(String accountName) { + rwLock.readLock().lock(); + try { + return accountInfoMap.getAccountByName(accountName); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public boolean updateAccounts(Collection accounts) { + Objects.requireNonNull(accounts, "accounts cannot be null"); + if (accounts.isEmpty()) { + logger.debug("Empty account collection to update."); + return false; + } + + if (mySqlAccountStore == null) { + logger.info("MySql Account DB store is not accessible"); + return false; + } + + // TODO: Similar to HelixAccountServiceConfig.updateDisabled, we might need a config to disable account updates when needed + + if (hasDuplicateAccountIdOrName(accounts)) { + logger.debug("Duplicate account id or name exist in the accounts to update"); + //accountServiceMetrics.updateAccountErrorCount.inc(); + return false; + } + + // Make a pre check for conflict between the accounts to update and the accounts in the local cache. Will fail this + // update operation for all the accounts if any conflict exists. For existing accounts, there is a chance that the account to update + // conflicts with the accounts in the local cache, but does not conflict with those in the MySql database. This + // will happen if some accounts are updated but the local cache is not yet refreshed. + // TODO: Once we have APIs (and versioning) for updates at container granularity, we will need to check conflicts at container level. + rwLock.readLock().lock(); + try { + if (accountInfoMap.hasConflictingAccount(accounts)) { + logger.debug("Accounts={} conflict with the accounts in local cache. Cancel the update operation.", accounts); + //accountServiceMetrics.updateAccountErrorCount.inc(); + return false; + } + } finally { + rwLock.readLock().unlock(); + } + + try { + updateAccountsWithMySqlStore(accounts); + } catch (SQLException e) { + logger.error("Failed updating accounts={} in MySql DB", accounts, e); + // record failure, parse exception to figure out what we did wrong (eg. id or name collision). If it is id collision, + // retry with incrementing ID (Account ID generation logic is currently in nuage-ambry, we might need to move it here) + //accountServiceMetrics.updateAccountErrorCount.inc(); + return false; + } + + // update in-memory cache with accounts + rwLock.writeLock().lock(); + try { + accountInfoMap.updateAccounts(accounts); + } finally { + rwLock.writeLock().unlock(); + } + + // TODO: can notify account updates to other nodes via ZK . + + // TODO: persist updated accounts and max timestamp to local back up file. + + return true; + } + + @Override + public Collection getAllAccounts() { + rwLock.readLock().lock(); + try { + return accountInfoMap.getAccounts(); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public boolean addAccountUpdateConsumer(Consumer> accountUpdateConsumer) { + return false; + } + + @Override + public boolean removeAccountUpdateConsumer(Consumer> accountUpdateConsumer) { + return false; + } + + @Override + public void close() throws IOException { + + } + + /** + * Updates MySql DB with added or modified {@link Account}s + * @param accounts collection of {@link Account}s + * @throws SQLException + */ + private void updateAccountsWithMySqlStore(Collection accounts) throws SQLException { + long startTimeMs = System.currentTimeMillis(); + logger.trace("Start updating accounts={} into MySql DB", accounts); + + // write Accounts and containers to MySql + for (Account account : accounts) { + if (getAccountById(account.getId()) == null) { + // new account (insert the containers and account into db tables) + mySqlAccountStore.addAccounts(Collections.singletonList(account)); + mySqlAccountStore.addContainers(account.getAllContainers()); + } else { + // existing account (update account table) + mySqlAccountStore.updateAccounts(Collections.singletonList(account)); + updateContainersWithMySqlStore(account.getId(), account.getAllContainers()); + } + } + + long timeForUpdate = System.currentTimeMillis() - startTimeMs; + logger.trace("Completed updating accounts into MySql DB, took time={} ms", timeForUpdate); + //accountServiceMetrics.updateAccountTimeInMs.update(timeForUpdate); + + } + + /** + * Updates MySql DB with added or modified {@link Container}s of a given account + * @param accountId id of the {@link Account} for the {@link Container}s + * @param containers collection of {@link Container}s + * @throws SQLException + */ + private void updateContainersWithMySqlStore(short accountId, Collection containers) throws SQLException { + //check if account ID should exist first in in-memory cache + Account accountInCache = accountInfoMap.getAccountById(accountId); + if (accountInCache == null) { + throw new IllegalArgumentException("Account with ID " + accountId + "doesn't exist"); + } + + for (Container container : containers) { + if (accountInfoMap.getContainerByIdForAccount(container.getParentAccountId(), container.getId()) == null) { + // new container added (insert into container table) + mySqlAccountStore.addContainers(Collections.singletonList(container)); + } else { + // existing container modified (update container table) + mySqlAccountStore.updateContainers(Collections.singletonList(container)); + } + } + } + +} diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java new file mode 100644 index 0000000000..a48363cc3f --- /dev/null +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java @@ -0,0 +1,103 @@ +package com.github.ambry.account; + +import com.github.ambry.account.mysql.AccountDao; +import com.github.ambry.account.mysql.ContainerDao; +import com.github.ambry.account.mysql.MySqlConfig; +import com.github.ambry.account.mysql.MySqlDataAccessor; +import java.sql.SQLException; +import java.util.Collection; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Wrapper class to handle MySql store operations on Account and Container tables + */ +public class MySqlAccountStore { + + private final MySqlDataAccessor mySqlDataAccessor; + private final AccountDao accountDao; + private final ContainerDao containerDao; + + public MySqlAccountStore(MySqlConfig config) throws SQLException { + mySqlDataAccessor = new MySqlDataAccessor(config); + accountDao = new AccountDao(mySqlDataAccessor); + containerDao = new ContainerDao(mySqlDataAccessor); + } + + /** + * Adds new {@link Account}s to Account table in MySql DB + * @param accounts collection of {@link Account}s to be inserted + * @throws SQLException + */ + public void addAccounts(Collection accounts) throws SQLException { + for (Account account : accounts) { + accountDao.addAccount(account); + } + } + + /** + * Adds new {@link Container}s to Container table in MySql DB + * @param containers collection of {@link Container}s to be inserted + * @throws SQLException + */ + public void addContainers(Collection containers) throws SQLException { + for (Container container : containers) { + containerDao.addContainer(container.getParentAccountId(), container); + } + } + + /** + * Updates existing {@link Account}s in Account table in MySql DB + * @param accounts collection of {@link Account}s to be updated + * @throws SQLException + */ + public void updateAccounts(Collection accounts) throws SQLException { + for (Account account : accounts) { + accountDao.updateAccount(account); + } + } + + /** + * Updates existing {@link Container}s in Container table in MySql DB + * @param containers collection of {@link Account}s to be updated + * @throws SQLException + */ + public void updateContainers(Collection containers) throws SQLException { + for (Container container : containers) { + containerDao.updateContainer(container.getParentAccountId(), container); + } + } + + /** + * Gets all {@link Account}s that have been created or modified since the specified time. + * @param updatedSince the last modified time used to filter. + * @return a list of {@link Account}s + * @throws SQLException + */ + public List getNewAccounts(long updatedSince) throws SQLException { + return accountDao.getNewAccounts(updatedSince); + } + + /** + * Gets all {@link Container}s that have been created or modified since the specified time. + * @param updatedSince the last modified time used to filter. + * @return a list of {@link Container}s + * @throws SQLException + */ + public List getNewContainers(long updatedSince) throws SQLException { + return containerDao.getNewContainers(updatedSince); + } + + /** + * Gets all {@link Container}s of a given account + * @param accountId ID of the account + * @return a list of {@link Container}s + * @throws SQLException + */ + public List getContainersByAccount(short accountId) throws SQLException { + return containerDao.getContainers(accountId); + } + +} diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java index d74ecd289c..18806c3a86 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java @@ -15,8 +15,6 @@ import com.github.ambry.account.Account; import com.github.ambry.account.AccountSerdeUtils; -import com.github.ambry.account.Container; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -35,19 +33,23 @@ public class AccountDao { public static final String VERSION = "version"; public static final String CREATION_TIME = "creationTime"; public static final String LAST_MODIFIED_TIME = "lastModifiedTime"; + public static final String ACCOUNT_ID = "accountId"; private final MySqlDataAccessor dataAccessor; private final String insertSql; private final String getSinceSql; + private final String updateSql; public AccountDao(MySqlDataAccessor dataAccessor) { this.dataAccessor = dataAccessor; insertSql = String.format("insert into %s (%s, %s, %s, %s) values (?, ?, now(), now())", ACCOUNT_TABLE, ACCOUNT_INFO, VERSION, CREATION_TIME, LAST_MODIFIED_TIME); - getSinceSql = - String.format("select %s, %s from %s where %s > ?", ACCOUNT_INFO, LAST_MODIFIED_TIME, ACCOUNT_TABLE, - LAST_MODIFIED_TIME); + getSinceSql = String.format("select %s, %s from %s where %s > ?", ACCOUNT_INFO, LAST_MODIFIED_TIME, ACCOUNT_TABLE, + LAST_MODIFIED_TIME); + updateSql = + String.format("update %s set %s = ?, %s = ?, %s = now() where %s = ? ", ACCOUNT_TABLE, ACCOUNT_INFO, VERSION, + LAST_MODIFIED_TIME, ACCOUNT_ID); } /** @@ -76,23 +78,38 @@ public void addAccount(Account account) throws SQLException { * @throws SQLException */ public List getNewAccounts(long updatedSince) throws SQLException { - List accounts = new ArrayList<>(); Timestamp sinceTime = new Timestamp(updatedSince); PreparedStatement getSinceStatement = dataAccessor.getPreparedStatement(getSinceSql); getSinceStatement.setTimestamp(1, sinceTime); try (ResultSet rs = getSinceStatement.executeQuery()) { - while (rs.next()) { - String accountJson = rs.getString(ACCOUNT_INFO); - Timestamp lastModifiedTime = rs.getTimestamp(LAST_MODIFIED_TIME); - Account account = AccountSerdeUtils.accountFromJson(accountJson); - //account.setLastModifiedTime(lastModifiedTime); - accounts.add(account); - } - return accounts; + return convertResultSet(rs); + } + } + + public void updateAccount(Account account) throws SQLException { + try { + PreparedStatement updateStatement = dataAccessor.getPreparedStatement(updateSql); + updateStatement.setString(1, AccountSerdeUtils.accountToJson(account, true)); + updateStatement.setInt(2, account.getSnapshotVersion()); + updateStatement.setInt(3, account.getId()); + updateStatement.executeUpdate(); } catch (SQLException e) { - // record failure, parse exception, ... + // TODO: record failure, parse exception to figure out what we did wrong (eg. id or name collision) + // For now, assume connection issue. dataAccessor.reset(); throw e; } } + + public List convertResultSet(ResultSet rs) throws SQLException { + List accounts = new ArrayList<>(); + while (rs.next()) { + String accountJson = rs.getString(ACCOUNT_INFO); + Timestamp lastModifiedTime = rs.getTimestamp(LAST_MODIFIED_TIME); + Account account = AccountSerdeUtils.accountFromJson(accountJson); + //account.setLastModifiedTime(lastModifiedTime); + accounts.add(account); + } + return accounts; + } } diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java index bf784c23ee..306d47f821 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java @@ -15,7 +15,7 @@ import com.github.ambry.account.AccountSerdeUtils; import com.github.ambry.account.Container; -import java.sql.Connection; +import com.github.ambry.account.Account; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -41,6 +41,7 @@ public class ContainerDao { private final String insertSql; private final String getSinceSql; private final String getByAccountSql; + private final String updateSql; public ContainerDao(MySqlDataAccessor dataAccessor) { this.dataAccessor = dataAccessor; @@ -53,6 +54,8 @@ public ContainerDao(MySqlDataAccessor dataAccessor) { getByAccountSql = String.format("select %s, %s, %s from %s where %s = ?", ACCOUNT_ID, CONTAINER_INFO, LAST_MODIFIED_TIME, CONTAINER_TABLE, ACCOUNT_ID); + updateSql = String.format("update %s set %s = ?, %s = 1, %s = now() where %s = ? AND %s = ? ", CONTAINER_TABLE, + CONTAINER_INFO, VERSION, LAST_MODIFIED_TIME, ACCOUNT_ID, CONTAINER_ID); } /** @@ -82,6 +85,22 @@ public void addContainer(int accountId, Container container) throws SQLException * @return a list of {@link Container}. * @throws SQLException */ + public void updateContainer(int accountId, Container container) throws SQLException { + try { + // Note: assuming autocommit for now + PreparedStatement updateStatement = dataAccessor.getPreparedStatement(updateSql); + updateStatement.setString(1, AccountSerdeUtils.containerToJson(container)); + updateStatement.setInt(2, accountId); + updateStatement.setInt(3, container.getId()); + updateStatement.executeUpdate(); + } catch (SQLException e) { + // TODO: record failure, parse exception to figure out what we did wrong (eg. id or name collision) + // For now, assume connection issue. + dataAccessor.reset(); + throw e; + } + } + public List getContainers(int accountId) throws SQLException { PreparedStatement getByAccountStatement = dataAccessor.getPreparedStatement(getByAccountSql); getByAccountStatement.setInt(1, accountId); diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java index a33a2b1197..d0b0f59da2 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java @@ -98,7 +98,7 @@ public synchronized PreparedStatement getPreparedStatement(String sql) throws SQ * This should be called after a failed database operation. */ synchronized void reset() { - for (PreparedStatement statement: statementCache.values()) { + for (PreparedStatement statement : statementCache.values()) { try { statement.close(); } catch (SQLException e) { diff --git a/ambry-api/src/main/java/com/github/ambry/account/Account.java b/ambry-api/src/main/java/com/github/ambry/account/Account.java index 72ff575d7e..efbdc1436c 100644 --- a/ambry-api/src/main/java/com/github/ambry/account/Account.java +++ b/ambry-api/src/main/java/com/github/ambry/account/Account.java @@ -107,7 +107,7 @@ public class Account { private final short id; private final String name; private AccountStatus status; - private final int snapshotVersion; + private int snapshotVersion; // internal data structure private final Map containerIdToContainerMap = new HashMap<>(); private final Map containerNameToContainerMap = new HashMap<>(); @@ -231,6 +231,14 @@ public int getSnapshotVersion() { return snapshotVersion; } + public void setStatus(AccountStatus status) { + this.status = status; + } + + public void setSnapshotVersion(int snapshotVersion) { + this.snapshotVersion = snapshotVersion; + } + /** * Gets the {@link Container} of this account with the specified container id. * @param containerId The id of the container to get. @@ -309,7 +317,7 @@ public enum AccountStatus { * Adds a {@link Container} to this account and updates internal maps accordingly. * @param container The container to update this account. */ - public void updateContainerMap(Container container) { + private void updateContainerMap(Container container) { containerIdToContainerMap.put(container.getId(), container); containerNameToContainerMap.put(container.getName(), container); } @@ -361,4 +369,8 @@ private void checkDuplicateContainerNameOrId(Container container) { throw new IllegalStateException(errorMessage); } } + + void updateContainers(Collection containers) { + containers.forEach(this::updateContainerMap); + } } diff --git a/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java b/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java index 92cca6965e..10bbe1cc66 100644 --- a/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java +++ b/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java @@ -15,6 +15,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -70,9 +72,8 @@ public class MySqlAccountsDBTool { static final String ACCOUNT_METADATA_MAP_KEY = "accountMetadata"; static final String RELATIVE_ACCOUNT_METADATA_PATH = "/account_metadata/full_data"; + private final MySqlAccountStore mySqlAccountStore; private final MySqlDataAccessor mySqlDataAccessor; - private final AccountDao accountDao; - private final ContainerDao containerDao; private final HelixPropertyStore helixPropertyStore; private final String fullZKAccountMetadataPath; @@ -171,8 +172,7 @@ public static void main(String[] args) throws IOException { public MySqlAccountsDBTool(VerifiableProperties verifiableProperties, String zkServer) throws SQLException { this.mySqlDataAccessor = new MySqlDataAccessor(new MySqlConfig(verifiableProperties)); - this.accountDao = new AccountDao(mySqlDataAccessor); - this.containerDao = new ContainerDao(mySqlDataAccessor); + this.mySqlAccountStore = new MySqlAccountStore(new MySqlConfig(verifiableProperties)); //Create helix property store HelixPropertyStoreConfig helixPropertyStoreConfig = new HelixPropertyStoreConfig(verifiableProperties); this.helixPropertyStore = CommonUtils.createHelixPropertyStore(zkServer, helixPropertyStoreConfig, null); @@ -212,10 +212,8 @@ public void initialize() throws SQLException { // Populate Account and Container tables for (Account account : accountInfoMap.getAccounts()) { - for (Container container : account.getAllContainers()) { - containerDao.addContainer(account.getId(), container); - } - accountDao.addAccount(account); + mySqlAccountStore.addContainers(account.getAllContainers()); + mySqlAccountStore.addAccounts(Collections.singletonList(account)); } logger.info("Initialized account metadata in DB from ZK path {}, took time={} ms", fullZKAccountMetadataPath, @@ -244,13 +242,12 @@ public void compare() throws SQLException { .collect(Collectors.toSet())); // Query the list of all Account from mysql - Set accountSetFromDB = new HashSet<>(accountDao.getNewAccounts(0)); + Set accountSetFromDB = new HashSet<>(mySqlAccountStore.getNewAccounts(0)); // Query the list of containers for each Account and add them to the Account accountSetFromDB.forEach(account -> { try { - containerDao.getContainers(account.getId()) - .forEach(account::updateContainerMap); + account.updateContainers(mySqlAccountStore.getContainersByAccount(account.getId())); } catch (SQLException e) { logger.error("MySQL querying containers failed", e); return; From d14bf24924fa80639f2f26570ee157421ea885ff Mon Sep 17 00:00:00 2001 From: ARUN Sai Date: Wed, 9 Sep 2020 18:19:01 -0700 Subject: [PATCH 2/3] Add missing license and ensure dataAcessor.reset() is called in all places --- .../ambry/account/MySqlAccountService.java | 18 +++++++++++++++++- .../ambry/account/MySqlAccountStore.java | 17 ++++++++++++++--- .../github/ambry/account/mysql/AccountDao.java | 5 +++++ .../ambry/account/mysql/ContainerDao.java | 1 - 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java index f84623e571..73c011b579 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java @@ -1,3 +1,17 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + package com.github.ambry.account; import com.github.ambry.account.mysql.MySqlConfig; @@ -16,6 +30,9 @@ import static com.github.ambry.account.AccountUtils.*; +/** + * An implementation of {@link AccountService} that employs MySql database as its underlying storage. + */ public class MySqlAccountService implements AccountService { private MySqlAccountStore mySqlAccountStore = null; @@ -274,5 +291,4 @@ private void updateContainersWithMySqlStore(short accountId, Collection getNewContainers(long updatedSince) throws SQLException { public List getContainersByAccount(short accountId) throws SQLException { return containerDao.getContainers(accountId); } - } diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java index 18806c3a86..58ba9af7a6 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java @@ -83,6 +83,11 @@ public List getNewAccounts(long updatedSince) throws SQLException { getSinceStatement.setTimestamp(1, sinceTime); try (ResultSet rs = getSinceStatement.executeQuery()) { return convertResultSet(rs); + } catch (SQLException e) { + // TODO: record failure, parse exception to figure out what we did wrong (eg. id or name collision) + // For now, assume connection issue. + dataAccessor.reset(); + throw e; } } diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java index 306d47f821..1e4803d82e 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java @@ -15,7 +15,6 @@ import com.github.ambry.account.AccountSerdeUtils; import com.github.ambry.account.Container; -import com.github.ambry.account.Account; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; From 451aac45e93cfa9876c48a1063236da46492a90a Mon Sep 17 00:00:00 2001 From: ARUN Sai Date: Thu, 10 Sep 2020 19:16:05 -0700 Subject: [PATCH 3/3] Changes to: 1. Address Rob's comments 2. Scheduler thread to sync changes from DB 3. Add MySqlAccountServiceFactory and MySqlAccountServiceConfig --- .../github/ambry/account/AccountInfoMap.java | 24 +++--- .../ambry/account/MySqlAccountService.java | 73 ++++++++++------ .../account/MySqlAccountServiceFactory.java | 74 ++++++++++++++++ .../ambry/account/MySqlAccountStore.java | 7 +- .../ambry/account/mysql/AccountDao.java | 21 +++-- .../ambry/account/mysql/ContainerDao.java | 17 ++-- .../ambry/account/mysql/MySqlConfig.java | 47 ---------- .../account/mysql/MySqlDataAccessor.java | 17 ++-- .../ambry/account/mysql/AccountDaoTest.java | 10 ++- .../com/github/ambry/account/Account.java | 14 +-- .../config/MySqlAccountServiceConfig.java | 85 +++++++++++++++++++ .../github/ambry/account/DatabaseTest.java | 4 +- .../ambry/account/MySqlAccountsDBTool.java | 26 +++--- 13 files changed, 279 insertions(+), 140 deletions(-) create mode 100644 ambry-account/src/main/java/com/github/ambry/account/MySqlAccountServiceFactory.java delete mode 100644 ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlConfig.java create mode 100644 ambry-api/src/main/java/com/github/ambry/config/MySqlAccountServiceConfig.java diff --git a/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java b/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java index 3aee5fc9be..0113beb18c 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java +++ b/ambry-account/src/main/java/com/github/ambry/account/AccountInfoMap.java @@ -199,17 +199,18 @@ boolean hasConflictingAccount(Collection accountsToSet) { * @param accounts collection of {@link Account}s to be added. */ void updateAccounts(Collection accounts) { - for (Account account : accounts) { - Account accountToUpdate = idToAccountMap.get(account.getId()); - if (accountToUpdate == null) { - accountToUpdate = account; + for (Account updatedAccount : accounts) { + Account account = idToAccountMap.get(updatedAccount.getId()); + if (account == null) { + account = updatedAccount; } else { - accountToUpdate.setStatus(account.getStatus()); - accountToUpdate.setSnapshotVersion(account.getSnapshotVersion()); - accountToUpdate.updateContainers(account.getAllContainers()); + AccountBuilder accountBuilder = new AccountBuilder(account).status(updatedAccount.getStatus()) + .snapshotVersion(updatedAccount.getSnapshotVersion()); + updatedAccount.getAllContainers().forEach(accountBuilder::addOrUpdateContainer); + account = accountBuilder.build(); } - idToAccountMap.put(account.getId(), accountToUpdate); - nameToAccountMap.put(account.getName(), accountToUpdate); + idToAccountMap.put(account.getId(), account); + nameToAccountMap.put(account.getName(), account); } } @@ -234,7 +235,10 @@ void addContainer(short accountId, Container container) { if (parentAccount == null) { throw new IllegalArgumentException("Account with ID " + accountId + "doesn't exist"); } - idToAccountMap.get(accountId).updateContainers(Collections.singletonList(container)); + AccountBuilder accountBuilder = new AccountBuilder(parentAccount).addOrUpdateContainer(container); + parentAccount = accountBuilder.build(); + idToAccountMap.put(parentAccount.getId(), parentAccount); + nameToAccountMap.put(parentAccount.getName(), parentAccount); } /** diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java index 73c011b579..f4b8f35d9e 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountService.java @@ -14,13 +14,16 @@ package com.github.ambry.account; -import com.github.ambry.account.mysql.MySqlConfig; +import com.github.ambry.config.MySqlAccountServiceConfig; +import com.github.ambry.server.StatsSnapshot; import java.io.IOException; import java.sql.SQLException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -28,6 +31,7 @@ import org.slf4j.LoggerFactory; import static com.github.ambry.account.AccountUtils.*; +import static com.github.ambry.utils.Utils.*; /** @@ -37,15 +41,18 @@ public class MySqlAccountService implements AccountService { private MySqlAccountStore mySqlAccountStore = null; private final AccountServiceMetrics accountServiceMetrics; - private final MySqlConfig mySqlConfig; + private final MySqlAccountServiceConfig config; // in-memory cache for storing account and container metadata private final AccountInfoMap accountInfoMap; private static final Logger logger = LoggerFactory.getLogger(MySqlAccountService.class); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final ScheduledExecutorService scheduler; - public MySqlAccountService(AccountServiceMetrics accountServiceMetrics, MySqlConfig mySqlConfig) { + public MySqlAccountService(AccountServiceMetrics accountServiceMetrics, MySqlAccountServiceConfig config, + ScheduledExecutorService scheduler) { this.accountServiceMetrics = accountServiceMetrics; - this.mySqlConfig = mySqlConfig; + this.config = config; + this.scheduler = scheduler; accountInfoMap = new AccountInfoMap(accountServiceMetrics); try { createMySqlAccountStore(); @@ -57,9 +64,7 @@ public MySqlAccountService(AccountServiceMetrics accountServiceMetrics, MySqlCon // TODO: create backup manager to manage local back up copies of Account and container metadata and lastModifiedTime - initializeCache(); - - // TODO: Start background thread for periodically querying MYSql DB for added/modified accounts and containers. Also, retry creation of MySqlAccountStore if it failed above. + initialFetchAndSchedule(); // TODO: Subscribe to notifications from ZK } @@ -71,8 +76,9 @@ public MySqlAccountService(AccountServiceMetrics accountServiceMetrics, MySqlCon private void createMySqlAccountStore() throws SQLException { if (mySqlAccountStore == null) { try { - mySqlAccountStore = new MySqlAccountStore(this.mySqlConfig); + mySqlAccountStore = new MySqlAccountStore(config); } catch (SQLException e) { + // TODO: record failure, parse exception to figure out what we did wrong. If it is a non-transient error like credential issue, we should fail start up logger.error("MySQL account store creation failed", e); throw e; } @@ -80,22 +86,33 @@ private void createMySqlAccountStore() throws SQLException { } /** - * Call to initialize in-memory cache by fetching all the {@link Account}s and {@link Container}s metadata records. + * Initialize in-memory cache by fetching all the {@link Account}s and {@link Container}s metadata records. * It consists of 2 steps: * 1. Check local disk for back up copy and load metadata and last modified time of Accounts/Containers into cache. * 2. Fetch added/modified accounts and containers from mysql database since the last modified time (found in step 1) * and load into cache. */ - void initializeCache() { + private void initialFetchAndSchedule() { + // TODO: Check local disk for back up copy and load metadata and last modified time into cache. + + // Fetch added/modified accounts and containers from mysql db since LMT and update cache. fetchAndUpdateCache(); + + //Also, schedule to execute the logic periodically. + if (scheduler != null) { + scheduler.scheduleAtFixedRate(this::fetchAndUpdateCache, config.updaterPollingIntervalMs, + config.updaterPollingIntervalMs, TimeUnit.MILLISECONDS); + logger.info("Background account updater will fetch accounts from mysql db at intervals of {} ms", + config.updaterPollingIntervalMs); + } } /** * Fetches all the accounts and containers that have been created or modified since the last sync time and loads into * cache. */ - void fetchAndUpdateCache() { + private void fetchAndUpdateCache() { try { // Retry connection to mysql if we couldn't set up previously createMySqlAccountStore(); @@ -108,19 +125,12 @@ void fetchAndUpdateCache() { long lastModifiedTime = accountInfoMap.getLastModifiedTime(); try { - // Fetch all added/modified accounts from MySql database since LMT + // Fetch all added/modified accounts and containers from MySql database since LMT List accounts = mySqlAccountStore.getNewAccounts(lastModifiedTime); - rwLock.writeLock().lock(); - try { - accountInfoMap.updateAccounts(accounts); - } finally { - rwLock.writeLock().unlock(); - } - - // Fetch all added/modified containers from MySql database since LMT List containers = mySqlAccountStore.getNewContainers(lastModifiedTime); rwLock.writeLock().lock(); try { + accountInfoMap.updateAccounts(accounts); accountInfoMap.updateContainers(containers); } finally { rwLock.writeLock().unlock(); @@ -166,10 +176,13 @@ public boolean updateAccounts(Collection accounts) { return false; } - // TODO: Similar to HelixAccountServiceConfig.updateDisabled, we might need a config to disable account updates when needed + if (config.updateDisabled) { + logger.info("Updates has been disabled"); + return false; + } if (hasDuplicateAccountIdOrName(accounts)) { - logger.debug("Duplicate account id or name exist in the accounts to update"); + logger.error("Duplicate account id or name exist in the accounts to update"); //accountServiceMetrics.updateAccountErrorCount.inc(); return false; } @@ -178,11 +191,11 @@ public boolean updateAccounts(Collection accounts) { // update operation for all the accounts if any conflict exists. For existing accounts, there is a chance that the account to update // conflicts with the accounts in the local cache, but does not conflict with those in the MySql database. This // will happen if some accounts are updated but the local cache is not yet refreshed. - // TODO: Once we have APIs (and versioning) for updates at container granularity, we will need to check conflicts at container level. + // TODO: Once we have APIs (and versioning) for updating containers, we will need to check conflicts for containers as well. rwLock.readLock().lock(); try { if (accountInfoMap.hasConflictingAccount(accounts)) { - logger.debug("Accounts={} conflict with the accounts in local cache. Cancel the update operation.", accounts); + logger.error("Accounts={} conflict with the accounts in local cache. Cancel the update operation.", accounts); //accountServiceMetrics.updateAccountErrorCount.inc(); return false; } @@ -236,8 +249,16 @@ public boolean removeAccountUpdateConsumer(Consumer> account } @Override - public void close() throws IOException { + public void selectInactiveContainersAndMarkInZK(StatsSnapshot statsSnapshot) { + // TODO: Work with Sophie to implement this method in MySqlAccountService + throw new UnsupportedOperationException("This method is not supported"); + } + @Override + public void close() throws IOException { + if (scheduler != null) { + shutDownExecutorService(scheduler, config.updaterShutDownTimeoutMs, TimeUnit.MILLISECONDS); + } } /** @@ -275,7 +296,7 @@ private void updateAccountsWithMySqlStore(Collection accounts) throws S * @throws SQLException */ private void updateContainersWithMySqlStore(short accountId, Collection containers) throws SQLException { - //check if account ID should exist first in in-memory cache + //check for account ID in in-memory cache Account accountInCache = accountInfoMap.getAccountById(accountId); if (accountInCache == null) { throw new IllegalArgumentException("Account with ID " + accountId + "doesn't exist"); diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountServiceFactory.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountServiceFactory.java new file mode 100644 index 0000000000..cdd41799ce --- /dev/null +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountServiceFactory.java @@ -0,0 +1,74 @@ +/* + * Copyright 2017 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.account; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.config.MySqlAccountServiceConfig; +import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.Utils; +import java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * MySql based implementation of {@link AccountServiceFactory}. + *

+ * Returns a new instance of {@link MySqlAccountService} on {@link #getAccountService()} call. + */ +public class MySqlAccountServiceFactory implements AccountServiceFactory { + private static final String MYSQL_ACCOUNT_UPDATER_PREFIX = "mysql-account-updater"; + private static final Logger logger = LoggerFactory.getLogger(MySqlAccountServiceFactory.class); + protected final MySqlAccountServiceConfig accountServiceConfig; + protected final AccountServiceMetrics accountServiceMetrics; + + /** + * Constructor. + * @param verifiableProperties The properties to get a {@link MySqlAccountService} instance. Cannot be {@code null}. + * @param metricRegistry The {@link MetricRegistry} for metrics tracking. Cannot be {@code null}. + */ + public MySqlAccountServiceFactory(VerifiableProperties verifiableProperties, MetricRegistry metricRegistry) { + this(new MySqlAccountServiceConfig(verifiableProperties), new AccountServiceMetrics(metricRegistry)); + } + + /** + * Constructor. + * @param accountServiceConfig The {@link MySqlAccountServiceConfig} to use. + * @param accountServiceMetrics The {@link AccountServiceMetrics} to report metrics. + */ + protected MySqlAccountServiceFactory(MySqlAccountServiceConfig accountServiceConfig, + AccountServiceMetrics accountServiceMetrics) { + this.accountServiceConfig = accountServiceConfig; + this.accountServiceMetrics = accountServiceMetrics; + } + + @Override + public AccountService getAccountService() { + try { + long startTimeMs = System.currentTimeMillis(); + logger.info("Starting a MySqlAccountService"); + ScheduledExecutorService scheduler = + accountServiceConfig.updaterPollingIntervalMs > 0 ? Utils.newScheduler(1, MYSQL_ACCOUNT_UPDATER_PREFIX, false) + : null; + MySqlAccountService mySqlAccountService = + new MySqlAccountService(accountServiceMetrics, accountServiceConfig, scheduler); + long spentTimeMs = System.currentTimeMillis() - startTimeMs; + logger.info("MySqlAccountService started, took {} ms", spentTimeMs); + accountServiceMetrics.startupTimeInMs.update(spentTimeMs); + return mySqlAccountService; + } catch (Exception e) { + throw new IllegalStateException("Could not instantiate MySqlAccountService", e); + } + } +} diff --git a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java index ad672dd65e..cc9e0d8423 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java +++ b/ambry-account/src/main/java/com/github/ambry/account/MySqlAccountStore.java @@ -16,8 +16,8 @@ import com.github.ambry.account.mysql.AccountDao; import com.github.ambry.account.mysql.ContainerDao; -import com.github.ambry.account.mysql.MySqlConfig; import com.github.ambry.account.mysql.MySqlDataAccessor; +import com.github.ambry.config.MySqlAccountServiceConfig; import java.sql.SQLException; import java.util.Collection; import java.util.List; @@ -28,12 +28,11 @@ */ public class MySqlAccountStore { - private final MySqlDataAccessor mySqlDataAccessor; private final AccountDao accountDao; private final ContainerDao containerDao; - public MySqlAccountStore(MySqlConfig config) throws SQLException { - mySqlDataAccessor = new MySqlDataAccessor(config); + public MySqlAccountStore(MySqlAccountServiceConfig config) throws SQLException { + MySqlDataAccessor mySqlDataAccessor = new MySqlDataAccessor(config); accountDao = new AccountDao(mySqlDataAccessor); containerDao = new ContainerDao(mySqlDataAccessor); } diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java index 58ba9af7a6..ac97087849 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/AccountDao.java @@ -74,7 +74,7 @@ public void addAccount(Account account) throws SQLException { /** * Gets all accounts that have been created or modified since the specified time. * @param updatedSince the last modified time used to filter. - * @return a list of {@link Account}. + * @return a list of {@link Account}s. * @throws SQLException */ public List getNewAccounts(long updatedSince) throws SQLException { @@ -91,6 +91,11 @@ public List getNewAccounts(long updatedSince) throws SQLException { } } + /** + * Updates an existing account in the database. + * @param account the account to update. + * @throws SQLException + */ public void updateAccount(Account account) throws SQLException { try { PreparedStatement updateStatement = dataAccessor.getPreparedStatement(updateSql); @@ -106,11 +111,17 @@ public void updateAccount(Account account) throws SQLException { } } - public List convertResultSet(ResultSet rs) throws SQLException { + /** + * Convert a query result set to a list of accounts. + * @param resultSet the result set. + * @return a list of {@link Account}s. + * @throws SQLException + */ + private List convertResultSet(ResultSet resultSet) throws SQLException { List accounts = new ArrayList<>(); - while (rs.next()) { - String accountJson = rs.getString(ACCOUNT_INFO); - Timestamp lastModifiedTime = rs.getTimestamp(LAST_MODIFIED_TIME); + while (resultSet.next()) { + String accountJson = resultSet.getString(ACCOUNT_INFO); + Timestamp lastModifiedTime = resultSet.getTimestamp(LAST_MODIFIED_TIME); Account account = AccountSerdeUtils.accountFromJson(accountJson); //account.setLastModifiedTime(lastModifiedTime); accounts.add(account); diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java index 1e4803d82e..e88e680a56 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/ContainerDao.java @@ -53,6 +53,7 @@ public ContainerDao(MySqlDataAccessor dataAccessor) { getByAccountSql = String.format("select %s, %s, %s from %s where %s = ?", ACCOUNT_ID, CONTAINER_INFO, LAST_MODIFIED_TIME, CONTAINER_TABLE, ACCOUNT_ID); + // TODO: For update, take the version from Container object after adding the field in it. updateSql = String.format("update %s set %s = ?, %s = 1, %s = now() where %s = ? AND %s = ? ", CONTAINER_TABLE, CONTAINER_INFO, VERSION, LAST_MODIFIED_TIME, ACCOUNT_ID, CONTAINER_ID); } @@ -79,9 +80,9 @@ public void addContainer(int accountId, Container container) throws SQLException } /** - * Gets the containers in a specified account. - * @param accountId the id for the account. - * @return a list of {@link Container}. + * Updates a container in the database. + * @param accountId the container's parent account id. + * @param container the container to update. * @throws SQLException */ public void updateContainer(int accountId, Container container) throws SQLException { @@ -100,6 +101,12 @@ public void updateContainer(int accountId, Container container) throws SQLExcept } } + /** + * Gets the containers in a specified account. + * @param accountId the id for the parent account. + * @return a list of {@link Container}s. + * @throws SQLException + */ public List getContainers(int accountId) throws SQLException { PreparedStatement getByAccountStatement = dataAccessor.getPreparedStatement(getByAccountSql); getByAccountStatement.setInt(1, accountId); @@ -115,7 +122,7 @@ public List getContainers(int accountId) throws SQLException { /** * Gets all containers that have been created or modified since the specified time. * @param updatedSince the last modified time used to filter. - * @return a list of {@link Container}. + * @return a list of {@link Container}s. * @throws SQLException */ public List getNewContainers(long updatedSince) throws SQLException { @@ -134,7 +141,7 @@ public List getNewContainers(long updatedSince) throws SQLException { /** * Convert a query result set to a list of containers. * @param resultSet the result set. - * @return a list of containers. + * @return a list of {@link Container}s. * @throws SQLException */ private List convertResultSet(ResultSet resultSet) throws SQLException { diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlConfig.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlConfig.java deleted file mode 100644 index 47a3348f77..0000000000 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlConfig.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2020 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.account.mysql; - -import com.github.ambry.config.Config; -import com.github.ambry.config.Default; -import com.github.ambry.config.VerifiableProperties; - - -/** - * Config for MySql database connection. - */ -public class MySqlConfig { - - public static final String MYSQL_URL = "mysql.url"; - public static final String MYSQL_USER = "mysql.user"; - public static final String MYSQL_PASSWORD = "mysql.password"; - /** - * Flag indicating whether to use DirectHttps CosmosDB connection mode. - * Provides better performance but may not work with all firewall settings. - */ - @Config(MYSQL_URL) - public final String mysqlUrl; - - @Config(MYSQL_USER) - public final String mysqlUser; - - @Config(MYSQL_PASSWORD) - public final String mysqlPassword; - - public MySqlConfig(VerifiableProperties verifiableProperties) { - mysqlUrl = verifiableProperties.getString(MYSQL_URL); - mysqlUser = verifiableProperties.getString(MYSQL_USER); - mysqlPassword = verifiableProperties.getString(MYSQL_PASSWORD); - } -} diff --git a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java b/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java index d0b0f59da2..1bb91a1868 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java +++ b/ambry-account/src/main/java/com/github/ambry/account/mysql/MySqlDataAccessor.java @@ -13,6 +13,7 @@ */ package com.github.ambry.account.mysql; +import com.github.ambry.config.MySqlAccountServiceConfig; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; @@ -39,12 +40,12 @@ public class MySqlDataAccessor { private final Map statementCache = new HashMap<>(); /** Production constructor */ - public MySqlDataAccessor(MySqlConfig config) throws SQLException { + public MySqlDataAccessor(MySqlAccountServiceConfig config) throws SQLException { // TODO: this will become a list of url's with info on which ones are writeable // Q: Can we assume url's share credentials or does each need its own? - mysqlUrl = config.mysqlUrl; - mysqlUser = config.mysqlUser; - mysqlPassword = config.mysqlPassword; + mysqlUrl = config.dbUrl; + mysqlUser = config.dbUser; + mysqlPassword = config.dbPassword; // Initialize driver mysqlDriver = DriverManager.getDriver(mysqlUrl); // AccountService needs to work if mysql is down. Mysql can also reboot. @@ -56,10 +57,10 @@ public MySqlDataAccessor(MySqlConfig config) throws SQLException { } /** Test constructor */ - public MySqlDataAccessor(MySqlConfig config, Driver mysqlDriver) { - mysqlUrl = config.mysqlUrl; - mysqlUser = config.mysqlUser; - mysqlPassword = config.mysqlPassword; + public MySqlDataAccessor(MySqlAccountServiceConfig config, Driver mysqlDriver) { + mysqlUrl = config.dbUrl; + mysqlUser = config.dbUser; + mysqlPassword = config.dbPassword; this.mysqlDriver = mysqlDriver; } diff --git a/ambry-account/src/test/java/com/github/ambry/account/mysql/AccountDaoTest.java b/ambry-account/src/test/java/com/github/ambry/account/mysql/AccountDaoTest.java index 8fffd18e96..24bafa06d9 100644 --- a/ambry-account/src/test/java/com/github/ambry/account/mysql/AccountDaoTest.java +++ b/ambry-account/src/test/java/com/github/ambry/account/mysql/AccountDaoTest.java @@ -16,6 +16,8 @@ import com.github.ambry.account.Account; import com.github.ambry.account.AccountBuilder; import com.github.ambry.account.AccountSerdeUtils; +import com.github.ambry.account.MySqlAccountService; +import com.github.ambry.config.MySqlAccountServiceConfig; import com.github.ambry.config.VerifiableProperties; import java.sql.Connection; import java.sql.Driver; @@ -72,10 +74,10 @@ static MySqlDataAccessor getDataAccessor(Connection mockConnection) throws SQLEx when(mockDriver.connect(anyString(), any(Properties.class))).thenReturn(mockConnection); //lenient().when(mockConnection.isValid(anyInt())).thenReturn(true); Properties properties = new Properties(); - properties.setProperty(MySqlConfig.MYSQL_URL, "jdbc:mysql://localhost/AccountMetadata"); - properties.setProperty(MySqlConfig.MYSQL_USER, "ambry"); - properties.setProperty(MySqlConfig.MYSQL_PASSWORD, "ambry"); - MySqlConfig config = new MySqlConfig(new VerifiableProperties(properties)); + properties.setProperty(MySqlAccountServiceConfig.DB_URL, "jdbc:mysql://localhost/AccountMetadata"); + properties.setProperty(MySqlAccountServiceConfig.DB_USER, "ambry"); + properties.setProperty(MySqlAccountServiceConfig.DB_PASSWORD, "ambry"); + MySqlAccountServiceConfig config = new MySqlAccountServiceConfig(new VerifiableProperties(properties)); MySqlDataAccessor dataAccessor = new MySqlDataAccessor(config, mockDriver); when(dataAccessor.getDatabaseConnection()).thenReturn(mockConnection); return dataAccessor; diff --git a/ambry-api/src/main/java/com/github/ambry/account/Account.java b/ambry-api/src/main/java/com/github/ambry/account/Account.java index efbdc1436c..7c39b61dd5 100644 --- a/ambry-api/src/main/java/com/github/ambry/account/Account.java +++ b/ambry-api/src/main/java/com/github/ambry/account/Account.java @@ -107,7 +107,7 @@ public class Account { private final short id; private final String name; private AccountStatus status; - private int snapshotVersion; + private final int snapshotVersion; // internal data structure private final Map containerIdToContainerMap = new HashMap<>(); private final Map containerNameToContainerMap = new HashMap<>(); @@ -231,14 +231,6 @@ public int getSnapshotVersion() { return snapshotVersion; } - public void setStatus(AccountStatus status) { - this.status = status; - } - - public void setSnapshotVersion(int snapshotVersion) { - this.snapshotVersion = snapshotVersion; - } - /** * Gets the {@link Container} of this account with the specified container id. * @param containerId The id of the container to get. @@ -369,8 +361,4 @@ private void checkDuplicateContainerNameOrId(Container container) { throw new IllegalStateException(errorMessage); } } - - void updateContainers(Collection containers) { - containers.forEach(this::updateContainerMap); - } } diff --git a/ambry-api/src/main/java/com/github/ambry/config/MySqlAccountServiceConfig.java b/ambry-api/src/main/java/com/github/ambry/config/MySqlAccountServiceConfig.java new file mode 100644 index 0000000000..f57edd2d4a --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/MySqlAccountServiceConfig.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.config; + +/** + * Config for {@link MySqlAccountServiceConfig} + */ +public class MySqlAccountServiceConfig { + public static final String MYSQL_ACCOUNT_SERVICE_PREFIX = "mysql.account.service."; + public static final String DB_URL = MYSQL_ACCOUNT_SERVICE_PREFIX + "db.url"; + public static final String DB_USER = MYSQL_ACCOUNT_SERVICE_PREFIX + "db.user"; + public static final String DB_PASSWORD = MYSQL_ACCOUNT_SERVICE_PREFIX + "db.password"; + public static final String UPDATER_POLLING_INTERVAL_MS_KEY = + MYSQL_ACCOUNT_SERVICE_PREFIX + "updater.polling.interval.ms"; + public static final String UPDATER_SHUT_DOWN_TIMEOUT_MS_KEY = + MYSQL_ACCOUNT_SERVICE_PREFIX + "updater.shut.down.timeout.ms"; + public static final String BACKUP_DIRECTORY_KEY = MYSQL_ACCOUNT_SERVICE_PREFIX + "backup.dir"; + public static final String UPDATE_DISABLED = MYSQL_ACCOUNT_SERVICE_PREFIX + "update.disabled"; + + // TODO: Might need to take an array of URLs which would have one write (master) and multiple read urls (backup) + @Config(DB_URL) + @Default("") + public final String dbUrl; + + @Config(DB_USER) + @Default("") + public final String dbUser; + + @Config(DB_PASSWORD) + @Default("") + public final String dbPassword; + + /** + * The time interval in milli seconds between two consecutive account pulling for the background account updater of + * {@code MySqlAccountService}. Setting to 0 will disable it. + */ + @Config(UPDATER_POLLING_INTERVAL_MS_KEY) + @Default("2 * 1000") + public final int updaterPollingIntervalMs; + + /** + * The timeout in ms to shut down the account updater of {@code MySqlAccountService}. + */ + @Config(UPDATER_SHUT_DOWN_TIMEOUT_MS_KEY) + @Default("5 * 1000") + public final int updaterShutDownTimeoutMs; + + /** + * The directory on the local machine where account data backups will be stored before updating accounts. + * If this string is empty, backups will be disabled. + */ + @Config(BACKUP_DIRECTORY_KEY) + @Default("") + public final String backupDir; + + /** + * If true, MySqlAccountService would reject all the requests to update accounts. + */ + @Config(UPDATE_DISABLED) + @Default("false") + public final boolean updateDisabled; + + public MySqlAccountServiceConfig(VerifiableProperties verifiableProperties) { + dbUrl = verifiableProperties.getString(DB_URL, ""); + dbUser = verifiableProperties.getString(DB_USER, ""); + dbPassword = verifiableProperties.getString(DB_PASSWORD, ""); + updaterPollingIntervalMs = + verifiableProperties.getIntInRange(UPDATER_POLLING_INTERVAL_MS_KEY, 2 * 1000, 0, Integer.MAX_VALUE); + updaterShutDownTimeoutMs = + verifiableProperties.getIntInRange(UPDATER_SHUT_DOWN_TIMEOUT_MS_KEY, 5 * 1000, 1, Integer.MAX_VALUE); + backupDir = verifiableProperties.getString(BACKUP_DIRECTORY_KEY, ""); + updateDisabled = verifiableProperties.getBoolean(UPDATE_DISABLED, false); + } +} diff --git a/ambry-tools/src/main/java/com/github/ambry/account/DatabaseTest.java b/ambry-tools/src/main/java/com/github/ambry/account/DatabaseTest.java index 445e23c41e..e8012a9ef7 100644 --- a/ambry-tools/src/main/java/com/github/ambry/account/DatabaseTest.java +++ b/ambry-tools/src/main/java/com/github/ambry/account/DatabaseTest.java @@ -14,8 +14,8 @@ package com.github.ambry.account; import com.github.ambry.account.mysql.ContainerDao; -import com.github.ambry.account.mysql.MySqlConfig; import com.github.ambry.account.mysql.MySqlDataAccessor; +import com.github.ambry.config.MySqlAccountServiceConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.utils.Utils; import java.sql.Connection; @@ -54,7 +54,7 @@ public static void main(String args[]) { } private static void perfTest(VerifiableProperties verifiableProperties) throws Exception { - MySqlConfig config = new MySqlConfig(verifiableProperties); + MySqlAccountServiceConfig config = new MySqlAccountServiceConfig(verifiableProperties); MySqlDataAccessor dataAccessor = new MySqlDataAccessor(config); ContainerDao containerDao = new ContainerDao(dataAccessor); // Use high account id to avoid conflict diff --git a/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java b/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java index 10bbe1cc66..67b4b635cc 100644 --- a/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java +++ b/ambry-tools/src/main/java/com/github/ambry/account/MySqlAccountsDBTool.java @@ -3,10 +3,10 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.account.mysql.AccountDao; import com.github.ambry.account.mysql.ContainerDao; -import com.github.ambry.account.mysql.MySqlConfig; import com.github.ambry.account.mysql.MySqlDataAccessor; import com.github.ambry.commons.CommonUtils; import com.github.ambry.config.HelixPropertyStoreConfig; +import com.github.ambry.config.MySqlAccountServiceConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.tools.util.ToolUtils; import com.github.ambry.utils.SystemTime; @@ -15,7 +15,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -171,8 +170,8 @@ public static void main(String[] args) throws IOException { public MySqlAccountsDBTool(VerifiableProperties verifiableProperties, String zkServer) throws SQLException { - this.mySqlDataAccessor = new MySqlDataAccessor(new MySqlConfig(verifiableProperties)); - this.mySqlAccountStore = new MySqlAccountStore(new MySqlConfig(verifiableProperties)); + this.mySqlDataAccessor = new MySqlDataAccessor(new MySqlAccountServiceConfig(verifiableProperties)); + this.mySqlAccountStore = new MySqlAccountStore(new MySqlAccountServiceConfig(verifiableProperties)); //Create helix property store HelixPropertyStoreConfig helixPropertyStoreConfig = new HelixPropertyStoreConfig(verifiableProperties); this.helixPropertyStore = CommonUtils.createHelixPropertyStore(zkServer, helixPropertyStoreConfig, null); @@ -241,18 +240,13 @@ public void compare() throws SQLException { .map(accountString -> Account.fromJson(new JSONObject(accountString))) .collect(Collectors.toSet())); - // Query the list of all Account from mysql - Set accountSetFromDB = new HashSet<>(mySqlAccountStore.getNewAccounts(0)); - - // Query the list of containers for each Account and add them to the Account - accountSetFromDB.forEach(account -> { - try { - account.updateContainers(mySqlAccountStore.getContainersByAccount(account.getId())); - } catch (SQLException e) { - logger.error("MySQL querying containers failed", e); - return; - } - }); + // Query the list of all Account from mysql along with their containers + Set accountSetFromDB = new HashSet<>(); + for (Account account : mySqlAccountStore.getNewAccounts(0)) { + AccountBuilder accountBuilder = + new AccountBuilder(account).containers(mySqlAccountStore.getContainersByAccount(account.getId())); + accountSetFromDB.add(accountBuilder.build()); + } //Accounts missing (or different) in DB = accounts in ZK - accounts in DB accountSetFromZK.removeAll(accountSetFromDB);