RUFAS.biophysical.manure.manure_manager module#

class RUFAS.biophysical.manure.manure_manager.ManureManager#

Bases: object

Manages the manure processing system by handling processor definitions, connections, adjacency matrix, and processing order.

Attributes#

_omOutputManager

An instance of OutputManager for logging errors and information.

all_processorsdict[str, Processor]

A dictionary mapping processor names to their instances.

_all_separatorsdict[str, Separator]

A dictionary mapping separator names to their instances.

_adjacency_matrixdict[str, dict[str, float]]

A matrix defining the connections between processors, weighted by connection properties.

_processing_orderlist[Processor]

A list defining the execution order of processors.

__init__() None#
run_daily_update(manure_streams: dict[str, ManureStream], time: RufasTime, current_day_conditions: CurrentDayConditions) None#

Executes the daily update for all processors in the defined processing order.

Parameters#

manure_streamsdict[str, ManureStream]

A dictionary of all the daily manure streams from the animal module.

timeRufasTime

The current time in the simulation.

current_day_conditionsCurrentDayConditions

The current day conditions.

Raises#

ValueError

If a first-processor name is not found in the list of all processors.

_build_nutrient_pools() None#

Build the pool for aggregated storage type.

_normalize_destination_name(destination_name: str) str#

Normalizes the destination name by removing suffixes for solid and liquid outputs.

Parameters#

destination_namestr

The non-normalized name of the destination processor.

Returns#

str

The normalized name of the destination processor.

_generate_origin_key(processor_name: str, output_key: str) str#

Generates the origin key for the adjacency matrix based on the processor name and output key.

Parameters#

processor_namestr

The name of the processor.

output_keystr

The output key, which can be “manure”, “solid”, or “liquid”.

Returns#

str

The generated origin key for the adjacency matrix.

Raises#

ValueError

If the output key is not recognized or if it does not match the expected format.

_validate_adjacency_matrix() None#

Validates the structure and content of the generated adjacency matrix.

This method enforces two key invariants for the manure processor graph:

  1. Self-loops are not allowed — a processor cannot send output to itself. This is validated by ensuring that the diagonal entry (i.e., origin -> origin) in each adjacency matrix column is zero.

  2. Outgoing proportions must be normalized — for each origin processor, the total sum of outgoing connection proportions (i.e., the values across that column) must be either 0 (no connections) or 1 (fully distributed output).

These checks ensure the integrity of the processor network: no unintended feedback loops exist, and all flow proportions are either well-defined or explicitly zeroed.

Raises#

ValueError

If a self-loop is found or if an origin has outgoing proportions that do not sum to 0 or 1.

_traverse_adjacency_matrix() list[str]#

Determines a valid processing order of manure processors via topological sorting.

This method merges separator-related rows in the adjacency matrix, computes in-degrees for all processors, and performs a topological sort to ensure upstream processors are handled before downstream ones.

Returns#

list[str]

A list of processor names in the order they should be processed.

Raises#

ValueError

If a cycle exists in the processor graph, making topological sort impossible.

_merge_separator_rows() dict[str, dict[str, float]]#

Creates a version of the adjacency matrix with merged separator rows for graph traversal.

Each separator is originally represented by three separate rows: - {separator}_input - {separator}_solid_output - {separator}_liquid_output

This function merges them into a single row keyed by the base separator name (e.g., ‘separator1’). It also removes internal separator suffixes from destination references to simplify traversal.

Returns#

dict[str, dict[str, float]]

A modified adjacency matrix where separator rows are merged and internal suffixes removed.

Raises#

ValueError

If a destination receives output from both solid and liquid separator streams.

static _perform_topological_sort(in_degree: dict[str, int], heap: list[str], matrix_to_traverse: dict[str, dict[str, float]]) list[str]#

Performs topological sorting of the processors using Kahn’s algorithm.

This method processes nodes in order of zero in-degree, removing each from the graph and reducing the in-degree of its downstream neighbors. When a neighbor’s in-degree becomes zero, it is added to the processing queue.

Parameters#

in_degreedict[str, int]

Mapping of each processor to the number of upstream dependencies it has.

heaplist[str]

Initial list of processors with in-degree zero (i.e., ready to be processed).

matrix_to_traversedict[str, dict[str, float]]

The adjacency matrix representing directed connections between processors. Edges with weight 0.0 are ignored.

Returns#

list[str]

A list of processor names in a valid topological order.

_get_processor_configs_by_name(manure_management_config: dict[str, list[dict[str, Any]]]) dict[str, dict[str, Any]]#

Validates the uniqueness of processor names within the manure management configuration.

Parameters#

manure_management_configdict[str, list[dict[str, Any]]]

A dictionary containing lists of processor configurations grouped by categories such as ‘anaerobic_digester’, ‘separator’, ‘storage’, and ‘handler’.

Returns#

dict[str, dict[str, Any]]

A dictionary mapping each unique processor name to its corresponding processor configuration dictionary.

Notes#

The method internally combines all processor configurations from different categories, extracts all processor names, checks for duplicates, and creates a mapping of processor names to their respective configurations.

_check_for_duplicate_processor_names(all_processor_names: list[str]) None#

Checks for duplicate processor names in the provided list.

If duplicate processor names are found, this method logs an error message and raises a ValueError indicating the duplicate names.

Parameters#

all_processor_nameslist[str]

A list of processor names to be checked for duplicates.

Raises#

ValueError

If duplicate processor names are found, a ValueError is raised with the details of the duplicates.

_validate_and_parse_processor_connections(processor_connections_input: dict[str, list[dict[str, Any]]], processor_configs_by_name: dict[str, dict[str, Any]]) dict[str, dict[str, list[dict[str, Any]]]]#

Validates and parses the processor connections defined in the manure management configuration.

Parameters#

processor_connections_inputdict[str, list[dict[str, Any]]]

The processor connection configuration, containing regular processor and separator connections.

processor_configs_by_namedict[str, dict[str, Any]]

A dictionary mapping processor names to their respective configurations.

Returns#

dict[str, dict[str, list[dict[str, Any]]]]

A dictionary mapping processor names to their respective connection details.

_check_for_unknown_processor_names(processor_names_in_connection_map: set[str], processor_configs_by_name: dict[str, dict[str, Any]]) None#

Validates if all processor names referenced in connection config are defined in the processor configurations.

Parameters#

processor_names_in_connection_mapset[str]

Set of all processor names referenced in the connection configuration.

processor_configs_by_namedict[str, dict[str, Any]]

Dictionary mapping processor names to their respective configurations.

Raises#

ValueError

If any referenced processor name does not exist in the processor configurations.

_check_for_processors_without_connection_definition(processor_names_in_connection_map: set[str], processor_connections_by_name: dict[str, dict[str, list[dict[str, Any]]]]) None#

Checks for processors that are referenced but lack connection definitions.

Parameters#

processor_names_in_connection_mapset[str]

A set of names of all processors that are referenced and expected to have routing configurations.

processor_connections_by_namedict[str, dict[str, list[dict[str, Any]]]]

A mapping of processor names to their routing connections, defining the configuration details.

Raises#

ValueError

If any processors are found to be missing a routing configuration.

_find_all_processor_names_in_connection_map(processor_connections: list[dict[str, Any]]) set[str]#

Retrieves all referenced processor names from a list of processor connections.

Parameters#

processor_connectionslist[dict[str, Any]]

A list containing dictionaries that define connections between processors. Each dictionary is expected to have a “processor_name” key, and either “solid_output_destinations” and “liquid_output_destinations”, or “destinations”.

Returns#

set[str]

A set of all unique processor names (both as origin and as destination) referenced in the connections.

_build_processor_connection_map(processor_connections: list[dict[str, Any]]) dict[str, dict[str, list[dict[str, Any]]]]#

Adds a list of processor connections to a structured map.

Parameters#

processor_connectionslist[dict[str, Any]]

A list of dictionaries, where each dictionary represents a processor connection. Each dictionary should include information about the processor’s name and its destinations.

Returns#

dict[str, dict[str, list[dict[str, Any]]]]

A dictionary mapping processor names to their connection details. If the processor acts as a separator, it contains keys including “solid_output_destinations” and “liquid_output_destinations”. Otherwise, it contains a key “destinations”.

Raises#

ValueError

If duplicate connection definitions are found for a processor name.

Examples#

>>> connections = [
...     {
...         "processor_name": "Handler1",
...         "destinations": [{"name": "Separator1", "proportion": 1.0}],
...     },
...     {
...         "processor_name": "Storage1",
...         "destinations": [],
...     },
...     {
...         "processor_name": "Storage2",
...         "destinations": [],
...     },
...     {
...         "processor_name": "Separator1",
...         "solid_output_destinations": [{"name": "Storage1", "proportion": 1.0}],
...         "liquid_output_destinations": [{"name": "Storage2", "proportion": 1.0}],
...     },
... ]
>>> self._build_processor_connection_map(connections)
{
    "Handler1": {
        "destinations": [{"name": "Separator1", "proportion": 1.0}]
    },
    "Storage1": {
        "destinations": []
    },
    "Storage2": {
        "destinations": []
    },
    "Separator1": {
        "solid_output_destinations": [{"name": "Storage1", "proportion": 1.0}],
        "liquid_output_destinations": [{"name": "Storage2", "proportion": 1.0}],
    }
}
_create_all_processors(processor_connections_by_name: dict[str, dict[str, list[dict[str, Any]]]], processor_configs_by_name: dict[str, dict[str, Any]]) None#

Creates and initializes all processors based on their definitions.

Parameters#

processor_connections_by_namedict[str, dict[str, list[dict[str, Any]]]]

A dictionary that maps processor names to their associated connection configurations.

processor_configs_by_namedict[str, dict[str, Any]]

A dictionary that contains processor definitions, where each key is the processor name and the value is a dictionary with the processor’s parameters and type.

_populate_adjacency_matrix(processor_connections_by_name: dict[str, dict[str, list[dict[str, Any]]]]) None#

Builds the adjacency matrix using processor connection data. This method iterates over the provided connection mappings, identifying whether each processor is a separator or a standard processor. It then creates corresponding columns in the adjacency matrix and fills in output proportions based on the processor type: - For separators: handles both solid and liquid output destinations. - For other processors: handles general destinations.

Parameters#

processor_connections_by_namedict[str, dict[str, list[dict[str, Any]]]]

A dictionary where the keys are processor names, and the values contain information about their connections to other processors.

_create_column_in_adjacency_matrix(origin_name: str, row_names: list[str], is_separator: bool) None#

Add a column to the adjacency matrix for a given origin node.

This method modifies the adjacency matrix to include connections from the specified origin node to a list of destination nodes. For separators, it creates multiple columns representing distinct output types (input, solid output, liquid output). For non-separators, a single column is created.

Parameters#

origin_namestr

The name of the origin node for which the column(s) will be created.

row_nameslist[str]

The list of destination node names to initialize in the adjacency matrix.

is_separatorbool

A flag indicating whether the origin node is a separator

_populate_destination_proportions(connections: list[dict[str, Any]], origin_name: str) None#

Populate the destination proportions for the given origin in the adjacency matrix.

This method updates the adjacency matrix to store the proportion of connections from the specified origin to each destination. If the receiving processor name corresponds to an separator, its name is modified to include the ‘_input’ suffix before updating the matrix.

Parameters#

connectionslist[dict[str, Any]]

A list of connection dictionaries, where each dictionary contains information about the receiving processor name and the proportion of the connection.

origin_namestr

The name of the origin from which connections are originating.

_generate_adjacency_matrix_keys() list[str]#

Generates a list of keys to be used in constructing an adjacency matrix.

Returns#

list[str]

A list of keys representing the rows/columns of the adjacency matrix.

request_nutrients(request: NutrientRequest, simulate_animals: bool, time: RufasTime) NutrientRequestResults#

Handle the request for specific nutrients from the crop and soil module. This method evaluates the nutrient request made by considering both nitrogen and phosphorus quantities desired. It calculates the projected manure mass that would satisfy the request and checks against the nutrients available in the manager.

If the request can be fulfilled either partially or wholly, the corresponding amount of nutrients is subtracted from the manager’s internal bookkeeping. The method then returns the results of the nutrient request, which detail the amounts of nutrients that can be provided to fulfill the request. If the request cannot be fulfilled at all, the method will return None.

Notes#

This is a wrapper method that calls the request_nutrients method of the manure nutrient manager.

Parameters#

requestNutrientRequest

The specific nutrient request, including quantities of nitrogen and phosphorus.

simulate_animalsbool

Indicates whether animals are being simulated.

timeRufasTime

The current time in the simulation.

Returns#

NutrientRequestResults | None

The results of the nutrient request, detailed in a NutrientRequestResults object, which includes the amount of nitrogen, phosphorus, total manure mass, dry matter, and others that can be provided to fulfill the request. Returns None if the request cannot be fulfilled.

_remove_nutrients_from_storage(results: NutrientRequestResults, manure_type: ManureType) None#

Remove nutrients from the storage based on the results of a nutrient request by manure type.

Parameters#

resultsNutrientRequestResults

The results of a nutrient request. See NutrientsRequestResults for details.

static _compute_stream_after_removal(stored_manure: ManureStream, nutrient_removal_proportion: float, is_nitrogen_limiting_nutrient: bool, non_limiting_fields: list[str]) tuple[ManureStream, dict[str, Any]]#

Returns a new ManureStream with removals applied, plus a dict of how much was removed for each attribute.

Parameters#

stored_manureManureStream

The stored manure in storage.

nutrient_removal_proportionfloat

The proportion of nutrient to remove (unitless).

is_nitrogen_limiting_nutrientbool

Determine if nitrogen is limiting nutrient.

non_limiting_fieldslist[str]

A list containing the attribute names of nutrient fields to handle.

Returns#

tuple[Manure Stream, dict[str, Any]]

The stream after removal. The detail of the amount of nutrients removed.

static _determine_non_limiting_nutrient_removal_amount(limiting_nutrient_proportion_to_be_removed: float, non_limiting_nutrients_amount: float) float#

Calculates the amount of non-limiting nutrients to remove in each storage.

Parameters#

limiting_nutrient_proportion_to_be_removedfloat

The proportion of limiting nutrient to remove from each storage.

non_limiting_nutrients_amountfloat

The amount of non-limiting nutrient in the storage (kg).

Returns#

float

The amount of non-limiting nutrients to remove in each storage (kg).

static _determine_limiting_nutrient(requested_nitrogen_mass: float, nitrogen_fraction: float, requested_phosphorus_mass: float, phosphorus_fraction: float) bool#

Determines the limiting nutrient to remove.

Parameters#

requested_nitrogen_massfloat

The mass of nitrogen requested (kg).

nitrogen_fractionfloat

The fraction of nitrogen in the combined pool (unitless).

requested_phosphorus_massfloat

The mass of phosphorus requested (kg).

phosphorus_fractionfloat

The fraction of phosphorus in the combined pool (unitless).

Returns#

bool

If true, nitrogen is the limiting nutrients. If false, phosphorus is the limiting nutrients.

static _determine_nutrient_proportion_to_be_removed(limiting_nutrient_requested_mass: float, limited_nutrient_available: float) float#

Calculates the proportion of limiting nutrients to remove.

Parameters#

limiting_nutrient_requested_massfloat

The requested mass of limited nutrient (kg).

limited_nutrient_availablefloat

The amount of limited nutrient available in the total pool(kg).

Returns#

float

The proportion of limiting nutrient to remove from each storage.

_record_manure_request_results(manure_request_results: NutrientRequestResults | None, manure_source: str, time: RufasTime) None#

Record the results of a manure request in the Output Manager.

Parameters#

manure_request_resultsNutrientRequestResults | None

The results of a manure request. If None, it means that there was no available on-farm manure.

manure_sourcestr

The source of the manure.

timeRufasTime

The current time in the simulation.

static _calculate_supplemental_manure_needed(on_farm_manure: NutrientRequestResults | None, nutrient_request: NutrientRequest) NutrientRequest#

Calculate the amount of supplemental manure needed to fulfill the nutrient request.

Parameters#

on_farm_manureNutrientRequestResults | None

The results of the nutrient request for manure available from the farm. If None, it means that there was no available on-farm manure.

nutrient_requestNutrientRequest

The nutrient request.

Returns#

NutrientRequest

The request for supplemental manure needed to fulfill the original nutrient request.