Skip to content
This repository was archived by the owner on Feb 15, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/source/poppy.manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,3 @@
:members:
:undoc-members:
:show-inheritance:

141 changes: 141 additions & 0 deletions poppy/manager/default/ssl_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,27 @@ def __init__(self, manager):

def create_ssl_certificate(
self, project_id, cert_obj, https_upgrade=False):
"""Create a new certificate.

Create ssl certificate has two parts.
- Store the cert_obj details along with project id, flavor id,
domain name and certificate type into cassandra storage
- Submit a TaskFlow task to create an ssl certificate that
will communicate with the Provider and creates it.

:param unicode project_id: The project id
:param cert_obj: The certificate details
:type cert_obj: poppy.model.ssl_certificate.SSLCertificate
:param bool https_upgrade: (Default False)
Whether ot not to be upgraded to https

:return: Dictionary with wrapped arguments
:rtype: dict

:raises ValueError: if domain is not a valid non-root domain
:raises LookupError: if the supplied flavor does not exists
:raises ValueError: if the certificate already exists for the domain
"""

if (not validators.is_valid_domain_name(cert_obj.domain_name)) or \
(validators.is_root_domain(
Expand Down Expand Up @@ -86,6 +107,20 @@ def create_ssl_certificate(
return kwargs

def delete_ssl_certificate(self, project_id, domain_name, cert_type):
"""Delete an SSL certificate.

Submits a TaskFlow task to delete the certificate that will
communicate with Provider layer to delete the ssl certificate.

:param unicode project_id: The project id
:param unicode domain_name: The name of the domain
:param unicode cert_type: Type of the certificate to be deleted

:return: dict with cert, provider and other original inputs
:rtype: dict

:raises LookupError: if the flavor is not found
"""
cert_obj = self.storage.get_certs_by_domain(
domain_name, cert_type=cert_type)

Expand All @@ -110,6 +145,16 @@ def delete_ssl_certificate(self, project_id, domain_name, cert_type):
return kwargs

def get_certs_info_by_domain(self, domain_name, project_id):
"""Get the certificates information by domain.

:param unicode domain_name: Name of the domain
:param unicode project_id: The project id

:return: SSLCertificate object
:rtype: poppy.model.ssl_certificate.SSLCertificate

:raises ValueError: if cert info is not found
"""
try:
certs_info = self.storage.get_certs_by_domain(
domain_name=domain_name,
Expand All @@ -124,6 +169,11 @@ def get_certs_info_by_domain(self, domain_name, project_id):
raise e

def get_san_retry_list(self):
"""Get items from san_mapping_queue of provider.

:return: san_mapping_queue of provider
:rtype: dict
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
res = akamai_driver.mod_san_queue.traverse_queue()
Expand All @@ -142,6 +192,15 @@ def get_san_retry_list(self):
]

def update_san_retry_list(self, queue_data_list):
"""Update provider's mod_san_queue with new items.

:param list queue_data_list: The list of new items to update with.
:return: tuple of lists of old and new items in mod_san_queue
:rtype: tuple(list, list)

:raises LookupError: if domain does not exists
:raises ValueError: if certificate already exists
"""
for r in queue_data_list:
service_obj = self.service_storage\
.get_service_details_by_domain_name(r['domain_name'])
Expand Down Expand Up @@ -180,6 +239,19 @@ def update_san_retry_list(self, queue_data_list):
return res, deleted

def rerun_san_retry_list(self):
"""Retry the mod san queue.

Obtain the items from mod san queue and build run_list and
ignore_list by iterating through this retry queue. For each
certificate in the run_list, submit a task to recreate the
ssl certificate.

:return: list of certificates that were recreated and list of
certificates that were ignored
:rtype: (list, list)

:raises LookupError: if the flavor is not found
"""
run_list = []
ignore_list = []
if 'akamai' in self._driver.providers:
Expand Down Expand Up @@ -310,6 +382,15 @@ def rerun_san_retry_list(self):
return run_list, ignore_list

def get_san_cert_configuration(self, san_cert_name):
"""Get SAN certificate configuration.

:param unicode san_cert_name: Name of the SAN certificate

:return: Configuration of the certificate
:rtype: dict

:raises ValueError: if not a valid certificate name
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
if san_cert_name not in akamai_driver.san_cert_cnames:
Expand All @@ -326,6 +407,17 @@ def get_san_cert_configuration(self, san_cert_name):
return res

def update_san_cert_configuration(self, san_cert_name, new_cert_config):
"""Update configuration of SAN certificate.

:param unicode san_cert_name: Name of the SAN certificate
:param dict new_cert_config: New configuration to update with.

:return: The updated values of the certificate
:rtype: dict

:raises ValueError: if not a valid certificate
:raises RuntimeError: if unable to call SPS api
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
if san_cert_name not in akamai_driver.san_cert_cnames:
Expand Down Expand Up @@ -361,6 +453,12 @@ def update_san_cert_configuration(self, san_cert_name, new_cert_config):
return res

def get_sni_cert_configuration(self, cert_name):
"""Get configuration for SNI certificates.

:param unicode cert_name: The name of the SNI certificate
:return: SNI certificate configuration
:rtype: dict
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
self._validate_sni_cert_name(akamai_driver, cert_name)
Expand All @@ -372,6 +470,14 @@ def get_sni_cert_configuration(self, cert_name):
return res

def update_sni_cert_configuration(self, cert_name, new_cert_config):
"""Update SNI certificate's configuration.

:param unicode cert_name: SNI certificate name
:param dict new_cert_config: New configuration to update with

:return: The updated configuration of the certificate
:rtype: dict
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
self._validate_sni_cert_name(akamai_driver, cert_name)
Expand All @@ -386,6 +492,11 @@ def update_sni_cert_configuration(self, cert_name, new_cert_config):
return res

def get_san_cert_hostname_limit(self):
"""Get SAN certificate hostname limit.

:return: Limit for the SAN cert host names
:rtype: dict
"""
if 'akamai' in self._driver.providers:
akamai_driver = self._driver.providers['akamai'].obj
res = akamai_driver.cert_info_storage.get_san_cert_hostname_limit()
Expand All @@ -398,13 +509,28 @@ def get_san_cert_hostname_limit(self):

@staticmethod
def _validate_sni_cert_name(provider_driver, cert_name):
"""Check whether a valid SNI certificate name.

:param provider_driver: The provider driver
:type provider_driver: poppy.provider.base.driver.ProviderDriverBase
:param unicode cert_name: The name of the SNI certificate

:raises ValueError: if not a valid SNI certificate name
"""
if cert_name not in provider_driver.sni_cert_cnames:
raise ValueError(
"{0} is not a valid sni cert, "
"valid sni certs are: {1}".format(
cert_name, provider_driver.sni_cert_cnames))

def set_san_cert_hostname_limit(self, request_json):
"""Set hostname limit for SAN certificate.

:param dict request_json: Dict containing the required limit

:return: The updated limit value
:rtype: int
"""
if 'akamai' in self._driver.providers:
try:
new_limit = request_json['san_cert_hostname_limit']
Expand All @@ -425,11 +551,26 @@ def set_san_cert_hostname_limit(self, request_json):
return res

def get_certs_by_status(self, status):
"""Get certificates by their status

:param unicode status: Status of the certificate to look for

:return: List of certificates matching the status
:rtype: list
"""
certs_by_status = self.storage.get_certs_by_status(status)

return certs_by_status

def update_certificate_status(self, domain_name, certificate_updates):
"""Update a certificate status.

:param unicode domain_name: The domain name
:param dict certificate_updates: New status of the cert to update with

:raises ValueError: if no certificate found
:raises CertificateStatusUpdateError: if unable to update certificate status
"""
certificate_old = self.storage.get_certs_by_domain(domain_name)
if not certificate_old:
raise ValueError(
Expand Down