Source code for sapimclient.model.pipeline

"""Pydantic models for Pipeline jobs."""

from typing import ClassVar, Literal

from pydantic import Field, computed_field, model_validator

from sapimclient import const

from .base import Endpoint

STAGETABLES: dict[str, list[str]] = {
    'TransactionalData': [
        'TransactionAndCredit',
        'Deposit',
    ],
    'OrganizationData': [
        'Participant',
        'Position',
        'Title',
        'PositionRelation',
    ],
    'ClassificationData': [
        'Category',
        'Category_Classifiers',
        'Customer',
        'Product',
        'PostalCode',
        'GenericClassifier',
    ],
    'PlanRelatedData': [
        'FixedValue',
        'VariableAssignment',
        'Quota',
        'RelationalMDLT',
    ],
}


[docs] class _PipelineJob(Endpoint): """Base class for a Pipeline Job.""" attr_endpoint: ClassVar[str] = 'api/v2/pipelines' command: Literal['PipelineRun', 'Import', 'XMLImport'] run_stats: bool = False
[docs] class ResetFromValidate(_PipelineJob): """Run a ResetFromValidate pipeline.""" attr_endpoint: ClassVar[str] = 'api/v2/pipelines/resetfromvalidate' command: Literal['Import'] = 'Import' calendar_seq: str period_seq: str batch_name: str | None = None
[docs] class Purge(_PipelineJob): """Run a Purge pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Purge] = ( const.PipelineRunStages.Purge ) command: Literal['PipelineRun'] = 'PipelineRun' batch_name: str module: const.StageTables @computed_field def stage_tables(self) -> list[str]: """Compute stageTables field based on module.""" return STAGETABLES[self.module]
[docs] class XMLImport(_PipelineJob): """Run an XML Import pipeline.""" command: Literal['XMLImport'] = 'XMLImport' stage_type_seq: Literal[const.XMLImportStages.XMLImport] = ( const.XMLImportStages.XMLImport ) xml_file_name: str xml_file_content: str update_existing_objects: bool = False
[docs] class _PipelineRunJob(_PipelineJob): """Base class for a PipelineRun job.""" command: Literal['PipelineRun'] = 'PipelineRun' period_seq: str calendar_seq: str stage_type_seq: const.PipelineRunStages run_mode: const.PipelineRunMode = const.PipelineRunMode.Full position_groups: list[str] | None = None position_seqs: list[str] | None = None processing_unit_seq: str | None = None @model_validator(mode='after') def check_runmode(self) -> '_PipelineRunJob': """Validate run_mode together with position_groups and position_seqs.""" if self.run_mode in ( const.PipelineRunMode.Full, const.PipelineRunMode.Incremental, ) and not (self.position_groups is None and self.position_seqs is None): msg = ( "When run_mode is 'full' or 'incremental' " 'position_groups and position_seqs must be None' ) raise ValueError(msg) if self.run_mode == const.PipelineRunMode.Positions and not ( self.position_groups or self.position_seqs ): msg = ( "When run_mode is 'positions' " 'provide either position_groups or position_seqs, not both' ) raise ValueError(msg) if self.position_groups and self.position_seqs: msg = 'Provide either position_groups or position_seqs, not both' raise ValueError(msg) return self
[docs] class Classify(_PipelineRunJob): """Run a Classify pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Classify] = ( const.PipelineRunStages.Classify ) run_mode: Literal[const.PipelineRunMode.Full, const.PipelineRunMode.Incremental] = ( const.PipelineRunMode.Full ) position_groups: None = None position_seqs: None = None
[docs] class Allocate(_PipelineRunJob): """Run an Allocate pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Allocate] = ( const.PipelineRunStages.Allocate )
[docs] class Reward(_PipelineRunJob): """Run a Reward pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Reward] = ( const.PipelineRunStages.Reward ) run_mode: Literal[const.PipelineRunMode.Full, const.PipelineRunMode.Positions] = ( const.PipelineRunMode.Full )
[docs] class Pay(_PipelineRunJob): """Run a Pay pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Pay] = const.PipelineRunStages.Pay run_mode: Literal[const.PipelineRunMode.Full, const.PipelineRunMode.Positions] = ( const.PipelineRunMode.Full ) position_seqs: None = None
[docs] class Summarize(_PipelineRunJob): """Run a Summarize pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Summarize] = ( const.PipelineRunStages.Summarize )
[docs] class Compensate(_PipelineRunJob): """Run a Compensate pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Compensate] = ( const.PipelineRunStages.Compensate ) remove_stale_results: bool = False
[docs] class CompensateAndPay(_PipelineRunJob): """Run a CompensateAndPay pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.CompensateAndPay] = ( const.PipelineRunStages.CompensateAndPay ) remove_stale_results: bool = False
[docs] class ResetFromClassify(_PipelineRunJob): """Run a ResetFromClassify pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.ResetFromClassify] = ( const.PipelineRunStages.ResetFromClassify )
[docs] class ResetFromAllocate(_PipelineRunJob): """Run a ResetFromAllocate pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.ResetFromAllocate] = ( const.PipelineRunStages.ResetFromAllocate )
[docs] class ResetFromReward(_PipelineRunJob): """Run a ResetFromReward pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.ResetFromReward] = ( const.PipelineRunStages.ResetFromReward )
[docs] class ResetFromPay(_PipelineRunJob): """Run a ResetFromPay pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.ResetFromPay] = ( const.PipelineRunStages.ResetFromPay )
[docs] class Post(_PipelineRunJob): """Run a Post pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Post] = const.PipelineRunStages.Post
[docs] class Finalize(_PipelineRunJob): """Run a Finalize pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.Finalize] = ( const.PipelineRunStages.Finalize )
[docs] class ReportsGeneration(_PipelineRunJob): """Run a ReportsGeneration pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.ReportsGeneration] = ( const.PipelineRunStages.ReportsGeneration ) generate_ods_reports: Literal[True] = Field( default=True, alias='generateODSReports', ) report_type_name: const.ReportType = const.ReportType.Crystal report_formats_list: list[const.ReportFormat] ods_report_list: list[str] bo_groups_list: list[str] run_mode: Literal[const.PipelineRunMode.Full, const.PipelineRunMode.Positions] = ( const.PipelineRunMode.Full )
[docs] class UndoPost(_PipelineRunJob): """Run a UndoPost pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.UndoPost] = ( const.PipelineRunStages.UndoPost )
[docs] class UndoFinalize(_PipelineRunJob): """Run a UndoFinalize pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.UndoFinalize] = ( const.PipelineRunStages.UndoFinalize )
[docs] class CleanupDefferedResults(_PipelineRunJob): """Run a CleanupDefferedResults pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.CleanupDefferedResults] = ( const.PipelineRunStages.CleanupDefferedResults )
[docs] class UpdateAnalytics(_PipelineRunJob): """Run a UpdateAnalytics pipeline.""" stage_type_seq: Literal[const.PipelineRunStages.UpdateAnalytics] = ( const.PipelineRunStages.UpdateAnalytics )
[docs] class _ImportJob(_PipelineJob): """Base class for an Import job.""" command: Literal['Import'] = 'Import' stage_type_seq: const.ImportStages calendar_seq: str batch_name: str module: const.StageTables run_mode: const.ImportRunMode = const.ImportRunMode.All @computed_field def stage_tables(self) -> list[str]: """Compute stageTables field based on module.""" return STAGETABLES[self.module] @model_validator(mode='after') def validate_conditional_fields(self) -> '_ImportJob': """Validate conditional required fields. Validations: ------------ run_mode can only be 'new' when importing TransactionalData """ if ( self.module != const.StageTables.TransactionalData and self.run_mode == const.ImportRunMode.New ): msg = ("run_mode can only be 'new' when importing TransactionalData",) raise ValueError(msg) return self
[docs] class Validate(_ImportJob): """Run a Validate pipeline.""" stage_type_seq: Literal[const.ImportStages.Validate] = const.ImportStages.Validate revalidate: const.RevalidateMode = const.RevalidateMode.All
[docs] class Transfer(_ImportJob): """Run a Transfer pipeline.""" stage_type_seq: Literal[const.ImportStages.Transfer] = const.ImportStages.Transfer
[docs] class ValidateAndTransfer(_ImportJob): """Run a ValidateAndTransfer pipeline.""" stage_type_seq: Literal[const.ImportStages.ValidateAndTransfer] = ( const.ImportStages.ValidateAndTransfer ) revalidate: const.RevalidateMode = const.RevalidateMode.All
[docs] class ValidateAndTransferIfAllValid(_ImportJob): """Run a ValidateAndTransferIfAllValid pipeline.""" stage_type_seq: Literal[const.ImportStages.ValidateAndTransferIfAllValid] = ( const.ImportStages.ValidateAndTransferIfAllValid ) revalidate: const.RevalidateMode = const.RevalidateMode.All
[docs] class TransferIfAllValid(_ImportJob): """Run a TransferIfAllValid pipeline.""" stage_type_seq: Literal[const.ImportStages.TransferIfAllValid] = ( const.ImportStages.TransferIfAllValid )