Practice Focus

Level 4: Advanced Mastery - Apply complex algorithms to real-world scenarios and interview challenges

Practice Focus
Multi-step Complex Problems
Breaking Down Complex Challenges
Advanced problems often require combining multiple algorithms and data structures. Master the art of problem decomposition and solution synthesis.
🎯 Real-Life Analogy:
Think of planning a complex project like organizing a wedding. You need to coordinate multiple vendors (algorithms), manage timelines (optimization), handle constraints (edge cases), and ensure everything works together seamlessly.

🧩 Complex Problem Patterns:

  • Multi-phase Solutions: Problems requiring sequential algorithm applications
  • Hybrid Approaches: Combining different algorithmic paradigms
  • Constraint Satisfaction: Multiple conditions to satisfy simultaneously
  • State Space Search: Exploring complex solution spaces efficiently
TypeScript
// Problem: Design a Meeting Room Scheduler with Complex Constraints
class MeetingRoomScheduler {
    private rooms: Map;
    private priorityQueue: PriorityQueue;
    private conflicts: ConflictResolver;
    
    constructor(roomCount: number) {
        this.rooms = new Map();
        for (let i = 0; i < roomCount; i++) {
            this.rooms.set(i, []);
        }
        this.priorityQueue = new PriorityQueue();
        this.conflicts = new ConflictResolver();
    }
    
    // Phase 1: Conflict Detection using Interval Trees
    detectConflicts(newMeeting: Meeting): Meeting[] {
        const conflicts: Meeting[] = [];
        for (const [roomId, meetings] of this.rooms) {
            for (const meeting of meetings) {
                if (this.hasTimeOverlap(newMeeting, meeting)) {
                    conflicts.push(meeting);
                }
            }
        }
        return conflicts;
    }
    
    // Phase 2: Optimal Room Assignment using Graph Coloring
    assignOptimalRoom(meeting: Meeting): number | null {
        const conflictGraph = this.buildConflictGraph(meeting);
        const roomAssignment = this.colorGraph(conflictGraph);
        return roomAssignment.get(meeting.id) ?? null;
    }
    
    // Phase 3: Dynamic Rescheduling using DP
    scheduleWithRescheduling(meetings: Meeting[]): ScheduleResult {
        // Sort meetings by priority and constraints
        const sortedMeetings = meetings.sort((a, b) => 
            b.priority - a.priority || a.startTime - b.startTime
        );
        
        const dp = new Map();
        return this.optimizeSchedule(sortedMeetings, 0, new ScheduleState(), dp);
    }
    
    private optimizeSchedule(
        meetings: Meeting[], 
        index: number, 
        currentState: ScheduleState,
        memo: Map
    ): ScheduleResult {
        if (index >= meetings.length) {
            return { scheduled: currentState.scheduled, score: currentState.score };
        }
        
        const stateKey = this.getStateKey(currentState, index);
        if (memo.has(stateKey)) {
            return { scheduled: memo.get(stateKey)!.scheduled, score: memo.get(stateKey)!.score };
        }
        
        const meeting = meetings[index];
        
        // Option 1: Skip this meeting
        const skipResult = this.optimizeSchedule(meetings, index + 1, currentState, memo);
        
        // Option 2: Try to schedule this meeting
        let scheduleResult = { scheduled: [], score: -Infinity };
        const availableRooms = this.findAvailableRooms(meeting, currentState);
        
        for (const roomId of availableRooms) {
            const newState = currentState.clone();
            newState.addMeeting(roomId, meeting);
            
            const result = this.optimizeSchedule(meetings, index + 1, newState, memo);
            if (result.score > scheduleResult.score) {
                scheduleResult = result;
            }
        }
        
        // Return best option
        const bestResult = skipResult.score > scheduleResult.score ? skipResult : scheduleResult;
        memo.set(stateKey, new ScheduleState(bestResult.scheduled, bestResult.score));
        
        return bestResult;
    }
    
    private hasTimeOverlap(meeting1: Meeting, meeting2: Meeting): boolean {
        return meeting1.startTime < meeting2.endTime && meeting2.startTime < meeting1.endTime;
    }
    
    private buildConflictGraph(newMeeting: Meeting): Graph {
        const graph = new Graph();
        const allMeetings = this.getAllScheduledMeetings();
        allMeetings.push(newMeeting);
        
        for (let i = 0; i < allMeetings.length; i++) {
            for (let j = i + 1; j < allMeetings.length; j++) {
                if (this.hasTimeOverlap(allMeetings[i], allMeetings[j])) {
                    graph.addEdge(allMeetings[i].id, allMeetings[j].id);
                }
            }
        }
        
        return graph;
    }
    
    private colorGraph(graph: Graph): Map {
        // Welsh-Powell graph coloring algorithm
        const vertices = Array.from(graph.getVertices());
        vertices.sort((a, b) => graph.getDegree(b) - graph.getDegree(a));
        
        const coloring = new Map();
        const usedColors = new Set();
        
        for (const vertex of vertices) {
            const neighborColors = new Set();
            for (const neighbor of graph.getNeighbors(vertex)) {
                if (coloring.has(neighbor)) {
                    neighborColors.add(coloring.get(neighbor)!);
                }
            }
            
            let color = 0;
            while (neighborColors.has(color)) {
                color++;
            }
            
            coloring.set(vertex, color);
            usedColors.add(color);
        }
        
        return coloring;
    }
}

// Problem: Text Editor with Undo/Redo and Collaborative Features
class CollaborativeTextEditor {
    private content: string;
    private history: OperationalTransform[];
    private undoStack: Command[];
    private redoStack: Command[];
    private conflicts: ConflictResolver;
    
    constructor() {
        this.content = "";
        this.history = [];
        this.undoStack = [];
        this.redoStack = [];
        this.conflicts = new ConflictResolver();
    }
    
    // Phase 1: Apply operation with conflict resolution
    applyOperation(operation: TextOperation, authorId: string): OperationResult {
        // Check for conflicts with concurrent operations
        const conflicts = this.detectConflicts(operation);
        
        if (conflicts.length > 0) {
            // Use Operational Transformation to resolve conflicts
            const transformedOp = this.transformOperation(operation, conflicts);
            return this.executeOperation(transformedOp, authorId);
        }
        
        return this.executeOperation(operation, authorId);
    }
    
    // Phase 2: Efficient undo/redo using command pattern
    undo(): boolean {
        if (this.undoStack.length === 0) return false;
        
        const command = this.undoStack.pop()!;
        const inverseCommand = command.getInverse();
        
        this.executeCommand(inverseCommand);
        this.redoStack.push(command);
        
        return true;
    }
    
    redo(): boolean {
        if (this.redoStack.length === 0) return false;
        
        const command = this.redoStack.pop()!;
        this.executeCommand(command);
        this.undoStack.push(command);
        
        return true;
    }
    
    // Phase 3: Optimize for large documents using rope data structure
    insertText(position: number, text: string): void {
        if (this.content.length > 10000) {
            // Switch to rope-based implementation for large documents
            this.optimizeForLargeDocument();
        }
        
        const operation = new InsertOperation(position, text);
        this.applyOperation(operation, "local");
    }
    
    private detectConflicts(operation: TextOperation): TextOperation[] {
        const recentOps = this.getRecentOperations(operation.timestamp);
        return recentOps.filter(op => this.hasConflict(operation, op));
    }
    
    private transformOperation(operation: TextOperation, conflicts: TextOperation[]): TextOperation {
        let transformedOp = operation;
        
        for (const conflict of conflicts) {
            transformedOp = this.operationalTransform(transformedOp, conflict);
        }
        
        return transformedOp;
    }
    
    private operationalTransform(op1: TextOperation, op2: TextOperation): TextOperation {
        // Implement operational transformation rules
        if (op1 instanceof InsertOperation && op2 instanceof InsertOperation) {
            if (op1.position <= op2.position) {
                return new InsertOperation(op1.position, op1.text);
            } else {
                return new InsertOperation(op1.position + op2.text.length, op1.text);
            }
        }
        
        // Handle other operation type combinations...
        return op1;
    }
}

// Problem: Advanced Cache System with Multiple Eviction Policies
class AdvancedCacheSystem {
    private stores: Map>;
    private policies: Map>;
    private analytics: CacheAnalytics;
    private optimizer: CacheOptimizer;
    
    constructor() {
        this.stores = new Map();
        this.policies = new Map();
        this.analytics = new CacheAnalytics();
        this.optimizer = new CacheOptimizer();
        this.initializePolicies();
    }
    
    // Phase 1: Intelligent cache placement using machine learning
    put(key: K, value: V, hint?: CacheHint): void {
        const optimalStore = this.optimizer.selectOptimalStore(key, value, hint);
        const store = this.stores.get(optimalStore)!;
        
        if (store.isFull()) {
            const policy = this.policies.get(optimalStore)!;
            const evictedKeys = policy.evict(store, 1);
            this.analytics.recordEvictions(optimalStore, evictedKeys);
        }
        
        store.put(key, value);
        this.analytics.recordAccess(optimalStore, key, 'write');
    }
    
    // Phase 2: Predictive prefetching using pattern analysis
    get(key: K): V | undefined {
        const accessPattern = this.analytics.getAccessPattern(key);
        const predictedKeys = this.optimizer.predictNextAccess(key, accessPattern);
        
        // Prefetch predicted keys
        this.prefetchKeys(predictedKeys);
        
        // Find the key in appropriate store
        const storeId = this.findKeyLocation(key);
        if (!storeId) return undefined;
        
        const store = this.stores.get(storeId)!;
        const value = store.get(key);
        
        if (value) {
            this.analytics.recordAccess(storeId, key, 'read');
            this.policies.get(storeId)!.onAccess(key);
        }
        
        return value;
    }
    
    // Phase 3: Dynamic optimization based on usage patterns
    optimize(): void {
        const metrics = this.analytics.getMetrics();
        const recommendations = this.optimizer.generateRecommendations(metrics);
        
        for (const rec of recommendations) {
            switch (rec.type) {
                case 'resize':
                    this.resizeStore(rec.storeId, rec.newSize);
                    break;
                case 'policy_change':
                    this.changePolicyForStore(rec.storeId, rec.newPolicy);
                    break;
                case 'rebalance':
                    this.rebalanceStores();
                    break;
            }
        }
    }
}
                                
Python
from typing import List, Dict, Set, Optional, Tuple, Any
from collections import defaultdict, deque
from dataclasses import dataclass
from enum import Enum
import heapq
from abc import ABC, abstractmethod

# Problem: Advanced Job Scheduling System
class JobSchedulingSystem:
    def __init__(self, num_workers: int, max_queue_size: int):
        self.workers = [Worker(i) for i in range(num_workers)]
        self.job_queue = PriorityQueue(max_queue_size)
        self.dependency_graph = DependencyGraph()
        self.resource_manager = ResourceManager()
        self.scheduler = IntelligentScheduler()
        
    def submit_job(self, job: Job) -> bool:
        """Phase 1: Validate and queue job with dependency analysis"""
        # Check for circular dependencies
        if self.dependency_graph.creates_cycle(job):
            raise ValueError(f"Job {job.id} creates circular dependency")
        
        # Validate resource requirements
        if not self.resource_manager.can_allocate(job.resources):
            return False
        
        # Calculate dynamic priority
        priority = self.calculate_dynamic_priority(job)
        job.priority = priority
        
        # Add to dependency graph and queue
        self.dependency_graph.add_job(job)
        return self.job_queue.enqueue(job)
    
    def schedule_jobs(self) -> List[JobExecution]:
        """Phase 2: Intelligent job scheduling with resource optimization"""
        ready_jobs = self.dependency_graph.get_ready_jobs()
        scheduled = []
        
        while ready_jobs:
            # Use machine learning to predict job completion times
            job_predictions = self.scheduler.predict_completion_times(ready_jobs)
            
            # Optimize assignment using Hungarian algorithm
            assignment = self.optimize_worker_assignment(ready_jobs, job_predictions)
            
            for job, worker in assignment:
                if self.resource_manager.allocate(job.resources):
                    execution = JobExecution(job, worker, self.get_current_time())
                    scheduled.append(execution)
                    worker.assign_job(job)
                    ready_jobs.remove(job)
                    
                    # Update dependency graph
                    self.dependency_graph.mark_started(job.id)
        
        return scheduled
    
    def handle_job_completion(self, job_id: str) -> None:
        """Phase 3: Handle completion and trigger dependent jobs"""
        job = self.dependency_graph.get_job(job_id)
        
        # Release resources
        self.resource_manager.deallocate(job.resources)
        
        # Update dependency graph and get newly ready jobs
        newly_ready = self.dependency_graph.mark_completed(job_id)
        
        # Add newly ready jobs to queue with updated priorities
        for ready_job in newly_ready:
            updated_priority = self.calculate_dynamic_priority(ready_job)
            ready_job.priority = updated_priority
            self.job_queue.enqueue(ready_job)
        
        # Trigger rescheduling if needed
        if newly_ready:
            self.schedule_jobs()
    
    def calculate_dynamic_priority(self, job: Job) -> int:
        """Calculate priority based on multiple factors"""
        base_priority = job.base_priority
        
        # Factor in deadline urgency
        deadline_factor = self.calculate_deadline_urgency(job)
        
        # Factor in dependency chain length
        chain_length = self.dependency_graph.get_longest_chain_length(job.id)
        dependency_factor = chain_length * 0.1
        
        # Factor in resource contention
        resource_contention = self.resource_manager.get_contention_score(job.resources)
        
        return int(base_priority + deadline_factor + dependency_factor - resource_contention)
    
    def optimize_worker_assignment(self, jobs: List[Job], predictions: Dict[str, float]) -> List[Tuple[Job, Worker]]:
        """Use Hungarian algorithm for optimal job-worker assignment"""
        cost_matrix = []
        
        for job in jobs:
            job_costs = []
            for worker in self.workers:
                if worker.is_available():
                    # Cost = predicted time + worker skill mismatch penalty
                    cost = predictions[job.id] + worker.get_skill_mismatch_penalty(job.required_skills)
                    job_costs.append(cost)
                else:
                    job_costs.append(float('inf'))
            cost_matrix.append(job_costs)
        
        # Apply Hungarian algorithm
        assignment_indices = self.hungarian_algorithm(cost_matrix)
        
        assignments = []
        for job_idx, worker_idx in assignment_indices:
            if cost_matrix[job_idx][worker_idx] != float('inf'):
                assignments.append((jobs[job_idx], self.workers[worker_idx]))
        
        return assignments

# Problem: Distributed System Load Balancer
class AdvancedLoadBalancer:
    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.health_monitor = HealthMonitor(servers)
        self.predictor = LoadPredictor()
        self.circuit_breakers = {server.id: CircuitBreaker() for server in servers}
        self.rate_limiter = AdaptiveRateLimiter()
        
    def route_request(self, request: Request) -> Optional[Server]:
        """Phase 1: Intelligent request routing with prediction"""
        # Check rate limiting
        if not self.rate_limiter.allow_request(request):
            raise RateLimitExceededException()
        
        # Get healthy servers
        healthy_servers = self.health_monitor.get_healthy_servers()
        if not healthy_servers:
            raise NoHealthyServersException()
        
        # Predict future load for each server
        load_predictions = self.predictor.predict_loads(healthy_servers, request)
        
        # Select optimal server using weighted scoring
        optimal_server = self.select_optimal_server(healthy_servers, load_predictions, request)
        
        # Check circuit breaker
        circuit_breaker = self.circuit_breakers[optimal_server.id]
        if not circuit_breaker.allow_request():
            # Fallback to secondary server
            return self.get_fallback_server(healthy_servers, optimal_server)
        
        return optimal_server
    
    def select_optimal_server(self, servers: List[Server], predictions: Dict[str, float], request: Request) -> Server:
        """Advanced server selection using multiple algorithms"""
        scores = {}
        
        for server in servers:
            # Base score from weighted round robin
            wrr_score = self.calculate_wrr_score(server)
            
            # Least connections score
            lc_score = self.calculate_least_connections_score(server)
            
            # Predicted load score
            pred_score = 1.0 / (predictions[server.id] + 1)
            
            # Geographic proximity score (for geo-distributed systems)
            geo_score = self.calculate_geo_proximity_score(server, request)
            
            # Server affinity score (for session stickiness)
            affinity_score = self.calculate_affinity_score(server, request)
            
            # Weighted combination
            total_score = (0.3 * wrr_score + 0.2 * lc_score + 0.2 * pred_score + 
                          0.2 * geo_score + 0.1 * affinity_score)
            
            scores[server.id] = total_score
        
        # Return server with highest score
        best_server_id = max(scores.keys(), key=lambda k: scores[k])
        return next(s for s in servers if s.id == best_server_id)
    
    def handle_server_response(self, server: Server, response: Response, latency: float) -> None:
        """Phase 2: Adaptive learning from server responses"""
        # Update circuit breaker
        circuit_breaker = self.circuit_breakers[server.id]
        if response.is_successful():
            circuit_breaker.record_success()
        else:
            circuit_breaker.record_failure()
        
        # Update load predictor with actual performance data
        self.predictor.update_model(server, response, latency)
        
        # Update health monitor
        self.health_monitor.record_response(server, response, latency)
        
        # Adapt rate limiting based on system performance
        self.rate_limiter.adapt_limits(response, latency)
    
    def optimize_configuration(self) -> None:
        """Phase 3: Continuous optimization based on performance metrics"""
        metrics = self.collect_performance_metrics()
        
        # Optimize server weights
        new_weights = self.optimize_server_weights(metrics)
        self.update_server_weights(new_weights)
        
        # Optimize circuit breaker thresholds
        new_cb_config = self.optimize_circuit_breaker_config(metrics)
        self.update_circuit_breaker_config(new_cb_config)
        
        # Optimize rate limiting parameters
        new_rl_config = self.optimize_rate_limiting_config(metrics)
        self.rate_limiter.update_configuration(new_rl_config)

# Problem: Advanced Search Engine with Multiple Ranking Algorithms
class AdvancedSearchEngine:
    def __init__(self):
        self.index = InvertedIndex()
        self.rankers = {
            'tfidf': TFIDFRanker(),
            'pagerank': PageRankRanker(),
            'bert': BERTRanker(),
            'learning_to_rank': LearningToRankModel()
        }
        self.query_processor = QueryProcessor()
        self.personalizer = PersonalizationEngine()
        
    def search(self, query: str, user_context: UserContext, max_results: int = 10) -> SearchResults:
        """Multi-phase search with advanced ranking"""
        # Phase 1: Query processing and candidate retrieval
        processed_query = self.query_processor.process(query)
        candidates = self.retrieve_candidates(processed_query)
        
        if not candidates:
            return SearchResults([])
        
        # Phase 2: Multi-algorithm ranking and fusion
        rankings = {}
        for name, ranker in self.rankers.items():
            rankings[name] = ranker.rank(candidates, processed_query, user_context)
        
        # Fuse rankings using sophisticated combination
        fused_ranking = self.fuse_rankings(rankings, query, user_context)
        
        # Phase 3: Personalization and re-ranking
        personalized_results = self.personalizer.personalize(fused_ranking, user_context)
        
        # Apply diversity and freshness filters
        final_results = self.apply_diversity_filters(personalized_results, max_results)
        
        return SearchResults(final_results[:max_results])
    
    def fuse_rankings(self, rankings: Dict[str, List[Document]], query: str, context: User