Practice Focus
Level 4: Advanced Mastery - Apply complex algorithms to real-world scenarios and interview challenges
Practice Focus
Multi-step Complex Problems
Breaking Down Complex Challenges
Advanced problems often require combining multiple algorithms and data structures. Master the art of problem decomposition and solution synthesis.
🎯 Real-Life Analogy:
Solving a complex problem is like organizing a wedding: you need to coordinate multiple vendors (algorithms), manage timelines (optimization), handle constraints (edge cases), and make sure everything works together seamlessly.
🧩 Complex Problem Patterns:
- Multi-phase Solutions: Problems that require applying several algorithms in sequence
- Hybrid Approaches: Combining different algorithmic paradigms
- Constraint Satisfaction: Multiple conditions to satisfy simultaneously
- State Space Search: Exploring complex solution spaces efficiently
TypeScript
// Problem: Design a Meeting Room Scheduler with Complex Constraints
//
// NOTE: the generic type arguments of this listing were lost when it was
// extracted (`Map;`, `new Map ()`, `Map >`, ...). They are restored below from
// how each value is used. Helper types such as Meeting, Graph, ScheduleState,
// ScheduleResult, PriorityQueue and ConflictResolver are declared outside this
// listing, as are the private helpers that are called but not defined here.
class MeetingRoomScheduler {
  private rooms: Map<number, Meeting[]>;
  private priorityQueue: PriorityQueue<Meeting>;
  private conflicts: ConflictResolver;

  /**
   * Creates a scheduler with `roomCount` empty rooms, keyed 0..roomCount-1.
   *
   * @param roomCount Number of rooms to create.
   */
  constructor(roomCount: number) {
    this.rooms = new Map();
    for (let i = 0; i < roomCount; i++) {
      this.rooms.set(i, []);
    }
    this.priorityQueue = new PriorityQueue();
    this.conflicts = new ConflictResolver();
  }

  /**
   * Phase 1: Conflict detection.
   *
   * Linear scan over every meeting stored in every room. (The original heading
   * mentioned interval trees, but no interval tree is used here.)
   *
   * @param newMeeting Meeting to check against the stored meetings.
   * @returns Every stored meeting whose time range overlaps `newMeeting`.
   */
  detectConflicts(newMeeting: Meeting): Meeting[] {
    const conflicts: Meeting[] = [];
    for (const meetings of this.rooms.values()) {
      for (const meeting of meetings) {
        if (this.hasTimeOverlap(newMeeting, meeting)) {
          conflicts.push(meeting);
        }
      }
    }
    return conflicts;
  }

  /**
   * Phase 2: Room assignment using graph colouring.
   *
   * Colours the conflict graph and treats the colour given to `meeting` as a
   * room id.
   *
   * @param meeting Meeting that needs a room.
   * @returns A room id in 0..rooms.size-1, or `null` when the colouring needs
   *   more colours than there are rooms.
   */
  assignOptimalRoom(meeting: Meeting): number | null {
    const conflictGraph = this.buildConflictGraph(meeting);
    const roomAssignment = this.colorGraph(conflictGraph);

    // buildConflictGraph only registers meetings through addEdge, so a meeting
    // that overlaps nothing may be missing from the colouring. With no
    // neighbours the greedy colouring would give it colour 0.
    const color = roomAssignment.get(meeting.id) ?? 0;

    // Colours are unbounded, rooms are not: a colour at or above the room
    // count does not correspond to a real room.
    // NOTE(review): colours are computed from scratch and are not tied to the
    // rooms that already-scheduled meetings actually occupy - confirm intent.
    return color < this.rooms.size ? color : null;
  }

  /**
   * Phase 3: Scheduling via memoised recursion.
   *
   * Orders meetings by descending priority, then ascending start time, and
   * searches skip/schedule decisions for the highest score.
   *
   * @param meetings Candidate meetings; the array is not modified.
   * @returns The best schedule found and its score.
   */
  scheduleWithRescheduling(meetings: Meeting[]): ScheduleResult {
    // Copy before sorting: Array.prototype.sort works in place and would
    // otherwise reorder the caller's array.
    const sortedMeetings = [...meetings].sort(
      (a, b) => b.priority - a.priority || a.startTime - b.startTime
    );

    // NOTE(review): key type assumes getStateKey returns a string - confirm.
    const dp = new Map<string, ScheduleState>();
    return this.optimizeSchedule(sortedMeetings, 0, new ScheduleState(), dp);
  }

  /**
   * Recursive step: for `meetings[index]`, compare skipping it against placing
   * it in each available room, and keep the higher-scoring outcome.
   *
   * @param meetings Meetings in processing order.
   * @param index Index of the meeting being decided.
   * @param currentState Schedule built so far.
   * @param memo Cache of results keyed by `getStateKey(currentState, index)`.
   */
  private optimizeSchedule(
    meetings: Meeting[],
    index: number,
    currentState: ScheduleState,
    memo: Map<string, ScheduleState>
  ): ScheduleResult {
    if (index >= meetings.length) {
      return { scheduled: currentState.scheduled, score: currentState.score };
    }

    const stateKey = this.getStateKey(currentState, index);
    const cached = memo.get(stateKey);
    if (cached !== undefined) {
      return { scheduled: cached.scheduled, score: cached.score };
    }

    const meeting = meetings[index];

    // Option 1: Skip this meeting
    const skipResult = this.optimizeSchedule(meetings, index + 1, currentState, memo);

    // Option 2: Try to schedule this meeting in each available room
    let scheduleResult: ScheduleResult = { scheduled: [], score: -Infinity };
    const availableRooms = this.findAvailableRooms(meeting, currentState);
    for (const roomId of availableRooms) {
      const newState = currentState.clone();
      newState.addMeeting(roomId, meeting);
      const result = this.optimizeSchedule(meetings, index + 1, newState, memo);
      if (result.score > scheduleResult.score) {
        scheduleResult = result;
      }
    }

    // Return best option (scheduling wins ties)
    const bestResult =
      skipResult.score > scheduleResult.score ? skipResult : scheduleResult;
    memo.set(stateKey, new ScheduleState(bestResult.scheduled, bestResult.score));
    return bestResult;
  }

  /** Returns true when the two half-open ranges [startTime, endTime) intersect. */
  private hasTimeOverlap(meeting1: Meeting, meeting2: Meeting): boolean {
    return (
      meeting1.startTime < meeting2.endTime &&
      meeting2.startTime < meeting1.endTime
    );
  }

  /**
   * Builds a graph with an edge between every pair of overlapping meetings,
   * taken from the scheduled meetings plus `newMeeting`.
   */
  private buildConflictGraph(newMeeting: Meeting): Graph {
    const graph = new Graph();
    const allMeetings = this.getAllScheduledMeetings();
    allMeetings.push(newMeeting);

    for (let i = 0; i < allMeetings.length; i++) {
      for (let j = i + 1; j < allMeetings.length; j++) {
        if (this.hasTimeOverlap(allMeetings[i], allMeetings[j])) {
          graph.addEdge(allMeetings[i].id, allMeetings[j].id);
        }
      }
    }
    return graph;
  }

  /**
   * Welsh-Powell graph colouring: visit vertices in descending degree and give
   * each the smallest colour not already used by a coloured neighbour.
   *
   * @returns Map from vertex (meeting id) to colour index, starting at 0.
   */
  private colorGraph(graph: Graph): Map<Meeting['id'], number> {
    const vertices = Array.from(graph.getVertices());
    vertices.sort((a, b) => graph.getDegree(b) - graph.getDegree(a));

    const coloring = new Map<Meeting['id'], number>();
    for (const vertex of vertices) {
      const neighborColors = new Set<number>();
      for (const neighbor of graph.getNeighbors(vertex)) {
        const neighborColor = coloring.get(neighbor);
        if (neighborColor !== undefined) {
          neighborColors.add(neighborColor);
        }
      }

      let color = 0;
      while (neighborColors.has(color)) {
        color++;
      }
      coloring.set(vertex, color);
    }
    return coloring;
  }
}

// Problem: Text Editor with Undo/Redo and Collaborative Features
class CollaborativeTextEditor {
  /** Document length above which insertText asks for the large-document path. */
  private static readonly LARGE_DOCUMENT_THRESHOLD = 10000;

  private content: string;
  private history: OperationalTransform[];
  private undoStack: Command[];
  private redoStack: Command[];
  private conflicts: ConflictResolver;

  /** Starts with an empty document and empty history/undo/redo stacks. */
  constructor() {
    this.content = "";
    this.history = [];
    this.undoStack = [];
    this.redoStack = [];
    this.conflicts = new ConflictResolver();
  }

  /**
   * Phase 1: Apply operation with conflict resolution.
   *
   * If the operation conflicts with recent operations it is first transformed
   * against each of them, then executed.
   *
   * @param operation Operation to apply.
   * @param authorId Id of the author issuing the operation.
   */
  applyOperation(operation: TextOperation, authorId: string): OperationResult {
    // Check for conflicts with concurrent operations
    const conflicts = this.detectConflicts(operation);
    if (conflicts.length === 0) {
      return this.executeOperation(operation, authorId);
    }

    // Use Operational Transformation to resolve conflicts
    const transformedOp = this.transformOperation(operation, conflicts);
    return this.executeOperation(transformedOp, authorId);
  }

  /**
   * Phase 2: Undo using the command pattern.
   *
   * Executes the inverse of the most recent command and moves that command to
   * the redo stack.
   *
   * @returns `false` when there is nothing to undo.
   */
  undo(): boolean {
    const command = this.undoStack.pop();
    if (command === undefined) return false;

    this.executeCommand(command.getInverse());
    this.redoStack.push(command);
    return true;
  }

  /**
   * Re-executes the most recently undone command and moves it back to the
   * undo stack.
   *
   * @returns `false` when there is nothing to redo.
   */
  redo(): boolean {
    const command = this.redoStack.pop();
    if (command === undefined) return false;

    this.executeCommand(command);
    this.undoStack.push(command);
    return true;
  }

  /**
   * Phase 3: Insert text, asking for the large-document optimisation first
   * when the content exceeds LARGE_DOCUMENT_THRESHOLD characters.
   *
   * @param position Insert position in the document.
   * @param text Text to insert.
   */
  insertText(position: number, text: string): void {
    // NOTE(review): this fires on every insert once the document is large;
    // presumably optimizeForLargeDocument is idempotent - confirm.
    if (this.content.length > CollaborativeTextEditor.LARGE_DOCUMENT_THRESHOLD) {
      // Switch to rope-based implementation for large documents
      this.optimizeForLargeDocument();
    }

    const operation = new InsertOperation(position, text);
    this.applyOperation(operation, "local");
  }

  /** Returns the recent operations (by timestamp) that conflict with `operation`. */
  private detectConflicts(operation: TextOperation): TextOperation[] {
    const recentOps = this.getRecentOperations(operation.timestamp);
    return recentOps.filter((op) => this.hasConflict(operation, op));
  }

  /** Transforms `operation` against each conflicting operation in order. */
  private transformOperation(
    operation: TextOperation,
    conflicts: TextOperation[]
  ): TextOperation {
    let transformedOp = operation;
    for (const conflict of conflicts) {
      transformedOp = this.operationalTransform(transformedOp, conflict);
    }
    return transformedOp;
  }

  /**
   * Transforms `op1` so it can be applied after `op2`.
   *
   * Only insert-vs-insert is handled: when `op1` lies after `op2`'s position it
   * is shifted right by the length of `op2`'s text. Every other combination
   * returns `op1` unchanged.
   */
  private operationalTransform(op1: TextOperation, op2: TextOperation): TextOperation {
    if (op1 instanceof InsertOperation && op2 instanceof InsertOperation) {
      const shift = op1.position <= op2.position ? 0 : op2.text.length;
      return new InsertOperation(op1.position + shift, op1.text);
    }

    // Handle other operation type combinations...
    return op1;
  }
}

// Minimal shapes AdvancedCacheSystem relies on. The original type names were
// lost in extraction; only members used below are listed - TODO confirm
// against the real declarations.
interface CacheStore<K, V> {
  isFull(): boolean;
  put(key: K, value: V): void;
  get(key: K): V | undefined;
}

interface EvictionPolicy<K, V> {
  evict(store: CacheStore<K, V>, count: number): K[];
  onAccess(key: K): void;
}

// Problem: Advanced Cache System with Multiple Eviction Policies
class AdvancedCacheSystem<K, V> {
  // NOTE(review): store ids are assumed to be strings - confirm.
  private stores: Map<string, CacheStore<K, V>>;
  private policies: Map<string, EvictionPolicy<K, V>>;
  private analytics: CacheAnalytics;
  private optimizer: CacheOptimizer;

  /** Creates empty store/policy maps and then calls initializePolicies(). */
  constructor() {
    this.stores = new Map();
    this.policies = new Map();
    this.analytics = new CacheAnalytics();
    this.optimizer = new CacheOptimizer();
    this.initializePolicies();
  }

  /**
   * Phase 1: Cache placement.
   *
   * Writes `value` to the store chosen by the optimizer, evicting one entry
   * first when that store is full.
   *
   * @param key Cache key.
   * @param value Value to store.
   * @param hint Optional placement hint forwarded to the optimizer.
   * @throws Error when the chosen store, or the policy needed for eviction,
   *   is not registered.
   */
  put(key: K, value: V, hint?: CacheHint): void {
    const optimalStore = this.optimizer.selectOptimalStore(key, value, hint);
    const store = this.stores.get(optimalStore);
    if (store === undefined) {
      throw new Error(`Unknown cache store: ${String(optimalStore)}`);
    }

    if (store.isFull()) {
      const policy = this.policies.get(optimalStore);
      if (policy === undefined) {
        throw new Error(`No eviction policy for cache store: ${String(optimalStore)}`);
      }
      const evictedKeys = policy.evict(store, 1);
      this.analytics.recordEvictions(optimalStore, evictedKeys);
    }

    store.put(key, value);
    this.analytics.recordAccess(optimalStore, key, 'write');
  }

  /**
   * Phase 2: Lookup with predictive prefetching.
   *
   * Prefetches the keys the optimizer predicts will be accessed next, then
   * reads `key` from the store that holds it.
   *
   * @param key Cache key.
   * @returns The cached value, or `undefined` on a miss.
   */
  get(key: K): V | undefined {
    const accessPattern = this.analytics.getAccessPattern(key);
    const predictedKeys = this.optimizer.predictNextAccess(key, accessPattern);

    // Prefetch predicted keys
    this.prefetchKeys(predictedKeys);

    // Find the key in appropriate store
    const storeId = this.findKeyLocation(key);
    if (!storeId) return undefined;

    const store = this.stores.get(storeId);
    if (store === undefined) return undefined;

    const value = store.get(key);
    // Compare against undefined, not truthiness: 0, '' and false are valid
    // cached values and must still count as hits.
    if (value !== undefined) {
      this.analytics.recordAccess(storeId, key, 'read');
      this.policies.get(storeId)?.onAccess(key);
    }
    return value;
  }

  /**
   * Phase 3: Dynamic optimisation.
   *
   * Applies each recommendation the optimizer derives from current metrics.
   */
  optimize(): void {
    const metrics = this.analytics.getMetrics();
    const recommendations = this.optimizer.generateRecommendations(metrics);

    for (const rec of recommendations) {
      switch (rec.type) {
        case 'resize':
          this.resizeStore(rec.storeId, rec.newSize);
          break;
        case 'policy_change':
          this.changePolicyForStore(rec.storeId, rec.newPolicy);
          break;
        case 'rebalance':
          this.rebalanceStores();
          break;
      }
    }
  }
}
Python
from typing import List, Dict, Set, Optional, Tuple, Any from collections import defaultdict, deque from dataclasses import dataclass from enum import Enum import heapq from abc import ABC, abstractmethod # Problem: Advanced Job Scheduling System class JobSchedulingSystem: def __init__(self, num_workers: int, max_queue_size: int): self.workers = [Worker(i) for i in range(num_workers)] self.job_queue = PriorityQueue(max_queue_size) self.dependency_graph = DependencyGraph() self.resource_manager = ResourceManager() self.scheduler = IntelligentScheduler() def submit_job(self, job: Job) -> bool: """Phase 1: Validate and queue job with dependency analysis""" # Check for circular dependencies if self.dependency_graph.creates_cycle(job): raise ValueError(f"Job {job.id} creates circular dependency") # Validate resource requirements if not self.resource_manager.can_allocate(job.resources): return False # Calculate dynamic priority priority = self.calculate_dynamic_priority(job) job.priority = priority # Add to dependency graph and queue self.dependency_graph.add_job(job) return self.job_queue.enqueue(job) def schedule_jobs(self) -> List[JobExecution]: """Phase 2: Intelligent job scheduling with resource optimization""" ready_jobs = self.dependency_graph.get_ready_jobs() scheduled = [] while ready_jobs: # Use machine learning to predict job completion times job_predictions = self.scheduler.predict_completion_times(ready_jobs) # Optimize assignment using Hungarian algorithm assignment = self.optimize_worker_assignment(ready_jobs, job_predictions) for job, worker in assignment: if self.resource_manager.allocate(job.resources): execution = JobExecution(job, worker, self.get_current_time()) scheduled.append(execution) worker.assign_job(job) ready_jobs.remove(job) # Update dependency graph self.dependency_graph.mark_started(job.id) return scheduled def handle_job_completion(self, job_id: str) -> None: """Phase 3: Handle completion and trigger dependent jobs""" job = 
self.dependency_graph.get_job(job_id) # Release resources self.resource_manager.deallocate(job.resources) # Update dependency graph and get newly ready jobs newly_ready = self.dependency_graph.mark_completed(job_id) # Add newly ready jobs to queue with updated priorities for ready_job in newly_ready: updated_priority = self.calculate_dynamic_priority(ready_job) ready_job.priority = updated_priority self.job_queue.enqueue(ready_job) # Trigger rescheduling if needed if newly_ready: self.schedule_jobs() def calculate_dynamic_priority(self, job: Job) -> int: """Calculate priority based on multiple factors""" base_priority = job.base_priority # Factor in deadline urgency deadline_factor = self.calculate_deadline_urgency(job) # Factor in dependency chain length chain_length = self.dependency_graph.get_longest_chain_length(job.id) dependency_factor = chain_length * 0.1 # Factor in resource contention resource_contention = self.resource_manager.get_contention_score(job.resources) return int(base_priority + deadline_factor + dependency_factor - resource_contention) def optimize_worker_assignment(self, jobs: List[Job], predictions: Dict[str, float]) -> List[Tuple[Job, Worker]]: """Use Hungarian algorithm for optimal job-worker assignment""" cost_matrix = [] for job in jobs: job_costs = [] for worker in self.workers: if worker.is_available(): # Cost = predicted time + worker skill mismatch penalty cost = predictions[job.id] + worker.get_skill_mismatch_penalty(job.required_skills) job_costs.append(cost) else: job_costs.append(float('inf')) cost_matrix.append(job_costs) # Apply Hungarian algorithm assignment_indices = self.hungarian_algorithm(cost_matrix) assignments = [] for job_idx, worker_idx in assignment_indices: if cost_matrix[job_idx][worker_idx] != float('inf'): assignments.append((jobs[job_idx], self.workers[worker_idx])) return assignments # Problem: Distributed System Load Balancer class AdvancedLoadBalancer: def __init__(self, servers: List[Server]): self.servers = 
servers self.health_monitor = HealthMonitor(servers) self.predictor = LoadPredictor() self.circuit_breakers = {server.id: CircuitBreaker() for server in servers} self.rate_limiter = AdaptiveRateLimiter() def route_request(self, request: Request) -> Optional[Server]: """Phase 1: Intelligent request routing with prediction""" # Check rate limiting if not self.rate_limiter.allow_request(request): raise RateLimitExceededException() # Get healthy servers healthy_servers = self.health_monitor.get_healthy_servers() if not healthy_servers: raise NoHealthyServersException() # Predict future load for each server load_predictions = self.predictor.predict_loads(healthy_servers, request) # Select optimal server using weighted scoring optimal_server = self.select_optimal_server(healthy_servers, load_predictions, request) # Check circuit breaker circuit_breaker = self.circuit_breakers[optimal_server.id] if not circuit_breaker.allow_request(): # Fallback to secondary server return self.get_fallback_server(healthy_servers, optimal_server) return optimal_server def select_optimal_server(self, servers: List[Server], predictions: Dict[str, float], request: Request) -> Server: """Advanced server selection using multiple algorithms""" scores = {} for server in servers: # Base score from weighted round robin wrr_score = self.calculate_wrr_score(server) # Least connections score lc_score = self.calculate_least_connections_score(server) # Predicted load score pred_score = 1.0 / (predictions[server.id] + 1) # Geographic proximity score (for geo-distributed systems) geo_score = self.calculate_geo_proximity_score(server, request) # Server affinity score (for session stickiness) affinity_score = self.calculate_affinity_score(server, request) # Weighted combination total_score = (0.3 * wrr_score + 0.2 * lc_score + 0.2 * pred_score + 0.2 * geo_score + 0.1 * affinity_score) scores[server.id] = total_score # Return server with highest score best_server_id = max(scores.keys(), key=lambda k: 
scores[k]) return next(s for s in servers if s.id == best_server_id) def handle_server_response(self, server: Server, response: Response, latency: float) -> None: """Phase 2: Adaptive learning from server responses""" # Update circuit breaker circuit_breaker = self.circuit_breakers[server.id] if response.is_successful(): circuit_breaker.record_success() else: circuit_breaker.record_failure() # Update load predictor with actual performance data self.predictor.update_model(server, response, latency) # Update health monitor self.health_monitor.record_response(server, response, latency) # Adapt rate limiting based on system performance self.rate_limiter.adapt_limits(response, latency) def optimize_configuration(self) -> None: """Phase 3: Continuous optimization based on performance metrics""" metrics = self.collect_performance_metrics() # Optimize server weights new_weights = self.optimize_server_weights(metrics) self.update_server_weights(new_weights) # Optimize circuit breaker thresholds new_cb_config = self.optimize_circuit_breaker_config(metrics) self.update_circuit_breaker_config(new_cb_config) # Optimize rate limiting parameters new_rl_config = self.optimize_rate_limiting_config(metrics) self.rate_limiter.update_configuration(new_rl_config) # Problem: Advanced Search Engine with Multiple Ranking Algorithms class AdvancedSearchEngine: def __init__(self): self.index = InvertedIndex() self.rankers = { 'tfidf': TFIDFRanker(), 'pagerank': PageRankRanker(), 'bert': BERTRanker(), 'learning_to_rank': LearningToRankModel() } self.query_processor = QueryProcessor() self.personalizer = PersonalizationEngine() def search(self, query: str, user_context: UserContext, max_results: int = 10) -> SearchResults: """Multi-phase search with advanced ranking""" # Phase 1: Query processing and candidate retrieval processed_query = self.query_processor.process(query) candidates = self.retrieve_candidates(processed_query) if not candidates: return SearchResults([]) # Phase 2: 
Multi-algorithm ranking and fusion rankings = {} for name, ranker in self.rankers.items(): rankings[name] = ranker.rank(candidates, processed_query, user_context) # Fuse rankings using sophisticated combination fused_ranking = self.fuse_rankings(rankings, query, user_context) # Phase 3: Personalization and re-ranking personalized_results = self.personalizer.personalize(fused_ranking, user_context) # Apply diversity and freshness filters final_results = self.apply_diversity_filters(personalized_results, max_results) return SearchResults(final_results[:max_results]) def fuse_rankings(self, rankings: Dict[str, List[Document]], query: str, context: User