A more full explanation of the code can be found here
A colab implementation of the code can be found here.
import numpy as np
from multiprocessing import Pool, shared_memory
import os
# Helper functions for categorical encoding
def create_category_mapping(data_column):
"""Map string categories to integers"""
categories = np.unique(data_column)
return {cat: i for i, cat in enumerate(categories)}
def encode_column(data_column, mapping):
"""Convert string column to numeric codes"""
return np.array([mapping[val] for val in data_column], dtype=np.int8)
def decode_column(encoded_column, mapping):
"""Convert numeric codes back to strings"""
inv_mapping = {v: k for k, v in mapping.items()}
return np.array([inv_mapping[val] for val in encoded_column])
# Main worker function (NumPy only)
def worker_function(args):
"""Worker function using only NumPy"""
shm_name, shape, dtype, col_mappings, criteria, params = args
temp, cooling_rate, iterations = params
# Access shared memory
existing_shm = shared_memory.SharedMemory(name=shm_name)
try:
# Create numpy array view of shared memory
data_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# Initialize solution
current_pop = initialize_population_numpy(data_array, col_mappings, criteria)
current_cost = calculate_cost_numpy(current_pop, col_mappings, criteria)
# Annealing process
for _ in range(iterations):
candidate_pop = current_pop.copy()
candidate_pop = perturb_population_numpy(candidate_pop, data_array, col_mappings, criteria)
candidate_cost = calculate_cost_numpy(candidate_pop, col_mappings, criteria)
if (candidate_cost < current_cost or
np.random.random() < np.exp(-(candidate_cost - current_cost)/temp)):
current_pop = candidate_pop
current_cost = candidate_cost
temp *= cooling_rate
return current_pop
finally:
existing_shm.close()
def initialize_population_numpy(data, col_mappings, criteria):
"""Create initial population using NumPy"""
if criteria['method'] == 'stratified':
# Get group column indices
group_cols = [i for i, col in enumerate(col_mappings)
if col['name'] in criteria['groupby_cols']]
# Find unique group combinations
group_values = data[:, group_cols]
unique_groups = np.unique(group_values, axis=0)
# Sample from each group
samples = []
samples_per_group = max(1, criteria['sample_size'] // len(unique_groups))
for group in unique_groups:
mask = np.all(group_values == group, axis=1)
group_data = data[mask]
if len(group_data) > 0:
selected = group_data[np.random.choice(
len(group_data),
size=min(len(group_data), samples_per_group),
replace=True
)]
samples.append(selected)
return np.vstack(samples)
else: # Random sampling
indices = np.random.choice(len(data), size=criteria['sample_size'], replace=True)
return data[indices]
def perturb_population_numpy(population, data, col_mappings, criteria):
"""Modify population through random perturbations"""
n_changes = max(1, int(len(population) * criteria['perturb_rate']))
for _ in range(n_changes):
idx = np.random.randint(len(population))
if criteria['method'] == 'stratified':
# Find similar records
group_cols = [i for i, col in enumerate(col_mappings)
if col['name'] in criteria['groupby_cols']]
mask = np.all(
data[:, group_cols] == population[idx, group_cols],
axis=1
)
candidates = data[mask]
else:
candidates = data
if len(candidates) > 0:
population[idx] = candidates[np.random.choice(len(candidates))]
return population
def calculate_cost_numpy(population, col_mappings, criteria):
"""Calculate cost using NumPy"""
if criteria['method'] == 'stratified':
group_cols = [i for i, col in enumerate(col_mappings)
if col['name'] in criteria['groupby_cols']]
# Count group occurrences
unique_groups, counts = np.unique(
population[:, group_cols],
axis=0,
return_counts=True
)
# Calculate proportions
actual_proportions = counts / counts.sum()
# Compare to target (assuming target_proportions is pre-aligned)
target = np.array([criteria['target_proportions'][tuple(group)]
for group in unique_groups])
return np.mean((actual_proportions - target) ** 2)
else:
# Maximize diversity (count unique values)
return -sum(len(np.unique(population[:, i])) for i in range(population.shape[1]))
def generate_populations(original_data, criteria_list, n_processes=None):
"""Generate synthetic populations in parallel"""
if n_processes is None:
n_processes = min(len(criteria_list), os.cpu_count())
# Convert dictionary to arrays and encode categoricals
col_mappings = []
encoded_columns = []
for col_name, col_data in original_data.items():
col_data = np.array(col_data)
if col_data.dtype == object: # String data
mapping = create_category_mapping(col_data)
encoded = encode_column(col_data, mapping)
col_mappings.append({'name': col_name, 'mapping': mapping})
else:
encoded = col_data
col_mappings.append({'name': col_name, 'mapping': None})
encoded_columns.append(encoded)
# Create combined numpy array
data_array = np.column_stack(encoded_columns)
# Create shared memory block
shm = shared_memory.SharedMemory(create=True, size=data_array.nbytes)
try:
shared_array = np.ndarray(data_array.shape, dtype=data_array.dtype, buffer=shm.buf)
np.copyto(shared_array, data_array)
# Prepare worker arguments
common_params = (1000.0, 0.95, 1000) # temp, cooling_rate, iterations
args_list = [(
shm.name,
data_array.shape,
data_array.dtype,
col_mappings,
criteria,
common_params
) for criteria in criteria_list]
# Execute in parallel
with Pool(n_processes) as pool:
results = pool.map(worker_function, args_list)
# Decode results back to original format
decoded_results = []
for result in results:
decoded = {}
for i, col in enumerate(col_mappings):
if col['mapping'] is not None:
decoded[col['name']] = decode_column(result[:, i], col['mapping'])
else:
decoded[col['name']] = result[:, i]
decoded_results.append(decoded)
return decoded_results
finally:
shm.close()
shm.unlink()
# Example usage with dictionary input
if __name__ == '__main__':
# Sample data generation (as dictionary)
rng = np.random.default_rng(42)
size = 1000
original_data = {
'age': rng.integers(18, 70, size),
'sex': rng.choice(['M', 'F'], size),
'income': rng.normal(50000, 15000, size).astype(int),
'employment': rng.choice(
['employed', 'unemployed', 'student', 'retired'],
size,
p=[0.6, 0.1, 0.2, 0.1]
)
}
# Define criteria
criteria_list = [
{
'method': 'stratified',
'groupby_cols': ['sex', 'employment'],
'target_proportions': {
('M', 'employed'): 0.3,
('M', 'unemployed'): 0.1,
('M', 'student'): 0.1,
('M', 'retired'): 0.05,
('F', 'employed'): 0.3,
('F', 'unemployed'): 0.05,
('F', 'student'): 0.05,
('F', 'retired'): 0.05
},
'sample_size': 200,
'perturb_rate': 0.05
},
{
'method': 'random',
'sample_size': 300,
'perturb_rate': 0.1
}
]
# Generate populations
populations = generate_populations(original_data, criteria_list)
# Print results
for i, pop in enumerate(populations):
print(f"\nPopulation {i+1}:")
for key, values in pop.items():
print(f"{key}: {values[:5]}...")