Categorical Encoding Functions
def create_category_mapping(data_column):
    """Build a lookup table assigning each distinct category a stable integer.

    Categories are taken in np.unique (sorted) order, so the mapping is
    deterministic for any given set of values.
    """
    mapping = {}
    for index, category in enumerate(np.unique(data_column)):
        mapping[category] = index
    return mapping
def encode_column(data_column, mapping):
    """Convert a string column to numeric codes using *mapping*.

    The output dtype is the smallest integer type that can hold the largest
    code, instead of a hard-coded int8 — int8 silently overflows once there
    are more than 128 categories (code 200 would wrap to -56).

    Raises KeyError if a value in data_column is absent from the mapping.
    """
    # Smallest dtype that fits the largest code; uint8 for an empty mapping.
    max_code = max(mapping.values(), default=0)
    dtype = np.min_scalar_type(max_code)
    return np.array([mapping[val] for val in data_column], dtype=dtype)
def decode_column(encoded_column, mapping):
    """Translate numeric codes back into their original string categories.

    *mapping* is the forward {category: code} dict; it is inverted here so
    each code looks up its source category.
    """
    code_to_category = {code: category for category, code in mapping.items()}
    decoded = [code_to_category[code] for code in encoded_column]
    return np.array(decoded)
These functions handle conversion between string categories and numeric representations:

| Function | Purpose | Returns |
| --- | --- | --- |
| `create_category_mapping` | Creates lookup dictionary for categories | `{category: integer}` mapping |
| `encode_column` | Converts string data to numeric codes | NumPy array of integers |
| `decode_column` | Restores original string categories | NumPy array of strings |
Worker Function
def worker_function(args):
    """Run one simulated-annealing search over the shared-memory data.

    args is a 6-tuple:
        shm_name      -- name of the SharedMemory block holding the data
        shape, dtype  -- layout of the numpy array stored in that block
        col_mappings  -- per-column metadata (name + optional category mapping)
        criteria      -- sampling criteria dict for this worker
        params        -- (initial temperature, cooling rate, iteration count)

    Returns the final population (numpy array) accepted by this worker.
    """
    shm_name, shape, dtype, col_mappings, criteria, params = args
    temp, cooling_rate, iterations = params
    # BUG FIX: forked workers inherit identical global NumPy RNG state, so
    # without re-seeding every worker would explore the exact same sequence
    # of perturbations. Seed from the PID to decorrelate them.
    np.random.seed((os.getpid() * 2654435761) % (2 ** 32))
    # Attach to the shared memory block created by the parent process.
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    try:
        # Zero-copy numpy view over the shared buffer.
        data_array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        # Initial solution and its cost.
        current_pop = initialize_population_numpy(data_array, col_mappings, criteria)
        current_cost = calculate_cost_numpy(current_pop, col_mappings, criteria)
        # Classic annealing loop: always accept improvements; accept
        # regressions with probability exp(-delta / temp), cooling each step.
        for _ in range(iterations):
            candidate_pop = current_pop.copy()
            candidate_pop = perturb_population_numpy(candidate_pop, data_array, col_mappings, criteria)
            candidate_cost = calculate_cost_numpy(candidate_pop, col_mappings, criteria)
            if (candidate_cost < current_cost or
                    np.random.random() < np.exp(-(candidate_cost - current_cost) / temp)):
                current_pop = candidate_pop
                current_cost = candidate_cost
            temp *= cooling_rate
        return current_pop
    finally:
        existing_shm.close()
The worker function performs simulated annealing in parallel:
- Accesses the shared dataset through a named shared-memory block (`multiprocessing.shared_memory`)
- Creates initial synthetic population
- Iteratively improves the population through perturbations
- Returns the optimized synthetic population
Note: Each worker maintains its own temperature schedule and accepts solutions
probabilistically.
Population Management Functions
Initialization
def initialize_population_numpy(data, col_mappings, criteria):
    """Draw an initial synthetic population from *data*.

    'stratified' method: sample (with replacement) roughly equal counts from
    every unique combination of the group-by columns. Any other method:
    uniform sampling with replacement of criteria['sample_size'] rows.
    """
    if criteria['method'] != 'stratified':
        # Plain uniform draw with replacement.
        picks = np.random.choice(len(data), size=criteria['sample_size'], replace=True)
        return data[picks]

    # Indices of the columns we stratify on.
    strat_idx = [i for i, col in enumerate(col_mappings)
                 if col['name'] in criteria['groupby_cols']]
    strat_values = data[:, strat_idx]
    groups = np.unique(strat_values, axis=0)

    # Split the requested sample size evenly across groups (at least 1 each).
    per_group = max(1, criteria['sample_size'] // len(groups))
    chunks = []
    for grp in groups:
        members = data[np.all(strat_values == grp, axis=1)]
        if len(members) == 0:
            continue
        chosen = np.random.choice(len(members),
                                  size=min(len(members), per_group),
                                  replace=True)
        chunks.append(members[chosen])
    return np.vstack(chunks)
Perturbation
def perturb_population_numpy(population, data, col_mappings, criteria):
    """Randomly replace a fraction of population rows with rows from *data*.

    The number of replacements is perturb_rate * len(population), at least 1.
    Under the 'stratified' method a row may only be replaced by a source row
    sharing its group-by column values. Mutates and returns *population*.
    """
    n_swaps = max(1, int(len(population) * criteria['perturb_rate']))
    stratified = criteria['method'] == 'stratified'
    for _ in range(n_swaps):
        row = np.random.randint(len(population))
        if stratified:
            # Restrict the donor pool to rows in the same group.
            group_idx = [i for i, col in enumerate(col_mappings)
                         if col['name'] in criteria['groupby_cols']]
            same_group = np.all(data[:, group_idx] == population[row, group_idx], axis=1)
            donors = data[same_group]
        else:
            donors = data
        if len(donors) > 0:
            population[row] = donors[np.random.choice(len(donors))]
    return population
Cost Calculation
def calculate_cost_numpy(population, col_mappings, criteria):
    """Score a candidate population (lower is better).

    'stratified': mean squared error between the population's group
    proportions and criteria['target_proportions'] (a dict keyed by group
    tuples). Groups absent from the target dict are treated as target 0.0
    instead of raising KeyError, so a stray group merely adds penalty;
    results are unchanged when the targets are pre-aligned as assumed.

    Any other method: negative count of distinct values summed over all
    columns, so maximizing diversity minimizes the cost.
    """
    if criteria['method'] == 'stratified':
        group_cols = [i for i, col in enumerate(col_mappings)
                      if col['name'] in criteria['groupby_cols']]
        # Frequency of each observed group combination.
        unique_groups, counts = np.unique(
            population[:, group_cols],
            axis=0,
            return_counts=True
        )
        actual_proportions = counts / counts.sum()
        targets = criteria['target_proportions']
        # ROBUSTNESS: .get(..., 0.0) keeps the cost finite for groups missing
        # from the target table (the original raised KeyError there).
        target = np.array([targets.get(tuple(group), 0.0)
                           for group in unique_groups])
        return np.mean((actual_proportions - target) ** 2)
    # Diversity objective: more unique values per column -> lower cost.
    return -sum(len(np.unique(population[:, i])) for i in range(population.shape[1]))
Main Generation Function
def generate_populations(original_data, criteria_list, n_processes=None):
    """Generate synthetic populations in parallel.

    original_data: dict mapping column name -> sequence of values.
    criteria_list: one criteria dict per population to generate.
    n_processes:   worker count; defaults to min(len(criteria_list), cpu count).

    Returns a list of dicts (column name -> numpy array), one per criteria.
    """
    if n_processes is None:
        n_processes = min(len(criteria_list), os.cpu_count())

    # Encode categorical (string) columns as integer codes so everything can
    # live in one homogeneous numeric array inside shared memory.
    col_mappings = []
    encoded_columns = []
    for col_name, col_data in original_data.items():
        col_data = np.array(col_data)
        # BUG FIX: np.array(['a', 'b']) has dtype kind 'U' (unicode), not
        # object, so the old `dtype == object` test let string columns slip
        # through unencoded and column_stack then upcast the whole combined
        # array to strings, breaking the numeric worker math.
        if col_data.dtype.kind in ('O', 'U', 'S'):
            mapping = create_category_mapping(col_data)
            encoded = encode_column(col_data, mapping)
            col_mappings.append({'name': col_name, 'mapping': mapping})
        else:
            encoded = col_data
            col_mappings.append({'name': col_name, 'mapping': None})
        encoded_columns.append(encoded)

    # One (n_rows, n_cols) array backing every worker.
    data_array = np.column_stack(encoded_columns)

    # Publish the data in a shared memory block so workers read it zero-copy.
    shm = shared_memory.SharedMemory(create=True, size=data_array.nbytes)
    try:
        shared_array = np.ndarray(data_array.shape, dtype=data_array.dtype, buffer=shm.buf)
        np.copyto(shared_array, data_array)

        # temp, cooling_rate, iterations shared by every worker.
        common_params = (1000.0, 0.95, 1000)
        args_list = [(
            shm.name,
            data_array.shape,
            data_array.dtype,
            col_mappings,
            criteria,
            common_params
        ) for criteria in criteria_list]

        # One worker per criteria dict.
        with Pool(n_processes) as pool:
            results = pool.map(worker_function, args_list)

        # Map integer codes back to the original string categories.
        decoded_results = []
        for result in results:
            decoded = {}
            for i, col in enumerate(col_mappings):
                if col['mapping'] is not None:
                    decoded[col['name']] = decode_column(result[:, i], col['mapping'])
                else:
                    decoded[col['name']] = result[:, i]
            decoded_results.append(decoded)
        return decoded_results
    finally:
        shm.close()
        shm.unlink()
The main function orchestrates the entire process:
- Converts input data to NumPy format
- Handles categorical encoding
- Sets up shared memory
- Launches parallel workers
- Decodes and returns results