Synthetic Population Generation with NumPy

This document explains the parallel synthetic population generation system using NumPy and shared memory.

System Overview

The system generates synthetic populations through parallel simulated annealing: each worker runs an independent annealing loop against a single read-only copy of the source data held in shared memory.

Data Processing Pipeline

  1. Input Preparation: Convert dictionary data to NumPy arrays
  2. Categorical Encoding: Map string categories to integers
  3. Shared Memory Setup: Create shared memory block for parallel access
  4. Parallel Processing: Workers generate synthetic populations
  5. Result Decoding: Convert numeric categories back to strings
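
Steps 3 and 4 hinge on one mechanism: a named shared-memory block that every worker process can attach to without copying the data. A minimal sketch of that handshake, using illustrative values rather than the real pipeline:

    import numpy as np
    from multiprocessing import shared_memory

    # Parent: publish an array under a system-wide name
    src = np.arange(12, dtype=np.int64).reshape(4, 3)
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    np.copyto(np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf), src)

    # Worker: attach by name and build a zero-copy view
    attached = shared_memory.SharedMemory(name=shm.name)
    view = np.ndarray((4, 3), dtype=np.int64, buffer=attached.buf)
    assert view.sum() == src.sum()

    attached.close()   # each attachment is closed by its owner
    shm.close()
    shm.unlink()       # the creator removes the block once everyone is done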

Core Function Documentation

Categorical Encoding Functions

    import numpy as np

    def create_category_mapping(data_column):
        """Map string categories to integers"""
        categories = np.unique(data_column)  # sorted, so codes are deterministic
        return {cat: i for i, cat in enumerate(categories)}

    def encode_column(data_column, mapping):
        """Convert string column to numeric codes"""
        # int8 caps a column at 128 distinct categories
        return np.array([mapping[val] for val in data_column], dtype=np.int8)

    def decode_column(encoded_column, mapping):
        """Convert numeric codes back to strings"""
        inv_mapping = {v: k for k, v in mapping.items()}
        return np.array([inv_mapping[val] for val in encoded_column])

These functions handle conversion between string categories and numeric representations:

Function                 | Purpose                                   | Returns
-------------------------|-------------------------------------------|-----------------------------
create_category_mapping  | Creates lookup dictionary for categories  | {category: integer} mapping
encode_column            | Converts string data to numeric codes     | NumPy array of integers
decode_column            | Restores original string categories       | NumPy array of strings
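
A quick round trip shows how the three functions fit together (a minimal sketch with arbitrary sample values):

    import numpy as np

    column = np.array(['employed', 'student', 'employed', 'retired'])
    mapping = create_category_mapping(column)   # {'employed': 0, 'retired': 1, 'student': 2}
    codes = encode_column(column, mapping)      # array([0, 2, 0, 1], dtype=int8)
    restored = decode_column(codes, mapping)
    assert (restored == column).all()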

Worker Function

    from multiprocessing import shared_memory

    def worker_function(args):
        """Worker function using only NumPy"""
        shm_name, shape, dtype, col_mappings, criteria, params = args
        temp, cooling_rate, iterations = params

        # Attach to the shared memory block created by the parent process
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        try:
            # Create numpy array view of shared memory (no copy is made)
            data_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

            # Initialize solution
            current_pop = initialize_population_numpy(data_array, col_mappings, criteria)
            current_cost = calculate_cost_numpy(current_pop, col_mappings, criteria)

            # Annealing process
            for _ in range(iterations):
                candidate_pop = current_pop.copy()
                candidate_pop = perturb_population_numpy(candidate_pop, data_array, col_mappings, criteria)
                candidate_cost = calculate_cost_numpy(candidate_pop, col_mappings, criteria)

                # Metropolis rule: always accept improvements; accept worse
                # candidates with probability exp(-delta / temp)
                if (candidate_cost < current_cost or 
                    np.random.random() < np.exp(-(candidate_cost - current_cost)/temp)):
                    current_pop = candidate_pop
                    current_cost = candidate_cost

                temp *= cooling_rate

            return current_pop
        finally:
            existing_shm.close()

The worker function performs simulated annealing in parallel:

  1. Accesses shared data through memory mapping
  2. Creates initial synthetic population
  3. Iteratively improves the population through perturbations
  4. Returns the optimized synthetic population

Note: Each worker maintains its own temperature schedule and accepts solutions probabilistically.
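
The acceptance test is the standard Metropolis criterion: a candidate that raises the cost by delta is still accepted with probability exp(-delta / temp), so hot early iterations explore broadly while cooled later ones mostly refine. A small standalone illustration of that decay (delta is a hypothetical cost increase, not a value from the pipeline):

    import numpy as np

    delta = 5.0
    for temp in (1000.0, 10.0, 0.1):
        print(temp, np.exp(-delta / temp))
    # 1000.0 -> ~0.995   (worse moves almost always accepted)
    # 10.0   -> ~0.607
    # 0.1    -> ~2e-22   (worse moves effectively never accepted)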

Population Management Functions

Initialization

    def initialize_population_numpy(data, col_mappings, criteria):
        """Create initial population using NumPy"""
        if criteria['method'] == 'stratified':
            # Get group column indices
            group_cols = [i for i, col in enumerate(col_mappings) 
                         if col['name'] in criteria['groupby_cols']]
            
            # Find unique group combinations
            group_values = data[:, group_cols]
            unique_groups = np.unique(group_values, axis=0)
            
            # Sample from each group
            samples = []
            samples_per_group = max(1, criteria['sample_size'] // len(unique_groups))
            
            for group in unique_groups:
                mask = np.all(group_values == group, axis=1)
                group_data = data[mask]
                if len(group_data) > 0:
                    selected = group_data[np.random.choice(
                        len(group_data), 
                        size=min(len(group_data), samples_per_group),
                        replace=True
                    )]
                    samples.append(selected)
            
            return np.vstack(samples)
        else:  # Random sampling
            indices = np.random.choice(len(data), size=criteria['sample_size'], replace=True)
            return data[indices]

Perturbation

    def perturb_population_numpy(population, data, col_mappings, criteria):
        """Modify population through random perturbations"""
        n_changes = max(1, int(len(population) * criteria['perturb_rate']))
        
        for _ in range(n_changes):
            idx = np.random.randint(len(population))
            
            if criteria['method'] == 'stratified':
                # Find similar records
                group_cols = [i for i, col in enumerate(col_mappings) 
                             if col['name'] in criteria['groupby_cols']]
                mask = np.all(
                    data[:, group_cols] == population[idx, group_cols],
                    axis=1
                )
                candidates = data[mask]
            else:
                candidates = data
                
            if len(candidates) > 0:
                population[idx] = candidates[np.random.choice(len(candidates))]
                
        return population

Cost Calculation

    def calculate_cost_numpy(population, col_mappings, criteria):
        """Calculate cost using NumPy"""
        if criteria['method'] == 'stratified':
            group_cols = [i for i, col in enumerate(col_mappings) 
                         if col['name'] in criteria['groupby_cols']]

            # Count group occurrences
            unique_groups, counts = np.unique(
                population[:, group_cols], 
                axis=0, 
                return_counts=True
            )

            # Calculate proportions
            actual_proportions = counts / counts.sum()

            # Decode integer codes back to the original labels so each group can
            # be looked up in the string-keyed target_proportions dictionary
            inv_maps = [
                {v: k for k, v in col_mappings[i]['mapping'].items()}
                if col_mappings[i]['mapping'] is not None else None
                for i in group_cols
            ]
            target = np.array([
                criteria['target_proportions'].get(
                    tuple(inv[val] if inv is not None else val
                          for inv, val in zip(inv_maps, group)),
                    0.0  # groups with no explicit target are penalized toward zero
                )
                for group in unique_groups
            ])
            return np.mean((actual_proportions - target) ** 2)
        else:
            # Maximize diversity: count unique values per column (negated, so lower cost is better)
            return -sum(len(np.unique(population[:, i])) for i in range(population.shape[1]))
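
For the stratified case the cost is the mean squared error between realized and target group proportions. A worked check on two hypothetical groups: realized shares [0.6, 0.4] against targets [0.5, 0.5] give a squared error of 0.01 in each group, so the cost is 0.01:

    import numpy as np

    actual = np.array([0.6, 0.4])
    target = np.array([0.5, 0.5])
    print(np.mean((actual - target) ** 2))   # 0.01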

Main Generation Function

    import os
    from multiprocessing import Pool, shared_memory

    def generate_populations(original_data, criteria_list, n_processes=None):
        """Generate synthetic populations in parallel"""
        if n_processes is None:
            n_processes = min(len(criteria_list), os.cpu_count())

        # Convert dictionary to arrays and encode categoricals
        col_mappings = []
        encoded_columns = []
        for col_name, col_data in original_data.items():
            col_data = np.array(col_data)
            # NumPy string arrays have dtype kind 'U' (or 'S'), not just object
            if col_data.dtype.kind in ('O', 'U', 'S'):  # string data
                mapping = create_category_mapping(col_data)
                encoded = encode_column(col_data, mapping)
                col_mappings.append({'name': col_name, 'mapping': mapping})
            else:
                encoded = col_data
                col_mappings.append({'name': col_name, 'mapping': None})
            encoded_columns.append(encoded)

        # Create combined numpy array (all-integer columns share one dtype)
        data_array = np.column_stack(encoded_columns)

        # Create shared memory block
        shm = shared_memory.SharedMemory(create=True, size=data_array.nbytes)
        try:
            shared_array = np.ndarray(data_array.shape, dtype=data_array.dtype, buffer=shm.buf)
            np.copyto(shared_array, data_array)

            # Prepare worker arguments
            common_params = (1000.0, 0.95, 1000)  # temp, cooling_rate, iterations
            args_list = [(
                shm.name,
                data_array.shape,
                data_array.dtype,
                col_mappings,
                criteria,
                common_params
            ) for criteria in criteria_list]

            # Execute in parallel
            with Pool(n_processes) as pool:
                results = pool.map(worker_function, args_list)

            # Decode results back to original format
            decoded_results = []
            for result in results:
                decoded = {}
                for i, col in enumerate(col_mappings):
                    if col['mapping'] is not None:
                        decoded[col['name']] = decode_column(result[:, i], col['mapping'])
                    else:
                        decoded[col['name']] = result[:, i]
                decoded_results.append(decoded)

            return decoded_results
        finally:
            shm.close()
            shm.unlink()

The main function orchestrates the entire process:

  1. Converts input data to NumPy format
  2. Handles categorical encoding
  3. Sets up shared memory
  4. Launches parallel workers
  5. Decodes and returns results
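
One caveat: with a fork-based Pool every worker inherits the parent's global NumPy random state, so two workers handed identical criteria would follow identical annealing trajectories. A hedged sketch of one remedy, reseeding each worker with its own index (the reseed_worker helper is hypothetical, not part of the code above; in practice the index would travel in each worker's argument tuple):

    import numpy as np

    def reseed_worker(seed):
        """Hypothetical helper: give each worker a distinct RNG stream."""
        np.random.seed(seed)

    # Each index yields a different stream of draws
    for i in range(3):
        reseed_worker(i)
        print(i, np.random.random())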

Usage Example

    # Sample data generation (as dictionary)
    rng = np.random.default_rng(42)
    size = 1000
    
    original_data = {
        'age': rng.integers(18, 70, size),
        'sex': rng.choice(['M', 'F'], size),
        'income': rng.normal(50000, 15000, size).astype(int),
        'employment': rng.choice(
            ['employed', 'unemployed', 'student', 'retired'], 
            size,
            p=[0.6, 0.1, 0.2, 0.1]
        )
    }
    
    # Define criteria
    criteria_list = [
        {
            'method': 'stratified',
            'groupby_cols': ['sex', 'employment'],
            'target_proportions': {
                ('M', 'employed'): 0.3,
                ('M', 'unemployed'): 0.1,
                ('M', 'student'): 0.1,
                ('M', 'retired'): 0.05,
                ('F', 'employed'): 0.3,
                ('F', 'unemployed'): 0.05,
                ('F', 'student'): 0.05,
                ('F', 'retired'): 0.05
            },
            'sample_size': 200,
            'perturb_rate': 0.05
        },
        {
            'method': 'random',
            'sample_size': 300,
            'perturb_rate': 0.1
        }
    ]
    
    # Generate populations
    populations = generate_populations(original_data, criteria_list)
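
Each entry of populations is a plain dictionary of NumPy arrays, so checking how closely the stratified result tracks its targets is straightforward (a minimal inspection sketch, continuing the example above):

    pop = populations[0]
    pairs = list(zip(pop['sex'], pop['employment']))
    groups, counts = np.unique(pairs, axis=0, return_counts=True)
    for group, share in zip(groups, counts / counts.sum()):
        print(tuple(group), round(float(share), 3))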

Performance Notes