Skip to content

Commit c6a73fc

Browse files
committed
Updating the files based on the findings on the PR:
moved the function is_domain_blocked into its own utils refactoring the function can_domain_be_used
1 parent 14089dc commit c6a73fc

9 files changed

Lines changed: 160 additions & 56 deletions

File tree

app/blocked_domain_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from app.models import BlockedDomain
2+
3+
4+
def is_domain_blocked(user_id: int, domain: str) -> bool:
5+
"""checks whether the provided domain is blocked for a given user"""
6+
return BlockedDomain.filter_by(user_id=user_id, domain=domain).first() is not None

app/custom_domain_utils.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import arrow
21
import re
3-
42
from dataclasses import dataclass
53
from enum import Enum
6-
from typing import List, Optional, Type
4+
from typing import List, Optional
5+
6+
import arrow
77

88
from app.constants import JobType
99
from app.db import Session
@@ -17,7 +17,7 @@
1717
Job,
1818
DomainMailbox,
1919
Alias,
20-
ModelMixin,
20+
BlockedDomain,
2121
)
2222
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction
2323

@@ -98,15 +98,11 @@ def sanitize_domain(domain: str) -> str:
9898
return new_domain
9999

100100

101-
def can_domain_be_used(
102-
user: User, domain: str, model_type: Type[ModelMixin]
103-
) -> Optional[CannotUseDomainReason]:
101+
def can_domain_be_used(user: User, domain: str) -> Optional[CannotUseDomainReason]:
104102
if not is_valid_domain(domain):
105103
return CannotUseDomainReason.InvalidDomain
106104
elif SLDomain.get_by(domain=domain):
107105
return CannotUseDomainReason.BuiltinDomain
108-
elif model_type.get_by(domain=domain):
109-
return CannotUseDomainReason.DomainAlreadyUsed
110106
elif get_email_domain_part(user.email) == domain:
111107
return CannotUseDomainReason.DomainPartOfUserEmail
112108
elif Mailbox.filter(
@@ -117,6 +113,34 @@ def can_domain_be_used(
117113
return None
118114

119115

116+
def can_custom_domain_be_used(
117+
user: User, domain: str
118+
) -> Optional[CannotUseDomainReason]:
119+
reason = can_domain_be_used(user, domain)
120+
121+
if reason is not None:
122+
return reason
123+
124+
if CustomDomain.get_by(domain=domain):
125+
return CannotUseDomainReason.DomainAlreadyUsed
126+
else:
127+
return None
128+
129+
130+
def can_blocked_domain_be_used(
131+
user: User, domain: str
132+
) -> Optional[CannotUseDomainReason]:
133+
reason = can_domain_be_used(user, domain)
134+
135+
if reason is not None:
136+
return reason
137+
138+
if BlockedDomain.get_by(domain=domain):
139+
return CannotUseDomainReason.DomainAlreadyUsed
140+
else:
141+
return None
142+
143+
120144
def create_custom_domain(
121145
user: User, domain: str, partner_id: Optional[int] = None
122146
) -> CreateCustomDomainResult:
@@ -127,7 +151,7 @@ def create_custom_domain(
127151
)
128152

129153
new_domain = sanitize_domain(domain)
130-
domain_forbidden_cause = can_domain_be_used(user, new_domain, CustomDomain)
154+
domain_forbidden_cause = can_custom_domain_be_used(user, new_domain)
131155
if domain_forbidden_cause:
132156
return CreateCustomDomainResult(
133157
message=domain_forbidden_cause.message(new_domain), message_category="error"

app/dashboard/views/setting.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
ALIAS_RANDOM_SUFFIX_LENGTH,
2121
CONNECT_WITH_PROTON,
2222
)
23-
from app.custom_domain_utils import sanitize_domain, can_domain_be_used
23+
from app.custom_domain_utils import sanitize_domain, can_blocked_domain_be_used
2424
from app.dashboard.base import dashboard_bp
2525
from app.db import Session
2626
from app.errors import ProtonPartnerNotSetUp
@@ -301,8 +301,8 @@ def setting():
301301
elif request.form.get("form-name") == "blocked-domains-add":
302302
domain = request.form.get("domain-name")
303303
new_domain = sanitize_domain(domain)
304-
domain_forbidden_cause = can_domain_be_used(
305-
current_user, new_domain, BlockedDomain
304+
domain_forbidden_cause = can_blocked_domain_be_used(
305+
current_user, new_domain
306306
)
307307

308308
if domain_forbidden_cause:

app/models.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,15 +1237,6 @@ def has_used_alias_from_partner(self) -> bool:
12371237
> 0
12381238
)
12391239

1240-
def is_domain_blocked(self, domain: str) -> bool:
1241-
"""checks whether the provided domain is blocked for a given user"""
1242-
domain_names = []
1243-
1244-
for blocked_domain in BlockedDomain.filter_by(user_id=self.id):
1245-
domain_names.append(blocked_domain.domain)
1246-
1247-
return any(domain in domain_name for domain_name in domain_names)
1248-
12491240
def __repr__(self):
12501241
return f"<User {self.id} {self.name} {self.email}>"
12511242

email_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
change_alias_status,
6060
get_alias_recipient_name,
6161
)
62+
from app.blocked_domain_utils import is_domain_blocked
6263
from app.config import (
6364
EMAIL_DOMAIN,
6465
URL,
@@ -584,7 +585,7 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
584585
return [(True, status.E209)]
585586

586587
mail_from_domain = get_email_domain_part(mail_from)
587-
if user.is_domain_blocked(mail_from_domain):
588+
if is_domain_blocked(user.id, mail_from_domain):
588589
# by default return 2** instead of 5** to allow user to receive emails again when domain is unblocked
589590
res_status = status.E200
590591
if user.block_behaviour == BlockBehaviourEnum.return_5xx:

migrations/versions/2025_020620_83676846e104_.py renamed to migrations/versions/2025_040814_c18048c40ed9_.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
"""empty message
22
3-
Revision ID: 83676846e104
4-
Revises: 20e7d3ca289a
5-
Create Date: 2025-02-06 20:35:26.475120
3+
Revision ID: c18048c40ed9
4+
Revises: 07855f9f39b1
5+
Create Date: 2025-04-08 14:43:10.097244
66
77
"""
88
import sqlalchemy_utils
@@ -11,8 +11,8 @@
1111

1212

1313
# revision identifiers, used by Alembic.
14-
revision = '83676846e104'
15-
down_revision = '20e7d3ca289a'
14+
revision = 'c18048c40ed9'
15+
down_revision = '07855f9f39b1'
1616
branch_labels = None
1717
depends_on = None
1818

tests/test_blocked_domain_utils.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from app.blocked_domain_utils import is_domain_blocked
2+
from app.db import Session
3+
from app.models import BlockedDomain
4+
from tests.utils import create_new_user
5+
6+
7+
def setup():
8+
user = create_new_user()
9+
BlockedDomain.create(
10+
user_id=user.id,
11+
domain="example.com",
12+
flush=True,
13+
)
14+
Session.flush()
15+
16+
17+
def teardown_module():
18+
Session.query(BlockedDomain).delete()
19+
20+
21+
def test_domain_blocked_for_user():
22+
user = create_new_user()
23+
BlockedDomain.create(
24+
user_id=user.id,
25+
domain="example.com",
26+
flush=True,
27+
)
28+
Session.flush()
29+
30+
assert is_domain_blocked(user.id, "example.com")
31+
assert not is_domain_blocked(user.id, "some-other-example.com")

tests/test_custom_domain_utils.py

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
CannotUseDomainReason,
1313
CannotSetCustomDomainMailboxesCause,
1414
count_custom_domain_aliases,
15+
can_blocked_domain_be_used,
16+
can_custom_domain_be_used,
1517
)
1618
from app.db import Session
1719
from app.models import User, CustomDomain, Mailbox, DomainMailbox, BlockedDomain, Alias
@@ -52,46 +54,110 @@ def test_is_valid_domain():
5254
# can_domain_be_used
5355
def test_can_domain_be_used():
5456
domain = f"{random_string(10)}.com"
55-
res = can_domain_be_used(user, domain, CustomDomain)
57+
res = can_domain_be_used(user, domain)
5658
assert res is None
5759

5860

59-
def test_can_domain_be_used_existing_domain():
61+
def test_can_domain_be_used_sl_domain():
62+
domain = ALIAS_DOMAINS[0]
63+
res = can_domain_be_used(user, domain)
64+
assert res is CannotUseDomainReason.BuiltinDomain
65+
66+
67+
def test_can_domain_be_used_domain_of_user_email():
68+
domain = user.email.split("@")[1]
69+
res = can_domain_be_used(user, domain)
70+
assert res is CannotUseDomainReason.DomainPartOfUserEmail
71+
72+
73+
def test_can_domain_be_used_domain_of_existing_mailbox():
74+
domain = random_domain()
75+
Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True)
76+
res = can_domain_be_used(user, domain)
77+
assert res is CannotUseDomainReason.DomainUserInMailbox
78+
79+
80+
def test_can_domain_be_used_invalid_domain():
81+
domain = f"{random_string(10)}@lol.com"
82+
res = can_domain_be_used(user, domain)
83+
assert res is CannotUseDomainReason.InvalidDomain
84+
85+
86+
# can_custom_domain_be_used
87+
def test_can_custom_domain_be_used():
88+
domain = f"{random_string(10)}.com"
89+
res = can_custom_domain_be_used(user, domain)
90+
assert res is None
91+
92+
93+
def test_can_custom_domain_be_used_sl_domain():
94+
domain = ALIAS_DOMAINS[0]
95+
res = can_custom_domain_be_used(user, domain)
96+
assert res is CannotUseDomainReason.BuiltinDomain
97+
98+
99+
def test_can_custom_domain_be_used_domain_of_user_email():
100+
domain = user.email.split("@")[1]
101+
res = can_custom_domain_be_used(user, domain)
102+
assert res is CannotUseDomainReason.DomainPartOfUserEmail
103+
104+
105+
def test_can_custom_domain_be_used_domain_of_existing_mailbox():
106+
domain = random_domain()
107+
Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True)
108+
res = can_custom_domain_be_used(user, domain)
109+
assert res is CannotUseDomainReason.DomainUserInMailbox
110+
111+
112+
def test_can_custom_domain_be_used_invalid_domain():
113+
domain = f"{random_string(10)}@lol.com"
114+
res = can_custom_domain_be_used(user, domain)
115+
assert res is CannotUseDomainReason.InvalidDomain
116+
117+
118+
def test_can_custom_domain_be_used_existing_domain():
60119
domain = random_domain()
61120
CustomDomain.create(user_id=user.id, domain=domain, commit=True)
62-
res = can_domain_be_used(user, domain, CustomDomain)
121+
res = can_custom_domain_be_used(user, domain)
63122
assert res is CannotUseDomainReason.DomainAlreadyUsed
64123

65124

66-
def test_can_domain_be_used_sl_domain():
125+
# can_blocked_domain_be_used
126+
def test_can_blocked_domain_be_used():
127+
domain = f"{random_string(10)}.com"
128+
res = can_blocked_domain_be_used(user, domain)
129+
assert res is None
130+
131+
132+
def test_can_blocked_domain_be_used_sl_domain():
67133
domain = ALIAS_DOMAINS[0]
68-
res = can_domain_be_used(user, domain, CustomDomain)
134+
res = can_blocked_domain_be_used(user, domain)
69135
assert res is CannotUseDomainReason.BuiltinDomain
70136

71137

72-
def test_can_domain_be_used_domain_of_user_email():
138+
def test_can_blocked_domain_be_used_domain_of_user_email():
73139
domain = user.email.split("@")[1]
74-
res = can_domain_be_used(user, domain, CustomDomain)
140+
res = can_blocked_domain_be_used(user, domain)
75141
assert res is CannotUseDomainReason.DomainPartOfUserEmail
76142

77143

78-
def test_can_domain_be_used_domain_of_existing_mailbox():
144+
def test_can_blocked_domain_be_used_domain_of_existing_mailbox():
79145
domain = random_domain()
80146
Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True)
81-
res = can_domain_be_used(user, domain, CustomDomain)
147+
res = can_blocked_domain_be_used(user, domain)
82148
assert res is CannotUseDomainReason.DomainUserInMailbox
83149

84150

85-
def test_can_domain_be_used_invalid_domain():
151+
def test_can_blocked_domain_be_used_invalid_domain():
86152
domain = f"{random_string(10)}@lol.com"
87-
res = can_domain_be_used(user, domain, CustomDomain)
153+
res = can_blocked_domain_be_used(user, domain)
88154
assert res is CannotUseDomainReason.InvalidDomain
89155

90156

91157
def test_can_blocked_domain_be_used_existing_domain():
92158
domain = random_domain()
93159
BlockedDomain.create(user_id=user.id, domain=domain, commit=True)
94-
res = can_domain_be_used(user, domain, BlockedDomain)
160+
res = can_blocked_domain_be_used(user, domain)
95161
assert res is CannotUseDomainReason.DomainAlreadyUsed
96162

97163

tests/test_domains.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from app.db import Session
2-
from app.models import SLDomain, PartnerUser, AliasOptions, BlockedDomain
2+
from app.models import SLDomain, PartnerUser, AliasOptions
33
from app.proton.proton_partner import get_proton_partner
44
from init_app import add_sl_domains
55
from tests.utils import create_new_user, random_token
66

77

88
def setup_module():
99
Session.query(SLDomain).delete()
10-
Session.query(BlockedDomain).delete()
1110
SLDomain.create(
1211
domain="hidden", premium_only=False, flush=True, order=5, hidden=True
1312
)
@@ -34,7 +33,6 @@ def setup_module():
3433

3534
def teardown_module():
3635
Session.query(SLDomain).delete()
37-
Session.query(BlockedDomain).delete()
3836
add_sl_domains()
3937

4038

@@ -229,16 +227,3 @@ def test_get_free_partner_and_premium_partner():
229227
assert [d.domain for d in domains] == user.available_sl_domains(
230228
alias_options=options
231229
)
232-
233-
234-
def test_domain_blocked_for_user():
235-
user = create_new_user()
236-
BlockedDomain.create(
237-
user_id=user.id,
238-
domain="example.com",
239-
flush=True,
240-
)
241-
Session.flush()
242-
243-
assert user.is_domain_blocked("example.com")
244-
assert not user.is_domain_blocked("some-other-example.com")

0 commit comments

Comments
 (0)