
pyvolca — Python client

Python client for VoLCA — Life Cycle Assessment engine over Agribalyse and ecoinvent.

Full guide and tutorials: https://volca.run/docs/python/
Issues / source: https://github.com/ccomb/volca

pip install pyvolca

Requires Python ≥ 3.10 and a running VoLCA engine. Use Server (below) to run one as a child process, or point Client at any reachable instance.

First choose: connect to an existing server, or start one locally


pyvolca is only the Python client library. It does not contain the VoLCA databases and it does not install the VoLCA engine binary.

Most users should start with one of these two modes:

  • You already have access to a VoLCA server (for example a hosted server prepared by someone else): use Client only. You do not need volca.toml, and you do not need to install the VoLCA server locally.
  • You want Python to start a local VoLCA engine process for you: use download() once to fetch the VoLCA engine binary and reference data into the shared volca install dir (see Where artefacts are installed), then use Server to start it from Python. volca.toml is still a normal file path passed to Server(config=...); put it in your project directory, or pass an absolute path. Do not put it inside your virtualenv or inside site-packages.

For a hosted server, the minimal connection looks like this:

# no-test — replace with your real hosted VoLCA server URL and credentials.
from volca import Client
c = Client(
    base_url="https://your-volca-server.example.com",
    db="agribalyse-3.2",
    password="your-api-token-or-password",
)
print(c.list_databases())

Use download() + Server only when you deliberately want to download and launch the engine from Python:

# no-test — downloads the engine and needs a real engine config/database.
from volca import Client, Server, download
installed = download() # cached after the first run
with Server(config="./volca.toml", binary=str(installed.binary)) as srv:
    c = Client(base_url=srv.base_url, db="agribalyse-3.2", password=srv.password)
    print(c.list_databases())

In this local mode, download() stores the engine binary and reference data in the shared volca install dir (see below). Server(config="./volca.toml") still means “read ./volca.toml relative to the current working directory”.

download() writes to the same OS-native location as the install.sh / install.ps1 shell installers, so all three tools populate the same directory:

Platform   Default install root
Linux      ${XDG_DATA_HOME:-~/.local/share}/volca/
macOS      ~/Library/Application Support/volca/
Windows    %LOCALAPPDATA%\volca\

Override with VOLCA_HOME=/full/path (full path; skips OS detection).

If you ran install.sh or install.ps1 first, Server() finds the installed engine without an extra download() call. If you previously used pyvolca < 0.4, it cached artefacts under <user_cache_dir>/pyvolca/ (Linux: ~/.cache/pyvolca/); that directory is no longer read and can be removed (rm -rf ~/.cache/pyvolca).

# no-test — needs a real engine; the snippets below run against a mocked Client.
from volca import Client, Server
with Server(config="volca.toml") as srv:
    c = Client(base_url=srv.base_url, db="agribalyse-3.2", password=srv.password)
    plants = c.search_activities(name="wheat flour, at plant", limit=5)
    chain = c.get_supply_chain(plants[0].process_id, name="at farm")
    score = c.get_impacts(plants[0].process_id, method_id=c.list_methods()[0]["id"])

This example starts a local engine process from Python. Server reads port and password from the TOML config. The engine self-stops after idle_timeout seconds without traffic (default 5 min).
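
The keys mentioned above might be laid out like this; a hypothetical volca.toml sketch using only the keys named in this section (port, password, idle_timeout), since the actual schema is defined by the engine:

# hypothetical volca.toml; consult the engine documentation for the real schema
port = 8080
password = "local-dev-password"
idle_timeout = 300  # seconds without traffic before the engine self-stops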

Examples below assume c is a Client instance — construct it with the snippet above, or against an already-running server: c = Client(base_url="http://localhost:8080", db="agribalyse-3.2", password="…").

Which databases are loaded? Which LCIA methods can I score against? What classification systems can I filter on?

for db in c.list_databases():
    print(f" {db.name} [{db.status}]: {db.activity_count} activities")
for m in c.list_methods()[:5]:
    print(f" {m['id']} {m['name']} [{m['unit']}]")

Other listings: c.list_classifications() returns the classification systems and their values for the current database; c.list_presets() returns named filter presets configured in the engine. Use c.load_database(name) / c.unload_database(name) to manage memory if a database isn’t auto-loaded.
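
For example (a sketch; the database name passed to load_database / unload_database is illustrative):

print(c.list_classifications())  # classification systems and values for the current database
print(c.list_presets())          # named filter presets configured in the engine
c.load_database("ecoinvent-3.10")    # load on demand (illustrative name)
c.unload_database("ecoinvent-3.10")  # free the memory again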

Which activity in the database represents the product I want to assess?

plants = c.search_activities(name="wheat flour, at plant", limit=5)
for a in plants:
print(f"{a.process_id} {a.name} ({a.location})")

Each Activity carries process_id, name, location, product, product_amount, product_unit. Narrow the query with geo="FR", classification=/classification_value= (ISIC/CPC), or set exact=True for an exact-name match. To search by flow name (technosphere products and biosphere flows) instead of activity name, use c.search_flows(query=...).
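
A narrowed query and a flow search might look like this (a sketch built from the parameters listed above; the flow query string is illustrative):

fr_mills = c.search_activities(name="wheat flour, at plant", geo="FR", exact=True)
co2_flows = c.search_flows(query="carbon dioxide")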

What goes into making this product? What does it emit? What’s its reference unit?

detail = c.get_activity(plants[0].process_id)
for ex in detail.technosphere_inputs:
print(f"{ex.amount:.4g} {ex.unit} of {ex.flow_name}{ex.target_activity}")

get_activity returns a typed ActivityDetail. Use .inputs / .outputs / .technosphere_inputs to filter the exchanges; each entry is an Exchange — either a TechnosphereExchange (an input or output of an intermediate product) or a BiosphereExchange (resource extracted or pollutant emitted).
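
One way to split the two exchange kinds, assuming BiosphereExchange is importable from volca (the text above names the class but not its import path):

from volca import BiosphereExchange  # assumed import path for the exchange types
emissions = [ex for ex in detail.outputs if isinstance(ex, BiosphereExchange)]
for ex in emissions[:5]:
    print(f"{ex.amount:.4g} {ex.unit} of {ex.flow_name}")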

What’s the full upstream chain — every ingredient, recursively, down to the farm or mine?

chain = c.get_supply_chain(plants[0].process_id, name="at farm", limit=20)
print(f"{chain.filtered_activities} of {chain.total_activities} upstream activities match 'at farm'")
for entry in chain.entries[:5]:
print(f" {entry.quantity:.4g} {entry.unit} of {entry.name} ({entry.location})")

For “how exactly does this root reach a specific upstream supplier?”, use get_path_to(process_id, target=...) — returns a PathResult of ordered PathSteps root → target with cumulative quantities and step ratios.
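
For instance (a sketch; the target pid is hypothetical, and the PathResult attribute name should be checked against the API Reference):

path = c.get_path_to(plants[0].process_id, target="upstream-supplier-pid")
for step in path.steps:  # assumed attribute name; steps run root → target
    print(step)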

Where is this supplier used? Which products depend on it?

result = c.get_consumers(plants[0].process_id, max_depth=2, limit=10)
for cons in result.consumers:
print(f" depth={cons.depth} {cons.name} ({cons.location})")

Returns a ConsumersResponse with consumers, pagination, and (when include_edges=True) the technosphere edges so callers can reconstruct supplier→consumer paths without a second round trip. Pass classification_filters=[...] to restrict to a category.
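
With edges and a category restriction (a sketch; the classification value is hypothetical, see c.list_classifications() for real ones):

result = c.get_consumers(
    plants[0].process_id,
    max_depth=2,
    include_edges=True,
    classification_filters=["CPC:2311"],  # hypothetical value
)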

What are the cumulative biosphere flows (CO₂, water, methane, …) per functional unit, before applying any characterization method?

inv = c.get_inventory(plants[0].process_id, limit=20)
# inv is a raw dict — see the OpenAPI spec for the full shape.
# Substitutions are accepted: c.get_inventory(pid, substitutions=[...])

The inventory is what every LCIA method runs on top of. If you only need grouped views (by name, location, classification, etc.), reach for c.aggregate(scope="biosphere", group_by=...) instead — same data, summarized.

What’s the carbon footprint of this product? Which emissions dominate the score?

score = c.get_impacts(plants[0].process_id, method_id="EF3.1-climate-change", top_flows=5)
print(f"{score.score:.4g} {score.unit}")
for c_flow in score.top_contributors:
print(f" {c_flow.share_pct:.1f}% {c_flow.flow_name}")

LCIAResult carries the score, unit, optional normalized_score / weighted_score (in Pt), and the top contributing biosphere flows with their share_pct.
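
When the method defines normalization and weighting, the optional fields are populated; a quick check:

if score.normalized_score is not None:
    print(f"normalized: {score.normalized_score:.4g}")
if score.weighted_score is not None:
    print(f"weighted: {score.weighted_score:.4g} Pt")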

Compute every impact category in one go — climate, water, land use, …

batch = c.get_impacts_batch(plants[0].process_id)
for r in batch.results:
print(f" {r.category}: {r.score:.4g} {r.unit}")
if batch.single_score is not None:
print(f"PEF single score: {batch.single_score:.4g} {batch.single_score_unit}")

LCIABatchResult also surfaces formula-based scoring sets (PEF, ECS…) via scoring_results and scoring_indicators, so you can render a per-indicator chart alongside the aggregate single score.
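
A minimal sketch to inspect the scoring sets; the shape of each entry is engine-defined, so see the API Reference before relying on specific fields:

for s in batch.scoring_results:      # one entry per scoring set (PEF, ECS, …)
    print(s)
for ind in batch.scoring_indicators: # per-indicator values for charting
    print(ind)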

I have a climate-change score. Which biosphere flows account for it? Which upstream activities?

get_impacts(...).top_contributors already returns the top biosphere flows for a single LCIA call. For a deeper or differently-bounded view — and for the activity attribution view — use the standalone drill-down endpoints:

flows = c.get_contributing_flows(
    plants[0].process_id,
    method_id="EF3.1-climate-change",
    limit=10,
)
acts = c.get_contributing_activities(
    plants[0].process_id,
    method_id="EF3.1-climate-change",
    limit=10,
)
# Both return raw dicts — the shape is documented in the OpenAPI spec.

Which characterization factors does a method apply, and to which database flows?

char = c.get_characterization(method_id="EF3.1-climate-change", limit=20)

Useful for sanity-checking method coverage or building custom indicators on top of the engine’s mapping.

What are the top emitting substances? How do flows break down by category, location, or classification?

agg = c.aggregate(
    plants[0].process_id,
    scope="biosphere",
    group_by="name",
    aggregate="sum_quantity",
)
for g in agg.groups[:5]:
    print(f" {g.quantity:.4g} {g.unit or ''} of {g.key}")

scope selects what to aggregate over: "direct" (just this activity’s exchanges), "supply_chain" (cumulative upstream), or "biosphere" (all elementary flows). group_by accepts "name", "flow_id", "unit", "location", "target_name", or "classification.<system>".
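
For instance, a cumulative upstream breakdown by location uses the same call shape with the values listed above:

by_loc = c.aggregate(
    plants[0].process_id,
    scope="supply_chain",
    group_by="location",
    aggregate="sum_quantity",
)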

How does variant A differ from variant B? Which inputs change?

from volca import compare_activities
diff = compare_activities(c, plants[0].process_id, plants[1].process_id, scope="direct")
print(f" matched: {len(diff.matched)}, only-left: {len(diff.left_only)}, only-right: {len(diff.right_only)}")
for row in diff.matched[:3]:
print(f" {row.key}: {row.left:.4g}{row.right:.4g} (Δ={row.delta:+.4g})")

A client-side merge over two aggregate calls. Groups by flow_id (default) so matching is stable across naming variants. Pass scope="supply_chain" to compare cumulative inputs instead of direct exchanges.
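
The cumulative variant is a one-argument change:

deep = compare_activities(c, plants[0].process_id, plants[1].process_id, scope="supply_chain")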

What if I used organic wheat instead of conventional? Recycled aluminium instead of virgin? — without reloading the database.

The engine applies a Sherman–Morrison rank-1 update, so substitutions are fast regardless of database size. Works on get_supply_chain, get_inventory, and get_impacts.

subs = [{
    "from": "old-supplier-pid",  # the activity to replace
    "to": "new-supplier-pid",  # the replacement
    "consumer": "consumer-pid",  # the activity that directly uses the old supplier
}]
score = c.get_impacts(plants[0].process_id, method_id="EF3.1-climate-change", substitutions=subs)

Multiple substitutions chain in one call — the consumer field disambiguates where in the chain each swap applies.
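
Chaining two swaps in one call might look like this (all pids hypothetical):

subs = [
    {"from": "conventional-wheat-pid", "to": "organic-wheat-pid", "consumer": "mill-pid"},
    {"from": "virgin-alu-pid", "to": "recycled-alu-pid", "consumer": "packaging-pid"},
]
score = c.get_impacts(plants[0].process_id, method_id="EF3.1-climate-change", substitutions=subs)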

The activity doesn’t exist, the engine is down, or the request is malformed — what do I catch?

from volca import VoLCAError
try:
score = c.get_impacts("nonexistent-pid", method_id="EF3.1-climate-change")
except VoLCAError as e:
print(f" failed: {e.status_code}{e.body[:80]}")

VoLCAError.status_code is the HTTP status when the engine returned one; body is the raw response body.

I want to run the same workflow against ecoinvent instead of Agribalyse — without rebuilding the client.

ei = c.use("ecoinvent-3.10")
ei_results = ei.search_activities(name="electricity, high voltage")

Client.use(db_name) returns a new Client targeting a different database while sharing the HTTP session and dispatch table — no spec re-fetch.

Refresh IDE autocomplete after upgrading the engine


I just upgraded the VoLCA server. How do I get my editor to see the new endpoints?

c.refresh_stubs()

Pyvolca dispatches dynamically against the engine’s OpenAPI spec, so it ships without .pyi stubs. refresh_stubs() refetches the spec and writes stubs into the installed package directory; restart your language server to pick them up.

For exhaustive classes, methods, signatures, return types, and exceptions, see the Python API Reference.

Apache-2.0