Skip to main content

Activity execution - Python SDK

Start an Activity Execution

How to start an Activity Execution using the Temporal Python SDK.

Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed])in your Workflow Execution Event History.

A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Activity implementation code should be idempotent.

The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.

Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.

To spawn an Activity Execution, use the execute_activity() operation from within your Workflow Definition.

execute_activity() is a shortcut for start_activity() that waits on its result.

To get just the handle to wait and cancel separately, use start_activity(). In most cases, use execute_activity() unless advanced task capabilities are needed.

A single argument to the Activity is positional. Multiple arguments are not supported in the type-safe form of start_activity() or execute_activity() and must be supplied by the args keyword argument.

View the source code

in the context of the rest of the application code.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from your_activities_dacx import your_activity
from your_dataobject_dacx import YourParams
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
your_activity,
YourParams("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)

Set the required Activity Timeouts

How to set the required Activity Timeouts using the Temporal Python SDK.

Activity Execution semantics rely on several parameters. The only required value that needs to be set is either a Schedule-To-Close Timeout or a Start-To-Close Timeout. These values are set in the Activity Options.

Activity options are set as keyword arguments after the Activity arguments.

Available timeouts are:

  • schedule_to_close_timeout
  • schedule_to_start_timeout
  • start_to_close_timeout

View the source code

in the context of the rest of the application code.

# ...
activity_timeout_result = await workflow.execute_activity(
your_activity,
YourParams(greeting, "Activity Timeout option"),
# Activity Execution Timeout
start_to_close_timeout=timedelta(seconds=10),
# schedule_to_start_timeout=timedelta(seconds=10),
# schedule_to_close_timeout=timedelta(seconds=10),
)

Get the results of an Activity Execution

How to get the results of an Activity Execution using the Temporal Python SDK.

The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable. Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.

Use start_activity() to start an Activity and return its handle, ActivityHandle. Use execute_activity() to return the results.

You must provide either schedule_to_close_timeout or start_to_close_timeout.

execute_activity() is a shortcut for await start_activity(). An asynchronous execute_activity() helper is provided which takes the same arguments as start_activity() and awaits on the result. execute_activity() should be used in most cases unless advanced task capabilities are needed.

View the source code

in the context of the rest of the application code.

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from your_activities_dacx import your_activity
from your_dataobject_dacx import YourParams
# ...
@workflow.defn(name="YourWorkflow")
class YourWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
your_activity,
YourParams("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)