1919from __future__ import annotations
2020
2121from nitric .exception import exception_from_grpc_error
22- from typing import List , Union
23- from enum import Enum
22+ from typing import List , Union , Literal
2423from grpclib import GRPCError
2524from nitric .api .queues import QueueRef , Queues
2625from nitric .application import Nitric
3332
3433from nitric .resources .resource import SecureResource
3534
36-
37- class QueuePermission (Enum ):
38- """Valid query expression operators."""
39-
40- sending = "sending"
41- receiving = "receiving"
42-
35+ QueuePermission = Literal ["sending" , "receiving" ]
4336
4437class Queue (SecureResource ):
4538 """A queue resource."""
@@ -53,20 +46,15 @@ def __init__(self, name: str):
5346 self .name = name
5447
5548 def _to_resource (self ) -> Resource :
56- return Resource (name = self .name , type = ResourceType .Queue )
49+ return Resource (name = self .name , type = ResourceType .Queue ) # type:ignore
5750
58- def _perms_to_actions (self , * args : Union [ QueuePermission , str ] ) -> List [Action ]:
59- permission_actions_map = {
60- QueuePermission . sending : [Action .QueueSend , Action .QueueList , Action .QueueDetail ],
61- QueuePermission . receiving : [Action .QueueReceive , Action .QueueList , Action .QueueDetail ],
51+ def _perms_to_actions (self , * args : QueuePermission ) -> List [int ]:
52+ permission_actions_map : dict [ QueuePermission , List [ int ]] = {
53+ " sending" : [Action .QueueSend , Action .QueueList , Action .QueueDetail ],
54+ " receiving" : [Action .QueueReceive , Action .QueueList , Action .QueueDetail ],
6255 }
63- # convert strings to the enum value where needed
64- perms = [
65- permission if isinstance (permission , QueuePermission ) else QueuePermission [permission .lower ()]
66- for permission in args
67- ]
6856
69- return [action for perm in perms for action in permission_actions_map [perm ]]
57+ return [action for perm in args for action in permission_actions_map [perm ]]
7058
7159 async def _register (self ):
7260 try :
@@ -76,7 +64,7 @@ async def _register(self):
7664 except GRPCError as grpc_err :
7765 raise exception_from_grpc_error (grpc_err )
7866
79- def allow (self , * args : Union [ QueuePermission , str ] ) -> QueueRef :
67+ def allow (self , * args : QueuePermission ) -> QueueRef :
8068 """Request the required permissions for this queue."""
8169 # Ensure registration of the resource is complete before requesting permissions.
8270 str_args = [str (permission ) for permission in args ]
0 commit comments