-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexam-python.js
More file actions
315 lines (283 loc) · 69.4 KB
/
exam-python.js
File metadata and controls
315 lines (283 loc) · 69.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// Python/Django/FastAPI Programming Exam Questions - B2-C1 Level
const pythonQuestions = [
// Basic Questions (3) - B2 Level - More Technically Complex
{
question: "You're building a user authentication system for a Django e-commerce application. Users report that they sometimes get logged out unexpectedly and see other users' data. Here's the current implementation:\n\n<pre><code>def login_user(request):\n if request.method == 'POST':\n username = request.POST.get('username')\n password = request.POST.get('password')\n \n user = authenticate(username=username, password=password)\n if user:\n login(request, user)\n return redirect('dashboard')\n else:\n return render(request, 'login.html', {'error': 'Invalid credentials'})\n \n return render(request, 'login.html')\n\ndef get_user_profile(request, user_id):\n user = User.objects.get(id=user_id)\n return render(request, 'profile.html', {'user': user})</code></pre>\n\nWhat's causing the authentication and data isolation issues?",
options: ["No session management and user context validation", "Missing CSRF protection and input validation", "No proper error handling and user authorization", "All of the above"],
correct: 3,
level: "basic"
},
{
question: "You're building a file upload system for a document management application. Users report that large files fail to upload and sometimes the upload process gets stuck. Here's the current implementation:\n\n<pre><code>def upload_document(request):\n if request.method == 'POST':\n file = request.FILES.get('document')\n if file:\n # Save file to disk\n with open(f'uploads/{file.name}', 'wb+') as destination:\n for chunk in file.chunks():\n destination.write(chunk)\n \n # Save file info to database\n Document.objects.create(\n name=file.name,\n size=file.size,\n uploaded_by=request.user\n )\n return JsonResponse({'status': 'success'})\n \n return render(request, 'upload.html')</code></pre>\n\nWhat's causing the file upload issues?",
options: ["No file size limits and timeout handling", "Missing progress tracking and chunked upload", "No proper error handling and validation", "All of the above"],
correct: 3,
level: "basic"
},
{
question: "You're building a shopping cart system for an e-commerce website. Users report that cart items sometimes disappear or get duplicated when they add/remove items quickly. Here's the current implementation:\n\n<pre><code>def add_to_cart(request, product_id):\n product = get_object_or_404(Product, id=product_id)\n cart = request.session.get('cart', [])\n \n # Check if item already exists\n for item in cart:\n if item['product_id'] == product_id:\n item['quantity'] += 1\n break\n else:\n cart.append({\n 'product_id': product_id,\n 'name': product.name,\n 'price': float(product.price),\n 'quantity': 1\n })\n \n request.session['cart'] = cart\n return JsonResponse({'status': 'success'})</code></pre>\n\nWhat's causing the cart synchronization issues?",
options: ["Race condition in concurrent cart updates", "Missing transaction management", "No proper session handling", "All of the above"],
correct: 3,
level: "basic"
},
// Middle Questions (3) - B2+ Level - More Complex
{
question: "You're building a microservices architecture for a travel booking system using Django. The system needs to handle distributed transactions between booking, payment, and notification services. You're experiencing data consistency issues and partial failures. Here's the current implementation:\n\n<pre><code>@transaction.atomic\ndef create_booking(request):\n try:\n # Create booking\n booking = Booking.objects.create(\n customer=request.user,\n destination=request.POST.get('destination'),\n total_amount=request.POST.get('amount')\n )\n \n # Process payment via external service\n payment_result = payment_service.process_payment(\n amount=booking.total_amount,\n customer_id=request.user.id\n )\n \n # Send confirmation email\n notification_service.send_confirmation(\n email=request.user.email,\n booking_id=booking.id\n )\n \n return JsonResponse({'booking_id': booking.id})\n except Exception as e:\n return JsonResponse({'error': str(e)}, status=500)</code></pre>\n\nWhat's causing the distributed transaction and consistency issues?",
options: ["No distributed transaction management", "Missing compensation patterns for failures", "No proper error handling and rollback", "All of the above"],
correct: 3,
level: "middle"
},
{
question: "You're building a caching system for a high-traffic e-commerce application using Django with Redis. The system needs to handle product catalog, user sessions, and real-time inventory updates. You're experiencing cache invalidation issues and stale data problems. Here's the current implementation:\n\n<pre><code>def get_product(request, product_id):\n cache_key = f'product_{product_id}'\n \n # Try to get from cache\n product = cache.get(cache_key)\n if product is None:\n product = Product.objects.select_related('category').get(id=product_id)\n cache.set(cache_key, product, 3600) # Cache for 1 hour\n \n return render(request, 'product.html', {'product': product})\n\ndef update_product(request, product_id):\n product = Product.objects.get(id=product_id)\n product.name = request.POST.get('name')\n product.price = request.POST.get('price')\n product.save()\n \n # Invalidate cache\n cache.delete(f'product_{product_id}')\n \n return JsonResponse({'status': 'success'})</code></pre>\n\nWhat's causing the cache invalidation and data consistency issues?",
options: ["Incomplete cache invalidation strategy", "Missing cache warming and preloading", "No cache versioning and TTL management", "All of the above"],
correct: 3,
level: "middle"
},
{
question: "You're building a real-time notification system for a social media application using Django with WebSocket support. The system needs to handle user connections, message broadcasting, and presence tracking. You're experiencing connection drops and message delivery failures. Here's the current implementation:\n\n<pre><code>class NotificationConsumer(AsyncWebsocketConsumer):\n async def connect(self):\n self.user = self.scope['user']\n self.room_name = f'user_{self.user.id}'\n \n await self.channel_layer.group_add(\n self.room_name,\n self.channel_name\n )\n await self.accept()\n \n async def disconnect(self, close_code):\n await self.channel_layer.group_discard(\n self.room_name,\n self.channel_name\n )\n \n async def send_notification(self, event):\n await self.send(text_data=json.dumps({\n 'type': 'notification',\n 'message': event['message']\n }))</code></pre>\n\nWhat's causing the connection and message delivery issues?",
options: ["No connection health monitoring and reconnection logic", "Missing message queuing and retry mechanisms", "No proper session management and cleanup", "All of the above"],
correct: 3,
level: "middle"
},
// Advanced Questions (4) - C1 Level - High Complexity
{
question: "You're building a sophisticated event-driven architecture for a financial trading platform using Django with Celery. The system needs to handle high-frequency trading events, order processing, and risk management with strict latency requirements. You're experiencing performance bottlenecks and event ordering issues. Here's the current implementation:\n\n<pre><code>@shared_task\ndef process_order(order_data):\n try:\n # Process order\n order = Order.objects.create(\n symbol=order_data['symbol'],\n quantity=order_data['quantity'],\n price=order_data['price'],\n user_id=order_data['user_id']\n )\n \n # Risk assessment\n risk_result = risk_service.assess_risk(order)\n \n if risk_result.is_approved:\n order.status = 'APPROVED'\n order.save()\n \n # Publish order approved event\n order_approved.delay(order.id)\n else:\n order.status = 'REJECTED'\n order.save()\n \n return {'order_id': order.id, 'status': order.status}\n except Exception as e:\n logger.error(f'Order processing failed: {e}')\n return {'error': str(e)}</code></pre>\n\nWhat advanced Django patterns and potential issues are demonstrated here?",
options: ["Event-driven architecture with Celery, potential race conditions and ordering issues", "Proper use of async task processing with Celery", "Efficient order processing with risk management", "No issues - sophisticated trading system implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're implementing a sophisticated security and compliance framework for a healthcare application using Django. The system needs to handle HIPAA compliance, audit logging, and data encryption with role-based access control. You're experiencing security vulnerabilities and compliance gaps. Here's the current implementation:\n\n<pre><code>@login_required\n@permission_required('healthcare.view_patient')\ndef get_patient(request, patient_id):\n patient = get_object_or_404(Patient, id=patient_id)\n \n # Log access\n AuditLog.objects.create(\n user=request.user,\n action='PATIENT_READ',\n resource_id=patient_id,\n ip_address=request.META.get('REMOTE_ADDR'),\n timestamp=timezone.now()\n )\n \n return render(request, 'patient.html', {'patient': patient})\n\n@login_required\n@permission_required('healthcare.add_patient')\ndef create_patient(request):\n if request.method == 'POST':\n # Encrypt sensitive data\n ssn = encryption_service.encrypt(request.POST.get('ssn'))\n \n patient = Patient.objects.create(\n name=request.POST.get('name'),\n ssn=ssn,\n created_by=request.user\n )\n \n # Log creation\n AuditLog.objects.create(\n user=request.user,\n action='PATIENT_CREATE',\n resource_id=patient.id,\n ip_address=request.META.get('REMOTE_ADDR'),\n timestamp=timezone.now()\n )\n \n return JsonResponse({'patient_id': patient.id})\n \n return render(request, 'create_patient.html')</code></pre>\n\nWhat advanced security patterns and potential issues are demonstrated here?",
options: ["HIPAA compliance with audit logging, potential data exposure in logs", "Proper role-based access control with permission decorators", "Data encryption for sensitive information", "No issues - sophisticated healthcare security implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're building a sophisticated monitoring and observability system for a microservices architecture using Django with custom middleware. The system needs to handle distributed tracing, metrics collection, and alerting across multiple services. You're experiencing performance overhead and incomplete observability data. Here's the current implementation:\n\n<pre><code>class MetricsMiddleware:\n def __init__(self, get_response):\n self.get_response = get_response\n self.request_count = 0\n self.response_times = []\n \n def __call__(self, request):\n start_time = time.time()\n \n # Increment request counter\n self.request_count += 1\n \n # Process request\n response = self.get_response(request)\n \n # Calculate response time\n response_time = time.time() - start_time\n self.response_times.append(response_time)\n \n # Log metrics\n logger.info(f'Request: {request.method} {request.path} - '\n f'Status: {response.status_code} - '\n f'Time: {response_time:.3f}s')\n \n return response\n \n def get_metrics(self):\n return {\n 'total_requests': self.request_count,\n 'avg_response_time': sum(self.response_times) / len(self.response_times) if self.response_times else 0,\n 'max_response_time': max(self.response_times) if self.response_times else 0\n }</code></pre>\n\nWhat performance and observability issues exist in this implementation?",
options: ["Metrics collection overhead affects application performance", "Missing distributed tracing correlation", "Incomplete error tracking and alerting", "All of the above"],
correct: 3,
level: "advanced"
},
{
question: "You're implementing a sophisticated data processing pipeline for a real-time analytics platform using Django with background tasks. The system needs to handle stream processing, data transformation, and real-time aggregations with high throughput requirements. You're experiencing data loss and processing delays. Here's the current implementation:\n\n<pre><code>@shared_task\ndef process_analytics_data(event_data):\n try:\n # Transform event data\n processed_event = transform_event(event_data)\n \n # Update aggregations\n update_aggregations(processed_event)\n \n # Store in database\n AnalyticsEvent.objects.create(\n user_id=processed_event['user_id'],\n event_type=processed_event['event_type'],\n timestamp=processed_event['timestamp'],\n data=processed_event['data']\n )\n \n # Publish to downstream systems\n publish_to_downstream.delay(processed_event)\n \n except Exception as e:\n logger.error(f'Analytics processing failed: {e}')\n # Send to dead letter queue\n send_to_dead_letter.delay(event_data, str(e))\n\ndef update_aggregations(event):\n # Update in-memory aggregations\n key = f\"{event['user_id']}_{event['event_type']}\"\n if key not in pending_aggregations:\n pending_aggregations[key] = {\n 'user_id': event['user_id'],\n 'event_type': event['event_type'],\n 'count': 0,\n 'last_updated': event['timestamp']\n }\n \n pending_aggregations[key]['count'] += 1\n pending_aggregations[key]['last_updated'] = event['timestamp']</code></pre>\n\nWhat sophisticated Django patterns and potential issues are demonstrated here?",
options: ["Background task processing with Celery, potential data loss and memory issues", "Proper event-driven architecture with error handling", "Efficient real-time aggregation with in-memory processing", "No issues - optimal analytics processing implementation"],
correct: 0,
level: "advanced"
},
// Advanced Python Questions (5) - C1 Level - High Complexity
{
question: "You're implementing a sophisticated async context manager for database connection pooling in a high-traffic application. The system needs to handle connection lifecycle, error recovery, and resource cleanup. Here's the current implementation:\n\n<pre><code>import asyncio\nimport asyncpg\nfrom contextlib import asynccontextmanager\nfrom typing import AsyncGenerator\n\nclass DatabasePool:\n def __init__(self, dsn: str, min_size: int = 10, max_size: int = 20):\n self.dsn = dsn\n self.min_size = min_size\n self.max_size = max_size\n self.pool = None\n self._lock = asyncio.Lock()\n \n async def __aenter__(self) -> 'DatabasePool':\n async with self._lock:\n if self.pool is None:\n self.pool = await asyncpg.create_pool(\n self.dsn,\n min_size=self.min_size,\n max_size=self.max_size\n )\n return self\n \n async def __aexit__(self, exc_type, exc_val, exc_tb):\n if self.pool:\n await self.pool.close()\n self.pool = None\n \n @asynccontextmanager\n async def get_connection(self) -> AsyncGenerator[asyncpg.Connection, None]:\n if not self.pool:\n raise RuntimeError('Pool not initialized')\n \n conn = await self.pool.acquire()\n try:\n yield conn\n finally:\n await self.pool.release(conn)\n\n# Usage\nasync def process_data():\n async with DatabasePool('postgresql://...') as db:\n async with db.get_connection() as conn:\n result = await conn.fetch('SELECT * FROM users')\n return result</code></pre>\n\nWhat advanced Python patterns and potential issues are demonstrated here?",
options: ["Async context managers with proper resource management, potential deadlock in concurrent access", "Proper use of asyncio and connection pooling", "Efficient database connection handling with cleanup", "No issues - optimal async database implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're building a sophisticated metaclass system for automatic API validation and serialization in a microservices architecture. The system needs to handle dynamic class creation, method decoration, and type validation. Here's the current implementation:\n\n<pre><code>from typing import Any, Dict, Type, get_type_hints\nimport inspect\nfrom functools import wraps\n\nclass APIMeta(type):\n def __new__(cls, name: str, bases: tuple, attrs: dict) -> Type:\n # Create the class\n new_class = super().__new__(cls, name, bases, attrs)\n \n # Add validation to all methods\n for attr_name, attr_value in attrs.items():\n if callable(attr_value) and not attr_name.startswith('_'):\n setattr(new_class, attr_name, cls._add_validation(attr_value))\n \n return new_class\n \n @staticmethod\n def _add_validation(func):\n @wraps(func)\n def wrapper(self, *args, **kwargs):\n # Get type hints\n hints = get_type_hints(func)\n \n # Validate arguments\n for i, (arg_name, arg_type) in enumerate(hints.items()):\n if i < len(args):\n if not isinstance(args[i], arg_type):\n raise TypeError(f'Argument {arg_name} must be {arg_type.__name__}')\n \n # Validate keyword arguments\n for kwarg_name, kwarg_value in kwargs.items():\n if kwarg_name in hints:\n if not isinstance(kwarg_value, hints[kwarg_name]):\n raise TypeError(f'Argument {kwarg_name} must be {hints[kwarg_name].__name__}')\n \n return func(self, *args, **kwargs)\n return wrapper\n\nclass UserService(metaclass=APIMeta):\n def create_user(self, name: str, email: str, age: int) -> Dict[str, Any]:\n return {'name': name, 'email': email, 'age': age}\n \n def get_user(self, user_id: int) -> Dict[str, Any]:\n return {'id': user_id, 'name': 'John', 'email': 'john@example.com'}</code></pre>\n\nWhat advanced Python patterns and potential issues are demonstrated here?",
options: ["Metaclass-based automatic validation, potential performance overhead and complex debugging", "Proper use of type hints and decorators", "Efficient API validation with type checking", "No issues - optimal metaclass implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're implementing a sophisticated caching decorator with TTL, cache invalidation, and distributed cache support for a high-performance web application. The system needs to handle cache warming, cache-aside pattern, and cache consistency. Here's the current implementation:\n\n<pre><code>import time\nimport hashlib\nimport json\nfrom functools import wraps\nfrom typing import Any, Callable, Optional\nimport redis\n\nclass CacheManager:\n def __init__(self, redis_client: redis.Redis):\n self.redis = redis_client\n self.default_ttl = 3600\n \n def cached(self, ttl: int = None, key_prefix: str = '', \n invalidate_on: list = None) -> Callable:\n def decorator(func: Callable) -> Callable:\n @wraps(func)\n def wrapper(*args, **kwargs) -> Any:\n # Generate cache key\n cache_key = self._generate_key(func, args, kwargs, key_prefix)\n \n # Try to get from cache\n cached_result = self.redis.get(cache_key)\n if cached_result:\n return json.loads(cached_result)\n \n # Execute function and cache result\n result = func(*args, **kwargs)\n \n # Store in cache with TTL\n self.redis.setex(\n cache_key,\n ttl or self.default_ttl,\n json.dumps(result, default=str)\n )\n \n # Set up invalidation triggers\n if invalidate_on:\n for trigger in invalidate_on:\n self.redis.sadd(f'invalidate:{trigger}', cache_key)\n \n return result\n \n # Add cache invalidation method\n wrapper.invalidate = lambda: self._invalidate_cache(func, args, kwargs, key_prefix)\n return wrapper\n return decorator\n \n def _generate_key(self, func: Callable, args: tuple, kwargs: dict, prefix: str) -> str:\n key_data = {\n 'func': func.__name__,\n 'args': args,\n 'kwargs': sorted(kwargs.items())\n }\n key_string = json.dumps(key_data, sort_keys=True)\n return f'{prefix}:{func.__name__}:{hashlib.md5(key_string.encode()).hexdigest()}'\n \n def _invalidate_cache(self, func: Callable, args: tuple, kwargs: dict, prefix: str) -> None:\n cache_key = self._generate_key(func, args, kwargs, prefix)\n self.redis.delete(cache_key)\n\n# Usage\ncache_manager = CacheManager(redis.Redis())\n\n@cache_manager.cached(ttl=1800, key_prefix='user', invalidate_on=['user_update'])\ndef get_user_profile(user_id: int) -> dict:\n # Expensive database operation\n time.sleep(0.1) # Simulate DB query\n return {'id': user_id, 'name': 'John', 'email': 'john@example.com'}</code></pre>\n\nWhat advanced Python patterns and potential issues are demonstrated here?",
options: ["Sophisticated caching with decorators, potential memory leaks and cache inconsistency", "Proper use of Redis and JSON serialization", "Efficient caching with TTL and invalidation", "No issues - optimal caching implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're building a sophisticated event-driven system using Python's asyncio for real-time data processing. The system needs to handle event routing, backpressure, and error recovery with high throughput requirements. Here's the current implementation:\n\n<pre><code>import asyncio\nimport json\nfrom typing import Dict, List, Callable, Any\nfrom dataclasses import dataclass\nfrom enum import Enum\nimport logging\n\nclass EventType(Enum):\n USER_CREATED = 'user_created'\n ORDER_PLACED = 'order_placed'\n PAYMENT_PROCESSED = 'payment_processed'\n\n@dataclass\nclass Event:\n type: EventType\n data: Dict[str, Any]\n timestamp: float\n correlation_id: str\n\nclass EventBus:\n def __init__(self, max_queue_size: int = 10000):\n self.max_queue_size = max_queue_size\n self.queues: Dict[EventType, asyncio.Queue] = {\n event_type: asyncio.Queue(maxsize=max_queue_size) \n for event_type in EventType\n }\n self.handlers: Dict[EventType, List[Callable]] = {}\n self.running = False\n self.tasks: List[asyncio.Task] = []\n \n def subscribe(self, event_type: EventType, handler: Callable) -> None:\n if event_type not in self.handlers:\n self.handlers[event_type] = []\n self.handlers[event_type].append(handler)\n \n async def publish(self, event: Event) -> None:\n try:\n await asyncio.wait_for(\n self.queues[event.type].put(event),\n timeout=1.0\n )\n except asyncio.TimeoutError:\n logging.error(f'Event queue full, dropping event: {event.type}')\n \n async def start(self) -> None:\n self.running = True\n for event_type in EventType:\n task = asyncio.create_task(self._process_events(event_type))\n self.tasks.append(task)\n \n async def stop(self) -> None:\n self.running = False\n for task in self.tasks:\n task.cancel()\n await asyncio.gather(*self.tasks, return_exceptions=True)\n \n async def _process_events(self, event_type: EventType) -> None:\n while self.running:\n try:\n event = await self.queues[event_type].get()\n \n if event_type in self.handlers:\n for handler in self.handlers[event_type]:\n try:\n await handler(event)\n except Exception as e:\n logging.error(f'Handler error for {event_type}: {e}')\n \n self.queues[event_type].task_done()\n except asyncio.CancelledError:\n break\n except Exception as e:\n logging.error(f'Event processing error: {e}')\n\n# Usage\nasync def user_created_handler(event: Event) -> None:\n print(f'User created: {event.data}')\n\nasync def order_placed_handler(event: Event) -> None:\n print(f'Order placed: {event.data}')\n\nasync def main():\n event_bus = EventBus()\n \n # Subscribe handlers\n event_bus.subscribe(EventType.USER_CREATED, user_created_handler)\n event_bus.subscribe(EventType.ORDER_PLACED, order_placed_handler)\n \n # Start processing\n await event_bus.start()\n \n # Publish events\n await event_bus.publish(Event(\n type=EventType.USER_CREATED,\n data={'user_id': 1, 'name': 'John'},\n timestamp=time.time(),\n correlation_id='123'\n ))</code></pre>\n\nWhat advanced Python patterns and potential issues are demonstrated here?",
options: ["Event-driven architecture with asyncio, potential memory leaks and task management issues", "Proper use of asyncio queues and event handling", "Efficient event processing with backpressure handling", "No issues - optimal event-driven implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're implementing a sophisticated dependency injection container for a microservices architecture using Python. The system needs to handle circular dependencies, lifecycle management, and configuration injection. Here's the current implementation:\n\n<pre><code>from typing import Type, TypeVar, Dict, Any, Callable, Optional\nfrom abc import ABC, abstractmethod\nimport inspect\nfrom functools import wraps\n\nT = TypeVar('T')\n\nclass Service(ABC):\n @abstractmethod\n def initialize(self) -> None:\n pass\n \n @abstractmethod\n def cleanup(self) -> None:\n pass\n\nclass DIContainer:\n def __init__(self):\n self._services: Dict[Type, Any] = {}\n self._singletons: Dict[Type, Any] = {}\n self._factories: Dict[Type, Callable] = {}\n self._lifecycle: Dict[Type, str] = {} # 'singleton' or 'transient'\n \n def register_singleton(self, interface: Type[T], implementation: Type[T]) -> None:\n self._services[interface] = implementation\n self._lifecycle[interface] = 'singleton'\n \n def register_transient(self, interface: Type[T], implementation: Type[T]) -> None:\n self._services[interface] = implementation\n self._lifecycle[interface] = 'transient'\n \n def register_factory(self, interface: Type[T], factory: Callable[[], T]) -> None:\n self._factories[interface] = factory\n self._lifecycle[interface] = 'factory'\n \n def get(self, interface: Type[T]) -> T:\n if interface in self._factories:\n return self._factories[interface]()\n \n if interface in self._singletons:\n return self._singletons[interface]\n \n if interface not in self._services:\n raise ValueError(f'Service {interface} not registered')\n \n implementation = self._services[interface]\n \n # Resolve dependencies\n instance = self._create_instance(implementation)\n \n # Handle lifecycle\n if self._lifecycle[interface] == 'singleton':\n self._singletons[interface] = instance\n \n return instance\n \n def _create_instance(self, implementation: Type[T]) -> T:\n # Get constructor parameters\n sig = inspect.signature(implementation.__init__)\n params = {}\n \n for param_name, param in sig.parameters.items():\n if param_name == 'self':\n continue\n \n if param.annotation != inspect.Parameter.empty:\n params[param_name] = self.get(param.annotation)\n \n return implementation(**params)\n \n def inject(self, func: Callable) -> Callable:\n @wraps(func)\n def wrapper(*args, **kwargs):\n # Get function parameters\n sig = inspect.signature(func)\n injected_kwargs = {}\n \n for param_name, param in sig.parameters.items():\n if param.annotation != inspect.Parameter.empty:\n injected_kwargs[param_name] = self.get(param.annotation)\n \n # Merge with provided kwargs\n injected_kwargs.update(kwargs)\n \n return func(*args, **injected_kwargs)\n return wrapper\n\n# Usage\nclass DatabaseService(Service):\n def __init__(self, connection_string: str):\n self.connection_string = connection_string\n \n def initialize(self) -> None:\n print(f'Database initialized: {self.connection_string}')\n \n def cleanup(self) -> None:\n print('Database connection closed')\n\nclass UserService(Service):\n def __init__(self, db: DatabaseService):\n self.db = db\n \n def initialize(self) -> None:\n print('User service initialized')\n \n def cleanup(self) -> None:\n print('User service cleaned up')\n\n# Setup\ncontainer = DIContainer()\ncontainer.register_singleton(DatabaseService, DatabaseService)\ncontainer.register_singleton(UserService, UserService)\n\n@container.inject\ndef process_user(user_service: UserService, user_id: int) -> str:\n return f'Processing user {user_id} with {user_service}'</code></pre>\n\nWhat advanced Python patterns and potential issues are demonstrated here?",
options: ["Dependency injection with reflection, potential circular dependency issues and performance overhead", "Proper use of type hints and decorators", "Efficient service lifecycle management", "No issues - optimal DI implementation"],
correct: 0,
level: "advanced"
},
// Live Coding Questions (5) - Practical Bug Finding and Fixing
{
question: "You're debugging a Django view that's supposed to handle user registration. The code is causing a 500 error and users can't register. Here's the implementation:\n\n<pre><code>def register_user(request):\n if request.method == 'POST':\n username = request.POST['username']\n email = request.POST['email']\n password = request.POST['password']\n \n # Create user\n user = User.objects.create_user(\n username=username,\n email=email,\n password=password\n )\n \n # Send welcome email\n send_mail(\n 'Welcome!',\n f'Welcome {username}!',\n 'noreply@example.com',\n [email]\n )\n \n return JsonResponse({'success': True})\n \n return render(request, 'register.html')</code></pre>\n\nWhat are the main issues causing the 500 error?",
options: ["Missing CSRF token, no error handling, and potential KeyError exceptions", "Missing form validation and user input sanitization", "No database transaction handling and email sending issues", "All of the above"],
correct: 3,
level: "basic"
},
{
question: "You're reviewing this Django model that's causing performance issues in production. The application is slow when loading user profiles:\n\n<pre><code>class UserProfile(models.Model):\n user = models.OneToOneField(User, on_delete=models.CASCADE)\n bio = models.TextField()\n avatar = models.ImageField(upload_to='avatars/')\n \n def get_recent_posts(self):\n return Post.objects.filter(author=self.user).order_by('-created_at')[:5]\n \n def get_followers_count(self):\n return Follow.objects.filter(following=self.user).count()\n \n def get_following_count(self):\n return Follow.objects.filter(follower=self.user).count()\n \n def is_following(self, other_user):\n return Follow.objects.filter(\n follower=self.user, \n following=other_user\n ).exists()\n\n# In the view:\ndef user_profile(request, user_id):\n profile = UserProfile.objects.get(user_id=user_id)\n recent_posts = profile.get_recent_posts()\n followers_count = profile.get_followers_count()\n following_count = profile.get_following_count()\n \n return render(request, 'profile.html', {\n 'profile': profile,\n 'recent_posts': recent_posts,\n 'followers_count': followers_count,\n 'following_count': following_count\n })</code></pre>\n\nWhat's causing the performance issues?",
options: ["N+1 query problem with multiple database hits per profile", "Missing database indexes on foreign key fields", "Inefficient query patterns and lack of select_related/prefetch_related", "All of the above"],
correct: 3,
level: "middle"
},
{
question: "You're debugging a Django REST API endpoint that's supposed to return a list of products with filtering. The API is returning incorrect results and sometimes crashes:\n\n<pre><code>@api_view(['GET'])\ndef get_products(request):\n products = Product.objects.all()\n \n # Apply filters\n category = request.GET.get('category')\n if category:\n products = products.filter(category__name=category)\n \n min_price = request.GET.get('min_price')\n if min_price:\n products = products.filter(price__gte=min_price)\n \n max_price = request.GET.get('max_price')\n if max_price:\n products = products.filter(price__lte=max_price)\n \n # Sort by price\n sort_by = request.GET.get('sort', 'name')\n products = products.order_by(sort_by)\n \n # Serialize and return\n serializer = ProductSerializer(products, many=True)\n return Response(serializer.data)\n\nclass ProductSerializer(serializers.ModelSerializer):\n category_name = serializers.CharField(source='category.name')\n \n class Meta:\n model = Product\n fields = ['id', 'name', 'price', 'category_name']</code></pre>\n\nWhat are the main issues with this implementation?",
options: ["No input validation, potential SQL injection, and missing error handling", "Inefficient queries and potential N+1 problems", "No pagination and potential memory issues with large datasets", "All of the above"],
correct: 3,
level: "middle"
},
{
question: "You're reviewing this Python function that's supposed to process a large CSV file and save data to Django models. The function is consuming too much memory and running slowly:\n\n<pre><code>def import_products_from_csv(file_path):\n products = []\n \n with open(file_path, 'r') as file:\n reader = csv.DictReader(file)\n for row in reader:\n # Process each row\n product_data = {\n 'name': row['name'],\n 'price': float(row['price']),\n 'category': row['category'],\n 'description': row['description']\n }\n \n # Create product object\n product = Product(\n name=product_data['name'],\n price=product_data['price'],\n category=Category.objects.get(name=product_data['category']),\n description=product_data['description']\n )\n products.append(product)\n \n # Bulk create all products\n Product.objects.bulk_create(products)\n \n return len(products)</code></pre>\n\nWhat are the main performance and memory issues?",
options: ["Loading entire file into memory and individual database queries for categories", "No error handling and potential data type conversion issues", "Missing transaction management and potential data loss", "All of the above"],
correct: 3,
level: "advanced"
},
{
question: "You're debugging a Django Celery task that's supposed to send bulk emails. The task is failing and emails are not being sent:\n\n<pre><code>@shared_task\ndef send_bulk_emails(user_ids, subject, message):\n users = User.objects.filter(id__in=user_ids)\n \n for user in users:\n send_mail(\n subject=subject,\n message=message,\n from_email='noreply@example.com',\n recipient_list=[user.email]\n )\n \n return f'Sent emails to {len(users)} users'\n\n# Called from view:\ndef send_newsletter(request):\n if request.method == 'POST':\n subject = request.POST['subject']\n message = request.POST['message']\n \n # Get all active users\n user_ids = User.objects.filter(is_active=True).values_list('id', flat=True)\n \n # Send emails asynchronously\n send_bulk_emails.delay(user_ids, subject, message)\n \n return JsonResponse({'status': 'Emails queued'})</code></pre>\n\nWhat are the main issues with this implementation?",
options: ["No error handling, potential memory issues with large user lists, and blocking email sending", "Missing email validation and potential spam issues", "No progress tracking and potential task failure without notification", "All of the above"],
correct: 3,
level: "advanced"
},
// Additional Live Coding Debugging Questions (3) - Real-world Scenarios
{
question: "You're debugging a Django middleware that's supposed to log API requests. The middleware is causing the application to crash with a 500 error:\n\n<pre><code>class APILoggingMiddleware:\n def __init__(self, get_response):\n self.get_response = get_response\n \n def __call__(self, request):\n # Log request\n logger.info(f'API Request: {request.method} {request.path}')\n \n # Process request\n response = self.get_response(request)\n \n # Log response\n logger.info(f'API Response: {response.status_code}')\n \n return response\n \n def process_exception(self, request, exception):\n logger.error(f'API Exception: {exception}')\n return None</code></pre>\n\nWhat's causing the middleware to crash?",
options: ["Missing logger import and improper middleware structure", "No error handling in the main __call__ method", "Incorrect exception handling in process_exception", "All of the above"],
correct: 3,
level: "basic"
},
{
question: "You're reviewing this Django view that handles file uploads. Users report that large files cause memory errors and the server becomes unresponsive:\n\n<pre><code>def upload_large_file(request):\n if request.method == 'POST':\n file = request.FILES['file']\n \n # Read entire file into memory\n file_content = file.read()\n \n # Process file content\n processed_content = process_file_content(file_content)\n \n # Save to database\n FileUpload.objects.create(\n name=file.name,\n content=processed_content,\n size=file.size,\n user=request.user\n )\n \n return JsonResponse({'status': 'success'})\n \n return render(request, 'upload.html')\n\ndef process_file_content(content):\n # Simulate heavy processing\n return content.upper()</code></pre>\n\nWhat are the main issues with this implementation?",
options: ["Loading entire file into memory and blocking request processing", "No file size validation and missing error handling", "Inefficient file processing and potential memory leaks", "All of the above"],
correct: 3,
level: "middle"
},
{
question: "You're debugging a Django management command that's supposed to clean up old data. The command is running very slowly and sometimes times out:\n\n<pre><code>from django.core.management.base import BaseCommand\nfrom django.utils import timezone\nfrom datetime import timedelta\n\nclass Command(BaseCommand):\n help = 'Clean up old data'\n \n def handle(self, *args, **options):\n # Get old records\n old_date = timezone.now() - timedelta(days=30)\n old_records = OldData.objects.filter(created_at__lt=old_date)\n \n # Delete records one by one\n for record in old_records:\n record.delete()\n \n self.stdout.write(f'Deleted {old_records.count()} records')\n\nclass OldData(models.Model):\n name = models.CharField(max_length=100)\n data = models.TextField()\n created_at = models.DateTimeField(auto_now_add=True)\n \n def delete(self, *args, **kwargs):\n # Custom delete logic\n self.cleanup_related_data()\n super().delete(*args, **kwargs)\n \n def cleanup_related_data(self):\n # Heavy cleanup operation\n RelatedData.objects.filter(parent=self).delete()</code></pre>\n\nWhat are the main performance issues?",
options: ["Inefficient deletion loop and potential memory issues with large datasets", "Missing batch operations and no progress tracking", "Heavy cleanup operations in delete method and no transaction management", "All of the above"],
correct: 3,
level: "advanced"
},
// Advanced Django Questions (4) - C1 Level - High Complexity
{
question: "You're implementing a sophisticated Django ORM optimization system for a high-traffic e-commerce platform. The system needs to handle complex queries, N+1 problems, and database performance optimization. Here's the current implementation:\n\n<pre><code>from django.db import models\nfrom django.db.models import Prefetch, Q, F, Case, When, Value, CharField\nfrom django.db.models.functions import Concat, Coalesce\nfrom django.contrib.postgres.aggregates import ArrayAgg\nfrom django.contrib.postgres.fields import JSONField\n\nclass Category(models.Model):\n name = models.CharField(max_length=100)\n parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True)\n \nclass Product(models.Model):\n name = models.CharField(max_length=200)\n category = models.ForeignKey(Category, on_delete=models.CASCADE)\n price = models.DecimalField(max_digits=10, decimal_places=2)\n inventory = models.PositiveIntegerField(default=0)\n metadata = JSONField(default=dict)\n \nclass Order(models.Model):\n user = models.ForeignKey('auth.User', on_delete=models.CASCADE)\n created_at = models.DateTimeField(auto_now_add=True)\n total_amount = models.DecimalField(max_digits=10, decimal_places=2)\n \nclass OrderItem(models.Model):\n order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='items')\n product = models.ForeignKey(Product, on_delete=models.CASCADE)\n quantity = models.PositiveIntegerField()\n price = models.DecimalField(max_digits=10, decimal_places=2)\n\n# Complex query with multiple optimizations\ndef get_optimized_product_data():\n return Product.objects.select_related('category')\n .prefetch_related(\n Prefetch('orderitem_set', queryset=OrderItem.objects.select_related('order'))\n )\n .annotate(\n total_orders=Count('orderitem__order', distinct=True),\n avg_rating=Coalesce(Avg('reviews__rating'), Value(0)),\n category_path=Concat(\n 'category__name',\n Value(' > '),\n 'category__parent__name'\n ),\n inventory_status=Case(\n When(inventory__gt=100, then=Value('high')),\n When(inventory__gt=10, then=Value('medium')),\n default=Value('low'),\n output_field=CharField()\n ),\n related_products=ArrayAgg(\n 'category__product__name',\n distinct=True,\n filter=Q(category__product__inventory__gt=0)\n )\n )\n .filter(\n Q(category__name__icontains='electronics') |\n Q(metadata__contains={'featured': True})\n )\n .order_by('-total_orders', 'name')\n .only('name', 'price', 'inventory', 'category__name')\n\n# Custom manager with complex business logic\nclass ProductManager(models.Manager):\n def with_analytics(self):\n return self.get_queryset().annotate(\n conversion_rate=Case(\n When(total_orders__gt=0, then=F('total_orders') / F('views_count')),\n default=Value(0)\n ),\n revenue=F('price') * F('total_orders')\n )\n \n def search_products(self, query, filters=None):\n qs = self.get_queryset()\n \n if query:\n qs = qs.filter(\n Q(name__icontains=query) |\n Q(metadata__icontains=query)\n )\n \n if filters:\n if 'category' in filters:\n qs = qs.filter(category__name__in=filters['category'])\n if 'price_range' in filters:\n qs = qs.filter(price__range=filters['price_range'])\n \n return qs\n\nclass Product(models.Model):\n # ... fields ...\n objects = ProductManager()</code></pre>\n\nWhat advanced Django ORM patterns and potential issues are demonstrated here?",
options: ["Complex ORM optimization with annotations, potential N+1 queries and performance issues", "Proper use of select_related and prefetch_related", "Efficient database queries with custom managers", "No issues - optimal Django ORM implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're building a sophisticated Django middleware system for a microservices architecture with distributed tracing, rate limiting, and security enforcement. The system needs to handle cross-cutting concerns across multiple services. Here's the current implementation:\n\n<pre><code>import time\nimport json\nimport hashlib\nfrom django.http import JsonResponse, HttpResponse\nfrom django.core.cache import cache\nfrom django.utils.deprecation import MiddlewareMixin\nfrom django.conf import settings\nimport logging\nfrom typing import Dict, Any\n\nclass DistributedTracingMiddleware(MiddlewareMixin):\n def __init__(self, get_response):\n self.get_response = get_response\n self.logger = logging.getLogger(__name__)\n \n def process_request(self, request):\n # Extract or generate trace ID\n trace_id = request.headers.get('X-Trace-ID')\n if not trace_id:\n trace_id = self._generate_trace_id()\n \n request.trace_id = trace_id\n request.start_time = time.time()\n \n # Log request start\n self.logger.info(json.dumps({\n 'trace_id': trace_id,\n 'event': 'request_start',\n 'method': request.method,\n 'path': request.path,\n 'user_id': getattr(request.user, 'id', None),\n 'ip': self._get_client_ip(request)\n }))\n \n def process_response(self, request, response):\n if hasattr(request, 'trace_id'):\n duration = time.time() - request.start_time\n \n # Log request completion\n self.logger.info(json.dumps({\n 'trace_id': request.trace_id,\n 'event': 'request_complete',\n 'status_code': response.status_code,\n 'duration': duration,\n 'response_size': len(response.content)\n }))\n \n # Add trace ID to response headers\n response['X-Trace-ID'] = request.trace_id\n \n return response\n \n def _generate_trace_id(self) -> str:\n return hashlib.md5(f'{time.time()}{id(self)}'.encode()).hexdigest()\n \n def _get_client_ip(self, request) -> str:\n x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')\n if x_forwarded_for:\n return x_forwarded_for.split(',')[0]\n return request.META.get('REMOTE_ADDR')\n\nclass RateLimitingMiddleware(MiddlewareMixin):\n def __init__(self, get_response):\n self.get_response = get_response\n self.rate_limits = {\n 'api/': {'requests': 100, 'window': 3600}, # 100 requests per hour\n 'api/auth/': {'requests': 10, 'window': 300}, # 10 requests per 5 minutes\n }\n \n def process_request(self, request):\n if not request.path.startswith('/api/'):\n return None\n \n client_id = self._get_client_identifier(request)\n \n for path_prefix, limits in self.rate_limits.items():\n if request.path.startswith(path_prefix):\n if self._is_rate_limited(client_id, path_prefix, limits):\n return JsonResponse({\n 'error': 'Rate limit exceeded',\n 'retry_after': limits['window']\n }, status=429)\n \n return None\n \n def _get_client_identifier(self, request) -> str:\n if request.user.is_authenticated:\n return f'user:{request.user.id}'\n return f'ip:{self._get_client_ip(request)}'\n \n def _is_rate_limited(self, client_id: str, path: str, limits: Dict[str, Any]) -> bool:\n cache_key = f'rate_limit:{path}:{client_id}'\n current_requests = cache.get(cache_key, 0)\n \n if current_requests >= limits['requests']:\n return True\n \n cache.set(cache_key, current_requests + 1, limits['window'])\n return False\n\nclass SecurityHeadersMiddleware(MiddlewareMixin):\n def __init__(self, get_response):\n self.get_response = get_response\n self.security_headers = {\n 'X-Content-Type-Options': 'nosniff',\n 'X-Frame-Options': 'DENY',\n 'X-XSS-Protection': '1; mode=block',\n 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',\n 'Content-Security-Policy': \"default-src 'self'; script-src 'self' 'unsafe-inline'\"\n }\n \n def process_response(self, request, response):\n for header, value in self.security_headers.items():\n if header not in response:\n response[header] = value\n \n return response</code></pre>\n\nWhat advanced Django middleware patterns and potential issues are demonstrated here?",
options: ["Sophisticated middleware with distributed tracing, potential performance overhead and memory leaks", "Proper use of middleware for cross-cutting concerns", "Efficient request/response processing with security", "No issues - optimal Django middleware implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're implementing a sophisticated Django signals system for a complex business application with event-driven architecture. The system needs to handle custom signals, signal routing, and event processing with proper error handling. Here's the current implementation:\n\n<pre><code>from django.db.models.signals import post_save, pre_delete, m2m_changed\nfrom django.dispatch import Signal, receiver\nfrom django.core.mail import send_mail\nfrom django.template.loader import render_to_string\nfrom django.conf import settings\nimport logging\nfrom typing import Dict, Any\nimport asyncio\nfrom asgiref.sync import sync_to_async\n\n# Custom signals\norder_created = Signal(providing_args=['order', 'user'])\npayment_processed = Signal(providing_args=['payment', 'order'])\ninventory_updated = Signal(providing_args=['product', 'quantity_change'])\nuser_registered = Signal(providing_args=['user', 'registration_data'])\n\nclass SignalRouter:\n def __init__(self):\n self.handlers = {}\n self.logger = logging.getLogger(__name__)\n \n def register_handler(self, signal, handler, priority=0):\n if signal not in self.handlers:\n self.handlers[signal] = []\n \n self.handlers[signal].append({\n 'handler': handler,\n 'priority': priority\n })\n \n # Sort by priority (higher priority first)\n self.handlers[signal].sort(key=lambda x: x['priority'], reverse=True)\n \n def dispatch(self, signal, **kwargs):\n if signal not in self.handlers:\n return\n \n for handler_info in self.handlers[signal]:\n try:\n handler_info['handler'](signal, **kwargs)\n except Exception as e:\n self.logger.error(f'Signal handler error: {e}', exc_info=True)\n\n# Global signal router\nsignal_router = SignalRouter()\n\n# Signal handlers\n@receiver(post_save, sender='app.Order')\ndef order_post_save(sender, instance, created, **kwargs):\n if created:\n signal_router.dispatch(order_created, order=instance, user=instance.user)\n\n@receiver(order_created)\ndef send_order_confirmation(sender, order, user, **kwargs):\n # Send confirmation email\n subject = f'Order #{order.id} Confirmation'\n message = render_to_string('emails/order_confirmation.html', {\n 'order': order,\n 'user': user\n })\n \n send_mail(\n subject=subject,\n message='',\n html_message=message,\n from_email=settings.DEFAULT_FROM_EMAIL,\n recipient_list=[user.email]\n )\n\n@receiver(order_created)\ndef update_inventory(sender, order, user, **kwargs):\n for item in order.items.all():\n product = item.product\n product.inventory -= item.quantity\n product.save()\n \n signal_router.dispatch(\n inventory_updated,\n product=product,\n quantity_change=-item.quantity\n )\n\n@receiver(inventory_updated)\ndef check_low_inventory(sender, product, quantity_change, **kwargs):\n if product.inventory < 10: # Low inventory threshold\n # Send alert to admin\n send_mail(\n subject=f'Low Inventory Alert: {product.name}',\n message=f'Product {product.name} has only {product.inventory} units left.',\n from_email=settings.DEFAULT_FROM_EMAIL,\n recipient_list=[settings.ADMIN_EMAIL]\n )\n\n@receiver(payment_processed)\ndef process_payment_webhooks(sender, payment, order, **kwargs):\n if payment.status == 'completed':\n order.status = 'paid'\n order.save()\n \n # Trigger fulfillment process\n signal_router.dispatch('fulfillment_started', order=order)\n elif payment.status == 'failed':\n order.status = 'payment_failed'\n order.save()\n \n # Send payment failure notification\n signal_router.dispatch('payment_failed', order=order, payment=payment)\n\n# Async signal handler\n@receiver(user_registered)\ndef send_welcome_email_async(sender, user, registration_data, **kwargs):\n async def send_email():\n await sync_to_async(send_mail)(\n subject='Welcome to our platform!',\n message=f'Welcome {user.first_name}!',\n from_email=settings.DEFAULT_FROM_EMAIL,\n recipient_list=[user.email]\n )\n \n asyncio.create_task(send_email())\n\n# Custom signal decorator with error handling\ndef signal_handler(signal, priority=0, retry_on_error=False, max_retries=3):\n def decorator(func):\n def wrapper(sender, **kwargs):\n retries = 0\n while retries <= max_retries:\n try:\n return func(sender, **kwargs)\n except Exception as e:\n retries += 1\n if not retry_on_error or retries > max_retries:\n logging.error(f'Signal handler failed after {retries} retries: {e}')\n raise\n logging.warning(f'Signal handler retry {retries}: {e}')\n \n signal_router.register_handler(signal, wrapper, priority)\n return wrapper\n return decorator</code></pre>\n\nWhat advanced Django signals patterns and potential issues are demonstrated here?",
options: ["Sophisticated signal system with custom routing, potential memory leaks and circular dependencies", "Proper use of Django signals for event-driven architecture", "Efficient signal handling with error recovery", "No issues - optimal Django signals implementation"],
correct: 0,
level: "advanced"
},
{
question: "You're building a sophisticated Django REST API with advanced serialization, pagination, and filtering for a large-scale application. The system needs to handle complex data relationships, performance optimization, and API versioning. Here's the current implementation:\n\n<pre><code>from rest_framework import serializers, viewsets, pagination, filters\nfrom rest_framework.decorators import action\nfrom rest_framework.response import Response\nfrom rest_framework.permissions import IsAuthenticated\nfrom django_filters.rest_framework import DjangoFilterBackend\nfrom django.db.models import Q, Prefetch, Count, Avg\nfrom django.core.cache import cache\nimport json\nfrom typing import Dict, Any, List\n\nclass DynamicFieldsModelSerializer(serializers.ModelSerializer):\n \"\"\"Serializer that dynamically includes/excludes fields based on request context\"\"\"\n \n def __init__(self, *args, **kwargs):\n fields = kwargs.pop('fields', None)\n exclude = kwargs.pop('exclude', None)\n super().__init__(*args, **kwargs)\n \n if fields is not None:\n allowed = set(fields)\n existing = set(self.fields)\n for field_name in existing - allowed:\n self.fields.pop(field_name)\n \n if exclude is not None:\n for field_name in exclude:\n self.fields.pop(field_name, None)\n\nclass ProductSerializer(DynamicFieldsModelSerializer):\n category_name = serializers.CharField(source='category.name', read_only=True)\n total_orders = serializers.IntegerField(read_only=True)\n avg_rating = serializers.FloatField(read_only=True)\n inventory_status = serializers.SerializerMethodField()\n \n class Meta:\n model = Product\n fields = ['id', 'name', 'price', 'inventory', 'category', 'category_name',\n 'total_orders', 'avg_rating', 'inventory_status', 'metadata']\n \n def get_inventory_status(self, obj):\n if obj.inventory > 100:\n return 'high'\n elif obj.inventory > 10:\n return 'medium'\n return 'low'\n \n def to_representation(self, instance):\n data = super().to_representation(instance)\n \n # Add computed fields based on context\n if self.context.get('include_analytics'):\n data['conversion_rate'] = self._calculate_conversion_rate(instance)\n data['revenue'] = float(instance.price) * data['total_orders']\n \n return data\n \n def _calculate_conversion_rate(self, instance):\n # Complex business logic for conversion rate\n return 0.15 # Placeholder\n\nclass OrderSerializer(serializers.ModelSerializer):\n items = OrderItemSerializer(many=True, read_only=True)\n user_email = serializers.CharField(source='user.email', read_only=True)\n total_items = serializers.SerializerMethodField()\n \n class Meta:\n model = Order\n fields = ['id', 'user', 'user_email', 'created_at', 'total_amount',\n 'items', 'total_items', 'status']\n \n def get_total_items(self, obj):\n return sum(item.quantity for item in obj.items.all())\n\nclass CustomPagination(pagination.PageNumberPagination):\n page_size = 20\n page_size_query_param = 'page_size'\n max_page_size = 100\n \n def get_paginated_response(self, data):\n return Response({\n 'links': {\n 'next': self.get_next_link(),\n 'previous': self.get_previous_link()\n },\n 'count': self.page.paginator.count,\n 'total_pages': self.page.paginator.num_pages,\n 'current_page': self.page.number,\n 'results': data\n })\n\nclass ProductViewSet(viewsets.ModelViewSet):\n serializer_class = ProductSerializer\n pagination_class = CustomPagination\n filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]\n filterset_fields = ['category', 'inventory_status']\n search_fields = ['name', 'category__name', 'metadata']\n ordering_fields = ['name', 'price', 'created_at', 'total_orders']\n ordering = ['-created_at']\n \n def get_queryset(self):\n queryset = Product.objects.select_related('category')\n .prefetch_related('orderitem_set__order')\n .annotate(\n total_orders=Count('orderitem__order', distinct=True),\n avg_rating=Avg('reviews__rating')\n )\n \n # Apply custom filters\n category = self.request.query_params.get('category')\n if category:\n queryset = queryset.filter(category__name__icontains=category)\n \n price_min = self.request.query_params.get('price_min')\n if price_min:\n queryset = queryset.filter(price__gte=price_min)\n \n price_max = self.request.query_params.get('price_max')\n if price_max:\n queryset = queryset.filter(price__lte=price_max)\n \n return queryset\n \n def get_serializer_context(self):\n context = super().get_serializer_context()\n context['include_analytics'] = self.request.query_params.get('include_analytics') == 'true'\n return context\n \n @action(detail=True, methods=['get'])\n def analytics(self, request, pk=None):\n product = self.get_object()\n \n # Cache analytics data\n cache_key = f'product_analytics_{product.id}'\n analytics_data = cache.get(cache_key)\n \n if not analytics_data:\n analytics_data = {\n 'conversion_rate': self._calculate_conversion_rate(product),\n 'revenue': self._calculate_revenue(product),\n 'trending_score': self._calculate_trending_score(product)\n }\n cache.set(cache_key, analytics_data, 3600) # Cache for 1 hour\n \n return Response(analytics_data)\n \n @action(detail=False, methods=['get'])\n def bulk_update_inventory(self, request):\n # Bulk inventory update endpoint\n updates = request.data.get('updates', [])\n \n for update in updates:\n product_id = update.get('product_id')\n quantity = update.get('quantity')\n \n Product.objects.filter(id=product_id).update(inventory=quantity)\n \n return Response({'message': 'Inventory updated successfully'})\n \n def _calculate_conversion_rate(self, product):\n # Complex business logic\n return 0.15\n \n def _calculate_revenue(self, product):\n # Complex business logic\n return float(product.price) * 100\n \n def _calculate_trending_score(self, product):\n # Complex business logic\n return 85.5</code></pre>\n\nWhat advanced Django REST Framework patterns and potential issues are demonstrated here?",
options: ["Sophisticated API with dynamic serialization, potential N+1 queries and performance issues", "Proper use of DRF features for complex APIs", "Efficient API design with caching and filtering", "No issues - optimal Django REST API implementation"],
correct: 0,
level: "advanced"
}
];
// Limit the Python exam to a fixed subset of questions
const MAX_PYTHON_QUESTIONS = 10;
const selectedPythonQuestions = pythonQuestions.slice(0, MAX_PYTHON_QUESTIONS);
let currentAnswers = new Array(selectedPythonQuestions.length).fill(-1);
let examStarted = false;
let timeLeft = 600; // 600 seconds (10 minutes) for Python exam with live coding questions
let timerInterval;
document.addEventListener('DOMContentLoaded', function() {
const startBtn = document.getElementById('startBtn');
const submitBtn = document.getElementById('submitBtn');
const backToHome = document.getElementById('backToHome');
startBtn.addEventListener('click', startExam);
submitBtn.addEventListener('click', submitExam);
backToHome.addEventListener('click', () => window.location.href = 'index.html');
// Check if candidate info exists
const candidateInfo = localStorage.getItem('candidateInfo');
if (!candidateInfo) {
window.location.href = 'index.html';
}
});
function startExam() {
examStarted = true;
document.getElementById('startBtn').style.display = 'none';
document.getElementById('examContent').style.display = 'block';
document.getElementById('submitBtn').style.display = 'block';
renderQuestions();
startTimer();
}
function startTimer() {
timerInterval = setInterval(() => {
timeLeft--;
updateTimerDisplay();
if (timeLeft <= 0) {
clearInterval(timerInterval);
submitExam();
}
}, 1000);
}
function updateTimerDisplay() {
const minutes = Math.floor(timeLeft / 60);
const seconds = timeLeft % 60;
document.getElementById('timer').textContent = `Time: ${minutes.toString().padStart(2, '0')}:${seconds.toString().padStart(2, '0')}`;
}
function renderQuestions() {
const questionsContainer = document.getElementById('questions');
questionsContainer.innerHTML = '';
selectedPythonQuestions.forEach((q, index) => {
const questionDiv = document.createElement('div');
questionDiv.className = 'question';
const levelBadge = document.createElement('span');
levelBadge.style.cssText = 'background: #007bff; color: white; padding: 4px 8px; border-radius: 4px; font-size: 12px; margin-bottom: 10px; display: inline-block;';
levelBadge.textContent = q.level.toUpperCase();
questionDiv.innerHTML = `
${levelBadge.outerHTML}
<h3>Question ${index + 1}: ${q.question}</h3>
<div class="options">
${q.options.map((option, optIndex) => `
<div class="option" data-question="${index}" data-option="${optIndex}">
${option}
</div>
`).join('')}
</div>
`;
questionsContainer.appendChild(questionDiv);
});
// Add click handlers for options
document.querySelectorAll('.option').forEach(option => {
option.addEventListener('click', function() {
const questionIndex = parseInt(this.dataset.question);
const optionIndex = parseInt(this.dataset.option);
// Remove previous selection for this question
document.querySelectorAll(`[data-question="${questionIndex}"]`).forEach(opt => {
opt.classList.remove('selected');
});
// Select current option
this.classList.add('selected');
currentAnswers[questionIndex] = optionIndex;
});
});
}
function submitExam() {
clearInterval(timerInterval);
const score = calculateScore();
const percentage = Math.round((score / selectedPythonQuestions.length) * 100);
document.getElementById('examContent').style.display = 'none';
document.getElementById('results').style.display = 'block';
document.getElementById('scoreDisplay').textContent = `${percentage}%`;
document.getElementById('resultMessage').textContent = `You scored ${score} out of ${selectedPythonQuestions.length} questions correctly.`;
// Send results via email
sendResults(score, percentage);
}
function calculateScore() {
let score = 0;
currentAnswers.forEach((answer, index) => {
if (answer === selectedPythonQuestions[index].correct) {
score++;
}
});
return score;
}
function sendResults(score, percentage) {
const candidateInfo = JSON.parse(localStorage.getItem('candidateInfo'));
// Prepare exam results
const examResults = {
score: score,
totalQuestions: selectedPythonQuestions.length,
percentage: percentage
};
// Use the centralized email utility
handleEmailSending(candidateInfo, examResults, selectedPythonQuestions, currentAnswers, timeLeft);
}