StepExecutionEngine Class API Reference

The StepExecutionEngine class manages the execution of individual workflow steps, handling tool selection and LLM interactions.

Class Definition

Handles the execution of workflow steps with integrated tool selection and LLM interaction.

Manages all aspects of step execution including tool selection, LLM interaction, and error handling. Provides configurable retry logic and streaming support.

Attributes:

_client: The AI client for model interactions
_default_model: Default model configuration for steps
_default_kwargs: Default keyword arguments for model calls
_default_tool_selection_config: Default tool selection settings
_default_tool_model: Default model for tool selection
_tool_selector: Instance handling tool selection logic

Example

Basic execution engine setup:

engine = StepExecutionEngine(
    client=client,
    default_model=ModelConfig(name="gpt-4"),
    default_kwargs={"temperature": 0.7}
)

result = engine.execute_step(step, agent, "input data")

Configure tool selection:

engine = StepExecutionEngine(
    client=client,
    default_model="gpt-4",
    default_kwargs={},
    tool_selection_config=ToolSelectionConfig(
        confidence_threshold=0.8,
        max_tools_per_step=3
    ),
    tool_model=ModelConfig(name="llama-2")
)

Notes
  • Manages automatic tool selection with confidence thresholds
  • Supports separate models for workflow and tool selection
  • Implements retry logic for LLM calls
  • Handles streaming configurations
  • Provides comprehensive error handling
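
For example, the engine can run workflow steps on one model while delegating tool selection to a smaller one, then stream the final response. A minimal sketch, assuming step and agent come from the surrounding agent framework:

engine = StepExecutionEngine(
    client=client,
    default_model=ModelConfig(name="gpt-4"),
    default_kwargs={"temperature": 0.7},
    tool_model=ModelConfig(name="llama-2"),  # used only for tool selection
)

result = engine.execute_step(step, agent, "input data", stream=True)
if isinstance(result, Iterator):  # streaming returns chunks
    for chunk in result:
        print(chunk, end="")
else:  # non-streaming returns the complete string
    print(result)
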
Source code in clientai/agent/core/execution.py
class StepExecutionEngine:
    """Handles the execution of workflow steps with
    integrated tool selection and LLM interaction.

    Manages all aspects of step execution including tool selection,
    LLM interaction, and error handling. Provides configurable retry
    logic and streaming support.

    Attributes:
        _client: The AI client for model interactions
        _default_model: Default model configuration for steps
        _default_kwargs: Default keyword arguments for model calls
        _default_tool_selection_config: Default tool selection settings
        _default_tool_model: Default model for tool selection
        _tool_selector: Instance handling tool selection logic

    Example:
        Basic execution engine setup:
        ```python
        engine = StepExecutionEngine(
            client=client,
            default_model=ModelConfig(name="gpt-4"),
            default_kwargs={"temperature": 0.7}
        )

        result = engine.execute_step(step, agent, "input data")
        ```

        Configure tool selection:
        ```python
        engine = StepExecutionEngine(
            client=client,
            default_model="gpt-4",
            default_kwargs={},
            tool_selection_config=ToolSelectionConfig(
                confidence_threshold=0.8,
                max_tools_per_step=3
            ),
            tool_model=ModelConfig(name="llama-2")
        )
        ```

    Notes:
        - Manages automatic tool selection with confidence thresholds
        - Supports separate models for workflow and tool selection
        - Implements retry logic for LLM calls
        - Handles streaming configurations
        - Provides comprehensive error handling
    """

    def __init__(
        self,
        client: ClientAI,
        default_model: Union[str, ModelConfig],
        default_kwargs: Dict[str, Any],
        tool_selection_config: Optional[ToolSelectionConfig] = None,
        tool_model: Optional[Union[str, ModelConfig]] = None,
    ) -> None:
        """
        Initialize the execution engine with specified configurations.

        Args:
            client: The AI client for model interactions
            default_model: Default model configuration
                           (string name or ModelConfig)
            default_kwargs: Default parameters for model calls
            tool_selection_config: Configuration for tool selection behavior
            tool_model: Model to use for tool selection
                        (defaults to default_model)

        Example:
            ```python
            engine = StepExecutionEngine(
                client=my_client,
                default_model="gpt-4",
                default_kwargs={"temperature": 0.7},
                tool_selection_config=ToolSelectionConfig(
                    confidence_threshold=0.8
                )
            )
            ```

        Raises:
            StepError: If initialization fails or configuration is invalid
        """
        try:
            if not client:  # pragma: no cover
                raise ValueError("Client must be specified")
            if not default_model:  # pragma: no cover
                raise ValueError("Default model must be specified")

            self._client = client
            self._default_model = default_model
            self._default_kwargs = default_kwargs
            self._current_agent: Optional[Any] = None
            self._default_tool_selection_config = (
                tool_selection_config or ToolSelectionConfig()
            )

            self._default_tool_model = self._create_tool_model_config(
                tool_model if tool_model is not None else default_model
            )

            logger.debug(
                f"Initialized StepExecutionEngine with model: {default_model}"
            )

        except ValueError as e:  # pragma: no cover
            logger.error(f"Initialization error: {e}")
            raise StepError(str(e)) from e
        except Exception as e:  # pragma: no cover
            logger.error(f"Unexpected initialization error: {e}")
            raise StepError(
                f"Unexpected initialization error: {str(e)}"
            ) from e

    def _create_tool_model_config(
        self, model: Union[str, ModelConfig]
    ) -> ModelConfig:
        """Create a ModelConfig instance for tool selection.

        Args:
            model: Model name or configuration

        Returns:
            Configured ModelConfig for tool selection

        Raises:
            StepError: If configuration is invalid
        """
        try:
            if isinstance(model, str):
                return ModelConfig(
                    name=model,
                    temperature=0.0,
                    json_output=True,
                )
            elif isinstance(model, ModelConfig):
                return model.merge(
                    temperature=0.0,
                    json_output=True,
                )
            else:  # pragma: no cover
                raise ValueError(
                    f"Invalid model type: {type(model)}. "
                    "Must be string or ModelConfig"
                )
        except Exception as e:  # pragma: no cover
            logger.error(f"Error creating tool model config: {e}")
            raise StepError(f"Invalid tool model configuration: {str(e)}")

    def _get_effective_tool_model(self, step: Step) -> ModelConfig:
        """Get the model to use for tool selection based on priority order.

        Args:
            step: Step being executed

        Returns:
            ModelConfig for tool selection

        Raises:
            StepError: If model configuration fails
        """
        try:
            if step.tool_model is not None:
                return self._create_tool_model_config(step.tool_model)

            return self._default_tool_model

        except Exception as e:  # pragma: no cover
            logger.error(f"Error determining tool model: {e}")
            raise StepError(f"Failed to determine tool model: {str(e)}")

    def _get_tool_selection_config(self, step: Step) -> ToolSelectionConfig:
        """
        Get the effective tool selection configuration for a step.

        Determines the tool selection configuration to use by checking for
        step-specific settings and falling back to defaults if needed.

        Args:
            step: The step being executed

        Returns:
            The tool selection configuration to use for this step

        Raises:
            StepError: If configuration access fails

        Example:
            ```python
            config = engine._get_tool_selection_config(step)
            print(config.confidence_threshold)  # Output: 0.8
            ```
        """
        try:
            step_config = getattr(step, "tool_selection_config", None)
            return step_config or self._default_tool_selection_config
        except AttributeError as e:  # pragma: no cover
            logger.error(f"Invalid step configuration: {e}")
            raise StepError(f"Invalid step configuration: {str(e)}") from e
        except Exception as e:  # pragma: no cover
            logger.error(f"Error accessing tool selection config: {e}")
            raise StepError(
                f"Error accessing tool selection config: {str(e)}"
            ) from e

    def _get_tool_model(self, step: Step) -> Union[str, ModelConfig]:
        """
        Get the effective model to use for tool selection in a step.

        Determines which model should be used for tool selection by checking
        step-specific settings and falling back to defaults if needed.

        Args:
            step: The step being executed

        Returns:
            The model configuration to use for tool selection

        Raises:
            StepError: If model configuration access fails

        Example:
            ```python
            model = engine._get_tool_model(step)
            print(model.name if isinstance(model, ModelConfig) else model)
            ```
        """
        try:
            step_model = getattr(step, "tool_model", None)
            return step_model or self._default_tool_model
        except AttributeError as e:  # pragma: no cover
            logger.error(f"Invalid step model configuration: {e}")
            raise StepError(
                f"Invalid step model configuration: {str(e)}"
            ) from e
        except Exception as e:  # pragma: no cover
            logger.error(f"Error accessing tool model: {e}")
            raise StepError(f"Error accessing tool model: {str(e)}") from e

    def _build_prompt(
        self, step: Step, agent: Any, *args: Any, **kwargs: Any
    ) -> str:
        """Build the prompt for a step, including tool selection if enabled.

        Args:
            step: Step being executed
            agent: Agent instance
            *args: Step arguments
            **kwargs: Step keyword arguments

        Returns:
            Complete prompt string

        Raises:
            StepError: If prompt building fails
            ToolError: If tool execution fails
        """
        logger.debug(f"Building prompt for step '{step.name}'")
        logger.debug(f"Step use_tools setting: {step.use_tools}")

        try:
            result = step.func(agent, *args, **kwargs)
            if not isinstance(result, str):  # pragma: no cover
                raise ValueError(
                    f"Step function must return str, got {type(result)}"
                )
            prompt = result

            if step.use_tools:
                try:
                    prompt = self._handle_tool_execution(step, agent, prompt)
                except Exception as e:  # pragma: no cover
                    raise ToolError(f"Tool execution failed: {str(e)}") from e

            logger.debug(f"Final prompt: {prompt}")
            return prompt

        except ValueError as e:  # pragma: no cover
            logger.error(f"Prompt building error: {e}")
            raise StepError(str(e)) from e
        except Exception as e:  # pragma: no cover
            logger.error(f"Failed to build prompt: {e}")
            raise StepError(f"Failed to build prompt: {str(e)}") from e

    def _handle_tool_execution(
        self, step: Step, agent: Any, base_prompt: str
    ) -> str:
        """Handle tool selection and execution for a step.

        Args:
            step: The step being executed
            agent: The agent instance
            base_prompt: Initial prompt before tool execution

        Returns:
            Updated prompt including tool execution results

        Raises:
            ToolError: If tool selection or execution fails

        Example:
            ```python
            prompt = engine._handle_tool_execution(
                step=analyze_step,
                agent=agent,
                base_prompt="Analyze the following data..."
            )
            # Returns prompt enhanced with tool execution results
            ```
        """
        logger.debug("Tool usage is enabled for this step")
        available_tools = agent.get_tools(step.step_type.name.lower())
        logger.debug(f"Found {len(available_tools)} available tools")

        if not available_tools:
            logger.debug("No tools available for this step")
            return base_prompt

        try:
            tool_model = self._get_effective_tool_model(step)
            logger.debug(f"Using tool model: {tool_model.name}")

            tool_selector = ToolSelector(
                model_config=tool_model,
                config=self._get_tool_selection_config(step),
            )

            decisions = tool_selector.select_tools(
                task=base_prompt,
                tools=available_tools,
                context=agent.context.state,
                client=self._client,
            )

            logger.debug(f"Tool selector returned {len(decisions)} decisions")

            if not decisions:
                return base_prompt

            tools_by_name = {t.name: t for t in available_tools}
            updated_decisions = tool_selector.execute_tool_decisions(
                decisions=decisions, tools=tools_by_name
            )

            object.__setattr__(step, "tool_decisions", updated_decisions)

            prompt = base_prompt + "\n\nTool Execution Results:\n"
            for decision in updated_decisions:
                prompt += self._format_tool_result(decision)

            agent.context.state["last_tool_decisions"] = [
                self._create_decision_dict(d) for d in updated_decisions
            ]

            return prompt

        except Exception as e:  # pragma: no cover
            logger.error(f"Tool execution error: {e}", exc_info=True)
            raise ToolError(f"Tool execution failed: {str(e)}") from e

    def _format_tool_result(self, decision: Any) -> str:
        """Format a single tool execution decision into a string.

        Args:
            decision: Tool execution decision with results

        Returns:
            Formatted string representing the tool execution result

        Example:
            ```python
            formatted = engine._format_tool_result(decision)
            # Output format:
            # CalculatorTool:
            # Result: 42
            # Confidence: 0.95
            # Reasoning: Used for precise calculation
            ```
        """
        if decision.error:
            result_line = f"Error: {decision.error}"
        else:
            result_line = f"Result: {str(decision.result)}"

        return (
            f"\n{decision.tool_name}:"
            f"\n{result_line}"
            f"\nConfidence: {decision.confidence}"
            f"\nReasoning: {decision.reasoning}\n"
        )

    def _create_decision_dict(self, decision: Any) -> Dict[str, Any]:
        """Create a dictionary representation of a tool execution decision.

        Args:
            decision: Tool execution decision to convert

        Returns:
            Dictionary containing all decision information

        Example:
            ```python
            decision_dict = engine._create_decision_dict(decision)
            # Output structure:
            # {
            #     "tool_name": "calculator",
            #     "arguments": {"x": 5, "y": 3},
            #     "result": 8,
            #     "error": None,
            #     "confidence": 0.95,
            #     "reasoning": "Required for calculation"
            # }
            ```
        """
        return {
            "tool_name": decision.tool_name,
            "arguments": decision.arguments,
            "result": decision.result,
            "error": decision.error,
            "confidence": decision.confidence,
            "reasoning": decision.reasoning,
        }

    def _execute_single_call(
        self,
        step: Step,
        prompt: str,
        api_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Iterator[str]]:
        """Execute a single LLM API call with proper configuration."""
        try:
            model_config = step.llm_config or self._default_model

            if isinstance(model_config, str):
                model_config = ModelConfig(name=model_config)

            param_attrs = ModelConfig.CORE_ATTRS - {"name"}

            step_params = {
                k: getattr(step, k) for k in param_attrs if hasattr(step, k)
            }

            effective_params = model_config.get_parameters()
            effective_params.update(self._default_kwargs)
            effective_params.update(step_params)

            if api_kwargs:
                additional_kwargs = {
                    k: v
                    for k, v in api_kwargs.items()
                    if k not in ModelConfig.CORE_ATTRS
                }
                effective_params.update(additional_kwargs)

            result = self._client.generate_text(
                prompt, model=model_config.name, **effective_params
            )

            return cast(Union[str, Iterator[str]], result)

        except ClientAIError:  # pragma: no cover
            raise
        except Exception as e:  # pragma: no cover
            logger.error(f"Unexpected error during LLM call: {e}")
            raise StepError(
                f"Unexpected error during LLM call: {str(e)}"
            ) from e

    def _prepare_api_kwargs(
        self, model_config: Union[str, ModelConfig]
    ) -> Dict[str, Any]:
        """Prepare keyword arguments for the LLM API call.

        Args:
            model_config: The model configuration to use

        Returns:
            Dictionary of API keyword arguments

        Raises:
            StepError: If API argument preparation fails
        """
        try:
            kwargs = self._default_kwargs.copy()

            if isinstance(model_config, ModelConfig):
                model_params = model_config.get_parameters()
                kwargs.update(model_params)

            return kwargs
        except Exception as e:  # pragma: no cover
            logger.error(f"Error preparing API arguments: {e}")
            raise StepError(
                f"Failed to prepare API arguments: {str(e)}"
            ) from e

    def _execute_with_retry(
        self,
        step: Step,
        prompt: str,
        api_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Iterator[str]]:
        """
        Execute an LLM call using the appropriate execution strategy.

        Args:
            step: The step being executed
            prompt: The prompt to send to the LLM
            api_kwargs: Optional dictionary of API call arguments

        Returns:
            Either a string or an iterator of strings for streaming responses

        Raises:
            ClientAIError: If the LLM call fails
        """
        for attempt in range(step.config.retry_count + 1):
            try:
                return self._execute_single_call(step, prompt, api_kwargs)
            except ClientAIError as e:  # pragma: no cover
                if attempt >= step.config.retry_count:
                    logger.error(
                        f"All retry attempts failed for "
                        f"step '{step.name}': {e}"
                    )
                    raise
                logger.warning(
                    f"Retry {attempt + 1}/{step.config.retry_count} "
                    f"for step '{step.name}': {e}"
                )
                continue
        raise ClientAIError("All retry attempts failed")

    def _execute_llm_call(
        self,
        step: Step,
        prompt: str,
        api_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Iterator[str]]:
        """
        Execute an LLM call with appropriate retry handling.

        Args:
            step: The step being executed
            prompt: The prompt to send to the LLM
            api_kwargs: Optional dictionary of API call arguments

        Returns:
            Either a string or an iterator of strings for streaming responses

        Raises:
            ClientAIError: If the LLM call fails
        """
        try:
            if step.config.use_internal_retry:
                return self._execute_with_retry(step, prompt, api_kwargs)
            return self._execute_single_call(step, prompt, api_kwargs)
        except ClientAIError:  # pragma: no cover
            raise
        except Exception as e:
            logger.error(f"LLM execution failed: {e}")
            raise StepError(f"LLM execution failed: {str(e)}") from e

    def _get_model_name(
        self, model_config: Union[str, ModelConfig, None]
    ) -> str:
        """Extract the model name from various configuration formats.

        Args:
            model_config: The model configuration to process

        Returns:
            The model name as a string

        Raises:
            StepError: If model name extraction fails

        Example:
            ```python
            name = engine._get_model_name(ModelConfig(name="gpt-4"))
            print(name)  # Output: "gpt-4"
            ```
        """
        try:
            if isinstance(model_config, str):
                return model_config
            if isinstance(model_config, ModelConfig):
                return model_config.name
            if isinstance(self._default_model, str):
                return self._default_model
            if isinstance(self._default_model, ModelConfig):
                return self._default_model.name
            raise ValueError("No valid model configuration found")
        except Exception as e:
            logger.error(f"Error getting model name: {e}")
            raise StepError(f"Failed to get model name: {str(e)}") from e

    def _validate_step_output(
        self,
        step: Step,
        result: Union[str, Iterator[str]],
    ) -> Any:
        """Validate step output if validation is enabled.

        Args:
            step: The executed step
            result: Raw result from step execution. May be:
                - String: Direct response
                - Iterator[str]: Streamed response chunks

        Returns:
            Any: Either:
                - The validated data if validation is enabled and successful
                - The original result if validation is not enabled

        Raises:
            ValidationError: If validation fails for the output data
            StepError: If an unexpected error occurs during validation
        """
        validator = StepValidator.from_step(step)
        if not validator:
            return result

        try:
            context: ValidatorContext = ValidatorContext(
                data=result,
                format=OutputFormat.JSON,
                partial=False,
                metadata={
                    "step_name": step.name,
                    "step_type": step.step_type,
                },
            )
            validation_result = validator.validate(result, context)
            if validation_result.is_valid:
                return validation_result.data

            error_str = "\n".join(
                f"{k}: {v}" for k, v in validation_result.errors.items()
            )
            raise ValidationError(error_str)

        except ValidationError:  # pragma: no cover
            raise
        except Exception as e:  # pragma: no cover
            raise StepError(f"Unexpected validation error: {str(e)}")

    def execute_step(
        self,
        step: Step,
        *args: Any,
        stream: Optional[bool] = None,
        **kwargs: Any,
    ) -> Optional[Union[str, Iterator[str]]]:
        """Execute a single workflow step with full configuration.

        Main entry point for step execution, handling tool selection,
        LLM interaction, validation, error handling, and result management.

        Args:
            step: The step to execute
            *args: Additional positional arguments for the step
            stream: Optional bool to override step's stream configuration
            **kwargs: Additional keyword arguments for the step

        Returns:
            Optional[Union[str, Iterator[str], Any]]:
                The step execution result:
                - None if step is disabled or failed
                - Complete string if streaming is disabled
                - Iterator of string chunks if streaming is enabled
                - Any type for validated outputs from json_output steps

        Raises:
            StepError: If step execution fails and step is required
            ToolError: If tool execution fails
            ClientAIError: If LLM interaction fails
            ValidationError: If output validation fails for json_output steps

        Example:
            Basic step execution:
            ```python
            # Execute with default stream setting
            result = engine.execute_step(step, agent, "input data")

            # Force streaming on
            result = engine.execute_step(
                step,
                agent,
                "input data",
                stream=True
            )

            # Handle streaming results
            if isinstance(result, Iterator):
                for chunk in result:
                    print(chunk, end="")
            else:
                print(result)
            ```

            With validation:
            ```python
            class OutputModel(BaseModel):
                value: str
                score: float

            @think("analyze", json_output=True)
            def analyze(self, data: str) -> OutputModel:
                return "..."
            ```

        Notes:
            - Handles both streaming and non-streaming responses
            - Manages tool selection if enabled for step
            - Updates agent context with results
            - Supports retry logic for failed steps
            - Validates output against Pydantic models when json_output=True
            - Supports partial validation during streaming
        """
        if self._current_agent is None:  # pragma: no cover
            raise StepError("No agent context available for step execution")

        logger.info(f"Executing step '{step.name}'")
        if stream is None:  # pragma: no cover
            stream = getattr(step, "stream", False)

        logger.debug(
            f"Step configuration: use_tools={step.use_tools}, "
            f"send_to_llm={step.send_to_llm}, "
            f"json_output={getattr(step, 'json_output', False)}"
            f"stream={stream}"
        )

        if not step.config.enabled:
            logger.info(f"Step '{step.name}' is disabled, skipping")
            return None

        try:
            result = None

            if step.send_to_llm:
                try:
                    result = self._handle_llm_step(
                        step, self._current_agent, stream, args, kwargs
                    )
                except (ClientAIError, StepError, ToolError):
                    raise
                except Exception as e:  # pragma: no cover
                    raise StepError(
                        f"LLM step execution failed: {str(e)}"
                    ) from e
            else:
                try:
                    result = self._handle_non_llm_step(
                        step, self._current_agent, args, kwargs
                    )
                except (ValueError, TimeoutError):  # pragma: no cover
                    raise
                except Exception as e:  # pragma: no cover
                    raise StepError(
                        f"Non-LLM step execution failed: {str(e)}"
                    ) from e

            if result is not None:
                result = self._validate_step_output(step, result)

            self._update_context(step, self._current_agent, result)
            return result

        except (
            ClientAIError,
            StepError,
            ToolError,
            ValueError,
            TimeoutError,
            ValidationError,
        ):
            raise
        except Exception as e:  # pragma: no cover
            logger.error(f"Unexpected error executing step '{step.name}': {e}")
            if step.config.required:
                raise StepError(
                    f"Required step '{step.name}' failed: {str(e)}"
                ) from e
            return None

    def _handle_llm_step(
        self,
        step: Step,
        agent: Any,
        stream: Optional[bool],
        args: Any,
        kwargs: Any,
    ) -> Union[str, Iterator[str]]:
        """Handle execution of a step that involves LLM interaction.

        Args:
            step: The step being executed
            agent: The agent instance
            stream: Optional stream configuration override
            args: Positional arguments for the step
            kwargs: Keyword arguments for the step

        Returns:
            Either a string or an iterator of strings for streaming responses

        Raises:
            StepError: If step execution fails
            ToolError: If tool execution fails
            ClientAIError: If LLM interaction fails
        """
        prompt = self._build_prompt(step, agent, *args, **kwargs)
        model_config = self._prepare_model_config(step, stream)
        api_kwargs = self._prepare_api_kwargs(model_config)

        logger.debug(f"Executing LLM call with streaming={stream}")
        return self._execute_llm_call(step, prompt, api_kwargs)

    def _handle_non_llm_step(
        self, step: Step, agent: Any, args: Any, kwargs: Any
    ) -> Union[str, Iterator[str]]:
        """Handle execution of a step that doesn't involve LLM interaction.

        Args:
            step: The step being executed
            agent: The agent instance
            args: Positional arguments for the step
            kwargs: Keyword arguments for the step

        Returns:
            Either a string or an iterator of strings

        Raises:
            StepError: If step execution fails or returns invalid type
        """
        result = step.func(agent, *args, **kwargs)
        if not isinstance(result, (str, Iterator)):  # noqa: UP038
            raise StepError(
                f"Step function must return str or Iterator, "
                f"got {type(result)}"
            )
        return result

    def _prepare_model_config(
        self, step: Step, stream: Optional[bool]
    ) -> Union[str, ModelConfig]:
        """Prepare the model configuration for a step execution.

        Args:
            step: The step being executed
            stream: Optional stream configuration override

        Returns:
            The prepared model configuration

        Raises:
            StepError: If model configuration preparation fails
        """
        try:
            model_config = step.llm_config or self._default_model

            if isinstance(model_config, ModelConfig):
                if stream is not None:
                    return model_config.merge(stream=stream)
                return model_config

            return ModelConfig(
                name=model_config,
                stream=stream if stream is not None else False,
            )
        except Exception as e:  # pragma: no cover
            logger.error(f"Error preparing model configuration: {e}")
            raise StepError(
                f"Failed to prepare model configuration: {str(e)}"
            ) from e

    def _update_context(
        self,
        step: Step,
        agent: Any,
        result: Optional[Union[str, Iterator[str]]],
    ) -> None:
        """Update the agent's context with step execution results.

        Args:
            step: Executed step
            agent: Agent instance
            result: Step execution result

        Raises:
            StepError: If context update fails
        """
        try:
            if result is not None and not isinstance(result, Iterator):
                agent.context.set_step_result(step.name, result)

                if step.config.pass_result:
                    agent.context.current_input = result

        except Exception as e:  # pragma: no cover
            logger.error(f"Error updating context: {e}")
            raise StepError(f"Failed to update context: {str(e)}") from e

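After a tool-enabled step executes, the engine stores its tool decisions in the agent's context state, which downstream steps can inspect. A minimal sketch using the dictionary keys produced by _create_decision_dict above:

decisions = agent.context.state.get("last_tool_decisions", [])
for decision in decisions:
    # Each entry records the tool name, arguments, result or error,
    # plus the selector's confidence and reasoning
    print(decision["tool_name"], decision["result"], decision["confidence"])
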
__init__(client, default_model, default_kwargs, tool_selection_config=None, tool_model=None)

Initialize the execution engine with specified configurations.

Parameters:

client (ClientAI): The AI client for model interactions. Required.
default_model (Union[str, ModelConfig]): Default model configuration (string name or ModelConfig). Required.
default_kwargs (Dict[str, Any]): Default parameters for model calls. Required.
tool_selection_config (Optional[ToolSelectionConfig]): Configuration for tool selection behavior. Default: None.
tool_model (Optional[Union[str, ModelConfig]]): Model to use for tool selection (defaults to default_model). Default: None.
Example
engine = StepExecutionEngine(
    client=my_client,
    default_model="gpt-4",
    default_kwargs={"temperature": 0.7},
    tool_selection_config=ToolSelectionConfig(
        confidence_threshold=0.8
    )
)

Raises:

StepError: If initialization fails or configuration is invalid
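
Because construction validates the client and default model eagerly, configuration problems surface as StepError at startup. A minimal handling sketch (the import location of StepError is an assumption, not confirmed by this page):

try:
    engine = StepExecutionEngine(
        client=my_client,
        default_model="gpt-4",
        default_kwargs={},
    )
except StepError as e:
    # Raised when the client or default model is missing or invalid
    print(f"Engine configuration rejected: {e}")
    raise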

execute_step(step, *args, stream=None, **kwargs)

Execute a single workflow step with full configuration.

Main entry point for step execution, handling tool selection, LLM interaction, validation, error handling, and result management.

Parameters:

step (Step): The step to execute. Required.
*args (Any): Additional positional arguments for the step.
stream (Optional[bool]): Optional bool to override step's stream configuration. Default: None.
**kwargs (Any): Additional keyword arguments for the step.

Returns:

Optional[Union[str, Iterator[str], Any]]: The step execution result:
  • None if step is disabled or failed
  • Complete string if streaming is disabled
  • Iterator of string chunks if streaming is enabled
  • Any type for validated outputs from json_output steps

Raises:

StepError: If step execution fails and step is required
ToolError: If tool execution fails
ClientAIError: If LLM interaction fails
ValidationError: If output validation fails for json_output steps

Example

Basic step execution:

# Execute with default stream setting
result = engine.execute_step(step, agent, "input data")

# Force streaming on
result = engine.execute_step(
    step,
    agent,
    "input data",
    stream=True
)

# Handle streaming results
if isinstance(result, Iterator):
    for chunk in result:
        print(chunk, end="")
else:
    print(result)

With validation:

class OutputModel(BaseModel):
    value: str
    score: float

@think("analyze", json_output=True)
def analyze(self, data: str) -> OutputModel:
    return "..."

Notes
  • Handles both streaming and non-streaming responses
  • Manages tool selection if enabled for step
  • Updates agent context with results
  • Supports retry logic for failed steps
  • Validates output against Pydantic models when json_output=True
  • Supports partial validation during streaming
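
For json_output steps, the return value is the validated data rather than raw text, so callers can consume it directly and treat validation failures alongside other step errors. A sketch assuming the OutputModel step from the example above, with analyze_step as the decorated step and the validated data returned as an OutputModel instance:

try:
    output = engine.execute_step(analyze_step, agent, "input data")
    # With json_output=True, output is the validated OutputModel instance
    print(output.value, output.score)
except ValidationError as e:
    # The LLM response did not match OutputModel
    print(f"Output failed validation: {e}")
except StepError as e:
    print(f"Step failed: {e}")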