Practice Focus
Level 4: Advanced Mastery - Apply complex algorithms to real-world scenarios and interview challenges
Multi-step Complex Problems
Breaking Down Complex Challenges
Advanced problems often require combining multiple algorithms and data structures. Master the art of problem decomposition and solution synthesis.
🎯 Real-Life Analogy:
Think of planning a complex project like organizing a wedding. You need to coordinate multiple vendors (algorithms), manage timelines (optimization), handle constraints (edge cases), and ensure everything works together seamlessly.
🧩 Complex Problem Patterns:
- Multi-phase Solutions: Problems requiring sequential algorithm applications
- Hybrid Approaches: Combining different algorithmic paradigms
- Constraint Satisfaction: Multiple conditions to satisfy simultaneously
- State Space Search: Exploring complex solution spaces efficiently (a minimal backtracking sketch follows this list)
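To make the last two patterns concrete, here is a minimal Python sketch of constraint satisfaction via backtracking state-space search: meetings are placed into rooms one at a time, and an assignment is undone as soon as it violates the no-overlap constraint. The names used here (assign_rooms, Meeting) are illustrative only and are not part of the scheduler shown next.
Python
from typing import Dict, List, Optional, Tuple

Meeting = Tuple[int, int]  # (start_time, end_time)

def overlaps(a: Meeting, b: Meeting) -> bool:
    # Two meetings conflict when their half-open intervals intersect
    return a[0] < b[1] and b[0] < a[1]

def assign_rooms(meetings: List[Meeting], room_count: int) -> Optional[Dict[int, int]]:
    """Backtracking search: map meeting index -> room id, or None if infeasible."""
    assignment: Dict[int, int] = {}

    def backtrack(i: int) -> bool:
        if i == len(meetings):
            return True  # every meeting placed without conflicts
        for room in range(room_count):
            # Constraint check: no overlap with meetings already in this room
            if all(not overlaps(meetings[i], meetings[j])
                   for j, r in assignment.items() if r == room):
                assignment[i] = room        # choose
                if backtrack(i + 1):        # explore
                    return True
                del assignment[i]           # undo (backtrack)
        return False

    return assignment if backtrack(0) else None

# Example: three meetings, two rooms
print(assign_rooms([(0, 10), (5, 15), (10, 20)], room_count=2))
# e.g. {0: 0, 1: 1, 2: 0}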
TypeScript
// Problem: Design a Meeting Room Scheduler with Complex Constraints
class MeetingRoomScheduler {
private rooms: Map<number, Meeting[]>;
private priorityQueue: PriorityQueue<Meeting>;
private conflicts: ConflictResolver;
constructor(roomCount: number) {
this.rooms = new Map();
for (let i = 0; i < roomCount; i++) {
this.rooms.set(i, []);
}
this.priorityQueue = new PriorityQueue();
this.conflicts = new ConflictResolver();
}
// Phase 1: Conflict detection (brute-force overlap scan; an interval tree would answer each query in O(log n))
detectConflicts(newMeeting: Meeting): Meeting[] {
const conflicts: Meeting[] = [];
for (const [roomId, meetings] of this.rooms) {
for (const meeting of meetings) {
if (this.hasTimeOverlap(newMeeting, meeting)) {
conflicts.push(meeting);
}
}
}
return conflicts;
}
// Phase 2: Optimal Room Assignment using Graph Coloring
assignOptimalRoom(meeting: Meeting): number | null {
const conflictGraph = this.buildConflictGraph(meeting);
const roomAssignment = this.colorGraph(conflictGraph);
return roomAssignment.get(meeting.id) ?? null;
}
// Phase 3: Dynamic Rescheduling using DP
scheduleWithRescheduling(meetings: Meeting[]): ScheduleResult {
// Sort meetings by priority and constraints
const sortedMeetings = meetings.sort((a, b) =>
b.priority - a.priority || a.startTime - b.startTime
);
const dp = new Map<string, ScheduleState>();
return this.optimizeSchedule(sortedMeetings, 0, new ScheduleState(), dp);
}
private optimizeSchedule(
meetings: Meeting[],
index: number,
currentState: ScheduleState,
memo: Map<string, ScheduleState>
): ScheduleResult {
if (index >= meetings.length) {
return { scheduled: currentState.scheduled, score: currentState.score };
}
const stateKey = this.getStateKey(currentState, index);
if (memo.has(stateKey)) {
return { scheduled: memo.get(stateKey)!.scheduled, score: memo.get(stateKey)!.score };
}
const meeting = meetings[index];
// Option 1: Skip this meeting
const skipResult = this.optimizeSchedule(meetings, index + 1, currentState, memo);
// Option 2: Try to schedule this meeting
let scheduleResult: ScheduleResult = { scheduled: [], score: -Infinity };
const availableRooms = this.findAvailableRooms(meeting, currentState);
for (const roomId of availableRooms) {
const newState = currentState.clone();
newState.addMeeting(roomId, meeting);
const result = this.optimizeSchedule(meetings, index + 1, newState, memo);
if (result.score > scheduleResult.score) {
scheduleResult = result;
}
}
// Return best option
const bestResult = skipResult.score > scheduleResult.score ? skipResult : scheduleResult;
memo.set(stateKey, new ScheduleState(bestResult.scheduled, bestResult.score));
return bestResult;
}
private hasTimeOverlap(meeting1: Meeting, meeting2: Meeting): boolean {
return meeting1.startTime < meeting2.endTime && meeting2.startTime < meeting1.endTime;
}
private buildConflictGraph(newMeeting: Meeting): Graph {
const graph = new Graph();
const allMeetings = this.getAllScheduledMeetings();
allMeetings.push(newMeeting);
for (let i = 0; i < allMeetings.length; i++) {
for (let j = i + 1; j < allMeetings.length; j++) {
if (this.hasTimeOverlap(allMeetings[i], allMeetings[j])) {
graph.addEdge(allMeetings[i].id, allMeetings[j].id);
}
}
}
return graph;
}
private colorGraph(graph: Graph): Map<string, number> {
// Welsh-Powell graph coloring algorithm
const vertices = Array.from(graph.getVertices());
vertices.sort((a, b) => graph.getDegree(b) - graph.getDegree(a));
const coloring = new Map<string, number>();
const usedColors = new Set<number>();
for (const vertex of vertices) {
const neighborColors = new Set<number>();
for (const neighbor of graph.getNeighbors(vertex)) {
if (coloring.has(neighbor)) {
neighborColors.add(coloring.get(neighbor)!);
}
}
let color = 0;
while (neighborColors.has(color)) {
color++;
}
coloring.set(vertex, color);
usedColors.add(color);
}
return coloring;
}
}
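Every phase of the scheduler above rests on one basic question: how many rooms does a set of overlapping meetings actually need? As a standalone reference point (not part of the class above), a minimal Python sketch answers it with a sorted sweep and a min-heap of room end times:
Python
import heapq
from typing import List, Tuple

def min_rooms_needed(meetings: List[Tuple[int, int]]) -> int:
    """Return the minimum number of rooms for (start, end) meetings."""
    if not meetings:
        return 0
    # Phase 1: sort by start time so conflicts are discovered in order
    meetings = sorted(meetings)
    # Phase 2: min-heap of end times = rooms currently in use
    end_times: List[int] = []
    for start, end in meetings:
        if end_times and end_times[0] <= start:
            heapq.heapreplace(end_times, end)  # reuse the room that frees up earliest
        else:
            heapq.heappush(end_times, end)     # all rooms busy: open a new one
    return len(end_times)

print(min_rooms_needed([(0, 30), (5, 10), (15, 20)]))  # 2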
// Problem: Text Editor with Undo/Redo and Collaborative Features
class CollaborativeTextEditor {
private content: string;
private history: OperationalTransform[];
private undoStack: Command[];
private redoStack: Command[];
private conflicts: ConflictResolver;
constructor() {
this.content = "";
this.history = [];
this.undoStack = [];
this.redoStack = [];
this.conflicts = new ConflictResolver();
}
// Phase 1: Apply operation with conflict resolution
applyOperation(operation: TextOperation, authorId: string): OperationResult {
// Check for conflicts with concurrent operations
const conflicts = this.detectConflicts(operation);
if (conflicts.length > 0) {
// Use Operational Transformation to resolve conflicts
const transformedOp = this.transformOperation(operation, conflicts);
return this.executeOperation(transformedOp, authorId);
}
return this.executeOperation(operation, authorId);
}
// Phase 2: Efficient undo/redo using command pattern
undo(): boolean {
if (this.undoStack.length === 0) return false;
const command = this.undoStack.pop()!;
const inverseCommand = command.getInverse();
this.executeCommand(inverseCommand);
this.redoStack.push(command);
return true;
}
redo(): boolean {
if (this.redoStack.length === 0) return false;
const command = this.redoStack.pop()!;
this.executeCommand(command);
this.undoStack.push(command);
return true;
}
// Phase 3: Optimize for large documents using rope data structure
insertText(position: number, text: string): void {
if (this.content.length > 10000) {
// Switch to rope-based implementation for large documents
this.optimizeForLargeDocument();
}
const operation = new InsertOperation(position, text);
this.applyOperation(operation, "local");
}
private detectConflicts(operation: TextOperation): TextOperation[] {
const recentOps = this.getRecentOperations(operation.timestamp);
return recentOps.filter(op => this.hasConflict(operation, op));
}
private transformOperation(operation: TextOperation, conflicts: TextOperation[]): TextOperation {
let transformedOp = operation;
for (const conflict of conflicts) {
transformedOp = this.operationalTransform(transformedOp, conflict);
}
return transformedOp;
}
private operationalTransform(op1: TextOperation, op2: TextOperation): TextOperation {
// Implement operational transformation rules
if (op1 instanceof InsertOperation && op2 instanceof InsertOperation) {
if (op1.position <= op2.position) {
return new InsertOperation(op1.position, op1.text);
} else {
return new InsertOperation(op1.position + op2.text.length, op1.text);
}
}
// Handle other operation type combinations...
return op1;
}
}
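Phase 3 mentions a rope data structure but never shows one. A minimal sketch (illustrative Leaf and Concat classes, assumed rather than taken from the editor above) shows why inserting into a large document can avoid copying the whole string: insert = split at the position, then concatenate the pieces.
Python
from dataclasses import dataclass
from typing import Tuple, Union

@dataclass
class Leaf:
    text: str

@dataclass
class Concat:
    left: "Rope"
    right: "Rope"
    weight: int  # total length of the left subtree

Rope = Union[Leaf, Concat]

def length(rope: Rope) -> int:
    return len(rope.text) if isinstance(rope, Leaf) else rope.weight + length(rope.right)

def concat(left: Rope, right: Rope) -> Rope:
    return Concat(left, right, length(left))

def split(rope: Rope, i: int) -> Tuple[Rope, Rope]:
    """Split a rope into (first i characters, the rest) without copying deep subtrees."""
    if isinstance(rope, Leaf):
        return Leaf(rope.text[:i]), Leaf(rope.text[i:])
    if i < rope.weight:
        l1, l2 = split(rope.left, i)
        return l1, concat(l2, rope.right)
    r1, r2 = split(rope.right, i - rope.weight)
    return concat(rope.left, r1), r2

def insert(rope: Rope, position: int, text: str) -> Rope:
    left, right = split(rope, position)
    return concat(concat(left, Leaf(text)), right)

def to_string(rope: Rope) -> str:
    return rope.text if isinstance(rope, Leaf) else to_string(rope.left) + to_string(rope.right)

doc = concat(Leaf("Hello "), Leaf("world"))
print(to_string(insert(doc, 6, "brave new ")))  # Hello brave new world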
// Problem: Advanced Cache System with Multiple Eviction Policies
class AdvancedCacheSystem<K, V> {
private stores: Map<string, CacheStore<K, V>>;
private policies: Map<string, EvictionPolicy<K>>;
private analytics: CacheAnalytics;
private optimizer: CacheOptimizer;
constructor() {
this.stores = new Map();
this.policies = new Map();
this.analytics = new CacheAnalytics();
this.optimizer = new CacheOptimizer();
this.initializePolicies();
}
// Phase 1: Intelligent cache placement using machine learning
put(key: K, value: V, hint?: CacheHint): void {
const optimalStore = this.optimizer.selectOptimalStore(key, value, hint);
const store = this.stores.get(optimalStore)!;
if (store.isFull()) {
const policy = this.policies.get(optimalStore)!;
const evictedKeys = policy.evict(store, 1);
this.analytics.recordEvictions(optimalStore, evictedKeys);
}
store.put(key, value);
this.analytics.recordAccess(optimalStore, key, 'write');
}
// Phase 2: Predictive prefetching using pattern analysis
get(key: K): V | undefined {
const accessPattern = this.analytics.getAccessPattern(key);
const predictedKeys = this.optimizer.predictNextAccess(key, accessPattern);
// Prefetch predicted keys
this.prefetchKeys(predictedKeys);
// Find the key in appropriate store
const storeId = this.findKeyLocation(key);
if (!storeId) return undefined;
const store = this.stores.get(storeId)!;
const value = store.get(key);
if (value !== undefined) {
this.analytics.recordAccess(storeId, key, 'read');
this.policies.get(storeId)!.onAccess(key);
}
return value;
}
// Phase 3: Dynamic optimization based on usage patterns
optimize(): void {
const metrics = this.analytics.getMetrics();
const recommendations = this.optimizer.generateRecommendations(metrics);
for (const rec of recommendations) {
switch (rec.type) {
case 'resize':
this.resizeStore(rec.storeId, rec.newSize);
break;
case 'policy_change':
this.changePolicyForStore(rec.storeId, rec.newPolicy);
break;
case 'rebalance':
this.rebalanceStores();
break;
}
}
}
}
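The cache system above delegates eviction to pluggable policies. For a concrete reference point, here is a minimal sketch of the most common one, LRU, built on Python's OrderedDict; it is a standalone illustration and does not plug into the class above.
Python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")

class LRUCache(Generic[K, V]):
    """Least-recently-used cache: every access moves the key to the 'most recent' end."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data: "OrderedDict[K, V]" = OrderedDict()

    def get(self, key: K) -> Optional[V]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)        # mark as most recently used
        return self._data[key]

    def put(self, key: K, value: V) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the least recently used entry

cache: LRUCache[int, str] = LRUCache(2)
cache.put(1, "a"); cache.put(2, "b")
cache.get(1)                               # key 1 is now most recently used
cache.put(3, "c")                          # evicts key 2, the least recently used
print(cache.get(2), cache.get(1))          # None a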
Python
from typing import List, Dict, Set, Optional, Tuple, Any
from collections import defaultdict, deque
from dataclasses import dataclass
from enum import Enum
import heapq
from abc import ABC, abstractmethod
# Problem: Advanced Job Scheduling System
class JobSchedulingSystem:
def __init__(self, num_workers: int, max_queue_size: int):
self.workers = [Worker(i) for i in range(num_workers)]
self.job_queue = PriorityQueue(max_queue_size)
self.dependency_graph = DependencyGraph()
self.resource_manager = ResourceManager()
self.scheduler = IntelligentScheduler()
def submit_job(self, job: Job) -> bool:
"""Phase 1: Validate and queue job with dependency analysis"""
# Check for circular dependencies
if self.dependency_graph.creates_cycle(job):
raise ValueError(f"Job {job.id} creates circular dependency")
# Validate resource requirements
if not self.resource_manager.can_allocate(job.resources):
return False
# Calculate dynamic priority
priority = self.calculate_dynamic_priority(job)
job.priority = priority
# Add to dependency graph and queue
self.dependency_graph.add_job(job)
return self.job_queue.enqueue(job)
def schedule_jobs(self) -> List[JobExecution]:
"""Phase 2: Intelligent job scheduling with resource optimization"""
ready_jobs = self.dependency_graph.get_ready_jobs()
scheduled = []
while ready_jobs:
# Use machine learning to predict job completion times
job_predictions = self.scheduler.predict_completion_times(ready_jobs)
# Optimize assignment using Hungarian algorithm
assignment = self.optimize_worker_assignment(ready_jobs, job_predictions)
for job, worker in assignment:
if self.resource_manager.allocate(job.resources):
execution = JobExecution(job, worker, self.get_current_time())
scheduled.append(execution)
worker.assign_job(job)
ready_jobs.remove(job)
# Update dependency graph
self.dependency_graph.mark_started(job.id)
return scheduled
def handle_job_completion(self, job_id: str) -> None:
"""Phase 3: Handle completion and trigger dependent jobs"""
job = self.dependency_graph.get_job(job_id)
# Release resources
self.resource_manager.deallocate(job.resources)
# Update dependency graph and get newly ready jobs
newly_ready = self.dependency_graph.mark_completed(job_id)
# Add newly ready jobs to queue with updated priorities
for ready_job in newly_ready:
updated_priority = self.calculate_dynamic_priority(ready_job)
ready_job.priority = updated_priority
self.job_queue.enqueue(ready_job)
# Trigger rescheduling if needed
if newly_ready:
self.schedule_jobs()
def calculate_dynamic_priority(self, job: Job) -> int:
"""Calculate priority based on multiple factors"""
base_priority = job.base_priority
# Factor in deadline urgency
deadline_factor = self.calculate_deadline_urgency(job)
# Factor in dependency chain length
chain_length = self.dependency_graph.get_longest_chain_length(job.id)
dependency_factor = chain_length * 0.1
# Factor in resource contention
resource_contention = self.resource_manager.get_contention_score(job.resources)
return int(base_priority + deadline_factor + dependency_factor - resource_contention)
def optimize_worker_assignment(self, jobs: List[Job], predictions: Dict[str, float]) -> List[Tuple[Job, Worker]]:
"""Use Hungarian algorithm for optimal job-worker assignment"""
cost_matrix = []
for job in jobs:
job_costs = []
for worker in self.workers:
if worker.is_available():
# Cost = predicted time + worker skill mismatch penalty
cost = predictions[job.id] + worker.get_skill_mismatch_penalty(job.required_skills)
job_costs.append(cost)
else:
job_costs.append(float('inf'))
cost_matrix.append(job_costs)
# Apply Hungarian algorithm
assignment_indices = self.hungarian_algorithm(cost_matrix)
assignments = []
for job_idx, worker_idx in assignment_indices:
if cost_matrix[job_idx][worker_idx] != float('inf'):
assignments.append((jobs[job_idx], self.workers[worker_idx]))
return assignments
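optimize_worker_assignment calls a hungarian_algorithm helper that is never defined above. In practice this step is usually delegated to a library; the sketch below uses scipy.optimize.linear_sum_assignment (an assumption, not the system's actual helper) and replaces float('inf') with a large finite penalty so the solver always sees a feasible matrix.
Python
import numpy as np
from scipy.optimize import linear_sum_assignment
from typing import List, Tuple

UNAVAILABLE = 1e9  # large finite penalty used instead of float('inf')

def optimal_assignment(cost_matrix: List[List[float]]) -> List[Tuple[int, int]]:
    """Return (job_index, worker_index) pairs that minimize total cost."""
    costs = np.array(cost_matrix)
    job_idx, worker_idx = linear_sum_assignment(costs)
    # Drop pairs that were only matched via the penalty value (no real worker available)
    return [(int(j), int(w)) for j, w in zip(job_idx, worker_idx) if costs[j, w] < UNAVAILABLE]

# Example: 2 jobs, 3 workers; worker 1 is unavailable for both jobs
costs = [
    [4.0, UNAVAILABLE, 8.0],
    [6.0, UNAVAILABLE, 3.0],
]
print(optimal_assignment(costs))  # [(0, 0), (1, 2)]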
# Problem: Distributed System Load Balancer
class AdvancedLoadBalancer:
def __init__(self, servers: List[Server]):
self.servers = servers
self.health_monitor = HealthMonitor(servers)
self.predictor = LoadPredictor()
self.circuit_breakers = {server.id: CircuitBreaker() for server in servers}
self.rate_limiter = AdaptiveRateLimiter()
def route_request(self, request: Request) -> Optional[Server]:
"""Phase 1: Intelligent request routing with prediction"""
# Check rate limiting
if not self.rate_limiter.allow_request(request):
raise RateLimitExceededException()
# Get healthy servers
healthy_servers = self.health_monitor.get_healthy_servers()
if not healthy_servers:
raise NoHealthyServersException()
# Predict future load for each server
load_predictions = self.predictor.predict_loads(healthy_servers, request)
# Select optimal server using weighted scoring
optimal_server = self.select_optimal_server(healthy_servers, load_predictions, request)
# Check circuit breaker
circuit_breaker = self.circuit_breakers[optimal_server.id]
if not circuit_breaker.allow_request():
# Fallback to secondary server
return self.get_fallback_server(healthy_servers, optimal_server)
return optimal_server
def select_optimal_server(self, servers: List[Server], predictions: Dict[str, float], request: Request) -> Server:
"""Advanced server selection using multiple algorithms"""
scores = {}
for server in servers:
# Base score from weighted round robin
wrr_score = self.calculate_wrr_score(server)
# Least connections score
lc_score = self.calculate_least_connections_score(server)
# Predicted load score
pred_score = 1.0 / (predictions[server.id] + 1)
# Geographic proximity score (for geo-distributed systems)
geo_score = self.calculate_geo_proximity_score(server, request)
# Server affinity score (for session stickiness)
affinity_score = self.calculate_affinity_score(server, request)
# Weighted combination
total_score = (0.3 * wrr_score + 0.2 * lc_score + 0.2 * pred_score +
0.2 * geo_score + 0.1 * affinity_score)
scores[server.id] = total_score
# Return server with highest score
best_server_id = max(scores.keys(), key=lambda k: scores[k])
return next(s for s in servers if s.id == best_server_id)
def handle_server_response(self, server: Server, response: Response, latency: float) -> None:
"""Phase 2: Adaptive learning from server responses"""
# Update circuit breaker
circuit_breaker = self.circuit_breakers[server.id]
if response.is_successful():
circuit_breaker.record_success()
else:
circuit_breaker.record_failure()
# Update load predictor with actual performance data
self.predictor.update_model(server, response, latency)
# Update health monitor
self.health_monitor.record_response(server, response, latency)
# Adapt rate limiting based on system performance
self.rate_limiter.adapt_limits(response, latency)
def optimize_configuration(self) -> None:
"""Phase 3: Continuous optimization based on performance metrics"""
metrics = self.collect_performance_metrics()
# Optimize server weights
new_weights = self.optimize_server_weights(metrics)
self.update_server_weights(new_weights)
# Optimize circuit breaker thresholds
new_cb_config = self.optimize_circuit_breaker_config(metrics)
self.update_circuit_breaker_config(new_cb_config)
# Optimize rate limiting parameters
new_rl_config = self.optimize_rate_limiting_config(metrics)
self.rate_limiter.update_configuration(new_rl_config)
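The load balancer keeps a CircuitBreaker per server, but the class itself is not shown. Here is a minimal sketch of the pattern (a standalone illustration with assumed defaults, not the implementation used above) with the usual closed, open, and half-open states:
Python
import time

class SimpleCircuitBreaker:
    """Closed: requests pass. Open: requests blocked. Half-open: one trial request allowed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.reset_timeout:
                self.state = "half_open"   # let a single trial request through
                return True
            return False
        return True  # closed or half_open

    def record_success(self) -> None:
        self.failure_count = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.state == "half_open" or self.failure_count >= self.failure_threshold:
            self.state = "open"            # trip the breaker
            self.opened_at = time.monotonic()

breaker = SimpleCircuitBreaker(failure_threshold=2, reset_timeout=0.1)
breaker.record_failure(); breaker.record_failure()
print(breaker.allow_request())  # False: breaker is open
time.sleep(0.15)
print(breaker.allow_request())  # True: half-open trial request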
# Problem: Advanced Search Engine with Multiple Ranking Algorithms
class AdvancedSearchEngine:
def __init__(self):
self.index = InvertedIndex()
self.rankers = {
'tfidf': TFIDFRanker(),
'pagerank': PageRankRanker(),
'bert': BERTRanker(),
'learning_to_rank': LearningToRankModel()
}
self.query_processor = QueryProcessor()
self.personalizer = PersonalizationEngine()
def search(self, query: str, user_context: UserContext, max_results: int = 10) -> SearchResults:
"""Multi-phase search with advanced ranking"""
# Phase 1: Query processing and candidate retrieval
processed_query = self.query_processor.process(query)
candidates = self.retrieve_candidates(processed_query)
if not candidates:
return SearchResults([])
# Phase 2: Multi-algorithm ranking and fusion
rankings = {}
for name, ranker in self.rankers.items():
rankings[name] = ranker.rank(candidates, processed_query, user_context)
# Fuse rankings using sophisticated combination
fused_ranking = self.fuse_rankings(rankings, query, user_context)
# Phase 3: Personalization and re-ranking
personalized_results = self.personalizer.personalize(fused_ranking, user_context)
# Apply diversity and freshness filters
final_results = self.apply_diversity_filters(personalized_results, max_results)
return SearchResults(final_results[:max_results])
def fuse_rankings(self, rankings: Dict[str, List[Document]], query: str, context: User